From ff94289352a3943d6ce92b6679de69011fdfd080 Mon Sep 17 00:00:00 2001
From: Garrybest
Date: Thu, 18 Aug 2022 21:21:54 +0800
Subject: [PATCH] remove Failover from scheduler

Signed-off-by: Garrybest
---
 artifacts/deploy/karmada-scheduler.yaml |  1 -
 pkg/scheduler/event_handler.go          | 58 --------------------
 pkg/scheduler/metrics/metrics.go        |  2 -
 pkg/scheduler/scheduler.go              | 71 +++++++------------------
 test/e2e/failover_test.go               | 12 +++--
 test/e2e/framework/constant.go          |  2 +-
 6 files changed, 30 insertions(+), 116 deletions(-)

diff --git a/artifacts/deploy/karmada-scheduler.yaml b/artifacts/deploy/karmada-scheduler.yaml
index ca2e81d67..ea67344e8 100644
--- a/artifacts/deploy/karmada-scheduler.yaml
+++ b/artifacts/deploy/karmada-scheduler.yaml
@@ -37,7 +37,6 @@ spec:
             - --kubeconfig=/etc/kubeconfig
             - --bind-address=0.0.0.0
             - --secure-port=10351
-            - --feature-gates=Failover=true
             - --enable-scheduler-estimator=true
             - --v=4
           volumeMounts:
diff --git a/pkg/scheduler/event_handler.go b/pkg/scheduler/event_handler.go
index 496fb7eea..1e10ef5b0 100644
--- a/pkg/scheduler/event_handler.go
+++ b/pkg/scheduler/event_handler.go
@@ -4,7 +4,6 @@ import (
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/equality"
 	"k8s.io/apimachinery/pkg/api/meta"
-	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/labels"
 	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
 	"k8s.io/client-go/tools/cache"
@@ -13,7 +12,6 @@ import (
 
 	clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
 	policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
-	"github.com/karmada-io/karmada/pkg/features"
 	"github.com/karmada-io/karmada/pkg/scheduler/metrics"
 	"github.com/karmada-io/karmada/pkg/util"
 	"github.com/karmada-io/karmada/pkg/util/gclient"
@@ -195,48 +193,6 @@ func (s *Scheduler) addCluster(obj interface{}) {
 	}
 }
 
-// enqueueAffectedBinding will find all ResourceBindings which are related to the current NotReady cluster and add them in queue.
-func (s *Scheduler) enqueueAffectedBinding(notReadyClusterName string) {
-	bindings, _ := s.bindingLister.List(labels.Everything())
-	klog.Infof("Start traveling all ResourceBindings")
-	for _, binding := range bindings {
-		clusters := binding.Spec.Clusters
-		for _, bindingCluster := range clusters {
-			if bindingCluster.Name == notReadyClusterName {
-				rescheduleKey, err := cache.MetaNamespaceKeyFunc(binding)
-				if err != nil {
-					klog.Errorf("couldn't get rescheduleKey for ResourceBinding %#v: %v", bindingCluster.Name, err)
-					return
-				}
-				s.queue.Add(rescheduleKey)
-				metrics.CountSchedulerBindings(metrics.ClusterNotReady)
-				klog.Infof("Add expired ResourceBinding in queue successfully")
-			}
-		}
-	}
-}
-
-// enqueueAffectedClusterBinding will find all cluster resource bindings which are related to the current NotReady cluster and add them in queue.
-func (s *Scheduler) enqueueAffectedClusterBinding(notReadyClusterName string) {
-	bindings, _ := s.clusterBindingLister.List(labels.Everything())
-	klog.Infof("Start traveling all ClusterResourceBindings")
-	for _, binding := range bindings {
-		clusters := binding.Spec.Clusters
-		for _, bindingCluster := range clusters {
-			if bindingCluster.Name == notReadyClusterName {
-				rescheduleKey, err := cache.MetaNamespaceKeyFunc(binding)
-				if err != nil {
-					klog.Errorf("couldn't get rescheduleKey for ClusterResourceBinding %s: %v", bindingCluster.Name, err)
-					return
-				}
-				s.queue.Add(rescheduleKey)
-				metrics.CountSchedulerBindings(metrics.ClusterNotReady)
-				klog.Infof("Add expired ClusterResourceBinding in queue successfully")
-			}
-		}
-	}
-}
-
 func (s *Scheduler) updateCluster(_, newObj interface{}) {
 	newCluster, ok := newObj.(*clusterv1alpha1.Cluster)
 	if !ok {
@@ -248,17 +204,6 @@ func (s *Scheduler) updateCluster(_, newObj interface{}) {
 	if s.enableSchedulerEstimator {
 		s.schedulerEstimatorWorker.Add(newCluster.Name)
 	}
-
-	// Check if cluster becomes failure
-	if meta.IsStatusConditionPresentAndEqual(newCluster.Status.Conditions, clusterv1alpha1.ClusterConditionReady, metav1.ConditionFalse) {
-		klog.Infof("Found cluster(%s) failure and failover flag is %v", newCluster.Name, features.FeatureGate.Enabled(features.Failover))
-
-		if features.FeatureGate.Enabled(features.Failover) { // Trigger reschedule on cluster failure only when flag is true.
-			s.enqueueAffectedBinding(newCluster.Name)
-			s.enqueueAffectedClusterBinding(newCluster.Name)
-			return
-		}
-	}
 }
 
 func (s *Scheduler) deleteCluster(obj interface{}) {
@@ -280,9 +225,6 @@ func (s *Scheduler) deleteCluster(obj interface{}) {
 
 	klog.V(3).Infof("Delete event for cluster %s", cluster.Name)
 
-	s.enqueueAffectedBinding(cluster.Name)
-	s.enqueueAffectedClusterBinding(cluster.Name)
-
 	if s.enableSchedulerEstimator {
 		s.schedulerEstimatorWorker.Add(cluster.Name)
 	}
diff --git a/pkg/scheduler/metrics/metrics.go b/pkg/scheduler/metrics/metrics.go
index 6bc311f06..bfdcef6bd 100644
--- a/pkg/scheduler/metrics/metrics.go
+++ b/pkg/scheduler/metrics/metrics.go
@@ -26,8 +26,6 @@ const (
 	ScheduleAttemptFailure = "ScheduleAttemptFailure"
 	// PolicyChanged means binding needs to be rescheduled for the policy changed
 	PolicyChanged = "PolicyChanged"
-	// ClusterNotReady means binding needs to be rescheduled for cluster is not ready
-	ClusterNotReady = "ClusterNotReady"
 )
 
 const (
diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go
index cfcb6bbfb..e8247a40f 100644
--- a/pkg/scheduler/scheduler.go
+++ b/pkg/scheduler/scheduler.go
@@ -25,7 +25,6 @@ import (
 	policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
 	workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
 	estimatorclient "github.com/karmada-io/karmada/pkg/estimator/client"
-	"github.com/karmada-io/karmada/pkg/features"
 	karmadaclientset "github.com/karmada-io/karmada/pkg/generated/clientset/versioned"
 	informerfactory "github.com/karmada-io/karmada/pkg/generated/informers/externalversions"
 	clusterlister "github.com/karmada-io/karmada/pkg/generated/listers/cluster/v1alpha1"
@@ -50,9 +49,6 @@ const (
 
 	// ScaleSchedule means the replicas of binding object has been changed.
 	ScaleSchedule ScheduleType = "ScaleSchedule"
-
-	// FailoverSchedule means one of the cluster a binding object associated with becomes failure.
-	FailoverSchedule ScheduleType = "FailoverSchedule"
 )
 
 const (
@@ -367,8 +363,7 @@ func (s *Scheduler) doScheduleBinding(namespace, name string) (err error) {
 	if err != nil {
 		return err
 	}
-	appliedPlacement := util.GetLabelValue(rb.Annotations, util.PolicyPlacementAnnotation)
-	if policyPlacementStr != appliedPlacement {
+	if appliedPlacement := util.GetLabelValue(rb.Annotations, util.PolicyPlacementAnnotation); policyPlacementStr != appliedPlacement {
 		// policy placement changed, need schedule
 		klog.Infof("Start to schedule ResourceBinding(%s/%s) as placement changed", namespace, name)
 		err = s.scheduleResourceBinding(rb)
@@ -382,18 +377,18 @@
 		metrics.BindingSchedule(string(ScaleSchedule), utilmetrics.DurationInSeconds(start), err)
 		return err
 	}
-	// TODO(dddddai): reschedule bindings on cluster change
-	if s.allClustersInReadyState(rb.Spec.Clusters) {
-		klog.Infof("Don't need to schedule ResourceBinding(%s/%s)", namespace, name)
-		return nil
-	}
-
-	if features.FeatureGate.Enabled(features.Failover) {
-		klog.Infof("Reschedule ResourceBinding(%s/%s) as cluster failure or deletion", namespace, name)
+	if rb.Spec.Replicas == 0 ||
+		policyPlacement.ReplicaScheduling == nil ||
+		policyPlacement.ReplicaScheduling.ReplicaSchedulingType == policyv1alpha1.ReplicaSchedulingTypeDuplicated {
+		// Duplicated resources should always be scheduled. Note: non-workload is considered as duplicated
+		// even if scheduling type is divided.
+		klog.V(3).Infof("Start to schedule ResourceBinding(%s/%s) as scheduling type is duplicated", namespace, name)
 		err = s.scheduleResourceBinding(rb)
-		metrics.BindingSchedule(string(FailoverSchedule), utilmetrics.DurationInSeconds(start), err)
+		metrics.BindingSchedule(string(ReconcileSchedule), utilmetrics.DurationInSeconds(start), err)
 		return err
 	}
+	// TODO(dddddai): reschedule bindings on cluster change
+	klog.V(3).Infof("Don't need to schedule ResourceBinding(%s/%s)", namespace, name)
 	return nil
 }
 
@@ -430,8 +425,7 @@ func (s *Scheduler) doScheduleClusterBinding(name string) (err error) {
 	if err != nil {
 		return err
 	}
-	appliedPlacement := util.GetLabelValue(crb.Annotations, util.PolicyPlacementAnnotation)
-	if policyPlacementStr != appliedPlacement {
+	if appliedPlacement := util.GetLabelValue(crb.Annotations, util.PolicyPlacementAnnotation); policyPlacementStr != appliedPlacement {
 		// policy placement changed, need schedule
 		klog.Infof("Start to schedule ClusterResourceBinding(%s) as placement changed", name)
 		err = s.scheduleClusterResourceBinding(crb)
@@ -445,17 +439,18 @@
 		metrics.BindingSchedule(string(ScaleSchedule), utilmetrics.DurationInSeconds(start), err)
 		return err
 	}
-	// TODO(dddddai): reschedule bindings on cluster change
-	if s.allClustersInReadyState(crb.Spec.Clusters) {
-		klog.Infof("Don't need to schedule ClusterResourceBinding(%s)", name)
-		return nil
-	}
-	if features.FeatureGate.Enabled(features.Failover) {
-		klog.Infof("Reschedule ClusterResourceBinding(%s) as cluster failure or deletion", name)
+	if crb.Spec.Replicas == 0 ||
+		policyPlacement.ReplicaScheduling == nil ||
+		policyPlacement.ReplicaScheduling.ReplicaSchedulingType == policyv1alpha1.ReplicaSchedulingTypeDuplicated {
+		// Duplicated resources should always be scheduled. Note: non-workload is considered as duplicated
+		// even if scheduling type is divided.
+ klog.V(3).Infof("Start to schedule ClusterResourceBinding(%s) as scheduling type is duplicated", name) err = s.scheduleClusterResourceBinding(crb) - metrics.BindingSchedule(string(FailoverSchedule), utilmetrics.DurationInSeconds(start), err) + metrics.BindingSchedule(string(ReconcileSchedule), utilmetrics.DurationInSeconds(start), err) return err } + // TODO(dddddai): reschedule bindings on cluster change + klog.Infof("Don't need to schedule ClusterResourceBinding(%s)", name) return nil } @@ -564,32 +559,6 @@ func (s *Scheduler) handleErr(err error, key interface{}) { metrics.CountSchedulerBindings(metrics.ScheduleAttemptFailure) } -func (s *Scheduler) allClustersInReadyState(tcs []workv1alpha2.TargetCluster) bool { - clusters := s.schedulerCache.Snapshot().GetClusters() - for i := range tcs { - isNoExisted := true - for _, c := range clusters { - if c.Cluster().Name != tcs[i].Name { - continue - } - - isNoExisted = false - if meta.IsStatusConditionFalse(c.Cluster().Status.Conditions, clusterv1alpha1.ClusterConditionReady) || - !c.Cluster().DeletionTimestamp.IsZero() { - return false - } - - break - } - - if isNoExisted { - // don't find the target cluster in snapshot because it might have been deleted - return false - } - } - return true -} - func (s *Scheduler) reconcileEstimatorConnection(key util.QueueKey) error { name, ok := key.(string) if !ok { diff --git a/test/e2e/failover_test.go b/test/e2e/failover_test.go index 8091c7101..b8dc92709 100644 --- a/test/e2e/failover_test.go +++ b/test/e2e/failover_test.go @@ -7,8 +7,8 @@ import ( "github.com/onsi/ginkgo/v2" "github.com/onsi/gomega" appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/rand" "k8s.io/apimachinery/pkg/util/wait" @@ -17,7 +17,9 @@ import ( clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1" policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1" + controllercluster "github.com/karmada-io/karmada/pkg/controllers/cluster" "github.com/karmada-io/karmada/pkg/util" + "github.com/karmada-io/karmada/pkg/util/helper" "github.com/karmada-io/karmada/test/e2e/framework" testhelper "github.com/karmada-io/karmada/test/helper" ) @@ -56,6 +58,10 @@ var _ = framework.SerialDescribe("failover testing", func() { MatchLabels: pushModeClusterLabels, }, }, + ClusterTolerations: []corev1.Toleration{ + *helper.NewNotReadyToleration(2), + *helper.NewUnreachableToleration(2), + }, SpreadConstraints: []policyv1alpha1.SpreadConstraint{ { SpreadByField: policyv1alpha1.SpreadByFieldCluster, @@ -89,7 +95,7 @@ var _ = framework.SerialDescribe("failover testing", func() { // wait for the current cluster status changing to false framework.WaitClusterFitWith(controlPlaneClient, targetClusterName, func(cluster *clusterv1alpha1.Cluster) bool { - return meta.IsStatusConditionPresentAndEqual(cluster.Status.Conditions, clusterv1alpha1.ClusterConditionReady, metav1.ConditionFalse) + return helper.TaintExists(cluster.Spec.Taints, controllercluster.NotReadyTaintTemplate) }) disabledClusters = append(disabledClusters, targetClusterName) temp-- @@ -127,7 +133,7 @@ var _ = framework.SerialDescribe("failover testing", func() { if err != nil { return false, err } - if meta.IsStatusConditionPresentAndEqual(currentCluster.Status.Conditions, clusterv1alpha1.ClusterConditionReady, metav1.ConditionTrue) { + if !helper.TaintExists(currentCluster.Spec.Taints, 
controllercluster.NotReadyTaintTemplate) { fmt.Printf("cluster %s recovered\n", disabledCluster) return true, nil } diff --git a/test/e2e/framework/constant.go b/test/e2e/framework/constant.go index b39295fa1..99f2aeadd 100644 --- a/test/e2e/framework/constant.go +++ b/test/e2e/framework/constant.go @@ -6,5 +6,5 @@ const ( // pollInterval defines the interval time for a poll operation. pollInterval = 5 * time.Second // pollTimeout defines the time after which the poll operation times out. - pollTimeout = 300 * time.Second + pollTimeout = 420 * time.Second )
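
With this patch the scheduler no longer reacts to cluster health at all; the updated e2e test above instead relies on the cluster getting a not-ready taint (controllercluster.NotReadyTaintTemplate) and on per-policy tolerations (Placement.ClusterTolerations built with helper.NewNotReadyToleration / helper.NewUnreachableToleration) to bound how long work stays on a failed cluster. The sketch below only mirrors those identifiers from the diff; the package name, wrapper function, and the int64-seconds parameter type are assumptions for illustration, not part of this change.

// Sketch only: a Placement that opts into taint-based failover, assuming the
// toleration helpers take a seconds value and return *corev1.Toleration as
// their use in the e2e test above suggests.
package example

import (
	corev1 "k8s.io/api/core/v1"

	policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
	"github.com/karmada-io/karmada/pkg/util/helper"
)

// placementWithFailover tolerates the not-ready and unreachable cluster taints
// for tolerationSeconds, after which bindings on the tainted cluster become
// eligible for eviction and rescheduling outside the scheduler's fast path.
func placementWithFailover(tolerationSeconds int64) policyv1alpha1.Placement {
	return policyv1alpha1.Placement{
		ClusterTolerations: []corev1.Toleration{
			*helper.NewNotReadyToleration(tolerationSeconds),
			*helper.NewUnreachableToleration(tolerationSeconds),
		},
	}
}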