diff --git a/pkg/scheduler/core/generic_scheduler.go b/pkg/scheduler/core/generic_scheduler.go
index fc83b232c..35da55ce7 100644
--- a/pkg/scheduler/core/generic_scheduler.go
+++ b/pkg/scheduler/core/generic_scheduler.go
@@ -141,8 +141,7 @@ func (g *genericScheduler) selectClusters(clustersScore framework.ClusterScoreLi
 	defer metrics.ScheduleStep(metrics.ScheduleStepSelect, time.Now())
 
 	groupClustersInfo := spreadconstraint.GroupClustersWithScore(clustersScore, placement, spec)
-
-	return spreadconstraint.SelectBestClusters(placement, groupClustersInfo)
+	return spreadconstraint.SelectBestClusters(placement, groupClustersInfo, spec.Replicas)
 }
 
 func (g *genericScheduler) assignReplicas(
diff --git a/pkg/scheduler/core/spreadconstraint/group_clusters.go b/pkg/scheduler/core/spreadconstraint/group_clusters.go
index 5ded6a6af..d9c1db702 100644
--- a/pkg/scheduler/core/spreadconstraint/group_clusters.go
+++ b/pkg/scheduler/core/spreadconstraint/group_clusters.go
@@ -117,6 +117,7 @@ func (info *GroupClustersInfo) generateClustersInfo(clustersScore framework.Clus
 	clustersReplicas := calAvailableReplicas(clusters, rbSpec)
 	for i, clustersReplica := range clustersReplicas {
 		info.Clusters[i].AvailableReplicas = int64(clustersReplica.Replicas)
+		info.Clusters[i].AvailableReplicas += int64(getScheduledReplicas(rbSpec, clustersReplica.Name))
 	}
 
 	sortClusters(info.Clusters)
@@ -238,3 +239,19 @@ func sortClusters(infos []ClusterDetailInfo) {
 		return infos[i].Name < infos[j].Name
 	})
 }
+
+func getScheduledReplicas(rbSpec *workv1alpha2.ResourceBindingSpec, clusterName string) int32 {
+	if rbSpec == nil {
+		return 0
+	}
+
+	var replicas int32
+	for _, cluster := range rbSpec.Clusters {
+		if cluster.Name == clusterName {
+			replicas = cluster.Replicas
+			break
+		}
+	}
+
+	return replicas
+}
diff --git a/pkg/scheduler/core/spreadconstraint/select_clusters.go b/pkg/scheduler/core/spreadconstraint/select_clusters.go
index 26e393a4c..cf8fb27ad 100644
--- a/pkg/scheduler/core/spreadconstraint/select_clusters.go
+++ b/pkg/scheduler/core/spreadconstraint/select_clusters.go
@@ -3,53 +3,64 @@ package spreadconstraint
 import (
 	"fmt"
 
+	"k8s.io/klog/v2"
+
 	clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
 	policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
 )
 
 // SelectBestClusters selects the cluster set based on the GroupClustersInfo and placement
-func SelectBestClusters(placement *policyv1alpha1.Placement, groupClustersInfo *GroupClustersInfo) ([]*clusterv1alpha1.Cluster, error) {
-	if len(placement.SpreadConstraints) != 0 {
-		return selectBestClustersBySpreadConstraints(placement.SpreadConstraints, groupClustersInfo)
+func SelectBestClusters(placement *policyv1alpha1.Placement, groupClustersInfo *GroupClustersInfo, needReplicas int32) ([]*clusterv1alpha1.Cluster, error) {
+	if len(placement.SpreadConstraints) == 0 || shouldIgnoreSpreadConstraint(placement) {
+		var clusters []*clusterv1alpha1.Cluster
+		for _, cluster := range groupClustersInfo.Clusters {
+			clusters = append(clusters, cluster.Cluster)
+		}
+		klog.V(4).Infof("select all clusters")
+		return clusters, nil
 	}
 
-	var clusters []*clusterv1alpha1.Cluster
-	for _, cluster := range groupClustersInfo.Clusters {
-		clusters = append(clusters, cluster.Cluster)
+	if shouldIgnoreAvailableResource(placement) {
+		needReplicas = InvalidReplicas
 	}
 
-	return clusters, nil
+	return selectBestClustersBySpreadConstraints(placement.SpreadConstraints, groupClustersInfo, needReplicas)
 }
 
 func selectBestClustersBySpreadConstraints(spreadConstraints []policyv1alpha1.SpreadConstraint,
-	groupClustersInfo *GroupClustersInfo) ([]*clusterv1alpha1.Cluster, error) {
+	groupClustersInfo *GroupClustersInfo, needReplicas int32) ([]*clusterv1alpha1.Cluster, error) {
 	if len(spreadConstraints) > 1 {
 		return nil, fmt.Errorf("just support single spread constraint")
 	}
 
 	spreadConstraint := spreadConstraints[0]
 	if spreadConstraint.SpreadByField == policyv1alpha1.SpreadByFieldCluster {
-		return selectBestClustersByCluster(spreadConstraint, groupClustersInfo)
+		return selectBestClustersByCluster(spreadConstraint, groupClustersInfo, needReplicas)
 	}
 
 	return nil, fmt.Errorf("just support cluster spread constraint")
 }
 
-func selectBestClustersByCluster(spreadConstraint policyv1alpha1.SpreadConstraint, groupClustersInfo *GroupClustersInfo) ([]*clusterv1alpha1.Cluster, error) {
-	totalClusterCnt := len(groupClustersInfo.Clusters)
-	if spreadConstraint.MinGroups > totalClusterCnt {
-		return nil, fmt.Errorf("the number of feasible clusters is less than spreadConstraint.MinGroups")
+func shouldIgnoreSpreadConstraint(placement *policyv1alpha1.Placement) bool {
+	strategy := placement.ReplicaScheduling
+
+	// If the replica division preference is 'static weighted', ignore the declaration specified by the spread constraints.
+	if strategy != nil && strategy.ReplicaSchedulingType == policyv1alpha1.ReplicaSchedulingTypeDivided &&
+		strategy.ReplicaDivisionPreference == policyv1alpha1.ReplicaDivisionPreferenceWeighted &&
+		(strategy.WeightPreference != nil && len(strategy.WeightPreference.StaticWeightList) != 0 && strategy.WeightPreference.DynamicWeight == "") {
+		return true
 	}
 
-	needCnt := spreadConstraint.MaxGroups
-	if spreadConstraint.MaxGroups > totalClusterCnt {
-		needCnt = totalClusterCnt
-	}
-
-	var clusters []*clusterv1alpha1.Cluster
-	for i := 0; i < needCnt; i++ {
-		clusters = append(clusters, groupClustersInfo.Clusters[i].Cluster)
-	}
-
-	return clusters, nil
+	return false
+}
+
+func shouldIgnoreAvailableResource(placement *policyv1alpha1.Placement) bool {
+	strategy := placement.ReplicaScheduling
+
+	// If the replica division preference is 'Duplicated', ignore the clusters' available resources.
+	if strategy == nil || strategy.ReplicaSchedulingType == policyv1alpha1.ReplicaSchedulingTypeDuplicated {
+		return true
+	}
+
+	return false
 }
diff --git a/pkg/scheduler/core/spreadconstraint/select_clusters_by_cluster.go b/pkg/scheduler/core/spreadconstraint/select_clusters_by_cluster.go
new file mode 100644
index 000000000..bf5f1233d
--- /dev/null
+++ b/pkg/scheduler/core/spreadconstraint/select_clusters_by_cluster.go
@@ -0,0 +1,97 @@
+package spreadconstraint
+
+import (
+	"fmt"
+
+	clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
+	policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
+)
+
+func selectBestClustersByCluster(spreadConstraint policyv1alpha1.SpreadConstraint, groupClustersInfo *GroupClustersInfo,
+	needReplicas int32) ([]*clusterv1alpha1.Cluster, error) {
+	totalClusterCnt := len(groupClustersInfo.Clusters)
+	if totalClusterCnt < spreadConstraint.MinGroups {
+		return nil, fmt.Errorf("the number of feasible clusters is less than spreadConstraint.MinGroups")
+	}
+
+	needCnt := spreadConstraint.MaxGroups
+	if totalClusterCnt < spreadConstraint.MaxGroups {
+		needCnt = totalClusterCnt
+	}
+
+	var clusterInfos []ClusterDetailInfo
+
+	if needReplicas == InvalidReplicas {
+		clusterInfos = groupClustersInfo.Clusters[:needCnt]
+	} else {
+		clusterInfos = selectClustersByAvailableResource(groupClustersInfo.Clusters, int32(needCnt), needReplicas)
+		if len(clusterInfos) == 0 {
+			return nil, fmt.Errorf("not enough resource when selecting %d clusters", needCnt)
+		}
+	}
+
+	var clusters []*clusterv1alpha1.Cluster
+	for i := range clusterInfos {
+		clusters = append(clusters, clusterInfos[i].Cluster)
+	}
+
+	return clusters, nil
+}
+
+// if needClusterCount = 2 and needReplicas = 80, member1 and member3 will be selected in the end,
+// because the total resource of member1 and member2 is less than needReplicas, although their scores are the highest.
+// --------------------------------------------------
+// |    clusterName   | member1 | member2 | member3 |
+// |------------------------------------------------|
+// |       score      |    60   |    50   |    40   |
+// |------------------------------------------------|
+// |AvailableReplicas |    40   |    30   |    60   |
+// --------------------------------------------------
+func selectClustersByAvailableResource(candidateClusters []ClusterDetailInfo, needClusterCount, needReplicas int32) []ClusterDetailInfo {
+	retClusters := candidateClusters[:needClusterCount]
+	restClusters := candidateClusters[needClusterCount:]
+
+	// retClusters is sorted by cluster.Score in descending order. When the total AvailableReplicas of retClusters is less than needReplicas,
+	// replace the lowest-score cluster in retClusters with the cluster that has the most AvailableReplicas in restClusters,
+	// working backwards from the last cluster of the slice until checkAvailableResource returns true.
+	var updateClusterID = len(retClusters) - 1
+	for !checkAvailableResource(retClusters, needReplicas) && updateClusterID >= 0 {
+		clusterID := GetClusterWithMaxAvailableResource(restClusters, retClusters[updateClusterID].AvailableReplicas)
+		if clusterID == InvalidClusterID {
+			updateClusterID--
+			continue
+		}
+
+		retClusters[updateClusterID], restClusters[clusterID] = restClusters[clusterID], retClusters[updateClusterID]
+		updateClusterID--
+	}
+
+	if !checkAvailableResource(retClusters, needReplicas) {
+		return nil
+	}
+	return retClusters
+}
+
+func checkAvailableResource(clusters []ClusterDetailInfo, needReplicas int32) bool {
+	var total int64
+
+	for i := range clusters {
+		total += clusters[i].AvailableReplicas
+	}
+
+	return total >= int64(needReplicas)
+}
+
+// GetClusterWithMaxAvailableResource returns the index of the cluster with the most AvailableReplicas if it exceeds originReplicas, otherwise InvalidClusterID
+func GetClusterWithMaxAvailableResource(candidateClusters []ClusterDetailInfo, originReplicas int64) int {
+	var maxAvailableReplicas = originReplicas
+	var clusterID = InvalidClusterID
+	for i := range candidateClusters {
+		if maxAvailableReplicas < candidateClusters[i].AvailableReplicas {
+			clusterID = i
+			maxAvailableReplicas = candidateClusters[i].AvailableReplicas
+		}
+	}
+
+	return clusterID
+}
diff --git a/pkg/scheduler/core/spreadconstraint/select_clusters_test.go b/pkg/scheduler/core/spreadconstraint/select_clusters_test.go
index f501aec5f..c59b86e35 100644
--- a/pkg/scheduler/core/spreadconstraint/select_clusters_test.go
+++ b/pkg/scheduler/core/spreadconstraint/select_clusters_test.go
@@ -1,6 +1,7 @@
 package spreadconstraint
 
 import (
+	"fmt"
 	"reflect"
 	"testing"
 
@@ -27,41 +28,42 @@ func generateClusterInfo() []ClusterDetailInfo {
 		{
 			Name:              "member4",
 			Score:             60,
-			AvailableReplicas: 101,
+			AvailableReplicas: 50,
 			Cluster:           NewClusterWithTopology("member4", "P2", "R2", "Z2"),
 		},
 		{
 			Name:              "member2",
 			Score:             40,
-			AvailableReplicas: 101,
+			AvailableReplicas: 60,
 			Cluster:           NewClusterWithTopology("member2", "P1", "R1", "Z2"),
 		},
 		{
 			Name:              "member3",
 			Score:             30,
-			AvailableReplicas: 101,
+			AvailableReplicas: 80,
 			Cluster:           NewClusterWithTopology("member3", "P2", "R1", "Z1"),
 		},
 		{
 			Name:              "member1",
 			Score:             20,
-			AvailableReplicas: 101,
+			AvailableReplicas: 40,
 			Cluster:           NewClusterWithTopology("member1", "P1", "R1", "Z1"),
 		},
 	}
 }
 
 func TestSelectBestClusters(t *testing.T) {
-	clustetInfos := generateClusterInfo()
+	clusterInfos := generateClusterInfo()
 	type args struct {
 		placement         *policyv1alpha1.Placement
 		groupClustersInfo *GroupClustersInfo
+		needReplicas      int32
 	}
 	tests := []struct {
 		name    string
 		args    args
 		want    []*clusterv1alpha1.Cluster
-		wantErr bool
+		wantErr error
 	}{
 		{
 			name: "select clusters by cluster score",
@@ -76,20 +78,153 @@ func TestSelectBestClusters(t *testing.T) {
 			args: args{
 				placement: &policyv1alpha1.Placement{
 					SpreadConstraints: []policyv1alpha1.SpreadConstraint{
 						{
 							SpreadByField: policyv1alpha1.SpreadByFieldCluster,
 							MaxGroups:     2,
 							MinGroups:     1,
 						},
 					},
 				},
 				groupClustersInfo: &GroupClustersInfo{
-					Clusters: clustetInfos,
+					Clusters: clusterInfos,
 				},
+				needReplicas: 100,
 			},
 			want: []*clusterv1alpha1.Cluster{
-				clustetInfos[0].Cluster,
-				clustetInfos[1].Cluster,
+				clusterInfos[0].Cluster,
+				clusterInfos[1].Cluster,
 			},
 		},
+		{
+			name: "select clusters by cluster score and ignore available resources when scheduling strategy is duplicated",
+			args: args{
+				placement: &policyv1alpha1.Placement{
+					SpreadConstraints: []policyv1alpha1.SpreadConstraint{
+						{
+							SpreadByField: policyv1alpha1.SpreadByFieldCluster,
+							MaxGroups:     2,
+							MinGroups:     1,
+						},
+					},
+					ReplicaScheduling: &policyv1alpha1.ReplicaSchedulingStrategy{
+						ReplicaSchedulingType: policyv1alpha1.ReplicaSchedulingTypeDuplicated,
+					},
+				},
+				groupClustersInfo: &GroupClustersInfo{
+					Clusters: clusterInfos,
+				},
+				needReplicas: 120,
+			},
+			want: []*clusterv1alpha1.Cluster{
+				clusterInfos[0].Cluster,
+				clusterInfos[1].Cluster,
+			},
+		},
+		{
+			name: "select clusters by cluster score and ignore available resources when scheduling strategy is static weight",
+			args: args{
+				placement: &policyv1alpha1.Placement{
+					SpreadConstraints: []policyv1alpha1.SpreadConstraint{
+						{
+							SpreadByField: policyv1alpha1.SpreadByFieldCluster,
+							MaxGroups:     2,
+							MinGroups:     1,
+						},
+					},
+					ReplicaScheduling: &policyv1alpha1.ReplicaSchedulingStrategy{
+						ReplicaSchedulingType:     policyv1alpha1.ReplicaSchedulingTypeDivided,
+						ReplicaDivisionPreference: policyv1alpha1.ReplicaDivisionPreferenceWeighted,
+						WeightPreference: &policyv1alpha1.ClusterPreferences{
+							StaticWeightList: []policyv1alpha1.StaticClusterWeight{
+								{
+									TargetCluster: policyv1alpha1.ClusterAffinity{
+										ClusterNames: []string{"member1"},
+									},
+									Weight: 2,
+								},
+							},
+						},
+					},
+				},
+				groupClustersInfo: &GroupClustersInfo{
+					Clusters: clusterInfos,
+				},
+				needReplicas: 120,
+			},
+			want: []*clusterv1alpha1.Cluster{
+				clusterInfos[0].Cluster,
+				clusterInfos[1].Cluster,
+				clusterInfos[2].Cluster,
+				clusterInfos[3].Cluster,
+			},
+		},
+		{
+			name: "select clusters by cluster score and satisfy available resources",
+			args: args{
+				placement: &policyv1alpha1.Placement{
+					SpreadConstraints: []policyv1alpha1.SpreadConstraint{
+						{
+							SpreadByField: policyv1alpha1.SpreadByFieldCluster,
+							MaxGroups:     2,
+							MinGroups:     1,
+						},
+					},
+					ReplicaScheduling: &policyv1alpha1.ReplicaSchedulingStrategy{
+						ReplicaSchedulingType: policyv1alpha1.ReplicaSchedulingTypeDivided,
+					},
+				},
+				groupClustersInfo: &GroupClustersInfo{
+					Clusters: clusterInfos,
+				},
+				needReplicas: 120,
+			},
+			want: []*clusterv1alpha1.Cluster{
+				clusterInfos[0].Cluster,
+				clusterInfos[2].Cluster,
+			},
+		},
+		{
+			name: "select clusters by cluster score and insufficient resources",
+			args: args{
+				placement: &policyv1alpha1.Placement{
+					SpreadConstraints: []policyv1alpha1.SpreadConstraint{
+						{
+							SpreadByField: policyv1alpha1.SpreadByFieldCluster,
+							MaxGroups:     2,
+							MinGroups:     1,
+						},
+					},
+					ReplicaScheduling: &policyv1alpha1.ReplicaSchedulingStrategy{
+						ReplicaSchedulingType: policyv1alpha1.ReplicaSchedulingTypeDivided,
+					},
+				},
+				groupClustersInfo: &GroupClustersInfo{
+					Clusters: clusterInfos,
+				},
+				needReplicas: 200,
+			},
+			want:    nil,
+			wantErr: fmt.Errorf("not enough resource when selecting %d clusters", 2),
+		},
+		{
+			name: "select clusters by cluster score when MinGroups exceeds the number of available clusters",
+			args: args{
+				placement: &policyv1alpha1.Placement{
+					SpreadConstraints: []policyv1alpha1.SpreadConstraint{
+						{
+							SpreadByField: policyv1alpha1.SpreadByFieldCluster,
+							MaxGroups:     7,
+							MinGroups:     7,
+						},
+					},
+				},
+				groupClustersInfo: &GroupClustersInfo{
+					Clusters: clusterInfos,
+				},
+				needReplicas: 30,
+			},
+			want:    nil,
+			wantErr: fmt.Errorf("the number of feasible clusters is less than spreadConstraint.MinGroups"),
+		},
 	}
+
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			got, err := SelectBestClusters(tt.args.placement, tt.args.groupClustersInfo)
-			if (err != nil) != tt.wantErr {
-				t.Errorf("SelectBestClusters() error = %v, wantErr %v", err, tt.wantErr)
+			got, err := SelectBestClusters(tt.args.placement, tt.args.groupClustersInfo, tt.args.needReplicas)
+			if (err != nil) != (tt.wantErr != nil) || (err != nil && err.Error() != tt.wantErr.Error()) {
+				t.Errorf("SelectBestClusters() error = %v, wantErr = %v", err, tt.wantErr)
 				return
 			}
 			if !reflect.DeepEqual(got, tt.want) {
diff --git a/pkg/scheduler/core/spreadconstraint/util.go b/pkg/scheduler/core/spreadconstraint/util.go
index 3533ed52c..ae1358c7f 100644
--- a/pkg/scheduler/core/spreadconstraint/util.go
+++ b/pkg/scheduler/core/spreadconstraint/util.go
@@ -14,6 +14,13 @@ import (
 	"github.com/karmada-io/karmada/pkg/util"
 )
 
+const (
+	// InvalidClusterID indicates an invalid cluster index
+	InvalidClusterID = -1
+	// InvalidReplicas indicates that the available resource should be ignored during cluster selection
+	InvalidReplicas = -1
+)
+
 func calAvailableReplicas(clusters []*clusterv1alpha1.Cluster, spec *workv1alpha2.ResourceBindingSpec) []workv1alpha2.TargetCluster {
 	availableClusters := make([]workv1alpha2.TargetCluster, len(clusters))
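
Note on the getScheduledReplicas adjustment in group_clusters.go: calAvailableReplicas reports how many additional replicas a cluster can still accommodate, so the intent appears to be that replicas the binding has already placed on a cluster are added back, keeping AvailableReplicas comparable with spec.Replicas when an existing binding is rescheduled. A minimal sketch of the arithmetic, with hypothetical numbers not taken from the patch:

	// The binding already runs 30 replicas on member1.
	rbSpec := &workv1alpha2.ResourceBindingSpec{
		Clusters: []workv1alpha2.TargetCluster{{Name: "member1", Replicas: 30}},
	}
	available := int64(20)                                      // headroom reported by calAvailableReplicas
	available += int64(getScheduledReplicas(rbSpec, "member1")) // 20 + 30 = 50 replicas fit in total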
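Usage sketch for the two new gates in select_clusters.go, written as if from inside the package and assuming a groupClustersInfo already built by GroupClustersWithScore; the placement literal is hypothetical:

	placement := &policyv1alpha1.Placement{
		SpreadConstraints: []policyv1alpha1.SpreadConstraint{
			{SpreadByField: policyv1alpha1.SpreadByFieldCluster, MinGroups: 1, MaxGroups: 2},
		},
		ReplicaScheduling: &policyv1alpha1.ReplicaSchedulingStrategy{
			ReplicaSchedulingType: policyv1alpha1.ReplicaSchedulingTypeDuplicated,
		},
	}

	// shouldIgnoreSpreadConstraint(placement) == false: MaxGroups still caps the result at two clusters.
	// shouldIgnoreAvailableResource(placement) == true: needReplicas is replaced with InvalidReplicas,
	// so the two highest-scoring clusters win regardless of their AvailableReplicas.
	clusters, err := SelectBestClusters(placement, groupClustersInfo, 120)
	if err != nil {
		klog.Errorf("selection failed: %v", err)
	}
	klog.V(4).Infof("selected %d clusters", len(clusters))

With a static-weight divided strategy instead, shouldIgnoreSpreadConstraint returns true and SelectBestClusters returns every feasible cluster, which is what the static-weight test case exercises.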
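The replacement loop in selectClustersByAvailableResource is the heart of the patch. The following self-contained sketch replays the worked example from its comment table; the struct is reduced to the fields the loop reads, and the helper names are local to the sketch rather than part of the package:

	package main

	import "fmt"

	// clusterInfo is a reduced stand-in for ClusterDetailInfo.
	type clusterInfo struct {
		Name              string
		Score             int64
		AvailableReplicas int64
	}

	const invalidClusterID = -1

	// totalReplicas mirrors checkAvailableResource's summation.
	func totalReplicas(clusters []clusterInfo) int64 {
		var total int64
		for i := range clusters {
			total += clusters[i].AvailableReplicas
		}
		return total
	}

	// maxAvailable mirrors GetClusterWithMaxAvailableResource: the index of the
	// candidate with the most AvailableReplicas, provided it beats origin.
	func maxAvailable(candidates []clusterInfo, origin int64) int {
		best, id := origin, invalidClusterID
		for i := range candidates {
			if candidates[i].AvailableReplicas > best {
				best, id = candidates[i].AvailableReplicas, i
			}
		}
		return id
	}

	func main() {
		// The worked example from the comment table, already sorted by score descending.
		clusters := []clusterInfo{
			{"member1", 60, 40},
			{"member2", 50, 30},
			{"member3", 40, 60},
		}
		needClusterCount, needReplicas := 2, int64(80)

		ret := clusters[:needClusterCount]
		rest := clusters[needClusterCount:]

		// Walk from the lowest-scoring selected cluster upwards, swapping in the
		// spare cluster with the most available replicas, until the total fits.
		for i := needClusterCount - 1; i >= 0 && totalReplicas(ret) < needReplicas; i-- {
			if j := maxAvailable(rest, ret[i].AvailableReplicas); j != invalidClusterID {
				ret[i], rest[j] = rest[j], ret[i]
			}
		}

		fmt.Println(ret) // [{member1 60 40} {member3 40 60}]: 40 + 60 >= 80
	}

Swapping at the lowest-score positions first preserves as much of the score ordering as possible while still reaching needReplicas.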