From d434a4d57e9aaaed1012702d32e60e385dafaf8f Mon Sep 17 00:00:00 2001
From: Garrybest
Date: Sun, 27 Jun 2021 16:43:29 +0800
Subject: [PATCH] monitor cluster health in cluster_controller

Signed-off-by: Garrybest
---
 .../app/controllermanager.go                  |   7 +-
 cmd/controller-manager/app/options/options.go |  16 ++
 pkg/controllers/cluster/cluster_controller.go | 203 +++++++++++++++++-
 3 files changed, 223 insertions(+), 3 deletions(-)

diff --git a/cmd/controller-manager/app/controllermanager.go b/cmd/controller-manager/app/controllermanager.go
index 52a080a2c..ca95c69ed 100644
--- a/cmd/controller-manager/app/controllermanager.go
+++ b/cmd/controller-manager/app/controllermanager.go
@@ -118,8 +118,11 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop
 	}
 
 	clusterController := &cluster.Controller{
-		Client:        mgr.GetClient(),
-		EventRecorder: mgr.GetEventRecorderFor(cluster.ControllerName),
+		Client:                    mgr.GetClient(),
+		EventRecorder:             mgr.GetEventRecorderFor(cluster.ControllerName),
+		ClusterMonitorPeriod:      opts.ClusterMonitorPeriod.Duration,
+		ClusterMonitorGracePeriod: opts.ClusterMonitorGracePeriod.Duration,
+		ClusterStartupGracePeriod: opts.ClusterStartupGracePeriod.Duration,
 	}
 	if err := clusterController.SetupWithManager(mgr); err != nil {
 		klog.Fatalf("Failed to setup cluster controller: %v", err)
diff --git a/cmd/controller-manager/app/options/options.go b/cmd/controller-manager/app/options/options.go
index f3ad54258..95f6f52db 100644
--- a/cmd/controller-manager/app/options/options.go
+++ b/cmd/controller-manager/app/options/options.go
@@ -39,6 +39,16 @@ type Options struct {
 	// ClusterLeaseRenewIntervalFraction is a fraction coordinated with ClusterLeaseDuration that
 	// how long the current holder of a lease has last updated the lease.
 	ClusterLeaseRenewIntervalFraction float64
+	// ClusterMonitorPeriod represents cluster-controller monitoring period, i.e. how often
+	// cluster-controller checks the cluster health signal posted by cluster-status-controller.
+	// This value should be lower than ClusterMonitorGracePeriod.
+	ClusterMonitorPeriod metav1.Duration
+	// ClusterMonitorGracePeriod represents the grace period after the last cluster health probe time.
+	// If cluster-controller doesn't receive an update within this period, it starts posting
+	// "ClusterReady==ConditionUnknown".
+	ClusterMonitorGracePeriod metav1.Duration
+	// When a cluster is just created, e.g. during agent bootstrap or cluster join, we give it a longer grace period.
+	ClusterStartupGracePeriod metav1.Duration
 }
 
 // NewOptions builds an empty options.
@@ -87,4 +97,10 @@ func (o *Options) AddFlags(flags *pflag.FlagSet) {
 		"Specifies the expiration period of a cluster lease.")
 	flags.Float64Var(&o.ClusterLeaseRenewIntervalFraction, "cluster-lease-renew-interval-fraction", 0.25,
 		"Specifies the cluster lease renew interval fraction.")
+	flags.DurationVar(&o.ClusterMonitorPeriod.Duration, "cluster-monitor-period", 5*time.Second,
+		"Specifies how often karmada-controller-manager monitors cluster health status.")
+	flags.DurationVar(&o.ClusterMonitorGracePeriod.Duration, "cluster-monitor-grace-period", 40*time.Second,
+		"Specifies the grace period of allowing a running cluster to be unresponsive before marking it unhealthy.")
+	flags.DurationVar(&o.ClusterStartupGracePeriod.Duration, "cluster-startup-grace-period", 60*time.Second,
+		"Specifies the grace period of allowing a cluster to be unresponsive during startup before marking it unhealthy.")
 }
diff --git a/pkg/controllers/cluster/cluster_controller.go b/pkg/controllers/cluster/cluster_controller.go
index 003cf1d72..e91319d43 100644
--- a/pkg/controllers/cluster/cluster_controller.go
+++ b/pkg/controllers/cluster/cluster_controller.go
@@ -3,11 +3,18 @@ package cluster
 import (
 	"context"
 	"fmt"
+	"sync"
+	"time"
 
+	coordv1 "k8s.io/api/coordination/v1"
 	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/equality"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
+	"k8s.io/apimachinery/pkg/api/meta"
 	v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
+	utilerrors "k8s.io/apimachinery/pkg/util/errors"
+	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/client-go/tools/record"
 	"k8s.io/klog/v2"
 	controllerruntime "sigs.k8s.io/controller-runtime"
@@ -22,12 +29,37 @@ import (
 const (
 	// ControllerName is the controller name that will be used when reporting events.
 	ControllerName = "cluster-controller"
+	// MonitorRetrySleepTime is the amount of time the cluster controller should
+	// sleep between retries of cluster health updates.
+	MonitorRetrySleepTime = 20 * time.Millisecond
+	// HealthUpdateRetry controls the number of retries of writing cluster health update.
+	HealthUpdateRetry = 5
 )
 
 // Controller is to sync Cluster.
 type Controller struct {
 	client.Client // used to operate Cluster resources.
 	EventRecorder record.EventRecorder
+
+	// ClusterMonitorPeriod represents cluster-controller monitoring period, i.e. how often
+	// cluster-controller checks the cluster health signal posted by cluster-status-controller.
+	// This value should be lower than ClusterMonitorGracePeriod.
+	ClusterMonitorPeriod time.Duration
+	// ClusterMonitorGracePeriod represents the grace period after the last cluster health probe time.
+	// If cluster-controller doesn't receive an update within this period, it starts posting
+	// "ClusterReady==ConditionUnknown".
+	ClusterMonitorGracePeriod time.Duration
+	// When a cluster is just created, e.g. during agent bootstrap or cluster join, we give it a longer grace period.
+	ClusterStartupGracePeriod time.Duration
+
+	// Per-cluster map that stores the last observed health together with the local time when it was observed.
+	clusterHealthMap sync.Map
+}
+
+type clusterHealthData struct {
+	probeTimestamp v1.Time
+	status         *v1alpha1.ClusterStatus
+	lease          *coordv1.Lease
 }
 
 // Reconcile performs a full reconciliation for the object referred to by the Request.
@@ -53,9 +85,27 @@ func (c *Controller) Reconcile(ctx context.Context, req controllerruntime.Reques
 	return c.syncCluster(cluster)
 }
 
+// Start starts an asynchronous loop that monitors the status of clusters.
+func (c *Controller) Start(ctx context.Context) error {
+	klog.Infof("Starting cluster health monitor")
+	defer klog.Infof("Shutting down cluster health monitor")
+
+	// Incorporate the results of the cluster health signal pushed from cluster-status-controller to master.
+	go wait.UntilWithContext(ctx, func(ctx context.Context) {
+		if err := c.monitorClusterHealth(); err != nil {
+			klog.Errorf("Error monitoring cluster health: %v", err)
+		}
+	}, c.ClusterMonitorPeriod)
+
+	return nil
+}
+
 // SetupWithManager creates a controller and register to controller manager.
 func (c *Controller) SetupWithManager(mgr controllerruntime.Manager) error {
-	return controllerruntime.NewControllerManagedBy(mgr).For(&v1alpha1.Cluster{}).Complete(c)
+	return utilerrors.NewAggregate([]error{
+		controllerruntime.NewControllerManagedBy(mgr).For(&v1alpha1.Cluster{}).Complete(c),
+		mgr.Add(c),
+	})
 }
 
 func (c *Controller) syncCluster(cluster *v1alpha1.Cluster) (controllerruntime.Result, error) {
@@ -194,3 +244,154 @@ func (c *Controller) createExecutionSpace(cluster *v1alpha1.Cluster) error {
 
 	return nil
 }
+
+func (c *Controller) monitorClusterHealth() error {
+	clusterList := &v1alpha1.ClusterList{}
+	if err := c.Client.List(context.TODO(), clusterList); err != nil {
+		return err
+	}
+
+	clusters := clusterList.Items
+	for i := range clusters {
+		cluster := &clusters[i]
+		if err := wait.PollImmediate(MonitorRetrySleepTime, MonitorRetrySleepTime*HealthUpdateRetry, func() (bool, error) {
+			// Cluster object may be changed in this function.
+			if err := c.tryUpdateClusterHealth(cluster); err == nil {
+				return true, nil
+			}
+			clusterName := cluster.Name
+			if err := c.Get(context.TODO(), client.ObjectKey{Name: clusterName}, cluster); err != nil {
+				// If the cluster does not exist any more, we delete the health data from the map.
+				if apierrors.IsNotFound(err) {
+					c.clusterHealthMap.Delete(clusterName)
+					return true, nil
+				}
+				klog.Errorf("Getting cluster %q to retry updating cluster health error: %v", clusterName, err)
+				return false, err
+			}
+			return false, nil
+		}); err != nil {
+			klog.Errorf("Update health of Cluster '%v' from Controller error: %v. Skipping.", cluster.Name, err)
+			continue
+		}
+	}
+
+	return nil
+}
+
+// tryUpdateClusterHealth checks a given cluster's conditions and tries to update it.
+//nolint:gocyclo
+func (c *Controller) tryUpdateClusterHealth(cluster *v1alpha1.Cluster) error {
+	// Step 1: Get the last cluster health from `clusterHealthMap`.
+	var clusterHealth *clusterHealthData
+	if value, exists := c.clusterHealthMap.Load(cluster.Name); exists {
+		clusterHealth = value.(*clusterHealthData)
+	}
+
+	defer func() {
+		c.clusterHealthMap.Store(cluster.Name, clusterHealth)
+	}()
+
+	// Step 2: Get the cluster ready condition.
+	var gracePeriod time.Duration
+	var observedReadyCondition *v1.Condition
+	currentReadyCondition := meta.FindStatusCondition(cluster.Status.Conditions, v1alpha1.ClusterConditionReady)
+	if currentReadyCondition == nil {
+		// If the ready condition is nil, then cluster-status-controller has never posted cluster status.
+		// A fake ready condition is created, where LastTransitionTime is set to cluster.CreationTimestamp
+		// to avoid handling the corner case.
+		observedReadyCondition = &v1.Condition{
+			Type:               v1alpha1.ClusterConditionReady,
+			Status:             v1.ConditionUnknown,
+			LastTransitionTime: cluster.CreationTimestamp,
+		}
+		gracePeriod = c.ClusterStartupGracePeriod
+		if clusterHealth != nil {
+			clusterHealth.status = &cluster.Status
+		} else {
+			clusterHealth = &clusterHealthData{
+				status:         &cluster.Status,
+				probeTimestamp: cluster.CreationTimestamp,
+			}
+		}
+	} else {
+		// If the ready condition is not nil, make a copy of it, since we may modify it in place later.
+		observedReadyCondition = currentReadyCondition.DeepCopy()
+		gracePeriod = c.ClusterMonitorGracePeriod
+	}
+
+	// Step 3: Get the last condition and lease from `clusterHealth`.
+	var savedCondition *v1.Condition
+	var savedLease *coordv1.Lease
+	if clusterHealth != nil {
+		savedCondition = meta.FindStatusCondition(clusterHealth.status.Conditions, v1alpha1.ClusterConditionReady)
+		savedLease = clusterHealth.lease
+	}
+
+	// Step 4: Update the clusterHealth if necessary.
+	// If this condition has no difference from the last condition, we leave everything as it is.
+	// Otherwise, we only update the probeTimestamp.
+	if clusterHealth == nil || !equality.Semantic.DeepEqual(savedCondition, currentReadyCondition) {
+		clusterHealth = &clusterHealthData{
+			status:         cluster.Status.DeepCopy(),
+			probeTimestamp: v1.Now(),
+		}
+	}
+	// Always update the probe time if the cluster lease is renewed.
+	// Note: If cluster-status-controller never posted the cluster status, but continues renewing the
+	// heartbeat leases, the cluster controller will assume the cluster is healthy and take no action.
+	observedLease := &coordv1.Lease{}
+	err := c.Client.Get(context.TODO(), client.ObjectKey{Namespace: util.NamespaceClusterLease, Name: cluster.Name}, observedLease)
+	if err == nil && (savedLease == nil || savedLease.Spec.RenewTime.Before(observedLease.Spec.RenewTime)) {
+		clusterHealth.lease = observedLease
+		clusterHealth.probeTimestamp = v1.Now()
+	}
+
+	// Step 5: Check whether the probe timestamp has timed out.
+	if v1.Now().After(clusterHealth.probeTimestamp.Add(gracePeriod)) {
+		clusterConditionTypes := []string{
+			v1alpha1.ClusterConditionReady,
+		}
+
+		nowTimestamp := v1.Now()
+		for _, clusterConditionType := range clusterConditionTypes {
+			currentCondition := meta.FindStatusCondition(cluster.Status.Conditions, clusterConditionType)
+			if currentCondition == nil {
+				klog.V(2).Infof("Condition %v of cluster %v was never updated by cluster-status-controller",
+					clusterConditionType, cluster.Name)
+				cluster.Status.Conditions = append(cluster.Status.Conditions, v1.Condition{
+					Type:               clusterConditionType,
+					Status:             v1.ConditionUnknown,
+					Reason:             "ClusterStatusNeverUpdated",
+					Message:            "Cluster status controller never posted cluster status.",
+					LastTransitionTime: nowTimestamp,
+				})
+			} else {
+				klog.V(2).Infof("cluster %v hasn't been updated for %+v. Last %v is: %+v",
+					cluster.Name, v1.Now().Time.Sub(clusterHealth.probeTimestamp.Time), clusterConditionType, currentCondition)
+				if currentCondition.Status != v1.ConditionUnknown {
+					currentCondition.Status = v1.ConditionUnknown
+					currentCondition.Reason = "ClusterStatusUnknown"
+					currentCondition.Message = "Cluster status controller stopped posting cluster status."
+					currentCondition.LastTransitionTime = nowTimestamp
+				}
+			}
+		}
+		// We need to update currentReadyCondition because its value may have changed.
+		currentReadyCondition = meta.FindStatusCondition(cluster.Status.Conditions, v1alpha1.ClusterConditionReady)
+
+		if !equality.Semantic.DeepEqual(currentReadyCondition, observedReadyCondition) {
+			if err := c.Status().Update(context.TODO(), cluster); err != nil {
+				klog.Errorf("Error updating cluster %s: %v", cluster.Name, err)
+				return err
+			}
+			clusterHealth = &clusterHealthData{
+				status:         &cluster.Status,
+				probeTimestamp: clusterHealth.probeTimestamp,
+				lease:          observedLease,
+			}
+			return nil
+		}
+	}
+	return nil
+}
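
Reviewer note (not part of the diff): to make the Step 5 arithmetic concrete, here is a
self-contained sketch of the probe-timeout check, mirroring the
v1.Now().After(probeTimestamp.Add(gracePeriod)) comparison above. It is standalone Go
with made-up values, not code from this patch:

    package main

    import (
    	"fmt"
    	"time"

    	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    )

    func main() {
    	// Assume the last health probe (condition change or lease renewal) landed 50s ago.
    	probeTimestamp := metav1.NewTime(time.Now().Add(-50 * time.Second))
    	gracePeriod := 40 * time.Second // the cluster-monitor-grace-period default

    	// Same shape as the Step 5 check in tryUpdateClusterHealth.
    	timedOut := metav1.Now().After(probeTimestamp.Add(gracePeriod))
    	fmt.Println(timedOut) // true: 50s since the last probe exceeds the 40s grace period
    }

On the retry side, wait.PollImmediate(MonitorRetrySleepTime, MonitorRetrySleepTime*HealthUpdateRetry, ...)
gives each cluster a 100ms budget (20ms * 5) per monitor tick, i.e. roughly five attempts
to write the health update before the cluster is skipped until the next tick.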
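
Design note: SetupWithManager now registers the controller in two roles. Complete(c)
wires Reconcile into the manager's event-driven queue, while mgr.Add(c) registers the
same struct as a controller-runtime Runnable, so the manager calls Start(ctx) when it
starts and cancels the context on shutdown; Start then drives monitorClusterHealth
every ClusterMonitorPeriod via wait.UntilWithContext, independent of watch events.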
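
Usage note: a minimal sketch of how the new flags could be set on
karmada-controller-manager; the values below are just the defaults declared in
AddFlags, and cluster-monitor-period must stay lower than both grace periods so the
monitor loop observes fresh probes in time:

    # monitor loop runs every 5s; running clusters get 40s of slack, new ones 60s
    karmada-controller-manager \
      --cluster-monitor-period=5s \
      --cluster-monitor-grace-period=40s \
      --cluster-startup-grace-period=60s

With cluster-lease-renew-interval-fraction=0.25 and a cluster lease duration of 40s
(the lease duration default is an assumption, it is not shown in this diff), the lease
is renewed roughly every 10s, so the 40s monitor grace period tolerates about four
missed renewals before the Ready condition flips to Unknown.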