Merge pull request #1563 from huone1/optimize/selectClusters
[optimize] Consider available resources when selecting clusters

Commit 15e614ba56
@@ -141,8 +141,7 @@ func (g *genericScheduler) selectClusters(clustersScore framework.ClusterScoreList,
     defer metrics.ScheduleStep(metrics.ScheduleStepSelect, time.Now())

     groupClustersInfo := spreadconstraint.GroupClustersWithScore(clustersScore, placement, spec)
-    return spreadconstraint.SelectBestClusters(placement, groupClustersInfo)
+    return spreadconstraint.SelectBestClusters(placement, groupClustersInfo, spec.Replicas)
 }

 func (g *genericScheduler) assignReplicas(
@@ -117,6 +117,7 @@ func (info *GroupClustersInfo) generateClustersInfo(clustersScore framework.ClusterScoreList,
     clustersReplicas := calAvailableReplicas(clusters, rbSpec)
     for i, clustersReplica := range clustersReplicas {
         info.Clusters[i].AvailableReplicas = int64(clustersReplica.Replicas)
+        info.Clusters[i].AvailableReplicas += int64(getScheduledReplicas(rbSpec, clustersReplica.Name))
     }

     sortClusters(info.Clusters)

@@ -238,3 +239,19 @@ func sortClusters(infos []ClusterDetailInfo) {
         return infos[i].Name < infos[j].Name
     })
 }
+
+func getScheduledReplicas(rbSpec *workv1alpha2.ResourceBindingSpec, clusterName string) int32 {
+    if rbSpec == nil {
+        return 0
+    }
+
+    var replicas int32
+    for _, cluster := range rbSpec.Clusters {
+        if cluster.Name == clusterName {
+            replicas = cluster.Replicas
+            break
+        }
+    }
+
+    return replicas
+}
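Net effect of this hunk: a cluster's AvailableReplicas is now the estimated free capacity plus the replicas this binding already holds on that cluster, so a reschedule does not count already-placed replicas against a cluster's headroom. A minimal standalone sketch of that accumulation; the targetCluster and clusterDetail types here are simplified stand-ins, not the actual karmada API:

package main

import "fmt"

type targetCluster struct {
    Name     string
    Replicas int32
}

type clusterDetail struct {
    Name              string
    AvailableReplicas int64
}

// scheduledReplicas mirrors getScheduledReplicas: the replicas the binding
// already has on the named cluster, or 0 when nothing is scheduled there.
func scheduledReplicas(scheduled []targetCluster, name string) int32 {
    for _, c := range scheduled {
        if c.Name == name {
            return c.Replicas
        }
    }
    return 0
}

func main() {
    estimated := []targetCluster{{"member1", 20}, {"member2", 35}} // estimated free capacity
    scheduled := []targetCluster{{"member1", 10}}                  // current assignment

    clusters := make([]clusterDetail, len(estimated))
    for i, e := range estimated {
        clusters[i] = clusterDetail{
            Name: e.Name,
            // free capacity plus replicas already placed by this binding
            AvailableReplicas: int64(e.Replicas) + int64(scheduledReplicas(scheduled, e.Name)),
        }
    }
    fmt.Println(clusters) // [{member1 30} {member2 35}]
}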
@@ -3,53 +3,64 @@ package spreadconstraint

 import (
     "fmt"

     "k8s.io/klog/v2"

     clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
     policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
 )

 // SelectBestClusters selects the cluster set based on the GroupClustersInfo and placement
-func SelectBestClusters(placement *policyv1alpha1.Placement, groupClustersInfo *GroupClustersInfo) ([]*clusterv1alpha1.Cluster, error) {
-    if len(placement.SpreadConstraints) != 0 {
-        return selectBestClustersBySpreadConstraints(placement.SpreadConstraints, groupClustersInfo)
-    }
-
+func SelectBestClusters(placement *policyv1alpha1.Placement, groupClustersInfo *GroupClustersInfo, needReplicas int32) ([]*clusterv1alpha1.Cluster, error) {
+    if len(placement.SpreadConstraints) == 0 || shouldIgnoreSpreadConstraint(placement) {
         var clusters []*clusterv1alpha1.Cluster
         for _, cluster := range groupClustersInfo.Clusters {
             clusters = append(clusters, cluster.Cluster)
         }

         klog.V(4).Infof("select all clusters")
         return clusters, nil
     }

+    if shouldIgnoreAvailableResource(placement) {
+        needReplicas = InvalidReplicas
+    }
+
+    return selectBestClustersBySpreadConstraints(placement.SpreadConstraints, groupClustersInfo, needReplicas)
 }

 func selectBestClustersBySpreadConstraints(spreadConstraints []policyv1alpha1.SpreadConstraint,
-    groupClustersInfo *GroupClustersInfo) ([]*clusterv1alpha1.Cluster, error) {
+    groupClustersInfo *GroupClustersInfo, needReplicas int32) ([]*clusterv1alpha1.Cluster, error) {
     if len(spreadConstraints) > 1 {
         return nil, fmt.Errorf("just support single spread constraint")
     }

     spreadConstraint := spreadConstraints[0]
     if spreadConstraint.SpreadByField == policyv1alpha1.SpreadByFieldCluster {
-        return selectBestClustersByCluster(spreadConstraint, groupClustersInfo)
+        return selectBestClustersByCluster(spreadConstraint, groupClustersInfo, needReplicas)
     }

     return nil, fmt.Errorf("just support cluster spread constraint")
 }

-func selectBestClustersByCluster(spreadConstraint policyv1alpha1.SpreadConstraint, groupClustersInfo *GroupClustersInfo) ([]*clusterv1alpha1.Cluster, error) {
-    totalClusterCnt := len(groupClustersInfo.Clusters)
-    if spreadConstraint.MinGroups > totalClusterCnt {
-        return nil, fmt.Errorf("the number of feasible clusters is less than spreadConstraint.MinGroups")
-    }
-
-    needCnt := spreadConstraint.MaxGroups
-    if spreadConstraint.MaxGroups > totalClusterCnt {
-        needCnt = totalClusterCnt
-    }
-
-    var clusters []*clusterv1alpha1.Cluster
-    for i := 0; i < needCnt; i++ {
-        clusters = append(clusters, groupClustersInfo.Clusters[i].Cluster)
-    }
-
-    return clusters, nil
-}
+func shouldIgnoreSpreadConstraint(placement *policyv1alpha1.Placement) bool {
+    strategy := placement.ReplicaScheduling
+
+    // If the replica division preference is 'static weighted', ignore the declaration specified by spread constraints.
+    if strategy != nil && strategy.ReplicaSchedulingType == policyv1alpha1.ReplicaSchedulingTypeDivided &&
+        strategy.ReplicaDivisionPreference == policyv1alpha1.ReplicaDivisionPreferenceWeighted &&
+        (strategy.WeightPreference != nil && len(strategy.WeightPreference.StaticWeightList) != 0 && strategy.WeightPreference.DynamicWeight == "") {
+        return true
+    }
+
+    return false
+}
+
+func shouldIgnoreAvailableResource(placement *policyv1alpha1.Placement) bool {
+    strategy := placement.ReplicaScheduling
+
+    // If the replica division preference is 'Duplicated', ignore the information about cluster available resources.
+    if strategy == nil || strategy.ReplicaSchedulingType == policyv1alpha1.ReplicaSchedulingTypeDuplicated {
+        return true
+    }
+
+    return false
+}
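The two new predicates decide how needReplicas is used downstream: a Divided strategy with only static weights bypasses spread constraints entirely, while a Duplicated (or absent) strategy keeps the constraints but disables the capacity check by passing InvalidReplicas. A runnable sketch of that decision table; the strategy struct, its string values, and the function names here are simplified stand-ins for the policyv1alpha1 types, not the real API:

package main

import "fmt"

type strategy struct {
    schedulingType string // "Divided" or "Duplicated"
    divisionPref   string // "Weighted" or "Aggregated"
    staticWeights  int    // number of static weight entries
    dynamicWeight  string // non-empty when dynamic weighting is requested
}

const invalidReplicas = -1

// ignoreSpread mirrors shouldIgnoreSpreadConstraint: static weighted division
// pins replicas to named clusters, so spread constraints carry no information.
func ignoreSpread(s *strategy) bool {
    return s != nil && s.schedulingType == "Divided" &&
        s.divisionPref == "Weighted" && s.staticWeights > 0 && s.dynamicWeight == ""
}

// ignoreResource mirrors shouldIgnoreAvailableResource: duplicated replicas run
// everywhere, so per-cluster headroom does not constrain the choice.
func ignoreResource(s *strategy) bool {
    return s == nil || s.schedulingType == "Duplicated"
}

func main() {
    cases := []*strategy{
        nil,
        {schedulingType: "Duplicated"},
        {schedulingType: "Divided", divisionPref: "Weighted", staticWeights: 1},
        {schedulingType: "Divided", divisionPref: "Aggregated"},
    }
    for _, s := range cases {
        need := 100
        if ignoreResource(s) {
            need = invalidReplicas // sentinel: skip the capacity check
        }
        fmt.Printf("ignoreSpread=%v needReplicas=%d\n", ignoreSpread(s), need)
    }
}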
@@ -0,0 +1,97 @@
+package spreadconstraint
+
+import (
+    "fmt"
+
+    clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
+    policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
+)
+
+func selectBestClustersByCluster(spreadConstraint policyv1alpha1.SpreadConstraint, groupClustersInfo *GroupClustersInfo,
+    needReplicas int32) ([]*clusterv1alpha1.Cluster, error) {
+    totalClusterCnt := len(groupClustersInfo.Clusters)
+    if totalClusterCnt < spreadConstraint.MinGroups {
+        return nil, fmt.Errorf("the number of feasible clusters is less than spreadConstraint.MinGroups")
+    }
+
+    needCnt := spreadConstraint.MaxGroups
+    if totalClusterCnt < spreadConstraint.MaxGroups {
+        needCnt = totalClusterCnt
+    }
+
+    var clusterInfos []ClusterDetailInfo
+
+    if needReplicas == InvalidReplicas {
+        clusterInfos = groupClustersInfo.Clusters[:needCnt]
+    } else {
+        clusterInfos = selectClustersByAvailableResource(groupClustersInfo.Clusters, int32(needCnt), needReplicas)
+        if len(clusterInfos) == 0 {
+            return nil, fmt.Errorf("not enough resources when selecting %d clusters", needCnt)
+        }
+    }
+
+    var clusters []*clusterv1alpha1.Cluster
+    for i := range clusterInfos {
+        clusters = append(clusters, clusterInfos[i].Cluster)
+    }
+
+    return clusters, nil
+}
+
+// If needClusterCount = 2 and needReplicas = 80, member1 and member3 are selected,
+// because the total resources of member1 and member2 are less than needReplicas
+// even though their scores are the highest:
+// ---------------------------------------------------
+// | clusterName       | member1 | member2 | member3 |
+// |-------------------------------------------------|
+// | score             | 60      | 50      | 40      |
+// |-------------------------------------------------|
+// | AvailableReplicas | 40      | 30      | 60      |
+// ---------------------------------------------------
+func selectClustersByAvailableResource(candidateClusters []ClusterDetailInfo, needClusterCount, needReplicas int32) []ClusterDetailInfo {
+    retClusters := candidateClusters[:needClusterCount]
+    restClusters := candidateClusters[needClusterCount:]
+
+    // retClusters is sorted by cluster score in descending order. While the total
+    // AvailableReplicas of retClusters is less than needReplicas, replace the kept
+    // cluster with the lowest score by the cluster in restClusters that has the most
+    // AvailableReplicas, walking from the end of the slice until the capacity check passes.
+    var updateClusterID = len(retClusters) - 1
+    for !checkAvailableResource(retClusters, needReplicas) && updateClusterID >= 0 {
+        clusterID := GetClusterWithMaxAvailableResource(restClusters, retClusters[updateClusterID].AvailableReplicas)
+        if clusterID == InvalidClusterID {
+            updateClusterID--
+            continue
+        }
+
+        retClusters[updateClusterID], restClusters[clusterID] = restClusters[clusterID], retClusters[updateClusterID]
+        updateClusterID--
+    }
+
+    if !checkAvailableResource(retClusters, needReplicas) {
+        return nil
+    }
+    return retClusters
+}
+
+func checkAvailableResource(clusters []ClusterDetailInfo, needReplicas int32) bool {
+    var total int64
+
+    for i := range clusters {
+        total += clusters[i].AvailableReplicas
+    }
+
+    return total >= int64(needReplicas)
+}
+
+// GetClusterWithMaxAvailableResource returns the index of the candidate cluster whose
+// AvailableReplicas is the largest and exceeds originReplicas, or InvalidClusterID if none does.
+func GetClusterWithMaxAvailableResource(candidateClusters []ClusterDetailInfo, originReplicas int64) int {
+    var maxAvailableReplicas = originReplicas
+    var clusterID = InvalidClusterID
+    for i := range candidateClusters {
+        if maxAvailableReplicas < candidateClusters[i].AvailableReplicas {
+            clusterID = i
+            maxAvailableReplicas = candidateClusters[i].AvailableReplicas
+        }
+    }
+
+    return clusterID
+}
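The swap loop is easiest to trace on the data from the comment table above. The following self-contained sketch re-implements the same greedy replacement on plain slices; detail and greedySelect are hypothetical names, not part of karmada, and the final capacity check returns nil when even the swaps cannot reach needReplicas:

package main

import "fmt"

type detail struct {
    name              string
    availableReplicas int64
}

// greedySelect keeps the top-scored prefix of a score-descending list and,
// while its total capacity is short of needReplicas, swaps the lowest-scored
// kept cluster for the spare cluster with the most available replicas.
func greedySelect(sortedByScore []detail, needClusters int, needReplicas int64) []detail {
    ret := append([]detail(nil), sortedByScore[:needClusters]...)
    rest := append([]detail(nil), sortedByScore[needClusters:]...)

    total := func() int64 {
        var t int64
        for _, d := range ret {
            t += d.availableReplicas
        }
        return t
    }

    for i := needClusters - 1; i >= 0 && total() < needReplicas; i-- {
        // find the spare cluster with the most available replicas,
        // if it beats the cluster we are about to replace
        best := -1
        max := ret[i].availableReplicas
        for j := range rest {
            if rest[j].availableReplicas > max {
                best, max = j, rest[j].availableReplicas
            }
        }
        if best >= 0 {
            ret[i], rest[best] = rest[best], ret[i]
        }
    }
    if total() < needReplicas {
        return nil // capacity still insufficient after all swaps
    }
    return ret
}

func main() {
    // score-descending order, matching the comment table
    clusters := []detail{{"member1", 40}, {"member2", 30}, {"member3", 60}}
    fmt.Println(greedySelect(clusters, 2, 80)) // [{member1 40} {member3 60}]
}

Running it prints [{member1 40} {member3 60}]: member2 keeps its score advantage, but it cannot close the 80-replica requirement, so the lowest-scored slot goes to the spare cluster with the most headroom.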
@@ -1,6 +1,7 @@
 package spreadconstraint

 import (
+    "fmt"
     "reflect"
     "testing"

@@ -27,41 +28,42 @@ func generateClusterInfo() []ClusterDetailInfo {
         {
             Name:              "member4",
             Score:             60,
-            AvailableReplicas: 101,
+            AvailableReplicas: 50,
             Cluster:           NewClusterWithTopology("member4", "P2", "R2", "Z2"),
         },
         {
             Name:              "member2",
             Score:             40,
-            AvailableReplicas: 101,
+            AvailableReplicas: 60,
             Cluster:           NewClusterWithTopology("member2", "P1", "R1", "Z2"),
         },
         {
             Name:              "member3",
             Score:             30,
-            AvailableReplicas: 101,
+            AvailableReplicas: 80,
             Cluster:           NewClusterWithTopology("member3", "P2", "R1", "Z1"),
         },
         {
             Name:              "member1",
             Score:             20,
-            AvailableReplicas: 101,
+            AvailableReplicas: 40,
             Cluster:           NewClusterWithTopology("member1", "P1", "R1", "Z1"),
         },
     }
 }

 func TestSelectBestClusters(t *testing.T) {
-    clustetInfos := generateClusterInfo()
+    clusterInfos := generateClusterInfo()
     type args struct {
         placement         *policyv1alpha1.Placement
         groupClustersInfo *GroupClustersInfo
+        needReplicas      int32
     }
     tests := []struct {
         name    string
         args    args
         want    []*clusterv1alpha1.Cluster
-        wantErr bool
+        wantErr error
     }{
         {
             name: "select clusters by cluster score",
@@ -76,20 +78,153 @@ func TestSelectBestClusters(t *testing.T) {
                     },
                 },
                 groupClustersInfo: &GroupClustersInfo{
-                    Clusters: clustetInfos,
+                    Clusters: clusterInfos,
                 },
+                needReplicas: 100,
             },
             want: []*clusterv1alpha1.Cluster{
-                clustetInfos[0].Cluster,
-                clustetInfos[1].Cluster,
+                clusterInfos[0].Cluster,
+                clusterInfos[1].Cluster,
             },
         },
+        {
+            name: "select clusters by cluster score and ignore available resources when scheduling strategy is duplicated",
+            args: args{
+                placement: &policyv1alpha1.Placement{
+                    SpreadConstraints: []policyv1alpha1.SpreadConstraint{
+                        {
+                            SpreadByField: policyv1alpha1.SpreadByFieldCluster,
+                            MaxGroups:     2,
+                            MinGroups:     1,
+                        },
+                    },
+                    ReplicaScheduling: &policyv1alpha1.ReplicaSchedulingStrategy{
+                        ReplicaSchedulingType: policyv1alpha1.ReplicaSchedulingTypeDuplicated,
+                    },
+                },
+                groupClustersInfo: &GroupClustersInfo{
+                    Clusters: clusterInfos,
+                },
+                needReplicas: 120,
+            },
+            want: []*clusterv1alpha1.Cluster{
+                clusterInfos[0].Cluster,
+                clusterInfos[1].Cluster,
+            },
+        },
+        {
+            name: "select clusters by cluster score and ignore available resources when scheduling strategy is static weight",
+            args: args{
+                placement: &policyv1alpha1.Placement{
+                    SpreadConstraints: []policyv1alpha1.SpreadConstraint{
+                        {
+                            SpreadByField: policyv1alpha1.SpreadByFieldCluster,
+                            MaxGroups:     2,
+                            MinGroups:     1,
+                        },
+                    },
+                    ReplicaScheduling: &policyv1alpha1.ReplicaSchedulingStrategy{
+                        ReplicaSchedulingType:     policyv1alpha1.ReplicaSchedulingTypeDivided,
+                        ReplicaDivisionPreference: policyv1alpha1.ReplicaDivisionPreferenceWeighted,
+                        WeightPreference: &policyv1alpha1.ClusterPreferences{
+                            StaticWeightList: []policyv1alpha1.StaticClusterWeight{
+                                {
+                                    TargetCluster: policyv1alpha1.ClusterAffinity{
+                                        ClusterNames: []string{"member1"},
+                                    },
+                                    Weight: 2,
+                                },
+                            },
+                        },
+                    },
+                },
+                groupClustersInfo: &GroupClustersInfo{
+                    Clusters: clusterInfos,
+                },
+                needReplicas: 120,
+            },
+            want: []*clusterv1alpha1.Cluster{
+                clusterInfos[0].Cluster,
+                clusterInfos[1].Cluster,
+                clusterInfos[2].Cluster,
+                clusterInfos[3].Cluster,
+            },
+        },
+        {
+            name: "select clusters by cluster score and satisfy available resources",
+            args: args{
+                placement: &policyv1alpha1.Placement{
+                    SpreadConstraints: []policyv1alpha1.SpreadConstraint{
+                        {
+                            SpreadByField: policyv1alpha1.SpreadByFieldCluster,
+                            MaxGroups:     2,
+                            MinGroups:     1,
+                        },
+                    },
+                    ReplicaScheduling: &policyv1alpha1.ReplicaSchedulingStrategy{
+                        ReplicaSchedulingType: policyv1alpha1.ReplicaSchedulingTypeDivided,
+                    },
+                },
+                groupClustersInfo: &GroupClustersInfo{
+                    Clusters: clusterInfos,
+                },
+                needReplicas: 120,
+            },
+            want: []*clusterv1alpha1.Cluster{
+                clusterInfos[0].Cluster,
+                clusterInfos[2].Cluster,
+            },
+        },
+        {
+            name: "select clusters by cluster score and insufficient resources",
+            args: args{
+                placement: &policyv1alpha1.Placement{
+                    SpreadConstraints: []policyv1alpha1.SpreadConstraint{
+                        {
+                            SpreadByField: policyv1alpha1.SpreadByFieldCluster,
+                            MaxGroups:     2,
+                            MinGroups:     1,
+                        },
+                    },
+                    ReplicaScheduling: &policyv1alpha1.ReplicaSchedulingStrategy{
+                        ReplicaSchedulingType: policyv1alpha1.ReplicaSchedulingTypeDivided,
+                    },
+                },
+                groupClustersInfo: &GroupClustersInfo{
+                    Clusters: clusterInfos,
+                },
+                needReplicas: 200,
+            },
+            want:    nil,
+            wantErr: fmt.Errorf("not enough resources when selecting %d clusters", 2),
+        },
+        {
+            name: "select clusters by cluster score and exceed the number of available clusters",
+            args: args{
+                placement: &policyv1alpha1.Placement{
+                    SpreadConstraints: []policyv1alpha1.SpreadConstraint{
+                        {
+                            SpreadByField: policyv1alpha1.SpreadByFieldCluster,
+                            MaxGroups:     7,
+                            MinGroups:     7,
+                        },
+                    },
+                },
+                groupClustersInfo: &GroupClustersInfo{
+                    Clusters: clusterInfos,
+                },
+                needReplicas: 30,
+            },
+            want:    nil,
+            wantErr: fmt.Errorf("the number of feasible clusters is less than spreadConstraint.MinGroups"),
+        },
     }

     for _, tt := range tests {
         t.Run(tt.name, func(t *testing.T) {
-            got, err := SelectBestClusters(tt.args.placement, tt.args.groupClustersInfo)
-            if (err != nil) != tt.wantErr {
-                t.Errorf("SelectBestClusters() error = %v, wantErr %v", err, tt.wantErr)
+            got, err := SelectBestClusters(tt.args.placement, tt.args.groupClustersInfo, tt.args.needReplicas)
+            if err != nil && err.Error() != tt.wantErr.Error() {
+                t.Errorf("SelectBestClusters() error = %v, wantErr = %v", err, tt.wantErr)
                 return
             }
             if !reflect.DeepEqual(got, tt.want) {
@@ -14,6 +14,13 @@ import (
     "github.com/karmada-io/karmada/pkg/util"
 )

+const (
+    // InvalidClusterID indicates an invalid cluster index
+    InvalidClusterID = -1
+    // InvalidReplicas indicates that available resources should not be taken into account
+    InvalidReplicas = -1
+)
+
 func calAvailableReplicas(clusters []*clusterv1alpha1.Cluster, spec *workv1alpha2.ResourceBindingSpec) []workv1alpha2.TargetCluster {
     availableClusters := make([]workv1alpha2.TargetCluster, len(clusters))
