remove Failover from scheduler
Signed-off-by: Garrybest <garrybest@foxmail.com>
commit ff94289352
parent 822abc540b
@@ -37,7 +37,6 @@ spec:
             - --kubeconfig=/etc/kubeconfig
             - --bind-address=0.0.0.0
             - --secure-port=10351
-            - --feature-gates=Failover=true
             - --enable-scheduler-estimator=true
             - --v=4
           volumeMounts:
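Note: the `--feature-gates=Failover=true` flag is dropped from the karmada-scheduler deployment because the scheduler no longer consults the gate (see the code hunks below). For reference, a minimal sketch of how such a boolean gate is typically declared and queried with k8s.io/component-base/featuregate; the gate name, default, and package layout here are illustrative assumptions, not the project's actual registration.

package features

import (
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/component-base/featuregate"
)

// Failover is the gate name; illustrative only.
const Failover featuregate.Feature = "Failover"

// FeatureGate is mutable so that a --feature-gates command-line flag can populate it.
var FeatureGate featuregate.MutableFeatureGate = featuregate.NewFeatureGate()

func init() {
	// Register the gate with its default; --feature-gates=Failover=true flips it on at startup.
	utilruntime.Must(FeatureGate.Add(map[featuregate.Feature]featuregate.FeatureSpec{
		Failover: {Default: false, PreRelease: featuregate.Alpha},
	}))
}

Callers then branch on FeatureGate.Enabled(Failover), which is exactly the kind of check the hunks below delete from the scheduler.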
@@ -4,7 +4,6 @@ import (
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/equality"
 	"k8s.io/apimachinery/pkg/api/meta"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"k8s.io/apimachinery/pkg/labels"
 	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
 	"k8s.io/client-go/tools/cache"
@@ -13,7 +12,6 @@ import (
 
 	clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
 	policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
-	"github.com/karmada-io/karmada/pkg/features"
 	"github.com/karmada-io/karmada/pkg/scheduler/metrics"
 	"github.com/karmada-io/karmada/pkg/util"
 	"github.com/karmada-io/karmada/pkg/util/gclient"
@@ -195,48 +193,6 @@ func (s *Scheduler) addCluster(obj interface{}) {
 	}
 }
-
-// enqueueAffectedBinding will find all ResourceBindings which are related to the current NotReady cluster and add them in queue.
-func (s *Scheduler) enqueueAffectedBinding(notReadyClusterName string) {
-	bindings, _ := s.bindingLister.List(labels.Everything())
-	klog.Infof("Start traveling all ResourceBindings")
-	for _, binding := range bindings {
-		clusters := binding.Spec.Clusters
-		for _, bindingCluster := range clusters {
-			if bindingCluster.Name == notReadyClusterName {
-				rescheduleKey, err := cache.MetaNamespaceKeyFunc(binding)
-				if err != nil {
-					klog.Errorf("couldn't get rescheduleKey for ResourceBinding %#v: %v", bindingCluster.Name, err)
-					return
-				}
-				s.queue.Add(rescheduleKey)
-				metrics.CountSchedulerBindings(metrics.ClusterNotReady)
-				klog.Infof("Add expired ResourceBinding in queue successfully")
-			}
-		}
-	}
-}
-
-// enqueueAffectedClusterBinding will find all cluster resource bindings which are related to the current NotReady cluster and add them in queue.
-func (s *Scheduler) enqueueAffectedClusterBinding(notReadyClusterName string) {
-	bindings, _ := s.clusterBindingLister.List(labels.Everything())
-	klog.Infof("Start traveling all ClusterResourceBindings")
-	for _, binding := range bindings {
-		clusters := binding.Spec.Clusters
-		for _, bindingCluster := range clusters {
-			if bindingCluster.Name == notReadyClusterName {
-				rescheduleKey, err := cache.MetaNamespaceKeyFunc(binding)
-				if err != nil {
-					klog.Errorf("couldn't get rescheduleKey for ClusterResourceBinding %s: %v", bindingCluster.Name, err)
-					return
-				}
-				s.queue.Add(rescheduleKey)
-				metrics.CountSchedulerBindings(metrics.ClusterNotReady)
-				klog.Infof("Add expired ClusterResourceBinding in queue successfully")
-			}
-		}
-	}
-}
 
 func (s *Scheduler) updateCluster(_, newObj interface{}) {
 	newCluster, ok := newObj.(*clusterv1alpha1.Cluster)
 	if !ok {
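Both deleted helpers above follow one pattern: list every binding, and for any binding whose .spec.clusters still references the NotReady cluster, push its cache key back onto the scheduler queue. A condensed sketch of that pattern is below; the lister and queue types are assumptions based on the imports above, and the ClusterResourceBinding variant differs only in the lister it reads.

package sketch

import (
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/util/workqueue"
	"k8s.io/klog/v2"

	worklister "github.com/karmada-io/karmada/pkg/generated/listers/work/v1alpha2"
)

// enqueueBindingsOnCluster re-queues every ResourceBinding scheduled to clusterName.
func enqueueBindingsOnCluster(lister worklister.ResourceBindingLister, queue workqueue.Interface, clusterName string) {
	bindings, err := lister.List(labels.Everything())
	if err != nil {
		klog.Errorf("failed to list ResourceBindings: %v", err)
		return
	}
	for _, binding := range bindings {
		for _, target := range binding.Spec.Clusters {
			if target.Name != clusterName {
				continue
			}
			key, err := cache.MetaNamespaceKeyFunc(binding) // namespace/name key
			if err != nil {
				klog.Errorf("couldn't get key for ResourceBinding %s/%s: %v", binding.Namespace, binding.Name, err)
				break
			}
			queue.Add(key)
			break
		}
	}
}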
@@ -248,17 +204,6 @@ func (s *Scheduler) updateCluster(_, newObj interface{}) {
 	if s.enableSchedulerEstimator {
 		s.schedulerEstimatorWorker.Add(newCluster.Name)
 	}
-
-	// Check if cluster becomes failure
-	if meta.IsStatusConditionPresentAndEqual(newCluster.Status.Conditions, clusterv1alpha1.ClusterConditionReady, metav1.ConditionFalse) {
-		klog.Infof("Found cluster(%s) failure and failover flag is %v", newCluster.Name, features.FeatureGate.Enabled(features.Failover))
-
-		if features.FeatureGate.Enabled(features.Failover) { // Trigger reschedule on cluster failure only when flag is true.
-			s.enqueueAffectedBinding(newCluster.Name)
-			s.enqueueAffectedClusterBinding(newCluster.Name)
-			return
-		}
-	}
 }
 
 func (s *Scheduler) deleteCluster(obj interface{}) {
@@ -280,9 +225,6 @@ func (s *Scheduler) deleteCluster(obj interface{}) {
 
 	klog.V(3).Infof("Delete event for cluster %s", cluster.Name)
 
-	s.enqueueAffectedBinding(cluster.Name)
-	s.enqueueAffectedClusterBinding(cluster.Name)
-
 	if s.enableSchedulerEstimator {
 		s.schedulerEstimatorWorker.Add(cluster.Name)
 	}
@@ -26,8 +26,6 @@ const (
 	ScheduleAttemptFailure = "ScheduleAttemptFailure"
 	// PolicyChanged means binding needs to be rescheduled for the policy changed
 	PolicyChanged = "PolicyChanged"
-	// ClusterNotReady means binding needs to be rescheduled for cluster is not ready
-	ClusterNotReady = "ClusterNotReady"
 )
 
 const (
@@ -25,7 +25,6 @@ import (
 	policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
 	workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
 	estimatorclient "github.com/karmada-io/karmada/pkg/estimator/client"
-	"github.com/karmada-io/karmada/pkg/features"
 	karmadaclientset "github.com/karmada-io/karmada/pkg/generated/clientset/versioned"
 	informerfactory "github.com/karmada-io/karmada/pkg/generated/informers/externalversions"
 	clusterlister "github.com/karmada-io/karmada/pkg/generated/listers/cluster/v1alpha1"
@@ -50,9 +49,6 @@ const (
 
 	// ScaleSchedule means the replicas of binding object has been changed.
 	ScaleSchedule ScheduleType = "ScaleSchedule"
-
-	// FailoverSchedule means one of the cluster a binding object associated with becomes failure.
-	FailoverSchedule ScheduleType = "FailoverSchedule"
 )
 
 const (
@@ -367,8 +363,7 @@ func (s *Scheduler) doScheduleBinding(namespace, name string) (err error) {
 	if err != nil {
 		return err
 	}
-	appliedPlacement := util.GetLabelValue(rb.Annotations, util.PolicyPlacementAnnotation)
-	if policyPlacementStr != appliedPlacement {
+	if appliedPlacement := util.GetLabelValue(rb.Annotations, util.PolicyPlacementAnnotation); policyPlacementStr != appliedPlacement {
 		// policy placement changed, need schedule
 		klog.Infof("Start to schedule ResourceBinding(%s/%s) as placement changed", namespace, name)
 		err = s.scheduleResourceBinding(rb)
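The two-line lookup-then-compare is folded into Go's `if` with an init statement, which scopes `appliedPlacement` to the comparison and drops a line. A tiny self-contained illustration of the idiom (names are made up):

package main

import "fmt"

// currentValue stands in for util.GetLabelValue in this illustration.
func currentValue(key string) string { return "divided" }

func main() {
	desired := "duplicated"
	// applied is visible only inside this if/else chain.
	if applied := currentValue("placement"); desired != applied {
		fmt.Println("placement changed, reschedule")
	} else {
		fmt.Println("placement unchanged")
	}
}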
@@ -382,18 +377,18 @@ func (s *Scheduler) doScheduleBinding(namespace, name string) (err error) {
 		metrics.BindingSchedule(string(ScaleSchedule), utilmetrics.DurationInSeconds(start), err)
 		return err
 	}
-	// TODO(dddddai): reschedule bindings on cluster change
-	if s.allClustersInReadyState(rb.Spec.Clusters) {
-		klog.Infof("Don't need to schedule ResourceBinding(%s/%s)", namespace, name)
-		return nil
-	}
-
-	if features.FeatureGate.Enabled(features.Failover) {
-		klog.Infof("Reschedule ResourceBinding(%s/%s) as cluster failure or deletion", namespace, name)
+	if rb.Spec.Replicas == 0 ||
+		policyPlacement.ReplicaScheduling == nil ||
+		policyPlacement.ReplicaScheduling.ReplicaSchedulingType == policyv1alpha1.ReplicaSchedulingTypeDuplicated {
+		// Duplicated resources should always be scheduled. Note: non-workload is considered as duplicated
+		// even if scheduling type is divided.
+		klog.V(3).Infof("Start to schedule ResourceBinding(%s/%s) as scheduling type is duplicated", namespace, name)
 		err = s.scheduleResourceBinding(rb)
-		metrics.BindingSchedule(string(FailoverSchedule), utilmetrics.DurationInSeconds(start), err)
+		metrics.BindingSchedule(string(ReconcileSchedule), utilmetrics.DurationInSeconds(start), err)
 		return err
 	}
+	// TODO(dddddai): reschedule bindings on cluster change
+	klog.V(3).Infof("Don't need to schedule ResourceBinding(%s/%s)", namespace, name)
 	return nil
 }
 
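With the Failover gate and the readiness probe gone, the tail of doScheduleBinding keys only off the placement: bindings with zero replicas, no replica-scheduling strategy, or an explicit Duplicated strategy are always rescheduled (counted as ReconcileSchedule), while Divided placements are left to the TODO. A hedged sketch of that predicate, using the same policy API fields; the standalone function name is invented for illustration:

package sketch

import (
	policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
)

// alwaysReschedulable mirrors the condition above: treat the binding as "duplicated"
// (and therefore always worth rescheduling) when it has no replicas, no replica
// scheduling strategy, or the Duplicated scheduling type.
func alwaysReschedulable(replicas int32, placement policyv1alpha1.Placement) bool {
	return replicas == 0 ||
		placement.ReplicaScheduling == nil ||
		placement.ReplicaScheduling.ReplicaSchedulingType == policyv1alpha1.ReplicaSchedulingTypeDuplicated
}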
@@ -430,8 +425,7 @@ func (s *Scheduler) doScheduleClusterBinding(name string) (err error) {
 	if err != nil {
 		return err
 	}
-	appliedPlacement := util.GetLabelValue(crb.Annotations, util.PolicyPlacementAnnotation)
-	if policyPlacementStr != appliedPlacement {
+	if appliedPlacement := util.GetLabelValue(crb.Annotations, util.PolicyPlacementAnnotation); policyPlacementStr != appliedPlacement {
 		// policy placement changed, need schedule
 		klog.Infof("Start to schedule ClusterResourceBinding(%s) as placement changed", name)
 		err = s.scheduleClusterResourceBinding(crb)
@@ -445,17 +439,18 @@ func (s *Scheduler) doScheduleClusterBinding(name string) (err error) {
 		metrics.BindingSchedule(string(ScaleSchedule), utilmetrics.DurationInSeconds(start), err)
 		return err
 	}
-	// TODO(dddddai): reschedule bindings on cluster change
-	if s.allClustersInReadyState(crb.Spec.Clusters) {
-		klog.Infof("Don't need to schedule ClusterResourceBinding(%s)", name)
-		return nil
-	}
-	if features.FeatureGate.Enabled(features.Failover) {
-		klog.Infof("Reschedule ClusterResourceBinding(%s) as cluster failure or deletion", name)
+	if crb.Spec.Replicas == 0 ||
+		policyPlacement.ReplicaScheduling == nil ||
+		policyPlacement.ReplicaScheduling.ReplicaSchedulingType == policyv1alpha1.ReplicaSchedulingTypeDuplicated {
+		// Duplicated resources should always be scheduled. Note: non-workload is considered as duplicated
+		// even if scheduling type is divided.
+		klog.V(3).Infof("Start to schedule ClusterResourceBinding(%s) as scheduling type is duplicated", name)
 		err = s.scheduleClusterResourceBinding(crb)
-		metrics.BindingSchedule(string(FailoverSchedule), utilmetrics.DurationInSeconds(start), err)
+		metrics.BindingSchedule(string(ReconcileSchedule), utilmetrics.DurationInSeconds(start), err)
 		return err
 	}
+	// TODO(dddddai): reschedule bindings on cluster change
+	klog.Infof("Don't need to schedule ClusterResourceBinding(%s)", name)
 	return nil
 }
 
@@ -564,32 +559,6 @@ func (s *Scheduler) handleErr(err error, key interface{}) {
 	metrics.CountSchedulerBindings(metrics.ScheduleAttemptFailure)
 }
-
-func (s *Scheduler) allClustersInReadyState(tcs []workv1alpha2.TargetCluster) bool {
-	clusters := s.schedulerCache.Snapshot().GetClusters()
-	for i := range tcs {
-		isNoExisted := true
-		for _, c := range clusters {
-			if c.Cluster().Name != tcs[i].Name {
-				continue
-			}
-
-			isNoExisted = false
-			if meta.IsStatusConditionFalse(c.Cluster().Status.Conditions, clusterv1alpha1.ClusterConditionReady) ||
-				!c.Cluster().DeletionTimestamp.IsZero() {
-				return false
-			}
-
-			break
-		}
-
-		if isNoExisted {
-			// don't find the target cluster in snapshot because it might have been deleted
-			return false
-		}
-	}
-	return true
-}
 
 func (s *Scheduler) reconcileEstimatorConnection(key util.QueueKey) error {
 	name, ok := key.(string)
 	if !ok {
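allClustersInReadyState walked the scheduler cache snapshot and reported false as soon as any target cluster was missing, marked NotReady, or being deleted; with the Failover branch gone it has no remaining caller. The same readiness test, expressed directly over resolved Cluster objects rather than the snapshot, looks roughly like this (a sketch, not the removed implementation):

package sketch

import (
	"k8s.io/apimachinery/pkg/api/meta"

	clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
)

// clustersReady reports whether every resolved target cluster is present, not marked
// Ready=False, and not being deleted. A nil entry stands for a cluster that could not
// be found and is treated as not ready.
func clustersReady(clusters []*clusterv1alpha1.Cluster) bool {
	for _, cluster := range clusters {
		if cluster == nil {
			return false
		}
		if meta.IsStatusConditionFalse(cluster.Status.Conditions, clusterv1alpha1.ClusterConditionReady) ||
			!cluster.DeletionTimestamp.IsZero() {
			return false
		}
	}
	return true
}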
@@ -7,8 +7,8 @@ import (
 	"github.com/onsi/ginkgo/v2"
 	"github.com/onsi/gomega"
 	appsv1 "k8s.io/api/apps/v1"
+	corev1 "k8s.io/api/core/v1"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
-	"k8s.io/apimachinery/pkg/api/meta"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/util/rand"
 	"k8s.io/apimachinery/pkg/util/wait"
@@ -17,7 +17,9 @@ import (
 
 	clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
 	policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
+	controllercluster "github.com/karmada-io/karmada/pkg/controllers/cluster"
 	"github.com/karmada-io/karmada/pkg/util"
+	"github.com/karmada-io/karmada/pkg/util/helper"
 	"github.com/karmada-io/karmada/test/e2e/framework"
 	testhelper "github.com/karmada-io/karmada/test/helper"
 )
@@ -56,6 +58,10 @@ var _ = framework.SerialDescribe("failover testing", func() {
 					MatchLabels: pushModeClusterLabels,
 				},
 			},
+			ClusterTolerations: []corev1.Toleration{
+				*helper.NewNotReadyToleration(2),
+				*helper.NewUnreachableToleration(2),
+			},
 			SpreadConstraints: []policyv1alpha1.SpreadConstraint{
 				{
 					SpreadByField: policyv1alpha1.SpreadByFieldCluster,
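The test's policy now tolerates the cluster not-ready and unreachable taints for two seconds before taint-based eviction removes the workload from the failed cluster. A sketch of what a helper like NewNotReadyToleration plausibly constructs; the taint key below is an assumption, not the repository's actual constant:

package sketch

import corev1 "k8s.io/api/core/v1"

// newNotReadyToleration is an illustrative stand-in for helper.NewNotReadyToleration.
func newNotReadyToleration(seconds int64) *corev1.Toleration {
	return &corev1.Toleration{
		Key:               "cluster.karmada.io/not-ready", // assumed taint key
		Operator:          corev1.TolerationOpExists,
		Effect:            corev1.TaintEffectNoExecute,
		TolerationSeconds: &seconds,
	}
}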
@@ -89,7 +95,7 @@ var _ = framework.SerialDescribe("failover testing", func() {
 
 				// wait for the current cluster status changing to false
 				framework.WaitClusterFitWith(controlPlaneClient, targetClusterName, func(cluster *clusterv1alpha1.Cluster) bool {
-					return meta.IsStatusConditionPresentAndEqual(cluster.Status.Conditions, clusterv1alpha1.ClusterConditionReady, metav1.ConditionFalse)
+					return helper.TaintExists(cluster.Spec.Taints, controllercluster.NotReadyTaintTemplate)
 				})
 				disabledClusters = append(disabledClusters, targetClusterName)
 				temp--
@@ -127,7 +133,7 @@ var _ = framework.SerialDescribe("failover testing", func() {
 					if err != nil {
 						return false, err
 					}
-					if meta.IsStatusConditionPresentAndEqual(currentCluster.Status.Conditions, clusterv1alpha1.ClusterConditionReady, metav1.ConditionTrue) {
+					if !helper.TaintExists(currentCluster.Spec.Taints, controllercluster.NotReadyTaintTemplate) {
 						fmt.Printf("cluster %s recovered\n", disabledCluster)
 						return true, nil
 					}
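Both assertions switch from reading the Ready condition to checking whether the cluster controller's NotReady taint is present on (or absent from) cluster.Spec.Taints. An illustrative equivalent of a TaintExists-style check, matching on key and effect; the real helper.TaintExists may compare more fields:

package sketch

import corev1 "k8s.io/api/core/v1"

// taintExists reports whether a taint with the same key and effect as want is set.
func taintExists(taints []corev1.Taint, want *corev1.Taint) bool {
	for i := range taints {
		if taints[i].Key == want.Key && taints[i].Effect == want.Effect {
			return true
		}
	}
	return false
}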
@@ -6,5 +6,5 @@ const (
 	// pollInterval defines the interval time for a poll operation.
 	pollInterval = 5 * time.Second
 	// pollTimeout defines the time after which the poll operation times out.
-	pollTimeout = 300 * time.Second
+	pollTimeout = 420 * time.Second
 )
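pollTimeout grows from 300s to 420s, presumably to leave headroom for the taint-based eviction flow exercised above. The two constants are the knobs fed into the suite's polling loops; a minimal usage sketch, assuming the standard apimachinery wait helper:

package sketch

import (
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

const (
	pollInterval = 5 * time.Second
	pollTimeout  = 420 * time.Second
)

// waitUntil polls check every pollInterval until it returns true or pollTimeout elapses.
func waitUntil(check wait.ConditionFunc) error {
	return wait.PollImmediate(pollInterval, pollTimeout, check)
}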