split the code about resource event into event_handler.go
Signed-off-by: huone1 <huwanxing@huawei.com>
This commit is contained in:
parent
4c9bfbf310
commit
e8b717575a
|
@ -0,0 +1,268 @@
|
||||||
|
package scheduler
|
||||||
|
|
||||||
|
import (
|
||||||
|
corev1 "k8s.io/api/core/v1"
|
||||||
|
"k8s.io/apimachinery/pkg/api/equality"
|
||||||
|
"k8s.io/apimachinery/pkg/api/meta"
|
||||||
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
|
"k8s.io/apimachinery/pkg/labels"
|
||||||
|
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
|
||||||
|
"k8s.io/client-go/tools/cache"
|
||||||
|
"k8s.io/client-go/tools/record"
|
||||||
|
"k8s.io/klog/v2"
|
||||||
|
|
||||||
|
clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
|
||||||
|
policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
|
||||||
|
"github.com/karmada-io/karmada/pkg/features"
|
||||||
|
"github.com/karmada-io/karmada/pkg/scheduler/metrics"
|
||||||
|
"github.com/karmada-io/karmada/pkg/util/gclient"
|
||||||
|
)
|
||||||
|
|
||||||
|
// addAllEventHandlers is a helper function used in Scheduler
|
||||||
|
// to add event handlers for various informers.
|
||||||
|
func (s *Scheduler) addAllEventHandlers() {
|
||||||
|
bindingInformer := s.informerFactory.Work().V1alpha2().ResourceBindings().Informer()
|
||||||
|
bindingInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||||
|
AddFunc: s.onResourceBindingAdd,
|
||||||
|
UpdateFunc: s.onResourceBindingUpdate,
|
||||||
|
})
|
||||||
|
|
||||||
|
policyInformer := s.informerFactory.Policy().V1alpha1().PropagationPolicies().Informer()
|
||||||
|
policyInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||||
|
UpdateFunc: s.onPropagationPolicyUpdate,
|
||||||
|
})
|
||||||
|
|
||||||
|
clusterBindingInformer := s.informerFactory.Work().V1alpha2().ClusterResourceBindings().Informer()
|
||||||
|
clusterBindingInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||||
|
AddFunc: s.onResourceBindingAdd,
|
||||||
|
UpdateFunc: s.onResourceBindingUpdate,
|
||||||
|
})
|
||||||
|
|
||||||
|
clusterPolicyInformer := s.informerFactory.Policy().V1alpha1().ClusterPropagationPolicies().Informer()
|
||||||
|
clusterPolicyInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||||
|
UpdateFunc: s.onClusterPropagationPolicyUpdate,
|
||||||
|
})
|
||||||
|
|
||||||
|
memClusterInformer := s.informerFactory.Cluster().V1alpha1().Clusters().Informer()
|
||||||
|
memClusterInformer.AddEventHandler(
|
||||||
|
cache.ResourceEventHandlerFuncs{
|
||||||
|
AddFunc: s.addCluster,
|
||||||
|
UpdateFunc: s.updateCluster,
|
||||||
|
DeleteFunc: s.deleteCluster,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
eventBroadcaster := record.NewBroadcaster()
|
||||||
|
eventBroadcaster.StartStructuredLogging(0)
|
||||||
|
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: s.KubeClient.CoreV1().Events("")})
|
||||||
|
s.eventRecorder = eventBroadcaster.NewRecorder(gclient.NewSchema(), corev1.EventSource{Component: "karmada-scheduler"})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Scheduler) onResourceBindingAdd(obj interface{}) {
|
||||||
|
key, err := cache.MetaNamespaceKeyFunc(obj)
|
||||||
|
if err != nil {
|
||||||
|
klog.Errorf("couldn't get key for object %#v: %v", obj, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
s.queue.Add(key)
|
||||||
|
metrics.CountSchedulerBindings(metrics.BindingAdd)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Scheduler) onResourceBindingUpdate(old, cur interface{}) {
|
||||||
|
key, err := cache.MetaNamespaceKeyFunc(cur)
|
||||||
|
if err != nil {
|
||||||
|
klog.Errorf("couldn't get key for object %#v: %v", cur, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
s.queue.Add(key)
|
||||||
|
metrics.CountSchedulerBindings(metrics.BindingUpdate)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Scheduler) onPropagationPolicyUpdate(old, cur interface{}) {
|
||||||
|
oldPropagationPolicy := old.(*policyv1alpha1.PropagationPolicy)
|
||||||
|
curPropagationPolicy := cur.(*policyv1alpha1.PropagationPolicy)
|
||||||
|
if equality.Semantic.DeepEqual(oldPropagationPolicy.Spec.Placement, curPropagationPolicy.Spec.Placement) {
|
||||||
|
klog.V(2).Infof("Ignore PropagationPolicy(%s/%s) which placement unchanged.", oldPropagationPolicy.Namespace, oldPropagationPolicy.Name)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
selector := labels.SelectorFromSet(labels.Set{
|
||||||
|
policyv1alpha1.PropagationPolicyNamespaceLabel: oldPropagationPolicy.Namespace,
|
||||||
|
policyv1alpha1.PropagationPolicyNameLabel: oldPropagationPolicy.Name,
|
||||||
|
})
|
||||||
|
|
||||||
|
err := s.requeueResourceBindings(selector)
|
||||||
|
if err != nil {
|
||||||
|
klog.Errorf("Failed to requeue ResourceBinding, error: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// requeueClusterResourceBindings will retrieve all ClusterResourceBinding objects by the label selector and put them to queue.
|
||||||
|
func (s *Scheduler) requeueClusterResourceBindings(selector labels.Selector) error {
|
||||||
|
referenceClusterResourceBindings, err := s.clusterBindingLister.List(selector)
|
||||||
|
if err != nil {
|
||||||
|
klog.Errorf("Failed to list ClusterResourceBinding by selector: %s, error: %v", selector.String(), err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, clusterResourceBinding := range referenceClusterResourceBindings {
|
||||||
|
key, err := cache.MetaNamespaceKeyFunc(clusterResourceBinding)
|
||||||
|
if err != nil {
|
||||||
|
klog.Errorf("couldn't get key for ClusterResourceBinding(%s): %v", clusterResourceBinding.Name, err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
klog.Infof("Requeue ClusterResourceBinding(%s) as placement changed.", clusterResourceBinding.Name)
|
||||||
|
s.queue.Add(key)
|
||||||
|
metrics.CountSchedulerBindings(metrics.PolicyChanged)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// requeueResourceBindings will retrieve all ResourceBinding objects by the label selector and put them to queue.
|
||||||
|
func (s *Scheduler) requeueResourceBindings(selector labels.Selector) error {
|
||||||
|
referenceBindings, err := s.bindingLister.List(selector)
|
||||||
|
if err != nil {
|
||||||
|
klog.Errorf("Failed to list ResourceBinding by selector: %s, error: %v", selector.String(), err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, binding := range referenceBindings {
|
||||||
|
key, err := cache.MetaNamespaceKeyFunc(binding)
|
||||||
|
if err != nil {
|
||||||
|
klog.Errorf("couldn't get key for ResourceBinding(%s/%s): %v", binding.Namespace, binding.Name, err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
klog.Infof("Requeue ResourceBinding(%s/%s) as placement changed.", binding.Namespace, binding.Name)
|
||||||
|
s.queue.Add(key)
|
||||||
|
metrics.CountSchedulerBindings(metrics.PolicyChanged)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Scheduler) onClusterPropagationPolicyUpdate(old, cur interface{}) {
|
||||||
|
oldClusterPropagationPolicy := old.(*policyv1alpha1.ClusterPropagationPolicy)
|
||||||
|
curClusterPropagationPolicy := cur.(*policyv1alpha1.ClusterPropagationPolicy)
|
||||||
|
if equality.Semantic.DeepEqual(oldClusterPropagationPolicy.Spec.Placement, curClusterPropagationPolicy.Spec.Placement) {
|
||||||
|
klog.V(2).Infof("Ignore ClusterPropagationPolicy(%s) which placement unchanged.", oldClusterPropagationPolicy.Name)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
selector := labels.SelectorFromSet(labels.Set{
|
||||||
|
policyv1alpha1.ClusterPropagationPolicyLabel: oldClusterPropagationPolicy.Name,
|
||||||
|
})
|
||||||
|
|
||||||
|
err := s.requeueClusterResourceBindings(selector)
|
||||||
|
if err != nil {
|
||||||
|
klog.Errorf("Failed to requeue ClusterResourceBinding, error: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = s.requeueResourceBindings(selector)
|
||||||
|
if err != nil {
|
||||||
|
klog.Errorf("Failed to requeue ResourceBinding, error: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Scheduler) addCluster(obj interface{}) {
|
||||||
|
cluster, ok := obj.(*clusterv1alpha1.Cluster)
|
||||||
|
if !ok {
|
||||||
|
klog.Errorf("cannot convert to Cluster: %v", obj)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
klog.V(3).Infof("Add event for cluster %s", cluster.Name)
|
||||||
|
|
||||||
|
if s.enableSchedulerEstimator {
|
||||||
|
s.schedulerEstimatorWorker.Add(cluster.Name)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// enqueueAffectedBinding will find all ResourceBindings which are related to the current NotReady cluster and add them in queue.
|
||||||
|
func (s *Scheduler) enqueueAffectedBinding(notReadyClusterName string) {
|
||||||
|
bindings, _ := s.bindingLister.List(labels.Everything())
|
||||||
|
klog.Infof("Start traveling all ResourceBindings")
|
||||||
|
for _, binding := range bindings {
|
||||||
|
clusters := binding.Spec.Clusters
|
||||||
|
for _, bindingCluster := range clusters {
|
||||||
|
if bindingCluster.Name == notReadyClusterName {
|
||||||
|
rescheduleKey, err := cache.MetaNamespaceKeyFunc(binding)
|
||||||
|
if err != nil {
|
||||||
|
klog.Errorf("couldn't get rescheduleKey for ResourceBinding %#v: %v", bindingCluster.Name, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
s.queue.Add(rescheduleKey)
|
||||||
|
metrics.CountSchedulerBindings(metrics.ClusterNotReady)
|
||||||
|
klog.Infof("Add expired ResourceBinding in queue successfully")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// enqueueAffectedClusterBinding will find all cluster resource bindings which are related to the current NotReady cluster and add them in queue.
|
||||||
|
func (s *Scheduler) enqueueAffectedClusterBinding(notReadyClusterName string) {
|
||||||
|
bindings, _ := s.clusterBindingLister.List(labels.Everything())
|
||||||
|
klog.Infof("Start traveling all ClusterResourceBindings")
|
||||||
|
for _, binding := range bindings {
|
||||||
|
clusters := binding.Spec.Clusters
|
||||||
|
for _, bindingCluster := range clusters {
|
||||||
|
if bindingCluster.Name == notReadyClusterName {
|
||||||
|
rescheduleKey, err := cache.MetaNamespaceKeyFunc(binding)
|
||||||
|
if err != nil {
|
||||||
|
klog.Errorf("couldn't get rescheduleKey for ClusterResourceBinding %s: %v", bindingCluster.Name, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
s.queue.Add(rescheduleKey)
|
||||||
|
metrics.CountSchedulerBindings(metrics.ClusterNotReady)
|
||||||
|
klog.Infof("Add expired ClusterResourceBinding in queue successfully")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Scheduler) updateCluster(_, newObj interface{}) {
|
||||||
|
newCluster, ok := newObj.(*clusterv1alpha1.Cluster)
|
||||||
|
if !ok {
|
||||||
|
klog.Errorf("cannot convert newObj to Cluster: %v", newObj)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
klog.V(3).Infof("Update event for cluster %s", newCluster.Name)
|
||||||
|
|
||||||
|
if s.enableSchedulerEstimator {
|
||||||
|
s.schedulerEstimatorWorker.Add(newCluster.Name)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if cluster becomes failure
|
||||||
|
if meta.IsStatusConditionPresentAndEqual(newCluster.Status.Conditions, clusterv1alpha1.ClusterConditionReady, metav1.ConditionFalse) {
|
||||||
|
klog.Infof("Found cluster(%s) failure and failover flag is %v", newCluster.Name, features.FeatureGate.Enabled(features.Failover))
|
||||||
|
|
||||||
|
if features.FeatureGate.Enabled(features.Failover) { // Trigger reschedule on cluster failure only when flag is true.
|
||||||
|
s.enqueueAffectedBinding(newCluster.Name)
|
||||||
|
s.enqueueAffectedClusterBinding(newCluster.Name)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Scheduler) deleteCluster(obj interface{}) {
|
||||||
|
var cluster *clusterv1alpha1.Cluster
|
||||||
|
switch t := obj.(type) {
|
||||||
|
case *clusterv1alpha1.Cluster:
|
||||||
|
cluster = t
|
||||||
|
case cache.DeletedFinalStateUnknown:
|
||||||
|
var ok bool
|
||||||
|
cluster, ok = t.Obj.(*clusterv1alpha1.Cluster)
|
||||||
|
if !ok {
|
||||||
|
klog.Errorf("cannot convert to clusterv1alpha1.Cluster: %v", t.Obj)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
klog.Errorf("cannot convert to clusterv1alpha1.Cluster: %v", t)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
klog.V(3).Infof("Delete event for cluster %s", cluster.Name)
|
||||||
|
|
||||||
|
if s.enableSchedulerEstimator {
|
||||||
|
s.schedulerEstimatorWorker.Add(cluster.Name)
|
||||||
|
}
|
||||||
|
}
|
|
@ -7,23 +7,18 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
corev1 "k8s.io/api/core/v1"
|
corev1 "k8s.io/api/core/v1"
|
||||||
"k8s.io/apimachinery/pkg/api/equality"
|
|
||||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||||
"k8s.io/apimachinery/pkg/api/meta"
|
"k8s.io/apimachinery/pkg/api/meta"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/labels"
|
|
||||||
"k8s.io/apimachinery/pkg/util/wait"
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
"k8s.io/client-go/dynamic"
|
"k8s.io/client-go/dynamic"
|
||||||
"k8s.io/client-go/kubernetes"
|
"k8s.io/client-go/kubernetes"
|
||||||
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
|
|
||||||
"k8s.io/client-go/tools/cache"
|
"k8s.io/client-go/tools/cache"
|
||||||
"k8s.io/client-go/tools/record"
|
"k8s.io/client-go/tools/record"
|
||||||
"k8s.io/client-go/util/retry"
|
"k8s.io/client-go/util/retry"
|
||||||
"k8s.io/client-go/util/workqueue"
|
"k8s.io/client-go/util/workqueue"
|
||||||
"k8s.io/klog/v2"
|
"k8s.io/klog/v2"
|
||||||
|
|
||||||
"github.com/karmada-io/karmada/pkg/util/gclient"
|
|
||||||
|
|
||||||
"github.com/karmada-io/karmada/cmd/scheduler/app/options"
|
"github.com/karmada-io/karmada/cmd/scheduler/app/options"
|
||||||
clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
|
clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
|
||||||
policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
|
policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
|
||||||
|
@ -69,13 +64,9 @@ type Scheduler struct {
|
||||||
DynamicClient dynamic.Interface
|
DynamicClient dynamic.Interface
|
||||||
KarmadaClient karmadaclientset.Interface
|
KarmadaClient karmadaclientset.Interface
|
||||||
KubeClient kubernetes.Interface
|
KubeClient kubernetes.Interface
|
||||||
bindingInformer cache.SharedIndexInformer
|
|
||||||
bindingLister worklister.ResourceBindingLister
|
bindingLister worklister.ResourceBindingLister
|
||||||
policyInformer cache.SharedIndexInformer
|
|
||||||
policyLister policylister.PropagationPolicyLister
|
policyLister policylister.PropagationPolicyLister
|
||||||
clusterBindingInformer cache.SharedIndexInformer
|
|
||||||
clusterBindingLister worklister.ClusterResourceBindingLister
|
clusterBindingLister worklister.ClusterResourceBindingLister
|
||||||
clusterPolicyInformer cache.SharedIndexInformer
|
|
||||||
clusterPolicyLister policylister.ClusterPropagationPolicyLister
|
clusterPolicyLister policylister.ClusterPropagationPolicyLister
|
||||||
clusterLister clusterlister.ClusterLister
|
clusterLister clusterlister.ClusterLister
|
||||||
informerFactory informerfactory.SharedInformerFactory
|
informerFactory informerfactory.SharedInformerFactory
|
||||||
|
@ -97,13 +88,9 @@ type Scheduler struct {
|
||||||
// NewScheduler instantiates a scheduler
|
// NewScheduler instantiates a scheduler
|
||||||
func NewScheduler(dynamicClient dynamic.Interface, karmadaClient karmadaclientset.Interface, kubeClient kubernetes.Interface, opts *options.Options) *Scheduler {
|
func NewScheduler(dynamicClient dynamic.Interface, karmadaClient karmadaclientset.Interface, kubeClient kubernetes.Interface, opts *options.Options) *Scheduler {
|
||||||
factory := informerfactory.NewSharedInformerFactory(karmadaClient, 0)
|
factory := informerfactory.NewSharedInformerFactory(karmadaClient, 0)
|
||||||
bindingInformer := factory.Work().V1alpha2().ResourceBindings().Informer()
|
|
||||||
bindingLister := factory.Work().V1alpha2().ResourceBindings().Lister()
|
bindingLister := factory.Work().V1alpha2().ResourceBindings().Lister()
|
||||||
policyInformer := factory.Policy().V1alpha1().PropagationPolicies().Informer()
|
|
||||||
policyLister := factory.Policy().V1alpha1().PropagationPolicies().Lister()
|
policyLister := factory.Policy().V1alpha1().PropagationPolicies().Lister()
|
||||||
clusterBindingInformer := factory.Work().V1alpha2().ClusterResourceBindings().Informer()
|
|
||||||
clusterBindingLister := factory.Work().V1alpha2().ClusterResourceBindings().Lister()
|
clusterBindingLister := factory.Work().V1alpha2().ClusterResourceBindings().Lister()
|
||||||
clusterPolicyInformer := factory.Policy().V1alpha1().ClusterPropagationPolicies().Informer()
|
|
||||||
clusterPolicyLister := factory.Policy().V1alpha1().ClusterPropagationPolicies().Lister()
|
clusterPolicyLister := factory.Policy().V1alpha1().ClusterPropagationPolicies().Lister()
|
||||||
clusterLister := factory.Cluster().V1alpha1().Clusters().Lister()
|
clusterLister := factory.Cluster().V1alpha1().Clusters().Lister()
|
||||||
queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
|
queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
|
||||||
|
@ -114,13 +101,9 @@ func NewScheduler(dynamicClient dynamic.Interface, karmadaClient karmadaclientse
|
||||||
DynamicClient: dynamicClient,
|
DynamicClient: dynamicClient,
|
||||||
KarmadaClient: karmadaClient,
|
KarmadaClient: karmadaClient,
|
||||||
KubeClient: kubeClient,
|
KubeClient: kubeClient,
|
||||||
bindingInformer: bindingInformer,
|
|
||||||
bindingLister: bindingLister,
|
bindingLister: bindingLister,
|
||||||
policyInformer: policyInformer,
|
|
||||||
policyLister: policyLister,
|
policyLister: policyLister,
|
||||||
clusterBindingInformer: clusterBindingInformer,
|
|
||||||
clusterBindingLister: clusterBindingLister,
|
clusterBindingLister: clusterBindingLister,
|
||||||
clusterPolicyInformer: clusterPolicyInformer,
|
|
||||||
clusterPolicyLister: clusterPolicyLister,
|
clusterPolicyLister: clusterPolicyLister,
|
||||||
clusterLister: clusterLister,
|
clusterLister: clusterLister,
|
||||||
informerFactory: factory,
|
informerFactory: factory,
|
||||||
|
@ -139,38 +122,7 @@ func NewScheduler(dynamicClient dynamic.Interface, karmadaClient karmadaclientse
|
||||||
|
|
||||||
metrics.Register()
|
metrics.Register()
|
||||||
|
|
||||||
bindingInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
|
sched.addAllEventHandlers()
|
||||||
AddFunc: sched.onResourceBindingAdd,
|
|
||||||
UpdateFunc: sched.onResourceBindingUpdate,
|
|
||||||
})
|
|
||||||
|
|
||||||
policyInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
|
|
||||||
UpdateFunc: sched.onPropagationPolicyUpdate,
|
|
||||||
})
|
|
||||||
|
|
||||||
clusterBindingInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
|
|
||||||
AddFunc: sched.onResourceBindingAdd,
|
|
||||||
UpdateFunc: sched.onResourceBindingUpdate,
|
|
||||||
})
|
|
||||||
|
|
||||||
clusterPolicyInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
|
|
||||||
UpdateFunc: sched.onClusterPropagationPolicyUpdate,
|
|
||||||
})
|
|
||||||
|
|
||||||
memclusterInformer := factory.Cluster().V1alpha1().Clusters().Informer()
|
|
||||||
memclusterInformer.AddEventHandler(
|
|
||||||
cache.ResourceEventHandlerFuncs{
|
|
||||||
AddFunc: sched.addCluster,
|
|
||||||
UpdateFunc: sched.updateCluster,
|
|
||||||
DeleteFunc: sched.deleteCluster,
|
|
||||||
},
|
|
||||||
)
|
|
||||||
|
|
||||||
eventBroadcaster := record.NewBroadcaster()
|
|
||||||
eventBroadcaster.StartStructuredLogging(0)
|
|
||||||
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
|
|
||||||
sched.eventRecorder = eventBroadcaster.NewRecorder(gclient.NewSchema(), corev1.EventSource{Component: "karmada-scheduler"})
|
|
||||||
|
|
||||||
return sched
|
return sched
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -187,127 +139,18 @@ func (s *Scheduler) Run(ctx context.Context) {
|
||||||
}
|
}
|
||||||
|
|
||||||
s.informerFactory.Start(stopCh)
|
s.informerFactory.Start(stopCh)
|
||||||
if !cache.WaitForCacheSync(stopCh, s.bindingInformer.HasSynced) {
|
s.informerFactory.WaitForCacheSync(stopCh)
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
go wait.Until(s.worker, time.Second, stopCh)
|
go wait.Until(s.worker, time.Second, stopCh)
|
||||||
|
|
||||||
<-stopCh
|
<-stopCh
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Scheduler) onResourceBindingAdd(obj interface{}) {
|
|
||||||
key, err := cache.MetaNamespaceKeyFunc(obj)
|
|
||||||
if err != nil {
|
|
||||||
klog.Errorf("couldn't get key for object %#v: %v", obj, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
s.queue.Add(key)
|
|
||||||
metrics.CountSchedulerBindings(metrics.BindingAdd)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Scheduler) onResourceBindingUpdate(old, cur interface{}) {
|
|
||||||
key, err := cache.MetaNamespaceKeyFunc(cur)
|
|
||||||
if err != nil {
|
|
||||||
klog.Errorf("couldn't get key for object %#v: %v", cur, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
s.queue.Add(key)
|
|
||||||
metrics.CountSchedulerBindings(metrics.BindingUpdate)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Scheduler) onPropagationPolicyUpdate(old, cur interface{}) {
|
|
||||||
oldPropagationPolicy := old.(*policyv1alpha1.PropagationPolicy)
|
|
||||||
curPropagationPolicy := cur.(*policyv1alpha1.PropagationPolicy)
|
|
||||||
if equality.Semantic.DeepEqual(oldPropagationPolicy.Spec.Placement, curPropagationPolicy.Spec.Placement) {
|
|
||||||
klog.V(2).Infof("Ignore PropagationPolicy(%s/%s) which placement unchanged.", oldPropagationPolicy.Namespace, oldPropagationPolicy.Name)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
selector := labels.SelectorFromSet(labels.Set{
|
|
||||||
policyv1alpha1.PropagationPolicyNamespaceLabel: oldPropagationPolicy.Namespace,
|
|
||||||
policyv1alpha1.PropagationPolicyNameLabel: oldPropagationPolicy.Name,
|
|
||||||
})
|
|
||||||
|
|
||||||
err := s.requeueResourceBindings(selector)
|
|
||||||
if err != nil {
|
|
||||||
klog.Errorf("Failed to requeue ResourceBinding, error: %v", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Scheduler) onClusterPropagationPolicyUpdate(old, cur interface{}) {
|
|
||||||
oldClusterPropagationPolicy := old.(*policyv1alpha1.ClusterPropagationPolicy)
|
|
||||||
curClusterPropagationPolicy := cur.(*policyv1alpha1.ClusterPropagationPolicy)
|
|
||||||
if equality.Semantic.DeepEqual(oldClusterPropagationPolicy.Spec.Placement, curClusterPropagationPolicy.Spec.Placement) {
|
|
||||||
klog.V(2).Infof("Ignore ClusterPropagationPolicy(%s) which placement unchanged.", oldClusterPropagationPolicy.Name)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
selector := labels.SelectorFromSet(labels.Set{
|
|
||||||
policyv1alpha1.ClusterPropagationPolicyLabel: oldClusterPropagationPolicy.Name,
|
|
||||||
})
|
|
||||||
|
|
||||||
err := s.requeueClusterResourceBindings(selector)
|
|
||||||
if err != nil {
|
|
||||||
klog.Errorf("Failed to requeue ClusterResourceBinding, error: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
err = s.requeueResourceBindings(selector)
|
|
||||||
if err != nil {
|
|
||||||
klog.Errorf("Failed to requeue ResourceBinding, error: %v", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Scheduler) worker() {
|
func (s *Scheduler) worker() {
|
||||||
for s.scheduleNext() {
|
for s.scheduleNext() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// requeueResourceBindings will retrieve all ResourceBinding objects by the label selector and put them to queue.
|
|
||||||
func (s *Scheduler) requeueResourceBindings(selector labels.Selector) error {
|
|
||||||
referenceBindings, err := s.bindingLister.List(selector)
|
|
||||||
if err != nil {
|
|
||||||
klog.Errorf("Failed to list ResourceBinding by selector: %s, error: %v", selector.String(), err)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, binding := range referenceBindings {
|
|
||||||
key, err := cache.MetaNamespaceKeyFunc(binding)
|
|
||||||
if err != nil {
|
|
||||||
klog.Errorf("couldn't get key for ResourceBinding(%s/%s): %v", binding.Namespace, binding.Name, err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
klog.Infof("Requeue ResourceBinding(%s/%s) as placement changed.", binding.Namespace, binding.Name)
|
|
||||||
s.queue.Add(key)
|
|
||||||
metrics.CountSchedulerBindings(metrics.PolicyChanged)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// requeueClusterResourceBindings will retrieve all ClusterResourceBinding objects by the label selector and put them to queue.
|
|
||||||
func (s *Scheduler) requeueClusterResourceBindings(selector labels.Selector) error {
|
|
||||||
referenceClusterResourceBindings, err := s.clusterBindingLister.List(selector)
|
|
||||||
if err != nil {
|
|
||||||
klog.Errorf("Failed to list ClusterResourceBinding by selector: %s, error: %v", selector.String(), err)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, clusterResourceBinding := range referenceClusterResourceBindings {
|
|
||||||
key, err := cache.MetaNamespaceKeyFunc(clusterResourceBinding)
|
|
||||||
if err != nil {
|
|
||||||
klog.Errorf("couldn't get key for ClusterResourceBinding(%s): %v", clusterResourceBinding.Name, err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
klog.Infof("Requeue ClusterResourceBinding(%s) as placement changed.", clusterResourceBinding.Name)
|
|
||||||
s.queue.Add(key)
|
|
||||||
metrics.CountSchedulerBindings(metrics.PolicyChanged)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Scheduler) getPlacement(resourceBinding *workv1alpha2.ResourceBinding) (policyv1alpha1.Placement, string, error) {
|
func (s *Scheduler) getPlacement(resourceBinding *workv1alpha2.ResourceBinding) (policyv1alpha1.Placement, string, error) {
|
||||||
var placement policyv1alpha1.Placement
|
var placement policyv1alpha1.Placement
|
||||||
var clusterPolicyName string
|
var clusterPolicyName string
|
||||||
|
@ -599,108 +442,6 @@ func (s *Scheduler) handleErr(err error, key interface{}) {
|
||||||
metrics.CountSchedulerBindings(metrics.ScheduleAttemptFailure)
|
metrics.CountSchedulerBindings(metrics.ScheduleAttemptFailure)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Scheduler) addCluster(obj interface{}) {
|
|
||||||
cluster, ok := obj.(*clusterv1alpha1.Cluster)
|
|
||||||
if !ok {
|
|
||||||
klog.Errorf("cannot convert to Cluster: %v", obj)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
klog.V(3).Infof("Add event for cluster %s", cluster.Name)
|
|
||||||
|
|
||||||
if s.enableSchedulerEstimator {
|
|
||||||
s.schedulerEstimatorWorker.Add(cluster.Name)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Scheduler) updateCluster(_, newObj interface{}) {
|
|
||||||
newCluster, ok := newObj.(*clusterv1alpha1.Cluster)
|
|
||||||
if !ok {
|
|
||||||
klog.Errorf("cannot convert newObj to Cluster: %v", newObj)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
klog.V(3).Infof("Update event for cluster %s", newCluster.Name)
|
|
||||||
|
|
||||||
if s.enableSchedulerEstimator {
|
|
||||||
s.schedulerEstimatorWorker.Add(newCluster.Name)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Check if cluster becomes failure
|
|
||||||
if meta.IsStatusConditionPresentAndEqual(newCluster.Status.Conditions, clusterv1alpha1.ClusterConditionReady, metav1.ConditionFalse) {
|
|
||||||
klog.Infof("Found cluster(%s) failure and failover flag is %v", newCluster.Name, features.FeatureGate.Enabled(features.Failover))
|
|
||||||
|
|
||||||
if features.FeatureGate.Enabled(features.Failover) { // Trigger reschedule on cluster failure only when flag is true.
|
|
||||||
s.enqueueAffectedBinding(newCluster.Name)
|
|
||||||
s.enqueueAffectedClusterBinding(newCluster.Name)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Scheduler) deleteCluster(obj interface{}) {
|
|
||||||
var cluster *clusterv1alpha1.Cluster
|
|
||||||
switch t := obj.(type) {
|
|
||||||
case *clusterv1alpha1.Cluster:
|
|
||||||
cluster = t
|
|
||||||
case cache.DeletedFinalStateUnknown:
|
|
||||||
var ok bool
|
|
||||||
cluster, ok = t.Obj.(*clusterv1alpha1.Cluster)
|
|
||||||
if !ok {
|
|
||||||
klog.Errorf("cannot convert to clusterv1alpha1.Cluster: %v", t.Obj)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
default:
|
|
||||||
klog.Errorf("cannot convert to clusterv1alpha1.Cluster: %v", t)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
klog.V(3).Infof("Delete event for cluster %s", cluster.Name)
|
|
||||||
|
|
||||||
if s.enableSchedulerEstimator {
|
|
||||||
s.schedulerEstimatorWorker.Add(cluster.Name)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// enqueueAffectedBinding will find all ResourceBindings which are related to the current NotReady cluster and add them in queue.
|
|
||||||
func (s *Scheduler) enqueueAffectedBinding(notReadyClusterName string) {
|
|
||||||
bindings, _ := s.bindingLister.List(labels.Everything())
|
|
||||||
klog.Infof("Start traveling all ResourceBindings")
|
|
||||||
for _, binding := range bindings {
|
|
||||||
clusters := binding.Spec.Clusters
|
|
||||||
for _, bindingCluster := range clusters {
|
|
||||||
if bindingCluster.Name == notReadyClusterName {
|
|
||||||
rescheduleKey, err := cache.MetaNamespaceKeyFunc(binding)
|
|
||||||
if err != nil {
|
|
||||||
klog.Errorf("couldn't get rescheduleKey for ResourceBinding %#v: %v", bindingCluster.Name, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
s.queue.Add(rescheduleKey)
|
|
||||||
metrics.CountSchedulerBindings(metrics.ClusterNotReady)
|
|
||||||
klog.Infof("Add expired ResourceBinding in queue successfully")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// enqueueAffectedClusterBinding will find all cluster resource bindings which are related to the current NotReady cluster and add them in queue.
|
|
||||||
func (s *Scheduler) enqueueAffectedClusterBinding(notReadyClusterName string) {
|
|
||||||
bindings, _ := s.clusterBindingLister.List(labels.Everything())
|
|
||||||
klog.Infof("Start traveling all ClusterResourceBindings")
|
|
||||||
for _, binding := range bindings {
|
|
||||||
clusters := binding.Spec.Clusters
|
|
||||||
for _, bindingCluster := range clusters {
|
|
||||||
if bindingCluster.Name == notReadyClusterName {
|
|
||||||
rescheduleKey, err := cache.MetaNamespaceKeyFunc(binding)
|
|
||||||
if err != nil {
|
|
||||||
klog.Errorf("couldn't get rescheduleKey for ClusterResourceBinding %s: %v", bindingCluster.Name, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
s.queue.Add(rescheduleKey)
|
|
||||||
metrics.CountSchedulerBindings(metrics.ClusterNotReady)
|
|
||||||
klog.Infof("Add expired ClusterResourceBinding in queue successfully")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Scheduler) rescheduleClusterResourceBinding(clusterResourceBinding *workv1alpha2.ClusterResourceBinding) error {
|
func (s *Scheduler) rescheduleClusterResourceBinding(clusterResourceBinding *workv1alpha2.ClusterResourceBinding) error {
|
||||||
klog.V(4).InfoS("Begin rescheduling cluster resource binding", "clusterResourceBinding", klog.KObj(clusterResourceBinding))
|
klog.V(4).InfoS("Begin rescheduling cluster resource binding", "clusterResourceBinding", klog.KObj(clusterResourceBinding))
|
||||||
defer klog.V(4).InfoS("End rescheduling cluster resource binding", "clusterResourceBinding", klog.KObj(clusterResourceBinding))
|
defer klog.V(4).InfoS("End rescheduling cluster resource binding", "clusterResourceBinding", klog.KObj(clusterResourceBinding))
|
||||||
|
|
Loading…
Reference in New Issue