From eb0dac3a68435d5522d6307bdec2c4887921fa2a Mon Sep 17 00:00:00 2001
From: Garrybest
Date: Tue, 30 Nov 2021 20:12:54 +0800
Subject: [PATCH] merge scale scheduling with normal scheduling

Signed-off-by: Garrybest
---
 pkg/scheduler/core/division_algorithm.go | 131 +++++++++++++----------
 pkg/scheduler/core/util.go               |  39 ++++---
 2 files changed, 97 insertions(+), 73 deletions(-)

diff --git a/pkg/scheduler/core/division_algorithm.go b/pkg/scheduler/core/division_algorithm.go
index 722bf372a..ef0eb7f95 100644
--- a/pkg/scheduler/core/division_algorithm.go
+++ b/pkg/scheduler/core/division_algorithm.go
@@ -4,6 +4,8 @@ import (
 	"fmt"
 	"sort"
 
+	"k8s.io/apimachinery/pkg/util/sets"
+
 	clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
 	policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
 	workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
@@ -28,11 +30,33 @@ func divideReplicasByDynamicWeight(clusters []*clusterv1alpha1.Cluster, dynamicW
 	}
 }
 
-func divideReplicasByResource(clusters []*clusterv1alpha1.Cluster, spec *workv1alpha2.ResourceBindingSpec,
-	preference policyv1alpha1.ReplicaDivisionPreference, preUsedClustersName ...string) ([]workv1alpha2.TargetCluster, error) {
-	clusterAvailableReplicas := calAvailableReplicas(clusters, spec)
-	sort.Sort(TargetClustersList(clusterAvailableReplicas))
-	return divideReplicasByPreference(clusterAvailableReplicas, spec.Replicas, preference, preUsedClustersName...)
+func divideReplicasByResource(
+	clusters []*clusterv1alpha1.Cluster,
+	spec *workv1alpha2.ResourceBindingSpec,
+	preference policyv1alpha1.ReplicaDivisionPreference,
+) ([]workv1alpha2.TargetCluster, error) {
+	// Step 1: Get the previous total sum of replicas.
+	assignedReplicas := util.GetSumOfReplicas(spec.Clusters)
+
+	// Step 2: Check the scale type (up or down).
+	if assignedReplicas > spec.Replicas {
+		// We need to reduce the replicas based on the previous result.
+		newTargetClusters, err := scaleDownScheduleByReplicaDivisionPreference(spec, preference)
+		if err != nil {
+			return nil, fmt.Errorf("failed to scale down: %v", err)
+		}
+		return newTargetClusters, nil
+	} else if assignedReplicas < spec.Replicas {
+		// We need to enlarge the replicas based on the previous result (if it exists).
+		// The first scheduling is considered a special kind of scaling up.
+		newTargetClusters, err := scaleUpScheduleByReplicaDivisionPreference(clusters, spec, preference, assignedReplicas)
+		if err != nil {
+			return nil, fmt.Errorf("failed to scale up: %v", err)
+		}
+		return newTargetClusters, nil
+	} else {
+		return spec.Clusters, nil
+	}
 }
 
 // divideReplicasByStaticWeight assigns a total number of replicas to the selected clusters by the weight list.
@@ -90,8 +114,12 @@ func divideReplicasByStaticWeight(clusters []*clusterv1alpha1.Cluster, weightLis
 }
 
 // divideReplicasByPreference assigns a total number of replicas to the selected clusters by preference according to the resource.
-func divideReplicasByPreference(clusterAvailableReplicas []workv1alpha2.TargetCluster, replicas int32, - preference policyv1alpha1.ReplicaDivisionPreference, preUsedClustersName ...string) ([]workv1alpha2.TargetCluster, error) { +func divideReplicasByPreference( + clusterAvailableReplicas []workv1alpha2.TargetCluster, + replicas int32, + preference policyv1alpha1.ReplicaDivisionPreference, + scheduledClusterNames sets.String, +) ([]workv1alpha2.TargetCluster, error) { clustersMaxReplicas := util.GetSumOfReplicas(clusterAvailableReplicas) if clustersMaxReplicas < replicas { return nil, fmt.Errorf("clusters resources are not enough to schedule, max %d replicas are support", clustersMaxReplicas) @@ -99,7 +127,7 @@ func divideReplicasByPreference(clusterAvailableReplicas []workv1alpha2.TargetCl switch preference { case policyv1alpha1.ReplicaDivisionPreferenceAggregated: - return divideReplicasByAggregation(clusterAvailableReplicas, replicas, preUsedClustersName...), nil + return divideReplicasByAggregation(clusterAvailableReplicas, replicas, scheduledClusterNames), nil case policyv1alpha1.ReplicaDivisionPreferenceWeighted: return divideReplicasByAvailableReplica(clusterAvailableReplicas, replicas, clustersMaxReplicas), nil default: @@ -108,8 +136,8 @@ func divideReplicasByPreference(clusterAvailableReplicas []workv1alpha2.TargetCl } func divideReplicasByAggregation(clusterAvailableReplicas []workv1alpha2.TargetCluster, - replicas int32, preUsedClustersName ...string) []workv1alpha2.TargetCluster { - clusterAvailableReplicas = presortClusterList(clusterAvailableReplicas, preUsedClustersName...) + replicas int32, scheduledClusterNames sets.String) []workv1alpha2.TargetCluster { + clusterAvailableReplicas = resortClusterList(clusterAvailableReplicas, scheduledClusterNames) clustersNum, clustersMaxReplicas := 0, int32(0) for _, clusterInfo := range clusterAvailableReplicas { clustersNum++ @@ -118,15 +146,11 @@ func divideReplicasByAggregation(clusterAvailableReplicas []workv1alpha2.TargetC break } } - var unusedClusters []string - for i := clustersNum; i < len(clusterAvailableReplicas); i++ { - unusedClusters = append(unusedClusters, clusterAvailableReplicas[i].Name) - } - return divideReplicasByAvailableReplica(clusterAvailableReplicas[0:clustersNum], replicas, clustersMaxReplicas, unusedClusters...) 
+ return divideReplicasByAvailableReplica(clusterAvailableReplicas[0:clustersNum], replicas, clustersMaxReplicas) } func divideReplicasByAvailableReplica(clusterAvailableReplicas []workv1alpha2.TargetCluster, replicas int32, - clustersMaxReplicas int32, unusedClusters ...string) []workv1alpha2.TargetCluster { + clustersMaxReplicas int32) []workv1alpha2.TargetCluster { desireReplicaInfos := make(map[string]int64) allocatedReplicas := int32(0) for _, clusterInfo := range clusterAvailableReplicas { @@ -140,13 +164,6 @@ func divideReplicasByAvailableReplica(clusterAvailableReplicas []workv1alpha2.Ta } divideRemainingReplicas(int(replicas-allocatedReplicas), desireReplicaInfos, clusterNames) - // For scaling up - for _, cluster := range unusedClusters { - if _, exist := desireReplicaInfos[cluster]; !exist { - desireReplicaInfos[cluster] = 0 - } - } - targetClusters := make([]workv1alpha2.TargetCluster, len(desireReplicaInfos)) i := 0 for key, value := range desireReplicaInfos { @@ -179,45 +196,45 @@ func divideRemainingReplicas(remainingReplicas int, desiredReplicaInfos map[stri } } -func scaleScheduleByReplicaDivisionPreference(spec *workv1alpha2.ResourceBindingSpec, preference policyv1alpha1.ReplicaDivisionPreference, - preSelectedClusters []*clusterv1alpha1.Cluster) ([]workv1alpha2.TargetCluster, error) { - assignedReplicas := util.GetSumOfReplicas(spec.Clusters) - if assignedReplicas > spec.Replicas { - newTargetClusters, err := scaleDownScheduleByReplicaDivisionPreference(spec, preference) - if err != nil { - return nil, fmt.Errorf("failed to scaleDown: %v", err) - } - return newTargetClusters, nil - } else if assignedReplicas < spec.Replicas { - newTargetClusters, err := scaleUpScheduleByReplicaDivisionPreference(spec, preSelectedClusters, preference, assignedReplicas) - if err != nil { - return nil, fmt.Errorf("failed to scaleUp: %v", err) - } - return newTargetClusters, nil - } else { - return spec.Clusters, nil +func scaleDownScheduleByReplicaDivisionPreference( + spec *workv1alpha2.ResourceBindingSpec, + preference policyv1alpha1.ReplicaDivisionPreference, +) ([]workv1alpha2.TargetCluster, error) { + // The previous scheduling result will be the weight reference of scaling down. + // In other words, we scale down the replicas proportionally by their scheduled replicas. + return divideReplicasByPreference(spec.Clusters, spec.Replicas, preference, sets.NewString()) +} + +func scaleUpScheduleByReplicaDivisionPreference( + clusters []*clusterv1alpha1.Cluster, + spec *workv1alpha2.ResourceBindingSpec, + preference policyv1alpha1.ReplicaDivisionPreference, + assignedReplicas int32, +) ([]workv1alpha2.TargetCluster, error) { + // Step 1: Find the clusters that have old replicas, so we can prefer to assign new replicas towards them. 
+	scheduledClusterNames := findOutScheduledCluster(spec.Clusters, clusters)
+
+	// Step 2: Calculate how many replicas should be scheduled in this cycle and construct a new spec if necessary.
+	newSpec := spec
+	if assignedReplicas > 0 {
+		newSpec = spec.DeepCopy()
+		newSpec.Replicas = spec.Replicas - assignedReplicas
 	}
-}
 
-func scaleDownScheduleByReplicaDivisionPreference(spec *workv1alpha2.ResourceBindingSpec,
-	preference policyv1alpha1.ReplicaDivisionPreference) ([]workv1alpha2.TargetCluster, error) {
-	return divideReplicasByPreference(spec.Clusters, spec.Replicas, preference)
-}
+	// Step 3: Calculate the available replicas of all candidates.
+	clusterAvailableReplicas := calAvailableReplicas(clusters, newSpec)
+	sort.Sort(TargetClustersList(clusterAvailableReplicas))
 
-func scaleUpScheduleByReplicaDivisionPreference(spec *workv1alpha2.ResourceBindingSpec, preSelectedClusters []*clusterv1alpha1.Cluster,
-	preference policyv1alpha1.ReplicaDivisionPreference, assignedReplicas int32) ([]workv1alpha2.TargetCluster, error) {
-	// Find the clusters that have old replicas, so we can prefer to assign new replicas to them.
-	usedTargetClusters := helper.GetUsedBindingClusterNames(spec.Clusters)
-	// only the new replicas are considered during this scheduler, the old replicas will not be moved.
-	// if not the old replicas may be recreated which is not expected during scaling up
-	// use usedTargetClusters to make sure that we assign new replicas to them preferentially
-	// so that all the replicas are aggregated
-	newObject := spec.DeepCopy()
-	newObject.Replicas = spec.Replicas - assignedReplicas
-	result, err := divideReplicasByResource(preSelectedClusters, newObject, preference, usedTargetClusters...)
+	// Step 4: Begin dividing.
+	// Only the new replicas are considered during this scheduling; the old replicas will not be moved.
+	// Otherwise, the old replicas may be recreated, which is not expected during scaling up.
+	// The parameter `scheduledClusterNames` is used to make sure that we assign new replicas to them preferentially
+	// so that all the replicas are aggregated.
+	result, err := divideReplicasByPreference(clusterAvailableReplicas, newSpec.Replicas, preference, scheduledClusterNames)
 	if err != nil {
 		return result, err
 	}
-	// merge the result of this scheduler for new replicas and the data of old replicas
+
+	// Step 5: Merge the previous result with the new one.
return util.MergeTargetClusters(spec.Clusters, result), nil } diff --git a/pkg/scheduler/core/util.go b/pkg/scheduler/core/util.go index 2a8802edd..d27db3095 100644 --- a/pkg/scheduler/core/util.go +++ b/pkg/scheduler/core/util.go @@ -13,7 +13,6 @@ import ( policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1" workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2" estimatorclient "github.com/karmada-io/karmada/pkg/estimator/client" - "github.com/karmada-io/karmada/pkg/scheduler/cache" "github.com/karmada-io/karmada/pkg/util" ) @@ -76,39 +75,47 @@ func calAvailableReplicas(clusters []*clusterv1alpha1.Cluster, spec *workv1alpha return availableTargetClusters } -func getPreSelected(targetClusters []workv1alpha2.TargetCluster, schedulerCache cache.Cache) []*clusterv1alpha1.Cluster { - var preSelectedClusters []*clusterv1alpha1.Cluster - clusterInfoSnapshot := schedulerCache.Snapshot() - for _, targetCluster := range targetClusters { - for _, cluster := range clusterInfoSnapshot.GetClusters() { - if targetCluster.Name == cluster.Cluster().Name { - preSelectedClusters = append(preSelectedClusters, cluster.Cluster()) +// findOutScheduledCluster will return a name set of clusters +// which are a part of `feasibleClusters` and have non-zero replicas. +func findOutScheduledCluster(tcs []workv1alpha2.TargetCluster, candidates []*clusterv1alpha1.Cluster) sets.String { + res := sets.NewString() + if len(tcs) == 0 { + return res + } + for _, targetCluster := range tcs { + // must have non-zero replicas + if targetCluster.Replicas <= 0 { + continue + } + // must in `candidates` + for _, cluster := range candidates { + if targetCluster.Name == cluster.Name { + res.Insert(targetCluster.Name) break } } } - return preSelectedClusters + return res } -// presortClusterList is used to make sure preUsedClusterNames are in front of the other clusters in the list of +// resortClusterList is used to make sure scheduledClusterNames are in front of the other clusters in the list of // clusterAvailableReplicas so that we can assign new replicas to them preferentially when scale up. -// Note that preUsedClusterNames have none items during first scheduler -func presortClusterList(clusterAvailableReplicas []workv1alpha2.TargetCluster, preUsedClusterNames ...string) []workv1alpha2.TargetCluster { - if len(preUsedClusterNames) == 0 { +// Note that scheduledClusterNames have none items during first scheduler +func resortClusterList(clusterAvailableReplicas []workv1alpha2.TargetCluster, scheduledClusterNames sets.String) []workv1alpha2.TargetCluster { + if scheduledClusterNames.Len() == 0 { return clusterAvailableReplicas } - preUsedClusterSet := sets.NewString(preUsedClusterNames...) var preUsedCluster []workv1alpha2.TargetCluster var unUsedCluster []workv1alpha2.TargetCluster for i := range clusterAvailableReplicas { - if preUsedClusterSet.Has(clusterAvailableReplicas[i].Name) { + if scheduledClusterNames.Has(clusterAvailableReplicas[i].Name) { preUsedCluster = append(preUsedCluster, clusterAvailableReplicas[i]) } else { unUsedCluster = append(unUsedCluster, clusterAvailableReplicas[i]) } } clusterAvailableReplicas = append(preUsedCluster, unUsedCluster...) - klog.V(4).Infof("resorted target cluster: %v", clusterAvailableReplicas) + klog.V(4).Infof("Resorted target cluster: %v", clusterAvailableReplicas) return clusterAvailableReplicas }