From b4803c42c9b2adad34678f6be6bdda8baba3079f Mon Sep 17 00:00:00 2001
From: Garrybest
Date: Thu, 15 Dec 2022 18:57:45 +0800
Subject: [PATCH] refactor dynamic schedule

Signed-off-by: Garrybest
---
 pkg/scheduler/core/assignment.go         | 133 ++++++++++++---
 pkg/scheduler/core/division_algorithm.go | 208 ++++++-----------------
 pkg/scheduler/core/generic_scheduler.go  |  27 +--
 pkg/scheduler/core/util.go               |  48 +-----
 4 files changed, 171 insertions(+), 245 deletions(-)

diff --git a/pkg/scheduler/core/assignment.go b/pkg/scheduler/core/assignment.go
index d8f7f0db7..0b7d75f29 100644
--- a/pkg/scheduler/core/assignment.go
+++ b/pkg/scheduler/core/assignment.go
@@ -1,27 +1,25 @@
 package core
 
 import (
+	"fmt"
+
+	"k8s.io/apimachinery/pkg/util/sets"
+
 	clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
 	policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
 	workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
+	"github.com/karmada-io/karmada/pkg/util"
 )
 
 var (
 	assignFuncMap = map[string]func(*assignState) ([]workv1alpha2.TargetCluster, error){
 		DuplicatedStrategy:    assignByDuplicatedStrategy,
-		AggregatedStrategy:    assignByAggregatedStrategy,
+		AggregatedStrategy:    assignByDynamicStrategy,
 		StaticWeightStrategy:  assignByStaticWeightStrategy,
-		DynamicWeightStrategy: assignByDynamicWeightStrategy,
+		DynamicWeightStrategy: assignByDynamicStrategy,
 	}
 )
 
-// assignState is a wrapper of the input for assigning function.
-type assignState struct {
-	candidates []*clusterv1alpha1.Cluster
-	strategy   *policyv1alpha1.ReplicaSchedulingStrategy
-	object     *workv1alpha2.ResourceBindingSpec
-}
-
 const (
 	// DuplicatedStrategy indicates each candidate member cluster will directly apply the original replicas.
 	DuplicatedStrategy = "Duplicated"
@@ -34,20 +32,96 @@ const (
 	DynamicWeightStrategy = "DynamicWeight"
 )
 
+// assignState is a wrapper of the input for assigning function.
+type assignState struct {
+	candidates []*clusterv1alpha1.Cluster
+	strategy   *policyv1alpha1.ReplicaSchedulingStrategy
+	spec       *workv1alpha2.ResourceBindingSpec
+
+	// fields below are indirect results
+	strategyType string
+
+	scheduledClusters []workv1alpha2.TargetCluster
+	assignedReplicas  int32
+	availableClusters []workv1alpha2.TargetCluster
+	availableReplicas int32
+
+	// targetReplicas is the replicas that we need to schedule in this round
+	targetReplicas int32
+}
+
+func newAssignState(candidates []*clusterv1alpha1.Cluster, strategy *policyv1alpha1.ReplicaSchedulingStrategy, obj *workv1alpha2.ResourceBindingSpec) *assignState {
+	var strategyType string
+
+	switch strategy.ReplicaSchedulingType {
+	case policyv1alpha1.ReplicaSchedulingTypeDuplicated:
+		strategyType = DuplicatedStrategy
+	case policyv1alpha1.ReplicaSchedulingTypeDivided:
+		switch strategy.ReplicaDivisionPreference {
+		case policyv1alpha1.ReplicaDivisionPreferenceAggregated:
+			strategyType = AggregatedStrategy
+		case policyv1alpha1.ReplicaDivisionPreferenceWeighted:
+			if strategy.WeightPreference != nil && len(strategy.WeightPreference.DynamicWeight) != 0 {
+				strategyType = DynamicWeightStrategy
+			} else {
+				strategyType = StaticWeightStrategy
+			}
+		}
+	}
+
+	return &assignState{candidates: candidates, strategy: strategy, spec: obj, strategyType: strategyType}
+}
+
+func (as *assignState) buildScheduledClusters() {
+	as.scheduledClusters = as.spec.Clusters
+	as.assignedReplicas = util.GetSumOfReplicas(as.scheduledClusters)
+}
+
+func (as *assignState) buildAvailableClusters(c calculator) {
+	as.availableClusters = c(as.candidates, as.spec)
+	as.availableReplicas = util.GetSumOfReplicas(as.availableClusters)
+}
+
+// resortAvailableClusters is used to make sure scheduledClusters are at the front of the availableClusters
+// list so that we can assign new replicas to them preferentially when scaling up.
+func (as *assignState) resortAvailableClusters() []workv1alpha2.TargetCluster {
+	// get the previous scheduled clusters
+	prior := sets.NewString()
+	for _, cluster := range as.scheduledClusters {
+		if cluster.Replicas > 0 {
+			prior.Insert(cluster.Name)
+		}
+	}
+
+	if len(prior) == 0 {
+		return as.availableClusters
+	}
+
+	var (
+		prev = make([]workv1alpha2.TargetCluster, 0, len(prior))
+		left = make([]workv1alpha2.TargetCluster, 0, len(as.scheduledClusters)-len(prior))
+	)
+
+	for _, cluster := range as.availableClusters {
+		if prior.Has(cluster.Name) {
+			prev = append(prev, cluster)
+		} else {
+			left = append(left, cluster)
+		}
+	}
+	as.availableClusters = append(prev, left...)
+	return as.availableClusters
+}
+
 // assignByDuplicatedStrategy assigns replicas by DuplicatedStrategy.
 func assignByDuplicatedStrategy(state *assignState) ([]workv1alpha2.TargetCluster, error) {
 	targetClusters := make([]workv1alpha2.TargetCluster, len(state.candidates))
 	for i, cluster := range state.candidates {
-		targetClusters[i] = workv1alpha2.TargetCluster{Name: cluster.Name, Replicas: state.object.Replicas}
+		targetClusters[i] = workv1alpha2.TargetCluster{Name: cluster.Name, Replicas: state.spec.Replicas}
 	}
 	return targetClusters, nil
 }
 
-// assignByAggregatedStrategy assigns replicas by AggregatedStrategy.
-func assignByAggregatedStrategy(state *assignState) ([]workv1alpha2.TargetCluster, error) {
-	return divideReplicasByResource(state.candidates, state.object, policyv1alpha1.ReplicaDivisionPreferenceAggregated)
-}
-
 /*
 * assignByStaticWeightStrategy assigns a total number of replicas to the selected clusters by the weight list.
 * For example, we want to assign replicas to two clusters named A and B.
@@ -66,13 +140,30 @@ func assignByStaticWeightStrategy(state *assignState) ([]workv1alpha2.TargetClus
 	}
 	weightList := getStaticWeightInfoList(state.candidates, state.strategy.WeightPreference.StaticWeightList)
 
-	acc := newDispenser(state.object.Replicas, nil)
-	acc.takeByWeight(weightList)
+	disp := newDispenser(state.spec.Replicas, nil)
+	disp.takeByWeight(weightList)
 
-	return acc.result, nil
+	return disp.result, nil
 }
 
-// assignByDynamicWeightStrategy assigns replicas by assignByDynamicWeightStrategy.
-func assignByDynamicWeightStrategy(state *assignState) ([]workv1alpha2.TargetCluster, error) {
-	return divideReplicasByDynamicWeight(state.candidates, state.strategy.WeightPreference.DynamicWeight, state.object)
+func assignByDynamicStrategy(state *assignState) ([]workv1alpha2.TargetCluster, error) {
+	state.buildScheduledClusters()
+	if state.assignedReplicas > state.spec.Replicas {
+		// We need to reduce the replicas in terms of the previous result.
+		result, err := dynamicScaleDown(state)
+		if err != nil {
+			return nil, fmt.Errorf("failed to scale down: %v", err)
+		}
+		return result, nil
+	} else if state.assignedReplicas < state.spec.Replicas {
+		// We need to enlarge the replicas in terms of the previous result (if exists).
+		// First scheduling is considered as a special kind of scaling up.
+		result, err := dynamicScaleUp(state)
+		if err != nil {
+			return nil, fmt.Errorf("failed to scale up: %v", err)
+		}
+		return result, nil
+	} else {
+		return state.scheduledClusters, nil
+	}
 }
diff --git a/pkg/scheduler/core/division_algorithm.go b/pkg/scheduler/core/division_algorithm.go
index 66d1e997d..a5f252d83 100644
--- a/pkg/scheduler/core/division_algorithm.go
+++ b/pkg/scheduler/core/division_algorithm.go
@@ -4,8 +4,6 @@ import (
 	"fmt"
 	"sort"
 
-	"k8s.io/apimachinery/pkg/util/sets"
-
 	clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
 	policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
 	workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
@@ -96,170 +94,72 @@ func getStaticWeightInfoList(clusters []*clusterv1alpha1.Cluster, weightList []p
 	return list
 }
 
-// divideReplicasByDynamicWeight assigns a total number of replicas to the selected clusters by the dynamic weight list.
-func divideReplicasByDynamicWeight(clusters []*clusterv1alpha1.Cluster, dynamicWeight policyv1alpha1.DynamicWeightFactor, spec *workv1alpha2.ResourceBindingSpec) ([]workv1alpha2.TargetCluster, error) {
-	switch dynamicWeight {
-	case policyv1alpha1.DynamicWeightByAvailableReplicas:
-		return divideReplicasByResource(clusters, spec, policyv1alpha1.ReplicaDivisionPreferenceWeighted)
-	default:
-		return nil, fmt.Errorf("undefined replica dynamic weight factor: %s", dynamicWeight)
+func getStaticWeightInfoListByTargetClusters(tcs []workv1alpha2.TargetCluster) helper.ClusterWeightInfoList {
+	weightList := make(helper.ClusterWeightInfoList, 0, len(tcs))
+	for _, result := range tcs {
+		weightList = append(weightList, helper.ClusterWeightInfo{
+			ClusterName: result.Name,
+			Weight:      int64(result.Replicas),
+		})
 	}
+	return weightList
 }
 
-func divideReplicasByResource(
-	clusters []*clusterv1alpha1.Cluster,
-	spec *workv1alpha2.ResourceBindingSpec,
-	preference policyv1alpha1.ReplicaDivisionPreference,
-) ([]workv1alpha2.TargetCluster, error) {
-	// Step 1: Find the ready clusters that have old replicas
-	scheduledClusters := findOutScheduledCluster(spec.Clusters, clusters)
-
-	// Step 2: calculate the assigned Replicas in scheduledClusters
-	assignedReplicas := util.GetSumOfReplicas(scheduledClusters)
-
-	// Step 3: Check the scale type (up or down).
-	if assignedReplicas > spec.Replicas {
-		// We need to reduce the replicas in terms of the previous result.
-		newTargetClusters, err := scaleDownScheduleByReplicaDivisionPreference(spec, preference)
-		if err != nil {
-			return nil, fmt.Errorf("failed to scale down: %v", err)
-		}
-
-		return newTargetClusters, nil
-	} else if assignedReplicas < spec.Replicas {
-		// We need to enlarge the replicas in terms of the previous result (if exists).
-		// First scheduling is considered as a special kind of scaling up.
-		newTargetClusters, err := scaleUpScheduleByReplicaDivisionPreference(clusters, spec, preference, scheduledClusters, assignedReplicas)
-		if err != nil {
-			return nil, fmt.Errorf("failed to scaleUp: %v", err)
-		}
-		return newTargetClusters, nil
-	} else {
-		return scheduledClusters, nil
-	}
-}
-
-// divideReplicasByPreference assigns a total number of replicas to the selected clusters by preference according to the resource.
-func divideReplicasByPreference(
-	clusterAvailableReplicas []workv1alpha2.TargetCluster,
-	replicas int32,
-	preference policyv1alpha1.ReplicaDivisionPreference,
-	scheduledClusterNames sets.String,
-) ([]workv1alpha2.TargetCluster, error) {
-	clustersMaxReplicas := util.GetSumOfReplicas(clusterAvailableReplicas)
-	if clustersMaxReplicas < replicas {
-		return nil, fmt.Errorf("clusters resources are not enough to schedule, max %d replicas are support", clustersMaxReplicas)
+// dynamicDivideReplicas assigns a total number of replicas to the selected clusters by preference according to the resource.
+func dynamicDivideReplicas(state *assignState) ([]workv1alpha2.TargetCluster, error) {
+	if state.availableReplicas < state.targetReplicas {
+		return nil, fmt.Errorf("clusters resources are not enough to schedule, max %d replicas are support", state.availableReplicas)
 	}
 
-	switch preference {
-	case policyv1alpha1.ReplicaDivisionPreferenceAggregated:
-		return divideReplicasByAggregation(clusterAvailableReplicas, replicas, scheduledClusterNames), nil
-	case policyv1alpha1.ReplicaDivisionPreferenceWeighted:
-		return divideReplicasByAvailableReplica(clusterAvailableReplicas, replicas, clustersMaxReplicas), nil
-	default:
-		return nil, fmt.Errorf("undefined replicaSchedulingType: %v", preference)
-	}
-}
-
-func divideReplicasByAggregation(clusterAvailableReplicas []workv1alpha2.TargetCluster,
-	replicas int32, scheduledClusterNames sets.String) []workv1alpha2.TargetCluster {
-	clusterAvailableReplicas = resortClusterList(clusterAvailableReplicas, scheduledClusterNames)
-	clustersNum, clustersMaxReplicas := 0, int32(0)
-	for _, clusterInfo := range clusterAvailableReplicas {
-		clustersNum++
-		clustersMaxReplicas += clusterInfo.Replicas
-		if clustersMaxReplicas >= replicas {
-			break
-		}
-	}
-	return divideReplicasByAvailableReplica(clusterAvailableReplicas[0:clustersNum], replicas, clustersMaxReplicas)
-}
-
-func divideReplicasByAvailableReplica(clusterAvailableReplicas []workv1alpha2.TargetCluster, replicas int32,
-	clustersMaxReplicas int32) []workv1alpha2.TargetCluster {
-	desireReplicaInfos := make(map[string]int64)
-	allocatedReplicas := int32(0)
-	for _, clusterInfo := range clusterAvailableReplicas {
-		desireReplicaInfos[clusterInfo.Name] = int64(clusterInfo.Replicas * replicas / clustersMaxReplicas)
-		allocatedReplicas += int32(desireReplicaInfos[clusterInfo.Name])
-	}
-
-	var clusterNames []string
-	for _, targetCluster := range clusterAvailableReplicas {
-		clusterNames = append(clusterNames, targetCluster.Name)
-	}
-	divideRemainingReplicas(int(replicas-allocatedReplicas), desireReplicaInfos, clusterNames)
-
-	targetClusters := make([]workv1alpha2.TargetCluster, len(desireReplicaInfos))
-	i := 0
-	for key, value := range desireReplicaInfos {
-		targetClusters[i] = workv1alpha2.TargetCluster{Name: key, Replicas: int32(value)}
-		i++
-	}
-	return targetClusters
-}
-
-// divideRemainingReplicas divide remaining Replicas to clusters and calculate desiredReplicaInfos
-func divideRemainingReplicas(remainingReplicas int, desiredReplicaInfos map[string]int64, clusterNames []string) {
-	if remainingReplicas <= 0 {
-		return
-	}
-
-	clusterSize := len(clusterNames)
-	if remainingReplicas < clusterSize {
-		for i := 0; i < remainingReplicas; i++ {
-			desiredReplicaInfos[clusterNames[i]]++
-		}
-	} else {
-		avg, residue := remainingReplicas/clusterSize, remainingReplicas%clusterSize
-		for i := 0; i < clusterSize; i++ {
-			if i < residue {
-				desiredReplicaInfos[clusterNames[i]] += int64(avg) + 1
-			} else {
-				desiredReplicaInfos[clusterNames[i]] += int64(avg)
+	switch state.strategyType {
+	case AggregatedStrategy:
+		state.availableClusters = state.resortAvailableClusters()
+		var sum int32
+		for i := range state.availableClusters {
+			if sum += state.availableClusters[i].Replicas; sum >= state.targetReplicas {
+				state.availableClusters = state.availableClusters[:i+1]
+				break
 			}
 		}
+		fallthrough
+	case DynamicWeightStrategy:
+		// Set the availableClusters as the weight, scheduledClusters as init result, target as the dispenser object.
+		// After dispensing, the target cluster will be the combination of init result and weighted result for target replicas.
+		weightList := getStaticWeightInfoListByTargetClusters(state.availableClusters)
+		disp := newDispenser(state.targetReplicas, state.scheduledClusters)
+		disp.takeByWeight(weightList)
+		return disp.result, nil
+	default:
+		// should never happen
+		return nil, fmt.Errorf("undefined strategy type: %s", state.strategyType)
 	}
 }
 
-func scaleDownScheduleByReplicaDivisionPreference(
-	spec *workv1alpha2.ResourceBindingSpec,
-	preference policyv1alpha1.ReplicaDivisionPreference,
-) ([]workv1alpha2.TargetCluster, error) {
+func dynamicScaleDown(state *assignState) ([]workv1alpha2.TargetCluster, error) {
 	// The previous scheduling result will be the weight reference of scaling down.
 	// In other words, we scale down the replicas proportionally by their scheduled replicas.
-	return divideReplicasByPreference(spec.Clusters, spec.Replicas, preference, sets.NewString())
+	// Now:
+	// 1. targetReplicas is set to the desired replicas.
+	// 2. availableClusters is set to the former schedule result.
+	// 3. scheduledClusters and assignedReplicas are not set, which implies we consider this action as a first schedule.
+	state.targetReplicas = state.spec.Replicas
+	state.scheduledClusters = nil
+	state.buildAvailableClusters(func(_ []*clusterv1alpha1.Cluster, spec *workv1alpha2.ResourceBindingSpec) []workv1alpha2.TargetCluster {
+		availableClusters := make(TargetClustersList, len(spec.Clusters))
+		copy(availableClusters, spec.Clusters)
+		sort.Sort(availableClusters)
+		return availableClusters
+	})
+	return dynamicDivideReplicas(state)
 }
 
-func scaleUpScheduleByReplicaDivisionPreference(
-	clusters []*clusterv1alpha1.Cluster,
-	spec *workv1alpha2.ResourceBindingSpec,
-	preference policyv1alpha1.ReplicaDivisionPreference,
-	scheduledClusters []workv1alpha2.TargetCluster,
-	assignedReplicas int32,
-) ([]workv1alpha2.TargetCluster, error) {
-	// Step 1: Get how many replicas should be scheduled in this cycle and construct a new object if necessary
-	newSpec := spec
-	if assignedReplicas > 0 {
-		newSpec = spec.DeepCopy()
-		newSpec.Replicas = spec.Replicas - assignedReplicas
-	}
-
-	// Step 2: Calculate available replicas of all candidates
-	clusterAvailableReplicas := calAvailableReplicas(clusters, newSpec)
-	sort.Sort(TargetClustersList(clusterAvailableReplicas))
-
-	// Step 3: Begin dividing.
-	// Only the new replicas are considered during this scheduler, the old replicas will not be moved.
-	// If not, the old replicas may be recreated which is not expected during scaling up.
-	// The parameter `scheduledClusterNames` is used to make sure that we assign new replicas to them preferentially
-	// so that all the replicas are aggregated.
-	result, err := divideReplicasByPreference(clusterAvailableReplicas, newSpec.Replicas,
-		preference, util.ConvertToClusterNames(scheduledClusters))
-	if err != nil {
-		return result, err
-	}
-
-	// Step 4: Merge the result of previous and new results.
-	return util.MergeTargetClusters(scheduledClusters, result), nil
+func dynamicScaleUp(state *assignState) ([]workv1alpha2.TargetCluster, error) {
+	// Target is the extra ones.
+	state.targetReplicas = state.spec.Replicas - state.assignedReplicas
+	state.buildAvailableClusters(func(clusters []*clusterv1alpha1.Cluster, spec *workv1alpha2.ResourceBindingSpec) []workv1alpha2.TargetCluster {
+		clusterAvailableReplicas := calAvailableReplicas(clusters, spec)
+		sort.Sort(TargetClustersList(clusterAvailableReplicas))
+		return clusterAvailableReplicas
+	})
+	return dynamicDivideReplicas(state)
 }
diff --git a/pkg/scheduler/core/generic_scheduler.go b/pkg/scheduler/core/generic_scheduler.go
index 15401d32d..e2bf93fc5 100644
--- a/pkg/scheduler/core/generic_scheduler.go
+++ b/pkg/scheduler/core/generic_scheduler.go
@@ -179,35 +179,14 @@ func (g *genericScheduler) assignReplicas(
 	}
 
 	if object.Replicas > 0 && replicaSchedulingStrategy != nil {
-		var strategy string
-
-		switch replicaSchedulingStrategy.ReplicaSchedulingType {
-		case policyv1alpha1.ReplicaSchedulingTypeDuplicated:
-			strategy = DuplicatedStrategy
-		case policyv1alpha1.ReplicaSchedulingTypeDivided:
-			switch replicaSchedulingStrategy.ReplicaDivisionPreference {
-			case policyv1alpha1.ReplicaDivisionPreferenceAggregated:
-				strategy = AggregatedStrategy
-			case policyv1alpha1.ReplicaDivisionPreferenceWeighted:
-				if replicaSchedulingStrategy.WeightPreference != nil && len(replicaSchedulingStrategy.WeightPreference.DynamicWeight) != 0 {
-					strategy = DynamicWeightStrategy
-				} else {
-					strategy = StaticWeightStrategy
-				}
-			}
-		}
-
-		assign, ok := assignFuncMap[strategy]
+		state := newAssignState(clusters, replicaSchedulingStrategy, object)
+		assignFunc, ok := assignFuncMap[state.strategyType]
 		if !ok {
 			// should never happen at present
 			return nil, fmt.Errorf("unsupported replica scheduling strategy, replicaSchedulingType: %s, replicaDivisionPreference: %s, "+
 				"please try another scheduling strategy", replicaSchedulingStrategy.ReplicaSchedulingType, replicaSchedulingStrategy.ReplicaDivisionPreference)
 		}
-		return assign(&assignState{
-			candidates: clusters,
-			strategy:   replicaSchedulingStrategy,
-			object:     object,
-		})
+		return assignFunc(state)
 	}
 
 	// If not workload, assign all clusters without considering replicas.
diff --git a/pkg/scheduler/core/util.go b/pkg/scheduler/core/util.go
index d925f8005..e35307dc8 100644
--- a/pkg/scheduler/core/util.go
+++ b/pkg/scheduler/core/util.go
@@ -15,6 +15,8 @@ import (
 	"github.com/karmada-io/karmada/pkg/util"
 )
 
+type calculator func([]*clusterv1alpha1.Cluster, *workv1alpha2.ResourceBindingSpec) []workv1alpha2.TargetCluster
+
 func getDefaultWeightPreference(clusters []*clusterv1alpha1.Cluster) *policyv1alpha1.ClusterPreferences {
 	staticWeightLists := make([]policyv1alpha1.StaticClusterWeight, 0)
 	for _, cluster := range clusters {
@@ -73,52 +75,6 @@ func calAvailableReplicas(clusters []*clusterv1alpha1.Cluster, spec *workv1alpha
 	return availableTargetClusters
 }
 
-// findOutScheduledCluster will return a slice of clusters
-// which are a part of `TargetClusters` and have non-zero replicas.
-func findOutScheduledCluster(tcs []workv1alpha2.TargetCluster, candidates []*clusterv1alpha1.Cluster) []workv1alpha2.TargetCluster {
-	validTarget := make([]workv1alpha2.TargetCluster, 0)
-	if len(tcs) == 0 {
-		return validTarget
-	}
-
-	for _, targetCluster := range tcs {
-		// must have non-zero replicas
-		if targetCluster.Replicas <= 0 {
-			continue
-		}
-		// must in `candidates`
-		for _, cluster := range candidates {
-			if targetCluster.Name == cluster.Name {
-				validTarget = append(validTarget, targetCluster)
-				break
-			}
-		}
-	}
-
-	return validTarget
-}
-
-// resortClusterList is used to make sure scheduledClusterNames are in front of the other clusters in the list of
-// clusterAvailableReplicas so that we can assign new replicas to them preferentially when scale up.
-// Note that scheduledClusterNames have none items during first scheduler
-func resortClusterList(clusterAvailableReplicas []workv1alpha2.TargetCluster, scheduledClusterNames sets.String) []workv1alpha2.TargetCluster {
-	if scheduledClusterNames.Len() == 0 {
-		return clusterAvailableReplicas
-	}
-	var preUsedCluster []workv1alpha2.TargetCluster
-	var unUsedCluster []workv1alpha2.TargetCluster
-	for i := range clusterAvailableReplicas {
-		if scheduledClusterNames.Has(clusterAvailableReplicas[i].Name) {
-			preUsedCluster = append(preUsedCluster, clusterAvailableReplicas[i])
-		} else {
-			unUsedCluster = append(unUsedCluster, clusterAvailableReplicas[i])
-		}
-	}
-	clusterAvailableReplicas = append(preUsedCluster, unUsedCluster...)
-	klog.V(4).Infof("Resorted target cluster: %v", clusterAvailableReplicas)
-	return clusterAvailableReplicas
-}
-
 // attachZeroReplicasCluster attach cluster in clusters into targetCluster
 // The purpose is to avoid workload not appeared in rb's spec.clusters field
 func attachZeroReplicasCluster(clusters []*clusterv1alpha1.Cluster, targetClusters []workv1alpha2.TargetCluster) []workv1alpha2.TargetCluster {