diff --git a/cluster-autoscaler/config/autoscaling_options.go b/cluster-autoscaler/config/autoscaling_options.go
index f1b913f84e..62b708ec36 100644
--- a/cluster-autoscaler/config/autoscaling_options.go
+++ b/cluster-autoscaler/config/autoscaling_options.go
@@ -128,6 +128,8 @@ type AutoscalingOptions struct {
 	OkTotalUnreadyCount int
 	// ScaleUpFromZero defines if CA should scale up when there 0 ready nodes.
 	ScaleUpFromZero bool
+	// ParallelScaleUp defines whether CA can scale up node groups in parallel.
+	ParallelScaleUp bool
 	// CloudConfig is the path to the cloud provider configuration file. Empty string for no configuration file.
 	CloudConfig string
 	// CloudProviderName sets the type of the cloud provider CA is about to run in. Allowed values: gce, aws
diff --git a/cluster-autoscaler/core/scaleup/orchestrator/executor.go b/cluster-autoscaler/core/scaleup/orchestrator/executor.go
new file mode 100644
index 0000000000..55d0d66293
--- /dev/null
+++ b/cluster-autoscaler/core/scaleup/orchestrator/executor.go
@@ -0,0 +1,248 @@
+/*
+Copyright 2016 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package orchestrator
+
+import (
+	"fmt"
+	"sort"
+	"strings"
+	"sync"
+	"time"
+
+	apiv1 "k8s.io/api/core/v1"
+	"k8s.io/klog/v2"
+	schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
+
+	"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
+	"k8s.io/autoscaler/cluster-autoscaler/clusterstate"
+	"k8s.io/autoscaler/cluster-autoscaler/context"
+	"k8s.io/autoscaler/cluster-autoscaler/metrics"
+	"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupset"
+	"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
+	"k8s.io/autoscaler/cluster-autoscaler/utils/gpu"
+)
+
+// scaleUpExecutor scales up node groups.
+type scaleUpExecutor struct {
+	autoscalingContext   *context.AutoscalingContext
+	clusterStateRegistry *clusterstate.ClusterStateRegistry
+}
+
+// newScaleUpExecutor returns a new instance of the scale-up executor.
+func newScaleUpExecutor(
+	autoscalingContext *context.AutoscalingContext,
+	clusterStateRegistry *clusterstate.ClusterStateRegistry,
+) *scaleUpExecutor {
+	return &scaleUpExecutor{
+		autoscalingContext:   autoscalingContext,
+		clusterStateRegistry: clusterStateRegistry,
+	}
+}
+
+// ExecuteScaleUps executes the scale ups, based on the provided scale up infos and options.
+// May scale up groups concurrently when the autoscaler option is enabled.
+// In case of issues returns an error and the node groups that failed to scale up.
+// If there were multiple concurrent errors, one combined error is returned.
+func (e *scaleUpExecutor) ExecuteScaleUps(
+	scaleUpInfos []nodegroupset.ScaleUpInfo,
+	nodeInfos map[string]*schedulerframework.NodeInfo,
+	now time.Time,
+) (errors.AutoscalerError, []cloudprovider.NodeGroup) {
+	options := e.autoscalingContext.AutoscalingOptions
+	if options.ParallelScaleUp {
+		return e.executeScaleUpsParallel(scaleUpInfos, nodeInfos, now)
+	}
+	return e.executeScaleUpsSync(scaleUpInfos, nodeInfos, now)
+}
+
+func (e *scaleUpExecutor) executeScaleUpsSync(
+	scaleUpInfos []nodegroupset.ScaleUpInfo,
+	nodeInfos map[string]*schedulerframework.NodeInfo,
+	now time.Time,
+) (errors.AutoscalerError, []cloudprovider.NodeGroup) {
+	availableGPUTypes := e.autoscalingContext.CloudProvider.GetAvailableGPUTypes()
+	for _, scaleUpInfo := range scaleUpInfos {
+		nodeInfo, ok := nodeInfos[scaleUpInfo.Group.Id()]
+		if !ok {
+			klog.Errorf("ExecuteScaleUp: failed to get node info for node group %s", scaleUpInfo.Group.Id())
+			continue
+		}
+		if aErr := e.executeScaleUp(scaleUpInfo, nodeInfo, availableGPUTypes, now); aErr != nil {
+			return aErr, []cloudprovider.NodeGroup{scaleUpInfo.Group}
+		}
+	}
+	return nil, nil
+}
+
+func (e *scaleUpExecutor) executeScaleUpsParallel(
+	scaleUpInfos []nodegroupset.ScaleUpInfo,
+	nodeInfos map[string]*schedulerframework.NodeInfo,
+	now time.Time,
+) (errors.AutoscalerError, []cloudprovider.NodeGroup) {
+	if err := checkUniqueNodeGroups(scaleUpInfos); err != nil {
+		return err, extractNodeGroups(scaleUpInfos)
+	}
+	type errResult struct {
+		err  errors.AutoscalerError
+		info *nodegroupset.ScaleUpInfo
+	}
+	scaleUpsLen := len(scaleUpInfos)
+	errResults := make(chan errResult, scaleUpsLen)
+	var wg sync.WaitGroup
+	wg.Add(scaleUpsLen)
+	availableGPUTypes := e.autoscalingContext.CloudProvider.GetAvailableGPUTypes()
+	for _, scaleUpInfo := range scaleUpInfos {
+		go func(info nodegroupset.ScaleUpInfo) {
+			defer wg.Done()
+			nodeInfo, ok := nodeInfos[info.Group.Id()]
+			if !ok {
+				klog.Errorf("ExecuteScaleUp: failed to get node info for node group %s", info.Group.Id())
+				return
+			}
+			if aErr := e.executeScaleUp(info, nodeInfo, availableGPUTypes, now); aErr != nil {
+				errResults <- errResult{err: aErr, info: &info}
+			}
+		}(scaleUpInfo)
+	}
+	wg.Wait()
+	close(errResults)
+	var results []errResult
+	for err := range errResults {
+		results = append(results, err)
+	}
+	if len(results) > 0 {
+		failedNodeGroups := make([]cloudprovider.NodeGroup, len(results))
+		scaleUpErrors := make([]errors.AutoscalerError, len(results))
+		for i, result := range results {
+			failedNodeGroups[i] = result.info.Group
+			scaleUpErrors[i] = result.err
+		}
+		return combineConcurrentScaleUpErrors(scaleUpErrors), failedNodeGroups
+	}
+	return nil, nil
+}
+
+func (e *scaleUpExecutor) executeScaleUp(
+	info nodegroupset.ScaleUpInfo,
+	nodeInfo *schedulerframework.NodeInfo,
+	availableGPUTypes map[string]struct{},
+	now time.Time,
+) errors.AutoscalerError {
+	gpuConfig := e.autoscalingContext.CloudProvider.GetNodeGpuConfig(nodeInfo.Node())
+	gpuResourceName, gpuType := gpu.GetGpuInfoForMetrics(gpuConfig, availableGPUTypes, nodeInfo.Node(), nil)
+	klog.V(0).Infof("Scale-up: setting group %s size to %d", info.Group.Id(), info.NewSize)
+	e.autoscalingContext.LogRecorder.Eventf(apiv1.EventTypeNormal, "ScaledUpGroup",
+		"Scale-up: setting group %s size to %d instead of %d (max: %d)", info.Group.Id(), info.NewSize, info.CurrentSize, info.MaxSize)
+	increase := info.NewSize - info.CurrentSize
+	if err := info.Group.IncreaseSize(increase); err != nil {
+		e.autoscalingContext.LogRecorder.Eventf(apiv1.EventTypeWarning, "FailedToScaleUpGroup", "Scale-up failed for group %s: %v", info.Group.Id(), err)
+		aerr := errors.ToAutoscalerError(errors.CloudProviderError, err).AddPrefix("failed to increase node group size: ")
+		e.clusterStateRegistry.RegisterFailedScaleUp(info.Group, metrics.FailedScaleUpReason(string(aerr.Type())), now)
+		return aerr
+	}
+	e.clusterStateRegistry.RegisterOrUpdateScaleUp(
+		info.Group,
+		increase,
+		time.Now())
+	metrics.RegisterScaleUp(increase, gpuResourceName, gpuType)
+	e.autoscalingContext.LogRecorder.Eventf(apiv1.EventTypeNormal, "ScaledUpGroup",
+		"Scale-up: group %s size set to %d instead of %d (max: %d)", info.Group.Id(), info.NewSize, info.CurrentSize, info.MaxSize)
+	return nil
+}
+
+func combineConcurrentScaleUpErrors(errs []errors.AutoscalerError) errors.AutoscalerError {
+	if len(errs) == 0 {
+		return nil
+	}
+	if len(errs) == 1 {
+		return errs[0]
+	}
+	uniqueMessages := make(map[string]bool)
+	uniqueTypes := make(map[errors.AutoscalerErrorType]bool)
+	for _, err := range errs {
+		uniqueTypes[err.Type()] = true
+		uniqueMessages[err.Error()] = true
+	}
+	if len(uniqueTypes) == 1 && len(uniqueMessages) == 1 {
+		return errs[0]
+	}
+	// Sort to stabilize the results and make log aggregation easier.
+	sort.Slice(errs, func(i, j int) bool {
+		errA := errs[i]
+		errB := errs[j]
+		if errA.Type() == errB.Type() {
+			return errs[i].Error() < errs[j].Error()
+		}
+		return errA.Type() < errB.Type()
+	})
+	firstErr := errs[0]
+	printErrorTypes := len(uniqueTypes) > 1
+	message := formatMessageFromConcurrentErrors(errs, printErrorTypes)
+	return errors.NewAutoscalerError(firstErr.Type(), message)
+}
+
+func formatMessageFromConcurrentErrors(errs []errors.AutoscalerError, printErrorTypes bool) string {
+	firstErr := errs[0]
+	var builder strings.Builder
+	builder.WriteString(firstErr.Error())
+	builder.WriteString(" ...and other concurrent errors: [")
+	formattedErrs := map[errors.AutoscalerError]bool{
+		firstErr: true,
+	}
+	for _, err := range errs {
+		if _, has := formattedErrs[err]; has {
+			continue
+		}
+		formattedErrs[err] = true
+		var message string
+		if printErrorTypes {
+			message = fmt.Sprintf("[%s] %s", err.Type(), err.Error())
+		} else {
+			message = err.Error()
+		}
+		if len(formattedErrs) > 2 {
+			builder.WriteString(", ")
+		}
+		builder.WriteString(fmt.Sprintf("%q", message))
+	}
+	builder.WriteString("]")
+	return builder.String()
+}
+
+// Checks if all groups are scaled only once.
+// Scaling one group multiple times concurrently may cause problems.
+func checkUniqueNodeGroups(scaleUpInfos []nodegroupset.ScaleUpInfo) errors.AutoscalerError {
+	uniqueGroups := make(map[string]bool)
+	for _, info := range scaleUpInfos {
+		if uniqueGroups[info.Group.Id()] {
+			return errors.NewAutoscalerError(
+				errors.InternalError,
+				"assertion failure: detected group double scaling: %s", info.Group.Id(),
+			)
+		}
+		uniqueGroups[info.Group.Id()] = true
+	}
+	return nil
+}
+
+func extractNodeGroups(scaleUpInfos []nodegroupset.ScaleUpInfo) []cloudprovider.NodeGroup {
+	groups := make([]cloudprovider.NodeGroup, len(scaleUpInfos))
+	for i, info := range scaleUpInfos {
+		groups[i] = info.Group
+	}
+	return groups
+}
diff --git a/cluster-autoscaler/core/scaleup/orchestrator/executor_test.go b/cluster-autoscaler/core/scaleup/orchestrator/executor_test.go
new file mode 100644
index 0000000000..a7ef5d60f5
--- /dev/null
+++ b/cluster-autoscaler/core/scaleup/orchestrator/executor_test.go
@@ -0,0 +1,127 @@
+/*
+Copyright 2016 The Kubernetes Authors.
+ +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package orchestrator + +import ( + "testing" + + "k8s.io/autoscaler/cluster-autoscaler/utils/errors" + + "github.com/stretchr/testify/assert" +) + +func TestCombinedConcurrentScaleUpErrors(t *testing.T) { + cloudProviderErr := errors.NewAutoscalerError(errors.CloudProviderError, "provider error") + internalErr := errors.NewAutoscalerError(errors.InternalError, "internal error") + testCases := []struct { + desc string + errors []errors.AutoscalerError + expectedErr errors.AutoscalerError + }{ + { + desc: "no errors", + errors: []errors.AutoscalerError{}, + expectedErr: nil, + }, + { + desc: "single error", + errors: []errors.AutoscalerError{internalErr}, + expectedErr: internalErr, + }, + { + desc: "two duplicated errors", + errors: []errors.AutoscalerError{ + internalErr, + internalErr, + }, + expectedErr: internalErr, + }, + { + desc: "two different errors", + errors: []errors.AutoscalerError{ + cloudProviderErr, + internalErr, + }, + expectedErr: errors.NewAutoscalerError( + errors.CloudProviderError, + "provider error ...and other concurrent errors: [\"[internalError] internal error\"]", + ), + }, + { + desc: "two different errors - reverse alphabetical order", + errors: []errors.AutoscalerError{ + internalErr, + cloudProviderErr, + }, + expectedErr: errors.NewAutoscalerError( + errors.CloudProviderError, + "provider error ...and other concurrent errors: [\"[internalError] internal error\"]", + ), + }, + { + desc: "errors with the same type and different messages", + errors: []errors.AutoscalerError{ + errors.NewAutoscalerError(errors.InternalError, "A"), + errors.NewAutoscalerError(errors.InternalError, "B"), + errors.NewAutoscalerError(errors.InternalError, "C"), + }, + expectedErr: errors.NewAutoscalerError( + errors.InternalError, + "A ...and other concurrent errors: [\"B\", \"C\"]"), + }, + { + desc: "errors with the same type and some duplicated messages", + errors: []errors.AutoscalerError{ + errors.NewAutoscalerError(errors.InternalError, "A"), + errors.NewAutoscalerError(errors.InternalError, "B"), + errors.NewAutoscalerError(errors.InternalError, "A"), + }, + expectedErr: errors.NewAutoscalerError( + errors.InternalError, + "A ...and other concurrent errors: [\"B\"]"), + }, + { + desc: "some duplicated errors", + errors: []errors.AutoscalerError{ + errors.NewAutoscalerError(errors.CloudProviderError, "A"), + errors.NewAutoscalerError(errors.CloudProviderError, "A"), + errors.NewAutoscalerError(errors.CloudProviderError, "B"), + errors.NewAutoscalerError(errors.InternalError, "A"), + }, + expectedErr: errors.NewAutoscalerError( + errors.CloudProviderError, + "A ...and other concurrent errors: [\"[cloudProviderError] B\", \"[internalError] A\"]"), + }, + { + desc: "different errors with quotes in messages", + errors: []errors.AutoscalerError{ + errors.NewAutoscalerError(errors.InternalError, "\"first\""), + errors.NewAutoscalerError(errors.InternalError, "\"second\""), + }, + expectedErr: errors.NewAutoscalerError( + errors.InternalError, + 
"\"first\" ...and other concurrent errors: [\"\\\"second\\\"\"]"), + }, + } + + for _, testCase := range testCases { + t.Run(testCase.desc, func(t *testing.T) { + combinedErr := combineConcurrentScaleUpErrors(testCase.errors) + assert.Equal(t, testCase.expectedErr, combinedErr) + }) + } +} diff --git a/cluster-autoscaler/core/scaleup/orchestrator/orchestrator.go b/cluster-autoscaler/core/scaleup/orchestrator/orchestrator.go index 11394261ce..759602fdbd 100644 --- a/cluster-autoscaler/core/scaleup/orchestrator/orchestrator.go +++ b/cluster-autoscaler/core/scaleup/orchestrator/orchestrator.go @@ -22,6 +22,9 @@ import ( appsv1 "k8s.io/api/apps/v1" apiv1 "k8s.io/api/core/v1" + "k8s.io/klog/v2" + schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework" + "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" "k8s.io/autoscaler/cluster-autoscaler/clusterstate" "k8s.io/autoscaler/cluster-autoscaler/context" @@ -36,11 +39,8 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupset" "k8s.io/autoscaler/cluster-autoscaler/processors/status" "k8s.io/autoscaler/cluster-autoscaler/utils/errors" - "k8s.io/autoscaler/cluster-autoscaler/utils/gpu" "k8s.io/autoscaler/cluster-autoscaler/utils/klogx" "k8s.io/autoscaler/cluster-autoscaler/utils/taints" - "k8s.io/klog/v2" - schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework" ) // ScaleUpOrchestrator implements scaleup.Orchestrator interface. @@ -49,6 +49,7 @@ type ScaleUpOrchestrator struct { processors *ca_processors.AutoscalingProcessors resourceManager *resource.Manager clusterStateRegistry *clusterstate.ClusterStateRegistry + scaleUpExecutor *scaleUpExecutor taintConfig taints.TaintConfig initialized bool } @@ -72,6 +73,7 @@ func (o *ScaleUpOrchestrator) Initialize( o.clusterStateRegistry = clusterStateRegistry o.taintConfig = taintConfig o.resourceManager = resource.NewManager(processors.CustomResourcesProcessor) + o.scaleUpExecutor = newScaleUpExecutor(autoscalingContext, clusterStateRegistry) o.initialized = true } @@ -284,7 +286,7 @@ func (o *ScaleUpOrchestrator) ScaleUp( } if len(targetNodeGroups) > 1 { - var names = []string{} + names := []string{} for _, ng := range targetNodeGroups { names = append(names, ng.Id()) } @@ -300,11 +302,12 @@ func (o *ScaleUpOrchestrator) ScaleUp( } klog.V(1).Infof("Final scale-up plan: %v", scaleUpInfos) - if aErr, failedInfo := o.ExecuteScaleUps(scaleUpInfos, nodeInfos, now); aErr != nil { + aErr, failedNodeGroups := o.scaleUpExecutor.ExecuteScaleUps(scaleUpInfos, nodeInfos, now) + if aErr != nil { return scaleUpError( &status.ScaleUpStatus{ CreateNodeGroupResults: createNodeGroupResults, - FailedResizeNodeGroups: []cloudprovider.NodeGroup{failedInfo.Group}, + FailedResizeNodeGroups: failedNodeGroups, PodsTriggeredScaleUp: bestOption.Pods, }, aErr, @@ -405,10 +408,11 @@ func (o *ScaleUpOrchestrator) ScaleUpToNodeGroupMinSize( } klog.V(1).Infof("ScaleUpToNodeGroupMinSize: final scale-up plan: %v", scaleUpInfos) - if aErr, failedInfo := o.ExecuteScaleUps(scaleUpInfos, nodeInfos, now); aErr != nil { + aErr, failedNodeGroups := o.scaleUpExecutor.ExecuteScaleUps(scaleUpInfos, nodeInfos, now) + if aErr != nil { return scaleUpError( &status.ScaleUpStatus{ - FailedResizeNodeGroups: []cloudprovider.NodeGroup{failedInfo.Group}, + FailedResizeNodeGroups: failedNodeGroups, }, aErr, ) @@ -553,60 +557,11 @@ func (o *ScaleUpOrchestrator) GetCappedNewNodeCount(newNodeCount, currentNodeCou return newNodeCount, nil } -// ExecuteScaleUps executes the scale ups, based on the provided scale up infos. 
-// In case of issues returns an error and a scale up info which failed to execute. -func (o *ScaleUpOrchestrator) ExecuteScaleUps( - scaleUpInfos []nodegroupset.ScaleUpInfo, - nodeInfos map[string]*schedulerframework.NodeInfo, - now time.Time, -) (errors.AutoscalerError, *nodegroupset.ScaleUpInfo) { - availableGPUTypes := o.autoscalingContext.CloudProvider.GetAvailableGPUTypes() - for _, info := range scaleUpInfos { - nodeInfo, ok := nodeInfos[info.Group.Id()] - if !ok { - klog.Errorf("ExecuteScaleUp: failed to get node info for node group %s", info.Group.Id()) - continue - } - gpuConfig := o.autoscalingContext.CloudProvider.GetNodeGpuConfig(nodeInfo.Node()) - gpuResourceName, gpuType := gpu.GetGpuInfoForMetrics(gpuConfig, availableGPUTypes, nodeInfo.Node(), nil) - if aErr := o.executeScaleUp(info, gpuResourceName, gpuType, now); aErr != nil { - return aErr, &info - } - } - return nil, nil -} - -func (o *ScaleUpOrchestrator) executeScaleUp( - info nodegroupset.ScaleUpInfo, - gpuResourceName, gpuType string, - now time.Time, -) errors.AutoscalerError { - klog.V(0).Infof("Scale-up: setting group %s size to %d", info.Group.Id(), info.NewSize) - o.autoscalingContext.LogRecorder.Eventf(apiv1.EventTypeNormal, "ScaledUpGroup", - "Scale-up: setting group %s size to %d instead of %d (max: %d)", info.Group.Id(), info.NewSize, info.CurrentSize, info.MaxSize) - increase := info.NewSize - info.CurrentSize - if err := info.Group.IncreaseSize(increase); err != nil { - o.autoscalingContext.LogRecorder.Eventf(apiv1.EventTypeWarning, "FailedToScaleUpGroup", "Scale-up failed for group %s: %v", info.Group.Id(), err) - aerr := errors.ToAutoscalerError(errors.CloudProviderError, err).AddPrefix("failed to increase node group size: ") - o.clusterStateRegistry.RegisterFailedScaleUp(info.Group, metrics.FailedScaleUpReason(string(aerr.Type())), now) - return aerr - } - o.clusterStateRegistry.RegisterOrUpdateScaleUp( - info.Group, - increase, - time.Now()) - metrics.RegisterScaleUp(increase, gpuResourceName, gpuType) - o.autoscalingContext.LogRecorder.Eventf(apiv1.EventTypeNormal, "ScaledUpGroup", - "Scale-up: group %s size set to %d instead of %d (max: %d)", info.Group.Id(), info.NewSize, info.CurrentSize, info.MaxSize) - return nil -} - func filterNodeGroupsByPods( groups []cloudprovider.NodeGroup, podsRequiredToFit []*apiv1.Pod, expansionOptions map[string]expander.Option, ) []cloudprovider.NodeGroup { - result := make([]cloudprovider.NodeGroup, 0) for _, group := range groups { diff --git a/cluster-autoscaler/core/scaleup/orchestrator/orchestrator_test.go b/cluster-autoscaler/core/scaleup/orchestrator/orchestrator_test.go index 23d9b38d35..bc6e6d9eea 100644 --- a/cluster-autoscaler/core/scaleup/orchestrator/orchestrator_test.go +++ b/cluster-autoscaler/core/scaleup/orchestrator/orchestrator_test.go @@ -22,11 +22,14 @@ import ( "net/http/httptest" "regexp" "strings" + "sync/atomic" "testing" "time" + kube_record "k8s.io/client-go/tools/record" + "k8s.io/component-base/metrics/legacyregistry" + "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" - mockprovider "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/mocks" testprovider "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/test" "k8s.io/autoscaler/cluster-autoscaler/clusterstate" "k8s.io/autoscaler/cluster-autoscaler/config" @@ -35,7 +38,6 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/core/utils" "k8s.io/autoscaler/cluster-autoscaler/estimator" "k8s.io/autoscaler/cluster-autoscaler/metrics" - 
"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupset" "k8s.io/autoscaler/cluster-autoscaler/processors/nodeinfosprovider" "k8s.io/autoscaler/cluster-autoscaler/processors/status" "k8s.io/autoscaler/cluster-autoscaler/utils/errors" @@ -43,8 +45,6 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/utils/taints" . "k8s.io/autoscaler/cluster-autoscaler/utils/test" "k8s.io/autoscaler/cluster-autoscaler/utils/units" - kube_record "k8s.io/client-go/tools/record" - "k8s.io/component-base/metrics/legacyregistry" appsv1 "k8s.io/api/apps/v1" apiv1 "k8s.io/api/core/v1" @@ -52,7 +52,6 @@ import ( schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework" "github.com/stretchr/testify/assert" - "k8s.io/autoscaler/cluster-autoscaler/expander" ) var defaultOptions = config.AutoscalingOptions{ @@ -65,7 +64,7 @@ var defaultOptions = config.AutoscalingOptions{ // Scale up scenarios. func TestScaleUpOK(t *testing.T) { - config := &ScaleTestConfig{ + config := &ScaleUpTestConfig{ Nodes: []NodeConfig{ {Name: "n1", Cpu: 100, Memory: 100, Gpu: 0, Ready: true, Group: "ng1"}, {Name: "n2", Cpu: 1000, Memory: 1000, Gpu: 0, Ready: true, Group: "ng2"}, @@ -77,8 +76,7 @@ func TestScaleUpOK(t *testing.T) { ExtraPods: []PodConfig{ {Name: "p-new", Cpu: 500, Memory: 0, Gpu: 0, Node: "", ToleratesGpu: false}, }, - Options: defaultOptions, - ExpansionOptionToChoose: GroupSizeChange{GroupName: "ng2", SizeChange: 1}, + ExpansionOptionToChoose: &GroupSizeChange{GroupName: "ng2", SizeChange: 1}, } expectedResults := &ScaleTestResults{ FinalOption: GroupSizeChange{GroupName: "ng2", SizeChange: 1}, @@ -92,7 +90,7 @@ func TestScaleUpOK(t *testing.T) { // There are triggering, remaining & awaiting pods. func TestMixedScaleUp(t *testing.T) { - config := &ScaleTestConfig{ + config := &ScaleUpTestConfig{ Nodes: []NodeConfig{ {Name: "n1", Cpu: 100, Memory: 1000, Gpu: 0, Ready: true, Group: "ng1"}, {Name: "n2", Cpu: 1000, Memory: 100, Gpu: 0, Ready: true, Group: "ng2"}, @@ -109,8 +107,7 @@ func TestMixedScaleUp(t *testing.T) { // can only be scheduled on ng1 {Name: "awaiting", Cpu: 0, Memory: 200, Gpu: 0, Node: "", ToleratesGpu: false}, }, - Options: defaultOptions, - ExpansionOptionToChoose: GroupSizeChange{GroupName: "ng2", SizeChange: 1}, + ExpansionOptionToChoose: &GroupSizeChange{GroupName: "ng2", SizeChange: 1}, } expectedResults := &ScaleTestResults{ FinalOption: GroupSizeChange{GroupName: "ng2", SizeChange: 1}, @@ -127,7 +124,7 @@ func TestMixedScaleUp(t *testing.T) { func TestScaleUpMaxCoresLimitHit(t *testing.T) { options := defaultOptions options.MaxCoresTotal = 9 - config := &ScaleTestConfig{ + config := &ScaleUpTestConfig{ Nodes: []NodeConfig{ {Name: "n1", Cpu: 2000, Memory: 100, Gpu: 0, Ready: true, Group: "ng1"}, {Name: "n2", Cpu: 4000, Memory: 1000, Gpu: 0, Ready: true, Group: "ng2"}, @@ -140,8 +137,8 @@ func TestScaleUpMaxCoresLimitHit(t *testing.T) { {Name: "p-new-1", Cpu: 2000, Memory: 0, Gpu: 0, Node: "", ToleratesGpu: false}, {Name: "p-new-2", Cpu: 2000, Memory: 0, Gpu: 0, Node: "", ToleratesGpu: false}, }, - ExpansionOptionToChoose: GroupSizeChange{GroupName: "ng1", SizeChange: 2}, - Options: options, + ExpansionOptionToChoose: &GroupSizeChange{GroupName: "ng1", SizeChange: 2}, + Options: &options, } results := &ScaleTestResults{ FinalOption: GroupSizeChange{GroupName: "ng1", SizeChange: 1}, @@ -156,7 +153,7 @@ func TestScaleUpMaxCoresLimitHit(t *testing.T) { func TestScaleUpMaxCoresLimitHitWithNotAutoscaledGroup(t *testing.T) { options := defaultOptions options.MaxCoresTotal = 9 - config := 
&ScaleTestConfig{ + config := &ScaleUpTestConfig{ Nodes: []NodeConfig{ {Name: "n1", Cpu: 2000, Memory: 100, Gpu: 0, Ready: true, Group: "ng1"}, {Name: "n2", Cpu: 4000, Memory: 1000, Gpu: 0, Ready: true, Group: ""}, @@ -169,8 +166,8 @@ func TestScaleUpMaxCoresLimitHitWithNotAutoscaledGroup(t *testing.T) { {Name: "p-new-1", Cpu: 2000, Memory: 0, Gpu: 0, Node: "", ToleratesGpu: false}, {Name: "p-new-2", Cpu: 2000, Memory: 0, Gpu: 0, Node: "", ToleratesGpu: false}, }, - ExpansionOptionToChoose: GroupSizeChange{GroupName: "ng1", SizeChange: 2}, - Options: options, + ExpansionOptionToChoose: &GroupSizeChange{GroupName: "ng1", SizeChange: 2}, + Options: &options, } results := &ScaleTestResults{ FinalOption: GroupSizeChange{GroupName: "ng1", SizeChange: 1}, @@ -185,7 +182,7 @@ func TestScaleUpMaxCoresLimitHitWithNotAutoscaledGroup(t *testing.T) { func TestScaleUpMaxMemoryLimitHit(t *testing.T) { options := defaultOptions options.MaxMemoryTotal = 1300 * utils.MiB - config := &ScaleTestConfig{ + config := &ScaleUpTestConfig{ Nodes: []NodeConfig{ {Name: "n1", Cpu: 2000, Memory: 100 * utils.MiB, Gpu: 0, Ready: true, Group: "ng1"}, {Name: "n2", Cpu: 4000, Memory: 1000 * utils.MiB, Gpu: 0, Ready: true, Group: "ng2"}, @@ -199,8 +196,8 @@ func TestScaleUpMaxMemoryLimitHit(t *testing.T) { {Name: "p-new-2", Cpu: 2000, Memory: 100 * utils.MiB, Gpu: 0, Node: "", ToleratesGpu: false}, {Name: "p-new-3", Cpu: 2000, Memory: 100 * utils.MiB, Gpu: 0, Node: "", ToleratesGpu: false}, }, - ExpansionOptionToChoose: GroupSizeChange{GroupName: "ng1", SizeChange: 3}, - Options: options, + ExpansionOptionToChoose: &GroupSizeChange{GroupName: "ng1", SizeChange: 3}, + Options: &options, } results := &ScaleTestResults{ FinalOption: GroupSizeChange{GroupName: "ng1", SizeChange: 2}, @@ -215,7 +212,7 @@ func TestScaleUpMaxMemoryLimitHit(t *testing.T) { func TestScaleUpMaxMemoryLimitHitWithNotAutoscaledGroup(t *testing.T) { options := defaultOptions options.MaxMemoryTotal = 1300 * utils.MiB - config := &ScaleTestConfig{ + config := &ScaleUpTestConfig{ Nodes: []NodeConfig{ {Name: "n1", Cpu: 2000, Memory: 100 * utils.MiB, Gpu: 0, Ready: true, Group: "ng1"}, {Name: "n2", Cpu: 4000, Memory: 1000 * utils.MiB, Gpu: 0, Ready: true, Group: ""}, @@ -229,8 +226,8 @@ func TestScaleUpMaxMemoryLimitHitWithNotAutoscaledGroup(t *testing.T) { {Name: "p-new-2", Cpu: 2000, Memory: 100 * utils.MiB, Gpu: 0, Node: "", ToleratesGpu: false}, {Name: "p-new-3", Cpu: 2000, Memory: 100 * utils.MiB, Gpu: 0, Node: "", ToleratesGpu: false}, }, - ExpansionOptionToChoose: GroupSizeChange{GroupName: "ng1", SizeChange: 3}, - Options: options, + ExpansionOptionToChoose: &GroupSizeChange{GroupName: "ng1", SizeChange: 3}, + Options: &options, } results := &ScaleTestResults{ FinalOption: GroupSizeChange{GroupName: "ng1", SizeChange: 2}, @@ -242,10 +239,140 @@ func TestScaleUpMaxMemoryLimitHitWithNotAutoscaledGroup(t *testing.T) { simpleScaleUpTest(t, config, results) } +func TestScaleUpTwoGroups(t *testing.T) { + options := defaultOptions + options.BalanceSimilarNodeGroups = true + options.ParallelScaleUp = true + config := &ScaleUpTestConfig{ + Nodes: []NodeConfig{ + {Name: "ng1-n1", Cpu: 1500, Memory: 1000 * utils.MiB, Ready: true, Group: "ng1"}, + {Name: "ng2-n1", Cpu: 1500, Memory: 1000 * utils.MiB, Ready: true, Group: "ng2"}, + }, + Pods: []PodConfig{ + {Name: "p1", Cpu: 1400, Node: "ng1-n1"}, + {Name: "p2", Cpu: 1400, Node: "ng2-n1"}, + }, + ExtraPods: []PodConfig{ + {Name: "p3", Cpu: 1400}, + {Name: "p4", Cpu: 1400}, + }, + Options: &options, + } + testCases 
:= []struct { + desc string + parallel bool + }{ + { + desc: "synchronous scale up", + parallel: false, + }, + { + desc: "parallel scale up", + parallel: true, + }, + } + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + config.Options.ParallelScaleUp = tc.parallel + result := runSimpleScaleUpTest(t, config) + assert.True(t, result.ScaleUpStatus.WasSuccessful()) + assert.Nil(t, result.ScaleUpError) + assert.Equal(t, result.GroupTargetSizes, map[string]int{ + "ng1": 2, + "ng2": 2, + }) + assert.ElementsMatch(t, result.ScaleUpStatus.PodsTriggeredScaleUp, []string{"p3", "p4"}) + }) + } +} + +func TestCloudProviderFailingToScaleUpGroups(t *testing.T) { + options := defaultOptions + options.BalanceSimilarNodeGroups = true + config := &ScaleUpTestConfig{ + Groups: []NodeGroupConfig{ + {Name: "ng1", MaxSize: 2}, + {Name: "ng2", MaxSize: 2}, + }, + Nodes: []NodeConfig{ + {Name: "ng1-n1", Cpu: 1500, Memory: 1000 * utils.MiB, Ready: true, Group: "ng1"}, + {Name: "ng2-n1", Cpu: 1500, Memory: 1000 * utils.MiB, Ready: true, Group: "ng2"}, + }, + Pods: []PodConfig{ + {Name: "p1", Cpu: 1400, Node: "ng1-n1"}, + {Name: "p2", Cpu: 1400, Node: "ng2-n1"}, + }, + ExtraPods: []PodConfig{ + {Name: "p3", Cpu: 1400}, + {Name: "p4", Cpu: 1400}, + }, + Options: &options, + } + failAlwaysScaleUp := func(group string, i int) error { + return fmt.Errorf("provider error for: %s", group) + } + failOnceScaleUp := func() testprovider.OnScaleUpFunc { + var executed atomic.Bool + return func(group string, _ int) error { + if !executed.Swap(true) { + return fmt.Errorf("provider error for: %s", group) + } + return nil + } + } + testCases := []struct { + desc string + parallel bool + onScaleUp testprovider.OnScaleUpFunc + expectConcurrentErrors bool + expectedTotalTargetSizes int + }{ + { + desc: "synchronous scale up - two failures", + parallel: false, + onScaleUp: failAlwaysScaleUp, + expectConcurrentErrors: false, + expectedTotalTargetSizes: 3, // first error stops scale up process + }, + { + desc: "parallel scale up - two failures", + parallel: true, + onScaleUp: failAlwaysScaleUp, + expectConcurrentErrors: true, + expectedTotalTargetSizes: 4, + }, + { + desc: "synchronous scale up - one failure", + parallel: false, + onScaleUp: failOnceScaleUp(), + expectConcurrentErrors: false, + expectedTotalTargetSizes: 3, + }, + { + desc: "parallel scale up - one failure", + parallel: true, + onScaleUp: failOnceScaleUp(), + expectConcurrentErrors: false, + expectedTotalTargetSizes: 4, + }, + } + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + config.Options.ParallelScaleUp = tc.parallel + config.OnScaleUp = tc.onScaleUp + result := runSimpleScaleUpTest(t, config) + assert.False(t, result.ScaleUpStatus.WasSuccessful()) + assert.Equal(t, errors.CloudProviderError, result.ScaleUpError.Type()) + assert.Equal(t, tc.expectedTotalTargetSizes, result.GroupTargetSizes["ng1"]+result.GroupTargetSizes["ng2"]) + assert.Equal(t, tc.expectConcurrentErrors, strings.Contains(result.ScaleUpError.Error(), "...and other concurrent errors")) + }) + } +} + func TestScaleUpCapToMaxTotalNodesLimit(t *testing.T) { options := defaultOptions options.MaxNodesTotal = 3 - config := &ScaleTestConfig{ + config := &ScaleUpTestConfig{ Nodes: []NodeConfig{ {Name: "n1", Cpu: 2000, Memory: 100 * utils.MiB, Gpu: 0, Ready: true, Group: "ng1"}, {Name: "n2", Cpu: 4000, Memory: 1000 * utils.MiB, Gpu: 0, Ready: true, Group: "ng2"}, @@ -259,8 +386,8 @@ func TestScaleUpCapToMaxTotalNodesLimit(t *testing.T) { {Name: "p-new-2", Cpu: 4000, 
Memory: 100 * utils.MiB, Gpu: 0, Node: "", ToleratesGpu: false}, {Name: "p-new-3", Cpu: 4000, Memory: 100 * utils.MiB, Gpu: 0, Node: "", ToleratesGpu: false}, }, - ExpansionOptionToChoose: GroupSizeChange{GroupName: "ng2", SizeChange: 3}, - Options: options, + ExpansionOptionToChoose: &GroupSizeChange{GroupName: "ng2", SizeChange: 3}, + Options: &options, } results := &ScaleTestResults{ FinalOption: GroupSizeChange{GroupName: "ng2", SizeChange: 1}, @@ -275,7 +402,7 @@ func TestScaleUpCapToMaxTotalNodesLimit(t *testing.T) { func TestScaleUpCapToMaxTotalNodesLimitWithNotAutoscaledGroup(t *testing.T) { options := defaultOptions options.MaxNodesTotal = 3 - config := &ScaleTestConfig{ + config := &ScaleUpTestConfig{ Nodes: []NodeConfig{ {Name: "n1", Cpu: 2000, Memory: 100 * utils.MiB, Gpu: 0, Ready: true, Group: ""}, {Name: "n2", Cpu: 4000, Memory: 1000 * utils.MiB, Gpu: 0, Ready: true, Group: "ng2"}, @@ -289,8 +416,8 @@ func TestScaleUpCapToMaxTotalNodesLimitWithNotAutoscaledGroup(t *testing.T) { {Name: "p-new-2", Cpu: 4000, Memory: 100 * utils.MiB, Gpu: 0, Node: "", ToleratesGpu: false}, {Name: "p-new-3", Cpu: 4000, Memory: 100 * utils.MiB, Gpu: 0, Node: "", ToleratesGpu: false}, }, - ExpansionOptionToChoose: GroupSizeChange{GroupName: "ng2", SizeChange: 3}, - Options: options, + ExpansionOptionToChoose: &GroupSizeChange{GroupName: "ng2", SizeChange: 3}, + Options: &options, } results := &ScaleTestResults{ FinalOption: GroupSizeChange{GroupName: "ng2", SizeChange: 1}, @@ -305,7 +432,7 @@ func TestScaleUpCapToMaxTotalNodesLimitWithNotAutoscaledGroup(t *testing.T) { func TestWillConsiderGpuAndStandardPoolForPodWhichDoesNotRequireGpu(t *testing.T) { options := defaultOptions options.MaxNodesTotal = 100 - config := &ScaleTestConfig{ + config := &ScaleUpTestConfig{ Nodes: []NodeConfig{ {Name: "gpu-node-1", Cpu: 2000, Memory: 1000 * utils.MiB, Gpu: 1, Ready: true, Group: "gpu-pool"}, {Name: "std-node-1", Cpu: 2000, Memory: 1000 * utils.MiB, Gpu: 0, Ready: true, Group: "std-pool"}, @@ -317,8 +444,8 @@ func TestWillConsiderGpuAndStandardPoolForPodWhichDoesNotRequireGpu(t *testing.T ExtraPods: []PodConfig{ {Name: "extra-std-pod", Cpu: 2000, Memory: 1000 * utils.MiB, Gpu: 0, Node: "", ToleratesGpu: true}, }, - ExpansionOptionToChoose: GroupSizeChange{GroupName: "std-pool", SizeChange: 1}, - Options: options, + ExpansionOptionToChoose: &GroupSizeChange{GroupName: "std-pool", SizeChange: 1}, + Options: &options, } results := &ScaleTestResults{ FinalOption: GroupSizeChange{GroupName: "std-pool", SizeChange: 1}, @@ -337,7 +464,7 @@ func TestWillConsiderGpuAndStandardPoolForPodWhichDoesNotRequireGpu(t *testing.T func TestWillConsiderOnlyGpuPoolForPodWhichDoesRequiresGpu(t *testing.T) { options := defaultOptions options.MaxNodesTotal = 100 - config := &ScaleTestConfig{ + config := &ScaleUpTestConfig{ Nodes: []NodeConfig{ {Name: "gpu-node-1", Cpu: 2000, Memory: 1000 * utils.MiB, Gpu: 1, Ready: true, Group: "gpu-pool"}, {Name: "std-node-1", Cpu: 2000, Memory: 1000 * utils.MiB, Gpu: 0, Ready: true, Group: "std-pool"}, @@ -349,8 +476,8 @@ func TestWillConsiderOnlyGpuPoolForPodWhichDoesRequiresGpu(t *testing.T) { ExtraPods: []PodConfig{ {Name: "extra-gpu-pod", Cpu: 2000, Memory: 1000 * utils.MiB, Gpu: 1, Node: "", ToleratesGpu: true}, }, - ExpansionOptionToChoose: GroupSizeChange{GroupName: "gpu-pool", SizeChange: 1}, - Options: options, + ExpansionOptionToChoose: &GroupSizeChange{GroupName: "gpu-pool", SizeChange: 1}, + Options: &options, } results := &ScaleTestResults{ FinalOption: GroupSizeChange{GroupName: 
"gpu-pool", SizeChange: 1}, @@ -368,7 +495,7 @@ func TestWillConsiderOnlyGpuPoolForPodWhichDoesRequiresGpu(t *testing.T) { func TestWillConsiderAllPoolsWhichFitTwoPodsRequiringGpus(t *testing.T) { options := defaultOptions options.MaxNodesTotal = 100 - config := &ScaleTestConfig{ + config := &ScaleUpTestConfig{ Nodes: []NodeConfig{ {Name: "gpu-1-node-1", Cpu: 2000, Memory: 1000 * utils.MiB, Gpu: 1, Ready: true, Group: "gpu-1-pool"}, {Name: "gpu-2-node-1", Cpu: 2000, Memory: 1000 * utils.MiB, Gpu: 2, Ready: true, Group: "gpu-2-pool"}, @@ -386,8 +513,8 @@ func TestWillConsiderAllPoolsWhichFitTwoPodsRequiringGpus(t *testing.T) { {Name: "extra-gpu-pod-2", Cpu: 1, Memory: 1 * utils.MiB, Gpu: 1, Node: "", ToleratesGpu: true}, // CPU and mem negligible {Name: "extra-gpu-pod-3", Cpu: 1, Memory: 1 * utils.MiB, Gpu: 1, Node: "", ToleratesGpu: true}, // CPU and mem negligible }, - ExpansionOptionToChoose: GroupSizeChange{GroupName: "gpu-1-pool", SizeChange: 3}, - Options: options, + ExpansionOptionToChoose: &GroupSizeChange{GroupName: "gpu-1-pool", SizeChange: 3}, + Options: &options, } results := &ScaleTestResults{ FinalOption: GroupSizeChange{GroupName: "gpu-1-pool", SizeChange: 3}, @@ -409,7 +536,7 @@ func TestNoScaleUpMaxCoresLimitHit(t *testing.T) { options := defaultOptions options.MaxCoresTotal = 7 options.MaxMemoryTotal = 1150 - config := &ScaleTestConfig{ + config := &ScaleUpTestConfig{ Nodes: []NodeConfig{ {Name: "n1", Cpu: 2000, Memory: 100, Gpu: 0, Ready: true, Group: "ng1"}, {Name: "n2", Cpu: 4000, Memory: 1000, Gpu: 0, Ready: true, Group: "ng2"}, @@ -422,7 +549,7 @@ func TestNoScaleUpMaxCoresLimitHit(t *testing.T) { {Name: "p-new-1", Cpu: 2000, Memory: 0, Gpu: 0, Node: "", ToleratesGpu: false}, {Name: "p-new-2", Cpu: 2000, Memory: 0, Gpu: 0, Node: "", ToleratesGpu: false}, }, - Options: options, + Options: &options, } results := &ScaleTestResults{ NoScaleUpReason: "max cluster cpu, memory limit reached", @@ -434,152 +561,46 @@ func TestNoScaleUpMaxCoresLimitHit(t *testing.T) { simpleNoScaleUpTest(t, config, results) } -// To implement expander.Strategy, BestOption method must have a struct receiver. -// This prevents it from modifying fields of reportingStrategy, so we need a thin -// pointer wrapper for mutable parts. 
-type expanderResults struct { - inputOptions []GroupSizeChange -} - -type reportingStrategy struct { - initialNodeConfigs []NodeConfig - optionToChoose GroupSizeChange - results *expanderResults - t *testing.T -} - -func (r reportingStrategy) BestOption(options []expander.Option, nodeInfo map[string]*schedulerframework.NodeInfo) *expander.Option { - r.results.inputOptions = expanderOptionsToGroupSizeChanges(options) - for _, option := range options { - GroupSizeChange := expanderOptionToGroupSizeChange(option) - if GroupSizeChange == r.optionToChoose { - return &option - } - } - assert.Fail(r.t, "did not find expansionOptionToChoose %s", r.optionToChoose) - return nil -} - -func expanderOptionsToGroupSizeChanges(options []expander.Option) []GroupSizeChange { - groupSizeChanges := make([]GroupSizeChange, 0, len(options)) - for _, option := range options { - GroupSizeChange := expanderOptionToGroupSizeChange(option) - groupSizeChanges = append(groupSizeChanges, GroupSizeChange) - } - return groupSizeChanges -} - -func expanderOptionToGroupSizeChange(option expander.Option) GroupSizeChange { - groupName := option.NodeGroup.Id() - groupSizeIncrement := option.NodeCount - scaleUpOption := GroupSizeChange{GroupName: groupName, SizeChange: groupSizeIncrement} - return scaleUpOption -} - -func runSimpleScaleUpTest(t *testing.T, config *ScaleTestConfig) *ScaleTestResults { - expandedGroups := make(chan GroupSizeChange, 10) - now := time.Now() - - groups := make(map[string][]*apiv1.Node) - nodes := make([]*apiv1.Node, 0, len(config.Nodes)) - for _, n := range config.Nodes { - node := BuildTestNode(n.Name, n.Cpu, n.Memory) - if n.Gpu > 0 { - AddGpusToNode(node, n.Gpu) - } - SetNodeReadyState(node, n.Ready, now.Add(-2*time.Minute)) - nodes = append(nodes, node) - if n.Group != "" { - groups[n.Group] = append(groups[n.Group], node) - } - } - - pods := make([]*apiv1.Pod, 0, len(config.Pods)) - for _, p := range config.Pods { - pod := buildTestPod(p) - pods = append(pods, pod) - } - - podLister := kube_util.NewTestPodLister(pods) - listers := kube_util.NewListerRegistry(nil, nil, podLister, nil, nil, nil, nil, nil, nil, nil) - - provider := testprovider.NewTestCloudProvider(func(nodeGroup string, increase int) error { - expandedGroups <- GroupSizeChange{GroupName: nodeGroup, SizeChange: increase} - return nil - }, nil) - - for name, nodesInGroup := range groups { - provider.AddNodeGroup(name, 1, 10, len(nodesInGroup)) - for _, n := range nodesInGroup { - provider.AddNode(name, n) - } - } - - resourceLimiter := cloudprovider.NewResourceLimiter( - map[string]int64{cloudprovider.ResourceNameCores: config.Options.MinCoresTotal, cloudprovider.ResourceNameMemory: config.Options.MinMemoryTotal}, - map[string]int64{cloudprovider.ResourceNameCores: config.Options.MaxCoresTotal, cloudprovider.ResourceNameMemory: config.Options.MaxMemoryTotal}) - provider.SetResourceLimiter(resourceLimiter) - - assert.NotNil(t, provider) - - // Create context with non-random expander strategy. 
- context, err := NewScaleTestAutoscalingContext(config.Options, &fake.Clientset{}, listers, provider, nil, nil) - assert.NoError(t, err) - - expander := reportingStrategy{ - initialNodeConfigs: config.Nodes, - optionToChoose: config.ExpansionOptionToChoose, - results: &expanderResults{}, - t: t, - } - context.ExpanderStrategy = expander - - nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).Process(&context, nodes, []*appsv1.DaemonSet{}, taints.TaintConfig{}, now) - clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, NewBackoff(), clusterstate.NewStaticMaxNodeProvisionTimeProvider(15*time.Minute)) - clusterState.UpdateNodes(nodes, nodeInfos, time.Now()) - - extraPods := make([]*apiv1.Pod, 0, len(config.ExtraPods)) - for _, p := range config.ExtraPods { - pod := buildTestPod(p) - extraPods = append(extraPods, pod) - } - - processors := NewTestProcessors(&context) - suOrchestrator := New() - suOrchestrator.Initialize(&context, processors, clusterState, taints.TaintConfig{}) - scaleUpStatus, err := suOrchestrator.ScaleUp(extraPods, nodes, []*appsv1.DaemonSet{}, nodeInfos) - processors.ScaleUpStatusProcessor.Process(&context, scaleUpStatus) - - assert.NoError(t, err) - - expandedGroup := getGroupSizeChangeFromChan(expandedGroups) - var expandedGroupStruct GroupSizeChange - if expandedGroup != nil { - expandedGroupStruct = *expandedGroup - } - - events := []string{} - for eventsLeft := true; eventsLeft; { - select { - case event := <-context.Recorder.(*kube_record.FakeRecorder).Events: - events = append(events, event) - default: - eventsLeft = false - } - } - - return &ScaleTestResults{ - ExpansionOptions: expander.results.inputOptions, - FinalOption: expandedGroupStruct, - ScaleUpStatus: simplifyScaleUpStatus(scaleUpStatus), - Events: events, - } -} - -func simpleNoScaleUpTest(t *testing.T, config *ScaleTestConfig, expectedResults *ScaleTestResults) { +func simpleScaleUpTest(t *testing.T, config *ScaleUpTestConfig, expectedResults *ScaleTestResults) { results := runSimpleScaleUpTest(t, config) + assert.NotNil(t, results.GroupSizeChanges[0], "Expected scale up event") + assert.Equal(t, expectedResults.FinalOption, results.GroupSizeChanges[0]) + assert.True(t, results.ScaleUpStatus.WasSuccessful()) + nodeEventSeen := false + for _, event := range results.Events { + if strings.Contains(event, "TriggeredScaleUp") && strings.Contains(event, expectedResults.FinalOption.GroupName) { + nodeEventSeen = true + } + if len(expectedResults.ScaleUpStatus.PodsRemainUnschedulable) == 0 { + assert.NotRegexp(t, regexp.MustCompile("NotTriggerScaleUp"), event) + } + } + assert.True(t, nodeEventSeen) + if len(expectedResults.ExpansionOptions) > 0 { + // Empty ExpansionOptions means we do not want to do any assertions + // on contents of actual scaleUp options + if config.ExpansionOptionToChoose != nil { + // Check that option to choose is part of expected options. 
+ assert.Contains(t, expectedResults.ExpansionOptions, *config.ExpansionOptionToChoose, "final expected expansion option must be in expected expansion options") + assert.Contains(t, results.ExpansionOptions, *config.ExpansionOptionToChoose, "final expected expansion option must be in expected expansion options") + } + assert.ElementsMatch(t, results.ExpansionOptions, expectedResults.ExpansionOptions, + "actual and expected expansion options should be the same") + } + if expectedResults.GroupTargetSizes != nil { + assert.Equal(t, expectedResults.GroupTargetSizes, results.GroupTargetSizes) + } + assert.ElementsMatch(t, results.ScaleUpStatus.PodsTriggeredScaleUp, expectedResults.ScaleUpStatus.PodsTriggeredScaleUp, + "actual and expected triggering pods should be the same") + assert.ElementsMatch(t, results.ScaleUpStatus.PodsRemainUnschedulable, expectedResults.ScaleUpStatus.PodsRemainUnschedulable, + "actual and expected remaining pods should be the same") + assert.ElementsMatch(t, results.ScaleUpStatus.PodsAwaitEvaluation, expectedResults.ScaleUpStatus.PodsAwaitEvaluation, + "actual and expected awaiting evaluation pods should be the same") +} - assert.Equal(t, GroupSizeChange{}, results.FinalOption) +func simpleNoScaleUpTest(t *testing.T, config *ScaleUpTestConfig, expectedResults *ScaleTestResults) { + results := runSimpleScaleUpTest(t, config) + assert.Nil(t, results.GroupSizeChanges) assert.False(t, results.ScaleUpStatus.WasSuccessful()) noScaleUpEventSeen := false for _, event := range results.Events { @@ -602,49 +623,119 @@ func simpleNoScaleUpTest(t *testing.T, config *ScaleTestConfig, expectedResults "actual and expected awaiting evaluation pods should be the same") } -func simpleScaleUpTest(t *testing.T, config *ScaleTestConfig, expectedResults *ScaleTestResults) { - results := runSimpleScaleUpTest(t, config) +func runSimpleScaleUpTest(t *testing.T, config *ScaleUpTestConfig) *ScaleUpTestResult { + now := time.Now() + groupSizeChangesChannel := make(chan GroupSizeChange, 20) + groupNodes := make(map[string][]*apiv1.Node) - assert.NotNil(t, results.FinalOption, "Expected scale up event") - assert.Equal(t, expectedResults.FinalOption, results.FinalOption) - assert.True(t, results.ScaleUpStatus.WasSuccessful()) - nodeEventSeen := false - for _, event := range results.Events { - if strings.Contains(event, "TriggeredScaleUp") && strings.Contains(event, expectedResults.FinalOption.GroupName) { - nodeEventSeen = true + // build nodes + nodes := make([]*apiv1.Node, 0, len(config.Nodes)) + for _, n := range config.Nodes { + node := BuildTestNode(n.Name, n.Cpu, n.Memory) + if n.Gpu > 0 { + AddGpusToNode(node, n.Gpu) } - if len(expectedResults.ScaleUpStatus.PodsRemainUnschedulable) == 0 { - assert.NotRegexp(t, regexp.MustCompile("NotTriggerScaleUp"), event) + SetNodeReadyState(node, n.Ready, now.Add(-2*time.Minute)) + nodes = append(nodes, node) + if n.Group != "" { + groupNodes[n.Group] = append(groupNodes[n.Group], node) } } - assert.True(t, nodeEventSeen) - if len(expectedResults.ExpansionOptions) > 0 { - // Empty ExpansionOptions means we do not want to do any assertions - // on contents of actual scaleUp options - - // Check that option to choose is part of expected options. 
- assert.Contains(t, expectedResults.ExpansionOptions, config.ExpansionOptionToChoose, "final expected expansion option must be in expected expansion options") - assert.Contains(t, results.ExpansionOptions, config.ExpansionOptionToChoose, "final expected expansion option must be in expected expansion options") - - assert.ElementsMatch(t, results.ExpansionOptions, expectedResults.ExpansionOptions, - "actual and expected expansion options should be the same") + // build and setup pods + pods := make([]*apiv1.Pod, len(config.Pods)) + for i, p := range config.Pods { + pods[i] = buildTestPod(p) } + extraPods := make([]*apiv1.Pod, len(config.ExtraPods)) + for i, p := range config.ExtraPods { + extraPods[i] = buildTestPod(p) + } + podLister := kube_util.NewTestPodLister(pods) + listers := kube_util.NewListerRegistry(nil, nil, podLister, nil, nil, nil, nil, nil, nil, nil) - assert.ElementsMatch(t, results.ScaleUpStatus.PodsTriggeredScaleUp, expectedResults.ScaleUpStatus.PodsTriggeredScaleUp, - "actual and expected triggering pods should be the same") - assert.ElementsMatch(t, results.ScaleUpStatus.PodsRemainUnschedulable, expectedResults.ScaleUpStatus.PodsRemainUnschedulable, - "actual and expected remaining pods should be the same") - assert.ElementsMatch(t, results.ScaleUpStatus.PodsAwaitEvaluation, expectedResults.ScaleUpStatus.PodsAwaitEvaluation, - "actual and expected awaiting evaluation pods should be the same") -} - -func getGroupSizeChangeFromChan(c chan GroupSizeChange) *GroupSizeChange { - select { - case val := <-c: - return &val - case <-time.After(100 * time.Millisecond): + // setup node groups + provider := testprovider.NewTestCloudProvider(func(nodeGroup string, increase int) error { + groupSizeChangesChannel <- GroupSizeChange{GroupName: nodeGroup, SizeChange: increase} + if config.OnScaleUp != nil { + return config.OnScaleUp(nodeGroup, increase) + } return nil + }, nil) + options := defaultOptions + if config.Options != nil { + options = *config.Options + } + resourceLimiter := cloudprovider.NewResourceLimiter( + map[string]int64{cloudprovider.ResourceNameCores: options.MinCoresTotal, cloudprovider.ResourceNameMemory: options.MinMemoryTotal}, + map[string]int64{cloudprovider.ResourceNameCores: options.MaxCoresTotal, cloudprovider.ResourceNameMemory: options.MaxMemoryTotal}) + provider.SetResourceLimiter(resourceLimiter) + groupConfigs := make(map[string]*NodeGroupConfig) + for _, group := range config.Groups { + groupConfigs[group.Name] = &group + } + for name, nodesInGroup := range groupNodes { + groupConfig := groupConfigs[name] + if groupConfig == nil { + groupConfig = &NodeGroupConfig{ + Name: name, + MinSize: 1, + MaxSize: 10, + } + } + provider.AddNodeGroup(name, groupConfig.MinSize, groupConfig.MaxSize, len(nodesInGroup)) + for _, n := range nodesInGroup { + provider.AddNode(name, n) + } + } + + // build orchestrator + context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, listers, provider, nil, nil) + assert.NoError(t, err) + nodeInfos, err := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false). + Process(&context, nodes, []*appsv1.DaemonSet{}, taints.TaintConfig{}, now) + assert.NoError(t, err) + clusterState := clusterstate. 
+ NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, NewBackoff(), clusterstate.NewStaticMaxNodeProvisionTimeProvider(15*time.Minute)) + clusterState.UpdateNodes(nodes, nodeInfos, time.Now()) + processors := NewTestProcessors(&context) + orchestrator := New() + orchestrator.Initialize(&context, processors, clusterState, taints.TaintConfig{}) + expander := NewMockRepotingStrategy(t, config.ExpansionOptionToChoose) + context.ExpanderStrategy = expander + + // scale up + scaleUpStatus, scaleUpErr := orchestrator.ScaleUp(extraPods, nodes, []*appsv1.DaemonSet{}, nodeInfos) + processors.ScaleUpStatusProcessor.Process(&context, scaleUpStatus) + + // aggregate group size changes + close(groupSizeChangesChannel) + var groupSizeChanges []GroupSizeChange + for change := range groupSizeChangesChannel { + groupSizeChanges = append(groupSizeChanges, change) + } + + // aggregate events + eventsChannel := context.Recorder.(*kube_record.FakeRecorder).Events + close(eventsChannel) + events := []string{} + for event := range eventsChannel { + events = append(events, event) + } + + // build target sizes + targetSizes := make(map[string]int) + for _, group := range provider.NodeGroups() { + targetSizes[group.Id()], _ = group.TargetSize() + } + + return &ScaleUpTestResult{ + ScaleUpError: scaleUpErr, + ScaleUpStatus: simplifyScaleUpStatus(scaleUpStatus), + GroupSizeChanges: groupSizeChanges, + Events: events, + GroupTargetSizes: targetSizes, + ExpansionOptions: expander.LastInputOptions(), } } @@ -1049,24 +1140,33 @@ func TestCheckDeltaWithinLimits(t *testing.T) { } } -func TestAuthError(t *testing.T) { +func TestAuthErrorHandling(t *testing.T) { metrics.RegisterAll(false) - context, err := NewScaleTestAutoscalingContext(config.AutoscalingOptions{}, &fake.Clientset{}, nil, nil, nil, nil) - assert.NoError(t, err) - - nodeGroup := &mockprovider.NodeGroup{} - info := nodegroupset.ScaleUpInfo{Group: nodeGroup} - nodeGroup.On("Id").Return("A") - nodeGroup.On("IncreaseSize", 0).Return(errors.NewAutoscalerError(errors.AutoscalerErrorType("abcd"), "")) - - processors := NewTestProcessors(&context) - clusterStateRegistry := clusterstate.NewClusterStateRegistry(nil, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, NewBackoff(), clusterstate.NewStaticMaxNodeProvisionTimeProvider(15*time.Minute)) - suOrchestrator := New() - suOrchestrator.Initialize(&context, processors, clusterStateRegistry, taints.TaintConfig{}) - scaleUpOrchestrator := suOrchestrator.(*ScaleUpOrchestrator) - aerr := scaleUpOrchestrator.executeScaleUp(info, "", "", time.Now()) - assert.Error(t, aerr) + config := &ScaleUpTestConfig{ + Groups: []NodeGroupConfig{ + {Name: "ng1", MaxSize: 2}, + }, + Nodes: []NodeConfig{ + {Name: "ng1-n1", Cpu: 1500, Memory: 1000 * utils.MiB, Ready: true, Group: "ng1"}, + }, + ExtraPods: []PodConfig{ + {Name: "p1", Cpu: 1000}, + }, + OnScaleUp: func(group string, i int) error { + return errors.NewAutoscalerError(errors.AutoscalerErrorType("authError"), "auth error") + }, + Options: &defaultOptions, + } + results := runSimpleScaleUpTest(t, config) + expected := errors.NewAutoscalerError( + errors.AutoscalerErrorType("authError"), + "failed to increase node group size: auth error", + ) + assert.Equal(t, expected, results.ScaleUpError) + assertLegacyRegistryEntry(t, "cluster_autoscaler_failed_scale_ups_total{reason=\"authError\"} 1") +} +func assertLegacyRegistryEntry(t *testing.T, entry string) { req, err := http.NewRequest("GET", "/", nil) if err != nil { 
t.Fatal(err) @@ -1074,14 +1174,13 @@ func TestAuthError(t *testing.T) { rr := httptest.NewRecorder() handler := http.HandlerFunc(legacyregistry.Handler().ServeHTTP) handler.ServeHTTP(rr, req) - // Check that the status code is what we expect. if status := rr.Code; status != http.StatusOK { t.Errorf("handler returned wrong status code: got %v want %v", status, http.StatusOK) } // Check that the failed scale up reason is set correctly. - assert.Contains(t, rr.Body.String(), "cluster_autoscaler_failed_scale_ups_total{reason=\"abcd\"} 1") + assert.Contains(t, rr.Body.String(), entry) } func simplifyScaleUpStatus(scaleUpStatus *status.ScaleUpStatus) ScaleUpStatusInfo { diff --git a/cluster-autoscaler/core/test/common.go b/cluster-autoscaler/core/test/common.go index a60282b1eb..5a84f4435b 100644 --- a/cluster-autoscaler/core/test/common.go +++ b/cluster-autoscaler/core/test/common.go @@ -22,10 +22,6 @@ import ( "testing" "time" - "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/pdb" - "k8s.io/autoscaler/cluster-autoscaler/debuggingsnapshot" - - "k8s.io/apimachinery/pkg/api/resource" "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" testcloudprovider "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/test" "k8s.io/autoscaler/cluster-autoscaler/clusterstate/utils" @@ -33,7 +29,10 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/context" "k8s.io/autoscaler/cluster-autoscaler/core/podlistprocessor" "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/deletiontracker" + "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/pdb" + "k8s.io/autoscaler/cluster-autoscaler/debuggingsnapshot" "k8s.io/autoscaler/cluster-autoscaler/estimator" + "k8s.io/autoscaler/cluster-autoscaler/expander" "k8s.io/autoscaler/cluster-autoscaler/expander/random" "k8s.io/autoscaler/cluster-autoscaler/metrics" "k8s.io/autoscaler/cluster-autoscaler/processors" @@ -50,14 +49,14 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/processors/status" "k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot" "k8s.io/autoscaler/cluster-autoscaler/simulator/predicatechecker" + "k8s.io/autoscaler/cluster-autoscaler/utils/backoff" "k8s.io/autoscaler/cluster-autoscaler/utils/errors" kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes" "k8s.io/autoscaler/cluster-autoscaler/utils/labels" "github.com/stretchr/testify/assert" - apiv1 "k8s.io/api/core/v1" - "k8s.io/autoscaler/cluster-autoscaler/utils/backoff" + "k8s.io/apimachinery/pkg/api/resource" kube_client "k8s.io/client-go/kubernetes" kube_record "k8s.io/client-go/tools/record" schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework" @@ -102,9 +101,38 @@ type ScaleTestConfig struct { ExpectedScaleDownCount int } +// NodeGroupConfig is a node group config used in tests +type NodeGroupConfig struct { + Name string + MinSize int + MaxSize int +} + +// ScaleUpTestConfig represents a config of a scale test +type ScaleUpTestConfig struct { + Groups []NodeGroupConfig + Nodes []NodeConfig + Pods []PodConfig + ExtraPods []PodConfig + OnScaleUp testcloudprovider.OnScaleUpFunc + ExpansionOptionToChoose *GroupSizeChange + Options *config.AutoscalingOptions +} + +// ScaleUpTestResult represents a node groups scale up result +type ScaleUpTestResult struct { + ScaleUpError errors.AutoscalerError + ScaleUpStatus ScaleUpStatusInfo + GroupSizeChanges []GroupSizeChange + ExpansionOptions []GroupSizeChange + Events []string + GroupTargetSizes map[string]int +} + // ScaleTestResults contains results of a scale test type ScaleTestResults struct { ExpansionOptions 
[]GroupSizeChange + GroupTargetSizes map[string]int FinalOption GroupSizeChange NoScaleUpReason string FinalScaleDowns []string @@ -159,7 +187,8 @@ func NewTestProcessors(context *context.AutoscalingContext) *processors.Autoscal func NewScaleTestAutoscalingContext( options config.AutoscalingOptions, fakeClient kube_client.Interface, listers kube_util.ListerRegistry, provider cloudprovider.CloudProvider, - processorCallbacks processor_callbacks.ProcessorCallbacks, debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter) (context.AutoscalingContext, error) { + processorCallbacks processor_callbacks.ProcessorCallbacks, debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter, +) (context.AutoscalingContext, error) { // Not enough buffer space causes the test to hang without printing any logs. // This is not useful. fakeRecorder := kube_record.NewFakeRecorder(100) @@ -279,8 +308,8 @@ type MockAutoprovisioningNodeGroupListProcessor struct { // Process extends the list of node groups func (p *MockAutoprovisioningNodeGroupListProcessor) Process(context *context.AutoscalingContext, nodeGroups []cloudprovider.NodeGroup, nodeInfos map[string]*schedulerframework.NodeInfo, - unschedulablePods []*apiv1.Pod) ([]cloudprovider.NodeGroup, map[string]*schedulerframework.NodeInfo, error) { - + unschedulablePods []*apiv1.Pod, +) ([]cloudprovider.NodeGroup, map[string]*schedulerframework.NodeInfo, error) { machines, err := context.CloudProvider.GetAvailableMachineTypes() assert.NoError(p.T, err) @@ -305,3 +334,67 @@ func NewBackoff() backoff.Backoff { return backoff.NewIdBasedExponentialBackoff(5*time.Minute, /*InitialNodeGroupBackoffDuration*/ 30*time.Minute /*MaxNodeGroupBackoffDuration*/, 3*time.Hour /*NodeGroupBackoffResetTimeout*/) } + +// To implement expander.Strategy, BestOption method must have a struct receiver. +// This prevents it from modifying fields of reportingStrategy, so we need a thin +// pointer wrapper for mutable parts. +type expanderResults struct { + inputOptions []GroupSizeChange +} + +// MockReportingStrategy implements expander.Strategy +type MockReportingStrategy struct { + defaultStrategy expander.Strategy + optionToChoose *GroupSizeChange + t *testing.T + results *expanderResults +} + +// NewMockRepotingStrategy creates an expander strategy with reporting and mocking capabilities. +func NewMockRepotingStrategy(t *testing.T, optionToChoose *GroupSizeChange) *MockReportingStrategy { + return &MockReportingStrategy{ + defaultStrategy: random.NewStrategy(), + results: &expanderResults{}, + optionToChoose: optionToChoose, + t: t, + } +} + +// LastInputOptions provides access to expansion options passed as an input in recent strategy execution +func (r *MockReportingStrategy) LastInputOptions() []GroupSizeChange { + return r.results.inputOptions +} + +// BestOption satisfies the Strategy interface. Picks the best option from those passed as an argument. +// When parameter optionToChoose is defined, it's picked as the best one. +// Otherwise, random option is used. 
+func (r *MockReportingStrategy) BestOption(options []expander.Option, nodeInfo map[string]*schedulerframework.NodeInfo) *expander.Option { + r.results.inputOptions = expanderOptionsToGroupSizeChanges(options) + if r.optionToChoose == nil { + return r.defaultStrategy.BestOption(options, nodeInfo) + } + for _, option := range options { + groupSizeChange := expanderOptionToGroupSizeChange(option) + if groupSizeChange == *r.optionToChoose { + return &option + } + } + assert.Fail(r.t, "did not find expansionOptionToChoose %+v", r.optionToChoose) + return nil +} + +func expanderOptionsToGroupSizeChanges(options []expander.Option) []GroupSizeChange { + groupSizeChanges := make([]GroupSizeChange, 0, len(options)) + for _, option := range options { + groupSizeChange := expanderOptionToGroupSizeChange(option) + groupSizeChanges = append(groupSizeChanges, groupSizeChange) + } + return groupSizeChanges +} + +func expanderOptionToGroupSizeChange(option expander.Option) GroupSizeChange { + groupName := option.NodeGroup.Id() + groupSizeIncrement := option.NodeCount + scaleUpOption := GroupSizeChange{GroupName: groupName, SizeChange: groupSizeIncrement} + return scaleUpOption +} diff --git a/cluster-autoscaler/main.go b/cluster-autoscaler/main.go index 0c44cf908c..073e3fdf1b 100644 --- a/cluster-autoscaler/main.go +++ b/cluster-autoscaler/main.go @@ -147,7 +147,8 @@ var ( maxGracefulTerminationFlag = flag.Int("max-graceful-termination-sec", 10*60, "Maximum number of seconds CA waits for pod termination when trying to scale down a node.") maxTotalUnreadyPercentage = flag.Float64("max-total-unready-percentage", 45, "Maximum percentage of unready nodes in the cluster. After this is exceeded, CA halts operations") okTotalUnreadyCount = flag.Int("ok-total-unready-count", 3, "Number of allowed unready nodes, irrespective of max-total-unready-percentage") - scaleUpFromZero = flag.Bool("scale-up-from-zero", true, "Should CA scale up when there 0 ready nodes.") + scaleUpFromZero = flag.Bool("scale-up-from-zero", true, "Should CA scale up when there are 0 ready nodes.") + parallelScaleUp = flag.Bool("parallel-scale-up", false, "Whether to allow parallel node groups scale up. Experimental: may not work on some cloud providers, enable at your own risk.") maxNodeProvisionTime = flag.Duration("max-node-provision-time", 15*time.Minute, "The default maximum time CA waits for node to be provisioned - the value can be overridden per node group") maxPodEvictionTime = flag.Duration("max-pod-eviction-time", 2*time.Minute, "Maximum time CA tries to evict a pod before giving up") nodeGroupsFlag = multiStringFlag( @@ -265,6 +266,7 @@ func createAutoscalingOptions() config.AutoscalingOptions { MaxTotalUnreadyPercentage: *maxTotalUnreadyPercentage, OkTotalUnreadyCount: *okTotalUnreadyCount, ScaleUpFromZero: *scaleUpFromZero, + ParallelScaleUp: *parallelScaleUp, EstimatorName: *estimatorFlag, ExpanderNames: *expanderFlag, GRPCExpanderCert: *grpcExpanderCert,
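The concurrency pattern introduced by executeScaleUpsParallel in this patch is a plain fan-out/collect: one goroutine per ScaleUpInfo, a buffered error channel sized to the number of scale-ups, a WaitGroup to join the workers, and a helper that collapses concurrent failures into a single combined error. The stand-alone sketch below restates that shape using only the Go standard library; it is illustrative only and not part of the patch, and resizeGroup plus the group names are hypothetical stand-ins for cloudprovider.NodeGroup.IncreaseSize and real node groups.

package main

import (
	"fmt"
	"sort"
	"strings"
	"sync"
)

// resizeGroup is a hypothetical stand-in for a cloud provider resize call
// (the role played by NodeGroup.IncreaseSize in the patch).
func resizeGroup(name string, delta int) error {
	if name == "ng2" {
		return fmt.Errorf("provider error for: %s", name)
	}
	return nil
}

// scaleUpInParallel mirrors the fan-out/collect shape of executeScaleUpsParallel:
// one goroutine per group, errors gathered on a buffered channel, then combined.
func scaleUpInParallel(groups map[string]int) error {
	errCh := make(chan error, len(groups))
	var wg sync.WaitGroup
	for name, delta := range groups {
		wg.Add(1)
		go func(name string, delta int) {
			defer wg.Done()
			if err := resizeGroup(name, delta); err != nil {
				errCh <- err
			}
		}(name, delta)
	}
	wg.Wait()
	close(errCh)
	var errs []string
	for err := range errCh {
		errs = append(errs, err.Error())
	}
	if len(errs) == 0 {
		return nil
	}
	// Sort for stable output, then report the first error plus the remaining ones,
	// loosely following combineConcurrentScaleUpErrors.
	sort.Strings(errs)
	if len(errs) == 1 {
		return fmt.Errorf("%s", errs[0])
	}
	return fmt.Errorf("%s ...and other concurrent errors: [%s]", errs[0], strings.Join(errs[1:], ", "))
}

func main() {
	err := scaleUpInParallel(map[string]int{"ng1": 1, "ng2": 1, "ng3": 2})
	fmt.Println(err)
}

Sizing the channel to the number of groups lets every worker send its error without blocking, so no goroutine is left hanging even though the collector only drains the channel after wg.Wait() returns.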