diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 3d11527bc..01d047109 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -2,11 +2,14 @@ package scheduler import ( "context" + "encoding/json" "time" v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/equality" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/dynamic" @@ -23,6 +26,7 @@ import ( schedulercache "github.com/karmada-io/karmada/pkg/scheduler/cache" "github.com/karmada-io/karmada/pkg/scheduler/core" "github.com/karmada-io/karmada/pkg/scheduler/framework/plugins/clusteraffinity" + "github.com/karmada-io/karmada/pkg/util" ) const ( @@ -82,6 +86,10 @@ func NewScheduler(dynamicClient dynamic.Interface, karmadaClient karmadaclientse UpdateFunc: sched.onPropagationBindingUpdate, }) + policyInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + UpdateFunc: sched.onPropagationPolicyUpdate, + }) + memclusterInformer := factory.Cluster().V1alpha1().Clusters().Informer() memclusterInformer.AddEventHandler( cache.ResourceEventHandlerFuncs{ @@ -127,6 +135,36 @@ func (s *Scheduler) onPropagationBindingUpdate(old, cur interface{}) { s.onPropagationBindingAdd(cur) } +func (s *Scheduler) onPropagationPolicyUpdate(old, cur interface{}) { + oldPropagationPolicy := old.(*v1alpha1.PropagationPolicy) + curPropagationPolicy := cur.(*v1alpha1.PropagationPolicy) + if equality.Semantic.DeepEqual(oldPropagationPolicy.Spec.Placement, curPropagationPolicy.Spec.Placement) { + klog.V(2).Infof("Ignore PropagationPolicy(%s/%s) which placement unchanged.", oldPropagationPolicy.Namespace, oldPropagationPolicy.Name) + return + } + + selector := labels.SelectorFromSet(labels.Set{ + util.PropagationPolicyNamespaceLabel: oldPropagationPolicy.Namespace, + util.PropagationPolicyNameLabel: oldPropagationPolicy.Name, + }) + + referenceBindings, err := s.bindingLister.List(selector) + if err != nil { + klog.Errorf("Failed to list PropagationBindings by selector: %s, error: %v", selector.String(), err) + return + } + + for _, binding := range referenceBindings { + key, err := cache.MetaNamespaceKeyFunc(binding) + if err != nil { + klog.Errorf("couldn't get key for object %#v: %v", binding, err) + return + } + klog.Infof("Requeue PropagationBinding(%s/%s) as placement changed.", binding.Namespace, binding.Name) + s.queue.Add(key) + } +} + func (s *Scheduler) worker() { for s.scheduleNext() { } @@ -171,6 +209,25 @@ func (s *Scheduler) scheduleOne(key string) (err error) { targetClusters[i] = v1alpha1.TargetCluster{Name: cluster} } binding.Spec.Clusters = targetClusters + + policyNamespace := util.GetLabelValue(binding.Labels, util.PropagationPolicyNamespaceLabel) + policyName := util.GetLabelValue(binding.Labels, util.PropagationPolicyNameLabel) + + policy, err := s.policyLister.PropagationPolicies(policyNamespace).Get(policyName) + if err != nil { + return err + } + + placement, err := json.Marshal(policy.Spec.Placement) + if err != nil { + klog.Errorf("Failed to marshal placement of propagationPolicy %s/%s, error: %v", policyNamespace, policyName, err) + } + + if binding.Annotations == nil { + binding.Annotations = make(map[string]string) + } + binding.Annotations[util.PolicyPlacementAnnotation] = string(placement) + _, err = s.KarmadaClient.PolicyV1alpha1().PropagationBindings(ns).Update(context.TODO(), binding, metav1.UpdateOptions{}) if err != nil { return err