reschedule when placement of clusterPropagationPolicies changed

Signed-off-by: lihanbo <lihanbo2@huawei.com>
This commit is contained in:
lihanbo 2021-03-12 17:05:10 +08:00 committed by Hongcai Ren
parent 0c07ddf469
commit 0de78c0c49
1 changed files with 33 additions and 1 deletions

View File

@ -109,6 +109,10 @@ func NewScheduler(dynamicClient dynamic.Interface, karmadaClient karmadaclientse
UpdateFunc: sched.onResourceBindingUpdate, UpdateFunc: sched.onResourceBindingUpdate,
}) })
clusterPolicyInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
UpdateFunc: sched.onClusterPropagationPolicyUpdate,
})
memclusterInformer := factory.Cluster().V1alpha1().Clusters().Informer() memclusterInformer := factory.Cluster().V1alpha1().Clusters().Informer()
memclusterInformer.AddEventHandler( memclusterInformer.AddEventHandler(
cache.ResourceEventHandlerFuncs{ cache.ResourceEventHandlerFuncs{
@ -180,6 +184,35 @@ func (s *Scheduler) onPropagationPolicyUpdate(old, cur interface{}) {
} }
} }
func (s *Scheduler) onClusterPropagationPolicyUpdate(old, cur interface{}) {
oldClusterPropagationPolicy := old.(*policyv1alpha1.ClusterPropagationPolicy)
curClusterPropagationPolicy := cur.(*policyv1alpha1.ClusterPropagationPolicy)
if equality.Semantic.DeepEqual(oldClusterPropagationPolicy.Spec.Placement, curClusterPropagationPolicy.Spec.Placement) {
klog.V(2).Infof("Ignore ClusterPropagationPolicy(%s) which placement unchanged.", oldClusterPropagationPolicy.Name)
return
}
selector := labels.SelectorFromSet(labels.Set{
util.ClusterPropagationPolicyLabel: oldClusterPropagationPolicy.Name,
})
referenceClusterResourceBindings, err := s.clusterBindingLister.List(selector)
if err != nil {
klog.Errorf("Failed to list ClusterResourceBinding by selector: %s, error: %v", selector.String(), err)
return
}
for _, clusterResourceBinding := range referenceClusterResourceBindings {
key, err := cache.MetaNamespaceKeyFunc(clusterResourceBinding)
if err != nil {
klog.Errorf("couldn't get key for object %#v: %v", clusterResourceBinding, err)
return
}
klog.Infof("Requeue ClusterResourceBinding(%s) as placement changed.", clusterResourceBinding.Name)
s.queue.Add(key)
}
}
func (s *Scheduler) worker() { func (s *Scheduler) worker() {
for s.scheduleNext() { for s.scheduleNext() {
} }
@ -235,7 +268,6 @@ func (s *Scheduler) scheduleOne(key string) (err error) {
} }
return s.scheduleResourceBinding(resourceBinding, policy) return s.scheduleResourceBinding(resourceBinding, policy)
} }
func (s *Scheduler) scheduleResourceBinding(resourceBinding *workv1alpha1.ResourceBinding, policy *policyv1alpha1.PropagationPolicy) (err error) { func (s *Scheduler) scheduleResourceBinding(resourceBinding *workv1alpha1.ResourceBinding, policy *policyv1alpha1.PropagationPolicy) (err error) {