From 670cc154ebb81f80de8a6829d2dcf062d4a3e309 Mon Sep 17 00:00:00 2001 From: huone1 Date: Fri, 18 Mar 2022 11:05:57 +0800 Subject: [PATCH] extend the score process to support the normalizeScore and scoreWeight Signed-off-by: huone1 --- pkg/scheduler/core/generic_scheduler.go | 7 +++-- pkg/scheduler/framework/interface.go | 19 +++++++++--- .../plugins/apiinstalled/api_installed.go | 6 ---- .../clusteraffinity/cluster_affinity.go | 13 +++++++- .../tainttoleration/taint_toleration.go | 6 ---- pkg/scheduler/framework/runtime/framework.go | 31 +++++++++++++++---- 6 files changed, 56 insertions(+), 26 deletions(-) diff --git a/pkg/scheduler/core/generic_scheduler.go b/pkg/scheduler/core/generic_scheduler.go index 0c731c34a..2dc1f5f4c 100644 --- a/pkg/scheduler/core/generic_scheduler.go +++ b/pkg/scheduler/core/generic_scheduler.go @@ -58,7 +58,7 @@ func (g *genericScheduler) Schedule(ctx context.Context, placement *policyv1alph } klog.V(4).Infof("feasible clusters found: %v", feasibleClusters) - clustersScore, err := g.prioritizeClusters(ctx, g.scheduleFramework, placement, feasibleClusters) + clustersScore, err := g.prioritizeClusters(ctx, g.scheduleFramework, placement, spec, feasibleClusters) if err != nil { return result, fmt.Errorf("failed to prioritizeClusters: %v", err) } @@ -102,17 +102,18 @@ func (g *genericScheduler) prioritizeClusters( ctx context.Context, fwk framework.Framework, placement *policyv1alpha1.Placement, + spec *workv1alpha2.ResourceBindingSpec, clusters []*clusterv1alpha1.Cluster) (result framework.ClusterScoreList, err error) { defer metrics.ScheduleStep(metrics.ScheduleStepScore, time.Now()) - scoresMap, err := fwk.RunScorePlugins(ctx, placement, clusters) + scoresMap, err := fwk.RunScorePlugins(ctx, placement, spec, clusters) if err != nil { return result, err } result = make(framework.ClusterScoreList, len(clusters)) for i := range clusters { - result[i] = framework.ClusterScore{Name: clusters[i].Name, Score: 0} + result[i] = framework.ClusterScore{Cluster: clusters[i], Score: 0} for j := range scoresMap { result[i].Score += scoresMap[j][i].Score } diff --git a/pkg/scheduler/framework/interface.go b/pkg/scheduler/framework/interface.go index 63bf5fbd6..aeb9aec55 100644 --- a/pkg/scheduler/framework/interface.go +++ b/pkg/scheduler/framework/interface.go @@ -19,7 +19,7 @@ type Framework interface { RunFilterPlugins(ctx context.Context, placement *policyv1alpha1.Placement, resource *workv1alpha2.ObjectReference, clusterv1alpha1 *clusterv1alpha1.Cluster) *Result // RunScorePlugins runs the set of configured Score plugins, it returns a map of plugin name to cores - RunScorePlugins(ctx context.Context, placement *policyv1alpha1.Placement, clusters []*clusterv1alpha1.Cluster) (PluginToClusterScores, error) + RunScorePlugins(ctx context.Context, placement *policyv1alpha1.Placement, spec *workv1alpha2.ResourceBindingSpec, clusters []*clusterv1alpha1.Cluster) (PluginToClusterScores, error) } // Plugin is the parent type for all the scheduling framework plugins. @@ -125,13 +125,24 @@ type ScorePlugin interface { // Score is called on each filtered cluster. It must return success and an integer // indicating the rank of the cluster. All scoring plugins must return success or // the resource will be rejected. - Score(ctx context.Context, placement *policyv1alpha1.Placement, clusterv1alpha1 *clusterv1alpha1.Cluster) (float64, *Result) + Score(ctx context.Context, placement *policyv1alpha1.Placement, spec *workv1alpha2.ResourceBindingSpec, cluster *clusterv1alpha1.Cluster) (int64, *Result) + + // ScoreExtensions returns a ScoreExtensions interface + // if it implements one, or nil if does not. + ScoreExtensions() ScoreExtensions +} + +// ScoreExtensions is an interface for Score extended functionality. +type ScoreExtensions interface { + // NormalizeScore is called for all cluster scores produced + // by the same plugin's "Score" + NormalizeScore(ctx context.Context, scores ClusterScoreList) *Result } // ClusterScore represent the cluster score. type ClusterScore struct { - Name string - Score float64 + Cluster *clusterv1alpha1.Cluster + Score int64 } // ClusterScoreList declares a list of clusters and their scores. diff --git a/pkg/scheduler/framework/plugins/apiinstalled/api_installed.go b/pkg/scheduler/framework/plugins/apiinstalled/api_installed.go index 5d79afe83..0ed42d094 100644 --- a/pkg/scheduler/framework/plugins/apiinstalled/api_installed.go +++ b/pkg/scheduler/framework/plugins/apiinstalled/api_installed.go @@ -21,7 +21,6 @@ const ( type APIInstalled struct{} var _ framework.FilterPlugin = &APIInstalled{} -var _ framework.ScorePlugin = &APIInstalled{} // New instantiates the APIInstalled plugin. func New() framework.Plugin { @@ -42,8 +41,3 @@ func (p *APIInstalled) Filter(ctx context.Context, placement *policyv1alpha1.Pla return framework.NewResult(framework.Success) } - -// Score calculates the score on the candidate cluster. -func (p *APIInstalled) Score(ctx context.Context, placement *policyv1alpha1.Placement, cluster *clusterv1alpha1.Cluster) (float64, *framework.Result) { - return 0, framework.NewResult(framework.Success) -} diff --git a/pkg/scheduler/framework/plugins/clusteraffinity/cluster_affinity.go b/pkg/scheduler/framework/plugins/clusteraffinity/cluster_affinity.go index cff483021..af1624b3c 100644 --- a/pkg/scheduler/framework/plugins/clusteraffinity/cluster_affinity.go +++ b/pkg/scheduler/framework/plugins/clusteraffinity/cluster_affinity.go @@ -46,6 +46,17 @@ func (p *ClusterAffinity) Filter(ctx context.Context, placement *policyv1alpha1. } // Score calculates the score on the candidate cluster. -func (p *ClusterAffinity) Score(ctx context.Context, placement *policyv1alpha1.Placement, cluster *clusterv1alpha1.Cluster) (float64, *framework.Result) { +func (p *ClusterAffinity) Score(ctx context.Context, placement *policyv1alpha1.Placement, + spec *workv1alpha2.ResourceBindingSpec, cluster *clusterv1alpha1.Cluster) (int64, *framework.Result) { return 0, framework.NewResult(framework.Success) } + +// ScoreExtensions of the Score plugin. +func (p *ClusterAffinity) ScoreExtensions() framework.ScoreExtensions { + return p +} + +// NormalizeScore normalizes the score for each candidate cluster. +func (p *ClusterAffinity) NormalizeScore(ctx context.Context, scores framework.ClusterScoreList) *framework.Result { + return framework.NewResult(framework.Success) +} diff --git a/pkg/scheduler/framework/plugins/tainttoleration/taint_toleration.go b/pkg/scheduler/framework/plugins/tainttoleration/taint_toleration.go index 8ecb9dfb4..9945032bd 100644 --- a/pkg/scheduler/framework/plugins/tainttoleration/taint_toleration.go +++ b/pkg/scheduler/framework/plugins/tainttoleration/taint_toleration.go @@ -22,7 +22,6 @@ const ( type TaintToleration struct{} var _ framework.FilterPlugin = &TaintToleration{} -var _ framework.ScorePlugin = &TaintToleration{} // New instantiates the TaintToleration plugin. func New() framework.Plugin { @@ -50,8 +49,3 @@ func (p *TaintToleration) Filter(ctx context.Context, placement *policyv1alpha1. return framework.NewResult(framework.Unschedulable, fmt.Sprintf("cluster had taint {%s: %s}, that the propagation policy didn't tolerate", taint.Key, taint.Value)) } - -// Score calculates the score on the candidate cluster. -func (p *TaintToleration) Score(ctx context.Context, placement *policyv1alpha1.Placement, cluster *clusterv1alpha1.Cluster) (float64, *framework.Result) { - return 0, framework.NewResult(framework.Success) -} diff --git a/pkg/scheduler/framework/runtime/framework.go b/pkg/scheduler/framework/runtime/framework.go index 433a73ba8..2bbc210f1 100644 --- a/pkg/scheduler/framework/runtime/framework.go +++ b/pkg/scheduler/framework/runtime/framework.go @@ -17,8 +17,9 @@ import ( // frameworkImpl implements the Framework interface and is responsible for initializing and running scheduler // plugins. type frameworkImpl struct { - filterPlugins []framework.FilterPlugin - scorePlugins []framework.ScorePlugin + scorePluginsWeightMap map[string]int + filterPlugins []framework.FilterPlugin + scorePlugins []framework.ScorePlugin } var _ framework.Framework = &frameworkImpl{} @@ -59,20 +60,38 @@ func (frw *frameworkImpl) RunFilterPlugins(ctx context.Context, placement *polic // RunScorePlugins runs the set of configured Filter plugins for resources on the cluster. // If any of the result is not success, the cluster is not suited for the resource. -func (frw *frameworkImpl) RunScorePlugins(ctx context.Context, placement *policyv1alpha1.Placement, clusters []*clusterv1alpha1.Cluster) (framework.PluginToClusterScores, error) { +func (frw *frameworkImpl) RunScorePlugins(ctx context.Context, placement *policyv1alpha1.Placement, spec *workv1alpha2.ResourceBindingSpec, clusters []*clusterv1alpha1.Cluster) (framework.PluginToClusterScores, error) { result := make(framework.PluginToClusterScores, len(frw.filterPlugins)) for _, p := range frw.scorePlugins { var scoreList framework.ClusterScoreList for _, cluster := range clusters { - score, res := p.Score(ctx, placement, cluster) + score, res := p.Score(ctx, placement, spec, cluster) if !res.IsSuccess() { return nil, fmt.Errorf("plugin %q failed with: %w", p.Name(), res.AsError()) } scoreList = append(scoreList, framework.ClusterScore{ - Name: cluster.Name, - Score: score, + Cluster: cluster, + Score: score, }) } + + if p.ScoreExtensions() != nil { + res := p.ScoreExtensions().NormalizeScore(ctx, scoreList) + if !res.IsSuccess() { + return nil, fmt.Errorf("plugin %q normalizeScore failed with: %w", p.Name(), res.AsError()) + } + } + + weight, ok := frw.scorePluginsWeightMap[p.Name()] + if !ok { + result[p.Name()] = scoreList + continue + } + + for i := range scoreList { + scoreList[i].Score = scoreList[i].Score * int64(weight) + } + result[p.Name()] = scoreList }