Merge pull request #454 from Garrybest/cluster-lifecycle

monitor cluster health in cluster_controller
karmada-bot 2021-06-28 09:18:12 +08:00 committed by GitHub
commit e1db428290
3 changed files with 223 additions and 3 deletions


@@ -118,8 +118,11 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop
}
clusterController := &cluster.Controller{
Client: mgr.GetClient(),
EventRecorder: mgr.GetEventRecorderFor(cluster.ControllerName),
ClusterMonitorPeriod: opts.ClusterMonitorPeriod.Duration,
ClusterMonitorGracePeriod: opts.ClusterMonitorGracePeriod.Duration,
ClusterStartupGracePeriod: opts.ClusterStartupGracePeriod.Duration,
}
if err := clusterController.SetupWithManager(mgr); err != nil {
klog.Fatalf("Failed to setup cluster controller: %v", err)


@@ -39,6 +39,16 @@ type Options struct {
// ClusterLeaseRenewIntervalFraction is a fraction coordinated with ClusterLeaseDuration that
// determines how often the current holder of a lease should renew the lease.
ClusterLeaseRenewIntervalFraction float64
// ClusterMonitorPeriod represents cluster-controller monitoring period, i.e. how often does
// cluster-controller check cluster health signal posted from cluster-status-controller.
// This value should be lower than ClusterMonitorGracePeriod.
ClusterMonitorPeriod metav1.Duration
// ClusterMonitorGracePeriod represents the grace period after last cluster health probe time.
// If it doesn't receive an update for this amount of time, it will start posting
// "ClusterReady==ConditionUnknown".
ClusterMonitorGracePeriod metav1.Duration
// When cluster is just created, e.g. agent bootstrap or cluster join, we give a longer grace period.
ClusterStartupGracePeriod metav1.Duration
}
// NewOptions builds an empty options.
@@ -87,4 +97,10 @@ func (o *Options) AddFlags(flags *pflag.FlagSet) {
"Specifies the expiration period of a cluster lease.") "Specifies the expiration period of a cluster lease.")
flags.Float64Var(&o.ClusterLeaseRenewIntervalFraction, "cluster-lease-renew-interval-fraction", 0.25, flags.Float64Var(&o.ClusterLeaseRenewIntervalFraction, "cluster-lease-renew-interval-fraction", 0.25,
"Specifies the cluster lease renew interval fraction.") "Specifies the cluster lease renew interval fraction.")
flags.DurationVar(&o.ClusterMonitorPeriod.Duration, "cluster-monitor-period", 5*time.Second,
"Specifies how often karmada-controller-manager monitors cluster health status.")
flags.DurationVar(&o.ClusterMonitorGracePeriod.Duration, "cluster-monitor-grace-period", 40*time.Second,
"Specifies the grace period of allowing a running cluster to be unresponsive before marking it unhealthy.")
flags.DurationVar(&o.ClusterStartupGracePeriod.Duration, "cluster-startup-grace-period", 60*time.Second,
"Specifies the grace period of allowing a cluster to be unresponsive during startup before marking it unhealthy.")
}
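The option comments above require cluster-monitor-period to stay below cluster-monitor-grace-period, and the startup grace period is meant to be the most generous of the three, but this change does not enforce those relationships. A minimal, self-contained sketch of such a check (the validateMonitorDurations helper is hypothetical and not part of this commit):

package main

import (
	"fmt"
	"time"
)

// validateMonitorDurations is a hypothetical helper (not in this commit) that
// enforces the relationships documented in the option comments above.
func validateMonitorDurations(monitorPeriod, monitorGracePeriod, startupGracePeriod time.Duration) error {
	if monitorPeriod >= monitorGracePeriod {
		return fmt.Errorf("cluster-monitor-period (%v) should be lower than cluster-monitor-grace-period (%v)", monitorPeriod, monitorGracePeriod)
	}
	if startupGracePeriod < monitorGracePeriod {
		return fmt.Errorf("cluster-startup-grace-period (%v) should not be shorter than cluster-monitor-grace-period (%v)", startupGracePeriod, monitorGracePeriod)
	}
	return nil
}

func main() {
	// The defaults registered above: 5s, 40s, 60s.
	if err := validateMonitorDurations(5*time.Second, 40*time.Second, 60*time.Second); err != nil {
		fmt.Println(err)
	}
}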


@@ -3,11 +3,18 @@ package cluster
import (
"context"
"fmt"
"sync"
"time"
coordv1 "k8s.io/api/coordination/v1"
corev1 "k8s.io/api/core/v1" corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors" apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1" v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/record" "k8s.io/client-go/tools/record"
"k8s.io/klog/v2" "k8s.io/klog/v2"
controllerruntime "sigs.k8s.io/controller-runtime" controllerruntime "sigs.k8s.io/controller-runtime"
@@ -22,12 +29,37 @@ import (
const (
// ControllerName is the controller name that will be used when reporting events.
ControllerName = "cluster-controller"
// MonitorRetrySleepTime is the amount of time the cluster controller should
// sleep between retries of cluster health updates.
MonitorRetrySleepTime = 20 * time.Millisecond
// HealthUpdateRetry controls the number of retries of writing cluster health update.
HealthUpdateRetry = 5
)
// Controller is to sync Cluster.
type Controller struct {
client.Client // used to operate Cluster resources.
EventRecorder record.EventRecorder
// ClusterMonitorPeriod represents cluster-controller monitoring period, i.e. how often does
// cluster-controller check cluster health signal posted from cluster-status-controller.
// This value should be lower than ClusterMonitorGracePeriod.
ClusterMonitorPeriod time.Duration
// ClusterMonitorGracePeriod represents the grace period after last cluster health probe time.
// If it doesn't receive an update for this amount of time, it will start posting
// "ClusterReady==ConditionUnknown".
ClusterMonitorGracePeriod time.Duration
// When cluster is just created, e.g. agent bootstrap or cluster join, we give a longer grace period.
ClusterStartupGracePeriod time.Duration
// Per Cluster map stores last observed health together with a local time when it was observed.
clusterHealthMap sync.Map
}
type clusterHealthData struct {
probeTimestamp v1.Time
status *v1alpha1.ClusterStatus
lease *coordv1.Lease
}
// Reconcile performs a full reconciliation for the object referred to by the Request.
@@ -53,9 +85,27 @@ func (c *Controller) Reconcile(ctx context.Context, req controllerruntime.Reques
return c.syncCluster(cluster)
}
// Start starts an asynchronous loop that monitors the health of member clusters.
func (c *Controller) Start(ctx context.Context) error {
klog.Infof("Starting cluster health monitor")
defer klog.Infof("Shutting down cluster health monitor")
// Incorporate the results of cluster health signal pushed from cluster-status-controller to master.
go wait.UntilWithContext(ctx, func(ctx context.Context) {
if err := c.monitorClusterHealth(); err != nil {
klog.Errorf("Error monitoring cluster health: %v", err)
}
}, c.ClusterMonitorPeriod)
// Block until the context is cancelled so the deferred shutdown log fires when the manager stops.
<-ctx.Done()
return nil
}
// SetupWithManager creates a controller and registers it with the controller manager.
func (c *Controller) SetupWithManager(mgr controllerruntime.Manager) error {
return utilerrors.NewAggregate([]error{
controllerruntime.NewControllerManagedBy(mgr).For(&v1alpha1.Cluster{}).Complete(c),
mgr.Add(c),
})
}
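Registering the controller via mgr.Add works because the new Start method satisfies controller-runtime's manager.Runnable contract: the manager calls Start once it is started and cancels the context on shutdown. Paraphrased from sigs.k8s.io/controller-runtime/pkg/manager, the interface being relied on is:

package manager

import "context"

// Runnable is what mgr.Add expects; the Start method added above matches this
// signature, so the manager drives the health-monitor loop and cancels ctx
// when it shuts down.
type Runnable interface {
	Start(ctx context.Context) error
}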
func (c *Controller) syncCluster(cluster *v1alpha1.Cluster) (controllerruntime.Result, error) {
@@ -194,3 +244,154 @@ func (c *Controller) createExecutionSpace(cluster *v1alpha1.Cluster) error {
return nil
}
func (c *Controller) monitorClusterHealth() error {
clusterList := &v1alpha1.ClusterList{}
if err := c.Client.List(context.TODO(), clusterList); err != nil {
return err
}
clusters := clusterList.Items
for i := range clusters {
cluster := &clusters[i]
if err := wait.PollImmediate(MonitorRetrySleepTime, MonitorRetrySleepTime*HealthUpdateRetry, func() (bool, error) {
// Cluster object may be changed in this function.
if err := c.tryUpdateClusterHealth(cluster); err == nil {
return true, nil
}
clusterName := cluster.Name
if err := c.Get(context.TODO(), client.ObjectKey{Name: clusterName}, cluster); err != nil {
// If the cluster does not exist any more, we delete the health data from the map.
if apierrors.IsNotFound(err) {
c.clusterHealthMap.Delete(clusterName)
return true, nil
}
klog.Errorf("Getting a cluster to retry updating cluster health error: %v", clusterName, err)
return false, err
}
return false, nil
}); err != nil {
klog.Errorf("Update health of Cluster '%v' from Controller error: %v. Skipping.", cluster.Name, err)
continue
}
}
return nil
}
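The wait.PollImmediate call above turns the two constants into a small retry budget: the update is attempted immediately, then every MonitorRetrySleepTime, giving up after MonitorRetrySleepTime*HealthUpdateRetry (roughly five attempts spread over 100ms with the values above). A minimal, self-contained sketch of that pattern with a stand-in condition function:

package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	const (
		retrySleepTime = 20 * time.Millisecond // mirrors MonitorRetrySleepTime
		updateRetry    = 5                     // mirrors HealthUpdateRetry
	)
	attempts := 0
	// Try immediately, then every retrySleepTime, until the condition reports done
	// or the retrySleepTime*updateRetry budget is exhausted.
	err := wait.PollImmediate(retrySleepTime, retrySleepTime*updateRetry, func() (bool, error) {
		attempts++
		// Stand-in for tryUpdateClusterHealth: pretend the update succeeds on the third try.
		return attempts >= 3, nil
	})
	fmt.Println(attempts, err)
}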
// tryUpdateClusterHealth checks a given cluster's conditions and tries to update it.
//nolint:gocyclo
func (c *Controller) tryUpdateClusterHealth(cluster *v1alpha1.Cluster) error {
// Step 1: Get the last cluster health from `clusterHealthMap`.
var clusterHealth *clusterHealthData
if value, exists := c.clusterHealthMap.Load(cluster.Name); exists {
clusterHealth = value.(*clusterHealthData)
}
defer func() {
c.clusterHealthMap.Store(cluster.Name, clusterHealth)
}()
// Step 2: Get the cluster ready condition.
var gracePeriod time.Duration
var observedReadyCondition *v1.Condition
currentReadyCondition := meta.FindStatusCondition(cluster.Status.Conditions, v1alpha1.ClusterConditionReady)
if currentReadyCondition == nil {
// If ready condition is nil, then cluster-status-controller has never posted cluster status.
// A fake ready condition is created, where LastTransitionTime is set to cluster.CreationTimestamp
// to avoid handling the corner case.
observedReadyCondition = &v1.Condition{
Type: v1alpha1.ClusterConditionReady,
Status: v1.ConditionUnknown,
LastTransitionTime: cluster.CreationTimestamp,
}
gracePeriod = c.ClusterStartupGracePeriod
if clusterHealth != nil {
clusterHealth.status = &cluster.Status
} else {
clusterHealth = &clusterHealthData{
status: &cluster.Status,
probeTimestamp: cluster.CreationTimestamp,
}
}
} else {
// If ready condition is not nil, make a copy of it, since we may modify it in place later.
observedReadyCondition = currentReadyCondition.DeepCopy()
gracePeriod = c.ClusterMonitorGracePeriod
}
// Step 3: Get the last condition and lease from `clusterHealth`.
var savedCondition *v1.Condition
var savedLease *coordv1.Lease
if clusterHealth != nil {
savedCondition = meta.FindStatusCondition(clusterHealth.status.Conditions, v1alpha1.ClusterConditionReady)
savedLease = clusterHealth.lease
}
// Step 4: Update the clusterHealth if necessary.
// If this condition has no difference from the last condition, we leave everything as it is.
// Otherwise, we only update the probeTimestamp.
if clusterHealth == nil || !equality.Semantic.DeepEqual(savedCondition, currentReadyCondition) {
clusterHealth = &clusterHealthData{
status: cluster.Status.DeepCopy(),
probeTimestamp: v1.Now(),
}
}
// Always update the probe time if cluster lease is renewed.
// Note: If cluster-status-controller never posted the cluster status, but continues renewing the
// heartbeat leases, the cluster controller will assume the cluster is healthy and take no action.
observedLease := &coordv1.Lease{}
err := c.Client.Get(context.TODO(), client.ObjectKey{Namespace: util.NamespaceClusterLease, Name: cluster.Name}, observedLease)
if err == nil && (savedLease == nil || savedLease.Spec.RenewTime.Before(observedLease.Spec.RenewTime)) {
clusterHealth.lease = observedLease
clusterHealth.probeTimestamp = v1.Now()
}
// Step 5: Check whether the probe timestamp has timed out.
if v1.Now().After(clusterHealth.probeTimestamp.Add(gracePeriod)) {
clusterConditionTypes := []string{
v1alpha1.ClusterConditionReady,
}
nowTimestamp := v1.Now()
for _, clusterConditionType := range clusterConditionTypes {
currentCondition := meta.FindStatusCondition(cluster.Status.Conditions, clusterConditionType)
if currentCondition == nil {
klog.V(2).Infof("Condition %v of cluster %v was never updated by cluster-status-controller",
clusterConditionType, cluster.Name)
cluster.Status.Conditions = append(cluster.Status.Conditions, v1.Condition{
Type: clusterConditionType,
Status: v1.ConditionUnknown,
Reason: "ClusterStatusNeverUpdated",
Message: "Cluster status controller never posted cluster status.",
LastTransitionTime: nowTimestamp,
})
} else {
klog.V(2).Infof("cluster %v hasn't been updated for %+v. Last %v is: %+v",
cluster.Name, v1.Now().Time.Sub(clusterHealth.probeTimestamp.Time), clusterConditionType, currentCondition)
if currentCondition.Status != v1.ConditionUnknown {
currentCondition.Status = v1.ConditionUnknown
currentCondition.Reason = "ClusterStatusUnknown"
currentCondition.Message = "Cluster status controller stopped posting cluster status."
currentCondition.LastTransitionTime = nowTimestamp
}
}
}
// We need to update currentReadyCondition because its value may have changed.
currentReadyCondition = meta.FindStatusCondition(cluster.Status.Conditions, v1alpha1.ClusterConditionReady)
if !equality.Semantic.DeepEqual(currentReadyCondition, observedReadyCondition) {
if err := c.Status().Update(context.TODO(), cluster); err != nil {
klog.Errorf("Error updating cluster %s: %v", cluster.Name, err)
return err
}
clusterHealth = &clusterHealthData{
status: &cluster.Status,
probeTimestamp: clusterHealth.probeTimestamp,
lease: observedLease,
}
return nil
}
}
return nil
}
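The Step 5 timeout decision reduces to a single comparison between the stored probe timestamp and whichever grace period applies. A hypothetical, self-contained illustration (the exceededGracePeriod helper is not part of this commit) using the defaults registered in the options above:

package main

import (
	"fmt"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// exceededGracePeriod mirrors the Step 5 check: a cluster counts as unresponsive
// once its last probe is older than the applicable grace period.
func exceededGracePeriod(probeTimestamp metav1.Time, gracePeriod time.Duration) bool {
	return metav1.Now().After(probeTimestamp.Add(gracePeriod))
}

func main() {
	lastProbe := metav1.NewTime(time.Now().Add(-45 * time.Second))
	// A running cluster gets the 40s monitor grace period, so 45s of silence
	// flips ClusterReady to ConditionUnknown.
	fmt.Println(exceededGracePeriod(lastProbe, 40*time.Second)) // true
	// A cluster that has just joined gets the 60s startup grace period instead.
	fmt.Println(exceededGracePeriod(lastProbe, 60*time.Second)) // false
}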