Merge pull request #5750 from BigDarkClown/similar

Fix scale-up similar node group computation
Kubernetes Prow Robot, 2023-05-11 08:15:02 -07:00 (committed by GitHub)
commit ee59c74cc0
3 changed files with 284 additions and 111 deletions
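
In short: ScaleUp previously built its expansion options inline and, only after the expander had already picked a winner, re-ran FindSimilarNodeGroups and filtered the result against the recorded options, so the similar-group computation could disagree with what the options were built from. The change moves the work forward: node groups are validated once, the schedulable-pod set is computed once per group, and each expander.Option now carries its own precomputed SimilarNodeGroups. A rough standalone sketch of the reworked flow (toy types and helper names, not autoscaler code; the real version also checks cluster state before accepting a similar group):

package main

import "fmt"

// Toy stand-in for expander.Option; this only illustrates the reworked
// control flow of ScaleUp, not the actual implementation.
type option struct {
	group   string
	pods    []string
	similar []string
}

// covers reports whether super contains every element of sub.
func covers(super, sub []string) bool {
	set := map[string]bool{}
	for _, x := range super {
		set[x] = true
	}
	for _, x := range sub {
		if !set[x] {
			return false
		}
	}
	return true
}

func main() {
	// 1. Validate node groups once (readiness, target size below max,
	//    node info present, resource limits) instead of inline per option.
	valid := []string{"ng1", "ng2", "ng3"}

	// 2. Precompute the schedulable-pod set for every valid group.
	schedulable := map[string][]string{
		"ng1": {"p1"},
		"ng2": {"p1", "p2"},
		"ng3": {"p1"},
	}

	// 3. Build one option per group and attach its similar groups up front:
	//    a group is kept only if it can schedule every pod the main group can.
	var options []option
	for _, g := range valid {
		o := option{group: g, pods: schedulable[g]}
		for _, s := range valid {
			if s != g && covers(schedulable[s], schedulable[g]) {
				o.similar = append(o.similar, s)
			}
		}
		options = append(options, o)
	}

	// 4. The expander picks among the options; balancing later reuses the
	//    precomputed similar groups instead of refiltering them.
	for _, o := range options {
		fmt.Printf("%s: pods=%v similar=%v\n", o.group, o.pods, o.similar)
	}
}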

View File

@@ -125,48 +125,29 @@ func (o *ScaleUpOrchestrator) ScaleUp(
 	}
 	now := time.Now()
-	expansionOptions := make(map[string]expander.Option, 0)
-	skippedNodeGroups := map[string]status.Reasons{}
-	for _, nodeGroup := range nodeGroups {
-		if skipReason := o.IsNodeGroupReadyToScaleUp(nodeGroup, now); skipReason != nil {
-			skippedNodeGroups[nodeGroup.Id()] = skipReason
-			continue
-		}
+	// Filter out invalid node groups
+	validNodeGroups, skippedNodeGroups := o.filterValidScaleUpNodeGroups(nodeGroups, nodeInfos, resourcesLeft, now)
-		currentTargetSize, err := nodeGroup.TargetSize()
-		if err != nil {
-			klog.Errorf("Failed to get node group size: %v", err)
-			skippedNodeGroups[nodeGroup.Id()] = NotReadyReason
-			continue
-		}
-		if currentTargetSize >= nodeGroup.MaxSize() {
-			klog.V(4).Infof("Skipping node group %s - max size reached", nodeGroup.Id())
-			skippedNodeGroups[nodeGroup.Id()] = MaxLimitReachedReason
-			continue
-		}
+	// Calculate expansion options
+	schedulablePods := map[string][]*apiv1.Pod{}
+	var options []expander.Option
-		nodeInfo, found := nodeInfos[nodeGroup.Id()]
-		if !found {
-			klog.Errorf("No node info for: %s", nodeGroup.Id())
-			skippedNodeGroups[nodeGroup.Id()] = NotReadyReason
-			continue
-		}
-		if skipReason := o.IsNodeGroupResourceExceeded(resourcesLeft, nodeGroup, nodeInfo); skipReason != nil {
-			skippedNodeGroups[nodeGroup.Id()] = skipReason
-			continue
-		}
-		option := o.ComputeExpansionOption(podEquivalenceGroups, nodeGroup, nodeInfo, upcomingNodes)
-		if len(option.Pods) > 0 && option.NodeCount > 0 {
-			expansionOptions[nodeGroup.Id()] = option
-		} else {
-			klog.V(4).Infof("No pod can fit to %s", nodeGroup.Id())
-		}
+	for _, nodeGroup := range validNodeGroups {
+		schedulablePods[nodeGroup.Id()] = o.SchedulablePods(podEquivalenceGroups, nodeGroup, nodeInfos[nodeGroup.Id()])
 	}
-	if len(expansionOptions) == 0 {
+	for _, nodeGroup := range validNodeGroups {
+		option := o.ComputeExpansionOption(nodeGroup, schedulablePods, nodeInfos, upcomingNodes, now)
+		if len(option.Pods) == 0 || option.NodeCount == 0 {
+			klog.V(4).Infof("No pod can fit to %s", nodeGroup.Id())
+			continue
+		}
+		options = append(options, option)
+	}
+	if len(options) == 0 {
 		klog.V(1).Info("No expansion options")
 		return &status.ScaleUpStatus{
 			Result: status.ScaleUpNoOptionsAvailable,
@@ -176,10 +157,6 @@ func (o *ScaleUpOrchestrator) ScaleUp(
 	}
 	// Pick some expansion option.
-	options := make([]expander.Option, 0, len(expansionOptions))
-	for _, o := range expansionOptions {
-		options = append(options, o)
-	}
 	bestOption := o.autoscalingContext.ExpanderStrategy.BestOption(options, nodeInfos)
 	if bestOption == nil || bestOption.NodeCount <= 0 {
 		return &status.ScaleUpStatus{
@@ -233,17 +210,16 @@ func (o *ScaleUpOrchestrator) ScaleUp(
 				continue
 			}
 			nodeInfos[nodeGroup.Id()] = nodeInfo
-			option := o.ComputeExpansionOption(podEquivalenceGroups, nodeGroup, nodeInfo, upcomingNodes)
-			if len(option.Pods) > 0 && option.NodeCount > 0 {
-				expansionOptions[nodeGroup.Id()] = option
-			}
+			schedulablePods[nodeGroup.Id()] = o.SchedulablePods(podEquivalenceGroups, nodeGroup, nodeInfo)
 		}
 		// Update ClusterStateRegistry so similar nodegroups rebalancing works.
 		// TODO(lukaszos) when pursuing scalability update this call with one which takes list of changed node groups so we do not
 		// do extra API calls. (the call at the bottom of ScaleUp() could be also changed then)
 		o.clusterStateRegistry.Recalculate()
+
+		// Recompute similar node groups
+		bestOption.SimilarNodeGroups = o.ComputeSimilarNodeGroups(bestOption.NodeGroup, nodeInfos, schedulablePods, now)
 	}
 	nodeInfo, found := nodeInfos[bestOption.NodeGroup.Id()]
@@ -266,32 +242,16 @@ func (o *ScaleUpOrchestrator) ScaleUp(
 	}
 	targetNodeGroups := []cloudprovider.NodeGroup{bestOption.NodeGroup}
 	if o.autoscalingContext.BalanceSimilarNodeGroups {
-		similarNodeGroups, aErr := o.processors.NodeGroupSetProcessor.FindSimilarNodeGroups(o.autoscalingContext, bestOption.NodeGroup, nodeInfos)
-		if aErr != nil {
-			return scaleUpError(
-				&status.ScaleUpStatus{CreateNodeGroupResults: createNodeGroupResults, PodsTriggeredScaleUp: bestOption.Pods},
-				aErr.AddPrefix("failed to find matching node groups: "))
-		}
+		for _, ng := range bestOption.SimilarNodeGroups {
+			targetNodeGroups = append(targetNodeGroups, ng)
+		}
-		similarNodeGroups = filterNodeGroupsByPods(similarNodeGroups, bestOption.Pods, expansionOptions)
-		for _, ng := range similarNodeGroups {
-			if o.clusterStateRegistry.IsNodeGroupSafeToScaleUp(ng, now) {
-				targetNodeGroups = append(targetNodeGroups, ng)
-			} else {
-				// This should never happen, as we will filter out the node group earlier on because of missing
-				// entry in podsPassingPredicates, but double checking doesn't really cost us anything.
-				klog.V(2).Infof("Ignoring node group %s when balancing: group is not ready for scaleup", ng.Id())
-			}
-		}
+	}
-		if len(targetNodeGroups) > 1 {
-			names := []string{}
-			for _, ng := range targetNodeGroups {
-				names = append(names, ng.Id())
-			}
-			klog.V(1).Infof("Splitting scale-up between %v similar node groups: {%v}", len(targetNodeGroups), strings.Join(names, ", "))
-		}
+	if len(targetNodeGroups) > 1 {
+		var names []string
+		for _, ng := range targetNodeGroups {
+			names = append(names, ng.Id())
+		}
+		klog.V(1).Infof("Splitting scale-up between %v similar node groups: {%v}", len(targetNodeGroups), strings.Join(names, ", "))
 	}
 	scaleUpInfos, aErr := o.processors.NodeGroupSetProcessor.BalanceScaleUpBetweenGroups(o.autoscalingContext, targetNodeGroups, newNodes)
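
For context on the call above: BalanceScaleUpBetweenGroups (its stub signature also appears in the test file below) receives the chosen group plus its precomputed similar groups and splits the requested node count among them. A toy illustration of such a split, assuming a plain round-robin and ignoring the real processor's current-size and max-size accounting:

package main

import "fmt"

// splitEvenly distributes newNodes across groups round-robin; the real
// NodeGroupSetProcessor also respects each group's current and maximum size.
func splitEvenly(groups []string, newNodes int) map[string]int {
	out := map[string]int{}
	if len(groups) == 0 {
		return out
	}
	for i := 0; i < newNodes; i++ {
		out[groups[i%len(groups)]]++
	}
	return out
}

func main() {
	fmt.Println(splitEvenly([]string{"ng1", "ng2", "ng3"}, 4)) // map[ng1:2 ng2:1 ng3:1]
}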
@@ -426,24 +386,69 @@ func (o *ScaleUpOrchestrator) ScaleUpToNodeGroupMinSize(
 	}, nil
 }

+// filterValidScaleUpNodeGroups filters the node groups that are valid for scale-up
+func (o *ScaleUpOrchestrator) filterValidScaleUpNodeGroups(
+	nodeGroups []cloudprovider.NodeGroup,
+	nodeInfos map[string]*schedulerframework.NodeInfo,
+	resourcesLeft resource.Limits,
+	now time.Time,
+) ([]cloudprovider.NodeGroup, map[string]status.Reasons) {
+	var validNodeGroups []cloudprovider.NodeGroup
+	skippedNodeGroups := map[string]status.Reasons{}
+
+	for _, nodeGroup := range nodeGroups {
+		if skipReason := o.IsNodeGroupReadyToScaleUp(nodeGroup, now); skipReason != nil {
+			skippedNodeGroups[nodeGroup.Id()] = skipReason
+			continue
+		}
+		currentTargetSize, err := nodeGroup.TargetSize()
+		if err != nil {
+			klog.Errorf("Failed to get node group size: %v", err)
+			skippedNodeGroups[nodeGroup.Id()] = NotReadyReason
+			continue
+		}
+		if currentTargetSize >= nodeGroup.MaxSize() {
+			klog.V(4).Infof("Skipping node group %s - max size reached", nodeGroup.Id())
+			skippedNodeGroups[nodeGroup.Id()] = MaxLimitReachedReason
+			continue
+		}
+		nodeInfo, found := nodeInfos[nodeGroup.Id()]
+		if !found {
+			klog.Errorf("No node info for: %s", nodeGroup.Id())
+			skippedNodeGroups[nodeGroup.Id()] = NotReadyReason
+			continue
+		}
+		if skipReason := o.IsNodeGroupResourceExceeded(resourcesLeft, nodeGroup, nodeInfo); skipReason != nil {
+			skippedNodeGroups[nodeGroup.Id()] = skipReason
+			continue
+		}
+		validNodeGroups = append(validNodeGroups, nodeGroup)
+	}
+	return validNodeGroups, skippedNodeGroups
+}
+
 // ComputeExpansionOption computes expansion option based on pending pods and cluster state.
 func (o *ScaleUpOrchestrator) ComputeExpansionOption(
-	podEquivalenceGroups []*equivalence.PodGroup,
 	nodeGroup cloudprovider.NodeGroup,
-	nodeInfo *schedulerframework.NodeInfo,
+	schedulablePods map[string][]*apiv1.Pod,
+	nodeInfos map[string]*schedulerframework.NodeInfo,
 	upcomingNodes []*schedulerframework.NodeInfo,
+	now time.Time,
 ) expander.Option {
-	option := expander.Option{
-		NodeGroup: nodeGroup,
-		Pods:      make([]*apiv1.Pod, 0),
-	}
-	option.Pods = o.SchedulablePods(podEquivalenceGroups, nodeGroup, nodeInfo)
-	if len(option.Pods) > 0 {
-		estimator := o.autoscalingContext.EstimatorBuilder(o.autoscalingContext.PredicateChecker, o.autoscalingContext.ClusterSnapshot)
-		option.NodeCount, option.Pods = estimator.Estimate(option.Pods, nodeInfo, option.NodeGroup)
+	option := expander.Option{NodeGroup: nodeGroup}
+	pods := schedulablePods[nodeGroup.Id()]
+	nodeInfo := nodeInfos[nodeGroup.Id()]
+
+	if len(pods) == 0 {
+		return option
 	}
+
+	estimator := o.autoscalingContext.EstimatorBuilder(o.autoscalingContext.PredicateChecker, o.autoscalingContext.ClusterSnapshot)
+	option.NodeCount, option.Pods = estimator.Estimate(pods, nodeInfo, nodeGroup)
+	option.SimilarNodeGroups = o.ComputeSimilarNodeGroups(nodeGroup, nodeInfos, schedulablePods, now)
 	return option
 }
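
A note on the estimator contract used by ComputeExpansionOption: Estimate receives the pods that could schedule on the group and returns the node count needed together with the pods it actually placed. A toy stand-in for that contract (the real estimator does predicate-aware binpacking; toyEstimate here is hypothetical and just divides requested CPU):

package main

import "fmt"

type pod struct {
	name string
	cpu  int64 // millicores requested
}

// toyEstimate mimics the shape of estimator.Estimate: given schedulable pods
// and a node's CPU capacity, return the node count needed and the pods placed.
func toyEstimate(pods []pod, nodeCPU int64) (int, []pod) {
	var total int64
	for _, p := range pods {
		total += p.cpu
	}
	if total == 0 {
		return 0, nil
	}
	nodes := int((total + nodeCPU - 1) / nodeCPU) // ceiling division
	return nodes, pods
}

func main() {
	pods := []pod{{"p1", 600}, {"p2", 600}, {"p3", 300}}
	n, placed := toyEstimate(pods, 1000)
	fmt.Println(n, len(placed)) // 2 nodes for 1500m of CPU on 1000m nodes
}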
@@ -557,37 +562,52 @@ func (o *ScaleUpOrchestrator) GetCappedNewNodeCount(newNodeCount, currentNodeCou
 	return newNodeCount, nil
 }

-func filterNodeGroupsByPods(
-	groups []cloudprovider.NodeGroup,
-	podsRequiredToFit []*apiv1.Pod,
-	expansionOptions map[string]expander.Option,
+// ComputeSimilarNodeGroups finds similar node groups which can schedule the same
+// set of pods as the main node group.
+func (o *ScaleUpOrchestrator) ComputeSimilarNodeGroups(
+	nodeGroup cloudprovider.NodeGroup,
+	nodeInfos map[string]*schedulerframework.NodeInfo,
+	schedulablePods map[string][]*apiv1.Pod,
+	now time.Time,
 ) []cloudprovider.NodeGroup {
-	result := make([]cloudprovider.NodeGroup, 0)
+	if !o.autoscalingContext.BalanceSimilarNodeGroups {
+		return nil
+	}
-	for _, group := range groups {
-		option, found := expansionOptions[group.Id()]
-		if !found {
-			klog.V(1).Infof("No info about pods passing predicates found for group %v, skipping it from scale-up consideration", group.Id())
-			continue
-		}
-		fittingPods := make(map[*apiv1.Pod]bool, len(option.Pods))
-		for _, pod := range option.Pods {
-			fittingPods[pod] = true
-		}
-		allFit := true
-		for _, pod := range podsRequiredToFit {
-			if _, found := fittingPods[pod]; !found {
-				klog.V(1).Infof("Group %v, can't fit pod %v/%v, removing from scale-up consideration", group.Id(), pod.Namespace, pod.Name)
-				allFit = false
-				break
-			}
-		}
-		if allFit {
-			result = append(result, group)
+	groupSchedulablePods, found := schedulablePods[nodeGroup.Id()]
+	if !found || len(groupSchedulablePods) == 0 {
+		return nil
+	}
+
+	similarNodeGroups, err := o.processors.NodeGroupSetProcessor.FindSimilarNodeGroups(o.autoscalingContext, nodeGroup, nodeInfos)
+	if err != nil {
+		klog.Errorf("Failed to find similar node groups: %v", err)
+		return nil
+	}
+
+	var validSimilarNodeGroups []cloudprovider.NodeGroup
+	for _, ng := range similarNodeGroups {
+		if !o.clusterStateRegistry.IsNodeGroupSafeToScaleUp(ng, now) {
+			klog.V(2).Infof("Ignoring node group %s when balancing: group is not ready for scaleup", ng.Id())
+		} else if similarSchedulablePods, found := schedulablePods[ng.Id()]; found && matchingSchedulablePods(groupSchedulablePods, similarSchedulablePods) {
+			validSimilarNodeGroups = append(validSimilarNodeGroups, ng)
 		}
 	}
-	return result
+
+	return validSimilarNodeGroups
+}
+
+func matchingSchedulablePods(groupSchedulablePods []*apiv1.Pod, similarSchedulablePods []*apiv1.Pod) bool {
+	schedulablePods := make(map[*apiv1.Pod]bool)
+	for _, pod := range similarSchedulablePods {
+		schedulablePods[pod] = true
+	}
+
+	for _, pod := range groupSchedulablePods {
+		if _, found := schedulablePods[pod]; !found {
+			return false
+		}
+	}
+
+	return true
 }

 // GetRemainingPods returns information about pods which CA is unable to help
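
The subset semantics of matchingSchedulablePods above deserve a note: a similar group qualifies only if every pod schedulable on the main group is also schedulable on it, while extra pods on the similar side are fine. Pods are compared by pointer, which works because the same pod objects flow out of SchedulablePods for every group. A minimal standalone demonstration (assuming only the k8s.io/api dependency):

package main

import (
	"fmt"

	apiv1 "k8s.io/api/core/v1"
)

// matchingSchedulablePods, as in the hunk above: the main group's schedulable
// pods must be a subset (by pointer identity) of the similar group's.
func matchingSchedulablePods(groupSchedulablePods, similarSchedulablePods []*apiv1.Pod) bool {
	schedulablePods := make(map[*apiv1.Pod]bool)
	for _, pod := range similarSchedulablePods {
		schedulablePods[pod] = true
	}
	for _, pod := range groupSchedulablePods {
		if !schedulablePods[pod] {
			return false
		}
	}
	return true
}

func main() {
	p1, p2 := &apiv1.Pod{}, &apiv1.Pod{}
	fmt.Println(matchingSchedulablePods([]*apiv1.Pod{p1}, []*apiv1.Pod{p1, p2})) // true: superset is fine
	fmt.Println(matchingSchedulablePods([]*apiv1.Pod{p1, p2}, []*apiv1.Pod{p1})) // false: p2 is missing
}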

View File

@@ -33,11 +33,14 @@ import (
 	testprovider "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/test"
 	"k8s.io/autoscaler/cluster-autoscaler/clusterstate"
 	"k8s.io/autoscaler/cluster-autoscaler/config"
+	"k8s.io/autoscaler/cluster-autoscaler/context"
 	"k8s.io/autoscaler/cluster-autoscaler/core/scaleup/resource"
 	. "k8s.io/autoscaler/cluster-autoscaler/core/test"
 	"k8s.io/autoscaler/cluster-autoscaler/core/utils"
 	"k8s.io/autoscaler/cluster-autoscaler/estimator"
 	"k8s.io/autoscaler/cluster-autoscaler/metrics"
+	"k8s.io/autoscaler/cluster-autoscaler/processors"
+	"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupset"
 	"k8s.io/autoscaler/cluster-autoscaler/processors/nodeinfosprovider"
 	"k8s.io/autoscaler/cluster-autoscaler/processors/status"
 	"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
@@ -852,6 +855,155 @@ func TestScaleUpNoHelp(t *testing.T) {
 	assert.Regexp(t, regexp.MustCompile("NotTriggerScaleUp"), event)
 }

+type constNodeGroupSetProcessor struct {
+	similarNodeGroups []cloudprovider.NodeGroup
+}
+
+func (p *constNodeGroupSetProcessor) FindSimilarNodeGroups(_ *context.AutoscalingContext, _ cloudprovider.NodeGroup, _ map[string]*schedulerframework.NodeInfo) ([]cloudprovider.NodeGroup, errors.AutoscalerError) {
+	return p.similarNodeGroups, nil
+}
+
+func (p *constNodeGroupSetProcessor) BalanceScaleUpBetweenGroups(_ *context.AutoscalingContext, _ []cloudprovider.NodeGroup, _ int) ([]nodegroupset.ScaleUpInfo, errors.AutoscalerError) {
+	return nil, nil
+}
+
+func (p *constNodeGroupSetProcessor) CleanUp() {}
+
+func TestComputeSimilarNodeGroups(t *testing.T) {
+	pod1 := BuildTestPod("p1", 100, 1000)
+	pod2 := BuildTestPod("p2", 100, 1000)
+	pod3 := BuildTestPod("p3", 100, 1000)
+
+	testCases := []struct {
+		name                  string
+		nodeGroup             string
+		similarNodeGroups     []string
+		otherNodeGroups       []string
+		balancingEnabled      bool
+		schedulablePods       map[string][]*apiv1.Pod
+		wantSimilarNodeGroups []string
+	}{
+		{
+			name:                  "no similar node groups",
+			nodeGroup:             "ng1",
+			otherNodeGroups:       []string{"pg1", "pg2"},
+			balancingEnabled:      true,
+			wantSimilarNodeGroups: []string{},
+		},
+		{
+			name:                  "some similar node groups, but no schedulable pods",
+			nodeGroup:             "ng1",
+			similarNodeGroups:     []string{"ng2", "ng3"},
+			otherNodeGroups:       []string{"pg1", "pg2"},
+			balancingEnabled:      true,
+			wantSimilarNodeGroups: []string{},
+		},
+		{
+			name:              "some similar node groups and same schedulable pods, but balancing disabled",
+			nodeGroup:         "ng1",
+			similarNodeGroups: []string{"ng2", "ng3"},
+			otherNodeGroups:   []string{"pg1", "pg2"},
+			balancingEnabled:  false,
+			schedulablePods: map[string][]*apiv1.Pod{
+				"ng1": {pod1},
+				"ng2": {pod1},
+				"ng3": {pod1},
+				"pg1": {pod1},
+				"pg2": {pod1},
+			},
+			wantSimilarNodeGroups: []string{},
+		},
+		{
+			name:              "some similar node groups and same schedulable pods",
+			nodeGroup:         "ng1",
+			similarNodeGroups: []string{"ng2", "ng3"},
+			otherNodeGroups:   []string{"pg1", "pg2"},
+			balancingEnabled:  true,
+			schedulablePods: map[string][]*apiv1.Pod{
+				"ng1": {pod1},
+				"ng2": {pod1},
+				"ng3": {pod1},
+				"pg1": {pod1},
+				"pg2": {pod1},
+			},
+			wantSimilarNodeGroups: []string{"ng2", "ng3"},
+		},
+		{
+			name:              "similar node groups can schedule more pods",
+			nodeGroup:         "ng1",
+			similarNodeGroups: []string{"ng2", "ng3"},
+			otherNodeGroups:   []string{"pg1", "pg2"},
+			balancingEnabled:  true,
+			schedulablePods: map[string][]*apiv1.Pod{
+				"ng1": {pod1},
+				"ng2": {pod1, pod2},
+				"ng3": {pod1, pod2, pod3},
+				"pg1": {pod1, pod2},
+				"pg2": {pod1, pod2, pod3},
+			},
+			wantSimilarNodeGroups: []string{"ng2", "ng3"},
+		},
+		{
+			name:              "similar node groups can schedule different/no pods",
+			nodeGroup:         "ng1",
+			similarNodeGroups: []string{"ng2", "ng3"},
+			otherNodeGroups:   []string{"pg1", "pg2"},
+			balancingEnabled:  true,
+			schedulablePods: map[string][]*apiv1.Pod{
+				"ng1": {pod1, pod2},
+				"ng2": {pod1},
+				"pg1": {pod1},
+			},
+			wantSimilarNodeGroups: []string{},
+		},
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			provider := testprovider.NewTestCloudProvider(func(string, int) error { return nil }, nil)
+			nodeGroupSetProcessor := &constNodeGroupSetProcessor{}
+			now := time.Now()
+
+			allNodeGroups := []string{tc.nodeGroup}
+			allNodeGroups = append(allNodeGroups, tc.similarNodeGroups...)
+			allNodeGroups = append(allNodeGroups, tc.otherNodeGroups...)
+
+			var nodes []*apiv1.Node
+			for _, ng := range allNodeGroups {
+				nodeName := fmt.Sprintf("%s-node", ng)
+				node := BuildTestNode(nodeName, 100, 1000)
+				SetNodeReadyState(node, true, now.Add(-2*time.Minute))
+				nodes = append(nodes, node)
+
+				provider.AddNodeGroup(ng, 0, 10, 1)
+				provider.AddNode(ng, node)
+			}
+
+			for _, ng := range tc.similarNodeGroups {
+				nodeGroupSetProcessor.similarNodeGroups = append(nodeGroupSetProcessor.similarNodeGroups, provider.GetNodeGroup(ng))
+			}
+
+			listers := kube_util.NewListerRegistry(nil, nil, kube_util.NewTestPodLister(nil), nil, nil, nil, nil, nil, nil, nil)
+			ctx, err := NewScaleTestAutoscalingContext(config.AutoscalingOptions{BalanceSimilarNodeGroups: tc.balancingEnabled}, &fake.Clientset{}, listers, provider, nil, nil)
+			assert.NoError(t, err)
+
+			nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).Process(&ctx, nodes, []*appsv1.DaemonSet{}, taints.TaintConfig{}, now)
+			clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, ctx.LogRecorder, NewBackoff(), clusterstate.NewStaticMaxNodeProvisionTimeProvider(15*time.Minute))
+			assert.NoError(t, clusterState.UpdateNodes(nodes, nodeInfos, time.Now()))
+
+			suOrchestrator := &ScaleUpOrchestrator{}
+			suOrchestrator.Initialize(&ctx, &processors.AutoscalingProcessors{NodeGroupSetProcessor: nodeGroupSetProcessor}, clusterState, taints.TaintConfig{})
+
+			similarNodeGroups := suOrchestrator.ComputeSimilarNodeGroups(provider.GetNodeGroup(tc.nodeGroup), nodeInfos, tc.schedulablePods, now)
+			var gotSimilarNodeGroups []string
+			for _, ng := range similarNodeGroups {
+				gotSimilarNodeGroups = append(gotSimilarNodeGroups, ng.Id())
+			}
+			assert.ElementsMatch(t, gotSimilarNodeGroups, tc.wantSimilarNodeGroups)
+		})
+	}
+}
+
 func TestScaleUpBalanceGroups(t *testing.T) {
 	provider := testprovider.NewTestCloudProvider(func(string, int) error {
 		return nil

View File

@@ -42,10 +42,11 @@ var (
 // Option describes an option to expand the cluster.
 type Option struct {
-	NodeGroup cloudprovider.NodeGroup
-	NodeCount int
-	Debug     string
-	Pods      []*apiv1.Pod
+	NodeGroup         cloudprovider.NodeGroup
+	SimilarNodeGroups []cloudprovider.NodeGroup
+	NodeCount         int
+	Debug             string
+	Pods              []*apiv1.Pod
 }

 // Strategy describes an interface for selecting the best option when scaling up
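
Since Option now carries SimilarNodeGroups, a custom expander strategy could in principle rank options by how widely balancing can spread them. A hypothetical helper sketching that idea (widestOption is not part of the autoscaler; the real strategies such as random and least-waste live under cluster-autoscaler/expander):

package main

import "fmt"

// widestOption picks, from a map of option id to similar-group count, the
// option whose scale-up can be balanced across the most node groups.
func widestOption(similarCounts map[string]int) string {
	best, bestCount := "", -1
	for id, n := range similarCounts {
		if n > bestCount {
			best, bestCount = id, n
		}
	}
	return best
}

func main() {
	// ng1 has two similar groups to balance across, ng4 has none.
	fmt.Println(widestOption(map[string]int{"ng1": 2, "ng4": 0})) // ng1
}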