add cluster failure threshold

Signed-off-by: dddddai <dddwq@foxmail.com>
dddddai 2022-05-20 10:55:31 +08:00
parent 2974563c6a
commit 339f431673
8 changed files with 310 additions and 60 deletions

View File

@@ -186,6 +186,7 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop
ClusterStatusUpdateFrequency: opts.ClusterStatusUpdateFrequency,
ClusterLeaseDuration: opts.ClusterLeaseDuration,
ClusterLeaseRenewIntervalFraction: opts.ClusterLeaseRenewIntervalFraction,
ClusterFailureThreshold: opts.ClusterFailureThreshold,
ClusterCacheSyncTimeout: opts.ClusterCacheSyncTimeout,
ClusterAPIQPS: opts.ClusterAPIQPS,
ClusterAPIBurst: opts.ClusterAPIBurst,
@@ -223,6 +224,7 @@ func startClusterStatusController(ctx controllerscontext.Context) (bool, error)
ClusterStatusUpdateFrequency: ctx.Opts.ClusterStatusUpdateFrequency,
ClusterLeaseDuration: ctx.Opts.ClusterLeaseDuration,
ClusterLeaseRenewIntervalFraction: ctx.Opts.ClusterLeaseRenewIntervalFraction,
ClusterFailureThreshold: ctx.Opts.ClusterFailureThreshold,
ClusterCacheSyncTimeout: ctx.Opts.ClusterCacheSyncTimeout,
RateLimiterOptions: ctx.Opts.RateLimiterOptions,
}

View File

@@ -38,6 +38,8 @@ type Options struct {
// ClusterLeaseRenewIntervalFraction is a fraction coordinated with ClusterLeaseDuration that
// determines how often the current holder of a lease should renew it.
ClusterLeaseRenewIntervalFraction float64
// ClusterFailureThreshold is the duration of failure for the cluster to be considered unhealthy.
ClusterFailureThreshold metav1.Duration
// ClusterAPIQPS is the QPS to use while talking with cluster kube-apiserver.
ClusterAPIQPS float32
// ClusterAPIBurst is the burst to allow while talking with cluster kube-apiserver.
@@ -98,6 +100,7 @@ func (o *Options) AddFlags(fs *pflag.FlagSet, allControllers []string) {
"Specifies the expiration period of a cluster lease.")
fs.Float64Var(&o.ClusterLeaseRenewIntervalFraction, "cluster-lease-renew-interval-fraction", 0.25,
"Specifies the cluster lease renew interval fraction.")
fs.DurationVar(&o.ClusterFailureThreshold.Duration, "cluster-failure-threshold", 30*time.Second, "The duration of failure for the cluster to be considered unhealthy.")
fs.Float32Var(&o.ClusterAPIQPS, "cluster-api-qps", 40.0, "QPS to use while talking with cluster kube-apiserver. Doesn't cover events and node heartbeat apis which rate limiting is controlled by a different set of flags.")
fs.IntVar(&o.ClusterAPIBurst, "cluster-api-burst", 60, "Burst to use while talking with cluster kube-apiserver. Doesn't cover events and node heartbeat apis which rate limiting is controlled by a different set of flags.")
fs.Float32Var(&o.KubeAPIQPS, "kube-api-qps", 40.0, "QPS to use while talking with karmada-apiserver. Doesn't cover events and node heartbeat apis which rate limiting is controlled by a different set of flags.")
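For reference, the new option parses like any other duration flag and falls back to the 30s default registered above when omitted. A minimal invocation sketch (the binary name, karmada-agent, is an assumption here, since file paths are not visible in this view):

karmada-agent --cluster-failure-threshold=1m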

View File

@@ -233,6 +233,7 @@ func startClusterStatusController(ctx controllerscontext.Context) (enabled bool,
ClusterStatusUpdateFrequency: opts.ClusterStatusUpdateFrequency,
ClusterLeaseDuration: opts.ClusterLeaseDuration,
ClusterLeaseRenewIntervalFraction: opts.ClusterLeaseRenewIntervalFraction,
ClusterFailureThreshold: opts.ClusterFailureThreshold,
ClusterCacheSyncTimeout: opts.ClusterCacheSyncTimeout,
RateLimiterOptions: ctx.Opts.RateLimiterOptions,
}
@@ -491,6 +492,7 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop
ClusterStatusUpdateFrequency: opts.ClusterStatusUpdateFrequency,
ClusterLeaseDuration: opts.ClusterLeaseDuration,
ClusterLeaseRenewIntervalFraction: opts.ClusterLeaseRenewIntervalFraction,
ClusterFailureThreshold: opts.ClusterFailureThreshold,
ClusterCacheSyncTimeout: opts.ClusterCacheSyncTimeout,
ClusterAPIQPS: opts.ClusterAPIQPS,
ClusterAPIBurst: opts.ClusterAPIBurst,

View File

@@ -44,6 +44,8 @@ type Options struct {
// ClusterLeaseRenewIntervalFraction is a fraction coordinated with ClusterLeaseDuration that
// determines how often the current holder of a lease should renew it.
ClusterLeaseRenewIntervalFraction float64
// ClusterFailureThreshold is the duration of failure for the cluster to be considered unhealthy.
ClusterFailureThreshold metav1.Duration
// ClusterMonitorPeriod represents cluster-controller monitoring period, i.e. how often does
// cluster-controller check cluster health signal posted from cluster-status-controller.
// This value should be lower than ClusterMonitorGracePeriod.
@@ -132,6 +134,7 @@ func (o *Options) AddFlags(flags *pflag.FlagSet, allControllers, disabledByDefau
"Specifies the expiration period of a cluster lease.")
flags.Float64Var(&o.ClusterLeaseRenewIntervalFraction, "cluster-lease-renew-interval-fraction", 0.25,
"Specifies the cluster lease renew interval fraction.")
flags.DurationVar(&o.ClusterFailureThreshold.Duration, "cluster-failure-threshold", 30*time.Second, "The duration of failure for the cluster to be considered unhealthy.")
flags.DurationVar(&o.ClusterMonitorPeriod.Duration, "cluster-monitor-period", 5*time.Second,
"Specifies how often karmada-controller-manager monitors cluster health status.")
flags.DurationVar(&o.ClusterMonitorGracePeriod.Duration, "cluster-monitor-grace-period", 40*time.Second,
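The new threshold sits alongside the monitor settings registered here: with the defaults, the status controller keeps reporting the previous Ready condition for up to 30s of failed health probes before flipping it, while the cluster controller continues to check the posted health signal every 5s within its 40s grace period. A hedged example making the related flags explicit (values match the defaults shown above):

karmada-controller-manager \
  --cluster-failure-threshold=30s \
  --cluster-monitor-period=5s \
  --cluster-monitor-grace-period=40s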

View File

@@ -38,6 +38,8 @@ type Options struct {
// ClusterLeaseRenewIntervalFraction is a fraction coordinated with ClusterLeaseDuration that
// determines how often the current holder of a lease should renew it.
ClusterLeaseRenewIntervalFraction float64
// ClusterFailureThreshold is the duration of failure for the cluster to be considered unhealthy.
ClusterFailureThreshold metav1.Duration
// ClusterCacheSyncTimeout is the timeout period waiting for cluster cache to sync.
ClusterCacheSyncTimeout metav1.Duration
// ClusterAPIQPS is the QPS to use while talking with cluster kube-apiserver.

View File

@@ -0,0 +1,73 @@
package status
import (
"sync"
"time"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
)
type clusterData struct {
// readyCondition is the last observed ready condition of the cluster.
readyCondition metav1.ConditionStatus
// thresholdStartTime is the time that the ready condition changed.
thresholdStartTime time.Time
}
type clusterConditionStore struct {
clusterDataMap sync.Map
// failureThreshold is the duration of failure for the cluster to be considered unhealthy.
failureThreshold time.Duration
}
func (c *clusterConditionStore) thresholdAdjustedReadyCondition(cluster *clusterv1alpha1.Cluster, observedReadyCondition *metav1.Condition) *metav1.Condition {
saved := c.get(cluster.Name)
if saved == nil {
// the cluster has just joined; cache its observed ready condition
c.update(cluster.Name, &clusterData{
readyCondition: observedReadyCondition.Status,
})
return observedReadyCondition
}
curReadyCondition := meta.FindStatusCondition(cluster.Status.Conditions, clusterv1alpha1.ClusterConditionReady)
if curReadyCondition == nil {
return observedReadyCondition
}
now := time.Now()
if saved.readyCondition != observedReadyCondition.Status {
// ready condition status changed, record the threshold start time
saved = &clusterData{
readyCondition: observedReadyCondition.Status,
thresholdStartTime: now,
}
c.update(cluster.Name, saved)
}
if observedReadyCondition.Status != metav1.ConditionTrue &&
curReadyCondition.Status == metav1.ConditionTrue &&
now.Before(saved.thresholdStartTime.Add(c.failureThreshold)) {
// retain the old status until the threshold is exceeded, to tolerate transient network problems.
return curReadyCondition
}
return observedReadyCondition
}
func (c *clusterConditionStore) get(cluster string) *clusterData {
condition, ok := c.clusterDataMap.Load(cluster)
if !ok {
return nil
}
return condition.(*clusterData)
}
func (c *clusterConditionStore) update(cluster string, data *clusterData) {
// store the latest observed data for this cluster
c.clusterDataMap.Store(cluster, data)
}
func (c *clusterConditionStore) delete(cluster string) {
c.clusterDataMap.Delete(cluster)
}
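A minimal usage sketch for the store, written as if it sat in the same package (the type and its methods are unexported) and reusing the imports of the file above; the threshold value and cluster name are illustrative:

// exampleThresholdUsage shows the intended call pattern; illustrative only.
func exampleThresholdUsage() {
	store := clusterConditionStore{failureThreshold: 30 * time.Second}

	cluster := &clusterv1alpha1.Cluster{}
	cluster.Name = "member1"

	observed := metav1.Condition{
		Type:   clusterv1alpha1.ClusterConditionReady,
		Status: metav1.ConditionFalse,
	}

	// While the failure threshold has not yet elapsed since the observed status
	// changed, the previously reported Ready condition (if any) is returned
	// instead of the freshly observed one.
	adjusted := store.thresholdAdjustedReadyCondition(cluster, &observed)
	_ = adjusted
}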

View File

@@ -0,0 +1,168 @@
package status
import (
"testing"
"time"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
)
func TestThresholdAdjustedReadyCondition(t *testing.T) {
clusterFailureThreshold := 30 * time.Second
tests := []struct {
name string
clusterData *clusterData
currentCondition *metav1.Condition
observedCondition *metav1.Condition
expectedCondition *metav1.Condition
}{
{
name: "cluster just joined in ready state",
clusterData: nil, // no cache yet
currentCondition: nil, // no condition was set on cluster object yet
observedCondition: &metav1.Condition{
Type: clusterv1alpha1.ClusterConditionReady,
Status: metav1.ConditionTrue,
},
expectedCondition: &metav1.Condition{
Type: clusterv1alpha1.ClusterConditionReady,
Status: metav1.ConditionTrue,
},
},
{
name: "cluster just joined in not-ready state",
clusterData: nil, // no cache yet
currentCondition: nil, // no condition was set on cluster object yet
observedCondition: &metav1.Condition{
Type: clusterv1alpha1.ClusterConditionReady,
Status: metav1.ConditionFalse,
},
expectedCondition: &metav1.Condition{
Type: clusterv1alpha1.ClusterConditionReady,
Status: metav1.ConditionFalse,
},
},
{
name: "cluster stays ready",
clusterData: &clusterData{
readyCondition: metav1.ConditionTrue,
},
currentCondition: &metav1.Condition{
Type: clusterv1alpha1.ClusterConditionReady,
Status: metav1.ConditionTrue,
},
observedCondition: &metav1.Condition{
Type: clusterv1alpha1.ClusterConditionReady,
Status: metav1.ConditionTrue,
},
expectedCondition: &metav1.Condition{
Type: clusterv1alpha1.ClusterConditionReady,
Status: metav1.ConditionTrue,
},
},
{
name: "cluster becomes not ready but still not reach threshold",
clusterData: &clusterData{
readyCondition: metav1.ConditionFalse,
thresholdStartTime: time.Now().Add(-clusterFailureThreshold / 2),
},
currentCondition: &metav1.Condition{
Type: clusterv1alpha1.ClusterConditionReady,
Status: metav1.ConditionTrue,
},
observedCondition: &metav1.Condition{
Type: clusterv1alpha1.ClusterConditionReady,
Status: metav1.ConditionFalse,
},
expectedCondition: &metav1.Condition{
Type: clusterv1alpha1.ClusterConditionReady,
Status: metav1.ConditionTrue,
},
},
{
name: "cluster becomes not ready and reaches threshold",
clusterData: &clusterData{
readyCondition: metav1.ConditionFalse,
thresholdStartTime: time.Now().Add(-clusterFailureThreshold),
},
currentCondition: &metav1.Condition{
Type: clusterv1alpha1.ClusterConditionReady,
Status: metav1.ConditionTrue,
},
observedCondition: &metav1.Condition{
Type: clusterv1alpha1.ClusterConditionReady,
Status: metav1.ConditionFalse,
},
expectedCondition: &metav1.Condition{
Type: clusterv1alpha1.ClusterConditionReady,
Status: metav1.ConditionFalse,
},
},
{
name: "cluster stays not ready",
clusterData: &clusterData{
readyCondition: metav1.ConditionFalse,
thresholdStartTime: time.Now().Add(-2 * clusterFailureThreshold),
},
currentCondition: &metav1.Condition{
Type: clusterv1alpha1.ClusterConditionReady,
Status: metav1.ConditionFalse,
},
observedCondition: &metav1.Condition{
Type: clusterv1alpha1.ClusterConditionReady,
Status: metav1.ConditionFalse,
},
expectedCondition: &metav1.Condition{
Type: clusterv1alpha1.ClusterConditionReady,
Status: metav1.ConditionFalse,
},
},
{
name: "cluster recovers",
clusterData: &clusterData{
readyCondition: metav1.ConditionFalse,
thresholdStartTime: time.Now().Add(-3 * clusterFailureThreshold),
},
currentCondition: &metav1.Condition{
Type: clusterv1alpha1.ClusterConditionReady,
Status: metav1.ConditionFalse,
},
observedCondition: &metav1.Condition{
Type: clusterv1alpha1.ClusterConditionReady,
Status: metav1.ConditionTrue,
},
expectedCondition: &metav1.Condition{
Type: clusterv1alpha1.ClusterConditionReady,
Status: metav1.ConditionTrue,
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cache := clusterConditionStore{
failureThreshold: clusterFailureThreshold,
}
if tt.clusterData != nil {
cache.update("member", tt.clusterData)
}
cluster := &clusterv1alpha1.Cluster{}
cluster.Name = "member"
if tt.currentCondition != nil {
meta.SetStatusCondition(&cluster.Status.Conditions, *tt.currentCondition)
}
thresholdReadyCondition := cache.thresholdAdjustedReadyCondition(cluster, tt.observedCondition)
if tt.expectedCondition.Status != thresholdReadyCondition.Status {
t.Fatalf("expected: %s, but got: %s", tt.expectedCondition.Status, thresholdReadyCondition.Status)
}
})
}
}

View File

@@ -18,7 +18,6 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/wait"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/retry"
@@ -48,10 +47,6 @@ const (
clusterNotReachableReason = "ClusterNotReachable"
clusterNotReachableMsg = "cluster is not reachable"
statusCollectionFailed = "StatusCollectionFailed"
// clusterStatusRetryInterval specifies the interval between two retries.
clusterStatusRetryInterval = 500 * time.Millisecond
// clusterStatusRetryTimeout specifies the maximum time to wait for cluster status.
clusterStatusRetryTimeout = 2 * time.Second
)
var (
@@ -82,6 +77,10 @@ type ClusterStatusController struct {
ClusterLeaseRenewIntervalFraction float64
// ClusterLeaseControllers store clusters and their corresponding lease controllers.
ClusterLeaseControllers sync.Map
// ClusterFailureThreshold is the duration of failure for the cluster to be considered unhealthy.
ClusterFailureThreshold metav1.Duration
// clusterConditionCache stores the condition status of each cluster.
clusterConditionCache clusterConditionStore
ClusterCacheSyncTimeout metav1.Duration
RateLimiterOptions ratelimiterflag.Options
@@ -98,6 +97,7 @@ func (c *ClusterStatusController) Reconcile(ctx context.Context, req controllerr
// The resource may no longer exist, in which case we stop the informer.
if apierrors.IsNotFound(err) {
c.InformerManager.Stop(req.NamespacedName.Name)
c.clusterConditionCache.delete(req.Name)
return controllerruntime.Result{}, nil
}
@@ -116,13 +116,16 @@ func (c *ClusterStatusController) Reconcile(ctx context.Context, req controllerr
// SetupWithManager creates a controller and register to controller manager.
func (c *ClusterStatusController) SetupWithManager(mgr controllerruntime.Manager) error {
c.clusterConditionCache = clusterConditionStore{
failureThreshold: c.ClusterFailureThreshold.Duration,
}
return controllerruntime.NewControllerManagedBy(mgr).For(&clusterv1alpha1.Cluster{}).WithEventFilter(c.PredicateFunc).WithOptions(controller.Options{
RateLimiter: ratelimiterflag.DefaultControllerRateLimiter(c.RateLimiterOptions),
}).Complete(c)
}
func (c *ClusterStatusController) syncClusterStatus(cluster *clusterv1alpha1.Cluster) (controllerruntime.Result, error) {
var currentClusterStatus = clusterv1alpha1.ClusterStatus{}
currentClusterStatus := *cluster.Status.DeepCopy()
// create a ClusterClient for the given member cluster
clusterClient, err := c.ClusterClientSetFunc(cluster.Name, c.Client, c.ClusterClientOption)
@@ -131,66 +134,60 @@ func (c *ClusterStatusController) syncClusterStatus(cluster *clusterv1alpha1.Clu
return c.setStatusCollectionFailedCondition(cluster, currentClusterStatus, fmt.Sprintf("failed to create a ClusterClient: %v", err))
}
var online, healthy bool
// in case of cluster offline, retry a few times to avoid network unstable problems.
// Note: retry timeout should not be too long, otherwise will block other cluster reconcile.
err = wait.PollImmediate(clusterStatusRetryInterval, clusterStatusRetryTimeout, func() (done bool, err error) {
online, healthy = getClusterHealthStatus(clusterClient)
if !online {
klog.V(2).Infof("Cluster(%s) is offline.", cluster.Name)
return false, nil
}
return true, nil
})
// error indicates that retry timeout, update cluster status immediately and return.
if err != nil {
klog.V(2).Infof("Cluster(%s) still offline after retry, ensuring offline is set.", cluster.Name)
online, healthy := getClusterHealthStatus(clusterClient)
observedReadyCondition := generateReadyCondition(online, healthy)
readyCondition := c.clusterConditionCache.thresholdAdjustedReadyCondition(cluster, &observedReadyCondition)
// cluster is offline and the threshold-adjusted condition is no longer ready, update cluster status immediately and return.
if !online && readyCondition.Status != metav1.ConditionTrue {
klog.V(2).Infof("Cluster(%s) still offline after %s, ensuring offline is set.",
cluster.Name, c.ClusterFailureThreshold.Duration)
c.InformerManager.Stop(cluster.Name)
readyCondition := generateReadyCondition(false, false)
setTransitionTime(cluster.Status.Conditions, &readyCondition)
meta.SetStatusCondition(&currentClusterStatus.Conditions, readyCondition)
setTransitionTime(cluster.Status.Conditions, readyCondition)
meta.SetStatusCondition(&currentClusterStatus.Conditions, *readyCondition)
return c.updateStatusIfNeeded(cluster, currentClusterStatus)
}
// get or create informer for pods and nodes in member cluster
clusterInformerManager, err := c.buildInformerForCluster(cluster)
if err != nil {
klog.Errorf("Failed to get or create informer for Cluster %s. Error: %v.", cluster.GetName(), err)
return c.setStatusCollectionFailedCondition(cluster, currentClusterStatus, fmt.Sprintf("failed to get or create informer: %v", err))
// skip collecting cluster status if not ready
if online && healthy {
// get or create informer for pods and nodes in member cluster
clusterInformerManager, err := c.buildInformerForCluster(cluster)
if err != nil {
klog.Errorf("Failed to get or create informer for Cluster %s. Error: %v.", cluster.GetName(), err)
}
// init the lease controller for every cluster
c.initLeaseController(clusterInformerManager.Context(), cluster)
clusterVersion, err := getKubernetesVersion(clusterClient)
if err != nil {
klog.Errorf("Failed to get Kubernetes version for Cluster %s. Error: %v.", cluster.GetName(), err)
}
// get the list of APIs installed in the member cluster
apiEnables, err := getAPIEnablements(clusterClient)
if err != nil {
klog.Errorf("Failed to get APIs installed in Cluster %s. Error: %v.", cluster.GetName(), err)
}
nodes, err := listNodes(clusterInformerManager)
if err != nil {
klog.Errorf("Failed to list nodes for Cluster %s. Error: %v.", cluster.GetName(), err)
}
pods, err := listPods(clusterInformerManager)
if err != nil {
klog.Errorf("Failed to list pods for Cluster %s. Error: %v.", cluster.GetName(), err)
}
currentClusterStatus.KubernetesVersion = clusterVersion
currentClusterStatus.APIEnablements = apiEnables
currentClusterStatus.NodeSummary = getNodeSummary(nodes)
currentClusterStatus.ResourceSummary = getResourceSummary(nodes, pods)
}
// init the lease controller for every cluster
c.initLeaseController(clusterInformerManager.Context(), cluster)
clusterVersion, err := getKubernetesVersion(clusterClient)
if err != nil {
return c.setStatusCollectionFailedCondition(cluster, currentClusterStatus, fmt.Sprintf("failed to get kubernetes version: %v", err))
}
// get the list of APIs installed in the member cluster
apiEnables, err := getAPIEnablements(clusterClient)
if err != nil {
return c.setStatusCollectionFailedCondition(cluster, currentClusterStatus, fmt.Sprintf("failed to get the list of APIs installed in the member cluster: %v", err))
}
nodes, err := listNodes(clusterInformerManager)
if err != nil {
return c.setStatusCollectionFailedCondition(cluster, currentClusterStatus, fmt.Sprintf("failed to list nodes: %v", err))
}
pods, err := listPods(clusterInformerManager)
if err != nil {
return c.setStatusCollectionFailedCondition(cluster, currentClusterStatus, fmt.Sprintf("failed to list pods: %v", err))
}
currentClusterStatus.KubernetesVersion = clusterVersion
currentClusterStatus.APIEnablements = apiEnables
currentClusterStatus.NodeSummary = getNodeSummary(nodes)
currentClusterStatus.ResourceSummary = getResourceSummary(nodes, pods)
readyCondition := generateReadyCondition(online, healthy)
setTransitionTime(cluster.Status.Conditions, &readyCondition)
meta.SetStatusCondition(&currentClusterStatus.Conditions, readyCondition)
setTransitionTime(currentClusterStatus.Conditions, readyCondition)
meta.SetStatusCondition(&currentClusterStatus.Conditions, *readyCondition)
return c.updateStatusIfNeeded(cluster, currentClusterStatus)
}
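To summarize the hunk above, a condensed sketch of the syncClusterStatus flow that results from this change (detailed status collection and most error handling elided; not the verbatim final code):

func (c *ClusterStatusController) syncClusterStatus(cluster *clusterv1alpha1.Cluster) (controllerruntime.Result, error) {
	currentClusterStatus := *cluster.Status.DeepCopy()

	clusterClient, err := c.ClusterClientSetFunc(cluster.Name, c.Client, c.ClusterClientOption)
	if err != nil {
		return c.setStatusCollectionFailedCondition(cluster, currentClusterStatus,
			fmt.Sprintf("failed to create a ClusterClient: %v", err))
	}

	// Probe once per reconcile; short flaps are absorbed by clusterConditionCache
	// rather than by the removed in-place retry loop.
	online, healthy := getClusterHealthStatus(clusterClient)
	observedReadyCondition := generateReadyCondition(online, healthy)
	readyCondition := c.clusterConditionCache.thresholdAdjustedReadyCondition(cluster, &observedReadyCondition)

	if !online && readyCondition.Status != metav1.ConditionTrue {
		// Offline beyond ClusterFailureThreshold: stop the informer and report NotReady now.
		c.InformerManager.Stop(cluster.Name)
		setTransitionTime(cluster.Status.Conditions, readyCondition)
		meta.SetStatusCondition(&currentClusterStatus.Conditions, *readyCondition)
		return c.updateStatusIfNeeded(cluster, currentClusterStatus)
	}

	if online && healthy {
		// Collect Kubernetes version, API enablements, node and pod summaries;
		// individual collection errors are now logged instead of aborting the update.
	}

	setTransitionTime(currentClusterStatus.Conditions, readyCondition)
	meta.SetStatusCondition(&currentClusterStatus.Conditions, *readyCondition)
	return c.updateStatusIfNeeded(cluster, currentClusterStatus)
}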