From 226b48bd52cfc42986ea54e23fed5f8254a603d5 Mon Sep 17 00:00:00 2001 From: junqian Date: Wed, 22 Sep 2021 10:28:02 +0800 Subject: [PATCH] add metrics for scheduler Signed-off-by: junqian --- cmd/scheduler/app/scheduler.go | 7 ++- go.mod | 1 + pkg/scheduler/core/generic_scheduler.go | 9 +++ pkg/scheduler/metrics/metrics.go | 73 +++++++++++++++++++++++++ pkg/scheduler/metrics/metrics_record.go | 57 +++++++++++++++++++ pkg/scheduler/scheduler.go | 24 +++++++- vendor/modules.txt | 1 + 7 files changed, 169 insertions(+), 3 deletions(-) create mode 100644 pkg/scheduler/metrics/metrics.go create mode 100644 pkg/scheduler/metrics/metrics_record.go diff --git a/cmd/scheduler/app/scheduler.go b/cmd/scheduler/app/scheduler.go index 76b3d62c2..f763f715d 100644 --- a/cmd/scheduler/app/scheduler.go +++ b/cmd/scheduler/app/scheduler.go @@ -8,6 +8,7 @@ import ( "os" "github.com/google/uuid" + "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/spf13/cobra" "k8s.io/client-go/dynamic" "k8s.io/client-go/kubernetes" @@ -47,7 +48,7 @@ func NewSchedulerCommand(stopChan <-chan struct{}) *cobra.Command { func run(opts *options.Options, stopChan <-chan struct{}) error { klog.Infof("karmada-scheduler version: %s", version.Get()) - go serveHealthz(fmt.Sprintf("%s:%d", opts.BindAddress, opts.SecurePort)) + go serveHealthzAndMetrics(fmt.Sprintf("%s:%d", opts.BindAddress, opts.SecurePort)) restConfig, err := clientcmd.BuildConfigFromFlags(opts.Master, opts.KubeConfig) if err != nil { @@ -111,11 +112,13 @@ func run(opts *options.Options, stopChan <-chan struct{}) error { return nil } -func serveHealthz(address string) { +func serveHealthzAndMetrics(address string) { http.HandleFunc("/healthz", func(w http.ResponseWriter, _ *http.Request) { w.WriteHeader(http.StatusOK) _, _ = w.Write([]byte("ok")) }) + http.Handle("/metrics", promhttp.Handler()) + klog.Fatal(http.ListenAndServe(address, nil)) } diff --git a/go.mod b/go.mod index 32a239347..fd6fbf512 100644 --- a/go.mod +++ 
b/go.mod @@ -10,6 +10,7 @@ require ( github.com/kr/pretty v0.3.0 github.com/onsi/ginkgo v1.16.4 github.com/onsi/gomega v1.14.0 + github.com/prometheus/client_golang v1.11.0 github.com/spf13/cobra v1.1.3 github.com/spf13/pflag v1.0.5 golang.org/x/tools v0.1.2 diff --git a/pkg/scheduler/core/generic_scheduler.go b/pkg/scheduler/core/generic_scheduler.go index 05c10a975..3d9b43e03 100644 --- a/pkg/scheduler/core/generic_scheduler.go +++ b/pkg/scheduler/core/generic_scheduler.go @@ -5,6 +5,7 @@ import ( "fmt" "math" "sort" + "time" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/klog/v2" @@ -17,6 +18,7 @@ import ( "github.com/karmada-io/karmada/pkg/scheduler/cache" "github.com/karmada-io/karmada/pkg/scheduler/framework" "github.com/karmada-io/karmada/pkg/scheduler/framework/runtime" + "github.com/karmada-io/karmada/pkg/scheduler/metrics" "github.com/karmada-io/karmada/pkg/util" "github.com/karmada-io/karmada/pkg/util/helper" ) @@ -92,6 +94,8 @@ func (g *genericScheduler) findClustersThatFit( placement *policyv1alpha1.Placement, resource *workv1alpha2.ObjectReference, clusterInfo *cache.Snapshot) ([]*clusterv1alpha1.Cluster, error) { + defer metrics.ScheduleStep(metrics.ScheduleStepFilter, time.Now()) + var out []*clusterv1alpha1.Cluster clusters := clusterInfo.GetReadyClusters() for _, c := range clusters { @@ -113,6 +117,8 @@ func (g *genericScheduler) prioritizeClusters( fwk framework.Framework, placement *policyv1alpha1.Placement, clusters []*clusterv1alpha1.Cluster) (result framework.ClusterScoreList, err error) { + defer metrics.ScheduleStep(metrics.ScheduleStepScore, time.Now()) + scoresMap, err := fwk.RunScorePlugins(ctx, placement, clusters) if err != nil { return result, err @@ -130,6 +136,8 @@ func (g *genericScheduler) prioritizeClusters( } func (g *genericScheduler) selectClusters(clustersScore framework.ClusterScoreList, spreadConstraints []policyv1alpha1.SpreadConstraint, clusters []*clusterv1alpha1.Cluster) []*clusterv1alpha1.Cluster { + defer 
metrics.ScheduleStep(metrics.ScheduleStepSelect, time.Now()) + if len(spreadConstraints) != 0 { return g.matchSpreadConstraints(clusters, spreadConstraints) } @@ -200,6 +208,7 @@ func (g *genericScheduler) chooseSpreadGroup(spreadGroup *util.SpreadGroup) []*c } func (g *genericScheduler) assignReplicas(clusters []*clusterv1alpha1.Cluster, replicaSchedulingStrategy *policyv1alpha1.ReplicaSchedulingStrategy, object *workv1alpha2.ResourceBindingSpec) ([]workv1alpha2.TargetCluster, error) { + defer metrics.ScheduleStep(metrics.ScheduleStepAssignReplicas, time.Now()) if len(clusters) == 0 { return nil, fmt.Errorf("no clusters available to schedule") } diff --git a/pkg/scheduler/metrics/metrics.go b/pkg/scheduler/metrics/metrics.go new file mode 100644 index 000000000..a0a2cfd91 --- /dev/null +++ b/pkg/scheduler/metrics/metrics.go @@ -0,0 +1,73 @@ +package metrics + +import ( + "sync" + "time" + + "github.com/prometheus/client_golang/prometheus" +) + +// SchedulerSubsystem - subsystem name used by scheduler +const SchedulerSubsystem = "karmada_scheduler" + +var ( + scheduleAttempts = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Subsystem: SchedulerSubsystem, + Name: "schedule_attempts_total", + Help: "Number of attempts to schedule resourceBinding", + }, []string{"result", "scheduleType"}) + + e2eSchedulingLatency = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Subsystem: SchedulerSubsystem, + Name: "e2e_scheduling_duration_seconds", + Help: "E2e scheduling latency in seconds", + Buckets: prometheus.ExponentialBuckets(0.001, 2, 15), + }, []string{"result", "scheduleType"}) + + schedulingAlgorithmLatency = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Subsystem: SchedulerSubsystem, + Name: "scheduling_algorithm_duration_seconds", + Help: "Scheduling algorithm latency in seconds(exclude scale scheduler)", + Buckets: prometheus.ExponentialBuckets(0.001, 2, 15), + }, []string{"scheduleStep"}) + + schedulerQueueIncomingBindings = 
prometheus.NewCounterVec( + prometheus.CounterOpts{ + Subsystem: SchedulerSubsystem, + Name: "queue_incoming_bindings_total", + Help: "Number of bindings added to scheduling queues by event type.", + }, []string{"event"}) + + metricsList = []prometheus.Collector{ + scheduleAttempts, + e2eSchedulingLatency, + schedulingAlgorithmLatency, + schedulerQueueIncomingBindings, + } +) + +var registerMetrics sync.Once + +// Register all metrics. +func Register() { + // Register the metrics. + registerMetrics.Do(func() { + RegisterMetrics(metricsList...) + }) +} + +// RegisterMetrics registers a list of metrics. +// This function is exported because it is intended to be used by out-of-tree plugins to register their custom metrics. +func RegisterMetrics(extraMetrics ...prometheus.Collector) { + for _, metric := range extraMetrics { + prometheus.MustRegister(metric) + } +} + +// SinceInSeconds gets the time since the specified start in seconds. +func SinceInSeconds(start time.Time) float64 { + return time.Since(start).Seconds() +} diff --git a/pkg/scheduler/metrics/metrics_record.go b/pkg/scheduler/metrics/metrics_record.go new file mode 100644 index 000000000..3f18fa0cf --- /dev/null +++ b/pkg/scheduler/metrics/metrics_record.go @@ -0,0 +1,57 @@ +package metrics + +import "time" + +const ( + scheduledResult = "scheduled" + errorResult = "error" +) + +const ( + // BindingAdd is the event when a new binding is added to API server. + BindingAdd = "BindingAdd" + // BindingUpdate is the event when an existing binding is updated in API server. + BindingUpdate = "BindingUpdate" + // ScheduleAttemptFailure is the event when a schedule attempt fails. 
+ ScheduleAttemptFailure = "ScheduleAttemptFailure" + // PolicyChanged means the binding needs to be rescheduled because its policy changed + PolicyChanged = "PolicyChanged" + // ClusterNotReady means the binding needs to be rescheduled because a cluster is not ready + ClusterNotReady = "ClusterNotReady" +) + +const ( + // ScheduleStepFilter means the step in generic scheduler to filter clusters + ScheduleStepFilter = "Filter" + // ScheduleStepScore means the step in generic scheduler to score clusters + ScheduleStepScore = "Score" + // ScheduleStepSelect means the step in generic scheduler to select clusters + ScheduleStepSelect = "Select" + // ScheduleStepAssignReplicas means the step in generic scheduler to assign replicas + ScheduleStepAssignReplicas = "AssignReplicas" +) + +// BindingSchedule records a scheduling attempt of the given type together +// with its duration in seconds. +func BindingSchedule(scheduleType string, duration float64, err error) { + if err != nil { + observeScheduleAttemptAndLatency(errorResult, scheduleType, duration) + } else { + observeScheduleAttemptAndLatency(scheduledResult, scheduleType, duration) + } +} + +func observeScheduleAttemptAndLatency(result, scheduleType string, duration float64) { + e2eSchedulingLatency.WithLabelValues(result, scheduleType).Observe(duration) + scheduleAttempts.WithLabelValues(result, scheduleType).Inc() +} + +// ScheduleStep records the time elapsed since startTime for one scheduling step. +func ScheduleStep(action string, startTime time.Time) { + schedulingAlgorithmLatency.WithLabelValues(action).Observe(SinceInSeconds(startTime)) +} + +// CountSchedulerBindings records the number of bindings added to scheduling queues by event type. 
+func CountSchedulerBindings(event string) { + schedulerQueueIncomingBindings.WithLabelValues(event).Inc() +} diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 74be67e8f..d6d1d3d6d 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -34,6 +34,7 @@ import ( "github.com/karmada-io/karmada/pkg/scheduler/framework/plugins/apiinstalled" "github.com/karmada-io/karmada/pkg/scheduler/framework/plugins/clusteraffinity" "github.com/karmada-io/karmada/pkg/scheduler/framework/plugins/tainttoleration" + "github.com/karmada-io/karmada/pkg/scheduler/metrics" "github.com/karmada-io/karmada/pkg/util" ) @@ -135,6 +136,8 @@ func NewScheduler(dynamicClient dynamic.Interface, karmadaClient karmadaclientse estimatorclient.RegisterSchedulerEstimator(schedulerEstimator) } + metrics.Register() + bindingInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: sched.onResourceBindingAdd, UpdateFunc: sched.onResourceBindingUpdate, @@ -195,10 +198,18 @@ func (s *Scheduler) onResourceBindingAdd(obj interface{}) { } s.queue.Add(key) + metrics.CountSchedulerBindings(metrics.BindingAdd) } func (s *Scheduler) onResourceBindingUpdate(old, cur interface{}) { - s.onResourceBindingAdd(cur) + key, err := cache.MetaNamespaceKeyFunc(cur) + if err != nil { + klog.Errorf("couldn't get key for object %#v: %v", cur, err) + return + } + + s.queue.Add(key) + metrics.CountSchedulerBindings(metrics.BindingUpdate) } func (s *Scheduler) onPropagationPolicyUpdate(old, cur interface{}) { @@ -265,6 +276,7 @@ func (s *Scheduler) requeueResourceBindings(selector labels.Selector) error { } klog.Infof("Requeue ResourceBinding(%s/%s) as placement changed.", binding.Namespace, binding.Name) s.queue.Add(key) + metrics.CountSchedulerBindings(metrics.PolicyChanged) } return nil } @@ -285,6 +297,7 @@ func (s *Scheduler) requeueClusterResourceBindings(selector labels.Selector) err } klog.Infof("Requeue ClusterResourceBinding(%s) as placement changed.", 
clusterResourceBinding.Name) s.queue.Add(key) + metrics.CountSchedulerBindings(metrics.PolicyChanged) } return nil } @@ -373,21 +386,27 @@ func (s *Scheduler) scheduleNext() bool { } defer s.queue.Done(key) + start := time.Now() + var err error switch s.getScheduleType(key.(string)) { case FirstSchedule: err = s.scheduleOne(key.(string)) klog.Infof("Start scheduling binding(%s)", key.(string)) + metrics.BindingSchedule(string(FirstSchedule), metrics.SinceInSeconds(start), err) case ReconcileSchedule: // share same logic with first schedule err = s.scheduleOne(key.(string)) klog.Infof("Reschedule binding(%s) as placement changed", key.(string)) + metrics.BindingSchedule(string(ReconcileSchedule), metrics.SinceInSeconds(start), err) case ScaleSchedule: err = s.scaleScheduleOne(key.(string)) klog.Infof("Reschedule binding(%s) as replicas scaled down or scaled up", key.(string)) + metrics.BindingSchedule(string(ScaleSchedule), metrics.SinceInSeconds(start), err) case FailoverSchedule: if Failover { err = s.rescheduleOne(key.(string)) klog.Infof("Reschedule binding(%s) as cluster failure", key.(string)) + metrics.BindingSchedule(string(FailoverSchedule), metrics.SinceInSeconds(start), err) } case AvoidSchedule: klog.Infof("Don't need to schedule binding(%s)", key.(string)) @@ -496,6 +515,7 @@ func (s *Scheduler) handleErr(err error, key interface{}) { } s.queue.AddRateLimited(key) + metrics.CountSchedulerBindings(metrics.ScheduleAttemptFailure) } func (s *Scheduler) addCluster(obj interface{}) { @@ -572,6 +592,7 @@ func (s *Scheduler) enqueueAffectedBinding(notReadyClusterName string) { return } s.queue.Add(rescheduleKey) + metrics.CountSchedulerBindings(metrics.ClusterNotReady) klog.Infof("Add expired ResourceBinding in queue successfully") } } @@ -592,6 +613,7 @@ func (s *Scheduler) enqueueAffectedClusterBinding(notReadyClusterName string) { return } s.queue.Add(rescheduleKey) + metrics.CountSchedulerBindings(metrics.ClusterNotReady) klog.Infof("Add expired 
ClusterResourceBinding in queue successfully") } } diff --git a/vendor/modules.txt b/vendor/modules.txt index 5c80feb8d..5375043ba 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -190,6 +190,7 @@ github.com/pelletier/go-toml # github.com/pkg/errors v0.9.1 github.com/pkg/errors # github.com/prometheus/client_golang v1.11.0 +## explicit github.com/prometheus/client_golang/prometheus github.com/prometheus/client_golang/prometheus/collectors github.com/prometheus/client_golang/prometheus/internal