Merge pull request #2555 from Poor12/add-metrics-test

Fix the issue that incomingBindingMetrics ignores the ClusterChanged event
This commit is contained in:
karmada-bot 2022-09-30 16:50:15 +08:00 committed by GitHub
commit 7ddf549e96
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 226 additions and 26 deletions

View File

@ -12,6 +12,7 @@ import (
clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
"github.com/karmada-io/karmada/pkg/scheduler/metrics"
"github.com/karmada-io/karmada/pkg/util"
"github.com/karmada-io/karmada/pkg/util/fedinformer"
@ -102,6 +103,28 @@ func (s *Scheduler) onResourceBindingUpdate(old, cur interface{}) {
metrics.CountSchedulerBindings(metrics.BindingUpdate)
}
func (s *Scheduler) onResourceBindingRequeue(binding *workv1alpha2.ResourceBinding, event string) {
key, err := cache.MetaNamespaceKeyFunc(binding)
if err != nil {
klog.Errorf("couldn't get key for ResourceBinding(%s/%s): %v", binding.Namespace, binding.Name, err)
return
}
klog.Infof("Requeue ResourceBinding(%s/%s) due to event(%s).", binding.Namespace, binding.Name, event)
s.queue.Add(key)
metrics.CountSchedulerBindings(event)
}
func (s *Scheduler) onClusterResourceBindingRequeue(clusterResourceBinding *workv1alpha2.ClusterResourceBinding, event string) {
key, err := cache.MetaNamespaceKeyFunc(clusterResourceBinding)
if err != nil {
klog.Errorf("couldn't get key for ClusterResourceBinding(%s): %v", clusterResourceBinding.Name, err)
return
}
klog.Infof("Requeue ClusterResourceBinding(%s) due to event(%s).", clusterResourceBinding.Name, event)
s.queue.Add(key)
metrics.CountSchedulerBindings(event)
}
func (s *Scheduler) onPropagationPolicyUpdate(old, cur interface{}) {
oldPropagationPolicy := old.(*policyv1alpha1.PropagationPolicy)
curPropagationPolicy := cur.(*policyv1alpha1.PropagationPolicy)
@ -115,7 +138,7 @@ func (s *Scheduler) onPropagationPolicyUpdate(old, cur interface{}) {
policyv1alpha1.PropagationPolicyNameLabel: oldPropagationPolicy.Name,
})
err := s.requeueResourceBindings(selector)
err := s.requeueResourceBindings(selector, metrics.PolicyChanged)
if err != nil {
klog.Errorf("Failed to requeue ResourceBinding, error: %v", err)
return
@ -123,7 +146,7 @@ func (s *Scheduler) onPropagationPolicyUpdate(old, cur interface{}) {
}
// requeueClusterResourceBindings will retrieve all ClusterResourceBinding objects by the label selector and put them to queue.
func (s *Scheduler) requeueClusterResourceBindings(selector labels.Selector) error {
func (s *Scheduler) requeueClusterResourceBindings(selector labels.Selector, event string) error {
referenceClusterResourceBindings, err := s.clusterBindingLister.List(selector)
if err != nil {
klog.Errorf("Failed to list ClusterResourceBinding by selector: %s, error: %v", selector.String(), err)
@ -131,20 +154,13 @@ func (s *Scheduler) requeueClusterResourceBindings(selector labels.Selector) err
}
for _, clusterResourceBinding := range referenceClusterResourceBindings {
key, err := cache.MetaNamespaceKeyFunc(clusterResourceBinding)
if err != nil {
klog.Errorf("couldn't get key for ClusterResourceBinding(%s): %v", clusterResourceBinding.Name, err)
continue
}
klog.Infof("Requeue ClusterResourceBinding(%s) as placement changed.", clusterResourceBinding.Name)
s.queue.Add(key)
metrics.CountSchedulerBindings(metrics.PolicyChanged)
s.onClusterResourceBindingRequeue(clusterResourceBinding, event)
}
return nil
}
// requeueResourceBindings will retrieve all ResourceBinding objects by the label selector and put them to queue.
func (s *Scheduler) requeueResourceBindings(selector labels.Selector) error {
func (s *Scheduler) requeueResourceBindings(selector labels.Selector, event string) error {
referenceBindings, err := s.bindingLister.List(selector)
if err != nil {
klog.Errorf("Failed to list ResourceBinding by selector: %s, error: %v", selector.String(), err)
@ -152,14 +168,7 @@ func (s *Scheduler) requeueResourceBindings(selector labels.Selector) error {
}
for _, binding := range referenceBindings {
key, err := cache.MetaNamespaceKeyFunc(binding)
if err != nil {
klog.Errorf("couldn't get key for ResourceBinding(%s/%s): %v", binding.Namespace, binding.Name, err)
continue
}
klog.Infof("Requeue ResourceBinding(%s/%s) as placement changed.", binding.Namespace, binding.Name)
s.queue.Add(key)
metrics.CountSchedulerBindings(metrics.PolicyChanged)
s.onResourceBindingRequeue(binding, event)
}
return nil
}
@ -176,12 +185,12 @@ func (s *Scheduler) onClusterPropagationPolicyUpdate(old, cur interface{}) {
policyv1alpha1.ClusterPropagationPolicyLabel: oldClusterPropagationPolicy.Name,
})
err := s.requeueClusterResourceBindings(selector)
err := s.requeueClusterResourceBindings(selector, metrics.PolicyChanged)
if err != nil {
klog.Errorf("Failed to requeue ClusterResourceBinding, error: %v", err)
}
err = s.requeueResourceBindings(selector)
err = s.requeueResourceBindings(selector, metrics.PolicyChanged)
if err != nil {
klog.Errorf("Failed to requeue ResourceBinding, error: %v", err)
}
@ -240,7 +249,7 @@ func (s *Scheduler) enqueueAffectedPolicy(newCluster *clusterv1alpha1.Cluster) {
fallthrough
case util.ClusterMatches(newCluster, *affinity):
// If specific cluster matches the affinity. add it in queue
err := s.requeueResourceBindings(selector)
err := s.requeueResourceBindings(selector, metrics.ClusterChanged)
if err != nil {
klog.Errorf("Failed to requeue ResourceBinding, error: %v", err)
}
@ -262,11 +271,11 @@ func (s *Scheduler) enqueueAffectedClusterPolicy(newCluster *clusterv1alpha1.Clu
fallthrough
case util.ClusterMatches(newCluster, *affinity):
// If specific cluster matches the affinity. add it in queue
err := s.requeueClusterResourceBindings(selector)
err := s.requeueClusterResourceBindings(selector, metrics.ClusterChanged)
if err != nil {
klog.Errorf("Failed to requeue ClusterResourceBinding, error: %v", err)
}
err = s.requeueResourceBindings(selector)
err = s.requeueResourceBindings(selector, metrics.ClusterChanged)
if err != nil {
klog.Errorf("Failed to requeue ResourceBinding, error: %v", err)
}

View File

@ -26,6 +26,8 @@ const (
ScheduleAttemptFailure = "ScheduleAttemptFailure"
// PolicyChanged means binding needs to be rescheduled for the policy changed
PolicyChanged = "PolicyChanged"
// ClusterChanged means binding needs to be rescheduled for the cluster changed
ClusterChanged = "ClusterChanged"
)
const (
@ -63,7 +65,8 @@ var (
Buckets: prometheus.ExponentialBuckets(0.001, 2, 15),
}, []string{"schedule_step"})
schedulerQueueIncomingBindings = promauto.NewCounterVec(
// SchedulerQueueIncomingBindings is the number of bindings added to scheduling queues by event type.
SchedulerQueueIncomingBindings = promauto.NewCounterVec(
prometheus.CounterOpts{
Subsystem: SchedulerSubsystem,
Name: "queue_incoming_bindings_total",
@ -93,5 +96,5 @@ func ScheduleStep(action string, startTime time.Time) {
// CountSchedulerBindings records the number of binding added to scheduling queues by event type.
func CountSchedulerBindings(event string) {
schedulerQueueIncomingBindings.WithLabelValues(event).Inc()
SchedulerQueueIncomingBindings.WithLabelValues(event).Inc()
}

View File

@ -0,0 +1,188 @@
package scheduler
import (
"fmt"
"strings"
"testing"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
dynamicfake "k8s.io/client-go/dynamic/fake"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/component-base/metrics/testutil"
workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
karmadafake "github.com/karmada-io/karmada/pkg/generated/clientset/versioned/fake"
"github.com/karmada-io/karmada/pkg/scheduler/metrics"
)
const incomingBindingMetricsName = "queue_incoming_bindings_total"
type trigger func(scheduler *Scheduler, obj interface{})
var (
addBinding = func(scheduler *Scheduler, obj interface{}) {
scheduler.onResourceBindingAdd(obj)
}
updateBinding = func(scheduler *Scheduler, obj interface{}) {
scheduler.onResourceBindingUpdate(nil, obj)
}
policyChanged = func(scheduler *Scheduler, obj interface{}) {
rb := obj.(*workv1alpha2.ResourceBinding)
scheduler.onResourceBindingRequeue(rb, metrics.PolicyChanged)
}
crbPolicyChanged = func(scheduler *Scheduler, obj interface{}) {
crb := obj.(*workv1alpha2.ClusterResourceBinding)
scheduler.onClusterResourceBindingRequeue(crb, metrics.PolicyChanged)
}
clusterChanged = func(scheduler *Scheduler, obj interface{}) {
rb := obj.(*workv1alpha2.ResourceBinding)
scheduler.onResourceBindingRequeue(rb, metrics.ClusterChanged)
}
crbClusterChanged = func(scheduler *Scheduler, obj interface{}) {
crb := obj.(*workv1alpha2.ClusterResourceBinding)
scheduler.onClusterResourceBindingRequeue(crb, metrics.ClusterChanged)
}
scheduleAttemptSuccess = func(scheduler *Scheduler, obj interface{}) {
scheduler.handleErr(nil, obj)
}
scheduleAttemptFailure = func(scheduler *Scheduler, obj interface{}) {
scheduler.handleErr(fmt.Errorf("schedule attempt failure"), obj)
}
)
func TestIncomingBindingMetrics(t *testing.T) {
dynamicClient := dynamicfake.NewSimpleDynamicClient(runtime.NewScheme())
karmadaClient := karmadafake.NewSimpleClientset()
kubeClient := fake.NewSimpleClientset()
sche, err := NewScheduler(dynamicClient, karmadaClient, kubeClient)
if err != nil {
t.Errorf("create scheduler error: %s", err)
}
var rbInfos = make([]*workv1alpha2.ResourceBinding, 0, 3)
var crbInfos = make([]*workv1alpha2.ClusterResourceBinding, 0, 3)
for i := 1; i <= 3; i++ {
rb := &workv1alpha2.ResourceBinding{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("test-rb-%d", i),
Namespace: "bar",
Generation: 1,
},
}
rbInfos = append(rbInfos, rb)
}
for i := 1; i <= 3; i++ {
crb := &workv1alpha2.ClusterResourceBinding{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("test-rb-%d", i),
},
}
crbInfos = append(crbInfos, crb)
}
rbTests := []struct {
name string
crbInvolved bool
trigger trigger
want string
}{
{
name: "add resourceBinding",
crbInvolved: false,
trigger: addBinding,
want: `
# HELP karmada_scheduler_queue_incoming_bindings_total Number of bindings added to scheduling queues by event type.
# TYPE karmada_scheduler_queue_incoming_bindings_total counter
karmada_scheduler_queue_incoming_bindings_total{event="BindingAdd"} 3
`,
},
{
name: "update resourceBinding",
crbInvolved: false,
trigger: updateBinding,
want: `
# HELP karmada_scheduler_queue_incoming_bindings_total Number of bindings added to scheduling queues by event type.
# TYPE karmada_scheduler_queue_incoming_bindings_total counter
karmada_scheduler_queue_incoming_bindings_total{event="BindingUpdate"} 3
`,
},
{
name: "policy associated with resourceBinding changed",
crbInvolved: false,
trigger: policyChanged,
want: `
# HELP karmada_scheduler_queue_incoming_bindings_total Number of bindings added to scheduling queues by event type.
# TYPE karmada_scheduler_queue_incoming_bindings_total counter
karmada_scheduler_queue_incoming_bindings_total{event="PolicyChanged"} 3
`,
},
{
name: "policy associated with clusterResourceBinding changed",
crbInvolved: true,
trigger: crbPolicyChanged,
want: `
# HELP karmada_scheduler_queue_incoming_bindings_total Number of bindings added to scheduling queues by event type.
# TYPE karmada_scheduler_queue_incoming_bindings_total counter
karmada_scheduler_queue_incoming_bindings_total{event="PolicyChanged"} 3
`,
},
{
name: "cluster which matches the existing propagationPolicy changed",
crbInvolved: false,
trigger: clusterChanged,
want: `
# HELP karmada_scheduler_queue_incoming_bindings_total Number of bindings added to scheduling queues by event type.
# TYPE karmada_scheduler_queue_incoming_bindings_total counter
karmada_scheduler_queue_incoming_bindings_total{event="ClusterChanged"} 3
`,
},
{
name: "cluster which matches the existing clusterPropagationPolicy changed",
crbInvolved: true,
trigger: crbClusterChanged,
want: `
# HELP karmada_scheduler_queue_incoming_bindings_total Number of bindings added to scheduling queues by event type.
# TYPE karmada_scheduler_queue_incoming_bindings_total counter
karmada_scheduler_queue_incoming_bindings_total{event="ClusterChanged"} 3
`,
},
{
name: "schedule attempt success",
crbInvolved: false,
trigger: scheduleAttemptSuccess,
want: ``,
},
{
name: "schedule attempt failure",
crbInvolved: false,
trigger: scheduleAttemptFailure,
want: `
# HELP karmada_scheduler_queue_incoming_bindings_total Number of bindings added to scheduling queues by event type.
# TYPE karmada_scheduler_queue_incoming_bindings_total counter
karmada_scheduler_queue_incoming_bindings_total{event="ScheduleAttemptFailure"} 3
`,
},
}
for _, test := range rbTests {
t.Run(test.name, func(t *testing.T) {
metrics.SchedulerQueueIncomingBindings.Reset()
if test.crbInvolved {
for _, crbInfo := range crbInfos {
test.trigger(sche, crbInfo)
}
} else {
for _, rbInfo := range rbInfos {
test.trigger(sche, rbInfo)
}
}
metricName := metrics.SchedulerSubsystem + "_" + incomingBindingMetricsName
if err := testutil.CollectAndCompare(metrics.SchedulerQueueIncomingBindings, strings.NewReader(test.want), metricName); err != nil {
t.Errorf("unexpected collecting result:\n%s", err)
}
})
}
}