diff --git a/pkg/scheduler/cache/snapshot.go b/pkg/scheduler/cache/snapshot.go
index 07d8c6daf..7b9c3b840 100644
--- a/pkg/scheduler/cache/snapshot.go
+++ b/pkg/scheduler/cache/snapshot.go
@@ -52,3 +52,13 @@ func (s *Snapshot) GetReadyClusterNames() sets.String {
 
 	return readyClusterNames
 }
+
+// GetCluster returns the cluster with the given name.
+func (s *Snapshot) GetCluster(clusterName string) *framework.ClusterInfo {
+	for _, c := range s.clusterInfoList {
+		if c.Cluster().Name == clusterName {
+			return c
+		}
+	}
+	return nil
+}
diff --git a/pkg/scheduler/core/generic_scheduler.go b/pkg/scheduler/core/generic_scheduler.go
index e9df99ded..2450507e1 100644
--- a/pkg/scheduler/core/generic_scheduler.go
+++ b/pkg/scheduler/core/generic_scheduler.go
@@ -25,6 +25,7 @@ import (
 type ScheduleAlgorithm interface {
 	Schedule(context.Context, *policyv1alpha1.Placement, *workv1alpha1.ResourceBindingSpec) (scheduleResult ScheduleResult, err error)
 	ScaleSchedule(context.Context, *policyv1alpha1.Placement, *workv1alpha1.ResourceBindingSpec) (scheduleResult ScheduleResult, err error)
+	FailoverSchedule(context.Context, *policyv1alpha1.Placement, *workv1alpha1.ResourceBindingSpec) (scheduleResult ScheduleResult, err error)
 }
 
 // ScheduleResult includes the clusters selected.
@@ -581,3 +582,65 @@ func (g *genericScheduler) getPreUsed(clusters []*clusterv1alpha1.Cluster, preUs
 	}
 	return preUsedCluster, unUsedCluster
 }
+
+func (g *genericScheduler) FailoverSchedule(ctx context.Context, placement *policyv1alpha1.Placement,
+	spec *workv1alpha1.ResourceBindingSpec) (result ScheduleResult, err error) {
+	readyClusters := g.schedulerCache.Snapshot().GetReadyClusterNames()
+	totalClusters := util.ConvertToClusterNames(spec.Clusters)
+
+	reservedClusters := calcReservedCluster(totalClusters, readyClusters)
+	availableClusters := calcAvailableCluster(totalClusters, readyClusters)
+
+	candidateClusters := sets.NewString()
+	for clusterName := range availableClusters {
+		clusterObj := g.schedulerCache.Snapshot().GetCluster(clusterName)
+		if clusterObj == nil {
+			return result, fmt.Errorf("failed to get clusterObj by clusterName: %s", clusterName)
+		}
+
+		resMap := g.scheduleFramework.RunFilterPlugins(ctx, placement, &spec.Resource, clusterObj.Cluster())
+		res := resMap.Merge()
+		if !res.IsSuccess() {
+			klog.V(4).Infof("cluster %q is not fit", clusterName)
+		} else {
+			candidateClusters.Insert(clusterName)
+		}
+	}
+
+	klog.V(4).Infof("Reserved bindingClusters : %v", reservedClusters.List())
+	klog.V(4).Infof("Candidate bindingClusters: %v", candidateClusters.List())
+
+	// TODO: should schedule as much as possible?
+	deltaLen := len(spec.Clusters) - len(reservedClusters)
+	if len(candidateClusters) < deltaLen {
+		// for ReplicaSchedulingTypeDivided, we will try to migrate replicas to the other healthy clusters
+		if placement.ReplicaScheduling == nil || placement.ReplicaScheduling.ReplicaSchedulingType == policyv1alpha1.ReplicaSchedulingTypeDuplicated {
+			klog.Warningf("ignore reschedule binding as insufficient available cluster")
+			return ScheduleResult{}, nil
+		}
+	}
+
+	// TODO: check if the final result meets the spread constraints.
+	targetClusters := reservedClusters
+	clusterList := candidateClusters.List()
+	for i := 0; i < deltaLen && i < len(candidateClusters); i++ {
+		targetClusters.Insert(clusterList[i])
+	}
+
+	var reScheduleResult []workv1alpha1.TargetCluster
+	for cluster := range targetClusters {
+		reScheduleResult = append(reScheduleResult, workv1alpha1.TargetCluster{Name: cluster})
+	}
+
+	return ScheduleResult{reScheduleResult}, nil
+}
+
+// calcReservedCluster eliminates the not-ready clusters from the 'bindClusters'.
+func calcReservedCluster(bindClusters, readyClusters sets.String) sets.String {
+	return bindClusters.Difference(bindClusters.Difference(readyClusters))
+}
+
+// calcAvailableCluster returns a list of ready clusters that are not in 'bindClusters'.
+func calcAvailableCluster(bindCluster, readyClusters sets.String) sets.String {
+	return readyClusters.Difference(bindCluster)
+}
diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go
index c0261501c..a6f6c8cd8 100644
--- a/pkg/scheduler/scheduler.go
+++ b/pkg/scheduler/scheduler.go
@@ -12,13 +12,11 @@ import (
 	"k8s.io/apimachinery/pkg/api/meta"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/labels"
-	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/client-go/dynamic"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/tools/cache"
 	"k8s.io/client-go/util/workqueue"
-	v1helper "k8s.io/component-helpers/scheduling/corev1"
 	"k8s.io/klog/v2"
 
 	clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
@@ -595,20 +593,15 @@ func (s *Scheduler) rescheduleClusterResourceBinding(clusterResourceBinding *wor
 		return err
 	}
 
-	targetClusters, err := s.obtainTargetCluster(clusterResourceBinding.Spec.Clusters, policy.Spec.Placement)
+	reScheduleResult, err := s.Algorithm.FailoverSchedule(context.TODO(), &policy.Spec.Placement, &clusterResourceBinding.Spec)
 	if err != nil {
 		return err
 	}
-	if targetClusters == nil {
+	if len(reScheduleResult.SuggestedClusters) == 0 {
 		return nil
 	}
 
-	// TODO(tinyma123) Check if the final result meets the spread constraints.
-
-	clusterResourceBinding.Spec.Clusters = nil
-	for cluster := range targetClusters {
-		clusterResourceBinding.Spec.Clusters = append(clusterResourceBinding.Spec.Clusters, workv1alpha1.TargetCluster{Name: cluster})
-	}
+	clusterResourceBinding.Spec.Clusters = reScheduleResult.SuggestedClusters
 
 	klog.Infof("The final binding.Spec.Cluster values are: %v\n", clusterResourceBinding.Spec.Clusters)
 	_, err = s.KarmadaClient.WorkV1alpha1().ClusterResourceBindings().Update(context.TODO(), clusterResourceBinding, metav1.UpdateOptions{})
@@ -625,20 +618,15 @@ func (s *Scheduler) rescheduleResourceBinding(resourceBinding *workv1alpha1.Reso
 		return err
 	}
 
-	targetClusters, err := s.obtainTargetCluster(resourceBinding.Spec.Clusters, placement)
+	reScheduleResult, err := s.Algorithm.FailoverSchedule(context.TODO(), &placement, &resourceBinding.Spec)
 	if err != nil {
 		return err
 	}
-	if targetClusters == nil {
+	if len(reScheduleResult.SuggestedClusters) == 0 {
 		return nil
 	}
 
-	// TODO(tinyma123) Check if the final result meets the spread constraints.
-
-	resourceBinding.Spec.Clusters = nil
-	for cluster := range targetClusters {
-		resourceBinding.Spec.Clusters = append(resourceBinding.Spec.Clusters, workv1alpha1.TargetCluster{Name: cluster})
-	}
+	resourceBinding.Spec.Clusters = reScheduleResult.SuggestedClusters
 
 	klog.Infof("The final binding.Spec.Cluster values are: %v\n", resourceBinding.Spec.Clusters)
 	_, err = s.KarmadaClient.WorkV1alpha1().ResourceBindings(resourceBinding.Namespace).Update(context.TODO(), resourceBinding, metav1.UpdateOptions{})
@@ -648,66 +636,6 @@ func (s *Scheduler) rescheduleResourceBinding(resourceBinding *workv1alpha1.Reso
 	return nil
 }
 
-// calcReservedCluster eliminates the not-ready clusters from the 'bindClusters'.
-func calcReservedCluster(bindClusters, readyClusters sets.String) sets.String {
-	return bindClusters.Difference(bindClusters.Difference(readyClusters))
-}
-
-// calcAvailableCluster returns a list of ready clusters that not in 'bindClusters'.
-func calcAvailableCluster(bindCluster, readyClusters sets.String) sets.String {
-	return readyClusters.Difference(bindCluster)
-}
-
-func (s *Scheduler) obtainTargetCluster(bindingClusters []workv1alpha1.TargetCluster, placement policyv1alpha1.Placement) (sets.String, error) {
-	readyClusters := s.schedulerCache.Snapshot().GetReadyClusterNames()
-	totalClusters := util.ConvertToClusterNames(bindingClusters)
-
-	reservedClusters := calcReservedCluster(totalClusters, readyClusters)
-	availableClusters := calcAvailableCluster(totalClusters, readyClusters)
-
-	filterPredicate := func(t *corev1.Taint) bool {
-		// now only interested in NoSchedule taint which means do not allow new resource to schedule onto the cluster unless they tolerate the taint
-		// todo: supprot NoExecute taint
-		return t.Effect == corev1.TaintEffectNoSchedule
-	}
-
-	candidateClusters := sets.NewString()
-	for clusterName := range availableClusters {
-		clusterObj, err := s.clusterLister.Get(clusterName)
-		if err != nil {
-			klog.Errorf("Failed to get clusterObj by clusterName: %s", clusterName)
-			return nil, err
-		}
-
-		if placement.ClusterAffinity != nil && !util.ClusterMatches(clusterObj, *placement.ClusterAffinity) {
-			continue
-		}
-
-		_, isUntolerated := v1helper.FindMatchingUntoleratedTaint(clusterObj.Spec.Taints, placement.ClusterTolerations, filterPredicate)
-		if !isUntolerated {
-			candidateClusters.Insert(clusterName)
-		}
-	}
-
-	klog.V(4).Infof("Reserved bindingClusters : %v", reservedClusters.List())
-	klog.V(4).Infof("Candidate bindingClusters: %v", candidateClusters.List())
-
-	// TODO: should schedule as much as possible?
-	deltaLen := len(bindingClusters) - len(reservedClusters)
-	if len(candidateClusters) < deltaLen {
-		klog.Warningf("ignore reschedule binding as insufficient available cluster")
-		return nil, nil
-	}
-
-	targetClusters := reservedClusters
-	clusterList := candidateClusters.List()
-	for i := 0; i < deltaLen; i++ {
-		targetClusters.Insert(clusterList[i])
-	}
-
-	return targetClusters, nil
-}
-
 func (s *Scheduler) scaleScheduleOne(key string) (err error) {
 	klog.V(4).Infof("begin scale scheduling ResourceBinding %s", key)
 	defer klog.V(4).Infof("end scale scheduling ResourceBinding %s: %v", key, err)
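
Note on the set arithmetic used by FailoverSchedule: calcReservedCluster keeps only the bound clusters that are still ready (bound minus (bound minus ready), i.e. the intersection of bound and ready), and calcAvailableCluster returns the ready clusters that are not yet bound. Below is a minimal standalone sketch of that behavior, not part of the patch, using the same k8s.io/apimachinery sets package; the cluster names are made up for illustration.

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/util/sets"
)

// calcReservedCluster keeps the bound clusters that are still ready:
// bound - (bound - ready) equals the intersection of bound and ready.
func calcReservedCluster(bindClusters, readyClusters sets.String) sets.String {
	return bindClusters.Difference(bindClusters.Difference(readyClusters))
}

// calcAvailableCluster returns the ready clusters that are not yet bound.
func calcAvailableCluster(bindClusters, readyClusters sets.String) sets.String {
	return readyClusters.Difference(bindClusters)
}

func main() {
	bound := sets.NewString("member1", "member2")            // clusters currently in the binding
	ready := sets.NewString("member2", "member3", "member4") // clusters reported ready

	fmt.Println(calcReservedCluster(bound, ready).List())  // [member2]
	fmt.Println(calcAvailableCluster(bound, ready).List()) // [member3 member4]
}

FailoverSchedule then tops up the reserved set with candidates until the binding regains its original size; the loop condition i < deltaLen && i < len(candidateClusters) guards against running past the candidate list.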
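
On the design: the failover path now lives behind the ScheduleAlgorithm interface, so rescheduleResourceBinding and rescheduleClusterResourceBinding only consume ScheduleResult.SuggestedClusters. Candidate clusters are vetted by the framework's RunFilterPlugins instead of the hand-rolled ClusterAffinity match and NoSchedule-taint toleration check that the removed obtainTargetCluster performed, so whatever filter plugins are registered with the scheduler framework apply to failover as well. Whether the topped-up result still satisfies the spread constraints remains a TODO, as noted in FailoverSchedule.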