Merge pull request #512 from qianjun1993/scheduler
implementation of aggregated ReplicaDivisionPreference strategy
This commit is contained in:
commit
e29e531802
|
@ -3,7 +3,10 @@ package core
|
|||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"math"
|
||||
"sort"
|
||||
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
"k8s.io/klog/v2"
|
||||
|
||||
clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
|
||||
|
@ -213,6 +216,10 @@ func (g *genericScheduler) assignReplicas(clusters []*clusterv1alpha1.Cluster, r
|
|||
}
|
||||
return g.divideReplicasByStaticWeight(clusters, replicaSchedulingStrategy.WeightPreference.StaticWeightList, object.Replicas)
|
||||
}
|
||||
if replicaSchedulingStrategy.ReplicaDivisionPreference == policyv1alpha1.ReplicaDivisionPreferenceAggregated {
|
||||
return g.divideReplicasAggregatedWithResource(clusters, object)
|
||||
}
|
||||
return g.divideReplicasAggregatedWithResource(clusters, object) //default policy for ReplicaSchedulingTypeDivided
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -276,3 +283,135 @@ func (g *genericScheduler) divideReplicasByStaticWeight(clusters []*clusterv1alp
|
|||
}
|
||||
return targetClusters, nil
|
||||
}
|
||||
|
||||
// TargetClustersList is a slice of TargetCluster that implements sort.Interface to sort by Value.
|
||||
type TargetClustersList []workv1alpha1.TargetCluster
|
||||
|
||||
func (a TargetClustersList) Len() int { return len(a) }
|
||||
func (a TargetClustersList) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
|
||||
func (a TargetClustersList) Less(i, j int) bool { return a[i].Replicas > a[j].Replicas }
|
||||
|
||||
// divideReplicasAggregatedWithResource assigns a total number of replicas to the selected clusters aggregated according to the resource
|
||||
func (g *genericScheduler) divideReplicasAggregatedWithResource(clusters []*clusterv1alpha1.Cluster, object *workv1alpha1.ObjectReference) ([]workv1alpha1.TargetCluster, error) {
|
||||
for _, value := range object.ReplicaResourceRequirements {
|
||||
if value.Value() > 0 {
|
||||
return g.divideReplicasAggregatedWithResourceRequirements(clusters, object)
|
||||
}
|
||||
}
|
||||
return g.divideReplicasAggregatedWithoutResourceRequirements(clusters, object)
|
||||
}
|
||||
|
||||
func (g *genericScheduler) divideReplicasAggregatedWithoutResourceRequirements(clusters []*clusterv1alpha1.Cluster,
|
||||
object *workv1alpha1.ObjectReference) ([]workv1alpha1.TargetCluster, error) {
|
||||
targetClusters := make([]workv1alpha1.TargetCluster, len(clusters))
|
||||
for i, clusterInfo := range clusters {
|
||||
targetClusters[i] = workv1alpha1.TargetCluster{Name: clusterInfo.Name, Replicas: 0}
|
||||
}
|
||||
targetClusters[0] = workv1alpha1.TargetCluster{Name: clusters[0].Name, Replicas: object.Replicas}
|
||||
return targetClusters, nil
|
||||
}
|
||||
|
||||
func (g *genericScheduler) divideReplicasAggregatedWithResourceRequirements(clusters []*clusterv1alpha1.Cluster,
|
||||
object *workv1alpha1.ObjectReference) ([]workv1alpha1.TargetCluster, error) {
|
||||
clusterAvailableReplicas := g.calAvailableReplicas(clusters, object.ReplicaResourceRequirements)
|
||||
return g.divideReplicasAggregatedWithClusterReplicas(clusterAvailableReplicas, object.Replicas)
|
||||
}
|
||||
|
||||
func (g *genericScheduler) calAvailableReplicas(clusters []*clusterv1alpha1.Cluster, replicaResourceRequirements corev1.ResourceList) []workv1alpha1.TargetCluster {
|
||||
availableTargetClusters := make([]workv1alpha1.TargetCluster, len(clusters))
|
||||
for i, cluster := range clusters {
|
||||
maxReplicas := g.calClusterAvailableReplicas(cluster, replicaResourceRequirements)
|
||||
availableTargetClusters[i] = workv1alpha1.TargetCluster{Name: cluster.Name, Replicas: maxReplicas}
|
||||
}
|
||||
sort.Sort(TargetClustersList(availableTargetClusters))
|
||||
return availableTargetClusters
|
||||
}
|
||||
|
||||
// calClusterAvailableReplicas calculates how many replicas can be applied to the target cluster.
|
||||
func (g *genericScheduler) calClusterAvailableReplicas(cluster *clusterv1alpha1.Cluster, resourcePerReplicas corev1.ResourceList) int32 {
|
||||
var maximumReplicas int64 = math.MaxInt32
|
||||
resourceSummary := cluster.Status.ResourceSummary
|
||||
|
||||
for key, value := range resourcePerReplicas {
|
||||
requestedQuantity := value.Value()
|
||||
if requestedQuantity <= 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
// calculates available resource quantity
|
||||
// available = allocatable - allocated - allocating
|
||||
allocatable, ok := resourceSummary.Allocatable[key]
|
||||
if !ok {
|
||||
return 0
|
||||
}
|
||||
allocated, ok := resourceSummary.Allocated[key]
|
||||
if ok {
|
||||
allocatable.Sub(allocated)
|
||||
}
|
||||
allocating, ok := resourceSummary.Allocating[key]
|
||||
if ok {
|
||||
allocatable.Sub(allocating)
|
||||
}
|
||||
availableQuantity := allocatable.Value()
|
||||
// short path: no more resource left.
|
||||
if availableQuantity <= 0 {
|
||||
return 0
|
||||
}
|
||||
|
||||
if key == corev1.ResourceCPU {
|
||||
requestedQuantity = value.MilliValue()
|
||||
availableQuantity = allocatable.MilliValue()
|
||||
}
|
||||
|
||||
maximumReplicasForResource := availableQuantity / requestedQuantity
|
||||
if maximumReplicasForResource < maximumReplicas {
|
||||
maximumReplicas = maximumReplicasForResource
|
||||
}
|
||||
}
|
||||
|
||||
return int32(maximumReplicas)
|
||||
}
|
||||
|
||||
func (g *genericScheduler) divideReplicasAggregatedWithClusterReplicas(clusterAvailableReplicas []workv1alpha1.TargetCluster, replicas int32) ([]workv1alpha1.TargetCluster, error) {
|
||||
clustersNum := 0
|
||||
clustersMaxReplicas := int32(0)
|
||||
for _, clusterInfo := range clusterAvailableReplicas {
|
||||
clustersNum++
|
||||
clustersMaxReplicas += clusterInfo.Replicas
|
||||
if clustersMaxReplicas >= replicas {
|
||||
break
|
||||
}
|
||||
}
|
||||
if clustersMaxReplicas < replicas {
|
||||
return nil, fmt.Errorf("clusters resources are not enough to schedule, max %v replicas are support", clustersMaxReplicas)
|
||||
}
|
||||
|
||||
desireReplicaInfos := make(map[string]int32)
|
||||
allocatedReplicas := int32(0)
|
||||
for i, clusterInfo := range clusterAvailableReplicas {
|
||||
if i >= clustersNum {
|
||||
desireReplicaInfos[clusterInfo.Name] = 0
|
||||
continue
|
||||
}
|
||||
desireReplicaInfos[clusterInfo.Name] = clusterInfo.Replicas * replicas / clustersMaxReplicas
|
||||
allocatedReplicas += desireReplicaInfos[clusterInfo.Name]
|
||||
}
|
||||
|
||||
if remainReplicas := replicas - allocatedReplicas; remainReplicas > 0 {
|
||||
for i := 0; remainReplicas > 0; i++ {
|
||||
desireReplicaInfos[clusterAvailableReplicas[i].Name]++
|
||||
remainReplicas--
|
||||
if i == clustersNum {
|
||||
i = 0
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
targetClusters := make([]workv1alpha1.TargetCluster, len(clusterAvailableReplicas))
|
||||
i := 0
|
||||
for key, value := range desireReplicaInfos {
|
||||
targetClusters[i] = workv1alpha1.TargetCluster{Name: key, Replicas: value}
|
||||
i++
|
||||
}
|
||||
return targetClusters, nil
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue