From 1d282bf179003a32baae5bf0aa849439ae09e5cf Mon Sep 17 00:00:00 2001 From: Poor12 Date: Wed, 21 Sep 2022 10:03:18 +0800 Subject: [PATCH] add metrics test for scheduler Signed-off-by: Poor12 --- pkg/scheduler/event_handler.go | 57 ++++--- pkg/scheduler/metrics/metrics.go | 7 +- pkg/scheduler/scheduler_metrics_test.go | 188 ++++++++++++++++++++++++ 3 files changed, 226 insertions(+), 26 deletions(-) create mode 100644 pkg/scheduler/scheduler_metrics_test.go diff --git a/pkg/scheduler/event_handler.go b/pkg/scheduler/event_handler.go index 32ffc2f11..0205c096e 100644 --- a/pkg/scheduler/event_handler.go +++ b/pkg/scheduler/event_handler.go @@ -12,6 +12,7 @@ import ( clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1" policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1" + workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2" "github.com/karmada-io/karmada/pkg/scheduler/metrics" "github.com/karmada-io/karmada/pkg/util" "github.com/karmada-io/karmada/pkg/util/fedinformer" @@ -102,6 +103,28 @@ func (s *Scheduler) onResourceBindingUpdate(old, cur interface{}) { metrics.CountSchedulerBindings(metrics.BindingUpdate) } +func (s *Scheduler) onResourceBindingRequeue(binding *workv1alpha2.ResourceBinding, event string) { + key, err := cache.MetaNamespaceKeyFunc(binding) + if err != nil { + klog.Errorf("couldn't get key for ResourceBinding(%s/%s): %v", binding.Namespace, binding.Name, err) + return + } + klog.Infof("Requeue ResourceBinding(%s/%s) due to event(%s).", binding.Namespace, binding.Name, event) + s.queue.Add(key) + metrics.CountSchedulerBindings(event) +} + +func (s *Scheduler) onClusterResourceBindingRequeue(clusterResourceBinding *workv1alpha2.ClusterResourceBinding, event string) { + key, err := cache.MetaNamespaceKeyFunc(clusterResourceBinding) + if err != nil { + klog.Errorf("couldn't get key for ClusterResourceBinding(%s): %v", clusterResourceBinding.Name, err) + return + } + klog.Infof("Requeue ClusterResourceBinding(%s) due to event(%s).", clusterResourceBinding.Name, event) + s.queue.Add(key) + metrics.CountSchedulerBindings(event) +} + func (s *Scheduler) onPropagationPolicyUpdate(old, cur interface{}) { oldPropagationPolicy := old.(*policyv1alpha1.PropagationPolicy) curPropagationPolicy := cur.(*policyv1alpha1.PropagationPolicy) @@ -115,7 +138,7 @@ func (s *Scheduler) onPropagationPolicyUpdate(old, cur interface{}) { policyv1alpha1.PropagationPolicyNameLabel: oldPropagationPolicy.Name, }) - err := s.requeueResourceBindings(selector) + err := s.requeueResourceBindings(selector, metrics.PolicyChanged) if err != nil { klog.Errorf("Failed to requeue ResourceBinding, error: %v", err) return @@ -123,7 +146,7 @@ func (s *Scheduler) onPropagationPolicyUpdate(old, cur interface{}) { } // requeueClusterResourceBindings will retrieve all ClusterResourceBinding objects by the label selector and put them to queue. -func (s *Scheduler) requeueClusterResourceBindings(selector labels.Selector) error { +func (s *Scheduler) requeueClusterResourceBindings(selector labels.Selector, event string) error { referenceClusterResourceBindings, err := s.clusterBindingLister.List(selector) if err != nil { klog.Errorf("Failed to list ClusterResourceBinding by selector: %s, error: %v", selector.String(), err) @@ -131,20 +154,13 @@ func (s *Scheduler) requeueClusterResourceBindings(selector labels.Selector) err } for _, clusterResourceBinding := range referenceClusterResourceBindings { - key, err := cache.MetaNamespaceKeyFunc(clusterResourceBinding) - if err != nil { - klog.Errorf("couldn't get key for ClusterResourceBinding(%s): %v", clusterResourceBinding.Name, err) - continue - } - klog.Infof("Requeue ClusterResourceBinding(%s) as placement changed.", clusterResourceBinding.Name) - s.queue.Add(key) - metrics.CountSchedulerBindings(metrics.PolicyChanged) + s.onClusterResourceBindingRequeue(clusterResourceBinding, event) } return nil } // requeueResourceBindings will retrieve all ResourceBinding objects by the label selector and put them to queue. -func (s *Scheduler) requeueResourceBindings(selector labels.Selector) error { +func (s *Scheduler) requeueResourceBindings(selector labels.Selector, event string) error { referenceBindings, err := s.bindingLister.List(selector) if err != nil { klog.Errorf("Failed to list ResourceBinding by selector: %s, error: %v", selector.String(), err) @@ -152,14 +168,7 @@ func (s *Scheduler) requeueResourceBindings(selector labels.Selector) error { } for _, binding := range referenceBindings { - key, err := cache.MetaNamespaceKeyFunc(binding) - if err != nil { - klog.Errorf("couldn't get key for ResourceBinding(%s/%s): %v", binding.Namespace, binding.Name, err) - continue - } - klog.Infof("Requeue ResourceBinding(%s/%s) as placement changed.", binding.Namespace, binding.Name) - s.queue.Add(key) - metrics.CountSchedulerBindings(metrics.PolicyChanged) + s.onResourceBindingRequeue(binding, event) } return nil } @@ -176,12 +185,12 @@ func (s *Scheduler) onClusterPropagationPolicyUpdate(old, cur interface{}) { policyv1alpha1.ClusterPropagationPolicyLabel: oldClusterPropagationPolicy.Name, }) - err := s.requeueClusterResourceBindings(selector) + err := s.requeueClusterResourceBindings(selector, metrics.PolicyChanged) if err != nil { klog.Errorf("Failed to requeue ClusterResourceBinding, error: %v", err) } - err = s.requeueResourceBindings(selector) + err = s.requeueResourceBindings(selector, metrics.PolicyChanged) if err != nil { klog.Errorf("Failed to requeue ResourceBinding, error: %v", err) } @@ -240,7 +249,7 @@ func (s *Scheduler) enqueueAffectedPolicy(newCluster *clusterv1alpha1.Cluster) { fallthrough case util.ClusterMatches(newCluster, *affinity): // If specific cluster matches the affinity. add it in queue - err := s.requeueResourceBindings(selector) + err := s.requeueResourceBindings(selector, metrics.ClusterChanged) if err != nil { klog.Errorf("Failed to requeue ResourceBinding, error: %v", err) } @@ -262,11 +271,11 @@ func (s *Scheduler) enqueueAffectedClusterPolicy(newCluster *clusterv1alpha1.Clu fallthrough case util.ClusterMatches(newCluster, *affinity): // If specific cluster matches the affinity. add it in queue - err := s.requeueClusterResourceBindings(selector) + err := s.requeueClusterResourceBindings(selector, metrics.ClusterChanged) if err != nil { klog.Errorf("Failed to requeue ClusterResourceBinding, error: %v", err) } - err = s.requeueResourceBindings(selector) + err = s.requeueResourceBindings(selector, metrics.ClusterChanged) if err != nil { klog.Errorf("Failed to requeue ResourceBinding, error: %v", err) } diff --git a/pkg/scheduler/metrics/metrics.go b/pkg/scheduler/metrics/metrics.go index bfdcef6bd..ab7687186 100644 --- a/pkg/scheduler/metrics/metrics.go +++ b/pkg/scheduler/metrics/metrics.go @@ -26,6 +26,8 @@ const ( ScheduleAttemptFailure = "ScheduleAttemptFailure" // PolicyChanged means binding needs to be rescheduled for the policy changed PolicyChanged = "PolicyChanged" + // ClusterChanged means binding needs to be rescheduled for the cluster changed + ClusterChanged = "ClusterChanged" ) const ( @@ -63,7 +65,8 @@ var ( Buckets: prometheus.ExponentialBuckets(0.001, 2, 15), }, []string{"schedule_step"}) - schedulerQueueIncomingBindings = promauto.NewCounterVec( + // SchedulerQueueIncomingBindings is the number of bindings added to scheduling queues by event type. + SchedulerQueueIncomingBindings = promauto.NewCounterVec( prometheus.CounterOpts{ Subsystem: SchedulerSubsystem, Name: "queue_incoming_bindings_total", @@ -93,5 +96,5 @@ func ScheduleStep(action string, startTime time.Time) { // CountSchedulerBindings records the number of binding added to scheduling queues by event type. func CountSchedulerBindings(event string) { - schedulerQueueIncomingBindings.WithLabelValues(event).Inc() + SchedulerQueueIncomingBindings.WithLabelValues(event).Inc() } diff --git a/pkg/scheduler/scheduler_metrics_test.go b/pkg/scheduler/scheduler_metrics_test.go new file mode 100644 index 000000000..3a774b9de --- /dev/null +++ b/pkg/scheduler/scheduler_metrics_test.go @@ -0,0 +1,188 @@ +package scheduler + +import ( + "fmt" + "strings" + "testing" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + dynamicfake "k8s.io/client-go/dynamic/fake" + "k8s.io/client-go/kubernetes/fake" + "k8s.io/component-base/metrics/testutil" + + workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2" + karmadafake "github.com/karmada-io/karmada/pkg/generated/clientset/versioned/fake" + "github.com/karmada-io/karmada/pkg/scheduler/metrics" +) + +const incomingBindingMetricsName = "queue_incoming_bindings_total" + +type trigger func(scheduler *Scheduler, obj interface{}) + +var ( + addBinding = func(scheduler *Scheduler, obj interface{}) { + scheduler.onResourceBindingAdd(obj) + } + updateBinding = func(scheduler *Scheduler, obj interface{}) { + scheduler.onResourceBindingUpdate(nil, obj) + } + policyChanged = func(scheduler *Scheduler, obj interface{}) { + rb := obj.(*workv1alpha2.ResourceBinding) + scheduler.onResourceBindingRequeue(rb, metrics.PolicyChanged) + } + crbPolicyChanged = func(scheduler *Scheduler, obj interface{}) { + crb := obj.(*workv1alpha2.ClusterResourceBinding) + scheduler.onClusterResourceBindingRequeue(crb, metrics.PolicyChanged) + } + clusterChanged = func(scheduler *Scheduler, obj interface{}) { + rb := obj.(*workv1alpha2.ResourceBinding) + scheduler.onResourceBindingRequeue(rb, metrics.ClusterChanged) + } + crbClusterChanged = func(scheduler *Scheduler, obj interface{}) { + crb := obj.(*workv1alpha2.ClusterResourceBinding) + scheduler.onClusterResourceBindingRequeue(crb, metrics.ClusterChanged) + } + scheduleAttemptSuccess = func(scheduler *Scheduler, obj interface{}) { + scheduler.handleErr(nil, obj) + } + scheduleAttemptFailure = func(scheduler *Scheduler, obj interface{}) { + scheduler.handleErr(fmt.Errorf("schedule attempt failure"), obj) + } +) + +func TestIncomingBindingMetrics(t *testing.T) { + dynamicClient := dynamicfake.NewSimpleDynamicClient(runtime.NewScheme()) + karmadaClient := karmadafake.NewSimpleClientset() + kubeClient := fake.NewSimpleClientset() + + sche, err := NewScheduler(dynamicClient, karmadaClient, kubeClient) + if err != nil { + t.Errorf("create scheduler error: %s", err) + } + + var rbInfos = make([]*workv1alpha2.ResourceBinding, 0, 3) + var crbInfos = make([]*workv1alpha2.ClusterResourceBinding, 0, 3) + for i := 1; i <= 3; i++ { + rb := &workv1alpha2.ResourceBinding{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("test-rb-%d", i), + Namespace: "bar", + Generation: 1, + }, + } + rbInfos = append(rbInfos, rb) + } + + for i := 1; i <= 3; i++ { + crb := &workv1alpha2.ClusterResourceBinding{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("test-rb-%d", i), + }, + } + crbInfos = append(crbInfos, crb) + } + + rbTests := []struct { + name string + crbInvolved bool + trigger trigger + want string + }{ + { + name: "add resourceBinding", + crbInvolved: false, + trigger: addBinding, + want: ` + # HELP karmada_scheduler_queue_incoming_bindings_total Number of bindings added to scheduling queues by event type. + # TYPE karmada_scheduler_queue_incoming_bindings_total counter + karmada_scheduler_queue_incoming_bindings_total{event="BindingAdd"} 3 +`, + }, + { + name: "update resourceBinding", + crbInvolved: false, + trigger: updateBinding, + want: ` + # HELP karmada_scheduler_queue_incoming_bindings_total Number of bindings added to scheduling queues by event type. + # TYPE karmada_scheduler_queue_incoming_bindings_total counter + karmada_scheduler_queue_incoming_bindings_total{event="BindingUpdate"} 3 +`, + }, + { + name: "policy associated with resourceBinding changed", + crbInvolved: false, + trigger: policyChanged, + want: ` + # HELP karmada_scheduler_queue_incoming_bindings_total Number of bindings added to scheduling queues by event type. + # TYPE karmada_scheduler_queue_incoming_bindings_total counter + karmada_scheduler_queue_incoming_bindings_total{event="PolicyChanged"} 3 +`, + }, + { + name: "policy associated with clusterResourceBinding changed", + crbInvolved: true, + trigger: crbPolicyChanged, + want: ` + # HELP karmada_scheduler_queue_incoming_bindings_total Number of bindings added to scheduling queues by event type. + # TYPE karmada_scheduler_queue_incoming_bindings_total counter + karmada_scheduler_queue_incoming_bindings_total{event="PolicyChanged"} 3 +`, + }, + { + name: "cluster which matches the existing propagationPolicy changed", + crbInvolved: false, + trigger: clusterChanged, + want: ` + # HELP karmada_scheduler_queue_incoming_bindings_total Number of bindings added to scheduling queues by event type. + # TYPE karmada_scheduler_queue_incoming_bindings_total counter + karmada_scheduler_queue_incoming_bindings_total{event="ClusterChanged"} 3 +`, + }, + { + name: "cluster which matches the existing clusterPropagationPolicy changed", + crbInvolved: true, + trigger: crbClusterChanged, + want: ` + # HELP karmada_scheduler_queue_incoming_bindings_total Number of bindings added to scheduling queues by event type. + # TYPE karmada_scheduler_queue_incoming_bindings_total counter + karmada_scheduler_queue_incoming_bindings_total{event="ClusterChanged"} 3 +`, + }, + { + name: "schedule attempt success", + crbInvolved: false, + trigger: scheduleAttemptSuccess, + want: ``, + }, + { + name: "schedule attempt failure", + crbInvolved: false, + trigger: scheduleAttemptFailure, + want: ` + # HELP karmada_scheduler_queue_incoming_bindings_total Number of bindings added to scheduling queues by event type. + # TYPE karmada_scheduler_queue_incoming_bindings_total counter + karmada_scheduler_queue_incoming_bindings_total{event="ScheduleAttemptFailure"} 3 +`, + }, + } + + for _, test := range rbTests { + t.Run(test.name, func(t *testing.T) { + metrics.SchedulerQueueIncomingBindings.Reset() + if test.crbInvolved { + for _, crbInfo := range crbInfos { + test.trigger(sche, crbInfo) + } + } else { + for _, rbInfo := range rbInfos { + test.trigger(sche, rbInfo) + } + } + metricName := metrics.SchedulerSubsystem + "_" + incomingBindingMetricsName + if err := testutil.CollectAndCompare(metrics.SchedulerQueueIncomingBindings, strings.NewReader(test.want), metricName); err != nil { + t.Errorf("unexpected collecting result:\n%s", err) + } + }) + } +}