From 5836639f69e16f9b9c5bf9970059df3be0084a5b Mon Sep 17 00:00:00 2001 From: Poor12 Date: Wed, 29 Jun 2022 14:51:56 +0800 Subject: [PATCH] add metrics_recorder for scheduler framework Signed-off-by: Poor12 --- pkg/scheduler/core/generic_scheduler.go | 18 ++-- pkg/scheduler/framework/interface.go | 26 +++++- pkg/scheduler/framework/runtime/framework.go | 88 ++++++++++++++++--- .../framework/runtime/metrics_recorder.go | 87 ++++++++++++++++++ pkg/scheduler/metrics/metrics.go | 23 +++++ 5 files changed, 220 insertions(+), 22 deletions(-) create mode 100644 pkg/scheduler/framework/runtime/metrics_recorder.go diff --git a/pkg/scheduler/core/generic_scheduler.go b/pkg/scheduler/core/generic_scheduler.go index 6462806d9..9379ff100 100644 --- a/pkg/scheduler/core/generic_scheduler.go +++ b/pkg/scheduler/core/generic_scheduler.go @@ -103,7 +103,8 @@ func (g *genericScheduler) findClustersThatFit( bindingSpec *workv1alpha2.ResourceBindingSpec, clusterInfo *cache.Snapshot, ) ([]*clusterv1alpha1.Cluster, framework.Diagnosis, error) { - defer metrics.ScheduleStep(metrics.ScheduleStepFilter, time.Now()) + startTime := time.Now() + defer metrics.ScheduleStep(metrics.ScheduleStepFilter, startTime) diagnosis := framework.Diagnosis{ ClusterToResultMap: make(framework.ClusterToResultMap), @@ -131,11 +132,12 @@ func (g *genericScheduler) prioritizeClusters( placement *policyv1alpha1.Placement, spec *workv1alpha2.ResourceBindingSpec, clusters []*clusterv1alpha1.Cluster) (result framework.ClusterScoreList, err error) { - defer metrics.ScheduleStep(metrics.ScheduleStepScore, time.Now()) + startTime := time.Now() + defer metrics.ScheduleStep(metrics.ScheduleStepScore, startTime) - scoresMap, err := fwk.RunScorePlugins(ctx, placement, spec, clusters) - if err != nil { - return result, err + scoresMap, runScorePluginsResult := fwk.RunScorePlugins(ctx, placement, spec, clusters) + if runScorePluginsResult != nil { + return result, runScorePluginsResult.AsError() } if klog.V(4).Enabled() { @@ -157,7 +159,8 @@ func (g *genericScheduler) prioritizeClusters( func (g *genericScheduler) selectClusters(clustersScore framework.ClusterScoreList, placement *policyv1alpha1.Placement, spec *workv1alpha2.ResourceBindingSpec) ([]*clusterv1alpha1.Cluster, error) { - defer metrics.ScheduleStep(metrics.ScheduleStepSelect, time.Now()) + startTime := time.Now() + defer metrics.ScheduleStep(metrics.ScheduleStepSelect, startTime) groupClustersInfo := spreadconstraint.GroupClustersWithScore(clustersScore, placement, spec, calAvailableReplicas) return spreadconstraint.SelectBestClusters(placement, groupClustersInfo, spec.Replicas) @@ -168,7 +171,8 @@ func (g *genericScheduler) assignReplicas( replicaSchedulingStrategy *policyv1alpha1.ReplicaSchedulingStrategy, object *workv1alpha2.ResourceBindingSpec, ) ([]workv1alpha2.TargetCluster, error) { - defer metrics.ScheduleStep(metrics.ScheduleStepAssignReplicas, time.Now()) + startTime := time.Now() + defer metrics.ScheduleStep(metrics.ScheduleStepAssignReplicas, startTime) if len(clusters) == 0 { return nil, fmt.Errorf("no clusters available to schedule") } diff --git a/pkg/scheduler/framework/interface.go b/pkg/scheduler/framework/interface.go index be88b1779..f4470fe6a 100644 --- a/pkg/scheduler/framework/interface.go +++ b/pkg/scheduler/framework/interface.go @@ -27,7 +27,7 @@ type Framework interface { RunFilterPlugins(ctx context.Context, placement *policyv1alpha1.Placement, bindingSpec *workv1alpha2.ResourceBindingSpec, clusterv1alpha1 *clusterv1alpha1.Cluster) *Result // RunScorePlugins 
runs the set of configured Score plugins, it returns a map of plugin name to cores - RunScorePlugins(ctx context.Context, placement *policyv1alpha1.Placement, spec *workv1alpha2.ResourceBindingSpec, clusters []*clusterv1alpha1.Cluster) (PluginToClusterScores, error) + RunScorePlugins(ctx context.Context, placement *policyv1alpha1.Placement, spec *workv1alpha2.ResourceBindingSpec, clusters []*clusterv1alpha1.Cluster) (PluginToClusterScores, *Result) } // Plugin is the parent type for all the scheduling framework plugins. @@ -67,6 +67,13 @@ const ( Error ) +// This list should be exactly the same as the codes iota defined above in the same order. +var codes = []string{"Success", "Unschedulable", "Error"} + +func (c Code) String() string { + return codes[c] +} + // NewResult makes a result out of the given arguments and returns its pointer. func NewResult(code Code, reasons ...string) *Result { s := &Result{ @@ -131,6 +138,23 @@ func (s *Result) Reasons() []string { return s.reasons } +// Code returns code of the Result. +func (s *Result) Code() Code { + if s == nil { + return Success + } + return s.code +} + +// AsResult wraps an error in a Result. +func AsResult(err error) *Result { + return &Result{ + code: Error, + reasons: []string{err.Error()}, + err: err, + } +} + // ScorePlugin is an interface that must be implemented by "Score" plugins to rank // clusters that passed the filtering phase. type ScorePlugin interface { diff --git a/pkg/scheduler/framework/runtime/framework.go b/pkg/scheduler/framework/runtime/framework.go index 6e4459e51..8e48da10e 100644 --- a/pkg/scheduler/framework/runtime/framework.go +++ b/pkg/scheduler/framework/runtime/framework.go @@ -4,11 +4,20 @@ import ( "context" "fmt" "reflect" + "time" clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1" policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1" workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2" "github.com/karmada-io/karmada/pkg/scheduler/framework" + "github.com/karmada-io/karmada/pkg/scheduler/metrics" + utilmetrics "github.com/karmada-io/karmada/pkg/util/metrics" +) + +const ( + filter = "Filter" + score = "Score" + scoreExtensionNormalize = "ScoreExtensionNormalize" ) // frameworkImpl implements the Framework interface and is responsible for initializing and running scheduler @@ -17,13 +26,35 @@ type frameworkImpl struct { scorePluginsWeightMap map[string]int filterPlugins []framework.FilterPlugin scorePlugins []framework.ScorePlugin + + metricsRecorder *metricsRecorder } var _ framework.Framework = &frameworkImpl{} +type frameworkOptions struct { + metricsRecorder *metricsRecorder +} + +// Option for the frameworkImpl. +type Option func(*frameworkOptions) + +func defaultFrameworkOptions() frameworkOptions { + return frameworkOptions{ + metricsRecorder: newMetricsRecorder(1000, time.Second), + } +} + // NewFramework creates a scheduling framework by registry. 
-func NewFramework(r Registry) (framework.Framework, error) { - f := &frameworkImpl{} +func NewFramework(r Registry, opts ...Option) (framework.Framework, error) { + options := defaultFrameworkOptions() + for _, opt := range opts { + opt(&options) + } + + f := &frameworkImpl{ + metricsRecorder: options.metricsRecorder, + } filterPluginsList := reflect.ValueOf(&f.filterPlugins).Elem() scorePluginsList := reflect.ValueOf(&f.scorePlugins).Elem() filterType := filterPluginsList.Type().Elem() @@ -44,42 +75,57 @@ func NewFramework(r Registry) (framework.Framework, error) { // RunFilterPlugins runs the set of configured Filter plugins for resources on the cluster. // If any of the result is not success, the cluster is not suited for the resource. -func (frw *frameworkImpl) RunFilterPlugins(ctx context.Context, placement *policyv1alpha1.Placement, bindingSpec *workv1alpha2.ResourceBindingSpec, cluster *clusterv1alpha1.Cluster) *framework.Result { +func (frw *frameworkImpl) RunFilterPlugins(ctx context.Context, placement *policyv1alpha1.Placement, bindingSpec *workv1alpha2.ResourceBindingSpec, cluster *clusterv1alpha1.Cluster) (result *framework.Result) { + startTime := time.Now() + defer func() { + metrics.FrameworkExtensionPointDuration.WithLabelValues(filter, result.Code().String()).Observe(utilmetrics.DurationInSeconds(startTime)) + }() for _, p := range frw.filterPlugins { - if result := p.Filter(ctx, placement, bindingSpec, cluster); !result.IsSuccess() { + if result := frw.runFilterPlugin(ctx, p, placement, bindingSpec, cluster); !result.IsSuccess() { return result } } return framework.NewResult(framework.Success) } +func (frw *frameworkImpl) runFilterPlugin(ctx context.Context, pl framework.FilterPlugin, placement *policyv1alpha1.Placement, bindingSpec *workv1alpha2.ResourceBindingSpec, cluster *clusterv1alpha1.Cluster) *framework.Result { + startTime := time.Now() + result := pl.Filter(ctx, placement, bindingSpec, cluster) + frw.metricsRecorder.observePluginDurationAsync(filter, pl.Name(), result, utilmetrics.DurationInSeconds(startTime)) + return result +} + // RunScorePlugins runs the set of configured Filter plugins for resources on the cluster. // If any of the result is not success, the cluster is not suited for the resource. 
-func (frw *frameworkImpl) RunScorePlugins(ctx context.Context, placement *policyv1alpha1.Placement, spec *workv1alpha2.ResourceBindingSpec, clusters []*clusterv1alpha1.Cluster) (framework.PluginToClusterScores, error) { - result := make(framework.PluginToClusterScores, len(frw.filterPlugins)) +func (frw *frameworkImpl) RunScorePlugins(ctx context.Context, placement *policyv1alpha1.Placement, spec *workv1alpha2.ResourceBindingSpec, clusters []*clusterv1alpha1.Cluster) (ps framework.PluginToClusterScores, result *framework.Result) { + startTime := time.Now() + defer func() { + metrics.FrameworkExtensionPointDuration.WithLabelValues(score, result.Code().String()).Observe(utilmetrics.DurationInSeconds(startTime)) + }() + pluginToClusterScores := make(framework.PluginToClusterScores, len(frw.filterPlugins)) for _, p := range frw.scorePlugins { var scoreList framework.ClusterScoreList for _, cluster := range clusters { - score, res := p.Score(ctx, placement, spec, cluster) + s, res := frw.runScorePlugin(ctx, p, placement, spec, cluster) if !res.IsSuccess() { - return nil, fmt.Errorf("plugin %q failed with: %w", p.Name(), res.AsError()) + return nil, framework.AsResult(fmt.Errorf("plugin %q failed with: %w", p.Name(), res.AsError())) } scoreList = append(scoreList, framework.ClusterScore{ Cluster: cluster, - Score: score, + Score: s, }) } if p.ScoreExtensions() != nil { - res := p.ScoreExtensions().NormalizeScore(ctx, scoreList) + res := frw.runScoreExtension(ctx, p, scoreList) if !res.IsSuccess() { - return nil, fmt.Errorf("plugin %q normalizeScore failed with: %w", p.Name(), res.AsError()) + return nil, framework.AsResult(fmt.Errorf("plugin %q normalizeScore failed with: %w", p.Name(), res.AsError())) } } weight, ok := frw.scorePluginsWeightMap[p.Name()] if !ok { - result[p.Name()] = scoreList + pluginToClusterScores[p.Name()] = scoreList continue } @@ -87,10 +133,24 @@ func (frw *frameworkImpl) RunScorePlugins(ctx context.Context, placement *policy scoreList[i].Score = scoreList[i].Score * int64(weight) } - result[p.Name()] = scoreList + pluginToClusterScores[p.Name()] = scoreList } - return result, nil + return pluginToClusterScores, nil +} + +func (frw *frameworkImpl) runScorePlugin(ctx context.Context, pl framework.ScorePlugin, placement *policyv1alpha1.Placement, spec *workv1alpha2.ResourceBindingSpec, cluster *clusterv1alpha1.Cluster) (int64, *framework.Result) { + startTime := time.Now() + s, result := pl.Score(ctx, placement, spec, cluster) + frw.metricsRecorder.observePluginDurationAsync(score, pl.Name(), result, utilmetrics.DurationInSeconds(startTime)) + return s, result +} + +func (frw *frameworkImpl) runScoreExtension(ctx context.Context, pl framework.ScorePlugin, scores framework.ClusterScoreList) *framework.Result { + startTime := time.Now() + result := pl.ScoreExtensions().NormalizeScore(ctx, scores) + frw.metricsRecorder.observePluginDurationAsync(scoreExtensionNormalize, pl.Name(), result, utilmetrics.DurationInSeconds(startTime)) + return result } func addPluginToList(plugin framework.Plugin, pluginType reflect.Type, pluginList *reflect.Value) { diff --git a/pkg/scheduler/framework/runtime/metrics_recorder.go b/pkg/scheduler/framework/runtime/metrics_recorder.go new file mode 100644 index 000000000..f604fa9d7 --- /dev/null +++ b/pkg/scheduler/framework/runtime/metrics_recorder.go @@ -0,0 +1,87 @@ +package runtime + +import ( + "time" + + "github.com/prometheus/client_golang/prometheus" + + "github.com/karmada-io/karmada/pkg/scheduler/framework" + 
"github.com/karmada-io/karmada/pkg/scheduler/metrics" +) + +// frameworkMetric is the data structure passed in the buffer channel between the main framework thread +// and the metricsRecorder goroutine. +type frameworkMetric struct { + metric *prometheus.HistogramVec + labelValues []string + value float64 +} + +// metricRecorder records framework metrics in a separate goroutine to avoid overhead in the critical path. +type metricsRecorder struct { + // bufferCh is a channel that serves as a metrics buffer before the metricsRecorder goroutine reports it. + bufferCh chan *frameworkMetric + // if bufferSize is reached, incoming metrics will be discarded. + bufferSize int + // how often the recorder runs to flush the metrics. + interval time.Duration + + // stopCh is used to stop the goroutine which periodically flushes metrics. It's currently only + // used in tests. + stopCh chan struct{} + // isStoppedCh indicates whether the goroutine is stopped. It's used in tests only to make sure + // the metric flushing goroutine is stopped so that tests can collect metrics for verification. + isStoppedCh chan struct{} +} + +func newMetricsRecorder(bufferSize int, interval time.Duration) *metricsRecorder { + recorder := &metricsRecorder{ + bufferCh: make(chan *frameworkMetric, bufferSize), + bufferSize: bufferSize, + interval: interval, + stopCh: make(chan struct{}), + isStoppedCh: make(chan struct{}), + } + go recorder.run() + return recorder +} + +// observePluginDurationAsync observes the plugin_execution_duration_seconds metric. +// The metric will be flushed to Prometheus asynchronously. +func (r *metricsRecorder) observePluginDurationAsync(extensionPoint, pluginName string, result *framework.Result, value float64) { + newMetric := &frameworkMetric{ + metric: metrics.PluginExecutionDuration, + labelValues: []string{pluginName, extensionPoint, result.Code().String()}, + value: value, + } + select { + case r.bufferCh <- newMetric: + default: + } +} + +// run flushes buffered metrics into Prometheus every second. +func (r *metricsRecorder) run() { + for { + select { + case <-r.stopCh: + close(r.isStoppedCh) + return + default: + } + r.flushMetrics() + time.Sleep(r.interval) + } +} + +// flushMetrics tries to clean up the bufferCh by reading at most bufferSize metrics. +func (r *metricsRecorder) flushMetrics() { + for i := 0; i < r.bufferSize; i++ { + select { + case m := <-r.bufferCh: + m.metric.WithLabelValues(m.labelValues...).Observe(m.value) + default: + return + } + } +} diff --git a/pkg/scheduler/metrics/metrics.go b/pkg/scheduler/metrics/metrics.go index ab7687186..8facdd5d7 100644 --- a/pkg/scheduler/metrics/metrics.go +++ b/pkg/scheduler/metrics/metrics.go @@ -72,6 +72,29 @@ var ( Name: "queue_incoming_bindings_total", Help: "Number of bindings added to scheduling queues by event type.", }, []string{"event"}) + + // FrameworkExtensionPointDuration is the metrics which indicates the latency for running all plugins of a specific extension point. + FrameworkExtensionPointDuration = promauto.NewHistogramVec( + prometheus.HistogramOpts{ + Subsystem: SchedulerSubsystem, + Name: "framework_extension_point_duration_seconds", + Help: "Latency for running all plugins of a specific extension point.", + // Start with 0.1ms with the last bucket being [~200ms, Inf) + Buckets: prometheus.ExponentialBuckets(0.0001, 2, 12), + }, + []string{"extension_point", "result"}) + + // PluginExecutionDuration is the metrics which indicates the duration for running a plugin at a specific extension point. 
+ PluginExecutionDuration = promauto.NewHistogramVec( + prometheus.HistogramOpts{ + Subsystem: SchedulerSubsystem, + Name: "plugin_execution_duration_seconds", + Help: "Duration for running a plugin at a specific extension point.", + // Start with 0.01ms with the last bucket being [~22ms, Inf). We use a small factor (1.5) + // so that we have better granularity since plugin latency is very sensitive. + Buckets: prometheus.ExponentialBuckets(0.00001, 1.5, 20), + }, + []string{"plugin", "extension_point", "result"}) ) // BindingSchedule can record a scheduling attempt and the duration
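Two illustrative sketches follow; neither is part of the patch above. First, a rough idea of how the recorder's stop/flush handshake could be exercised from a unit test in the same runtime package. The test name, the buffer size, the interval, and the plugin name are made up for illustration; only newMetricsRecorder, observePluginDurationAsync, stopCh, isStoppedCh, and flushMetrics come from the patch.

package runtime

import (
	"testing"
	"time"

	"github.com/karmada-io/karmada/pkg/scheduler/framework"
)

// TestObservePluginDurationAsync is an illustrative sketch, not included in this
// patch: it buffers one observation, stops the flushing goroutine, waits for it
// to exit, and then drains the buffer synchronously.
func TestObservePluginDurationAsync(t *testing.T) {
	recorder := newMetricsRecorder(100, 10*time.Millisecond)

	// Buffer one fake observation; the value is the plugin duration in seconds.
	recorder.observePluginDurationAsync(score, "example-plugin",
		framework.NewResult(framework.Success), 0.001)

	// Stop the background goroutine, wait until it has exited, then flush
	// whatever is still buffered so PluginExecutionDuration can be inspected.
	close(recorder.stopCh)
	<-recorder.isStoppedCh
	recorder.flushMetrics()
}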
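Second, a sketch of how a non-default recorder could be injected through the new Option type. The withMetricsRecorder helper is an assumption, not something this patch adds (the patch only wires defaultFrameworkOptions into NewFramework); kube-scheduler's framework runtime uses a similar test-only option, and the same shape would work here.

package runtime

// withMetricsRecorder is a hypothetical, test-only Option (not part of this
// patch) that replaces the default recorder built by defaultFrameworkOptions.
func withMetricsRecorder(r *metricsRecorder) Option {
	return func(o *frameworkOptions) {
		o.metricsRecorder = r
	}
}

// Example wiring, assuming a populated Registry value named registry:
//
//	fwk, err := NewFramework(registry, withMetricsRecorder(newMetricsRecorder(1000, time.Second)))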