merge scale scheduling with normal scheduling
Signed-off-by: Garrybest <garrybest@foxmail.com>
parent 09c04490bc
commit eb0dac3a68
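
In short, the commit folds the old scale-scheduling path into divideReplicasByResource: the scheduler compares the replicas already assigned in spec.Clusters against the desired spec.Replicas and then scales down, scales up, or keeps the previous result, with a first scheduling treated as a scale-up from zero. The following sketch only illustrates that dispatch; it uses a simplified, hypothetical targetCluster type instead of the real karmada API structs and is not code from this commit.

package main

import "fmt"

// targetCluster is a simplified, hypothetical stand-in for workv1alpha2.TargetCluster.
type targetCluster struct {
	Name     string
	Replicas int32
}

// sumReplicas plays the role of util.GetSumOfReplicas: the total already scheduled.
func sumReplicas(clusters []targetCluster) int32 {
	var sum int32
	for _, c := range clusters {
		sum += c.Replicas
	}
	return sum
}

// divide mirrors the dispatch in the new divideReplicasByResource: one entry point
// decides whether this cycle is a scale-down, a scale-up (first scheduling included),
// or a no-op that keeps the previous result.
func divide(desired int32, scheduled []targetCluster) string {
	assigned := sumReplicas(scheduled)
	switch {
	case assigned > desired:
		return "scale down: shrink proportionally to the previous result"
	case assigned < desired:
		return "scale up: keep old replicas, divide only the new desired-assigned ones"
	default:
		return "unchanged: keep the previous scheduling result"
	}
}

func main() {
	prev := []targetCluster{{Name: "member1", Replicas: 2}, {Name: "member2", Replicas: 3}}
	fmt.Println(divide(3, prev)) // scale down
	fmt.Println(divide(8, prev)) // scale up
	fmt.Println(divide(5, prev)) // unchanged
	fmt.Println(divide(4, nil))  // first scheduling behaves like a scale-up from zero
}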
@@ -4,6 +4,8 @@ import (
 	"fmt"
 	"sort"
 
+	"k8s.io/apimachinery/pkg/util/sets"
+
 	clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
 	policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
 	workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
@@ -28,11 +30,33 @@ func divideReplicasByDynamicWeight(clusters []*clusterv1alpha1.Cluster, dynamicW
 	}
 }
 
-func divideReplicasByResource(clusters []*clusterv1alpha1.Cluster, spec *workv1alpha2.ResourceBindingSpec,
-	preference policyv1alpha1.ReplicaDivisionPreference, preUsedClustersName ...string) ([]workv1alpha2.TargetCluster, error) {
-	clusterAvailableReplicas := calAvailableReplicas(clusters, spec)
-	sort.Sort(TargetClustersList(clusterAvailableReplicas))
-	return divideReplicasByPreference(clusterAvailableReplicas, spec.Replicas, preference, preUsedClustersName...)
+func divideReplicasByResource(
+	clusters []*clusterv1alpha1.Cluster,
+	spec *workv1alpha2.ResourceBindingSpec,
+	preference policyv1alpha1.ReplicaDivisionPreference,
+) ([]workv1alpha2.TargetCluster, error) {
+	// Step 1: Get previous total sum of replicas.
+	assignedReplicas := util.GetSumOfReplicas(spec.Clusters)
+
+	// Step 2: Check the scale type (up or down).
+	if assignedReplicas > spec.Replicas {
+		// We need to reduce the replicas in terms of the previous result.
+		newTargetClusters, err := scaleDownScheduleByReplicaDivisionPreference(spec, preference)
+		if err != nil {
+			return nil, fmt.Errorf("failed to scale down: %v", err)
+		}
+		return newTargetClusters, nil
+	} else if assignedReplicas < spec.Replicas {
+		// We need to enlarge the replicas in terms of the previous result (if exists).
+		// First scheduling is considered as a special kind of scaling up.
+		newTargetClusters, err := scaleUpScheduleByReplicaDivisionPreference(clusters, spec, preference, assignedReplicas)
+		if err != nil {
+			return nil, fmt.Errorf("failed to scaleUp: %v", err)
+		}
+		return newTargetClusters, nil
+	} else {
+		return spec.Clusters, nil
+	}
 }
 
 // divideReplicasByStaticWeight assigns a total number of replicas to the selected clusters by the weight list.
@@ -90,8 +114,12 @@ func divideReplicasByStaticWeight(clusters []*clusterv1alpha1.Cluster, weightLis
 }
 
 // divideReplicasByPreference assigns a total number of replicas to the selected clusters by preference according to the resource.
-func divideReplicasByPreference(clusterAvailableReplicas []workv1alpha2.TargetCluster, replicas int32,
-	preference policyv1alpha1.ReplicaDivisionPreference, preUsedClustersName ...string) ([]workv1alpha2.TargetCluster, error) {
+func divideReplicasByPreference(
+	clusterAvailableReplicas []workv1alpha2.TargetCluster,
+	replicas int32,
+	preference policyv1alpha1.ReplicaDivisionPreference,
+	scheduledClusterNames sets.String,
+) ([]workv1alpha2.TargetCluster, error) {
 	clustersMaxReplicas := util.GetSumOfReplicas(clusterAvailableReplicas)
 	if clustersMaxReplicas < replicas {
 		return nil, fmt.Errorf("clusters resources are not enough to schedule, max %d replicas are support", clustersMaxReplicas)
@@ -99,7 +127,7 @@ func divideReplicasByPreference(clusterAvailableReplicas []workv1alpha2.TargetCl
 
 	switch preference {
 	case policyv1alpha1.ReplicaDivisionPreferenceAggregated:
-		return divideReplicasByAggregation(clusterAvailableReplicas, replicas, preUsedClustersName...), nil
+		return divideReplicasByAggregation(clusterAvailableReplicas, replicas, scheduledClusterNames), nil
 	case policyv1alpha1.ReplicaDivisionPreferenceWeighted:
 		return divideReplicasByAvailableReplica(clusterAvailableReplicas, replicas, clustersMaxReplicas), nil
 	default:
@@ -108,8 +136,8 @@ func divideReplicasByPreference(clusterAvailableReplicas []workv1alpha2.TargetCl
 }
 
 func divideReplicasByAggregation(clusterAvailableReplicas []workv1alpha2.TargetCluster,
-	replicas int32, preUsedClustersName ...string) []workv1alpha2.TargetCluster {
-	clusterAvailableReplicas = presortClusterList(clusterAvailableReplicas, preUsedClustersName...)
+	replicas int32, scheduledClusterNames sets.String) []workv1alpha2.TargetCluster {
+	clusterAvailableReplicas = resortClusterList(clusterAvailableReplicas, scheduledClusterNames)
 	clustersNum, clustersMaxReplicas := 0, int32(0)
 	for _, clusterInfo := range clusterAvailableReplicas {
 		clustersNum++
@@ -118,15 +146,11 @@ func divideReplicasByAggregation(clusterAvailableReplicas []workv1alpha2.TargetC
 			break
 		}
 	}
-	var unusedClusters []string
-	for i := clustersNum; i < len(clusterAvailableReplicas); i++ {
-		unusedClusters = append(unusedClusters, clusterAvailableReplicas[i].Name)
-	}
-	return divideReplicasByAvailableReplica(clusterAvailableReplicas[0:clustersNum], replicas, clustersMaxReplicas, unusedClusters...)
+	return divideReplicasByAvailableReplica(clusterAvailableReplicas[0:clustersNum], replicas, clustersMaxReplicas)
 }
 
 func divideReplicasByAvailableReplica(clusterAvailableReplicas []workv1alpha2.TargetCluster, replicas int32,
-	clustersMaxReplicas int32, unusedClusters ...string) []workv1alpha2.TargetCluster {
+	clustersMaxReplicas int32) []workv1alpha2.TargetCluster {
 	desireReplicaInfos := make(map[string]int64)
 	allocatedReplicas := int32(0)
 	for _, clusterInfo := range clusterAvailableReplicas {
@@ -140,13 +164,6 @@ func divideReplicasByAvailableReplica(clusterAvailableReplicas []workv1alpha2.Ta
 	}
 	divideRemainingReplicas(int(replicas-allocatedReplicas), desireReplicaInfos, clusterNames)
 
-	// For scaling up
-	for _, cluster := range unusedClusters {
-		if _, exist := desireReplicaInfos[cluster]; !exist {
-			desireReplicaInfos[cluster] = 0
-		}
-	}
-
 	targetClusters := make([]workv1alpha2.TargetCluster, len(desireReplicaInfos))
 	i := 0
 	for key, value := range desireReplicaInfos {
@@ -179,45 +196,45 @@ func divideRemainingReplicas(remainingReplicas int, desiredReplicaInfos map[stri
 	}
 }
 
-func scaleScheduleByReplicaDivisionPreference(spec *workv1alpha2.ResourceBindingSpec, preference policyv1alpha1.ReplicaDivisionPreference,
-	preSelectedClusters []*clusterv1alpha1.Cluster) ([]workv1alpha2.TargetCluster, error) {
-	assignedReplicas := util.GetSumOfReplicas(spec.Clusters)
-	if assignedReplicas > spec.Replicas {
-		newTargetClusters, err := scaleDownScheduleByReplicaDivisionPreference(spec, preference)
-		if err != nil {
-			return nil, fmt.Errorf("failed to scaleDown: %v", err)
-		}
-		return newTargetClusters, nil
-	} else if assignedReplicas < spec.Replicas {
-		newTargetClusters, err := scaleUpScheduleByReplicaDivisionPreference(spec, preSelectedClusters, preference, assignedReplicas)
-		if err != nil {
-			return nil, fmt.Errorf("failed to scaleUp: %v", err)
-		}
-		return newTargetClusters, nil
-	} else {
-		return spec.Clusters, nil
-	}
-}
-
-func scaleDownScheduleByReplicaDivisionPreference(spec *workv1alpha2.ResourceBindingSpec,
-	preference policyv1alpha1.ReplicaDivisionPreference) ([]workv1alpha2.TargetCluster, error) {
-	return divideReplicasByPreference(spec.Clusters, spec.Replicas, preference)
-}
-
-func scaleUpScheduleByReplicaDivisionPreference(spec *workv1alpha2.ResourceBindingSpec, preSelectedClusters []*clusterv1alpha1.Cluster,
-	preference policyv1alpha1.ReplicaDivisionPreference, assignedReplicas int32) ([]workv1alpha2.TargetCluster, error) {
-	// Find the clusters that have old replicas, so we can prefer to assign new replicas to them.
-	usedTargetClusters := helper.GetUsedBindingClusterNames(spec.Clusters)
-	// only the new replicas are considered during this scheduler, the old replicas will not be moved.
-	// if not the old replicas may be recreated which is not expected during scaling up
-	// use usedTargetClusters to make sure that we assign new replicas to them preferentially
-	// so that all the replicas are aggregated
-	newObject := spec.DeepCopy()
-	newObject.Replicas = spec.Replicas - assignedReplicas
-	result, err := divideReplicasByResource(preSelectedClusters, newObject, preference, usedTargetClusters...)
+func scaleDownScheduleByReplicaDivisionPreference(
+	spec *workv1alpha2.ResourceBindingSpec,
+	preference policyv1alpha1.ReplicaDivisionPreference,
+) ([]workv1alpha2.TargetCluster, error) {
+	// The previous scheduling result will be the weight reference of scaling down.
+	// In other words, we scale down the replicas proportionally by their scheduled replicas.
+	return divideReplicasByPreference(spec.Clusters, spec.Replicas, preference, sets.NewString())
+}
+
+func scaleUpScheduleByReplicaDivisionPreference(
+	clusters []*clusterv1alpha1.Cluster,
+	spec *workv1alpha2.ResourceBindingSpec,
+	preference policyv1alpha1.ReplicaDivisionPreference,
+	assignedReplicas int32,
+) ([]workv1alpha2.TargetCluster, error) {
+	// Step 1: Find the clusters that have old replicas, so we can prefer to assign new replicas towards them.
+	scheduledClusterNames := findOutScheduledCluster(spec.Clusters, clusters)
+
+	// Step 2: Get how many replicas should be scheduled in this cycle and construct a new object if necessary
+	newSpec := spec
+	if assignedReplicas > 0 {
+		newSpec = spec.DeepCopy()
+		newSpec.Replicas = spec.Replicas - assignedReplicas
+	}
+
+	// Step 3: Calculate available replicas of all candidates
+	clusterAvailableReplicas := calAvailableReplicas(clusters, newSpec)
+	sort.Sort(TargetClustersList(clusterAvailableReplicas))
+
+	// Step 4: Begin dividing.
+	// Only the new replicas are considered during this scheduler, the old replicas will not be moved.
+	// If not, the old replicas may be recreated which is not expected during scaling up.
+	// The parameter `scheduledClusterNames` is used to make sure that we assign new replicas to them preferentially
+	// so that all the replicas are aggregated.
+	result, err := divideReplicasByPreference(clusterAvailableReplicas, newSpec.Replicas, preference, scheduledClusterNames)
 	if err != nil {
 		return result, err
 	}
-	// merge the result of this scheduler for new replicas and the data of old replicas
+
+	// Step 5: Merge the result of previous and new results.
 	return util.MergeTargetClusters(spec.Clusters, result), nil
 }
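
Step 5 above hands the untouched old assignment plus the newly divided delta to util.MergeTargetClusters. That helper is not part of this diff; the sketch below only illustrates the assumed merge behavior (replicas of same-named clusters are summed) with a simplified, hypothetical targetCluster type.

package main

import "fmt"

// targetCluster is a simplified, hypothetical stand-in for workv1alpha2.TargetCluster.
type targetCluster struct {
	Name     string
	Replicas int32
}

// mergeTargetClusters illustrates the assumed role of util.MergeTargetClusters in Step 5:
// the old assignment stays as-is and the newly divided delta is added per cluster.
func mergeTargetClusters(old, delta []targetCluster) []targetCluster {
	sum := make(map[string]int32)
	var order []string
	for _, c := range append(append([]targetCluster{}, old...), delta...) {
		if _, seen := sum[c.Name]; !seen {
			order = append(order, c.Name)
		}
		sum[c.Name] += c.Replicas
	}
	merged := make([]targetCluster, 0, len(order))
	for _, name := range order {
		merged = append(merged, targetCluster{Name: name, Replicas: sum[name]})
	}
	return merged
}

func main() {
	old := []targetCluster{{Name: "member1", Replicas: 2}, {Name: "member2", Replicas: 3}}
	delta := []targetCluster{{Name: "member2", Replicas: 1}, {Name: "member3", Replicas: 2}}
	fmt.Println(mergeTargetClusters(old, delta)) // [{member1 2} {member2 4} {member3 2}]
}
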
@@ -13,7 +13,6 @@ import (
 	policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
 	workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
 	estimatorclient "github.com/karmada-io/karmada/pkg/estimator/client"
-	"github.com/karmada-io/karmada/pkg/scheduler/cache"
 	"github.com/karmada-io/karmada/pkg/util"
 )
 
@@ -76,39 +75,47 @@ func calAvailableReplicas(clusters []*clusterv1alpha1.Cluster, spec *workv1alpha
 	return availableTargetClusters
 }
 
-func getPreSelected(targetClusters []workv1alpha2.TargetCluster, schedulerCache cache.Cache) []*clusterv1alpha1.Cluster {
-	var preSelectedClusters []*clusterv1alpha1.Cluster
-	clusterInfoSnapshot := schedulerCache.Snapshot()
-	for _, targetCluster := range targetClusters {
-		for _, cluster := range clusterInfoSnapshot.GetClusters() {
-			if targetCluster.Name == cluster.Cluster().Name {
-				preSelectedClusters = append(preSelectedClusters, cluster.Cluster())
+// findOutScheduledCluster will return a name set of clusters
+// which are a part of `feasibleClusters` and have non-zero replicas.
+func findOutScheduledCluster(tcs []workv1alpha2.TargetCluster, candidates []*clusterv1alpha1.Cluster) sets.String {
+	res := sets.NewString()
+	if len(tcs) == 0 {
+		return res
+	}
+	for _, targetCluster := range tcs {
+		// must have non-zero replicas
+		if targetCluster.Replicas <= 0 {
+			continue
+		}
+		// must in `candidates`
+		for _, cluster := range candidates {
+			if targetCluster.Name == cluster.Name {
+				res.Insert(targetCluster.Name)
 				break
 			}
 		}
 	}
-	return preSelectedClusters
+	return res
 }
 
-// presortClusterList is used to make sure preUsedClusterNames are in front of the other clusters in the list of
+// resortClusterList is used to make sure scheduledClusterNames are in front of the other clusters in the list of
 // clusterAvailableReplicas so that we can assign new replicas to them preferentially when scale up.
-// Note that preUsedClusterNames have none items during first scheduler
-func presortClusterList(clusterAvailableReplicas []workv1alpha2.TargetCluster, preUsedClusterNames ...string) []workv1alpha2.TargetCluster {
-	if len(preUsedClusterNames) == 0 {
+// Note that scheduledClusterNames have none items during first scheduler
+func resortClusterList(clusterAvailableReplicas []workv1alpha2.TargetCluster, scheduledClusterNames sets.String) []workv1alpha2.TargetCluster {
+	if scheduledClusterNames.Len() == 0 {
 		return clusterAvailableReplicas
 	}
-	preUsedClusterSet := sets.NewString(preUsedClusterNames...)
 	var preUsedCluster []workv1alpha2.TargetCluster
 	var unUsedCluster []workv1alpha2.TargetCluster
 	for i := range clusterAvailableReplicas {
-		if preUsedClusterSet.Has(clusterAvailableReplicas[i].Name) {
+		if scheduledClusterNames.Has(clusterAvailableReplicas[i].Name) {
 			preUsedCluster = append(preUsedCluster, clusterAvailableReplicas[i])
 		} else {
 			unUsedCluster = append(unUsedCluster, clusterAvailableReplicas[i])
 		}
 	}
 	clusterAvailableReplicas = append(preUsedCluster, unUsedCluster...)
-	klog.V(4).Infof("resorted target cluster: %v", clusterAvailableReplicas)
+	klog.V(4).Infof("Resorted target cluster: %v", clusterAvailableReplicas)
 	return clusterAvailableReplicas
 }
 
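
For reference, the scale-up path relies on resortClusterList (above) to keep clusters that already hold replicas at the front of the candidate list, so the aggregated division tops them up before spilling replicas onto unused clusters. A minimal standalone sketch of that reordering, assuming the k8s.io/apimachinery module for sets.String and a simplified targetCluster type in place of workv1alpha2.TargetCluster:

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/util/sets"
)

// targetCluster is a simplified, hypothetical stand-in for workv1alpha2.TargetCluster.
type targetCluster struct {
	Name     string
	Replicas int32
}

// resort mimics resortClusterList: clusters that already hold replicas move to the
// front (relative order preserved) so the aggregated division tops them up first.
func resort(available []targetCluster, scheduled sets.String) []targetCluster {
	if scheduled.Len() == 0 {
		return available
	}
	var used, unused []targetCluster
	for _, c := range available {
		if scheduled.Has(c.Name) {
			used = append(used, c)
		} else {
			unused = append(unused, c)
		}
	}
	return append(used, unused...)
}

func main() {
	available := []targetCluster{
		{Name: "member1", Replicas: 10},
		{Name: "member2", Replicas: 8},
		{Name: "member3", Replicas: 6},
	}
	// member3 already runs replicas, so it is considered before the larger clusters.
	fmt.Println(resort(available, sets.NewString("member3")))
}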