add metrics test for scheduler
Signed-off-by: Poor12 <shentiecheng@huawei.com>
This commit is contained in:
parent
6490bab0fe
commit
1d282bf179
|
@ -12,6 +12,7 @@ import (
|
|||
|
||||
clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
|
||||
policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
|
||||
workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
|
||||
"github.com/karmada-io/karmada/pkg/scheduler/metrics"
|
||||
"github.com/karmada-io/karmada/pkg/util"
|
||||
"github.com/karmada-io/karmada/pkg/util/fedinformer"
|
||||
|
@ -102,6 +103,28 @@ func (s *Scheduler) onResourceBindingUpdate(old, cur interface{}) {
|
|||
metrics.CountSchedulerBindings(metrics.BindingUpdate)
|
||||
}
|
||||
|
||||
func (s *Scheduler) onResourceBindingRequeue(binding *workv1alpha2.ResourceBinding, event string) {
|
||||
key, err := cache.MetaNamespaceKeyFunc(binding)
|
||||
if err != nil {
|
||||
klog.Errorf("couldn't get key for ResourceBinding(%s/%s): %v", binding.Namespace, binding.Name, err)
|
||||
return
|
||||
}
|
||||
klog.Infof("Requeue ResourceBinding(%s/%s) due to event(%s).", binding.Namespace, binding.Name, event)
|
||||
s.queue.Add(key)
|
||||
metrics.CountSchedulerBindings(event)
|
||||
}
|
||||
|
||||
func (s *Scheduler) onClusterResourceBindingRequeue(clusterResourceBinding *workv1alpha2.ClusterResourceBinding, event string) {
|
||||
key, err := cache.MetaNamespaceKeyFunc(clusterResourceBinding)
|
||||
if err != nil {
|
||||
klog.Errorf("couldn't get key for ClusterResourceBinding(%s): %v", clusterResourceBinding.Name, err)
|
||||
return
|
||||
}
|
||||
klog.Infof("Requeue ClusterResourceBinding(%s) due to event(%s).", clusterResourceBinding.Name, event)
|
||||
s.queue.Add(key)
|
||||
metrics.CountSchedulerBindings(event)
|
||||
}
|
||||
|
||||
func (s *Scheduler) onPropagationPolicyUpdate(old, cur interface{}) {
|
||||
oldPropagationPolicy := old.(*policyv1alpha1.PropagationPolicy)
|
||||
curPropagationPolicy := cur.(*policyv1alpha1.PropagationPolicy)
|
||||
|
@ -115,7 +138,7 @@ func (s *Scheduler) onPropagationPolicyUpdate(old, cur interface{}) {
|
|||
policyv1alpha1.PropagationPolicyNameLabel: oldPropagationPolicy.Name,
|
||||
})
|
||||
|
||||
err := s.requeueResourceBindings(selector)
|
||||
err := s.requeueResourceBindings(selector, metrics.PolicyChanged)
|
||||
if err != nil {
|
||||
klog.Errorf("Failed to requeue ResourceBinding, error: %v", err)
|
||||
return
|
||||
|
@ -123,7 +146,7 @@ func (s *Scheduler) onPropagationPolicyUpdate(old, cur interface{}) {
|
|||
}
|
||||
|
||||
// requeueClusterResourceBindings will retrieve all ClusterResourceBinding objects by the label selector and put them to queue.
|
||||
func (s *Scheduler) requeueClusterResourceBindings(selector labels.Selector) error {
|
||||
func (s *Scheduler) requeueClusterResourceBindings(selector labels.Selector, event string) error {
|
||||
referenceClusterResourceBindings, err := s.clusterBindingLister.List(selector)
|
||||
if err != nil {
|
||||
klog.Errorf("Failed to list ClusterResourceBinding by selector: %s, error: %v", selector.String(), err)
|
||||
|
@ -131,20 +154,13 @@ func (s *Scheduler) requeueClusterResourceBindings(selector labels.Selector) err
|
|||
}
|
||||
|
||||
for _, clusterResourceBinding := range referenceClusterResourceBindings {
|
||||
key, err := cache.MetaNamespaceKeyFunc(clusterResourceBinding)
|
||||
if err != nil {
|
||||
klog.Errorf("couldn't get key for ClusterResourceBinding(%s): %v", clusterResourceBinding.Name, err)
|
||||
continue
|
||||
}
|
||||
klog.Infof("Requeue ClusterResourceBinding(%s) as placement changed.", clusterResourceBinding.Name)
|
||||
s.queue.Add(key)
|
||||
metrics.CountSchedulerBindings(metrics.PolicyChanged)
|
||||
s.onClusterResourceBindingRequeue(clusterResourceBinding, event)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// requeueResourceBindings will retrieve all ResourceBinding objects by the label selector and put them to queue.
|
||||
func (s *Scheduler) requeueResourceBindings(selector labels.Selector) error {
|
||||
func (s *Scheduler) requeueResourceBindings(selector labels.Selector, event string) error {
|
||||
referenceBindings, err := s.bindingLister.List(selector)
|
||||
if err != nil {
|
||||
klog.Errorf("Failed to list ResourceBinding by selector: %s, error: %v", selector.String(), err)
|
||||
|
@ -152,14 +168,7 @@ func (s *Scheduler) requeueResourceBindings(selector labels.Selector) error {
|
|||
}
|
||||
|
||||
for _, binding := range referenceBindings {
|
||||
key, err := cache.MetaNamespaceKeyFunc(binding)
|
||||
if err != nil {
|
||||
klog.Errorf("couldn't get key for ResourceBinding(%s/%s): %v", binding.Namespace, binding.Name, err)
|
||||
continue
|
||||
}
|
||||
klog.Infof("Requeue ResourceBinding(%s/%s) as placement changed.", binding.Namespace, binding.Name)
|
||||
s.queue.Add(key)
|
||||
metrics.CountSchedulerBindings(metrics.PolicyChanged)
|
||||
s.onResourceBindingRequeue(binding, event)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -176,12 +185,12 @@ func (s *Scheduler) onClusterPropagationPolicyUpdate(old, cur interface{}) {
|
|||
policyv1alpha1.ClusterPropagationPolicyLabel: oldClusterPropagationPolicy.Name,
|
||||
})
|
||||
|
||||
err := s.requeueClusterResourceBindings(selector)
|
||||
err := s.requeueClusterResourceBindings(selector, metrics.PolicyChanged)
|
||||
if err != nil {
|
||||
klog.Errorf("Failed to requeue ClusterResourceBinding, error: %v", err)
|
||||
}
|
||||
|
||||
err = s.requeueResourceBindings(selector)
|
||||
err = s.requeueResourceBindings(selector, metrics.PolicyChanged)
|
||||
if err != nil {
|
||||
klog.Errorf("Failed to requeue ResourceBinding, error: %v", err)
|
||||
}
|
||||
|
@ -240,7 +249,7 @@ func (s *Scheduler) enqueueAffectedPolicy(newCluster *clusterv1alpha1.Cluster) {
|
|||
fallthrough
|
||||
case util.ClusterMatches(newCluster, *affinity):
|
||||
// If specific cluster matches the affinity. add it in queue
|
||||
err := s.requeueResourceBindings(selector)
|
||||
err := s.requeueResourceBindings(selector, metrics.ClusterChanged)
|
||||
if err != nil {
|
||||
klog.Errorf("Failed to requeue ResourceBinding, error: %v", err)
|
||||
}
|
||||
|
@ -262,11 +271,11 @@ func (s *Scheduler) enqueueAffectedClusterPolicy(newCluster *clusterv1alpha1.Clu
|
|||
fallthrough
|
||||
case util.ClusterMatches(newCluster, *affinity):
|
||||
// If specific cluster matches the affinity. add it in queue
|
||||
err := s.requeueClusterResourceBindings(selector)
|
||||
err := s.requeueClusterResourceBindings(selector, metrics.ClusterChanged)
|
||||
if err != nil {
|
||||
klog.Errorf("Failed to requeue ClusterResourceBinding, error: %v", err)
|
||||
}
|
||||
err = s.requeueResourceBindings(selector)
|
||||
err = s.requeueResourceBindings(selector, metrics.ClusterChanged)
|
||||
if err != nil {
|
||||
klog.Errorf("Failed to requeue ResourceBinding, error: %v", err)
|
||||
}
|
||||
|
|
|
@ -26,6 +26,8 @@ const (
|
|||
ScheduleAttemptFailure = "ScheduleAttemptFailure"
|
||||
// PolicyChanged means binding needs to be rescheduled for the policy changed
|
||||
PolicyChanged = "PolicyChanged"
|
||||
// ClusterChanged means binding needs to be rescheduled for the cluster changed
|
||||
ClusterChanged = "ClusterChanged"
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -63,7 +65,8 @@ var (
|
|||
Buckets: prometheus.ExponentialBuckets(0.001, 2, 15),
|
||||
}, []string{"schedule_step"})
|
||||
|
||||
schedulerQueueIncomingBindings = promauto.NewCounterVec(
|
||||
// SchedulerQueueIncomingBindings is the number of bindings added to scheduling queues by event type.
|
||||
SchedulerQueueIncomingBindings = promauto.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Subsystem: SchedulerSubsystem,
|
||||
Name: "queue_incoming_bindings_total",
|
||||
|
@ -93,5 +96,5 @@ func ScheduleStep(action string, startTime time.Time) {
|
|||
|
||||
// CountSchedulerBindings records the number of binding added to scheduling queues by event type.
|
||||
func CountSchedulerBindings(event string) {
|
||||
schedulerQueueIncomingBindings.WithLabelValues(event).Inc()
|
||||
SchedulerQueueIncomingBindings.WithLabelValues(event).Inc()
|
||||
}
|
||||
|
|
|
@ -0,0 +1,188 @@
|
|||
package scheduler
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
dynamicfake "k8s.io/client-go/dynamic/fake"
|
||||
"k8s.io/client-go/kubernetes/fake"
|
||||
"k8s.io/component-base/metrics/testutil"
|
||||
|
||||
workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
|
||||
karmadafake "github.com/karmada-io/karmada/pkg/generated/clientset/versioned/fake"
|
||||
"github.com/karmada-io/karmada/pkg/scheduler/metrics"
|
||||
)
|
||||
|
||||
const incomingBindingMetricsName = "queue_incoming_bindings_total"
|
||||
|
||||
type trigger func(scheduler *Scheduler, obj interface{})
|
||||
|
||||
var (
|
||||
addBinding = func(scheduler *Scheduler, obj interface{}) {
|
||||
scheduler.onResourceBindingAdd(obj)
|
||||
}
|
||||
updateBinding = func(scheduler *Scheduler, obj interface{}) {
|
||||
scheduler.onResourceBindingUpdate(nil, obj)
|
||||
}
|
||||
policyChanged = func(scheduler *Scheduler, obj interface{}) {
|
||||
rb := obj.(*workv1alpha2.ResourceBinding)
|
||||
scheduler.onResourceBindingRequeue(rb, metrics.PolicyChanged)
|
||||
}
|
||||
crbPolicyChanged = func(scheduler *Scheduler, obj interface{}) {
|
||||
crb := obj.(*workv1alpha2.ClusterResourceBinding)
|
||||
scheduler.onClusterResourceBindingRequeue(crb, metrics.PolicyChanged)
|
||||
}
|
||||
clusterChanged = func(scheduler *Scheduler, obj interface{}) {
|
||||
rb := obj.(*workv1alpha2.ResourceBinding)
|
||||
scheduler.onResourceBindingRequeue(rb, metrics.ClusterChanged)
|
||||
}
|
||||
crbClusterChanged = func(scheduler *Scheduler, obj interface{}) {
|
||||
crb := obj.(*workv1alpha2.ClusterResourceBinding)
|
||||
scheduler.onClusterResourceBindingRequeue(crb, metrics.ClusterChanged)
|
||||
}
|
||||
scheduleAttemptSuccess = func(scheduler *Scheduler, obj interface{}) {
|
||||
scheduler.handleErr(nil, obj)
|
||||
}
|
||||
scheduleAttemptFailure = func(scheduler *Scheduler, obj interface{}) {
|
||||
scheduler.handleErr(fmt.Errorf("schedule attempt failure"), obj)
|
||||
}
|
||||
)
|
||||
|
||||
func TestIncomingBindingMetrics(t *testing.T) {
|
||||
dynamicClient := dynamicfake.NewSimpleDynamicClient(runtime.NewScheme())
|
||||
karmadaClient := karmadafake.NewSimpleClientset()
|
||||
kubeClient := fake.NewSimpleClientset()
|
||||
|
||||
sche, err := NewScheduler(dynamicClient, karmadaClient, kubeClient)
|
||||
if err != nil {
|
||||
t.Errorf("create scheduler error: %s", err)
|
||||
}
|
||||
|
||||
var rbInfos = make([]*workv1alpha2.ResourceBinding, 0, 3)
|
||||
var crbInfos = make([]*workv1alpha2.ClusterResourceBinding, 0, 3)
|
||||
for i := 1; i <= 3; i++ {
|
||||
rb := &workv1alpha2.ResourceBinding{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: fmt.Sprintf("test-rb-%d", i),
|
||||
Namespace: "bar",
|
||||
Generation: 1,
|
||||
},
|
||||
}
|
||||
rbInfos = append(rbInfos, rb)
|
||||
}
|
||||
|
||||
for i := 1; i <= 3; i++ {
|
||||
crb := &workv1alpha2.ClusterResourceBinding{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: fmt.Sprintf("test-rb-%d", i),
|
||||
},
|
||||
}
|
||||
crbInfos = append(crbInfos, crb)
|
||||
}
|
||||
|
||||
rbTests := []struct {
|
||||
name string
|
||||
crbInvolved bool
|
||||
trigger trigger
|
||||
want string
|
||||
}{
|
||||
{
|
||||
name: "add resourceBinding",
|
||||
crbInvolved: false,
|
||||
trigger: addBinding,
|
||||
want: `
|
||||
# HELP karmada_scheduler_queue_incoming_bindings_total Number of bindings added to scheduling queues by event type.
|
||||
# TYPE karmada_scheduler_queue_incoming_bindings_total counter
|
||||
karmada_scheduler_queue_incoming_bindings_total{event="BindingAdd"} 3
|
||||
`,
|
||||
},
|
||||
{
|
||||
name: "update resourceBinding",
|
||||
crbInvolved: false,
|
||||
trigger: updateBinding,
|
||||
want: `
|
||||
# HELP karmada_scheduler_queue_incoming_bindings_total Number of bindings added to scheduling queues by event type.
|
||||
# TYPE karmada_scheduler_queue_incoming_bindings_total counter
|
||||
karmada_scheduler_queue_incoming_bindings_total{event="BindingUpdate"} 3
|
||||
`,
|
||||
},
|
||||
{
|
||||
name: "policy associated with resourceBinding changed",
|
||||
crbInvolved: false,
|
||||
trigger: policyChanged,
|
||||
want: `
|
||||
# HELP karmada_scheduler_queue_incoming_bindings_total Number of bindings added to scheduling queues by event type.
|
||||
# TYPE karmada_scheduler_queue_incoming_bindings_total counter
|
||||
karmada_scheduler_queue_incoming_bindings_total{event="PolicyChanged"} 3
|
||||
`,
|
||||
},
|
||||
{
|
||||
name: "policy associated with clusterResourceBinding changed",
|
||||
crbInvolved: true,
|
||||
trigger: crbPolicyChanged,
|
||||
want: `
|
||||
# HELP karmada_scheduler_queue_incoming_bindings_total Number of bindings added to scheduling queues by event type.
|
||||
# TYPE karmada_scheduler_queue_incoming_bindings_total counter
|
||||
karmada_scheduler_queue_incoming_bindings_total{event="PolicyChanged"} 3
|
||||
`,
|
||||
},
|
||||
{
|
||||
name: "cluster which matches the existing propagationPolicy changed",
|
||||
crbInvolved: false,
|
||||
trigger: clusterChanged,
|
||||
want: `
|
||||
# HELP karmada_scheduler_queue_incoming_bindings_total Number of bindings added to scheduling queues by event type.
|
||||
# TYPE karmada_scheduler_queue_incoming_bindings_total counter
|
||||
karmada_scheduler_queue_incoming_bindings_total{event="ClusterChanged"} 3
|
||||
`,
|
||||
},
|
||||
{
|
||||
name: "cluster which matches the existing clusterPropagationPolicy changed",
|
||||
crbInvolved: true,
|
||||
trigger: crbClusterChanged,
|
||||
want: `
|
||||
# HELP karmada_scheduler_queue_incoming_bindings_total Number of bindings added to scheduling queues by event type.
|
||||
# TYPE karmada_scheduler_queue_incoming_bindings_total counter
|
||||
karmada_scheduler_queue_incoming_bindings_total{event="ClusterChanged"} 3
|
||||
`,
|
||||
},
|
||||
{
|
||||
name: "schedule attempt success",
|
||||
crbInvolved: false,
|
||||
trigger: scheduleAttemptSuccess,
|
||||
want: ``,
|
||||
},
|
||||
{
|
||||
name: "schedule attempt failure",
|
||||
crbInvolved: false,
|
||||
trigger: scheduleAttemptFailure,
|
||||
want: `
|
||||
# HELP karmada_scheduler_queue_incoming_bindings_total Number of bindings added to scheduling queues by event type.
|
||||
# TYPE karmada_scheduler_queue_incoming_bindings_total counter
|
||||
karmada_scheduler_queue_incoming_bindings_total{event="ScheduleAttemptFailure"} 3
|
||||
`,
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range rbTests {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
metrics.SchedulerQueueIncomingBindings.Reset()
|
||||
if test.crbInvolved {
|
||||
for _, crbInfo := range crbInfos {
|
||||
test.trigger(sche, crbInfo)
|
||||
}
|
||||
} else {
|
||||
for _, rbInfo := range rbInfos {
|
||||
test.trigger(sche, rbInfo)
|
||||
}
|
||||
}
|
||||
metricName := metrics.SchedulerSubsystem + "_" + incomingBindingMetricsName
|
||||
if err := testutil.CollectAndCompare(metrics.SchedulerQueueIncomingBindings, strings.NewReader(test.want), metricName); err != nil {
|
||||
t.Errorf("unexpected collecting result:\n%s", err)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue