diff --git a/pkg/scheduler/event_handler.go b/pkg/scheduler/event_handler.go
index d6ac299f7..1e20295c9 100644
--- a/pkg/scheduler/event_handler.go
+++ b/pkg/scheduler/event_handler.go
@@ -1,11 +1,14 @@
 package scheduler
 
 import (
+	"fmt"
+
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/equality"
 	"k8s.io/apimachinery/pkg/api/meta"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/labels"
+	utilerrors "k8s.io/apimachinery/pkg/util/errors"
 	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
 	"k8s.io/client-go/tools/cache"
 	"k8s.io/client-go/tools/record"
@@ -186,68 +189,10 @@ func (s *Scheduler) updateCluster(oldObj, newObj interface{}) {
 	case !equality.Semantic.DeepEqual(oldCluster.Labels, newCluster.Labels):
 		fallthrough
 	case oldCluster.Generation != newCluster.Generation:
-		s.enqueueAffectedBindings(oldCluster, newCluster)
-	}
-}
-
-// enqueueAffectedBinding find all RBs/CRBs related to the cluster and reschedule them
-func (s *Scheduler) enqueueAffectedBindings(oldCluster, newCluster *clusterv1alpha1.Cluster) {
-	bindings, _ := s.bindingLister.List(labels.Everything())
-	for _, binding := range bindings {
-		placementPtr := binding.Spec.Placement
-		if placementPtr == nil {
-			// never reach here
-			continue
-		}
-
-		var affinity *policyv1alpha1.ClusterAffinity
-		if placementPtr.ClusterAffinities != nil {
-			affinityIndex := getAffinityIndex(placementPtr.ClusterAffinities, binding.Status.SchedulerObservedAffinityName)
-			affinity = &placementPtr.ClusterAffinities[affinityIndex].ClusterAffinity
-		} else {
-			affinity = placementPtr.ClusterAffinity
-		}
-
-		switch {
-		case affinity == nil:
-			// If no clusters specified, add it to the queue
-			fallthrough
-		case util.ClusterMatches(newCluster, *affinity):
-			// If the new cluster manifest match the affinity, add it to the queue, trigger rescheduling
-			fallthrough
-		case util.ClusterMatches(oldCluster, *affinity):
-			// If the old cluster manifest match the affinity, add it to the queue, trigger rescheduling
-			s.onResourceBindingRequeue(binding, metrics.ClusterChanged)
-		}
-	}
-
-	clusterBindings, _ := s.clusterBindingLister.List(labels.Everything())
-	for _, binding := range clusterBindings {
-		placementPtr := binding.Spec.Placement
-		if placementPtr == nil {
-			// never reach here
-			continue
-		}
-
-		var affinity *policyv1alpha1.ClusterAffinity
-		if placementPtr.ClusterAffinities != nil {
-			affinityIndex := getAffinityIndex(placementPtr.ClusterAffinities, binding.Status.SchedulerObservedAffinityName)
-			affinity = &placementPtr.ClusterAffinities[affinityIndex].ClusterAffinity
-		} else {
-			affinity = placementPtr.ClusterAffinity
-		}
-
-		switch {
-		case affinity == nil:
-			// If no clusters specified, add it to the queue
-			fallthrough
-		case util.ClusterMatches(newCluster, *affinity):
-			// If the new cluster manifest match the affinity, add it to the queue, trigger rescheduling
-			fallthrough
-		case util.ClusterMatches(oldCluster, *affinity):
-			// If the old cluster manifest match the affinity, add it to the queue, trigger rescheduling
-			s.onClusterResourceBindingRequeue(binding, metrics.ClusterChanged)
-		}
+		// To distinguish the old and new cluster objects, we need to add the entire object
+		// to the worker. Therefore, call the Add func instead of the Enqueue func.
+		s.clusterReconcileWorker.Add(oldCluster)
+		s.clusterReconcileWorker.Add(newCluster)
 	}
 }
 
@@ -282,3 +227,96 @@ func schedulerNameFilter(schedulerNameFromOptions, schedulerName string) bool {
 
 	return schedulerNameFromOptions == schedulerName
 }
+
+func (s *Scheduler) reconcileCluster(key util.QueueKey) error {
+	cluster, ok := key.(*clusterv1alpha1.Cluster)
+	if !ok {
+		return fmt.Errorf("invalid cluster key: %s", key)
+	}
+	return utilerrors.NewAggregate([]error{
+		s.enqueueAffectedBindings(cluster),
+		s.enqueueAffectedCRBs(cluster)},
+	)
+}
+
+// enqueueAffectedBindings finds all RBs related to the cluster and reschedules them
+func (s *Scheduler) enqueueAffectedBindings(cluster *clusterv1alpha1.Cluster) error {
+	klog.V(4).Infof("Enqueue affected ResourceBinding with cluster %s", cluster.Name)
+
+	bindings, _ := s.bindingLister.List(labels.Everything())
+	for _, binding := range bindings {
+		placementPtr := binding.Spec.Placement
+		if placementPtr == nil {
+			// never reach here
+			continue
+		}
+
+		var affinity *policyv1alpha1.ClusterAffinity
+		if placementPtr.ClusterAffinities != nil {
+			if binding.Status.SchedulerObservedGeneration != binding.Generation {
+				// Hitting here means the binding may still be in the queue waiting
+				// for scheduling or its status has not been synced to the
+				// cache. Just enqueue the binding to avoid missing the cluster
+				// update event.
+				s.onResourceBindingRequeue(binding, metrics.ClusterChanged)
+				continue
+			}
+			affinityIndex := getAffinityIndex(placementPtr.ClusterAffinities, binding.Status.SchedulerObservedAffinityName)
+			affinity = &placementPtr.ClusterAffinities[affinityIndex].ClusterAffinity
+		} else {
+			affinity = placementPtr.ClusterAffinity
+		}
+
+		switch {
+		case affinity == nil:
+			// If no clusters specified, add it to the queue
+			fallthrough
+		case util.ClusterMatches(cluster, *affinity):
+			// If the cluster manifest matches the affinity, add it to the queue to trigger rescheduling
+			s.onResourceBindingRequeue(binding, metrics.ClusterChanged)
+		}
+	}
+
+	return nil
+}
+
+// enqueueAffectedCRBs finds all CRBs related to the cluster and reschedules them
+func (s *Scheduler) enqueueAffectedCRBs(cluster *clusterv1alpha1.Cluster) error {
+	klog.V(4).Infof("Enqueue affected ClusterResourceBinding with cluster %s", cluster.Name)
+
+	clusterBindings, _ := s.clusterBindingLister.List(labels.Everything())
+	for _, binding := range clusterBindings {
+		placementPtr := binding.Spec.Placement
+		if placementPtr == nil {
+			// never reach here
+			continue
+		}
+
+		var affinity *policyv1alpha1.ClusterAffinity
+		if placementPtr.ClusterAffinities != nil {
+			if binding.Status.SchedulerObservedGeneration != binding.Generation {
+				// Hitting here means the binding may still be in the queue waiting
+				// for scheduling or its status has not been synced to the
+				// cache. Just enqueue the binding to avoid missing the cluster
+				// update event.
+				s.onClusterResourceBindingRequeue(binding, metrics.ClusterChanged)
+				continue
+			}
+			affinityIndex := getAffinityIndex(placementPtr.ClusterAffinities, binding.Status.SchedulerObservedAffinityName)
+			affinity = &placementPtr.ClusterAffinities[affinityIndex].ClusterAffinity
+		} else {
+			affinity = placementPtr.ClusterAffinity
+		}
+
+		switch {
+		case affinity == nil:
+			// If no clusters specified, add it to the queue
+			fallthrough
+		case util.ClusterMatches(cluster, *affinity):
+			// If the cluster manifest matches the affinity, add it to the queue to trigger rescheduling
+			s.onClusterResourceBindingRequeue(binding, metrics.ClusterChanged)
+		}
+	}
+
+	return nil
+}
diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go
index e2561bcee..b97584bf9 100644
--- a/pkg/scheduler/scheduler.go
+++ b/pkg/scheduler/scheduler.go
@@ -74,6 +74,9 @@ type Scheduler struct {
 	clusterLister   clusterlister.ClusterLister
 	informerFactory informerfactory.SharedInformerFactory
 
+	// clusterReconcileWorker reconciles cluster changes to trigger corresponding
+	// ResourceBinding/ClusterResourceBinding rescheduling.
+	clusterReconcileWorker util.AsyncWorker
 	// TODO: implement a priority scheduling queue
 	queue workqueue.RateLimitingInterface
 
@@ -218,6 +221,11 @@ func NewScheduler(dynamicClient dynamic.Interface, karmadaClient karmadaclientse
 		schedulerCache: schedulerCache,
 	}
 
+	sched.clusterReconcileWorker = util.NewAsyncWorker(util.Options{
+		Name:          "ClusterReconcileWorker",
+		ReconcileFunc: sched.reconcileCluster,
+	})
+
 	if options.enableSchedulerEstimator {
 		sched.enableSchedulerEstimator = options.enableSchedulerEstimator
 		sched.disableSchedulerEstimatorInPullMode = options.disableSchedulerEstimatorInPullMode
@@ -255,6 +263,8 @@ func (s *Scheduler) Run(ctx context.Context) {
 	s.informerFactory.Start(stopCh)
 	s.informerFactory.WaitForCacheSync(stopCh)
 
+	s.clusterReconcileWorker.Run(1, stopCh)
+
 	go wait.Until(s.worker, time.Second, stopCh)
 
 	<-stopCh
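
For context on the new wiring: `updateCluster` now hands the whole old and new `Cluster` objects to the worker via `Add`, and `reconcileCluster` later recovers the typed object from the `util.QueueKey`. The sketch below is a minimal, runnable model of that flow, not the real `util.AsyncWorker` API (which is configured through `util.Options` and started with `Run(1, stopCh)` as in the diff); `asyncWorker`, `newAsyncWorker`, `QueueKey`, `Cluster`, and `Stop` here are simplified stand-ins introduced only for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// QueueKey mirrors util.QueueKey: an opaque item handed back to the reconcile func.
type QueueKey interface{}

// Cluster is a simplified stand-in for clusterv1alpha1.Cluster.
type Cluster struct {
	Name       string
	Generation int64
}

// asyncWorker is a toy queue-plus-workers model. An Enqueue-style worker would
// reduce an object to a string key and thereby collapse the old and new
// snapshots into one item; Add keeps them distinct.
type asyncWorker struct {
	reconcile func(QueueKey) error
	items     chan QueueKey
	wg        sync.WaitGroup
}

func newAsyncWorker(reconcile func(QueueKey) error) *asyncWorker {
	return &asyncWorker{reconcile: reconcile, items: make(chan QueueKey, 16)}
}

// Add enqueues the object itself, so distinct snapshots stay distinct items.
func (w *asyncWorker) Add(item QueueKey) { w.items <- item }

// Run starts n goroutines that call the reconcile func for every queued item.
func (w *asyncWorker) Run(n int) {
	for i := 0; i < n; i++ {
		w.wg.Add(1)
		go func() {
			defer w.wg.Done()
			for item := range w.items {
				if err := w.reconcile(item); err != nil {
					fmt.Println("reconcile error:", err)
				}
			}
		}()
	}
}

// Stop closes the queue and waits for in-flight items to drain.
func (w *asyncWorker) Stop() {
	close(w.items)
	w.wg.Wait()
}

func main() {
	worker := newAsyncWorker(func(key QueueKey) error {
		// Same pattern as Scheduler.reconcileCluster: recover the typed object from the key.
		cluster, ok := key.(*Cluster)
		if !ok {
			return fmt.Errorf("invalid cluster key: %v", key)
		}
		fmt.Printf("reconciling cluster %s at generation %d\n", cluster.Name, cluster.Generation)
		return nil
	})
	worker.Run(1)

	// Mirrors updateCluster: the old and new objects are queued as separate items.
	worker.Add(&Cluster{Name: "member1", Generation: 1})
	worker.Add(&Cluster{Name: "member1", Generation: 2})

	worker.Stop()
}
```

Because each snapshot is queued as its own item, `enqueueAffectedBindings`/`enqueueAffectedCRBs` run once for the old object and once for the new one, so a binding whose affinity matches only the pre-update cluster is still requeued for rescheduling.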