Merge pull request #3209 from XiShanYongYe-Chang/update-schedule-algorithm-interface
Add bindingStatus in the schedule algorithm interface
commit 95c6c0971b
```diff
@@ -19,7 +19,7 @@ import (
 
 // ScheduleAlgorithm is the interface that should be implemented to schedule a resource to the target clusters.
 type ScheduleAlgorithm interface {
-    Schedule(context.Context, *workv1alpha2.ResourceBindingSpec, *ScheduleAlgorithmOption) (scheduleResult ScheduleResult, err error)
+    Schedule(context.Context, *workv1alpha2.ResourceBindingSpec, *workv1alpha2.ResourceBindingStatus, *ScheduleAlgorithmOption) (scheduleResult ScheduleResult, err error)
 }
 
 // ScheduleAlgorithmOption represents the option for ScheduleAlgorithm.
```
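Any third-party implementation of ScheduleAlgorithm must adopt the widened signature. A minimal sketch of a conforming implementation, assuming for brevity that it is compiled alongside the ScheduleResult and ScheduleAlgorithmOption types; the noopScheduler name is hypothetical:

```go
package core // illustrative placement alongside ScheduleResult/ScheduleAlgorithmOption

import (
    "context"

    workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
)

// noopScheduler is a hypothetical ScheduleAlgorithm that schedules nothing;
// it exists only to show the widened method signature.
type noopScheduler struct{}

// Compile-time assertion that the new interface is satisfied.
var _ ScheduleAlgorithm = &noopScheduler{}

func (n *noopScheduler) Schedule(
    _ context.Context,
    _ *workv1alpha2.ResourceBindingSpec,
    _ *workv1alpha2.ResourceBindingStatus, // the newly added status parameter
    _ *ScheduleAlgorithmOption,
) (ScheduleResult, error) {
    return ScheduleResult{}, nil
}
```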
```diff
@@ -52,13 +52,18 @@ func NewGenericScheduler(
     }, nil
 }
 
-func (g *genericScheduler) Schedule(ctx context.Context, spec *workv1alpha2.ResourceBindingSpec, scheduleAlgorithmOption *ScheduleAlgorithmOption) (result ScheduleResult, err error) {
+func (g *genericScheduler) Schedule(
+    ctx context.Context,
+    spec *workv1alpha2.ResourceBindingSpec,
+    status *workv1alpha2.ResourceBindingStatus,
+    scheduleAlgorithmOption *ScheduleAlgorithmOption,
+) (result ScheduleResult, err error) {
     clusterInfoSnapshot := g.schedulerCache.Snapshot()
     if clusterInfoSnapshot.NumOfClusters() == 0 {
         return result, fmt.Errorf("no clusters available to schedule")
     }
 
-    feasibleClusters, diagnosis, err := g.findClustersThatFit(ctx, g.scheduleFramework, spec, &clusterInfoSnapshot)
+    feasibleClusters, diagnosis, err := g.findClustersThatFit(ctx, spec, status, &clusterInfoSnapshot)
     if err != nil {
         return result, fmt.Errorf("failed to findClustersThatFit: %v", err)
     }
```
```diff
@@ -99,8 +104,8 @@ func (g *genericScheduler) Schedule(ctx context.Context, spec *workv1alpha2.Reso
 // findClustersThatFit finds the clusters that are fit for the placement based on running the filter plugins.
 func (g *genericScheduler) findClustersThatFit(
     ctx context.Context,
-    fwk framework.Framework,
     bindingSpec *workv1alpha2.ResourceBindingSpec,
+    bindingStatus *workv1alpha2.ResourceBindingStatus,
     clusterInfo *cache.Snapshot,
 ) ([]*clusterv1alpha1.Cluster, framework.Diagnosis, error) {
     startTime := time.Now()
```
```diff
@@ -114,7 +119,7 @@ func (g *genericScheduler) findClustersThatFit(
     // DO NOT filter unhealthy cluster, let users make decisions by using ClusterTolerations of Placement.
     clusters := clusterInfo.GetClusters()
     for _, c := range clusters {
-        if result := fwk.RunFilterPlugins(ctx, bindingSpec, c.Cluster()); !result.IsSuccess() {
+        if result := g.scheduleFramework.RunFilterPlugins(ctx, bindingSpec, bindingStatus, c.Cluster()); !result.IsSuccess() {
             klog.V(4).Infof("Cluster %q is not fit, reason: %v", c.Cluster().Name, result.AsError())
             diagnosis.ClusterToResultMap[c.Cluster().Name] = result
         } else {
```
```diff
@@ -25,7 +25,7 @@ type Framework interface {
 
     // RunFilterPlugins runs the set of configured Filter plugins for resources on
     // the given cluster.
-    RunFilterPlugins(ctx context.Context, bindingSpec *workv1alpha2.ResourceBindingSpec, clusterv1alpha1 *clusterv1alpha1.Cluster) *Result
+    RunFilterPlugins(ctx context.Context, bindingSpec *workv1alpha2.ResourceBindingSpec, bindingStatus *workv1alpha2.ResourceBindingStatus, cluster *clusterv1alpha1.Cluster) *Result
 
     // RunScorePlugins runs the set of configured Score plugins, it returns a map of plugin name to scores
     RunScorePlugins(ctx context.Context, spec *workv1alpha2.ResourceBindingSpec, clusters []*clusterv1alpha1.Cluster) (PluginToClusterScores, *Result)
```
```diff
@@ -41,7 +41,7 @@ type Plugin interface {
 type FilterPlugin interface {
     Plugin
     // Filter is called by the scheduling framework.
-    Filter(ctx context.Context, bindingSpec *workv1alpha2.ResourceBindingSpec, clusterv1alpha1 *clusterv1alpha1.Cluster) *Result
+    Filter(ctx context.Context, bindingSpec *workv1alpha2.ResourceBindingSpec, bindingStatus *workv1alpha2.ResourceBindingStatus, cluster *clusterv1alpha1.Cluster) *Result
 }
 
 // Result indicates the result of running a plugin. It consists of a code, a
```
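The same widening applies to every FilterPlugin, in-tree or out-of-tree, even when a plugin has no use for the status. A minimal sketch of a custom plugin under the new contract; the AlwaysFit plugin is hypothetical:

```go
package alwaysfit // hypothetical out-of-tree plugin

import (
    "context"

    clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
    workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
    "github.com/karmada-io/karmada/pkg/scheduler/framework"
)

// AlwaysFit accepts every cluster; it only demonstrates the widened signature.
type AlwaysFit struct{}

// Compile-time assertion that the new FilterPlugin contract is satisfied.
var _ framework.FilterPlugin = &AlwaysFit{}

// Name returns the plugin name used in metrics and diagnostics.
func (p *AlwaysFit) Name() string { return "AlwaysFit" }

// Filter discards the new status argument with a blank identifier, as the
// in-tree plugins below do when they have no use for it.
func (p *AlwaysFit) Filter(
    _ context.Context,
    _ *workv1alpha2.ResourceBindingSpec,
    _ *workv1alpha2.ResourceBindingStatus,
    _ *clusterv1alpha1.Cluster,
) *framework.Result {
    return framework.NewResult(framework.Success)
}
```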
```diff
@@ -32,8 +32,12 @@ func (p *APIEnablement) Name() string {
 }
 
 // Filter checks if the API(CRD) of the resource is enabled or installed in the target cluster.
-func (p *APIEnablement) Filter(ctx context.Context,
-    bindingSpec *workv1alpha2.ResourceBindingSpec, cluster *clusterv1alpha1.Cluster) *framework.Result {
+func (p *APIEnablement) Filter(
+    _ context.Context,
+    bindingSpec *workv1alpha2.ResourceBindingSpec,
+    _ *workv1alpha2.ResourceBindingStatus,
+    cluster *clusterv1alpha1.Cluster,
+) *framework.Result {
     if !helper.IsAPIEnabled(cluster.Status.APIEnablements, bindingSpec.Resource.APIVersion, bindingSpec.Resource.Kind) {
         klog.V(2).Infof("Cluster(%s) not fit as missing API(%s, kind=%s)", cluster.Name, bindingSpec.Resource.APIVersion, bindingSpec.Resource.Kind)
         return framework.NewResult(framework.Unschedulable, "cluster(s) didn't have the API resource")
```
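APIEnablement ignores the status entirely, so its decision is still the single lookup shown above. A self-contained sketch of that check; the import path for helper is assumed, and the Cluster value and version/kind literals are illustrative:

```go
package main

import (
    "fmt"

    clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
    "github.com/karmada-io/karmada/pkg/util/helper" // assumed location of IsAPIEnabled
)

func main() {
    // Normally populated from the member cluster's reported status.
    cluster := &clusterv1alpha1.Cluster{}
    // False when the member cluster has not reported the given API, in which
    // case the plugin marks the cluster Unschedulable.
    if !helper.IsAPIEnabled(cluster.Status.APIEnablements, "apps/v1", "Deployment") {
        fmt.Println("cluster(s) didn't have the API resource")
    }
}
```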
```diff
@@ -4,6 +4,7 @@ import (
     "context"
 
     clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
+    policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
     workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
     "github.com/karmada-io/karmada/pkg/scheduler/framework"
     "github.com/karmada-io/karmada/pkg/util"
```
```diff
@@ -31,9 +32,24 @@ func (p *ClusterAffinity) Name() string {
 }
 
 // Filter checks if the cluster matched the placement cluster affinity constraint.
-func (p *ClusterAffinity) Filter(ctx context.Context,
-    bindingSpec *workv1alpha2.ResourceBindingSpec, cluster *clusterv1alpha1.Cluster) *framework.Result {
-    affinity := bindingSpec.Placement.ClusterAffinity
+func (p *ClusterAffinity) Filter(
+    _ context.Context,
+    bindingSpec *workv1alpha2.ResourceBindingSpec,
+    bindingStatus *workv1alpha2.ResourceBindingStatus,
+    cluster *clusterv1alpha1.Cluster,
+) *framework.Result {
+    var affinity *policyv1alpha1.ClusterAffinity
+    if bindingSpec.Placement.ClusterAffinity != nil {
+        affinity = bindingSpec.Placement.ClusterAffinity
+    } else {
+        for index, term := range bindingSpec.Placement.ClusterAffinities {
+            if term.AffinityName == bindingStatus.SchedulerObservedAffinityName {
+                affinity = &bindingSpec.Placement.ClusterAffinities[index].ClusterAffinity
+                break
+            }
+        }
+    }
+
     if affinity != nil {
         if util.ClusterMatches(cluster, *affinity) {
             return framework.NewResult(framework.Success)
```
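This hunk is the substance of the PR: when a Placement uses the ordered ClusterAffinities terms rather than the single ClusterAffinity field, the plugin must evaluate the same term the scheduler last recorded in the binding's SchedulerObservedAffinityName, which is why the status now reaches the plugins at all. The selection rule, restated as a standalone helper for readability; getAffinity is a hypothetical name and the imports match the plugin file above:

```go
// getAffinity restates the lookup performed by Filter above; it is a
// hypothetical helper shown only to make the precedence rule explicit.
func getAffinity(
    spec *workv1alpha2.ResourceBindingSpec,
    status *workv1alpha2.ResourceBindingStatus,
) *policyv1alpha1.ClusterAffinity {
    // The legacy single-affinity field wins when it is set.
    if spec.Placement.ClusterAffinity != nil {
        return spec.Placement.ClusterAffinity
    }
    // Otherwise pick the ordered affinity term the scheduler last observed,
    // so filtering stays consistent with the recorded scheduling round.
    for i := range spec.Placement.ClusterAffinities {
        if spec.Placement.ClusterAffinities[i].AffinityName == status.SchedulerObservedAffinityName {
            return &spec.Placement.ClusterAffinities[i].ClusterAffinity
        }
    }
    return nil
}
```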
```diff
@@ -30,8 +30,12 @@ func (p *SpreadConstraint) Name() string {
 }
 
 // Filter checks if the cluster Provider/Zone/Region spread is null.
-func (p *SpreadConstraint) Filter(ctx context.Context,
-    bindingSpec *workv1alpha2.ResourceBindingSpec, cluster *clusterv1alpha1.Cluster) *framework.Result {
+func (p *SpreadConstraint) Filter(
+    _ context.Context,
+    bindingSpec *workv1alpha2.ResourceBindingSpec,
+    _ *workv1alpha2.ResourceBindingStatus,
+    cluster *clusterv1alpha1.Cluster,
+) *framework.Result {
     for _, spreadConstraint := range bindingSpec.Placement.SpreadConstraints {
         if spreadConstraint.SpreadByField == policyv1alpha1.SpreadByFieldProvider && cluster.Spec.Provider == "" {
             return framework.NewResult(framework.Unschedulable, "cluster(s) didn't have provider property")
```
```diff
@@ -33,8 +33,12 @@ func (p *TaintToleration) Name() string {
 }
 
 // Filter checks if the given tolerations in placement tolerate cluster's taints.
-func (p *TaintToleration) Filter(ctx context.Context,
-    bindingSpec *workv1alpha2.ResourceBindingSpec, cluster *clusterv1alpha1.Cluster) *framework.Result {
+func (p *TaintToleration) Filter(
+    _ context.Context,
+    bindingSpec *workv1alpha2.ResourceBindingSpec,
+    _ *workv1alpha2.ResourceBindingStatus,
+    cluster *clusterv1alpha1.Cluster,
+) *framework.Result {
     // skip the filter if the cluster is already in the list of scheduling results;
     // if the workload referenced by the binding can't tolerate the taint,
     // the taint-manager will evict it after a graceful period.
```
```diff
@@ -74,29 +74,44 @@ func NewFramework(r Registry, opts ...Option) (framework.Framework, error) {
 
 // RunFilterPlugins runs the set of configured Filter plugins for resources on the cluster.
 // If any of the results is not success, the cluster is not suited for the resource.
-func (frw *frameworkImpl) RunFilterPlugins(ctx context.Context, bindingSpec *workv1alpha2.ResourceBindingSpec, cluster *clusterv1alpha1.Cluster) (result *framework.Result) {
+func (frw *frameworkImpl) RunFilterPlugins(
+    ctx context.Context,
+    bindingSpec *workv1alpha2.ResourceBindingSpec,
+    bindingStatus *workv1alpha2.ResourceBindingStatus,
+    cluster *clusterv1alpha1.Cluster,
+) (result *framework.Result) {
     startTime := time.Now()
     defer func() {
         metrics.FrameworkExtensionPointDuration.WithLabelValues(filter, result.Code().String()).Observe(utilmetrics.DurationInSeconds(startTime))
     }()
     for _, p := range frw.filterPlugins {
-        if result := frw.runFilterPlugin(ctx, p, bindingSpec, cluster); !result.IsSuccess() {
+        if result := frw.runFilterPlugin(ctx, p, bindingSpec, bindingStatus, cluster); !result.IsSuccess() {
             return result
         }
     }
     return framework.NewResult(framework.Success)
 }
 
-func (frw *frameworkImpl) runFilterPlugin(ctx context.Context, pl framework.FilterPlugin, bindingSpec *workv1alpha2.ResourceBindingSpec, cluster *clusterv1alpha1.Cluster) *framework.Result {
+func (frw *frameworkImpl) runFilterPlugin(
+    ctx context.Context,
+    pl framework.FilterPlugin,
+    bindingSpec *workv1alpha2.ResourceBindingSpec,
+    bindingStatus *workv1alpha2.ResourceBindingStatus,
+    cluster *clusterv1alpha1.Cluster,
+) *framework.Result {
     startTime := time.Now()
-    result := pl.Filter(ctx, bindingSpec, cluster)
+    result := pl.Filter(ctx, bindingSpec, bindingStatus, cluster)
     frw.metricsRecorder.observePluginDurationAsync(filter, pl.Name(), result, utilmetrics.DurationInSeconds(startTime))
     return result
 }
 
 // RunScorePlugins runs the set of configured Score plugins for resources on the clusters.
 // If any of the results is not success, the scoring is aborted.
-func (frw *frameworkImpl) RunScorePlugins(ctx context.Context, spec *workv1alpha2.ResourceBindingSpec, clusters []*clusterv1alpha1.Cluster) (ps framework.PluginToClusterScores, result *framework.Result) {
+func (frw *frameworkImpl) RunScorePlugins(
+    ctx context.Context,
+    spec *workv1alpha2.ResourceBindingSpec,
+    clusters []*clusterv1alpha1.Cluster,
+) (ps framework.PluginToClusterScores, result *framework.Result) {
     startTime := time.Now()
     defer func() {
         metrics.FrameworkExtensionPointDuration.WithLabelValues(score, result.Code().String()).Observe(utilmetrics.DurationInSeconds(startTime))
```
```diff
@@ -19,12 +19,12 @@ func Test_frameworkImpl_RunFilterPlugins(t *testing.T) {
     defer mockCtrl.Finish()
 
     alwaysError := frameworktesting.NewMockFilterPlugin(mockCtrl)
-    alwaysError.EXPECT().Filter(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().
+    alwaysError.EXPECT().Filter(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().
         Return(framework.NewResult(framework.Error, "foo"))
     alwaysError.EXPECT().Name().AnyTimes().Return("foo")
 
     alwaysSuccess := frameworktesting.NewMockFilterPlugin(mockCtrl)
-    alwaysSuccess.EXPECT().Filter(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().
+    alwaysSuccess.EXPECT().Filter(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().
         Return(framework.NewResult(framework.Success))
     alwaysSuccess.EXPECT().Name().AnyTimes().Return("foo")
 
```
```diff
@@ -72,7 +72,7 @@ func Test_frameworkImpl_RunFilterPlugins(t *testing.T) {
         t.Errorf("create frame work error:%v", err)
     }
 
-    result := frameWork.RunFilterPlugins(ctx, nil, nil)
+    result := frameWork.RunFilterPlugins(ctx, nil, nil, nil)
     if result.IsSuccess() != tt.isSuccess {
         t.Errorf("want %v, but get:%v", tt.isSuccess, result.IsSuccess())
     }
```
```diff
@@ -38,17 +38,17 @@ func (m *MockFramework) EXPECT() *MockFrameworkMockRecorder {
 }
 
 // RunFilterPlugins mocks base method.
-func (m *MockFramework) RunFilterPlugins(ctx context.Context, bindingSpec *v1alpha2.ResourceBindingSpec, clusterv1alpha1 *v1alpha1.Cluster) *framework.Result {
+func (m *MockFramework) RunFilterPlugins(ctx context.Context, bindingSpec *v1alpha2.ResourceBindingSpec, bindingStatus *v1alpha2.ResourceBindingStatus, cluster *v1alpha1.Cluster) *framework.Result {
     m.ctrl.T.Helper()
-    ret := m.ctrl.Call(m, "RunFilterPlugins", ctx, bindingSpec, clusterv1alpha1)
+    ret := m.ctrl.Call(m, "RunFilterPlugins", ctx, bindingSpec, bindingStatus, cluster)
     ret0, _ := ret[0].(*framework.Result)
     return ret0
 }
 
 // RunFilterPlugins indicates an expected call of RunFilterPlugins.
-func (mr *MockFrameworkMockRecorder) RunFilterPlugins(ctx, bindingSpec, clusterv1alpha1 interface{}) *gomock.Call {
+func (mr *MockFrameworkMockRecorder) RunFilterPlugins(ctx, bindingSpec, bindingStatus, cluster interface{}) *gomock.Call {
     mr.mock.ctrl.T.Helper()
-    return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RunFilterPlugins", reflect.TypeOf((*MockFramework)(nil).RunFilterPlugins), ctx, bindingSpec, clusterv1alpha1)
+    return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RunFilterPlugins", reflect.TypeOf((*MockFramework)(nil).RunFilterPlugins), ctx, bindingSpec, bindingStatus, cluster)
 }
 
 // RunScorePlugins mocks base method.
```
```diff
@@ -127,17 +127,17 @@ func (m *MockFilterPlugin) EXPECT() *MockFilterPluginMockRecorder {
 }
 
 // Filter mocks base method.
-func (m *MockFilterPlugin) Filter(ctx context.Context, bindingSpec *v1alpha2.ResourceBindingSpec, clusterv1alpha1 *v1alpha1.Cluster) *framework.Result {
+func (m *MockFilterPlugin) Filter(ctx context.Context, bindingSpec *v1alpha2.ResourceBindingSpec, bindingStatus *v1alpha2.ResourceBindingStatus, cluster *v1alpha1.Cluster) *framework.Result {
     m.ctrl.T.Helper()
-    ret := m.ctrl.Call(m, "Filter", ctx, bindingSpec, clusterv1alpha1)
+    ret := m.ctrl.Call(m, "Filter", ctx, bindingSpec, bindingStatus, cluster)
     ret0, _ := ret[0].(*framework.Result)
     return ret0
 }
 
 // Filter indicates an expected call of Filter.
-func (mr *MockFilterPluginMockRecorder) Filter(ctx, bindingSpec, clusterv1alpha1 interface{}) *gomock.Call {
+func (mr *MockFilterPluginMockRecorder) Filter(ctx, bindingSpec, bindingStatus, cluster interface{}) *gomock.Call {
     mr.mock.ctrl.T.Helper()
-    return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Filter", reflect.TypeOf((*MockFilterPlugin)(nil).Filter), ctx, bindingSpec, clusterv1alpha1)
+    return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Filter", reflect.TypeOf((*MockFilterPlugin)(nil).Filter), ctx, bindingSpec, bindingStatus, cluster)
 }
 
 // Name mocks base method.
```
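With the regenerated mocks, any test stubbing Filter must pass a fourth matcher, mirroring the framework test above. A sketch of such a stub inside a test body; mockCtrl, t, and the imports from the test file above are assumed:

```go
// Inside a test function; mockCtrl comes from gomock.NewController(t).
plugin := frameworktesting.NewMockFilterPlugin(mockCtrl)
plugin.EXPECT().
    Filter(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). // ctx, spec, status, cluster
    AnyTimes().
    Return(framework.NewResult(framework.Success))
plugin.EXPECT().Name().AnyTimes().Return("stub")
```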
```diff
@@ -460,7 +460,8 @@ func (s *Scheduler) scheduleResourceBinding(resourceBinding *workv1alpha2.Resour
     klog.V(4).InfoS("Begin scheduling resource binding", "resourceBinding", klog.KObj(resourceBinding))
     defer klog.V(4).InfoS("End scheduling resource binding", "resourceBinding", klog.KObj(resourceBinding))
 
-    scheduleResult, err := s.Algorithm.Schedule(context.TODO(), &resourceBinding.Spec, &core.ScheduleAlgorithmOption{EnableEmptyWorkloadPropagation: s.enableEmptyWorkloadPropagation})
+    scheduleResult, err := s.Algorithm.Schedule(context.TODO(), &resourceBinding.Spec, &resourceBinding.Status,
+        &core.ScheduleAlgorithmOption{EnableEmptyWorkloadPropagation: s.enableEmptyWorkloadPropagation})
     var noClusterFit *framework.FitError
     // in case no cluster fits, do not return early; continue to patch (clean up) the result.
     if err != nil && !errors.As(err, &noClusterFit) {
```
```diff
@@ -505,7 +506,8 @@ func (s *Scheduler) scheduleClusterResourceBinding(clusterResourceBinding *workv
     klog.V(4).InfoS("Begin scheduling cluster resource binding", "clusterResourceBinding", klog.KObj(clusterResourceBinding))
     defer klog.V(4).InfoS("End scheduling cluster resource binding", "clusterResourceBinding", klog.KObj(clusterResourceBinding))
 
-    scheduleResult, err := s.Algorithm.Schedule(context.TODO(), &clusterResourceBinding.Spec, &core.ScheduleAlgorithmOption{EnableEmptyWorkloadPropagation: s.enableEmptyWorkloadPropagation})
+    scheduleResult, err := s.Algorithm.Schedule(context.TODO(), &clusterResourceBinding.Spec, &clusterResourceBinding.Status,
+        &core.ScheduleAlgorithmOption{EnableEmptyWorkloadPropagation: s.enableEmptyWorkloadPropagation})
     var noClusterFit *framework.FitError
     // in case no cluster fits, do not return early; continue to patch (clean up) the result.
     if err != nil && !errors.As(err, &noClusterFit) {
```