reschedule when propagationPolicies changed (#190)

Signed-off-by: lihanbo <lihanbo2@huawei.com>
This commit is contained in:
Hanbo Li 2021-03-05 19:49:29 +08:00 committed by GitHub
parent 48e7782d4f
commit 7a8eca1bfb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 57 additions and 0 deletions

View File

@ -2,11 +2,14 @@ package scheduler
import (
"context"
"encoding/json"
"time"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/dynamic"
@ -23,6 +26,7 @@ import (
schedulercache "github.com/karmada-io/karmada/pkg/scheduler/cache"
"github.com/karmada-io/karmada/pkg/scheduler/core"
"github.com/karmada-io/karmada/pkg/scheduler/framework/plugins/clusteraffinity"
"github.com/karmada-io/karmada/pkg/util"
)
const (
@ -82,6 +86,10 @@ func NewScheduler(dynamicClient dynamic.Interface, karmadaClient karmadaclientse
UpdateFunc: sched.onPropagationBindingUpdate,
})
policyInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
UpdateFunc: sched.onPropagationPolicyUpdate,
})
memclusterInformer := factory.Cluster().V1alpha1().Clusters().Informer()
memclusterInformer.AddEventHandler(
cache.ResourceEventHandlerFuncs{
@ -127,6 +135,36 @@ func (s *Scheduler) onPropagationBindingUpdate(old, cur interface{}) {
s.onPropagationBindingAdd(cur)
}
func (s *Scheduler) onPropagationPolicyUpdate(old, cur interface{}) {
oldPropagationPolicy := old.(*v1alpha1.PropagationPolicy)
curPropagationPolicy := cur.(*v1alpha1.PropagationPolicy)
if equality.Semantic.DeepEqual(oldPropagationPolicy.Spec.Placement, curPropagationPolicy.Spec.Placement) {
klog.V(2).Infof("Ignore PropagationPolicy(%s/%s) which placement unchanged.", oldPropagationPolicy.Namespace, oldPropagationPolicy.Name)
return
}
selector := labels.SelectorFromSet(labels.Set{
util.PropagationPolicyNamespaceLabel: oldPropagationPolicy.Namespace,
util.PropagationPolicyNameLabel: oldPropagationPolicy.Name,
})
referenceBindings, err := s.bindingLister.List(selector)
if err != nil {
klog.Errorf("Failed to list PropagationBindings by selector: %s, error: %v", selector.String(), err)
return
}
for _, binding := range referenceBindings {
key, err := cache.MetaNamespaceKeyFunc(binding)
if err != nil {
klog.Errorf("couldn't get key for object %#v: %v", binding, err)
return
}
klog.Infof("Requeue PropagationBinding(%s/%s) as placement changed.", binding.Namespace, binding.Name)
s.queue.Add(key)
}
}
func (s *Scheduler) worker() {
for s.scheduleNext() {
}
@ -171,6 +209,25 @@ func (s *Scheduler) scheduleOne(key string) (err error) {
targetClusters[i] = v1alpha1.TargetCluster{Name: cluster}
}
binding.Spec.Clusters = targetClusters
policyNamespace := util.GetLabelValue(binding.Labels, util.PropagationPolicyNamespaceLabel)
policyName := util.GetLabelValue(binding.Labels, util.PropagationPolicyNameLabel)
policy, err := s.policyLister.PropagationPolicies(policyNamespace).Get(policyName)
if err != nil {
return err
}
placement, err := json.Marshal(policy.Spec.Placement)
if err != nil {
klog.Errorf("Failed to marshal placement of propagationPolicy %s/%s, error: %v", policyNamespace, policyName, err)
}
if binding.Annotations == nil {
binding.Annotations = make(map[string]string)
}
binding.Annotations[util.PolicyPlacementAnnotation] = string(placement)
_, err = s.KarmadaClient.PolicyV1alpha1().PropagationBindings(ns).Update(context.TODO(), binding, metav1.UpdateOptions{})
if err != nil {
return err