diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go
index 65e01adc6..5133d112b 100644
--- a/pkg/scheduler/scheduler.go
+++ b/pkg/scheduler/scheduler.go
@@ -3,14 +3,17 @@ package scheduler
 import (
 	"context"
 	"encoding/json"
+	"fmt"
 	"time"
 
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/equality"
 	"k8s.io/apimachinery/pkg/api/errors"
+	"k8s.io/apimachinery/pkg/api/meta"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/labels"
 	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
+	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/client-go/dynamic"
 	"k8s.io/client-go/kubernetes"
@@ -23,6 +26,7 @@ import (
 	workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1"
 	karmadaclientset "github.com/karmada-io/karmada/pkg/generated/clientset/versioned"
 	informerfactory "github.com/karmada-io/karmada/pkg/generated/informers/externalversions"
+	clusterlister "github.com/karmada-io/karmada/pkg/generated/listers/cluster/v1alpha1"
 	policylister "github.com/karmada-io/karmada/pkg/generated/listers/policy/v1alpha1"
 	worklister "github.com/karmada-io/karmada/pkg/generated/listers/work/v1alpha1"
 	schedulercache "github.com/karmada-io/karmada/pkg/scheduler/cache"
@@ -40,6 +44,26 @@ const (
 	maxRetries = 15
 )
 
+// ScheduleType defines the type of scheduling that should be performed for a binding object.
+type ScheduleType string
+
+const (
+	// FirstSchedule means the binding object has never been scheduled.
+	FirstSchedule ScheduleType = "FirstSchedule"
+
+	// ReconcileSchedule means the policy associated with the binding object has been changed.
+	ReconcileSchedule ScheduleType = "ReconcileSchedule"
+
+	// FailoverSchedule means one of the clusters the binding object is scheduled to has failed.
+	FailoverSchedule ScheduleType = "FailoverSchedule"
+
+	// AvoidSchedule means the binding object does not need to be scheduled.
+	AvoidSchedule ScheduleType = "AvoidSchedule"
+
+	// Unknown means the schedule type cannot be detected.
+	Unknown ScheduleType = "Unknown"
+)
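+
+// A binding's schedule type is decided in order (see getScheduleType below):
+// no clusters assigned yet means FirstSchedule; an applied placement that
+// differs from the current policy placement means ReconcileSchedule; a bound
+// cluster reporting Ready=False means FailoverSchedule; otherwise AvoidSchedule.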
 
 // Failover indicates whether the scheduler should perform re-scheduling in case of cluster failure.
 // TODO(RainbowMango): Remove the temporary solution by introducing a feature flag.
 var Failover bool
@@ -57,6 +81,7 @@ type Scheduler struct {
 	clusterBindingLister  worklister.ClusterResourceBindingLister
 	clusterPolicyInformer cache.SharedIndexInformer
 	clusterPolicyLister   policylister.ClusterPropagationPolicyLister
+	clusterLister         clusterlister.ClusterLister
 	informerFactory       informerfactory.SharedInformerFactory
 
 	// TODO: implement a priority scheduling queue
@@ -77,6 +102,7 @@ func NewScheduler(dynamicClient dynamic.Interface, karmadaClient karmadaclientse
 	clusterBindingLister := factory.Work().V1alpha1().ClusterResourceBindings().Lister()
 	clusterPolicyInformer := factory.Policy().V1alpha1().ClusterPropagationPolicies().Informer()
 	clusterPolicyLister := factory.Policy().V1alpha1().ClusterPropagationPolicies().Lister()
+	clusterLister := factory.Cluster().V1alpha1().Clusters().Lister()
 	queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
 	schedulerCache := schedulercache.NewCache()
 	// TODO: make plugins as a flag
@@ -93,6 +119,7 @@ func NewScheduler(dynamicClient dynamic.Interface, karmadaClient karmadaclientse
 		clusterBindingLister:  clusterBindingLister,
 		clusterPolicyInformer: clusterPolicyInformer,
 		clusterPolicyLister:   clusterPolicyLister,
+		clusterLister:         clusterLister,
 		informerFactory:       factory,
 		queue:                 queue,
 		Algorithm:             algorithm,
@@ -222,6 +249,62 @@ func (s *Scheduler) worker() {
 	}
 }
 
+// getScheduleType decides which kind of scheduling, if any, the binding behind the given key needs.
+func (s *Scheduler) getScheduleType(key string) ScheduleType {
+	ns, name, err := cache.SplitMetaNamespaceKey(key)
+	if err != nil {
+		return Unknown
+	}
+
+	// ResourceBinding object
+	if len(ns) > 0 {
+		resourceBinding, err := s.bindingLister.ResourceBindings(ns).Get(name)
+		if err != nil { // covers NotFound as well; the binding may have been deleted
+			return Unknown
+		}
+
+		if len(resourceBinding.Spec.Clusters) == 0 {
+			return FirstSchedule
+		}
+
+		policyNamespace := util.GetLabelValue(resourceBinding.Labels, util.PropagationPolicyNamespaceLabel)
+		policyName := util.GetLabelValue(resourceBinding.Labels, util.PropagationPolicyNameLabel)
+
+		policy, err := s.policyLister.PropagationPolicies(policyNamespace).Get(policyName)
+		if err != nil {
+			return Unknown
+		}
+		placement, err := json.Marshal(policy.Spec.Placement)
+		if err != nil {
+			klog.Errorf("Failed to marshal placement of propagationPolicy %s/%s, error: %v", policy.Namespace, policy.Name, err)
+			return Unknown
+		}
+		policyPlacementStr := string(placement)
+
+		appliedPlacement := util.GetLabelValue(resourceBinding.Annotations, util.PolicyPlacementAnnotation)
+
+		if policyPlacementStr != appliedPlacement {
+			return ReconcileSchedule
+		}
+
+		clusters := s.schedulerCache.Snapshot().GetClusters()
+		for _, tc := range resourceBinding.Spec.Clusters {
+			boundCluster := tc.Name
+			for _, c := range clusters {
+				if c.Cluster().Name == boundCluster {
+					if meta.IsStatusConditionPresentAndEqual(c.Cluster().Status.Conditions, clusterv1alpha1.ClusterConditionReady, metav1.ConditionFalse) {
+						return FailoverSchedule
+					}
+				}
+			}
+		}
+	} else { // ClusterResourceBinding
+		// TODO: handle ClusterResourceBinding objects.
+		return Unknown
+	}
+
+	return AvoidSchedule
+}
+
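+// scheduleNext pops one binding key off the queue and dispatches it according
+// to the ScheduleType detected by getScheduleType. Note that FailoverSchedule
+// is acted on only when the Failover flag is enabled.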
 func (s *Scheduler) scheduleNext() bool {
 	key, shutdown := s.queue.Get()
 	if shutdown {
@@ -229,9 +312,27 @@
 		return false
 	}
 	defer s.queue.Done(key)
 
-	klog.Infof("Failover flag is: %v", Failover)
-	err := s.scheduleOne(key.(string))
+	var err error
+	switch s.getScheduleType(key.(string)) {
+	case FirstSchedule:
+		klog.Infof("Start scheduling binding(%s)", key.(string))
+		err = s.scheduleOne(key.(string))
+	case ReconcileSchedule: // shares the same logic with FirstSchedule
+		klog.Infof("Reschedule binding(%s) as placement changed", key.(string))
+		err = s.scheduleOne(key.(string))
+	case FailoverSchedule:
+		if Failover {
+			klog.Infof("Reschedule binding(%s) as cluster failure", key.(string))
+			err = s.rescheduleOne(key.(string))
+		}
+	case AvoidSchedule:
+		klog.Infof("No need to schedule binding(%s)", key.(string))
+	default:
+		err = fmt.Errorf("unknown schedule type")
+		klog.Warningf("Failed to identify the schedule type for binding(%s)", key.(string))
+	}
+
 	s.handleErr(err, key)
 	return true
 }
@@ -351,7 +452,7 @@ func (s *Scheduler) handleErr(err error, key interface{}) {
 	}
 
 	utilruntime.HandleError(err)
-	klog.V(2).Infof("Dropping propagationbinding %q out of the queue: %v", key, err)
+	klog.V(2).Infof("Dropping ResourceBinding %q out of the queue: %v", key, err)
 	s.queue.Forget(key)
 }
@@ -374,6 +475,16 @@ func (s *Scheduler) updateCluster(_, newObj interface{}) {
 	}
 	klog.V(3).Infof("update event for cluster %s", newCluster.Name)
 	s.schedulerCache.UpdateCluster(newCluster)
+
+	// Check whether the cluster has become NotReady.
+	if meta.IsStatusConditionPresentAndEqual(newCluster.Status.Conditions, clusterv1alpha1.ClusterConditionReady, metav1.ConditionFalse) {
+		klog.Infof("Found cluster(%s) failure and the Failover flag is %v", newCluster.Name, Failover)
+
+		if Failover { // Trigger reschedule on cluster failure only when the flag is enabled.
+			s.expiredBindingInQueue(newCluster.Name)
+			return
+		}
+	}
 }
 
 func (s *Scheduler) deleteCluster(obj interface{}) {
@@ -395,3 +506,103 @@
 	klog.V(3).Infof("delete event for cluster %s", cluster.Name)
 	s.schedulerCache.DeleteCluster(cluster)
 }
+
+// expiredBindingInQueue finds all ResourceBindings that are scheduled to the given
+// NotReady cluster and adds them to the queue for rescheduling.
+func (s *Scheduler) expiredBindingInQueue(notReadyClusterName string) {
+	bindings, err := s.bindingLister.List(labels.Everything())
+	if err != nil {
+		klog.Errorf("Failed to list ResourceBindings: %v", err)
+		return
+	}
+
+	klog.Infof("Start traversing all ResourceBindings")
+	for _, binding := range bindings {
+		for _, bindingCluster := range binding.Spec.Clusters {
+			if bindingCluster.Name == notReadyClusterName {
+				rescheduleKey, err := cache.MetaNamespaceKeyFunc(binding)
+				if err != nil {
+					klog.Errorf("Couldn't get reschedule key for ResourceBinding %s/%s: %v", binding.Namespace, binding.Name, err)
+					continue
+				}
+				s.queue.Add(rescheduleKey)
+				klog.Infof("Added expired ResourceBinding(%s) to queue", rescheduleKey)
+			}
+		}
+	}
+}
+
+// failoverCandidateCluster returns the clusters the binding can stay on (reserved)
+// and the Ready clusters it is not yet scheduled to (candidates).
+func (s *Scheduler) failoverCandidateCluster(binding *workv1alpha1.ResourceBinding) (reserved sets.String, candidates sets.String) {
+	boundClusters := sets.NewString()
+	for _, cluster := range binding.Spec.Clusters {
+		boundClusters.Insert(cluster.Name)
+	}
+
+	availableClusters := sets.NewString()
+	for _, cluster := range s.schedulerCache.Snapshot().GetReadyClusters() {
+		availableClusters.Insert(cluster.Cluster().Name)
+	}
+
+	// bound - (bound - available) is the intersection of bound and available clusters.
+	return boundClusters.Difference(boundClusters.Difference(availableClusters)), availableClusters.Difference(boundClusters)
+}
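+
+// For example, with bound clusters {member1, member2} and Ready clusters
+// {member2, member3}: reserved = bound - (bound - available) = {member2},
+// and candidates = available - bound = {member3}.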
+
+// rescheduleOne reschedules the ResourceBinding identified by the given key,
+// replacing its failed clusters with matching candidate clusters.
+func (s *Scheduler) rescheduleOne(key string) (err error) {
+	ns, name, err := cache.SplitMetaNamespaceKey(key)
+	if err != nil {
+		return err
+	}
+
+	klog.Infof("begin rescheduling ResourceBinding %s/%s", ns, name)
+	defer klog.Infof("end rescheduling ResourceBinding %s/%s", ns, name)
+
+	resourceBinding, err := s.bindingLister.ResourceBindings(ns).Get(name)
+	if err != nil {
+		if errors.IsNotFound(err) { // the binding has been deleted; nothing to do
+			return nil
+		}
+		return err
+	}
+
+	binding := resourceBinding.DeepCopy()
+	reservedClusters, candidateClusters := s.failoverCandidateCluster(resourceBinding)
+	klog.Infof("Reserved clusters : %v", reservedClusters.List())
+	klog.Infof("Candidate clusters: %v", candidateClusters.List())
+	deltaLen := len(binding.Spec.Clusters) - len(reservedClusters)
+
+	klog.Infof("binding(%s/%s) has %d failed clusters, and got %d candidates", ns, name, deltaLen, len(candidateClusters))
+
+	// TODO: should schedule as much as possible?
+	if len(candidateClusters) < deltaLen {
+		klog.Warningf("ignore reschedule binding(%s/%s) as insufficient available clusters", ns, name)
+		return nil
+	}
+
+	policyNamespace := util.GetLabelValue(binding.Labels, util.PropagationPolicyNamespaceLabel)
+	policyName := util.GetLabelValue(binding.Labels, util.PropagationPolicyNameLabel)
+	policy, err := s.policyLister.PropagationPolicies(policyNamespace).Get(policyName)
+	if err != nil {
+		return err
+	}
+
+	// reservedClusters is not used after this point, so reusing the set is safe.
+	targetClusters := reservedClusters
+
+	// Pick one matching candidate for each failed cluster.
+	for i := 0; i < deltaLen; i++ {
+		for clusterName := range candidateClusters {
+			curCluster, err := s.clusterLister.Get(clusterName)
+			if err != nil {
+				return err
+			}
+
+			if policy.Spec.Placement.ClusterAffinity != nil && !util.ClusterMatches(curCluster, *policy.Spec.Placement.ClusterAffinity) {
+				continue
+			}
+
+			klog.Infof("Rescheduling %s/%s to member cluster %s", binding.Namespace, binding.Name, clusterName)
+			targetClusters.Insert(clusterName)
+			candidateClusters.Delete(clusterName)
+
+			// break as soon as a result is found
+			break
+		}
+	}
+
+	// TODO(tinyma123) Check if the final result meets the spread constraints.
+
+	binding.Spec.Clusters = nil
+	for cluster := range targetClusters {
+		binding.Spec.Clusters = append(binding.Spec.Clusters, workv1alpha1.TargetCluster{Name: cluster})
+	}
+	klog.Infof("The final binding.Spec.Clusters values are: %v", binding.Spec.Clusters)
+
+	_, err = s.KarmadaClient.WorkV1alpha1().ResourceBindings(ns).Update(context.TODO(), binding, metav1.UpdateOptions{})
+	return err
+}
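
Reviewer note: the double `Difference` in `failoverCandidateCluster` is just a set intersection. Below is a minimal, self-contained sketch of that set algebra, illustrative only and not part of the patch; the member cluster names are made up.

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/util/sets"
)

func main() {
	// Clusters the binding is currently scheduled to.
	bound := sets.NewString("member1", "member2", "member3")
	// Clusters the scheduler cache currently reports as Ready.
	available := sets.NewString("member1", "member3", "member4")

	// bound - (bound - available) == the bound clusters that are still Ready.
	reserved := bound.Difference(bound.Difference(available))
	// available - bound: Ready clusters not yet used, i.e. failover candidates.
	candidates := available.Difference(bound)

	fmt.Println(reserved.List())   // [member1 member3]
	fmt.Println(candidates.List()) // [member4]
}
```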