refactor the selectClusters process
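Move the spread-constraint handling out of the generic scheduler into a dedicated spreadconstraint package: feasible clusters are grouped by provider/region/zone/cluster together with their aggregated scores and estimated available replicas, and the target clusters are then selected from those groups. selectClusters now returns an error so that selection failures (for example, fewer feasible clusters than MinGroups) are reported instead of silently producing an empty result.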
Signed-off-by: huone1 <huwanxing@huawei.com>
parent: acb9ff9ee7
commit: c69d0e0eda
@@ -11,10 +11,10 @@ import (
 	policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
 	workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
 	"github.com/karmada-io/karmada/pkg/scheduler/cache"
+	"github.com/karmada-io/karmada/pkg/scheduler/core/spreadconstraint"
 	"github.com/karmada-io/karmada/pkg/scheduler/framework"
 	"github.com/karmada-io/karmada/pkg/scheduler/framework/runtime"
 	"github.com/karmada-io/karmada/pkg/scheduler/metrics"
-	"github.com/karmada-io/karmada/pkg/util"
 )
 
 // ScheduleAlgorithm is the interface that should be implemented to schedule a resource to the target clusters.
@@ -64,7 +64,11 @@ func (g *genericScheduler) Schedule(ctx context.Context, placement *policyv1alph
 	}
 	klog.V(4).Infof("feasible clusters scores: %v", clustersScore)
 
-	clusters := g.selectClusters(clustersScore, placement.SpreadConstraints, feasibleClusters)
+	clusters, err := g.selectClusters(clustersScore, placement, spec)
+	if err != nil {
+		return result, fmt.Errorf("failed to select clusters: %v", err)
+	}
+	klog.V(4).Infof("selected clusters: %v", clusters)
 
 	clustersWithReplicas, err := g.assignReplicas(clusters, placement.ReplicaScheduling, spec)
 	if err != nil {
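Note that selectClusters now receives the whole Placement and ResourceBindingSpec rather than just the spread constraints and the feasible cluster list: the new grouping step needs the replica-scheduling strategy and the binding's replica requirements to estimate available replicas per cluster. And since selection can now fail, the error is propagated up from Schedule instead of an empty target list being returned silently.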
@@ -122,76 +126,13 @@ func (g *genericScheduler) prioritizeClusters(
 	return result, nil
 }
 
-func (g *genericScheduler) selectClusters(clustersScore framework.ClusterScoreList, spreadConstraints []policyv1alpha1.SpreadConstraint, clusters []*clusterv1alpha1.Cluster) []*clusterv1alpha1.Cluster {
+func (g *genericScheduler) selectClusters(clustersScore framework.ClusterScoreList,
+	placement *policyv1alpha1.Placement, spec *workv1alpha2.ResourceBindingSpec) ([]*clusterv1alpha1.Cluster, error) {
 	defer metrics.ScheduleStep(metrics.ScheduleStepSelect, time.Now())
 
-	if len(spreadConstraints) != 0 {
-		return g.matchSpreadConstraints(clusters, spreadConstraints)
-	}
-
-	return clusters
-}
-
-func (g *genericScheduler) matchSpreadConstraints(clusters []*clusterv1alpha1.Cluster, spreadConstraints []policyv1alpha1.SpreadConstraint) []*clusterv1alpha1.Cluster {
-	state := util.NewSpreadGroup()
-	g.runSpreadConstraintsFilter(clusters, spreadConstraints, state)
-	return g.calSpreadResult(state)
-}
-
-// Now support spread by cluster. More rules will be implemented later.
-func (g *genericScheduler) runSpreadConstraintsFilter(clusters []*clusterv1alpha1.Cluster, spreadConstraints []policyv1alpha1.SpreadConstraint, spreadGroup *util.SpreadGroup) {
-	for _, spreadConstraint := range spreadConstraints {
-		spreadGroup.InitialGroupRecord(spreadConstraint)
-		if spreadConstraint.SpreadByField == policyv1alpha1.SpreadByFieldCluster {
-			g.groupByFieldCluster(clusters, spreadConstraint, spreadGroup)
-		}
-	}
-}
-
-func (g *genericScheduler) groupByFieldCluster(clusters []*clusterv1alpha1.Cluster, spreadConstraint policyv1alpha1.SpreadConstraint, spreadGroup *util.SpreadGroup) {
-	for _, cluster := range clusters {
-		clusterGroup := cluster.Name
-		spreadGroup.GroupRecord[spreadConstraint][clusterGroup] = append(spreadGroup.GroupRecord[spreadConstraint][clusterGroup], cluster)
-	}
-}
-
-func (g *genericScheduler) calSpreadResult(spreadGroup *util.SpreadGroup) []*clusterv1alpha1.Cluster {
-	// TODO: now support single spread constraint
-	if len(spreadGroup.GroupRecord) > 1 {
-		return nil
-	}
-
-	return g.chooseSpreadGroup(spreadGroup)
-}
-
-func (g *genericScheduler) chooseSpreadGroup(spreadGroup *util.SpreadGroup) []*clusterv1alpha1.Cluster {
-	var feasibleClusters []*clusterv1alpha1.Cluster
-	for spreadConstraint, clusterGroups := range spreadGroup.GroupRecord {
-		if spreadConstraint.SpreadByField == policyv1alpha1.SpreadByFieldCluster {
-			if len(clusterGroups) < spreadConstraint.MinGroups {
-				return nil
-			}
-
-			if len(clusterGroups) <= spreadConstraint.MaxGroups {
-				for _, v := range clusterGroups {
-					feasibleClusters = append(feasibleClusters, v...)
-				}
-				break
-			}
-
-			if spreadConstraint.MaxGroups > 0 && len(clusterGroups) > spreadConstraint.MaxGroups {
-				var groups []string
-				for group := range clusterGroups {
-					groups = append(groups, group)
-				}
-
-				for i := 0; i < spreadConstraint.MaxGroups; i++ {
-					feasibleClusters = append(feasibleClusters, clusterGroups[groups[i]]...)
-				}
-			}
-		}
-	}
-	return feasibleClusters
-}
+	groupClustersInfo := spreadconstraint.GroupClustersWithScore(clustersScore, placement, spec)
+	return spreadconstraint.SelectBestClusters(placement, groupClustersInfo)
 }
 
 func (g *genericScheduler) assignReplicas(
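The roughly 70 lines of grouping and selection logic previously embedded in the generic scheduler (matchSpreadConstraints, runSpreadConstraintsFilter, groupByFieldCluster, calSpreadResult, chooseSpreadGroup) collapse into two calls into the new spreadconstraint package, defined in the files added below.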
@@ -0,0 +1,243 @@ (new file: cluster grouping for the spreadconstraint package)

package spreadconstraint

import (
	"sort"

	clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
	policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
	workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
	"github.com/karmada-io/karmada/pkg/scheduler/framework"
)

// GroupClustersInfo indicates the global view of the feasible clusters.
type GroupClustersInfo struct {
	Providers map[string]ProviderInfo
	Regions   map[string]RegionInfo
	Zones     map[string]ZoneInfo

	// Clusters from the global view, sorted by cluster.Score in descending order.
	Clusters []ClusterDetailInfo
}

// ProviderInfo indicates the provider information.
type ProviderInfo struct {
	Name              string
	Score             int64
	AvailableReplicas int64

	Regions map[string]struct{}
	Zones   map[string]struct{}
	// Clusters under this provider, sorted by cluster.Score in descending order.
	Clusters []ClusterDetailInfo
}

// RegionInfo indicates the region information.
type RegionInfo struct {
	Name              string
	Score             int64
	AvailableReplicas int64

	Zones map[string]struct{}
	// Clusters under this region, sorted by cluster.Score in descending order.
	Clusters []ClusterDetailInfo
}

// ZoneInfo indicates the zone information.
type ZoneInfo struct {
	Name              string
	Score             int64
	AvailableReplicas int64

	// Clusters under this zone, sorted by cluster.Score in descending order.
	Clusters []ClusterDetailInfo
}

// ClusterDetailInfo indicates the cluster information.
type ClusterDetailInfo struct {
	Name              string
	Score             int64
	AvailableReplicas int64

	Cluster *clusterv1alpha1.Cluster
}

// GroupClustersWithScore groups the clusters by provider/region/zone/cluster.
func GroupClustersWithScore(
	clustersScore framework.ClusterScoreList,
	placement *policyv1alpha1.Placement,
	spec *workv1alpha2.ResourceBindingSpec,
) *GroupClustersInfo {
	if isTopologyIgnored(placement) {
		return groupClustersIgnoringTopology(clustersScore, spec)
	}

	return groupClustersBasedTopology(clustersScore, spec, placement.SpreadConstraints)
}

func groupClustersBasedTopology(
	clustersScore framework.ClusterScoreList,
	rbSpec *workv1alpha2.ResourceBindingSpec,
	spreadConstraints []policyv1alpha1.SpreadConstraint,
) *GroupClustersInfo {
	groupClustersInfo := &GroupClustersInfo{
		Providers: make(map[string]ProviderInfo),
		Regions:   make(map[string]RegionInfo),
		Zones:     make(map[string]ZoneInfo),
	}

	groupClustersInfo.generateClustersInfo(clustersScore, rbSpec)
	groupClustersInfo.generateZoneInfo(spreadConstraints)
	groupClustersInfo.generateRegionInfo(spreadConstraints)
	groupClustersInfo.generateProviderInfo(spreadConstraints)

	return groupClustersInfo
}

func groupClustersIgnoringTopology(
	clustersScore framework.ClusterScoreList,
	rbSpec *workv1alpha2.ResourceBindingSpec,
) *GroupClustersInfo {
	groupClustersInfo := &GroupClustersInfo{}
	groupClustersInfo.generateClustersInfo(clustersScore, rbSpec)

	return groupClustersInfo
}

func (info *GroupClustersInfo) generateClustersInfo(clustersScore framework.ClusterScoreList, rbSpec *workv1alpha2.ResourceBindingSpec) {
	var clusters []*clusterv1alpha1.Cluster
	for _, clusterScore := range clustersScore {
		clusterInfo := ClusterDetailInfo{}
		clusterInfo.Name = clusterScore.Cluster.Name
		clusterInfo.Score = clusterScore.Score
		clusterInfo.Cluster = clusterScore.Cluster
		info.Clusters = append(info.Clusters, clusterInfo)
		clusters = append(clusters, clusterScore.Cluster)
	}

	clustersReplicas := calAvailableReplicas(clusters, rbSpec)
	for i, clustersReplica := range clustersReplicas {
		info.Clusters[i].AvailableReplicas = int64(clustersReplica.Replicas)
	}

	sortClusters(info.Clusters)
}

func (info *GroupClustersInfo) generateZoneInfo(spreadConstraints []policyv1alpha1.SpreadConstraint) {
	if !IsSpreadConstraintExisted(spreadConstraints, policyv1alpha1.SpreadByFieldZone) {
		return
	}

	for _, clusterInfo := range info.Clusters {
		zone := clusterInfo.Cluster.Spec.Zone
		if zone == "" {
			continue
		}

		zoneInfo, ok := info.Zones[zone]
		if !ok {
			zoneInfo = ZoneInfo{
				Name:     zone,
				Clusters: make([]ClusterDetailInfo, 0),
			}
		}

		zoneInfo.Clusters = append(zoneInfo.Clusters, clusterInfo)
		zoneInfo.Score += clusterInfo.Score
		zoneInfo.AvailableReplicas += clusterInfo.AvailableReplicas
		info.Zones[zone] = zoneInfo
	}
}

func (info *GroupClustersInfo) generateRegionInfo(spreadConstraints []policyv1alpha1.SpreadConstraint) {
	if !IsSpreadConstraintExisted(spreadConstraints, policyv1alpha1.SpreadByFieldRegion) {
		return
	}

	for _, clusterInfo := range info.Clusters {
		region := clusterInfo.Cluster.Spec.Region
		if region == "" {
			continue
		}

		regionInfo, ok := info.Regions[region]
		if !ok {
			regionInfo = RegionInfo{
				Name:     region,
				Zones:    make(map[string]struct{}),
				Clusters: make([]ClusterDetailInfo, 0),
			}
		}

		if clusterInfo.Cluster.Spec.Zone != "" {
			regionInfo.Zones[clusterInfo.Cluster.Spec.Zone] = struct{}{}
		}
		regionInfo.Clusters = append(regionInfo.Clusters, clusterInfo)
		regionInfo.Score += clusterInfo.Score
		regionInfo.AvailableReplicas += clusterInfo.AvailableReplicas
		info.Regions[region] = regionInfo
	}
}

func (info *GroupClustersInfo) generateProviderInfo(spreadConstraints []policyv1alpha1.SpreadConstraint) {
	if !IsSpreadConstraintExisted(spreadConstraints, policyv1alpha1.SpreadByFieldProvider) {
		return
	}

	for _, clusterInfo := range info.Clusters {
		provider := clusterInfo.Cluster.Spec.Provider
		if provider == "" {
			continue
		}

		providerInfo, ok := info.Providers[provider]
		if !ok {
			providerInfo = ProviderInfo{
				Name:     provider,
				Regions:  make(map[string]struct{}),
				Zones:    make(map[string]struct{}),
				Clusters: make([]ClusterDetailInfo, 0),
			}
		}

		if clusterInfo.Cluster.Spec.Zone != "" {
			providerInfo.Zones[clusterInfo.Cluster.Spec.Zone] = struct{}{}
		}

		if clusterInfo.Cluster.Spec.Region != "" {
			providerInfo.Regions[clusterInfo.Cluster.Spec.Region] = struct{}{}
		}

		providerInfo.Clusters = append(providerInfo.Clusters, clusterInfo)
		providerInfo.Score += clusterInfo.Score
		providerInfo.AvailableReplicas += clusterInfo.AvailableReplicas
		info.Providers[provider] = providerInfo
	}
}

func isTopologyIgnored(placement *policyv1alpha1.Placement) bool {
	strategy := placement.ReplicaScheduling
	spreadConstraints := placement.SpreadConstraints

	if len(spreadConstraints) == 0 || (len(spreadConstraints) == 1 && spreadConstraints[0].SpreadByField == policyv1alpha1.SpreadByFieldCluster) {
		return true
	}

	// If the replica division preference is 'static weighted', ignore the topology declared by the spread constraints.
	if strategy != nil && strategy.ReplicaSchedulingType == policyv1alpha1.ReplicaSchedulingTypeDivided &&
		strategy.ReplicaDivisionPreference == policyv1alpha1.ReplicaDivisionPreferenceWeighted &&
		(strategy.WeightPreference == nil || strategy.WeightPreference.DynamicWeight == "") {
		return true
	}

	return false
}

func sortClusters(infos []ClusterDetailInfo) {
	sort.Slice(infos, func(i, j int) bool {
		if infos[i].Score != infos[j].Score {
			return infos[i].Score > infos[j].Score
		}

		return infos[i].Name < infos[j].Name
	})
}
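The tie-break in sortClusters (higher score first, then lexicographic name) keeps the ordering deterministic across scheduling runs. A minimal in-package test sketch of that behavior, assuming it lives beside the file above (the cluster names are made up for illustration):

package spreadconstraint

import "testing"

func TestSortClusters(t *testing.T) {
	infos := []ClusterDetailInfo{
		{Name: "member2", Score: 50},
		{Name: "member1", Score: 50},
		{Name: "member3", Score: 80},
	}
	sortClusters(infos)
	// Highest score first; equal scores fall back to name order.
	want := []string{"member3", "member1", "member2"}
	for i, w := range want {
		if infos[i].Name != w {
			t.Fatalf("index %d: got %s, want %s", i, infos[i].Name, w)
		}
	}
}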
@@ -0,0 +1,55 @@ (new file: cluster selection for the spreadconstraint package)

package spreadconstraint

import (
	"fmt"

	clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
	policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
)

// SelectBestClusters selects the cluster set based on the GroupClustersInfo and placement.
func SelectBestClusters(placement *policyv1alpha1.Placement, groupClustersInfo *GroupClustersInfo) ([]*clusterv1alpha1.Cluster, error) {
	if len(placement.SpreadConstraints) != 0 {
		return selectBestClustersBySpreadConstraints(placement.SpreadConstraints, groupClustersInfo)
	}

	var clusters []*clusterv1alpha1.Cluster
	for _, cluster := range groupClustersInfo.Clusters {
		clusters = append(clusters, cluster.Cluster)
	}

	return clusters, nil
}

func selectBestClustersBySpreadConstraints(spreadConstraints []policyv1alpha1.SpreadConstraint,
	groupClustersInfo *GroupClustersInfo) ([]*clusterv1alpha1.Cluster, error) {
	if len(spreadConstraints) > 1 {
		return nil, fmt.Errorf("only a single spread constraint is supported for now")
	}

	spreadConstraint := spreadConstraints[0]
	if spreadConstraint.SpreadByField == policyv1alpha1.SpreadByFieldCluster {
		return selectBestClustersByCluster(spreadConstraint, groupClustersInfo)
	}

	return nil, fmt.Errorf("only the cluster spread constraint is supported for now")
}

func selectBestClustersByCluster(spreadConstraint policyv1alpha1.SpreadConstraint, groupClustersInfo *GroupClustersInfo) ([]*clusterv1alpha1.Cluster, error) {
	totalClusterCnt := len(groupClustersInfo.Clusters)
	if spreadConstraint.MinGroups > totalClusterCnt {
		return nil, fmt.Errorf("the number of feasible clusters is less than spreadConstraint.MinGroups")
	}

	needCnt := spreadConstraint.MaxGroups
	if spreadConstraint.MaxGroups > totalClusterCnt {
		needCnt = totalClusterCnt
	}

	var clusters []*clusterv1alpha1.Cluster
	for i := 0; i < needCnt; i++ {
		clusters = append(clusters, groupClustersInfo.Clusters[i].Cluster)
	}

	return clusters, nil
}
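Because GroupClustersInfo.Clusters is already sorted by score, taking the first needCnt entries yields the best clusters, and MaxGroups is clamped to the number of feasible clusters. A hypothetical in-package test sketch (cluster names and group counts are illustrative, and the metav1 alias refers to k8s.io/apimachinery/pkg/apis/meta/v1):

package spreadconstraint

import (
	"testing"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
	policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
)

func TestSelectBestClustersByCluster(t *testing.T) {
	info := &GroupClustersInfo{
		Clusters: []ClusterDetailInfo{
			{Name: "m1", Score: 80, Cluster: &clusterv1alpha1.Cluster{ObjectMeta: metav1.ObjectMeta{Name: "m1"}}},
			{Name: "m2", Score: 50, Cluster: &clusterv1alpha1.Cluster{ObjectMeta: metav1.ObjectMeta{Name: "m2"}}},
		},
	}
	constraint := policyv1alpha1.SpreadConstraint{
		SpreadByField: policyv1alpha1.SpreadByFieldCluster,
		MinGroups:     1,
		MaxGroups:     5, // more than available, so the result is clamped to 2
	}
	clusters, err := selectBestClustersByCluster(constraint, info)
	if err != nil || len(clusters) != 2 {
		t.Fatalf("got %d clusters, err=%v; want 2, nil", len(clusters), err)
	}
}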
@@ -0,0 +1,67 @@ (new file: shared helpers for the spreadconstraint package)

package spreadconstraint

import (
	"context"
	"fmt"
	"math"

	"k8s.io/klog/v2"

	clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
	policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
	workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
	estimatorclient "github.com/karmada-io/karmada/pkg/estimator/client"
	"github.com/karmada-io/karmada/pkg/util"
)

func calAvailableReplicas(clusters []*clusterv1alpha1.Cluster, spec *workv1alpha2.ResourceBindingSpec) []workv1alpha2.TargetCluster {
	availableClusters := make([]workv1alpha2.TargetCluster, len(clusters))

	// Set the boundary.
	for i := range availableClusters {
		availableClusters[i].Name = clusters[i].Name
		availableClusters[i].Replicas = math.MaxInt32
	}

	// Get the minimum value of MaxAvailableReplicas in terms of all estimators.
	estimators := estimatorclient.GetReplicaEstimators()
	ctx := context.WithValue(context.TODO(), util.ContextKeyObject,
		fmt.Sprintf("kind=%s, name=%s/%s", spec.Resource.Kind, spec.Resource.Namespace, spec.Resource.Name))
	for _, estimator := range estimators {
		res, err := estimator.MaxAvailableReplicas(ctx, clusters, spec.ReplicaRequirements)
		if err != nil {
			klog.Errorf("Max cluster available replicas error: %v", err)
			continue
		}
		for i := range res {
			if res[i].Replicas == estimatorclient.UnauthenticReplica {
				continue
			}
			if availableClusters[i].Name == res[i].Name && availableClusters[i].Replicas > res[i].Replicas {
				availableClusters[i].Replicas = res[i].Replicas
			}
		}
	}

	// In most cases, the target cluster max available replicas should not be MaxInt32 unless the workload is best-effort
	// and the scheduler-estimator has not been enabled. So we set the replicas to spec.Replicas to avoid overflow.
	for i := range availableClusters {
		if availableClusters[i].Replicas == math.MaxInt32 {
			availableClusters[i].Replicas = spec.Replicas
		}
	}

	klog.V(4).Infof("cluster replicas info: %v", availableClusters)
	return availableClusters
}

// IsSpreadConstraintExisted checks whether a constraint on the given field exists in the spread constraints.
func IsSpreadConstraintExisted(spreadConstraints []policyv1alpha1.SpreadConstraint, field policyv1alpha1.SpreadFieldValue) bool {
	for _, spreadConstraint := range spreadConstraints {
		if spreadConstraint.SpreadByField == field {
			return true
		}
	}

	return false
}
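calAvailableReplicas starts every cluster at math.MaxInt32 and keeps the minimum estimate reported across all registered estimators, falling back to spec.Replicas where no estimator produced a value. IsSpreadConstraintExisted is a simple membership check; a minimal in-package test sketch (constraint values are illustrative):

package spreadconstraint

import (
	"testing"

	policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
)

func TestIsSpreadConstraintExisted(t *testing.T) {
	constraints := []policyv1alpha1.SpreadConstraint{
		{SpreadByField: policyv1alpha1.SpreadByFieldRegion},
	}
	if !IsSpreadConstraintExisted(constraints, policyv1alpha1.SpreadByFieldRegion) {
		t.Fatal("expected the region constraint to be found")
	}
	if IsSpreadConstraintExisted(constraints, policyv1alpha1.SpreadByFieldZone) {
		t.Fatal("did not expect a zone constraint")
	}
}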
@@ -1,30 +0,0 @@ (file removed: the old SpreadGroup helper in package util, superseded by the spreadconstraint package)

package util

import (
	"sync"

	clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
	policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
)

// SpreadGroup stores the cluster group info for given spread constraints
type SpreadGroup struct {
	// The outer map's keys are SpreadConstraint. The values (inner map) of the outer map are maps with string
	// keys and []string values. The inner map's key should specify the cluster group name.
	GroupRecord map[policyv1alpha1.SpreadConstraint]map[string][]*clusterv1alpha1.Cluster
	sync.RWMutex
}

// NewSpreadGroup initializes a SpreadGroup
func NewSpreadGroup() *SpreadGroup {
	return &SpreadGroup{
		GroupRecord: make(map[policyv1alpha1.SpreadConstraint]map[string][]*clusterv1alpha1.Cluster),
	}
}

// InitialGroupRecord initials a spread state record
func (ss *SpreadGroup) InitialGroupRecord(constraint policyv1alpha1.SpreadConstraint) {
	ss.Lock()
	defer ss.Unlock()
	ss.GroupRecord[constraint] = make(map[string][]*clusterv1alpha1.Cluster)
}