Merge pull request #546 from qianjun1993/scale-scheduler

Replicas changes with aggregated ReplicaDivisionPreference
karmada-bot 2021-07-23 15:41:46 +08:00 committed by GitHub
commit b9912a6f22
2 changed files with 109 additions and 7 deletions
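
At a high level, this change teaches ScaleSchedule to handle the Aggregated ReplicaDivisionPreference: on scale-up only the replica delta is scheduled and then merged onto the clusters that already hold replicas, while scale-down redivides the existing assignment through divideReplicasAggregatedWithClusterReplicas. Below is a minimal, self-contained sketch of just the scale-up merge arithmetic; the cluster names and counts are hypothetical and the targetCluster type only mirrors the shape of workv1alpha1.TargetCluster.

package main

import "fmt"

// targetCluster mirrors the shape of workv1alpha1.TargetCluster for illustration only.
type targetCluster struct {
	Name     string
	Replicas int32
}

func main() {
	// The old assignment holds 5 replicas; the desired total is 8, so 3 new replicas get scheduled.
	old := []targetCluster{{"member1", 3}, {"member2", 2}}
	// Aggregated scheduling packs the 3 new replicas onto an already-used cluster.
	newlyScheduled := []targetCluster{{"member1", 3}}

	// Merge the newly scheduled replicas with the old ones, per cluster.
	merged := map[string]int32{}
	for _, c := range old {
		merged[c.Name] += c.Replicas
	}
	for _, c := range newlyScheduled {
		merged[c.Name] += c.Replicas
	}
	fmt.Println(merged) // map[member1:6 member2:2]
}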


@@ -7,6 +7,7 @@ import (
 	"sort"
 
 	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/klog/v2"
 
 	clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
@@ -293,17 +294,21 @@ func (a TargetClustersList) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
 func (a TargetClustersList) Less(i, j int) bool { return a[i].Replicas > a[j].Replicas }
 
 // divideReplicasAggregatedWithResource assigns a total number of replicas to the selected clusters aggregated according to the resource
-func (g *genericScheduler) divideReplicasAggregatedWithResource(clusters []*clusterv1alpha1.Cluster, object *workv1alpha1.ObjectReference) ([]workv1alpha1.TargetCluster, error) {
+func (g *genericScheduler) divideReplicasAggregatedWithResource(clusters []*clusterv1alpha1.Cluster, object *workv1alpha1.ObjectReference,
+	preUsedClustersName ...string) ([]workv1alpha1.TargetCluster, error) {
+	// preUsedClustersName is used to prioritize the clusters
 	for _, value := range object.ReplicaResourceRequirements {
 		if value.Value() > 0 {
-			return g.divideReplicasAggregatedWithResourceRequirements(clusters, object)
+			return g.divideReplicasAggregatedWithResourceRequirements(clusters, object, preUsedClustersName...)
 		}
 	}
-	return g.divideReplicasAggregatedWithoutResourceRequirements(clusters, object)
+	return g.divideReplicasAggregatedWithoutResourceRequirements(clusters, object, preUsedClustersName...)
 }
 
 func (g *genericScheduler) divideReplicasAggregatedWithoutResourceRequirements(clusters []*clusterv1alpha1.Cluster,
-	object *workv1alpha1.ObjectReference) ([]workv1alpha1.TargetCluster, error) {
+	object *workv1alpha1.ObjectReference, preUsedClustersName ...string) ([]workv1alpha1.TargetCluster, error) {
+	preUsedClusters, unUsedClusters := g.getPreUsed(clusters, preUsedClustersName...)
+	clusters = append(preUsedClusters, unUsedClusters...)
 	targetClusters := make([]workv1alpha1.TargetCluster, len(clusters))
 	for i, clusterInfo := range clusters {
 		targetClusters[i] = workv1alpha1.TargetCluster{Name: clusterInfo.Name, Replicas: 0}
@@ -313,8 +318,14 @@ func (g *genericScheduler) divideReplicasAggregatedWithoutResourceRequirements(c
 }
 
 func (g *genericScheduler) divideReplicasAggregatedWithResourceRequirements(clusters []*clusterv1alpha1.Cluster,
-	object *workv1alpha1.ObjectReference) ([]workv1alpha1.TargetCluster, error) {
-	clusterAvailableReplicas := g.calAvailableReplicas(clusters, object.ReplicaResourceRequirements)
+	object *workv1alpha1.ObjectReference, preUsedClustersName ...string) ([]workv1alpha1.TargetCluster, error) {
+	// make sure preUsedClusters come before unUsedClusters in the clusterAvailableReplicas list,
+	// so that new replicas are assigned to them preferentially when scaling up.
+	// preUsedClusters is empty during the first scheduling.
+	preUsedClusters, unUsedClusters := g.getPreUsed(clusters, preUsedClustersName...)
+	preUsedClustersAvailableReplicas := g.calAvailableReplicas(preUsedClusters, object.ReplicaResourceRequirements)
+	unUsedClustersAvailableReplicas := g.calAvailableReplicas(unUsedClusters, object.ReplicaResourceRequirements)
+	clusterAvailableReplicas := append(preUsedClustersAvailableReplicas, unUsedClustersAvailableReplicas...)
 	return g.divideReplicasAggregatedWithClusterReplicas(clusterAvailableReplicas, object.Replicas)
 }
 
@@ -430,11 +441,11 @@ func (g *genericScheduler) ScaleSchedule(ctx context.Context, placement *policyv
 			return result, nil
 		}
 		if placement.ReplicaScheduling.ReplicaSchedulingType == policyv1alpha1.ReplicaSchedulingTypeDivided {
+			preSelectedClusters := g.getPreSelected(spec.Clusters)
 			if placement.ReplicaScheduling.ReplicaDivisionPreference == policyv1alpha1.ReplicaDivisionPreferenceWeighted {
 				if placement.ReplicaScheduling.WeightPreference == nil {
 					return result, fmt.Errorf("no WeightPreference find to divide replicas")
 				}
-				preSelectedClusters := g.getPreSelected(spec.Clusters)
 				clustersWithReplicase, err := g.divideReplicasByStaticWeight(preSelectedClusters, placement.ReplicaScheduling.WeightPreference.StaticWeightList, spec.Resource.Replicas)
 				if err != nil {
 					return result, fmt.Errorf("failed to assignReplicas with Weight: %v", err)
@@ -442,6 +453,10 @@ func (g *genericScheduler) ScaleSchedule(ctx context.Context, placement *policyv
 				result.SuggestedClusters = clustersWithReplicase
 				return result, nil
 			}
+			if placement.ReplicaScheduling.ReplicaDivisionPreference == policyv1alpha1.ReplicaDivisionPreferenceAggregated {
+				return g.scaleScheduleWithReplicaDivisionPreferenceAggregated(spec)
+			}
+			return g.scaleScheduleWithReplicaDivisionPreferenceAggregated(spec)
 		}
 	}
@@ -452,6 +467,67 @@ func (g *genericScheduler) ScaleSchedule(ctx context.Context, placement *policyv
 	return result, nil
 }
 
+func (g *genericScheduler) scaleScheduleWithReplicaDivisionPreferenceAggregated(spec *workv1alpha1.ResourceBindingSpec) (result ScheduleResult, err error) {
+	assignedReplicas := util.GetSumOfReplicas(spec.Clusters)
+	if assignedReplicas > spec.Resource.Replicas {
+		newTargetClusters, err := g.scaleDownScheduleWithReplicaDivisionPreferenceAggregated(spec)
+		if err != nil {
+			return result, fmt.Errorf("failed to scaleDown: %v", err)
+		}
+		result.SuggestedClusters = newTargetClusters
+	} else if assignedReplicas < spec.Resource.Replicas {
+		newTargetClusters, err := g.scaleUpScheduleWithReplicaDivisionPreferenceAggregated(spec)
+		if err != nil {
+			return result, fmt.Errorf("failed to scaleUp: %v", err)
+		}
+		result.SuggestedClusters = newTargetClusters
+	} else {
+		result.SuggestedClusters = spec.Clusters
+	}
+	return result, nil
+}
+
+func (g *genericScheduler) scaleDownScheduleWithReplicaDivisionPreferenceAggregated(spec *workv1alpha1.ResourceBindingSpec) ([]workv1alpha1.TargetCluster, error) {
+	return g.divideReplicasAggregatedWithClusterReplicas(spec.Clusters, spec.Resource.Replicas)
+}
+
+func (g *genericScheduler) scaleUpScheduleWithReplicaDivisionPreferenceAggregated(spec *workv1alpha1.ResourceBindingSpec) ([]workv1alpha1.TargetCluster, error) {
+	// find the clusters that already hold replicas, so new replicas can be assigned to them preferentially.
+	// targetMap records the old replicas per cluster so they can be merged with the new result easily.
+	targetMap := make(map[string]int32)
+	usedTargetClusters := make([]string, 0)
+	assignedReplicas := int32(0)
+	for _, cluster := range spec.Clusters {
+		targetMap[cluster.Name] = cluster.Replicas
+		assignedReplicas += cluster.Replicas
+		if cluster.Replicas > 0 {
+			usedTargetClusters = append(usedTargetClusters, cluster.Name)
+		}
+	}
+	preSelected := g.getPreSelected(spec.Clusters)
+	// only the new replicas are considered during this scheduling; the old replicas will not be moved.
+	// otherwise the old replicas might be recreated, which is not expected when scaling up.
+	// usedTargetClusters makes sure new replicas are assigned to those clusters preferentially, so that all replicas stay aggregated.
+	newObject := spec.Resource.DeepCopy()
+	newObject.Replicas = spec.Resource.Replicas - assignedReplicas
+	result, err := g.divideReplicasAggregatedWithResource(preSelected, newObject, usedTargetClusters...)
+	if err != nil {
+		return result, err
+	}
+	// merge the newly scheduled replicas with the old replicas
+	for i, cluster := range result {
+		value, ok := targetMap[cluster.Name]
+		if ok {
+			result[i].Replicas = cluster.Replicas + value
+			delete(targetMap, cluster.Name)
+		}
+	}
+	for key, value := range targetMap {
+		result = append(result, workv1alpha1.TargetCluster{Name: key, Replicas: value})
+	}
+	return result, nil
+}
+
 func (g *genericScheduler) getPreSelected(targetClusters []workv1alpha1.TargetCluster) []*clusterv1alpha1.Cluster {
 	var preSelectedClusters []*clusterv1alpha1.Cluster
 	clusterInfoSnapshot := g.schedulerCache.Snapshot()
@@ -465,3 +541,20 @@ func (g *genericScheduler) getPreSelected(targetClusters []workv1alpha1.TargetCl
 	}
 	return preSelectedClusters
 }
+
+func (g *genericScheduler) getPreUsed(clusters []*clusterv1alpha1.Cluster, preUsedClustersName ...string) ([]*clusterv1alpha1.Cluster, []*clusterv1alpha1.Cluster) {
+	if len(preUsedClustersName) == 0 {
+		return clusters, nil
+	}
+	preUsedClusterSet := sets.NewString(preUsedClustersName...)
+	var preUsedCluster []*clusterv1alpha1.Cluster
+	var unUsedCluster []*clusterv1alpha1.Cluster
+	for i := range clusters {
+		if preUsedClusterSet.Has(clusters[i].Name) {
+			preUsedCluster = append(preUsedCluster, clusters[i])
+		} else {
+			unUsedCluster = append(unUsedCluster, clusters[i])
+		}
+	}
+	return preUsedCluster, unUsedCluster
+}
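
The ordering trick above is the heart of the change: getPreUsed puts the already-used clusters first, and the aggregated division then fills clusters in that order, so new replicas land where replicas already run. A rough standalone sketch of that splitting step follows; plain string slices and a map stand in for *clusterv1alpha1.Cluster and the k8s.io sets package, and the cluster names are hypothetical.

package main

import "fmt"

// splitByPreUsed mimics getPreUsed: clusters named in preUsed come first, the rest follow.
func splitByPreUsed(clusters []string, preUsed ...string) (used, unused []string) {
	if len(preUsed) == 0 {
		return clusters, nil
	}
	set := map[string]bool{}
	for _, name := range preUsed {
		set[name] = true
	}
	for _, c := range clusters {
		if set[c] {
			used = append(used, c)
		} else {
			unused = append(unused, c)
		}
	}
	return used, unused
}

func main() {
	used, unused := splitByPreUsed([]string{"member1", "member2", "member3"}, "member2")
	// member2 comes first, so it is filled before the unused clusters.
	fmt.Println(append(used, unused...)) // [member2 member1 member3]
}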


@@ -36,3 +36,12 @@ func IsBindingReplicasChanged(bindingSpec *workv1alpha1.ResourceBindingSpec, str
 	}
 	return false
 }
+
+// GetSumOfReplicas will get the sum of replicas in target clusters
+func GetSumOfReplicas(clusters []workv1alpha1.TargetCluster) int32 {
+	replicasSum := int32(0)
+	for i := range clusters {
+		replicasSum += clusters[i].Replicas
+	}
+	return replicasSum
+}
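
GetSumOfReplicas is what lets scaleScheduleWithReplicaDivisionPreferenceAggregated decide between scale-up, scale-down, or no change by comparing the currently assigned total against spec.Resource.Replicas. A tiny standalone sketch of that comparison, with hypothetical numbers rather than real binding data:

package main

import "fmt"

func main() {
	// Currently assigned replicas per cluster (hypothetical values).
	assigned := []int32{3, 2} // sums to 5, as GetSumOfReplicas would compute
	desired := int32(8)

	var sum int32
	for _, r := range assigned {
		sum += r
	}
	switch {
	case sum < desired:
		fmt.Println("scale up by", desired-sum) // scale up by 3
	case sum > desired:
		fmt.Println("scale down to", desired)
	default:
		fmt.Println("no change")
	}
}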