From c69d0e0eda6c95c74384ba585d070209f5ed1b93 Mon Sep 17 00:00:00 2001
From: huone1
Date: Wed, 23 Mar 2022 14:28:16 +0800
Subject: [PATCH] refactor the selectClusters process

Signed-off-by: huone1
---
 pkg/scheduler/core/generic_scheduler.go       |  79 +-----
 .../core/spreadconstraint/group_clusters.go   | 243 ++++++++++++++++++
 .../core/spreadconstraint/select_clusters.go  |  55 ++++
 pkg/scheduler/core/spreadconstraint/util.go   |  67 +++++
 pkg/util/spreadstate.go                       |  30 ---
 5 files changed, 375 insertions(+), 99 deletions(-)
 create mode 100644 pkg/scheduler/core/spreadconstraint/group_clusters.go
 create mode 100644 pkg/scheduler/core/spreadconstraint/select_clusters.go
 create mode 100644 pkg/scheduler/core/spreadconstraint/util.go
 delete mode 100644 pkg/util/spreadstate.go

diff --git a/pkg/scheduler/core/generic_scheduler.go b/pkg/scheduler/core/generic_scheduler.go
index 2dc1f5f4c..62c0abf72 100644
--- a/pkg/scheduler/core/generic_scheduler.go
+++ b/pkg/scheduler/core/generic_scheduler.go
@@ -11,10 +11,10 @@ import (
 	policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
 	workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
 	"github.com/karmada-io/karmada/pkg/scheduler/cache"
+	"github.com/karmada-io/karmada/pkg/scheduler/core/spreadconstraint"
 	"github.com/karmada-io/karmada/pkg/scheduler/framework"
 	"github.com/karmada-io/karmada/pkg/scheduler/framework/runtime"
 	"github.com/karmada-io/karmada/pkg/scheduler/metrics"
-	"github.com/karmada-io/karmada/pkg/util"
 )
 
 // ScheduleAlgorithm is the interface that should be implemented to schedule a resource to the target clusters.
@@ -64,7 +64,11 @@ func (g *genericScheduler) Schedule(ctx context.Context, placement *policyv1alph
 	}
 	klog.V(4).Infof("feasible clusters scores: %v", clustersScore)
 
-	clusters := g.selectClusters(clustersScore, placement.SpreadConstraints, feasibleClusters)
+	clusters, err := g.selectClusters(clustersScore, placement, spec)
+	if err != nil {
+		return result, fmt.Errorf("failed to select clusters: %v", err)
+	}
+	klog.V(4).Infof("selected clusters: %v", clusters)
 
 	clustersWithReplicas, err := g.assignReplicas(clusters, placement.ReplicaScheduling, spec)
 	if err != nil {
@@ -122,76 +126,13 @@ func (g *genericScheduler) prioritizeClusters(
 	return result, nil
 }
 
-func (g *genericScheduler) selectClusters(clustersScore framework.ClusterScoreList, spreadConstraints []policyv1alpha1.SpreadConstraint, clusters []*clusterv1alpha1.Cluster) []*clusterv1alpha1.Cluster {
+func (g *genericScheduler) selectClusters(clustersScore framework.ClusterScoreList,
+	placement *policyv1alpha1.Placement, spec *workv1alpha2.ResourceBindingSpec) ([]*clusterv1alpha1.Cluster, error) {
 	defer metrics.ScheduleStep(metrics.ScheduleStepSelect, time.Now())
 
-	if len(spreadConstraints) != 0 {
-		return g.matchSpreadConstraints(clusters, spreadConstraints)
-	}
+	groupClustersInfo := spreadconstraint.GroupClustersWithScore(clustersScore, placement, spec)
 
-	return clusters
-}
-
-func (g *genericScheduler) matchSpreadConstraints(clusters []*clusterv1alpha1.Cluster, spreadConstraints []policyv1alpha1.SpreadConstraint) []*clusterv1alpha1.Cluster {
-	state := util.NewSpreadGroup()
-	g.runSpreadConstraintsFilter(clusters, spreadConstraints, state)
-	return g.calSpreadResult(state)
-}
-
-// Now support spread by cluster. More rules will be implemented later.
-func (g *genericScheduler) runSpreadConstraintsFilter(clusters []*clusterv1alpha1.Cluster, spreadConstraints []policyv1alpha1.SpreadConstraint, spreadGroup *util.SpreadGroup) {
-	for _, spreadConstraint := range spreadConstraints {
-		spreadGroup.InitialGroupRecord(spreadConstraint)
-		if spreadConstraint.SpreadByField == policyv1alpha1.SpreadByFieldCluster {
-			g.groupByFieldCluster(clusters, spreadConstraint, spreadGroup)
-		}
-	}
-}
-
-func (g *genericScheduler) groupByFieldCluster(clusters []*clusterv1alpha1.Cluster, spreadConstraint policyv1alpha1.SpreadConstraint, spreadGroup *util.SpreadGroup) {
-	for _, cluster := range clusters {
-		clusterGroup := cluster.Name
-		spreadGroup.GroupRecord[spreadConstraint][clusterGroup] = append(spreadGroup.GroupRecord[spreadConstraint][clusterGroup], cluster)
-	}
-}
-
-func (g *genericScheduler) calSpreadResult(spreadGroup *util.SpreadGroup) []*clusterv1alpha1.Cluster {
-	// TODO: now support single spread constraint
-	if len(spreadGroup.GroupRecord) > 1 {
-		return nil
-	}
-
-	return g.chooseSpreadGroup(spreadGroup)
-}
-
-func (g *genericScheduler) chooseSpreadGroup(spreadGroup *util.SpreadGroup) []*clusterv1alpha1.Cluster {
-	var feasibleClusters []*clusterv1alpha1.Cluster
-	for spreadConstraint, clusterGroups := range spreadGroup.GroupRecord {
-		if spreadConstraint.SpreadByField == policyv1alpha1.SpreadByFieldCluster {
-			if len(clusterGroups) < spreadConstraint.MinGroups {
-				return nil
-			}
-
-			if len(clusterGroups) <= spreadConstraint.MaxGroups {
-				for _, v := range clusterGroups {
-					feasibleClusters = append(feasibleClusters, v...)
-				}
-				break
-			}
-
-			if spreadConstraint.MaxGroups > 0 && len(clusterGroups) > spreadConstraint.MaxGroups {
-				var groups []string
-				for group := range clusterGroups {
-					groups = append(groups, group)
-				}
-
-				for i := 0; i < spreadConstraint.MaxGroups; i++ {
-					feasibleClusters = append(feasibleClusters, clusterGroups[groups[i]]...)
-				}
-			}
-		}
-	}
-	return feasibleClusters
+	return spreadconstraint.SelectBestClusters(placement, groupClustersInfo)
 }
 
 func (g *genericScheduler) assignReplicas(
diff --git a/pkg/scheduler/core/spreadconstraint/group_clusters.go b/pkg/scheduler/core/spreadconstraint/group_clusters.go
new file mode 100644
index 000000000..8236d13f3
--- /dev/null
+++ b/pkg/scheduler/core/spreadconstraint/group_clusters.go
@@ -0,0 +1,243 @@
+package spreadconstraint
+
+import (
+	"sort"
+
+	clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
+	policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
+	workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
+	"github.com/karmada-io/karmada/pkg/scheduler/framework"
+)
+
+// GroupClustersInfo indicates the global cluster view
+type GroupClustersInfo struct {
+	Providers map[string]ProviderInfo
+	Regions   map[string]RegionInfo
+	Zones     map[string]ZoneInfo
+
+	// Clusters from the global view, sorted by cluster.Score descending.
+	Clusters []ClusterDetailInfo
+}
+
+// ProviderInfo indicates the provider information
+type ProviderInfo struct {
+	Name              string
+	Score             int64
+	AvailableReplicas int64
+
+	Regions map[string]struct{}
+	Zones   map[string]struct{}
+	// Clusters under this provider, sorted by cluster.Score descending.
+	Clusters []ClusterDetailInfo
+}
+
+// RegionInfo indicates the region information
+type RegionInfo struct {
+	Name              string
+	Score             int64
+	AvailableReplicas int64
+
+	Zones map[string]struct{}
+	// Clusters under this region, sorted by cluster.Score descending.
+	Clusters []ClusterDetailInfo
+}
+
+// ZoneInfo indicates the zone information
+type ZoneInfo struct {
+	Name              string
+	Score             int64
+	AvailableReplicas int64
+
+	// Clusters under this zone, sorted by cluster.Score descending.
+	Clusters []ClusterDetailInfo
+}
+
+// ClusterDetailInfo indicates the cluster information
+type ClusterDetailInfo struct {
+	Name              string
+	Score             int64
+	AvailableReplicas int64
+
+	Cluster *clusterv1alpha1.Cluster
+}
+
+// GroupClustersWithScore groups clusters based on provider/region/zone/cluster
+func GroupClustersWithScore(
+	clustersScore framework.ClusterScoreList,
+	placement *policyv1alpha1.Placement,
+	spec *workv1alpha2.ResourceBindingSpec,
+) *GroupClustersInfo {
+	if isTopologyIgnored(placement) {
+		return groupClustersIgnoreTopology(clustersScore, spec)
+	}
+
+	return groupClustersBasedTopology(clustersScore, spec, placement.SpreadConstraints)
+}
+
+func groupClustersBasedTopology(
+	clustersScore framework.ClusterScoreList,
+	rbSpec *workv1alpha2.ResourceBindingSpec,
+	spreadConstraints []policyv1alpha1.SpreadConstraint,
+) *GroupClustersInfo {
+	groupClustersInfo := &GroupClustersInfo{
+		Providers: make(map[string]ProviderInfo),
+		Regions:   make(map[string]RegionInfo),
+		Zones:     make(map[string]ZoneInfo),
+	}
+
+	groupClustersInfo.generateClustersInfo(clustersScore, rbSpec)
+	groupClustersInfo.generateZoneInfo(spreadConstraints)
+	groupClustersInfo.generateRegionInfo(spreadConstraints)
+	groupClustersInfo.generateProviderInfo(spreadConstraints)
+
+	return groupClustersInfo
+}
+
+func groupClustersIgnoreTopology(
+	clustersScore framework.ClusterScoreList,
+	rbSpec *workv1alpha2.ResourceBindingSpec,
+) *GroupClustersInfo {
+	groupClustersInfo := &GroupClustersInfo{}
+	groupClustersInfo.generateClustersInfo(clustersScore, rbSpec)
+
+	return groupClustersInfo
+}
+
+func (info *GroupClustersInfo) generateClustersInfo(clustersScore framework.ClusterScoreList, rbSpec *workv1alpha2.ResourceBindingSpec) {
+	var clusters []*clusterv1alpha1.Cluster
+	for _, clusterScore := range clustersScore {
+		clusterInfo := ClusterDetailInfo{}
+		clusterInfo.Name = clusterScore.Cluster.Name
+		clusterInfo.Score = clusterScore.Score
+		clusterInfo.Cluster = clusterScore.Cluster
+		info.Clusters = append(info.Clusters, clusterInfo)
+		clusters = append(clusters, clusterScore.Cluster)
+	}
+
+	clustersReplicas := calAvailableReplicas(clusters, rbSpec)
+	for i, clustersReplica := range clustersReplicas {
+		info.Clusters[i].AvailableReplicas = int64(clustersReplica.Replicas)
+	}
+
+	sortClusters(info.Clusters)
+}
+
+func (info *GroupClustersInfo) generateZoneInfo(spreadConstraints []policyv1alpha1.SpreadConstraint) {
+	if !IsSpreadConstraintExisted(spreadConstraints, policyv1alpha1.SpreadByFieldZone) {
+		return
+	}
+
+	for _, clusterInfo := range info.Clusters {
+		zone := clusterInfo.Cluster.Spec.Zone
+		if zone == "" {
+			continue
+		}
+
+		zoneInfo, ok := info.Zones[zone]
+		if !ok {
+			zoneInfo = ZoneInfo{
+				Name:     zone,
+				Clusters: make([]ClusterDetailInfo, 0),
+			}
+		}
+
+		zoneInfo.Clusters = append(zoneInfo.Clusters, clusterInfo)
+		zoneInfo.Score += clusterInfo.Score
+		zoneInfo.AvailableReplicas += clusterInfo.AvailableReplicas
+		info.Zones[zone] = zoneInfo
+	}
+}
+
+func (info *GroupClustersInfo) generateRegionInfo(spreadConstraints []policyv1alpha1.SpreadConstraint) {
+	if !IsSpreadConstraintExisted(spreadConstraints, policyv1alpha1.SpreadByFieldRegion) {
+		return
+	}
+
+	for _, clusterInfo := range info.Clusters {
+		region := clusterInfo.Cluster.Spec.Region
+		if region == "" {
+			continue
+		}
+
+		regionInfo, ok := info.Regions[region]
+		if !ok {
+			regionInfo = RegionInfo{
+				Name:     region,
+				Zones:    make(map[string]struct{}),
+				Clusters: make([]ClusterDetailInfo, 0),
+			}
+		}
+
+		if clusterInfo.Cluster.Spec.Zone != "" {
+			regionInfo.Zones[clusterInfo.Cluster.Spec.Zone] = struct{}{}
+		}
+		regionInfo.Clusters = append(regionInfo.Clusters, clusterInfo)
+		regionInfo.Score += clusterInfo.Score
+		regionInfo.AvailableReplicas += clusterInfo.AvailableReplicas
+		info.Regions[region] = regionInfo
+	}
+}
+
+func (info *GroupClustersInfo) generateProviderInfo(spreadConstraints []policyv1alpha1.SpreadConstraint) {
+	if !IsSpreadConstraintExisted(spreadConstraints, policyv1alpha1.SpreadByFieldProvider) {
+		return
+	}
+
+	for _, clusterInfo := range info.Clusters {
+		provider := clusterInfo.Cluster.Spec.Provider
+		if provider == "" {
+			continue
+		}
+
+		providerInfo, ok := info.Providers[provider]
+		if !ok {
+			providerInfo = ProviderInfo{
+				Name:     provider,
+				Regions:  make(map[string]struct{}),
+				Zones:    make(map[string]struct{}),
+				Clusters: make([]ClusterDetailInfo, 0),
+			}
+		}
+
+		if clusterInfo.Cluster.Spec.Zone != "" {
+			providerInfo.Zones[clusterInfo.Cluster.Spec.Zone] = struct{}{}
+		}
+
+		if clusterInfo.Cluster.Spec.Region != "" {
+			providerInfo.Regions[clusterInfo.Cluster.Spec.Region] = struct{}{}
+		}
+
+		providerInfo.Clusters = append(providerInfo.Clusters, clusterInfo)
+		providerInfo.Score += clusterInfo.Score
+		providerInfo.AvailableReplicas += clusterInfo.AvailableReplicas
+		info.Providers[provider] = providerInfo
+	}
+}
+
+func isTopologyIgnored(placement *policyv1alpha1.Placement) bool {
+	strategy := placement.ReplicaScheduling
+	spreadConstraints := placement.SpreadConstraints
+
+	if len(spreadConstraints) == 0 || (len(spreadConstraints) == 1 && spreadConstraints[0].SpreadByField == policyv1alpha1.SpreadByFieldCluster) {
+		return true
+	}
+
+	// If the replica division preference is 'static weighted', ignore the spread constraints declaration.
+	if strategy != nil && strategy.ReplicaSchedulingType == policyv1alpha1.ReplicaSchedulingTypeDivided &&
+		strategy.ReplicaDivisionPreference == policyv1alpha1.ReplicaDivisionPreferenceWeighted &&
+		(strategy.WeightPreference == nil || strategy.WeightPreference.DynamicWeight == "") {
+		return true
+	}
+
+	return false
+}
+
+func sortClusters(infos []ClusterDetailInfo) {
+	sort.Slice(infos, func(i, j int) bool {
+		if infos[i].Score != infos[j].Score {
+			return infos[i].Score > infos[j].Score
+		}
+
+		return infos[i].Name < infos[j].Name
+	})
+}
diff --git a/pkg/scheduler/core/spreadconstraint/select_clusters.go b/pkg/scheduler/core/spreadconstraint/select_clusters.go
new file mode 100644
index 000000000..26e393a4c
--- /dev/null
+++ b/pkg/scheduler/core/spreadconstraint/select_clusters.go
@@ -0,0 +1,55 @@
+package spreadconstraint
+
+import (
+	"fmt"
+
+	clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
+	policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
+)
+
+// SelectBestClusters selects the cluster set based on the GroupClustersInfo and placement
+func SelectBestClusters(placement *policyv1alpha1.Placement, groupClustersInfo *GroupClustersInfo) ([]*clusterv1alpha1.Cluster, error) {
+	if len(placement.SpreadConstraints) != 0 {
+		return selectBestClustersBySpreadConstraints(placement.SpreadConstraints, groupClustersInfo)
+	}
+
+	var clusters []*clusterv1alpha1.Cluster
+	for _, cluster := range groupClustersInfo.Clusters {
+		clusters = append(clusters, cluster.Cluster)
+	}
+
+	return clusters, nil
+}
+
+func selectBestClustersBySpreadConstraints(spreadConstraints []policyv1alpha1.SpreadConstraint,
+	groupClustersInfo *GroupClustersInfo) ([]*clusterv1alpha1.Cluster, error) {
+	if len(spreadConstraints) > 1 {
+		return nil, fmt.Errorf("only a single spread constraint is supported")
+	}
+
+	spreadConstraint := spreadConstraints[0]
+	if spreadConstraint.SpreadByField == policyv1alpha1.SpreadByFieldCluster {
+		return selectBestClustersByCluster(spreadConstraint, groupClustersInfo)
+	}
+
+	return nil, fmt.Errorf("only the cluster spread constraint is supported")
+}
+
+func selectBestClustersByCluster(spreadConstraint policyv1alpha1.SpreadConstraint, groupClustersInfo *GroupClustersInfo) ([]*clusterv1alpha1.Cluster, error) {
+	totalClusterCnt := len(groupClustersInfo.Clusters)
+	if spreadConstraint.MinGroups > totalClusterCnt {
+		return nil, fmt.Errorf("the number of feasible clusters is less than spreadConstraint.MinGroups")
+	}
+
+	needCnt := spreadConstraint.MaxGroups
+	if spreadConstraint.MaxGroups > totalClusterCnt {
+		needCnt = totalClusterCnt
+	}
+
+	var clusters []*clusterv1alpha1.Cluster
+	for i := 0; i < needCnt; i++ {
+		clusters = append(clusters, groupClustersInfo.Clusters[i].Cluster)
+	}
+
+	return clusters, nil
+}
diff --git a/pkg/scheduler/core/spreadconstraint/util.go b/pkg/scheduler/core/spreadconstraint/util.go
new file mode 100644
index 000000000..538fba5bd
--- /dev/null
+++ b/pkg/scheduler/core/spreadconstraint/util.go
@@ -0,0 +1,67 @@
+package spreadconstraint
+
+import (
+	"context"
+	"fmt"
+	"math"
+
+	"k8s.io/klog/v2"
+
+	clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
+	policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
+	workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
+	estimatorclient "github.com/karmada-io/karmada/pkg/estimator/client"
+	"github.com/karmada-io/karmada/pkg/util"
+)
+
+func calAvailableReplicas(clusters []*clusterv1alpha1.Cluster, spec *workv1alpha2.ResourceBindingSpec) []workv1alpha2.TargetCluster {
+	availableClusters := make([]workv1alpha2.TargetCluster, len(clusters))
+
+	// Set the boundary.
+	for i := range availableClusters {
+		availableClusters[i].Name = clusters[i].Name
+		availableClusters[i].Replicas = math.MaxInt32
+	}
+
+	// Get the minimum value of MaxAvailableReplicas across all estimators.
+	estimators := estimatorclient.GetReplicaEstimators()
+	ctx := context.WithValue(context.TODO(), util.ContextKeyObject,
+		fmt.Sprintf("kind=%s, name=%s/%s", spec.Resource.Kind, spec.Resource.Namespace, spec.Resource.Name))
+	for _, estimator := range estimators {
+		res, err := estimator.MaxAvailableReplicas(ctx, clusters, spec.ReplicaRequirements)
+		if err != nil {
+			klog.Errorf("Max cluster available replicas error: %v", err)
+			continue
+		}
+		for i := range res {
+			if res[i].Replicas == estimatorclient.UnauthenticReplica {
+				continue
+			}
+			if availableClusters[i].Name == res[i].Name && availableClusters[i].Replicas > res[i].Replicas {
+				availableClusters[i].Replicas = res[i].Replicas
+			}
+		}
+	}
+
+	// In most cases, the target cluster max available replicas should not be MaxInt32 unless the workload is best-effort
+	// and the scheduler-estimator has not been enabled. So we set the replicas to spec.Replicas to avoid overflow.
+	for i := range availableClusters {
+		if availableClusters[i].Replicas == math.MaxInt32 {
+			availableClusters[i].Replicas = spec.Replicas
+		}
+	}
+
+	klog.V(4).Infof("cluster replicas info: %v", availableClusters)
+	return availableClusters
+}
+
+// IsSpreadConstraintExisted judges whether the given field exists in the spread constraints
+func IsSpreadConstraintExisted(spreadConstraints []policyv1alpha1.SpreadConstraint, field policyv1alpha1.SpreadFieldValue) bool {
+	for _, spreadConstraint := range spreadConstraints {
+		if spreadConstraint.SpreadByField == field {
+			return true
+		}
+	}
+
+	return false
+}
diff --git a/pkg/util/spreadstate.go b/pkg/util/spreadstate.go
deleted file mode 100644
index 40422f437..000000000
--- a/pkg/util/spreadstate.go
+++ /dev/null
@@ -1,30 +0,0 @@
-package util
-
-import (
-	"sync"
-
-	clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
-	policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
-)
-
-// SpreadGroup stores the cluster group info for given spread constraints
-type SpreadGroup struct {
-	// The outer map's keys are SpreadConstraint. The values (inner map) of the outer map are maps with string
-	// keys and []string values. The inner map's key should specify the cluster group name.
-	GroupRecord map[policyv1alpha1.SpreadConstraint]map[string][]*clusterv1alpha1.Cluster
-	sync.RWMutex
-}
-
-// NewSpreadGroup initializes a SpreadGroup
-func NewSpreadGroup() *SpreadGroup {
-	return &SpreadGroup{
-		GroupRecord: make(map[policyv1alpha1.SpreadConstraint]map[string][]*clusterv1alpha1.Cluster),
-	}
-}
-
-// InitialGroupRecord initials a spread state record
-func (ss *SpreadGroup) InitialGroupRecord(constraint policyv1alpha1.SpreadConstraint) {
-	ss.Lock()
-	defer ss.Unlock()
-	ss.GroupRecord[constraint] = make(map[string][]*clusterv1alpha1.Cluster)
-}
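
Reviewer note (not part of the patch): a minimal sketch of how the two new entry points compose after this refactor. The helper name selectTopTwo, the package placement, and the spread-constraint values are hypothetical; the sketch assumes it compiles inside the karmada repo so the imports resolve, and it leaves error handling to the caller the way Schedule does.

// Sketch: group first, then select, mirroring the new selectClusters body.
package core

import (
	"fmt"

	policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
	workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
	"github.com/karmada-io/karmada/pkg/scheduler/core/spreadconstraint"
	"github.com/karmada-io/karmada/pkg/scheduler/framework"
)

func selectTopTwo(clustersScore framework.ClusterScoreList, spec *workv1alpha2.ResourceBindingSpec) error {
	// Hypothetical placement: spread across clusters, picking at most two.
	placement := &policyv1alpha1.Placement{
		SpreadConstraints: []policyv1alpha1.SpreadConstraint{{
			SpreadByField: policyv1alpha1.SpreadByFieldCluster,
			MinGroups:     1,
			MaxGroups:     2,
		}},
	}

	// Step 1: aggregate scores and available replicas per provider/region/zone/cluster;
	// the resulting cluster list is sorted by score descending (name as tie-breaker).
	groupInfo := spreadconstraint.GroupClustersWithScore(clustersScore, placement, spec)

	// Step 2: with a cluster spread constraint, this returns the top MaxGroups
	// clusters, or an error if fewer than MinGroups are feasible.
	clusters, err := spreadconstraint.SelectBestClusters(placement, groupInfo)
	if err != nil {
		return err
	}
	for _, c := range clusters {
		fmt.Println(c.Name)
	}
	return nil
}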
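Reviewer note (not part of the patch): the isTopologyIgnored short-circuit is the subtlest piece of the new grouping logic, so here is a hypothetical table-driven test sketch pinning down which placements skip topology grouping. It would have to live in package spreadconstraint because isTopologyIgnored is unexported; the case names and values are illustrative assumptions.

package spreadconstraint

import (
	"testing"

	policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
)

func TestIsTopologyIgnored(t *testing.T) {
	cases := []struct {
		name      string
		placement *policyv1alpha1.Placement
		want      bool
	}{
		{
			name:      "no spread constraints",
			placement: &policyv1alpha1.Placement{},
			want:      true,
		},
		{
			name: "single cluster constraint needs no topology grouping",
			placement: &policyv1alpha1.Placement{
				SpreadConstraints: []policyv1alpha1.SpreadConstraint{
					{SpreadByField: policyv1alpha1.SpreadByFieldCluster},
				},
			},
			want: true,
		},
		{
			name: "region constraint but static weighted division",
			placement: &policyv1alpha1.Placement{
				SpreadConstraints: []policyv1alpha1.SpreadConstraint{
					{SpreadByField: policyv1alpha1.SpreadByFieldRegion},
				},
				ReplicaScheduling: &policyv1alpha1.ReplicaSchedulingStrategy{
					ReplicaSchedulingType:     policyv1alpha1.ReplicaSchedulingTypeDivided,
					ReplicaDivisionPreference: policyv1alpha1.ReplicaDivisionPreferenceWeighted,
				},
			},
			want: true, // nil WeightPreference means static weighting
		},
		{
			name: "region constraint with dynamic weight keeps topology",
			placement: &policyv1alpha1.Placement{
				SpreadConstraints: []policyv1alpha1.SpreadConstraint{
					{SpreadByField: policyv1alpha1.SpreadByFieldRegion},
				},
				ReplicaScheduling: &policyv1alpha1.ReplicaSchedulingStrategy{
					ReplicaSchedulingType:     policyv1alpha1.ReplicaSchedulingTypeDivided,
					ReplicaDivisionPreference: policyv1alpha1.ReplicaDivisionPreferenceWeighted,
					WeightPreference: &policyv1alpha1.ClusterPreferences{
						DynamicWeight: policyv1alpha1.DynamicWeightByAvailableReplicas,
					},
				},
			},
			want: false,
		},
	}

	for _, tc := range cases {
		if got := isTopologyIgnored(tc.placement); got != tc.want {
			t.Errorf("%s: isTopologyIgnored() = %v, want %v", tc.name, got, tc.want)
		}
	}
}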