From 6ef82917ca368ab10bf5fe30d6a8a9a960f152ef Mon Sep 17 00:00:00 2001 From: Garrybest Date: Thu, 17 Nov 2022 23:18:50 +0800 Subject: [PATCH] refactor assign replicas strategy Signed-off-by: Garrybest --- pkg/scheduler/core/assignment.go | 64 +++++++++++++++++++++++++ pkg/scheduler/core/generic_scheduler.go | 51 ++++++++++---------- 2 files changed, 89 insertions(+), 26 deletions(-) create mode 100644 pkg/scheduler/core/assignment.go diff --git a/pkg/scheduler/core/assignment.go b/pkg/scheduler/core/assignment.go new file mode 100644 index 000000000..4d7c94400 --- /dev/null +++ b/pkg/scheduler/core/assignment.go @@ -0,0 +1,64 @@ +package core + +import ( + clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1" + policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1" + workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2" +) + +var ( + assignFuncMap = map[string]func(*assignState) ([]workv1alpha2.TargetCluster, error){ + DuplicatedStrategy: assignByDuplicatedStrategy, + AggregatedStrategy: assignByAggregatedStrategy, + StaticWeightStrategy: assignByStaticWeightStrategy, + DynamicWeightStrategy: assignByDynamicWeightStrategy, + } +) + +// assignState is a wrapper of the input for assigning function. +type assignState struct { + candidates []*clusterv1alpha1.Cluster + strategy *policyv1alpha1.ReplicaSchedulingStrategy + object *workv1alpha2.ResourceBindingSpec +} + +const ( + // DuplicatedStrategy indicates each candidate member cluster will directly apply the original replicas. + DuplicatedStrategy = "Duplicated" + // AggregatedStrategy indicates dividing replicas among clusters as few as possible and + // taking clusters' available replicas into consideration as well. + AggregatedStrategy = "Aggregated" + // StaticWeightStrategy indicates dividing replicas by static weight according to WeightPreference. + StaticWeightStrategy = "StaticWeight" + // DynamicWeightStrategy indicates dividing replicas by dynamic weight according to WeightPreference. + DynamicWeightStrategy = "DynamicWeight" +) + +// assignByDuplicatedStrategy assigns replicas by DuplicatedStrategy. +func assignByDuplicatedStrategy(state *assignState) ([]workv1alpha2.TargetCluster, error) { + targetClusters := make([]workv1alpha2.TargetCluster, len(state.candidates)) + for i, cluster := range state.candidates { + targetClusters[i] = workv1alpha2.TargetCluster{Name: cluster.Name, Replicas: state.object.Replicas} + } + return targetClusters, nil +} + +// assignByAggregatedStrategy assigns replicas by AggregatedStrategy. +func assignByAggregatedStrategy(state *assignState) ([]workv1alpha2.TargetCluster, error) { + return divideReplicasByResource(state.candidates, state.object, policyv1alpha1.ReplicaDivisionPreferenceAggregated) +} + +// assignByStaticWeightStrategy assigns replicas by StaticWeightStrategy. +func assignByStaticWeightStrategy(state *assignState) ([]workv1alpha2.TargetCluster, error) { + // If ReplicaDivisionPreference is set to "Weighted" and WeightPreference is not set, + // scheduler will weight all clusters averagely. + if state.strategy.WeightPreference == nil { + state.strategy.WeightPreference = getDefaultWeightPreference(state.candidates) + } + return divideReplicasByStaticWeight(state.candidates, state.strategy.WeightPreference.StaticWeightList, state.object.Replicas) +} + +// assignByDynamicWeightStrategy assigns replicas by assignByDynamicWeightStrategy. +func assignByDynamicWeightStrategy(state *assignState) ([]workv1alpha2.TargetCluster, error) { + return divideReplicasByDynamicWeight(state.candidates, state.strategy.WeightPreference.DynamicWeight, state.object) +} diff --git a/pkg/scheduler/core/generic_scheduler.go b/pkg/scheduler/core/generic_scheduler.go index 9379ff100..15401d32d 100644 --- a/pkg/scheduler/core/generic_scheduler.go +++ b/pkg/scheduler/core/generic_scheduler.go @@ -173,46 +173,45 @@ func (g *genericScheduler) assignReplicas( ) ([]workv1alpha2.TargetCluster, error) { startTime := time.Now() defer metrics.ScheduleStep(metrics.ScheduleStepAssignReplicas, startTime) + if len(clusters) == 0 { return nil, fmt.Errorf("no clusters available to schedule") } - targetClusters := make([]workv1alpha2.TargetCluster, len(clusters)) if object.Replicas > 0 && replicaSchedulingStrategy != nil { + var strategy string + switch replicaSchedulingStrategy.ReplicaSchedulingType { - // 1. Duplicated Scheduling case policyv1alpha1.ReplicaSchedulingTypeDuplicated: - for i, cluster := range clusters { - targetClusters[i] = workv1alpha2.TargetCluster{Name: cluster.Name, Replicas: object.Replicas} - } - return targetClusters, nil - // 2. Divided Scheduling + strategy = DuplicatedStrategy case policyv1alpha1.ReplicaSchedulingTypeDivided: switch replicaSchedulingStrategy.ReplicaDivisionPreference { - // 2.1 Weighted Scheduling - case policyv1alpha1.ReplicaDivisionPreferenceWeighted: - // If ReplicaDivisionPreference is set to "Weighted" and WeightPreference is not set, - // scheduler will weight all clusters averagely. - if replicaSchedulingStrategy.WeightPreference == nil { - replicaSchedulingStrategy.WeightPreference = getDefaultWeightPreference(clusters) - } - // 2.1.1 Dynamic Weighted Scheduling (by resource) - if len(replicaSchedulingStrategy.WeightPreference.DynamicWeight) != 0 { - return divideReplicasByDynamicWeight(clusters, replicaSchedulingStrategy.WeightPreference.DynamicWeight, object) - } - // 2.1.2 Static Weighted Scheduling - return divideReplicasByStaticWeight(clusters, replicaSchedulingStrategy.WeightPreference.StaticWeightList, object.Replicas) - // 2.2 Aggregated scheduling (by resource) case policyv1alpha1.ReplicaDivisionPreferenceAggregated: - return divideReplicasByResource(clusters, object, policyv1alpha1.ReplicaDivisionPreferenceAggregated) - default: - return nil, fmt.Errorf("undefined replica division preference: %s", replicaSchedulingStrategy.ReplicaDivisionPreference) + strategy = AggregatedStrategy + case policyv1alpha1.ReplicaDivisionPreferenceWeighted: + if replicaSchedulingStrategy.WeightPreference != nil && len(replicaSchedulingStrategy.WeightPreference.DynamicWeight) != 0 { + strategy = DynamicWeightStrategy + } else { + strategy = StaticWeightStrategy + } } - default: - return nil, fmt.Errorf("undefined replica scheduling type: %s", replicaSchedulingStrategy.ReplicaSchedulingType) } + + assign, ok := assignFuncMap[strategy] + if !ok { + // should never happen at present + return nil, fmt.Errorf("unsupported replica scheduling strategy, replicaSchedulingType: %s, replicaDivisionPreference: %s, "+ + "please try another scheduling strategy", replicaSchedulingStrategy.ReplicaSchedulingType, replicaSchedulingStrategy.ReplicaDivisionPreference) + } + return assign(&assignState{ + candidates: clusters, + strategy: replicaSchedulingStrategy, + object: object, + }) } + // If not workload, assign all clusters without considering replicas. + targetClusters := make([]workv1alpha2.TargetCluster, len(clusters)) for i, cluster := range clusters { targetClusters[i] = workv1alpha2.TargetCluster{Name: cluster.Name} }