Merge pull request #1500 from huone1/extend-score

extend the score process to support the normalizeScore and scoreWeight
This commit is contained in:
karmada-bot 2022-03-18 16:56:38 +08:00 committed by GitHub
commit 2bc1721f4f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 56 additions and 26 deletions

View File

@ -58,7 +58,7 @@ func (g *genericScheduler) Schedule(ctx context.Context, placement *policyv1alph
}
klog.V(4).Infof("feasible clusters found: %v", feasibleClusters)
clustersScore, err := g.prioritizeClusters(ctx, g.scheduleFramework, placement, feasibleClusters)
clustersScore, err := g.prioritizeClusters(ctx, g.scheduleFramework, placement, spec, feasibleClusters)
if err != nil {
return result, fmt.Errorf("failed to prioritizeClusters: %v", err)
}
@ -102,17 +102,18 @@ func (g *genericScheduler) prioritizeClusters(
ctx context.Context,
fwk framework.Framework,
placement *policyv1alpha1.Placement,
spec *workv1alpha2.ResourceBindingSpec,
clusters []*clusterv1alpha1.Cluster) (result framework.ClusterScoreList, err error) {
defer metrics.ScheduleStep(metrics.ScheduleStepScore, time.Now())
scoresMap, err := fwk.RunScorePlugins(ctx, placement, clusters)
scoresMap, err := fwk.RunScorePlugins(ctx, placement, spec, clusters)
if err != nil {
return result, err
}
result = make(framework.ClusterScoreList, len(clusters))
for i := range clusters {
result[i] = framework.ClusterScore{Name: clusters[i].Name, Score: 0}
result[i] = framework.ClusterScore{Cluster: clusters[i], Score: 0}
for j := range scoresMap {
result[i].Score += scoresMap[j][i].Score
}

View File

@ -19,7 +19,7 @@ type Framework interface {
RunFilterPlugins(ctx context.Context, placement *policyv1alpha1.Placement, resource *workv1alpha2.ObjectReference, clusterv1alpha1 *clusterv1alpha1.Cluster) *Result
// RunScorePlugins runs the set of configured Score plugins, it returns a map of plugin name to cores
RunScorePlugins(ctx context.Context, placement *policyv1alpha1.Placement, clusters []*clusterv1alpha1.Cluster) (PluginToClusterScores, error)
RunScorePlugins(ctx context.Context, placement *policyv1alpha1.Placement, spec *workv1alpha2.ResourceBindingSpec, clusters []*clusterv1alpha1.Cluster) (PluginToClusterScores, error)
}
// Plugin is the parent type for all the scheduling framework plugins.
@ -125,13 +125,24 @@ type ScorePlugin interface {
// Score is called on each filtered cluster. It must return success and an integer
// indicating the rank of the cluster. All scoring plugins must return success or
// the resource will be rejected.
Score(ctx context.Context, placement *policyv1alpha1.Placement, clusterv1alpha1 *clusterv1alpha1.Cluster) (float64, *Result)
Score(ctx context.Context, placement *policyv1alpha1.Placement, spec *workv1alpha2.ResourceBindingSpec, cluster *clusterv1alpha1.Cluster) (int64, *Result)
// ScoreExtensions returns a ScoreExtensions interface
// if it implements one, or nil if does not.
ScoreExtensions() ScoreExtensions
}
// ScoreExtensions is an interface for Score extended functionality.
type ScoreExtensions interface {
// NormalizeScore is called for all cluster scores produced
// by the same plugin's "Score"
NormalizeScore(ctx context.Context, scores ClusterScoreList) *Result
}
// ClusterScore represent the cluster score.
type ClusterScore struct {
Name string
Score float64
Cluster *clusterv1alpha1.Cluster
Score int64
}
// ClusterScoreList declares a list of clusters and their scores.

View File

@ -21,7 +21,6 @@ const (
type APIInstalled struct{}
var _ framework.FilterPlugin = &APIInstalled{}
var _ framework.ScorePlugin = &APIInstalled{}
// New instantiates the APIInstalled plugin.
func New() framework.Plugin {
@ -42,8 +41,3 @@ func (p *APIInstalled) Filter(ctx context.Context, placement *policyv1alpha1.Pla
return framework.NewResult(framework.Success)
}
// Score calculates the score on the candidate cluster.
func (p *APIInstalled) Score(ctx context.Context, placement *policyv1alpha1.Placement, cluster *clusterv1alpha1.Cluster) (float64, *framework.Result) {
return 0, framework.NewResult(framework.Success)
}

View File

@ -46,6 +46,17 @@ func (p *ClusterAffinity) Filter(ctx context.Context, placement *policyv1alpha1.
}
// Score calculates the score on the candidate cluster.
func (p *ClusterAffinity) Score(ctx context.Context, placement *policyv1alpha1.Placement, cluster *clusterv1alpha1.Cluster) (float64, *framework.Result) {
func (p *ClusterAffinity) Score(ctx context.Context, placement *policyv1alpha1.Placement,
spec *workv1alpha2.ResourceBindingSpec, cluster *clusterv1alpha1.Cluster) (int64, *framework.Result) {
return 0, framework.NewResult(framework.Success)
}
// ScoreExtensions of the Score plugin.
func (p *ClusterAffinity) ScoreExtensions() framework.ScoreExtensions {
return p
}
// NormalizeScore normalizes the score for each candidate cluster.
func (p *ClusterAffinity) NormalizeScore(ctx context.Context, scores framework.ClusterScoreList) *framework.Result {
return framework.NewResult(framework.Success)
}

View File

@ -22,7 +22,6 @@ const (
type TaintToleration struct{}
var _ framework.FilterPlugin = &TaintToleration{}
var _ framework.ScorePlugin = &TaintToleration{}
// New instantiates the TaintToleration plugin.
func New() framework.Plugin {
@ -50,8 +49,3 @@ func (p *TaintToleration) Filter(ctx context.Context, placement *policyv1alpha1.
return framework.NewResult(framework.Unschedulable, fmt.Sprintf("cluster had taint {%s: %s}, that the propagation policy didn't tolerate",
taint.Key, taint.Value))
}
// Score calculates the score on the candidate cluster.
func (p *TaintToleration) Score(ctx context.Context, placement *policyv1alpha1.Placement, cluster *clusterv1alpha1.Cluster) (float64, *framework.Result) {
return 0, framework.NewResult(framework.Success)
}

View File

@ -17,8 +17,9 @@ import (
// frameworkImpl implements the Framework interface and is responsible for initializing and running scheduler
// plugins.
type frameworkImpl struct {
filterPlugins []framework.FilterPlugin
scorePlugins []framework.ScorePlugin
scorePluginsWeightMap map[string]int
filterPlugins []framework.FilterPlugin
scorePlugins []framework.ScorePlugin
}
var _ framework.Framework = &frameworkImpl{}
@ -59,20 +60,38 @@ func (frw *frameworkImpl) RunFilterPlugins(ctx context.Context, placement *polic
// RunScorePlugins runs the set of configured Filter plugins for resources on the cluster.
// If any of the result is not success, the cluster is not suited for the resource.
func (frw *frameworkImpl) RunScorePlugins(ctx context.Context, placement *policyv1alpha1.Placement, clusters []*clusterv1alpha1.Cluster) (framework.PluginToClusterScores, error) {
func (frw *frameworkImpl) RunScorePlugins(ctx context.Context, placement *policyv1alpha1.Placement, spec *workv1alpha2.ResourceBindingSpec, clusters []*clusterv1alpha1.Cluster) (framework.PluginToClusterScores, error) {
result := make(framework.PluginToClusterScores, len(frw.filterPlugins))
for _, p := range frw.scorePlugins {
var scoreList framework.ClusterScoreList
for _, cluster := range clusters {
score, res := p.Score(ctx, placement, cluster)
score, res := p.Score(ctx, placement, spec, cluster)
if !res.IsSuccess() {
return nil, fmt.Errorf("plugin %q failed with: %w", p.Name(), res.AsError())
}
scoreList = append(scoreList, framework.ClusterScore{
Name: cluster.Name,
Score: score,
Cluster: cluster,
Score: score,
})
}
if p.ScoreExtensions() != nil {
res := p.ScoreExtensions().NormalizeScore(ctx, scoreList)
if !res.IsSuccess() {
return nil, fmt.Errorf("plugin %q normalizeScore failed with: %w", p.Name(), res.AsError())
}
}
weight, ok := frw.scorePluginsWeightMap[p.Name()]
if !ok {
result[p.Name()] = scoreList
continue
}
for i := range scoreList {
scoreList[i].Score = scoreList[i].Score * int64(weight)
}
result[p.Name()] = scoreList
}