diff --git a/pkg/controllers/binding/common.go b/pkg/controllers/binding/common.go index 982ce5b43..ea1e2f04b 100644 --- a/pkg/controllers/binding/common.go +++ b/pkg/controllers/binding/common.go @@ -247,7 +247,7 @@ func divideReplicasByJobCompletions(workload *unstructured.Unstructured, cluster } if found { - targetClusters = util.DivideReplicasByTargetCluster(clusters, int32(completions)) + targetClusters = helper.SpreadReplicasByTargetClusters(int32(completions), clusters, nil) } return targetClusters, nil diff --git a/pkg/scheduler/core/assignment.go b/pkg/scheduler/core/assignment.go index 0b7d75f29..8a082ed43 100644 --- a/pkg/scheduler/core/assignment.go +++ b/pkg/scheduler/core/assignment.go @@ -9,6 +9,7 @@ import ( policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1" workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2" "github.com/karmada-io/karmada/pkg/util" + "github.com/karmada-io/karmada/pkg/util/helper" ) var ( @@ -140,10 +141,10 @@ func assignByStaticWeightStrategy(state *assignState) ([]workv1alpha2.TargetClus } weightList := getStaticWeightInfoList(state.candidates, state.strategy.WeightPreference.StaticWeightList) - disp := newDispenser(state.spec.Replicas, nil) - disp.takeByWeight(weightList) + disp := helper.NewDispenser(state.spec.Replicas, nil) + disp.TakeByWeight(weightList) - return disp.result, nil + return disp.Result, nil } func assignByDynamicStrategy(state *assignState) ([]workv1alpha2.TargetCluster, error) { diff --git a/pkg/scheduler/core/division_algorithm.go b/pkg/scheduler/core/division_algorithm.go index a5f252d83..4fc586412 100644 --- a/pkg/scheduler/core/division_algorithm.go +++ b/pkg/scheduler/core/division_algorithm.go @@ -18,55 +18,6 @@ func (a TargetClustersList) Len() int { return len(a) } func (a TargetClustersList) Swap(i, j int) { a[i], a[j] = a[j], a[i] } func (a TargetClustersList) Less(i, j int) bool { return a[i].Replicas > a[j].Replicas } -type dispenser struct { - numReplicas int32 - result []workv1alpha2.TargetCluster -} - -func newDispenser(numReplicas int32, init []workv1alpha2.TargetCluster) *dispenser { - cp := make([]workv1alpha2.TargetCluster, len(init)) - copy(cp, init) - return &dispenser{numReplicas: numReplicas, result: cp} -} - -func (a *dispenser) done() bool { - return a.numReplicas == 0 && len(a.result) != 0 -} - -func (a *dispenser) takeByWeight(w helper.ClusterWeightInfoList) { - if a.done() { - return - } - sum := w.GetWeightSum() - if sum == 0 { - return - } - - sort.Sort(w) - - result := make([]workv1alpha2.TargetCluster, 0, w.Len()) - remain := a.numReplicas - for _, info := range w { - replicas := int32(info.Weight * int64(a.numReplicas) / sum) - result = append(result, workv1alpha2.TargetCluster{ - Name: info.ClusterName, - Replicas: replicas, - }) - remain -= replicas - } - // TODO(Garrybest): take rest replicas by fraction part - for i := range result { - if remain == 0 { - break - } - result[i].Replicas++ - remain-- - } - - a.numReplicas = remain - a.result = util.MergeTargetClusters(a.result, result) -} - func getStaticWeightInfoList(clusters []*clusterv1alpha1.Cluster, weightList []policyv1alpha1.StaticClusterWeight) helper.ClusterWeightInfoList { list := make(helper.ClusterWeightInfoList, 0) for _, cluster := range clusters { @@ -94,17 +45,6 @@ func getStaticWeightInfoList(clusters []*clusterv1alpha1.Cluster, weightList []p return list } -func getStaticWeightInfoListByTargetClusters(tcs []workv1alpha2.TargetCluster) helper.ClusterWeightInfoList { - weightList := make(helper.ClusterWeightInfoList, 0, len(tcs)) - for _, result := range tcs { - weightList = append(weightList, helper.ClusterWeightInfo{ - ClusterName: result.Name, - Weight: int64(result.Replicas), - }) - } - return weightList -} - // dynamicDivideReplicas assigns a total number of replicas to the selected clusters by preference according to the resource. func dynamicDivideReplicas(state *assignState) ([]workv1alpha2.TargetCluster, error) { if state.availableReplicas < state.targetReplicas { @@ -125,10 +65,7 @@ func dynamicDivideReplicas(state *assignState) ([]workv1alpha2.TargetCluster, er case DynamicWeightStrategy: // Set the availableClusters as the weight, scheduledClusters as init result, target as the dispenser object. // After dispensing, the target cluster will be the combination of init result and weighted result for target replicas. - weightList := getStaticWeightInfoListByTargetClusters(state.availableClusters) - disp := newDispenser(state.targetReplicas, state.scheduledClusters) - disp.takeByWeight(weightList) - return disp.result, nil + return helper.SpreadReplicasByTargetClusters(state.targetReplicas, state.availableClusters, state.scheduledClusters), nil default: // should never happen return nil, fmt.Errorf("undefined strategy type: %s", state.strategyType) diff --git a/pkg/scheduler/core/division_algorithm_test.go b/pkg/scheduler/core/division_algorithm_test.go index aa7c0146d..8f4f7db36 100644 --- a/pkg/scheduler/core/division_algorithm_test.go +++ b/pkg/scheduler/core/division_algorithm_test.go @@ -87,13 +87,13 @@ func Test_dispenser_takeByWeight(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - a := newDispenser(tt.numReplicas, tt.result) - a.takeByWeight(tt.weightList) - if a.done() != tt.done { - t.Errorf("expected after takeByWeight: %v, but got: %v", tt.done, a.done()) + a := utilhelper.NewDispenser(tt.numReplicas, tt.result) + a.TakeByWeight(tt.weightList) + if a.Done() != tt.done { + t.Errorf("expected after takeByWeight: %v, but got: %v", tt.done, a.Done()) } - if !helper.IsScheduleResultEqual(a.result, tt.desired) { - t.Errorf("expected result after takeByWeight: %v, but got: %v", tt.desired, a.result) + if !helper.IsScheduleResultEqual(a.Result, tt.desired) { + t.Errorf("expected result after takeByWeight: %v, but got: %v", tt.desired, a.Result) } }) } diff --git a/pkg/util/binding.go b/pkg/util/binding.go index 460715c12..fcb16fc92 100644 --- a/pkg/util/binding.go +++ b/pkg/util/binding.go @@ -55,36 +55,6 @@ func ConvertToClusterNames(clusters []workv1alpha2.TargetCluster) sets.String { return clusterNames } -// DivideReplicasByTargetCluster will divide the sum number by the weight of target clusters. -func DivideReplicasByTargetCluster(clusters []workv1alpha2.TargetCluster, sum int32) []workv1alpha2.TargetCluster { - res := make([]workv1alpha2.TargetCluster, len(clusters)) - if len(clusters) == 0 { - return res - } - sumWeight := int32(0) - allocatedReplicas := int32(0) - for i := range clusters { - sumWeight += clusters[i].Replicas - } - for i := range clusters { - res[i].Name = clusters[i].Name - if sumWeight > 0 { - res[i].Replicas = clusters[i].Replicas * sum / sumWeight - } - allocatedReplicas += res[i].Replicas - } - if remainReplicas := sum - allocatedReplicas; remainReplicas > 0 { - for i := 0; remainReplicas > 0; i++ { - if i == len(res) { - i = 0 - } - res[i].Replicas++ - remainReplicas-- - } - } - return res -} - // MergeTargetClusters will merge the replicas in two TargetCluster func MergeTargetClusters(old, new []workv1alpha2.TargetCluster) []workv1alpha2.TargetCluster { switch { diff --git a/pkg/util/binding_test.go b/pkg/util/binding_test.go index 0ba6e3635..44a970789 100644 --- a/pkg/util/binding_test.go +++ b/pkg/util/binding_test.go @@ -15,157 +15,8 @@ import ( const ( ClusterMember1 = "member1" ClusterMember2 = "member2" - ClusterMember3 = "member3" ) -func TestDivideReplicasByTargetCluster(t *testing.T) { - type args struct { - clusters []workv1alpha2.TargetCluster - sum int32 - } - tests := []struct { - name string - args args - want []workv1alpha2.TargetCluster - }{ - { - name: "empty clusters", - args: args{ - clusters: []workv1alpha2.TargetCluster{}, - sum: 10, - }, - want: []workv1alpha2.TargetCluster{}, - }, - { - name: "1 cluster, 5 replicas, 10 sum", - args: args{ - clusters: []workv1alpha2.TargetCluster{ - { - Name: ClusterMember1, - Replicas: 5, - }, - }, - sum: 10, - }, - want: []workv1alpha2.TargetCluster{ - { - Name: ClusterMember1, - Replicas: 10, - }, - }, - }, - { - name: "3 cluster, 1:1:1, 12 sum", - args: args{ - clusters: []workv1alpha2.TargetCluster{ - { - Name: ClusterMember1, - Replicas: 5, - }, - { - Name: ClusterMember2, - Replicas: 5, - }, - { - Name: ClusterMember3, - Replicas: 5, - }, - }, - sum: 12, - }, - want: []workv1alpha2.TargetCluster{ - { - Name: ClusterMember1, - Replicas: 4, - }, - { - Name: ClusterMember2, - Replicas: 4, - }, - { - Name: ClusterMember3, - Replicas: 4, - }, - }, - }, - { - name: "3 cluster, 1:1:1, 10 sum", - args: args{ - clusters: []workv1alpha2.TargetCluster{ - { - Name: ClusterMember1, - Replicas: 5, - }, - { - Name: ClusterMember2, - Replicas: 5, - }, - { - Name: ClusterMember3, - Replicas: 5, - }, - }, - sum: 10, - }, - want: []workv1alpha2.TargetCluster{ - { - Name: ClusterMember1, - Replicas: 4, - }, - { - Name: ClusterMember2, - Replicas: 3, - }, - { - Name: ClusterMember3, - Replicas: 3, - }, - }, - }, - { - name: "3 cluster, 1:2:3, 13 sum", - args: args{ - clusters: []workv1alpha2.TargetCluster{ - { - Name: ClusterMember1, - Replicas: 1, - }, - { - Name: ClusterMember2, - Replicas: 2, - }, - { - Name: ClusterMember3, - Replicas: 3, - }, - }, - sum: 13, - }, - want: []workv1alpha2.TargetCluster{ - { - Name: ClusterMember1, - Replicas: 3, - }, - { - Name: ClusterMember2, - Replicas: 4, - }, - { - Name: ClusterMember3, - Replicas: 6, - }, - }, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - if got := DivideReplicasByTargetCluster(tt.args.clusters, tt.args.sum); !testhelper.IsScheduleResultEqual(got, tt.want) { - t.Errorf("DivideReplicasByTargetCluster() = %v, want %v", got, tt.want) - } - }) - } -} - func TestGetBindingClusterNames(t *testing.T) { tests := []struct { name string diff --git a/pkg/util/helper/binding.go b/pkg/util/helper/binding.go index 64c90909c..5ed57ed4d 100644 --- a/pkg/util/helper/binding.go +++ b/pkg/util/helper/binding.go @@ -68,6 +68,81 @@ func (p ClusterWeightInfoList) GetWeightSum() int64 { return res } +// Dispenser aims to divide replicas among clusters by different weights. +type Dispenser struct { + // Target replicas, should be a positive integer. + NumReplicas int32 + // Final result. + Result []workv1alpha2.TargetCluster +} + +// NewDispenser will construct a dispenser with target replicas and a prescribed initial result. +func NewDispenser(numReplicas int32, init []workv1alpha2.TargetCluster) *Dispenser { + cp := make([]workv1alpha2.TargetCluster, len(init)) + copy(cp, init) + return &Dispenser{NumReplicas: numReplicas, Result: cp} +} + +// Done indicates whether finish dispensing. +func (a *Dispenser) Done() bool { + return a.NumReplicas == 0 && len(a.Result) != 0 +} + +// TakeByWeight divide replicas by a weight list and merge the result into previous result. +func (a *Dispenser) TakeByWeight(w ClusterWeightInfoList) { + if a.Done() { + return + } + sum := w.GetWeightSum() + if sum == 0 { + return + } + + sort.Sort(w) + + result := make([]workv1alpha2.TargetCluster, 0, w.Len()) + remain := a.NumReplicas + for _, info := range w { + replicas := int32(info.Weight * int64(a.NumReplicas) / sum) + result = append(result, workv1alpha2.TargetCluster{ + Name: info.ClusterName, + Replicas: replicas, + }) + remain -= replicas + } + // TODO(Garrybest): take rest replicas by fraction part + for i := range result { + if remain == 0 { + break + } + result[i].Replicas++ + remain-- + } + + a.NumReplicas = remain + a.Result = util.MergeTargetClusters(a.Result, result) +} + +// GetStaticWeightInfoListByTargetClusters constructs a weight list by target cluster slice. +func GetStaticWeightInfoListByTargetClusters(tcs []workv1alpha2.TargetCluster) ClusterWeightInfoList { + weightList := make(ClusterWeightInfoList, 0, len(tcs)) + for _, result := range tcs { + weightList = append(weightList, ClusterWeightInfo{ + ClusterName: result.Name, + Weight: int64(result.Replicas), + }) + } + return weightList +} + +// SpreadReplicasByTargetClusters divides replicas by the weight of a target cluster list. +func SpreadReplicasByTargetClusters(numReplicas int32, tcs, init []workv1alpha2.TargetCluster) []workv1alpha2.TargetCluster { + weightList := GetStaticWeightInfoListByTargetClusters(tcs) + disp := NewDispenser(numReplicas, init) + disp.TakeByWeight(weightList) + return disp.Result +} + // IsBindingScheduled will check if resourceBinding/clusterResourceBinding is successfully scheduled. func IsBindingScheduled(status *workv1alpha2.ResourceBindingStatus) bool { return meta.IsStatusConditionTrue(status.Conditions, workv1alpha2.Scheduled) diff --git a/pkg/util/helper/binding_test.go b/pkg/util/helper/binding_test.go index bf4f86a12..f0621e654 100644 --- a/pkg/util/helper/binding_test.go +++ b/pkg/util/helper/binding_test.go @@ -24,8 +24,163 @@ import ( "github.com/karmada-io/karmada/pkg/util/fedinformer/keys" "github.com/karmada-io/karmada/pkg/util/gclient" "github.com/karmada-io/karmada/pkg/util/names" + testhelper "github.com/karmada-io/karmada/test/helper" ) +const ( + ClusterMember1 = "member1" + ClusterMember2 = "member2" + ClusterMember3 = "member3" +) + +func TestDispenseReplicasByTargetClusters(t *testing.T) { + type args struct { + clusters []workv1alpha2.TargetCluster + sum int32 + } + tests := []struct { + name string + args args + want []workv1alpha2.TargetCluster + }{ + { + name: "empty clusters", + args: args{ + clusters: []workv1alpha2.TargetCluster{}, + sum: 10, + }, + want: []workv1alpha2.TargetCluster{}, + }, + { + name: "1 cluster, 5 replicas, 10 sum", + args: args{ + clusters: []workv1alpha2.TargetCluster{ + { + Name: ClusterMember1, + Replicas: 5, + }, + }, + sum: 10, + }, + want: []workv1alpha2.TargetCluster{ + { + Name: ClusterMember1, + Replicas: 10, + }, + }, + }, + { + name: "3 cluster, 1:1:1, 12 sum", + args: args{ + clusters: []workv1alpha2.TargetCluster{ + { + Name: ClusterMember1, + Replicas: 5, + }, + { + Name: ClusterMember2, + Replicas: 5, + }, + { + Name: ClusterMember3, + Replicas: 5, + }, + }, + sum: 12, + }, + want: []workv1alpha2.TargetCluster{ + { + Name: ClusterMember1, + Replicas: 4, + }, + { + Name: ClusterMember2, + Replicas: 4, + }, + { + Name: ClusterMember3, + Replicas: 4, + }, + }, + }, + { + name: "3 cluster, 1:1:1, 10 sum", + args: args{ + clusters: []workv1alpha2.TargetCluster{ + { + Name: ClusterMember1, + Replicas: 5, + }, + { + Name: ClusterMember2, + Replicas: 5, + }, + { + Name: ClusterMember3, + Replicas: 5, + }, + }, + sum: 10, + }, + want: []workv1alpha2.TargetCluster{ + { + Name: ClusterMember1, + Replicas: 4, + }, + { + Name: ClusterMember2, + Replicas: 3, + }, + { + Name: ClusterMember3, + Replicas: 3, + }, + }, + }, + { + name: "3 cluster, 1:2:3, 13 sum", + args: args{ + clusters: []workv1alpha2.TargetCluster{ + { + Name: ClusterMember1, + Replicas: 1, + }, + { + Name: ClusterMember2, + Replicas: 2, + }, + { + Name: ClusterMember3, + Replicas: 3, + }, + }, + sum: 13, + }, + want: []workv1alpha2.TargetCluster{ + { + Name: ClusterMember1, + Replicas: 2, + }, + { + Name: ClusterMember2, + Replicas: 4, + }, + { + Name: ClusterMember3, + Replicas: 7, + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := SpreadReplicasByTargetClusters(tt.args.sum, tt.args.clusters, nil); !testhelper.IsScheduleResultEqual(got, tt.want) { + t.Errorf("SpreadReplicasByTargetClusters() = %v, want %v", got, tt.want) + } + }) + } +} + func TestHasScheduledReplica(t *testing.T) { tests := []struct { name string