diff --git a/charts/_crds/bases/policy.karmada.io_clusterpropagationpolicies.yaml b/charts/_crds/bases/policy.karmada.io_clusterpropagationpolicies.yaml index b33d8ddb9..7e6d3d433 100644 --- a/charts/_crds/bases/policy.karmada.io_clusterpropagationpolicies.yaml +++ b/charts/_crds/bases/policy.karmada.io_clusterpropagationpolicies.yaml @@ -384,8 +384,6 @@ spec: - weight type: object type: array - required: - - staticWeightList type: object type: object spreadConstraints: diff --git a/charts/_crds/bases/policy.karmada.io_propagationpolicies.yaml b/charts/_crds/bases/policy.karmada.io_propagationpolicies.yaml index 667d65a29..90b2bfc39 100644 --- a/charts/_crds/bases/policy.karmada.io_propagationpolicies.yaml +++ b/charts/_crds/bases/policy.karmada.io_propagationpolicies.yaml @@ -380,8 +380,6 @@ spec: - weight type: object type: array - required: - - staticWeightList type: object type: object spreadConstraints: diff --git a/charts/_crds/bases/policy.karmada.io_replicaschedulingpolicies.yaml b/charts/_crds/bases/policy.karmada.io_replicaschedulingpolicies.yaml index 6febc8419..27a7eff0e 100644 --- a/charts/_crds/bases/policy.karmada.io_replicaschedulingpolicies.yaml +++ b/charts/_crds/bases/policy.karmada.io_replicaschedulingpolicies.yaml @@ -173,8 +173,6 @@ spec: - weight type: object type: array - required: - - staticWeightList type: object resourceSelectors: description: ResourceSelectors used to select resources. diff --git a/pkg/apis/policy/v1alpha1/replicascheduling_types.go b/pkg/apis/policy/v1alpha1/replicascheduling_types.go index 164af0e4c..512818567 100644 --- a/pkg/apis/policy/v1alpha1/replicascheduling_types.go +++ b/pkg/apis/policy/v1alpha1/replicascheduling_types.go @@ -35,8 +35,8 @@ type ReplicaSchedulingSpec struct { // ClusterPreferences describes weight for each cluster or for each group of cluster. type ClusterPreferences struct { // StaticWeightList defines the static cluster weight. - // +required - StaticWeightList []StaticClusterWeight `json:"staticWeightList"` + // +optional + StaticWeightList []StaticClusterWeight `json:"staticWeightList,omitempty"` // DynamicWeight specifies the factor to generates dynamic weight list. // If specified, StaticWeightList will be ignored. // +kubebuilder:validation:Enum=AvailableReplicas diff --git a/pkg/scheduler/core/division_algorithm.go b/pkg/scheduler/core/division_algorithm.go new file mode 100644 index 000000000..2e3a379b9 --- /dev/null +++ b/pkg/scheduler/core/division_algorithm.go @@ -0,0 +1,206 @@ +package core + +import ( + "fmt" + "sort" + + clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1" + policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1" + workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2" + "github.com/karmada-io/karmada/pkg/util" + "github.com/karmada-io/karmada/pkg/util/helper" +) + +// TargetClustersList is a slice of TargetCluster that implements sort.Interface to sort by Value. +type TargetClustersList []workv1alpha2.TargetCluster + +func (a TargetClustersList) Len() int { return len(a) } +func (a TargetClustersList) Swap(i, j int) { a[i], a[j] = a[j], a[i] } +func (a TargetClustersList) Less(i, j int) bool { return a[i].Replicas > a[j].Replicas } + +// divideReplicasByDynamicWeight assigns a total number of replicas to the selected clusters by the dynamic weight list. 
+func divideReplicasByDynamicWeight(clusters []*clusterv1alpha1.Cluster, dynamicWeight policyv1alpha1.DynamicWeightFactor, spec *workv1alpha2.ResourceBindingSpec) ([]workv1alpha2.TargetCluster, error) { + switch dynamicWeight { + case policyv1alpha1.DynamicWeightByAvailableReplicas: + return divideReplicasByResource(clusters, spec, policyv1alpha1.ReplicaDivisionPreferenceWeighted) + default: + return nil, fmt.Errorf("undefined replica dynamic weight factor: %s", dynamicWeight) + } +} + +func divideReplicasByResource(clusters []*clusterv1alpha1.Cluster, spec *workv1alpha2.ResourceBindingSpec, + preference policyv1alpha1.ReplicaDivisionPreference, preUsedClustersName ...string) ([]workv1alpha2.TargetCluster, error) { + clusterAvailableReplicas := calAvailableReplicas(clusters, spec) + sort.Sort(TargetClustersList(clusterAvailableReplicas)) + return divideReplicasByPreference(clusterAvailableReplicas, spec.Replicas, preference, preUsedClustersName...) +} + +// divideReplicasByStaticWeight assigns a total number of replicas to the selected clusters by the weight list. +func divideReplicasByStaticWeight(clusters []*clusterv1alpha1.Cluster, weightList []policyv1alpha1.StaticClusterWeight, + replicas int32) ([]workv1alpha2.TargetCluster, error) { + weightSum := int64(0) + matchClusters := make(map[string]int64) + desireReplicaInfos := make(map[string]int64) + + for _, cluster := range clusters { + for _, staticWeightRule := range weightList { + if util.ClusterMatches(cluster, staticWeightRule.TargetCluster) { + weightSum += staticWeightRule.Weight + matchClusters[cluster.Name] = staticWeightRule.Weight + break + } + } + } + + if weightSum == 0 { + for _, cluster := range clusters { + weightSum++ + matchClusters[cluster.Name] = 1 + } + } + + allocatedReplicas := int32(0) + for clusterName, weight := range matchClusters { + desireReplicaInfos[clusterName] = weight * int64(replicas) / weightSum + allocatedReplicas += int32(desireReplicaInfos[clusterName]) + } + + if remainReplicas := replicas - allocatedReplicas; remainReplicas > 0 { + sortedClusters := helper.SortClusterByWeight(matchClusters) + for i := 0; remainReplicas > 0; i++ { + desireReplicaInfos[sortedClusters[i].ClusterName]++ + remainReplicas-- + if i == len(desireReplicaInfos) { + i = 0 + } + } + } + + for _, cluster := range clusters { + if _, exist := matchClusters[cluster.Name]; !exist { + desireReplicaInfos[cluster.Name] = 0 + } + } + + targetClusters := make([]workv1alpha2.TargetCluster, len(desireReplicaInfos)) + i := 0 + for key, value := range desireReplicaInfos { + targetClusters[i] = workv1alpha2.TargetCluster{Name: key, Replicas: int32(value)} + i++ + } + return targetClusters, nil +} + +// divideReplicasByPreference assigns a total number of replicas to the selected clusters by preference according to the resource. 
+func divideReplicasByPreference(clusterAvailableReplicas []workv1alpha2.TargetCluster, replicas int32,
+	preference policyv1alpha1.ReplicaDivisionPreference, preUsedClustersName ...string) ([]workv1alpha2.TargetCluster, error) {
+	clustersMaxReplicas := util.GetSumOfReplicas(clusterAvailableReplicas)
+	if clustersMaxReplicas < replicas {
+		return nil, fmt.Errorf("clusters resources are not enough to schedule, max %d replicas are supported", clustersMaxReplicas)
+	}
+
+	switch preference {
+	case policyv1alpha1.ReplicaDivisionPreferenceAggregated:
+		return divideReplicasByAggregation(clusterAvailableReplicas, replicas, preUsedClustersName...), nil
+	case policyv1alpha1.ReplicaDivisionPreferenceWeighted:
+		return divideReplicasByAvailableReplica(clusterAvailableReplicas, replicas, clustersMaxReplicas), nil
+	default:
+		return nil, fmt.Errorf("undefined replica division preference: %v", preference)
+	}
+}
+
+func divideReplicasByAggregation(clusterAvailableReplicas []workv1alpha2.TargetCluster,
+	replicas int32, preUsedClustersName ...string) []workv1alpha2.TargetCluster {
+	clusterAvailableReplicas = presortClusterList(clusterAvailableReplicas, preUsedClustersName...)
+	clustersNum, clustersMaxReplicas := 0, int32(0)
+	for _, clusterInfo := range clusterAvailableReplicas {
+		clustersNum++
+		clustersMaxReplicas += clusterInfo.Replicas
+		if clustersMaxReplicas >= replicas {
+			break
+		}
+	}
+	var unusedClusters []string
+	for i := clustersNum; i < len(clusterAvailableReplicas); i++ {
+		unusedClusters = append(unusedClusters, clusterAvailableReplicas[i].Name)
+	}
+	return divideReplicasByAvailableReplica(clusterAvailableReplicas[0:clustersNum], replicas, clustersMaxReplicas, unusedClusters...)
+}
+
+func divideReplicasByAvailableReplica(clusterAvailableReplicas []workv1alpha2.TargetCluster, replicas int32,
+	clustersMaxReplicas int32, unusedClusters ...string) []workv1alpha2.TargetCluster {
+	desireReplicaInfos := make(map[string]int32)
+	allocatedReplicas := int32(0)
+	for _, clusterInfo := range clusterAvailableReplicas {
+		desireReplicaInfos[clusterInfo.Name] = clusterInfo.Replicas * replicas / clustersMaxReplicas
+		allocatedReplicas += desireReplicaInfos[clusterInfo.Name]
+	}
+
+	if remainReplicas := replicas - allocatedReplicas; remainReplicas > 0 {
+		for i := 0; remainReplicas > 0; i++ {
+			desireReplicaInfos[clusterAvailableReplicas[i].Name]++
+			remainReplicas--
+			if i == len(desireReplicaInfos) {
+				i = 0
+			}
+		}
+	}
+
+	// For scaling up: keep the unused clusters in the result with zero replicas.
+	for _, cluster := range unusedClusters {
+		if _, exist := desireReplicaInfos[cluster]; !exist {
+			desireReplicaInfos[cluster] = 0
+		}
+	}
+
+	targetClusters := make([]workv1alpha2.TargetCluster, len(desireReplicaInfos))
+	i := 0
+	for key, value := range desireReplicaInfos {
+		targetClusters[i] = workv1alpha2.TargetCluster{Name: key, Replicas: value}
+		i++
+	}
+	return targetClusters
+}
+
+func scaleScheduleByReplicaDivisionPreference(spec *workv1alpha2.ResourceBindingSpec, preference policyv1alpha1.ReplicaDivisionPreference,
+	preSelectedClusters []*clusterv1alpha1.Cluster) ([]workv1alpha2.TargetCluster, error) {
+	assignedReplicas := util.GetSumOfReplicas(spec.Clusters)
+	if assignedReplicas > spec.Replicas {
+		newTargetClusters, err := scaleDownScheduleByReplicaDivisionPreference(spec, preference)
+		if err != nil {
+			return nil, fmt.Errorf("failed to scaleDown: %v", err)
+		}
+		return newTargetClusters, nil
+	} else if assignedReplicas < spec.Replicas {
+		newTargetClusters, err := scaleUpScheduleByReplicaDivisionPreference(spec, preSelectedClusters,
preference, assignedReplicas) + if err != nil { + return nil, fmt.Errorf("failed to scaleUp: %v", err) + } + return newTargetClusters, nil + } else { + return spec.Clusters, nil + } +} + +func scaleDownScheduleByReplicaDivisionPreference(spec *workv1alpha2.ResourceBindingSpec, + preference policyv1alpha1.ReplicaDivisionPreference) ([]workv1alpha2.TargetCluster, error) { + return divideReplicasByPreference(spec.Clusters, spec.Replicas, preference) +} + +func scaleUpScheduleByReplicaDivisionPreference(spec *workv1alpha2.ResourceBindingSpec, preSelectedClusters []*clusterv1alpha1.Cluster, + preference policyv1alpha1.ReplicaDivisionPreference, assignedReplicas int32) ([]workv1alpha2.TargetCluster, error) { + // Find the clusters that have old replicas, so we can prefer to assign new replicas to them. + usedTargetClusters := helper.GetUsedBindingClusterNames(spec.Clusters) + // only the new replicas are considered during this scheduler, the old replicas will not be moved. + // if not the old replicas may be recreated which is not expected during scaling up + // use usedTargetClusters to make sure that we assign new replicas to them preferentially + // so that all the replicas are aggregated + newObject := spec.DeepCopy() + newObject.Replicas = spec.Replicas - assignedReplicas + result, err := divideReplicasByResource(preSelectedClusters, newObject, preference, usedTargetClusters...) + if err != nil { + return result, err + } + // merge the result of this scheduler for new replicas and the data of old replicas + return util.MergeTargetClusters(spec.Clusters, result), nil +} diff --git a/pkg/scheduler/core/division_algorithm_test.go b/pkg/scheduler/core/division_algorithm_test.go new file mode 100644 index 000000000..c45887ec8 --- /dev/null +++ b/pkg/scheduler/core/division_algorithm_test.go @@ -0,0 +1,715 @@ +package core + +import ( + "testing" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + + clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1" + policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1" + workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2" + "github.com/karmada-io/karmada/pkg/util" + "github.com/karmada-io/karmada/test/helper" +) + +const ( + ClusterMember1 = "member1" + ClusterMember2 = "member2" + ClusterMember3 = "member3" +) + +func Test_divideReplicasByStaticWeight(t *testing.T) { + type args struct { + clusters []*clusterv1alpha1.Cluster + weightList []policyv1alpha1.StaticClusterWeight + replicas int32 + } + tests := []struct { + name string + args args + want []workv1alpha2.TargetCluster + wantErr bool + }{ + { + name: "replica 12, weight 3:2:1", + args: args{ + clusters: []*clusterv1alpha1.Cluster{ + helper.NewCluster(ClusterMember1), + helper.NewCluster(ClusterMember2), + helper.NewCluster(ClusterMember3), + }, + weightList: []policyv1alpha1.StaticClusterWeight{ + { + TargetCluster: policyv1alpha1.ClusterAffinity{ + ClusterNames: []string{ClusterMember1}, + }, + Weight: 3, + }, + { + TargetCluster: policyv1alpha1.ClusterAffinity{ + ClusterNames: []string{ClusterMember2}, + }, + Weight: 2, + }, + { + TargetCluster: policyv1alpha1.ClusterAffinity{ + ClusterNames: []string{ClusterMember3}, + }, + Weight: 1, + }, + }, + replicas: 12, + }, + want: []workv1alpha2.TargetCluster{ + { + Name: ClusterMember1, + Replicas: 6, + }, + { + Name: ClusterMember2, + Replicas: 4, + }, + { + Name: ClusterMember3, + Replicas: 2, + }, + }, + wantErr: false, + }, + { + name: "replica 12, default weight", + args: 
struct { + clusters []*clusterv1alpha1.Cluster + weightList []policyv1alpha1.StaticClusterWeight + replicas int32 + }{ + clusters: []*clusterv1alpha1.Cluster{ + helper.NewCluster(ClusterMember1), + helper.NewCluster(ClusterMember2), + helper.NewCluster(ClusterMember3), + }, + replicas: 12, + }, + want: []workv1alpha2.TargetCluster{ + { + Name: ClusterMember1, + Replicas: 4, + }, + { + Name: ClusterMember2, + Replicas: 4, + }, + { + Name: ClusterMember3, + Replicas: 4, + }, + }, + wantErr: false, + }, + { + name: "replica 14, weight 3:2:1", + args: struct { + clusters []*clusterv1alpha1.Cluster + weightList []policyv1alpha1.StaticClusterWeight + replicas int32 + }{ + clusters: []*clusterv1alpha1.Cluster{ + helper.NewCluster(ClusterMember1), + helper.NewCluster(ClusterMember2), + helper.NewCluster(ClusterMember3), + }, + weightList: []policyv1alpha1.StaticClusterWeight{ + { + TargetCluster: policyv1alpha1.ClusterAffinity{ + ClusterNames: []string{ClusterMember1}, + }, + Weight: 3, + }, + { + TargetCluster: policyv1alpha1.ClusterAffinity{ + ClusterNames: []string{ClusterMember2}, + }, + Weight: 2, + }, + { + TargetCluster: policyv1alpha1.ClusterAffinity{ + ClusterNames: []string{ClusterMember3}, + }, + Weight: 1, + }, + }, + replicas: 14, + }, + want: []workv1alpha2.TargetCluster{ + { + Name: ClusterMember1, + Replicas: 8, + }, + { + Name: ClusterMember2, + Replicas: 4, + }, + { + Name: ClusterMember3, + Replicas: 2, + }, + }, + wantErr: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := divideReplicasByStaticWeight(tt.args.clusters, tt.args.weightList, tt.args.replicas) + if (err != nil) != tt.wantErr { + t.Errorf("divideReplicasByStaticWeight() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !helper.IsScheduleResultEqual(got, tt.want) { + t.Errorf("divideReplicasByStaticWeight() got = %v, want %v", got, tt.want) + } + }) + } +} + +func Test_divideReplicasByPreference(t *testing.T) { + type args struct { + clusterAvailableReplicas []workv1alpha2.TargetCluster + replicas int32 + clustersMaxReplicas int32 + preference policyv1alpha1.ReplicaDivisionPreference + preUsedClustersName []string + } + tests := []struct { + name string + args args + want []workv1alpha2.TargetCluster + wantErr bool + }{ + { + name: "replica 12, dynamic weight 18:12:6", + args: args{ + clusterAvailableReplicas: TargetClustersList{ + workv1alpha2.TargetCluster{Name: ClusterMember1, Replicas: 18}, + workv1alpha2.TargetCluster{Name: ClusterMember2, Replicas: 12}, + workv1alpha2.TargetCluster{Name: ClusterMember3, Replicas: 6}, + }, + replicas: 12, + clustersMaxReplicas: 36, + preference: policyv1alpha1.ReplicaDivisionPreferenceWeighted, + preUsedClustersName: nil, + }, + want: []workv1alpha2.TargetCluster{ + {Name: ClusterMember1, Replicas: 6}, + {Name: ClusterMember2, Replicas: 4}, + {Name: ClusterMember3, Replicas: 2}, + }, + wantErr: false, + }, + { + name: "replica 12, dynamic weight 20:12:6", + args: args{ + clusterAvailableReplicas: TargetClustersList{ + workv1alpha2.TargetCluster{Name: ClusterMember1, Replicas: 20}, + workv1alpha2.TargetCluster{Name: ClusterMember2, Replicas: 12}, + workv1alpha2.TargetCluster{Name: ClusterMember3, Replicas: 6}, + }, + replicas: 12, + clustersMaxReplicas: 38, + preference: policyv1alpha1.ReplicaDivisionPreferenceWeighted, + preUsedClustersName: nil, + }, + want: []workv1alpha2.TargetCluster{ + {Name: ClusterMember1, Replicas: 7}, + {Name: ClusterMember2, Replicas: 4}, + {Name: ClusterMember3, Replicas: 1}, + }, + wantErr: 
false, + }, + { + name: "replica 12, dynamic weight 6:12:6", + args: args{ + clusterAvailableReplicas: TargetClustersList{ + workv1alpha2.TargetCluster{Name: ClusterMember1, Replicas: 6}, + workv1alpha2.TargetCluster{Name: ClusterMember2, Replicas: 12}, + workv1alpha2.TargetCluster{Name: ClusterMember3, Replicas: 6}, + }, + replicas: 12, + clustersMaxReplicas: 24, + preference: policyv1alpha1.ReplicaDivisionPreferenceWeighted, + preUsedClustersName: nil, + }, + want: []workv1alpha2.TargetCluster{ + {Name: ClusterMember1, Replicas: 3}, + {Name: ClusterMember2, Replicas: 6}, + {Name: ClusterMember3, Replicas: 3}, + }, + wantErr: false, + }, + { + name: "replica 12, aggregated 12:6:6", + args: args{ + clusterAvailableReplicas: TargetClustersList{ + workv1alpha2.TargetCluster{Name: ClusterMember2, Replicas: 12}, + workv1alpha2.TargetCluster{Name: ClusterMember1, Replicas: 6}, + workv1alpha2.TargetCluster{Name: ClusterMember3, Replicas: 6}, + }, + replicas: 12, + clustersMaxReplicas: 24, + preference: policyv1alpha1.ReplicaDivisionPreferenceAggregated, + preUsedClustersName: nil, + }, + want: []workv1alpha2.TargetCluster{ + {Name: ClusterMember1, Replicas: 0}, + {Name: ClusterMember2, Replicas: 12}, + {Name: ClusterMember3, Replicas: 0}, + }, + wantErr: false, + }, + { + name: "replica 12, aggregated 6:6:6", + args: args{ + clusterAvailableReplicas: TargetClustersList{ + workv1alpha2.TargetCluster{Name: ClusterMember1, Replicas: 6}, + workv1alpha2.TargetCluster{Name: ClusterMember2, Replicas: 6}, + workv1alpha2.TargetCluster{Name: ClusterMember3, Replicas: 6}, + }, + replicas: 12, + clustersMaxReplicas: 18, + preference: policyv1alpha1.ReplicaDivisionPreferenceAggregated, + preUsedClustersName: nil, + }, + want: []workv1alpha2.TargetCluster{ + {Name: ClusterMember1, Replicas: 6}, + {Name: ClusterMember2, Replicas: 6}, + {Name: ClusterMember3, Replicas: 0}, + }, + wantErr: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := divideReplicasByPreference(tt.args.clusterAvailableReplicas, tt.args.replicas, tt.args.preference, tt.args.preUsedClustersName...) 
+ if (err != nil) != tt.wantErr { + t.Errorf("divideReplicasByPreference() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !helper.IsScheduleResultEqual(got, tt.want) { + t.Errorf("divideReplicasByPreference() got = %v, want %v", got, tt.want) + } + }) + } +} + +func Test_divideReplicasByResource(t *testing.T) { + type args struct { + clusters []*clusterv1alpha1.Cluster + spec *workv1alpha2.ResourceBindingSpec + preference policyv1alpha1.ReplicaDivisionPreference + preUsedClustersName []string + } + tests := []struct { + name string + args args + want []workv1alpha2.TargetCluster + wantErr bool + }{ + { + name: "replica 12, dynamic weight 6:8:10", + args: args{ + clusters: []*clusterv1alpha1.Cluster{ + helper.NewClusterWithResource(ClusterMember1, corev1.ResourceList{ + corev1.ResourcePods: *resource.NewQuantity(6, resource.DecimalSI), + }, util.EmptyResource().ResourceList(), util.EmptyResource().ResourceList()), + helper.NewClusterWithResource(ClusterMember2, corev1.ResourceList{ + corev1.ResourcePods: *resource.NewQuantity(8, resource.DecimalSI), + }, util.EmptyResource().ResourceList(), util.EmptyResource().ResourceList()), + helper.NewClusterWithResource(ClusterMember3, corev1.ResourceList{ + corev1.ResourcePods: *resource.NewQuantity(10, resource.DecimalSI), + }, util.EmptyResource().ResourceList(), util.EmptyResource().ResourceList()), + }, + spec: &workv1alpha2.ResourceBindingSpec{ + ReplicaRequirements: &workv1alpha2.ReplicaRequirements{ + ResourceRequest: util.EmptyResource().ResourceList(), + }, + Replicas: 12, + }, + preference: policyv1alpha1.ReplicaDivisionPreferenceWeighted, + }, + want: []workv1alpha2.TargetCluster{ + {Name: ClusterMember1, Replicas: 3}, + {Name: ClusterMember2, Replicas: 4}, + {Name: ClusterMember3, Replicas: 5}, + }, + wantErr: false, + }, + { + name: "replica 12, dynamic weight 8:8:10", + args: args{ + clusters: []*clusterv1alpha1.Cluster{ + helper.NewClusterWithResource(ClusterMember1, corev1.ResourceList{ + corev1.ResourcePods: *resource.NewQuantity(8, resource.DecimalSI), + }, util.EmptyResource().ResourceList(), util.EmptyResource().ResourceList()), + helper.NewClusterWithResource(ClusterMember2, corev1.ResourceList{ + corev1.ResourcePods: *resource.NewQuantity(8, resource.DecimalSI), + }, util.EmptyResource().ResourceList(), util.EmptyResource().ResourceList()), + helper.NewClusterWithResource(ClusterMember3, corev1.ResourceList{ + corev1.ResourcePods: *resource.NewQuantity(10, resource.DecimalSI), + }, util.EmptyResource().ResourceList(), util.EmptyResource().ResourceList()), + }, + spec: &workv1alpha2.ResourceBindingSpec{ + ReplicaRequirements: &workv1alpha2.ReplicaRequirements{ + ResourceRequest: util.EmptyResource().ResourceList(), + }, + Replicas: 12, + }, + preference: policyv1alpha1.ReplicaDivisionPreferenceWeighted, + }, + want: []workv1alpha2.TargetCluster{ + {Name: ClusterMember1, Replicas: 4}, + {Name: ClusterMember2, Replicas: 3}, + {Name: ClusterMember3, Replicas: 5}, + }, + wantErr: false, + }, + { + name: "replica 12, dynamic weight 3:3:3", + args: args{ + clusters: []*clusterv1alpha1.Cluster{ + helper.NewClusterWithResource(ClusterMember1, corev1.ResourceList{ + corev1.ResourcePods: *resource.NewQuantity(3, resource.DecimalSI), + }, util.EmptyResource().ResourceList(), util.EmptyResource().ResourceList()), + helper.NewClusterWithResource(ClusterMember2, corev1.ResourceList{ + corev1.ResourcePods: *resource.NewQuantity(3, resource.DecimalSI), + }, util.EmptyResource().ResourceList(), util.EmptyResource().ResourceList()), + 
helper.NewClusterWithResource(ClusterMember3, corev1.ResourceList{ + corev1.ResourcePods: *resource.NewQuantity(3, resource.DecimalSI), + }, util.EmptyResource().ResourceList(), util.EmptyResource().ResourceList()), + }, + spec: &workv1alpha2.ResourceBindingSpec{ + ReplicaRequirements: &workv1alpha2.ReplicaRequirements{ + ResourceRequest: util.EmptyResource().ResourceList(), + }, + Replicas: 12, + }, + preference: policyv1alpha1.ReplicaDivisionPreferenceWeighted, + }, + wantErr: true, + }, + { + name: "replica 12, aggregated 6:8:10", + args: args{ + clusters: []*clusterv1alpha1.Cluster{ + helper.NewClusterWithResource(ClusterMember1, corev1.ResourceList{ + corev1.ResourcePods: *resource.NewQuantity(6, resource.DecimalSI), + }, util.EmptyResource().ResourceList(), util.EmptyResource().ResourceList()), + helper.NewClusterWithResource(ClusterMember2, corev1.ResourceList{ + corev1.ResourcePods: *resource.NewQuantity(8, resource.DecimalSI), + }, util.EmptyResource().ResourceList(), util.EmptyResource().ResourceList()), + helper.NewClusterWithResource(ClusterMember3, corev1.ResourceList{ + corev1.ResourcePods: *resource.NewQuantity(10, resource.DecimalSI), + }, util.EmptyResource().ResourceList(), util.EmptyResource().ResourceList()), + }, + spec: &workv1alpha2.ResourceBindingSpec{ + ReplicaRequirements: &workv1alpha2.ReplicaRequirements{ + ResourceRequest: util.EmptyResource().ResourceList(), + }, + Replicas: 12, + }, + preference: policyv1alpha1.ReplicaDivisionPreferenceAggregated, + }, + want: []workv1alpha2.TargetCluster{ + {Name: ClusterMember1, Replicas: 0}, + {Name: ClusterMember2, Replicas: 5}, + {Name: ClusterMember3, Replicas: 7}, + }, + wantErr: false, + }, + { + name: "replica 12, aggregated 12:8:10", + args: args{ + clusters: []*clusterv1alpha1.Cluster{ + helper.NewClusterWithResource(ClusterMember1, corev1.ResourceList{ + corev1.ResourcePods: *resource.NewQuantity(12, resource.DecimalSI), + }, util.EmptyResource().ResourceList(), util.EmptyResource().ResourceList()), + helper.NewClusterWithResource(ClusterMember2, corev1.ResourceList{ + corev1.ResourcePods: *resource.NewQuantity(8, resource.DecimalSI), + }, util.EmptyResource().ResourceList(), util.EmptyResource().ResourceList()), + helper.NewClusterWithResource(ClusterMember3, corev1.ResourceList{ + corev1.ResourcePods: *resource.NewQuantity(10, resource.DecimalSI), + }, util.EmptyResource().ResourceList(), util.EmptyResource().ResourceList()), + }, + spec: &workv1alpha2.ResourceBindingSpec{ + ReplicaRequirements: &workv1alpha2.ReplicaRequirements{ + ResourceRequest: util.EmptyResource().ResourceList(), + }, + Replicas: 12, + }, + preference: policyv1alpha1.ReplicaDivisionPreferenceAggregated, + }, + want: []workv1alpha2.TargetCluster{ + {Name: ClusterMember1, Replicas: 12}, + {Name: ClusterMember2, Replicas: 0}, + {Name: ClusterMember3, Replicas: 0}, + }, + wantErr: false, + }, + { + name: "replica 12, aggregated 3:3:3", + args: args{ + clusters: []*clusterv1alpha1.Cluster{ + helper.NewClusterWithResource(ClusterMember1, corev1.ResourceList{ + corev1.ResourcePods: *resource.NewQuantity(3, resource.DecimalSI), + }, util.EmptyResource().ResourceList(), util.EmptyResource().ResourceList()), + helper.NewClusterWithResource(ClusterMember2, corev1.ResourceList{ + corev1.ResourcePods: *resource.NewQuantity(3, resource.DecimalSI), + }, util.EmptyResource().ResourceList(), util.EmptyResource().ResourceList()), + helper.NewClusterWithResource(ClusterMember3, corev1.ResourceList{ + corev1.ResourcePods: *resource.NewQuantity(3, 
resource.DecimalSI), + }, util.EmptyResource().ResourceList(), util.EmptyResource().ResourceList()), + }, + spec: &workv1alpha2.ResourceBindingSpec{ + ReplicaRequirements: &workv1alpha2.ReplicaRequirements{ + ResourceRequest: util.EmptyResource().ResourceList(), + }, + Replicas: 12, + }, + preference: policyv1alpha1.ReplicaDivisionPreferenceAggregated, + }, + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := divideReplicasByResource(tt.args.clusters, tt.args.spec, tt.args.preference, tt.args.preUsedClustersName...) + if (err != nil) != tt.wantErr { + t.Errorf("divideReplicasByResource() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !helper.IsScheduleResultEqual(got, tt.want) { + t.Errorf("divideReplicasByResource() got = %v, want %v", got, tt.want) + } + }) + } +} + +func Test_scaleScheduleByReplicaDivisionPreference(t *testing.T) { + type args struct { + spec *workv1alpha2.ResourceBindingSpec + preference policyv1alpha1.ReplicaDivisionPreference + preSelectedClusters []*clusterv1alpha1.Cluster + } + tests := []struct { + name string + args args + want []workv1alpha2.TargetCluster + wantErr bool + }{ + { + name: "replica 12 -> 6, dynamic weighted 2:4:6", + args: args{ + spec: &workv1alpha2.ResourceBindingSpec{ + ReplicaRequirements: &workv1alpha2.ReplicaRequirements{ + ResourceRequest: util.EmptyResource().ResourceList(), + }, + Replicas: 6, + Clusters: []workv1alpha2.TargetCluster{ + {Name: ClusterMember1, Replicas: 2}, + {Name: ClusterMember2, Replicas: 4}, + {Name: ClusterMember3, Replicas: 6}, + }, + }, + preference: policyv1alpha1.ReplicaDivisionPreferenceWeighted, + preSelectedClusters: []*clusterv1alpha1.Cluster{ + helper.NewClusterWithResource(ClusterMember1, corev1.ResourceList{ + corev1.ResourcePods: *resource.NewQuantity(1, resource.DecimalSI), + }, util.EmptyResource().ResourceList(), util.EmptyResource().ResourceList()), + helper.NewClusterWithResource(ClusterMember2, corev1.ResourceList{ + corev1.ResourcePods: *resource.NewQuantity(1, resource.DecimalSI), + }, util.EmptyResource().ResourceList(), util.EmptyResource().ResourceList()), + helper.NewClusterWithResource(ClusterMember3, corev1.ResourceList{ + corev1.ResourcePods: *resource.NewQuantity(1, resource.DecimalSI), + }, util.EmptyResource().ResourceList(), util.EmptyResource().ResourceList()), + }, + }, + want: []workv1alpha2.TargetCluster{ + {Name: ClusterMember1, Replicas: 1}, + {Name: ClusterMember2, Replicas: 2}, + {Name: ClusterMember3, Replicas: 3}, + }, + wantErr: false, + }, + { + name: "replica 12 -> 24, dynamic weighted 10:10:10", + args: args{ + spec: &workv1alpha2.ResourceBindingSpec{ + ReplicaRequirements: &workv1alpha2.ReplicaRequirements{ + ResourceRequest: util.EmptyResource().ResourceList(), + }, + Replicas: 24, + Clusters: []workv1alpha2.TargetCluster{ + {Name: ClusterMember1, Replicas: 2}, + {Name: ClusterMember2, Replicas: 4}, + {Name: ClusterMember3, Replicas: 6}, + }, + }, + preference: policyv1alpha1.ReplicaDivisionPreferenceWeighted, + preSelectedClusters: []*clusterv1alpha1.Cluster{ + helper.NewClusterWithResource(ClusterMember1, corev1.ResourceList{ + corev1.ResourcePods: *resource.NewQuantity(10, resource.DecimalSI), + }, util.EmptyResource().ResourceList(), util.EmptyResource().ResourceList()), + helper.NewClusterWithResource(ClusterMember2, corev1.ResourceList{ + corev1.ResourcePods: *resource.NewQuantity(10, resource.DecimalSI), + }, util.EmptyResource().ResourceList(), util.EmptyResource().ResourceList()), + 
helper.NewClusterWithResource(ClusterMember3, corev1.ResourceList{ + corev1.ResourcePods: *resource.NewQuantity(10, resource.DecimalSI), + }, util.EmptyResource().ResourceList(), util.EmptyResource().ResourceList()), + }, + }, + want: []workv1alpha2.TargetCluster{ + {Name: ClusterMember1, Replicas: 6}, + {Name: ClusterMember2, Replicas: 8}, + {Name: ClusterMember3, Replicas: 10}, + }, + wantErr: false, + }, + { + name: "replica 12 -> 24, dynamic weighted 1:1:1", + args: args{ + spec: &workv1alpha2.ResourceBindingSpec{ + ReplicaRequirements: &workv1alpha2.ReplicaRequirements{ + ResourceRequest: util.EmptyResource().ResourceList(), + }, + Replicas: 24, + Clusters: []workv1alpha2.TargetCluster{ + {Name: ClusterMember1, Replicas: 2}, + {Name: ClusterMember2, Replicas: 4}, + {Name: ClusterMember3, Replicas: 6}, + }, + }, + preference: policyv1alpha1.ReplicaDivisionPreferenceWeighted, + preSelectedClusters: []*clusterv1alpha1.Cluster{ + helper.NewClusterWithResource(ClusterMember1, corev1.ResourceList{ + corev1.ResourcePods: *resource.NewQuantity(1, resource.DecimalSI), + }, util.EmptyResource().ResourceList(), util.EmptyResource().ResourceList()), + helper.NewClusterWithResource(ClusterMember2, corev1.ResourceList{ + corev1.ResourcePods: *resource.NewQuantity(1, resource.DecimalSI), + }, util.EmptyResource().ResourceList(), util.EmptyResource().ResourceList()), + helper.NewClusterWithResource(ClusterMember3, corev1.ResourceList{ + corev1.ResourcePods: *resource.NewQuantity(1, resource.DecimalSI), + }, util.EmptyResource().ResourceList(), util.EmptyResource().ResourceList()), + }, + }, + wantErr: true, + }, + { + name: "replica 12 -> 6, aggregated 2:4:6", + args: args{ + spec: &workv1alpha2.ResourceBindingSpec{ + ReplicaRequirements: &workv1alpha2.ReplicaRequirements{ + ResourceRequest: util.EmptyResource().ResourceList(), + }, + Replicas: 6, + Clusters: []workv1alpha2.TargetCluster{ + {Name: ClusterMember1, Replicas: 4}, + {Name: ClusterMember2, Replicas: 8}, + {Name: ClusterMember3, Replicas: 0}, + }, + }, + preference: policyv1alpha1.ReplicaDivisionPreferenceAggregated, + preSelectedClusters: []*clusterv1alpha1.Cluster{ + helper.NewClusterWithResource(ClusterMember1, corev1.ResourceList{ + corev1.ResourcePods: *resource.NewQuantity(1, resource.DecimalSI), + }, util.EmptyResource().ResourceList(), util.EmptyResource().ResourceList()), + helper.NewClusterWithResource(ClusterMember2, corev1.ResourceList{ + corev1.ResourcePods: *resource.NewQuantity(1, resource.DecimalSI), + }, util.EmptyResource().ResourceList(), util.EmptyResource().ResourceList()), + helper.NewClusterWithResource(ClusterMember3, corev1.ResourceList{ + corev1.ResourcePods: *resource.NewQuantity(1, resource.DecimalSI), + }, util.EmptyResource().ResourceList(), util.EmptyResource().ResourceList()), + }, + }, + want: []workv1alpha2.TargetCluster{ + {Name: ClusterMember1, Replicas: 2}, + {Name: ClusterMember2, Replicas: 4}, + {Name: ClusterMember3, Replicas: 0}, + }, + wantErr: false, + }, + { + name: "replica 12 -> 24, aggregated 4:6:8", + args: args{ + spec: &workv1alpha2.ResourceBindingSpec{ + ReplicaRequirements: &workv1alpha2.ReplicaRequirements{ + ResourceRequest: util.EmptyResource().ResourceList(), + }, + Replicas: 24, + Clusters: []workv1alpha2.TargetCluster{ + {Name: ClusterMember1, Replicas: 4}, + {Name: ClusterMember2, Replicas: 8}, + {Name: ClusterMember3, Replicas: 0}, + }, + }, + preference: policyv1alpha1.ReplicaDivisionPreferenceAggregated, + preSelectedClusters: []*clusterv1alpha1.Cluster{ + 
helper.NewClusterWithResource(ClusterMember1, corev1.ResourceList{ + corev1.ResourcePods: *resource.NewQuantity(4, resource.DecimalSI), + }, util.EmptyResource().ResourceList(), util.EmptyResource().ResourceList()), + helper.NewClusterWithResource(ClusterMember2, corev1.ResourceList{ + corev1.ResourcePods: *resource.NewQuantity(6, resource.DecimalSI), + }, util.EmptyResource().ResourceList(), util.EmptyResource().ResourceList()), + helper.NewClusterWithResource(ClusterMember3, corev1.ResourceList{ + corev1.ResourcePods: *resource.NewQuantity(14, resource.DecimalSI), + }, util.EmptyResource().ResourceList(), util.EmptyResource().ResourceList()), + }, + }, + want: []workv1alpha2.TargetCluster{ + {Name: ClusterMember1, Replicas: 6}, + {Name: ClusterMember2, Replicas: 11}, + {Name: ClusterMember3, Replicas: 7}, + }, + wantErr: false, + }, + { + name: "replica 12 -> 24, dynamic weighted 1:1:1", + args: args{ + spec: &workv1alpha2.ResourceBindingSpec{ + ReplicaRequirements: &workv1alpha2.ReplicaRequirements{ + ResourceRequest: util.EmptyResource().ResourceList(), + }, + Replicas: 24, + Clusters: []workv1alpha2.TargetCluster{ + {Name: ClusterMember1, Replicas: 4}, + {Name: ClusterMember2, Replicas: 8}, + {Name: ClusterMember3, Replicas: 0}, + }, + }, + preference: policyv1alpha1.ReplicaDivisionPreferenceAggregated, + preSelectedClusters: []*clusterv1alpha1.Cluster{ + helper.NewClusterWithResource(ClusterMember1, corev1.ResourceList{ + corev1.ResourcePods: *resource.NewQuantity(1, resource.DecimalSI), + }, util.EmptyResource().ResourceList(), util.EmptyResource().ResourceList()), + helper.NewClusterWithResource(ClusterMember2, corev1.ResourceList{ + corev1.ResourcePods: *resource.NewQuantity(1, resource.DecimalSI), + }, util.EmptyResource().ResourceList(), util.EmptyResource().ResourceList()), + helper.NewClusterWithResource(ClusterMember3, corev1.ResourceList{ + corev1.ResourcePods: *resource.NewQuantity(1, resource.DecimalSI), + }, util.EmptyResource().ResourceList(), util.EmptyResource().ResourceList()), + }, + }, + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := scaleScheduleByReplicaDivisionPreference(tt.args.spec, tt.args.preference, tt.args.preSelectedClusters) + if (err != nil) != tt.wantErr { + t.Errorf("scaleScheduleByReplicaDivisionPreference() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !helper.IsScheduleResultEqual(got, tt.want) { + t.Errorf("scaleScheduleByReplicaDivisionPreference() got = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/pkg/scheduler/core/generic_scheduler.go b/pkg/scheduler/core/generic_scheduler.go index d7e52eefc..ac97fddb1 100644 --- a/pkg/scheduler/core/generic_scheduler.go +++ b/pkg/scheduler/core/generic_scheduler.go @@ -3,8 +3,6 @@ package core import ( "context" "fmt" - "math" - "sort" "time" "k8s.io/apimachinery/pkg/util/sets" @@ -13,14 +11,12 @@ import ( clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1" policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1" workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2" - estimatorclient "github.com/karmada-io/karmada/pkg/estimator/client" lister "github.com/karmada-io/karmada/pkg/generated/listers/policy/v1alpha1" "github.com/karmada-io/karmada/pkg/scheduler/cache" "github.com/karmada-io/karmada/pkg/scheduler/framework" "github.com/karmada-io/karmada/pkg/scheduler/framework/runtime" "github.com/karmada-io/karmada/pkg/scheduler/metrics" "github.com/karmada-io/karmada/pkg/util" - 
"github.com/karmada-io/karmada/pkg/util/helper" ) // ScheduleAlgorithm is the interface that should be implemented to schedule a resource to the target clusters. @@ -215,26 +211,31 @@ func (g *genericScheduler) assignReplicas(clusters []*clusterv1alpha1.Cluster, r targetClusters := make([]workv1alpha2.TargetCluster, len(clusters)) if object.Replicas > 0 && replicaSchedulingStrategy != nil { - if replicaSchedulingStrategy.ReplicaSchedulingType == policyv1alpha1.ReplicaSchedulingTypeDuplicated { + switch replicaSchedulingStrategy.ReplicaSchedulingType { + case policyv1alpha1.ReplicaSchedulingTypeDuplicated: for i, cluster := range clusters { targetClusters[i] = workv1alpha2.TargetCluster{Name: cluster.Name, Replicas: object.Replicas} } return targetClusters, nil - } - if replicaSchedulingStrategy.ReplicaSchedulingType == policyv1alpha1.ReplicaSchedulingTypeDivided { - if replicaSchedulingStrategy.ReplicaDivisionPreference == policyv1alpha1.ReplicaDivisionPreferenceWeighted { + case policyv1alpha1.ReplicaSchedulingTypeDivided: + switch replicaSchedulingStrategy.ReplicaDivisionPreference { + case policyv1alpha1.ReplicaDivisionPreferenceWeighted: + // If ReplicaDivisionPreference is set to "Weighted" and WeightPreference is not set, + // scheduler will weight all clusters averagely. if replicaSchedulingStrategy.WeightPreference == nil { - // if ReplicaDivisionPreference is set to "Weighted" and WeightPreference is not set, scheduler will weight all clusters the same. replicaSchedulingStrategy.WeightPreference = getDefaultWeightPreference(clusters) } - return g.divideReplicasByStaticWeight(clusters, replicaSchedulingStrategy.WeightPreference.StaticWeightList, object.Replicas) + if len(replicaSchedulingStrategy.WeightPreference.DynamicWeight) != 0 { + return divideReplicasByDynamicWeight(clusters, replicaSchedulingStrategy.WeightPreference.DynamicWeight, object) + } + return divideReplicasByStaticWeight(clusters, replicaSchedulingStrategy.WeightPreference.StaticWeightList, object.Replicas) + case policyv1alpha1.ReplicaDivisionPreferenceAggregated: + return divideReplicasByResource(clusters, object, policyv1alpha1.ReplicaDivisionPreferenceAggregated) + default: + return nil, fmt.Errorf("undefined replica division preference: %s", replicaSchedulingStrategy.ReplicaDivisionPreference) } - if replicaSchedulingStrategy.ReplicaDivisionPreference == policyv1alpha1.ReplicaDivisionPreferenceAggregated { - return g.divideReplicasAggregatedWithResource(clusters, object) - } - - // will never reach here, only "Aggregated" and "Weighted" are support - return nil, nil + default: + return nil, fmt.Errorf("undefined replica scheduling type: %s", replicaSchedulingStrategy.ReplicaSchedulingType) } } @@ -244,315 +245,56 @@ func (g *genericScheduler) assignReplicas(clusters []*clusterv1alpha1.Cluster, r return targetClusters, nil } -func getDefaultWeightPreference(clusters []*clusterv1alpha1.Cluster) *policyv1alpha1.ClusterPreferences { - staticWeightLists := make([]policyv1alpha1.StaticClusterWeight, 0) - for _, cluster := range clusters { - staticWeightList := policyv1alpha1.StaticClusterWeight{ - TargetCluster: policyv1alpha1.ClusterAffinity{ - ClusterNames: []string{cluster.Name}, - }, - Weight: 1, - } - staticWeightLists = append(staticWeightLists, staticWeightList) - } - - return &policyv1alpha1.ClusterPreferences{ - StaticWeightList: staticWeightLists, - } -} - -// divideReplicasByStaticWeight assigns a total number of replicas to the selected clusters by the weight list. 
-func (g *genericScheduler) divideReplicasByStaticWeight(clusters []*clusterv1alpha1.Cluster, staticWeightList []policyv1alpha1.StaticClusterWeight, replicas int32) ([]workv1alpha2.TargetCluster, error) { - weightSum := int64(0) - matchClusters := make(map[string]int64) - desireReplicaInfos := make(map[string]int64) - - for _, cluster := range clusters { - for _, staticWeightRule := range staticWeightList { - if util.ClusterMatches(cluster, staticWeightRule.TargetCluster) { - weightSum += staticWeightRule.Weight - matchClusters[cluster.Name] = staticWeightRule.Weight - break - } - } - } - - if weightSum == 0 { - for _, cluster := range clusters { - weightSum++ - matchClusters[cluster.Name] = 1 - } - } - - allocatedReplicas := int32(0) - for clusterName, weight := range matchClusters { - desireReplicaInfos[clusterName] = weight * int64(replicas) / weightSum - allocatedReplicas += int32(desireReplicaInfos[clusterName]) - } - - if remainReplicas := replicas - allocatedReplicas; remainReplicas > 0 { - sortedClusters := helper.SortClusterByWeight(matchClusters) - for i := 0; remainReplicas > 0; i++ { - desireReplicaInfos[sortedClusters[i].ClusterName]++ - remainReplicas-- - if i == len(desireReplicaInfos) { - i = 0 - } - } - } - - for _, cluster := range clusters { - if _, exist := matchClusters[cluster.Name]; !exist { - desireReplicaInfos[cluster.Name] = 0 - } - } - - targetClusters := make([]workv1alpha2.TargetCluster, len(desireReplicaInfos)) - i := 0 - for key, value := range desireReplicaInfos { - targetClusters[i] = workv1alpha2.TargetCluster{Name: key, Replicas: int32(value)} - i++ - } - return targetClusters, nil -} - -// TargetClustersList is a slice of TargetCluster that implements sort.Interface to sort by Value. -type TargetClustersList []workv1alpha2.TargetCluster - -func (a TargetClustersList) Len() int { return len(a) } -func (a TargetClustersList) Swap(i, j int) { a[i], a[j] = a[j], a[i] } -func (a TargetClustersList) Less(i, j int) bool { return a[i].Replicas > a[j].Replicas } - -func (g *genericScheduler) divideReplicasAggregatedWithResource(clusters []*clusterv1alpha1.Cluster, - spec *workv1alpha2.ResourceBindingSpec, preUsedClustersName ...string) ([]workv1alpha2.TargetCluster, error) { - // make sure preUsedClusters are in front of the unUsedClusters in the list of clusterAvailableReplicas - // so that we can assign new replicas to them preferentially when scale up. - // preUsedClusters have none items during first scheduler - preUsedClusters, unUsedClusters := g.getPreUsed(clusters, preUsedClustersName...) - preUsedClustersAvailableReplicas := g.calAvailableReplicas(preUsedClusters, spec) - unUsedClustersAvailableReplicas := g.calAvailableReplicas(unUsedClusters, spec) - clusterAvailableReplicas := append(preUsedClustersAvailableReplicas, unUsedClustersAvailableReplicas...) - return g.divideReplicasAggregatedWithClusterReplicas(clusterAvailableReplicas, spec.Replicas) -} - -func (g *genericScheduler) calAvailableReplicas(clusters []*clusterv1alpha1.Cluster, spec *workv1alpha2.ResourceBindingSpec) []workv1alpha2.TargetCluster { - availableTargetClusters := make([]workv1alpha2.TargetCluster, len(clusters)) - - // Set the boundary. - for i := range availableTargetClusters { - availableTargetClusters[i].Name = clusters[i].Name - availableTargetClusters[i].Replicas = math.MaxInt32 - } - - // Get the minimum value of MaxAvailableReplicas in terms of all estimators. 
- estimators := estimatorclient.GetReplicaEstimators() - ctx := context.WithValue(context.TODO(), util.ContextKeyObject, - fmt.Sprintf("kind=%s, name=%s/%s", spec.Resource.Kind, spec.Resource.Namespace, spec.Resource.Name)) - for _, estimator := range estimators { - res, err := estimator.MaxAvailableReplicas(ctx, clusters, spec.ReplicaRequirements) - if err != nil { - klog.Errorf("Max cluster available replicas error: %v", err) - continue - } - for i := range res { - if res[i].Replicas == estimatorclient.UnauthenticReplica { - continue - } - if availableTargetClusters[i].Name == res[i].Name && availableTargetClusters[i].Replicas > res[i].Replicas { - availableTargetClusters[i].Replicas = res[i].Replicas - } - } - } - - // In most cases, the target cluster max available replicas should not be MaxInt32 unless the workload is best-effort - // and the scheduler-estimator has not been enabled. So we set the replicas to spec.Replicas for avoiding overflow. - for i := range availableTargetClusters { - if availableTargetClusters[i].Replicas == math.MaxInt32 { - availableTargetClusters[i].Replicas = spec.Replicas - } - } - - sort.Sort(TargetClustersList(availableTargetClusters)) - klog.V(4).Infof("Target cluster: %v", availableTargetClusters) - return availableTargetClusters -} - -func (g *genericScheduler) divideReplicasAggregatedWithClusterReplicas(clusterAvailableReplicas []workv1alpha2.TargetCluster, replicas int32) ([]workv1alpha2.TargetCluster, error) { - clustersNum := 0 - clustersMaxReplicas := int32(0) - for _, clusterInfo := range clusterAvailableReplicas { - clustersNum++ - clustersMaxReplicas += clusterInfo.Replicas - if clustersMaxReplicas >= replicas { - break - } - } - if clustersMaxReplicas < replicas { - return nil, fmt.Errorf("clusters resources are not enough to schedule, max %v replicas are support", clustersMaxReplicas) - } - - desireReplicaInfos := make(map[string]int32) - allocatedReplicas := int32(0) - for i, clusterInfo := range clusterAvailableReplicas { - if i >= clustersNum { - desireReplicaInfos[clusterInfo.Name] = 0 - continue - } - desireReplicaInfos[clusterInfo.Name] = clusterInfo.Replicas * replicas / clustersMaxReplicas - allocatedReplicas += desireReplicaInfos[clusterInfo.Name] - } - - if remainReplicas := replicas - allocatedReplicas; remainReplicas > 0 { - for i := 0; remainReplicas > 0; i++ { - desireReplicaInfos[clusterAvailableReplicas[i].Name]++ - remainReplicas-- - if i == clustersNum { - i = 0 - } - } - } - - targetClusters := make([]workv1alpha2.TargetCluster, len(clusterAvailableReplicas)) - i := 0 - for key, value := range desireReplicaInfos { - targetClusters[i] = workv1alpha2.TargetCluster{Name: key, Replicas: value} - i++ - } - return targetClusters, nil -} - func (g *genericScheduler) ScaleSchedule(ctx context.Context, placement *policyv1alpha1.Placement, spec *workv1alpha2.ResourceBindingSpec) (result ScheduleResult, err error) { + clustersWithReplicas, err := g.assignScaleReplicas(ctx, placement, spec) + if err != nil { + return result, fmt.Errorf("failed to assignReplicas: %v", err) + } + result.SuggestedClusters = clustersWithReplicas + + return result, nil +} + +func (g *genericScheduler) assignScaleReplicas(ctx context.Context, placement *policyv1alpha1.Placement, + spec *workv1alpha2.ResourceBindingSpec) ([]workv1alpha2.TargetCluster, error) { newTargetClusters := make([]workv1alpha2.TargetCluster, len(spec.Clusters)) + strategy := placement.ReplicaScheduling if spec.Replicas > 0 { - if placement.ReplicaScheduling.ReplicaSchedulingType == 
policyv1alpha1.ReplicaSchedulingTypeDuplicated { + switch strategy.ReplicaSchedulingType { + case policyv1alpha1.ReplicaSchedulingTypeDuplicated: for i, cluster := range spec.Clusters { newTargetClusters[i] = workv1alpha2.TargetCluster{Name: cluster.Name, Replicas: spec.Replicas} } - result.SuggestedClusters = newTargetClusters - return result, nil - } - if placement.ReplicaScheduling.ReplicaSchedulingType == policyv1alpha1.ReplicaSchedulingTypeDivided { - if placement.ReplicaScheduling.ReplicaDivisionPreference == policyv1alpha1.ReplicaDivisionPreferenceWeighted { - preSelectedClusters := g.getPreSelected(spec.Clusters) - if placement.ReplicaScheduling.WeightPreference == nil { + return newTargetClusters, nil + case policyv1alpha1.ReplicaSchedulingTypeDivided: + switch strategy.ReplicaDivisionPreference { + case policyv1alpha1.ReplicaDivisionPreferenceWeighted: + preSelectedClusters := getPreSelected(spec.Clusters, g.schedulerCache) + if strategy.WeightPreference == nil { // if ReplicaDivisionPreference is set to "Weighted" and WeightPreference is not set, scheduler will weight all clusters the same. - placement.ReplicaScheduling.WeightPreference = getDefaultWeightPreference(preSelectedClusters) + strategy.WeightPreference = getDefaultWeightPreference(preSelectedClusters) } - clustersWithReplicase, err := g.divideReplicasByStaticWeight(preSelectedClusters, placement.ReplicaScheduling.WeightPreference.StaticWeightList, spec.Replicas) - if err != nil { - return result, fmt.Errorf("failed to assignReplicas with Weight: %v", err) + if len(strategy.WeightPreference.DynamicWeight) != 0 { + return scaleScheduleByReplicaDivisionPreference(spec, strategy.ReplicaDivisionPreference, preSelectedClusters) } - result.SuggestedClusters = clustersWithReplicase - return result, nil + return divideReplicasByStaticWeight(preSelectedClusters, strategy.WeightPreference.StaticWeightList, spec.Replicas) + case policyv1alpha1.ReplicaDivisionPreferenceAggregated: + preSelectedClusters := getPreSelected(spec.Clusters, g.schedulerCache) + return scaleScheduleByReplicaDivisionPreference(spec, strategy.ReplicaDivisionPreference, preSelectedClusters) + default: + return nil, fmt.Errorf("undefined replica division preference: %s", strategy.ReplicaDivisionPreference) } - if placement.ReplicaScheduling.ReplicaDivisionPreference == policyv1alpha1.ReplicaDivisionPreferenceAggregated { - return g.scaleScheduleWithReplicaDivisionPreferenceAggregated(spec) - } - - // will never reach here, only "Aggregated" and "Weighted" are support - return result, nil + default: + return nil, fmt.Errorf("undefined replica scheduling type: %s", strategy.ReplicaSchedulingType) } } for i, cluster := range spec.Clusters { newTargetClusters[i] = workv1alpha2.TargetCluster{Name: cluster.Name} } - result.SuggestedClusters = newTargetClusters - return result, nil -} - -func (g *genericScheduler) scaleScheduleWithReplicaDivisionPreferenceAggregated(spec *workv1alpha2.ResourceBindingSpec) (result ScheduleResult, err error) { - assignedReplicas := util.GetSumOfReplicas(spec.Clusters) - if assignedReplicas > spec.Replicas { - newTargetClusters, err := g.scaleDownScheduleWithReplicaDivisionPreferenceAggregated(spec) - if err != nil { - return result, fmt.Errorf("failed to scaleDown: %v", err) - } - result.SuggestedClusters = newTargetClusters - } else if assignedReplicas < spec.Replicas { - newTargetClusters, err := g.scaleUpScheduleWithReplicaDivisionPreferenceAggregated(spec) - if err != nil { - return result, fmt.Errorf("failed to scaleUp: %v", err) 
- } - result.SuggestedClusters = newTargetClusters - } else { - result.SuggestedClusters = spec.Clusters - } - return result, nil -} - -func (g *genericScheduler) scaleDownScheduleWithReplicaDivisionPreferenceAggregated(spec *workv1alpha2.ResourceBindingSpec) ([]workv1alpha2.TargetCluster, error) { - return g.divideReplicasAggregatedWithClusterReplicas(spec.Clusters, spec.Replicas) -} - -func (g *genericScheduler) scaleUpScheduleWithReplicaDivisionPreferenceAggregated(spec *workv1alpha2.ResourceBindingSpec) ([]workv1alpha2.TargetCluster, error) { - // find the clusters that have old replicas so we can assign new replicas to them preferentially - // targetMap map of the result for the old replicas so that it can be merged with the new result easily - targetMap := make(map[string]int32) - usedTargetClusters := make([]string, 0) - assignedReplicas := int32(0) - for _, cluster := range spec.Clusters { - targetMap[cluster.Name] = cluster.Replicas - assignedReplicas += cluster.Replicas - if cluster.Replicas > 0 { - usedTargetClusters = append(usedTargetClusters, cluster.Name) - } - } - preSelected := g.getPreSelected(spec.Clusters) - // only the new replicas are considered during this scheduler, the old replicas will not be moved. - // if not the old replicas may be recreated which is not expected during scaling up - // use usedTargetClusters to make sure that we assign new replicas to them preferentially so that all the replicas are aggregated - newObject := spec.DeepCopy() - newObject.Replicas = spec.Replicas - assignedReplicas - result, err := g.divideReplicasAggregatedWithResource(preSelected, newObject, usedTargetClusters...) - if err != nil { - return result, err - } - // merge the result of this scheduler for new replicas and the data of old replicas - for i, cluster := range result { - value, ok := targetMap[cluster.Name] - if ok { - result[i].Replicas = cluster.Replicas + value - delete(targetMap, cluster.Name) - } - } - for key, value := range targetMap { - result = append(result, workv1alpha2.TargetCluster{Name: key, Replicas: value}) - } - return result, nil -} - -func (g *genericScheduler) getPreSelected(targetClusters []workv1alpha2.TargetCluster) []*clusterv1alpha1.Cluster { - var preSelectedClusters []*clusterv1alpha1.Cluster - clusterInfoSnapshot := g.schedulerCache.Snapshot() - for _, targetCluster := range targetClusters { - for _, cluster := range clusterInfoSnapshot.GetClusters() { - if targetCluster.Name == cluster.Cluster().Name { - preSelectedClusters = append(preSelectedClusters, cluster.Cluster()) - break - } - } - } - return preSelectedClusters -} - -func (g *genericScheduler) getPreUsed(clusters []*clusterv1alpha1.Cluster, preUsedClustersName ...string) ([]*clusterv1alpha1.Cluster, []*clusterv1alpha1.Cluster) { - if len(preUsedClustersName) == 0 { - return clusters, nil - } - preUsedClusterSet := sets.NewString(preUsedClustersName...) 
- var preUsedCluster []*clusterv1alpha1.Cluster - var unUsedCluster []*clusterv1alpha1.Cluster - for i := range clusters { - if preUsedClusterSet.Has(clusters[i].Name) { - preUsedCluster = append(preUsedCluster, clusters[i]) - } else { - unUsedCluster = append(unUsedCluster, clusters[i]) - } - } - return preUsedCluster, unUsedCluster + return newTargetClusters, nil } func (g *genericScheduler) FailoverSchedule(ctx context.Context, placement *policyv1alpha1.Placement, @@ -606,13 +348,3 @@ func (g *genericScheduler) FailoverSchedule(ctx context.Context, placement *poli return ScheduleResult{reScheduleResult}, nil } - -// calcReservedCluster eliminates the not-ready clusters from the 'bindClusters'. -func calcReservedCluster(bindClusters, readyClusters sets.String) sets.String { - return bindClusters.Difference(bindClusters.Difference(readyClusters)) -} - -// calcAvailableCluster returns a list of ready clusters that not in 'bindClusters'. -func calcAvailableCluster(bindCluster, readyClusters sets.String) sets.String { - return readyClusters.Difference(bindCluster) -} diff --git a/pkg/scheduler/core/util.go b/pkg/scheduler/core/util.go new file mode 100644 index 000000000..2a8802edd --- /dev/null +++ b/pkg/scheduler/core/util.go @@ -0,0 +1,123 @@ +package core + +import ( + "context" + "fmt" + "math" + "sort" + + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/klog/v2" + + clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1" + policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1" + workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2" + estimatorclient "github.com/karmada-io/karmada/pkg/estimator/client" + "github.com/karmada-io/karmada/pkg/scheduler/cache" + "github.com/karmada-io/karmada/pkg/util" +) + +func getDefaultWeightPreference(clusters []*clusterv1alpha1.Cluster) *policyv1alpha1.ClusterPreferences { + staticWeightLists := make([]policyv1alpha1.StaticClusterWeight, 0) + for _, cluster := range clusters { + staticWeightList := policyv1alpha1.StaticClusterWeight{ + TargetCluster: policyv1alpha1.ClusterAffinity{ + ClusterNames: []string{cluster.Name}, + }, + Weight: 1, + } + staticWeightLists = append(staticWeightLists, staticWeightList) + } + + return &policyv1alpha1.ClusterPreferences{ + StaticWeightList: staticWeightLists, + } +} + +func calAvailableReplicas(clusters []*clusterv1alpha1.Cluster, spec *workv1alpha2.ResourceBindingSpec) []workv1alpha2.TargetCluster { + availableTargetClusters := make([]workv1alpha2.TargetCluster, len(clusters)) + + // Set the boundary. + for i := range availableTargetClusters { + availableTargetClusters[i].Name = clusters[i].Name + availableTargetClusters[i].Replicas = math.MaxInt32 + } + + // Get the minimum value of MaxAvailableReplicas in terms of all estimators. 
+	estimators := estimatorclient.GetReplicaEstimators()
+	ctx := context.WithValue(context.TODO(), util.ContextKeyObject,
+		fmt.Sprintf("kind=%s, name=%s/%s", spec.Resource.Kind, spec.Resource.Namespace, spec.Resource.Name))
+	for _, estimator := range estimators {
+		res, err := estimator.MaxAvailableReplicas(ctx, clusters, spec.ReplicaRequirements)
+		if err != nil {
+			klog.Errorf("Max cluster available replicas error: %v", err)
+			continue
+		}
+		for i := range res {
+			if res[i].Replicas == estimatorclient.UnauthenticReplica {
+				continue
+			}
+			if availableTargetClusters[i].Name == res[i].Name && availableTargetClusters[i].Replicas > res[i].Replicas {
+				availableTargetClusters[i].Replicas = res[i].Replicas
+			}
+		}
+	}
+
+	// In most cases, the target cluster max available replicas should not be MaxInt32 unless the workload is best-effort
+	// and the scheduler-estimator has not been enabled. So we set the replicas to spec.Replicas for avoiding overflow.
+	for i := range availableTargetClusters {
+		if availableTargetClusters[i].Replicas == math.MaxInt32 {
+			availableTargetClusters[i].Replicas = spec.Replicas
+		}
+	}
+
+	sort.Sort(TargetClustersList(availableTargetClusters))
+	klog.V(4).Infof("Target cluster: %v", availableTargetClusters)
+	return availableTargetClusters
+}
+
+func getPreSelected(targetClusters []workv1alpha2.TargetCluster, schedulerCache cache.Cache) []*clusterv1alpha1.Cluster {
+	var preSelectedClusters []*clusterv1alpha1.Cluster
+	clusterInfoSnapshot := schedulerCache.Snapshot()
+	for _, targetCluster := range targetClusters {
+		for _, cluster := range clusterInfoSnapshot.GetClusters() {
+			if targetCluster.Name == cluster.Cluster().Name {
+				preSelectedClusters = append(preSelectedClusters, cluster.Cluster())
+				break
+			}
+		}
+	}
+	return preSelectedClusters
+}
+
+// presortClusterList makes sure the clusters named in preUsedClusterNames come first in the returned list of
+// clusterAvailableReplicas, so that new replicas are assigned to them preferentially when scaling up.
+// Note that preUsedClusterNames is empty during the first scheduling.
+func presortClusterList(clusterAvailableReplicas []workv1alpha2.TargetCluster, preUsedClusterNames ...string) []workv1alpha2.TargetCluster {
+	if len(preUsedClusterNames) == 0 {
+		return clusterAvailableReplicas
+	}
+	preUsedClusterSet := sets.NewString(preUsedClusterNames...)
+	var preUsedCluster []workv1alpha2.TargetCluster
+	var unUsedCluster []workv1alpha2.TargetCluster
+	for i := range clusterAvailableReplicas {
+		if preUsedClusterSet.Has(clusterAvailableReplicas[i].Name) {
+			preUsedCluster = append(preUsedCluster, clusterAvailableReplicas[i])
+		} else {
+			unUsedCluster = append(unUsedCluster, clusterAvailableReplicas[i])
+		}
+	}
+	clusterAvailableReplicas = append(preUsedCluster, unUsedCluster...)
+	klog.V(4).Infof("Resorted target clusters: %v", clusterAvailableReplicas)
+	return clusterAvailableReplicas
+}
+
+// calcReservedCluster eliminates the not-ready clusters from the 'bindClusters'.
+func calcReservedCluster(bindClusters, readyClusters sets.String) sets.String {
+	return bindClusters.Difference(bindClusters.Difference(readyClusters))
+}
+
+// calcAvailableCluster returns a list of ready clusters that are not in 'bindClusters'.
+func calcAvailableCluster(bindCluster, readyClusters sets.String) sets.String {
+	return readyClusters.Difference(bindCluster)
+}
diff --git a/pkg/util/binding.go b/pkg/util/binding.go
index 9a6132bcf..143523e54 100644
--- a/pkg/util/binding.go
+++ b/pkg/util/binding.go
@@ -57,3 +57,24 @@ func ConvertToClusterNames(clusters []workv1alpha2.TargetCluster) sets.String {
 	return clusterNames
 }
+
+// MergeTargetClusters merges the replicas of two TargetCluster lists.
+func MergeTargetClusters(old, new []workv1alpha2.TargetCluster) []workv1alpha2.TargetCluster {
+	// oldMap records the replicas of the old result so that it can be merged with the new result easily.
+	oldMap := make(map[string]int32)
+	for _, cluster := range old {
+		oldMap[cluster.Name] = cluster.Replicas
+	}
+	// Merge the new replicas with the replicas of the old result.
+	for i, cluster := range new {
+		value, ok := oldMap[cluster.Name]
+		if ok {
+			new[i].Replicas = cluster.Replicas + value
+			delete(oldMap, cluster.Name)
+		}
+	}
+	for key, value := range oldMap {
+		new = append(new, workv1alpha2.TargetCluster{Name: key, Replicas: value})
+	}
+	return new
+}
diff --git a/pkg/util/helper/binding.go b/pkg/util/helper/binding.go
index eac6dae43..36fa2a16b 100644
--- a/pkg/util/helper/binding.go
+++ b/pkg/util/helper/binding.go
@@ -80,6 +80,17 @@ func GetBindingClusterNames(targetClusters []workv1alpha2.TargetCluster) []strin
 	return clusterNames
 }
 
+// GetUsedBindingClusterNames returns the names of the bound clusters that have replicas assigned.
+func GetUsedBindingClusterNames(targetClusters []workv1alpha2.TargetCluster) []string {
+	var usedClusterNames []string
+	for _, targetCluster := range targetClusters {
+		if targetCluster.Replicas > 0 {
+			usedClusterNames = append(usedClusterNames, targetCluster.Name)
+		}
+	}
+	return usedClusterNames
+}
+
 // FindOrphanWorks retrieves all works that labeled with current binding(ResourceBinding or ClusterResourceBinding) objects,
 // then pick the works that not meet current binding declaration.
 func FindOrphanWorks(c client.Client, bindingNamespace, bindingName string, clusterNames []string, scope apiextensionsv1.ResourceScope) ([]workv1alpha1.Work, error) {
diff --git a/test/helper/resource.go b/test/helper/resource.go
index 828969e00..1f84837f5 100644
--- a/test/helper/resource.go
+++ b/test/helper/resource.go
@@ -12,6 +12,8 @@ import (
 	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
 	"k8s.io/apimachinery/pkg/util/intstr"
 	"k8s.io/utils/pointer"
+
+	clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
 )
 
 // These are different resource units.
@@ -379,3 +381,24 @@ func MakeNodeWithTaints(node string, milliCPU, memory, pods, ephemeralStorage in
 		},
 	}
 }
+
+// NewCluster will build a Cluster.
+func NewCluster(name string) *clusterv1alpha1.Cluster {
+	return &clusterv1alpha1.Cluster{
+		ObjectMeta: metav1.ObjectMeta{Name: name},
+	}
+}
+
+// NewClusterWithResource will build a Cluster with resource.
+func NewClusterWithResource(name string, allocatable, allocating, allocated corev1.ResourceList) *clusterv1alpha1.Cluster {
+	return &clusterv1alpha1.Cluster{
+		ObjectMeta: metav1.ObjectMeta{Name: name},
+		Status: clusterv1alpha1.ClusterStatus{
+			ResourceSummary: &clusterv1alpha1.ResourceSummary{
+				Allocatable: allocatable,
+				Allocating:  allocating,
+				Allocated:   allocated,
+			},
+		},
+	}
+}
diff --git a/test/helper/scheduler.go b/test/helper/scheduler.go
new file mode 100644
index 000000000..8ce79ffa3
--- /dev/null
+++ b/test/helper/scheduler.go
@@ -0,0 +1,25 @@
+package helper
+
+import (
+	workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
+)
+
+// IsScheduleResultEqual will check whether two schedule results are equal.
+func IsScheduleResultEqual(tc1, tc2 []workv1alpha2.TargetCluster) bool {
+	if len(tc1) != len(tc2) {
+		return false
+	}
+	for _, c1 := range tc1 {
+		found := false
+		for _, c2 := range tc2 {
+			if c1 == c2 {
+				found = true
+				break
+			}
+		}
+		if !found {
+			return false
+		}
+	}
+	return true
+}
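
Reviewer note (not part of the patch): the proportional split used by divideReplicasByStaticWeight and divideReplicasByAvailableReplica above, floor division by the weight sum followed by handing out the remainder starting from the largest weight, can be illustrated with the self-contained sketch below. splitByWeight and the package layout are illustrative names only; the real implementation additionally relies on util.ClusterMatches and helper.SortClusterByWeight.

package main

import (
	"fmt"
	"sort"
)

// splitByWeight mirrors the division idea above: every cluster first gets
// floor(weight * replicas / weightSum) replicas, then the remaining replicas
// are handed out one by one, largest weight first.
func splitByWeight(weights map[string]int64, replicas int32) map[string]int32 {
	weightSum := int64(0)
	for _, w := range weights {
		weightSum += w
	}
	if weightSum == 0 {
		return map[string]int32{}
	}

	names := make([]string, 0, len(weights))
	for name := range weights {
		names = append(names, name)
	}
	// Largest weight first; break ties by name to keep the result deterministic.
	sort.Slice(names, func(i, j int) bool {
		if weights[names[i]] != weights[names[j]] {
			return weights[names[i]] > weights[names[j]]
		}
		return names[i] < names[j]
	})

	result := make(map[string]int32, len(weights))
	allocated := int32(0)
	for _, name := range names {
		result[name] = int32(weights[name] * int64(replicas) / weightSum)
		allocated += result[name]
	}
	// Distribute the remainder round-robin, starting from the largest weight.
	for i := 0; allocated < replicas; i = (i + 1) % len(names) {
		result[names[i]]++
		allocated++
	}
	return result
}

func main() {
	// 14 replicas split by static weights 3:2:1 -> 8:4:2, matching the
	// "replica 14, weight 3:2:1" test case in division_algorithm_test.go.
	fmt.Println(splitByWeight(map[string]int64{"member1": 3, "member2": 2, "member3": 1}, 14))
}

Running the sketch prints map[member1:8 member2:4 member3:2], which matches the expected result of that test case.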