Merge pull request #5731 from pmendelski/feature-parallel-scale-up
Feature: parallel scale up
commit ac8253f366
@@ -128,6 +128,8 @@ type AutoscalingOptions struct {
	OkTotalUnreadyCount int
	// ScaleUpFromZero defines if CA should scale up when there are 0 ready nodes.
	ScaleUpFromZero bool
	// ParallelScaleUp defines whether CA can scale up node groups in parallel.
	ParallelScaleUp bool
	// CloudConfig is the path to the cloud provider configuration file. Empty string for no configuration file.
	CloudConfig string
	// CloudProviderName sets the type of the cloud provider CA is about to run in. Allowed values: gce, aws
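
A minimal sketch (not part of this diff) of enabling the new knob when options are built programmatically; ScaleUpFromZero and ParallelScaleUp are the fields from the hunk above, everything else is illustrative:

	package main

	import "k8s.io/autoscaler/cluster-autoscaler/config"

	func main() {
		// Illustrative only: request concurrent node group scale-ups.
		// ParallelScaleUp is the new field; its zero value is false, so
		// existing configurations keep the sequential behaviour.
		opts := config.AutoscalingOptions{
			ScaleUpFromZero: true,
			ParallelScaleUp: true,
		}
		_ = opts
	}
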
@@ -0,0 +1,248 @@
/*
Copyright 2016 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package orchestrator

import (
	"fmt"
	"sort"
	"strings"
	"sync"
	"time"

	apiv1 "k8s.io/api/core/v1"
	"k8s.io/klog/v2"
	schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"

	"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
	"k8s.io/autoscaler/cluster-autoscaler/clusterstate"
	"k8s.io/autoscaler/cluster-autoscaler/context"
	"k8s.io/autoscaler/cluster-autoscaler/metrics"
	"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupset"
	"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
	"k8s.io/autoscaler/cluster-autoscaler/utils/gpu"
)

// scaleUpExecutor scales up node groups.
type scaleUpExecutor struct {
	autoscalingContext   *context.AutoscalingContext
	clusterStateRegistry *clusterstate.ClusterStateRegistry
}

// newScaleUpExecutor returns a new instance of the scale-up executor.
func newScaleUpExecutor(
	autoscalingContext *context.AutoscalingContext,
	clusterStateRegistry *clusterstate.ClusterStateRegistry,
) *scaleUpExecutor {
	return &scaleUpExecutor{
		autoscalingContext:   autoscalingContext,
		clusterStateRegistry: clusterStateRegistry,
	}
}

// ExecuteScaleUps executes the scale-ups based on the provided scale-up infos and options.
// It may scale up node groups concurrently when the ParallelScaleUp autoscaler option is enabled.
// On failure it returns an error together with the node groups that failed to scale up.
// If multiple concurrent errors occurred, a single combined error is returned.
func (e *scaleUpExecutor) ExecuteScaleUps(
	scaleUpInfos []nodegroupset.ScaleUpInfo,
	nodeInfos map[string]*schedulerframework.NodeInfo,
	now time.Time,
) (errors.AutoscalerError, []cloudprovider.NodeGroup) {
	options := e.autoscalingContext.AutoscalingOptions
	if options.ParallelScaleUp {
		return e.executeScaleUpsParallel(scaleUpInfos, nodeInfos, now)
	}
	return e.executeScaleUpsSync(scaleUpInfos, nodeInfos, now)
}

func (e *scaleUpExecutor) executeScaleUpsSync(
	scaleUpInfos []nodegroupset.ScaleUpInfo,
	nodeInfos map[string]*schedulerframework.NodeInfo,
	now time.Time,
) (errors.AutoscalerError, []cloudprovider.NodeGroup) {
	availableGPUTypes := e.autoscalingContext.CloudProvider.GetAvailableGPUTypes()
	for _, scaleUpInfo := range scaleUpInfos {
		nodeInfo, ok := nodeInfos[scaleUpInfo.Group.Id()]
		if !ok {
			klog.Errorf("ExecuteScaleUp: failed to get node info for node group %s", scaleUpInfo.Group.Id())
			continue
		}
		if aErr := e.executeScaleUp(scaleUpInfo, nodeInfo, availableGPUTypes, now); aErr != nil {
			return aErr, []cloudprovider.NodeGroup{scaleUpInfo.Group}
		}
	}
	return nil, nil
}

func (e *scaleUpExecutor) executeScaleUpsParallel(
	scaleUpInfos []nodegroupset.ScaleUpInfo,
	nodeInfos map[string]*schedulerframework.NodeInfo,
	now time.Time,
) (errors.AutoscalerError, []cloudprovider.NodeGroup) {
	if err := checkUniqueNodeGroups(scaleUpInfos); err != nil {
		return err, extractNodeGroups(scaleUpInfos)
	}
	type errResult struct {
		err  errors.AutoscalerError
		info *nodegroupset.ScaleUpInfo
	}
	scaleUpsLen := len(scaleUpInfos)
	errResults := make(chan errResult, scaleUpsLen)
	var wg sync.WaitGroup
	wg.Add(scaleUpsLen)
	availableGPUTypes := e.autoscalingContext.CloudProvider.GetAvailableGPUTypes()
	for _, scaleUpInfo := range scaleUpInfos {
		go func(info nodegroupset.ScaleUpInfo) {
			defer wg.Done()
			nodeInfo, ok := nodeInfos[info.Group.Id()]
			if !ok {
				klog.Errorf("ExecuteScaleUp: failed to get node info for node group %s", info.Group.Id())
				return
			}
			if aErr := e.executeScaleUp(info, nodeInfo, availableGPUTypes, now); aErr != nil {
				errResults <- errResult{err: aErr, info: &info}
			}
		}(scaleUpInfo)
	}
	wg.Wait()
	close(errResults)
	var results []errResult
	for err := range errResults {
		results = append(results, err)
	}
	if len(results) > 0 {
		failedNodeGroups := make([]cloudprovider.NodeGroup, len(results))
		scaleUpErrors := make([]errors.AutoscalerError, len(results))
		for i, result := range results {
			failedNodeGroups[i] = result.info.Group
			scaleUpErrors[i] = result.err
		}
		return combineConcurrentScaleUpErrors(scaleUpErrors), failedNodeGroups
	}
	return nil, nil
}
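
The parallel path above is a standard Go fan-out: one goroutine per scale-up, a WaitGroup to join them, and an error channel buffered to the number of goroutines so no sender can block even though results are drained only after Wait. A standalone sketch of just that pattern, using plain errors and made-up group names (illustrative only, not code from this PR):

	package main

	import (
		"fmt"
		"sync"
	)

	func main() {
		groups := []string{"ng1", "ng2", "ng3"}

		// Buffer to len(groups) so every goroutine can send its error
		// without blocking, even though the channel is read only after Wait.
		errs := make(chan error, len(groups))
		var wg sync.WaitGroup
		wg.Add(len(groups))

		for _, g := range groups {
			go func(name string) {
				defer wg.Done()
				if name == "ng2" { // pretend one scale-up fails
					errs <- fmt.Errorf("scale-up failed for %s", name)
				}
			}(g)
		}

		wg.Wait()
		close(errs) // safe: all senders finished before Wait returned

		for err := range errs {
			fmt.Println(err)
		}
	}
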

func (e *scaleUpExecutor) executeScaleUp(
	info nodegroupset.ScaleUpInfo,
	nodeInfo *schedulerframework.NodeInfo,
	availableGPUTypes map[string]struct{},
	now time.Time,
) errors.AutoscalerError {
	gpuConfig := e.autoscalingContext.CloudProvider.GetNodeGpuConfig(nodeInfo.Node())
	gpuResourceName, gpuType := gpu.GetGpuInfoForMetrics(gpuConfig, availableGPUTypes, nodeInfo.Node(), nil)
	klog.V(0).Infof("Scale-up: setting group %s size to %d", info.Group.Id(), info.NewSize)
	e.autoscalingContext.LogRecorder.Eventf(apiv1.EventTypeNormal, "ScaledUpGroup",
		"Scale-up: setting group %s size to %d instead of %d (max: %d)", info.Group.Id(), info.NewSize, info.CurrentSize, info.MaxSize)
	increase := info.NewSize - info.CurrentSize
	if err := info.Group.IncreaseSize(increase); err != nil {
		e.autoscalingContext.LogRecorder.Eventf(apiv1.EventTypeWarning, "FailedToScaleUpGroup", "Scale-up failed for group %s: %v", info.Group.Id(), err)
		aerr := errors.ToAutoscalerError(errors.CloudProviderError, err).AddPrefix("failed to increase node group size: ")
		e.clusterStateRegistry.RegisterFailedScaleUp(info.Group, metrics.FailedScaleUpReason(string(aerr.Type())), now)
		return aerr
	}
	e.clusterStateRegistry.RegisterOrUpdateScaleUp(
		info.Group,
		increase,
		time.Now())
	metrics.RegisterScaleUp(increase, gpuResourceName, gpuType)
	e.autoscalingContext.LogRecorder.Eventf(apiv1.EventTypeNormal, "ScaledUpGroup",
		"Scale-up: group %s size set to %d instead of %d (max: %d)", info.Group.Id(), info.NewSize, info.CurrentSize, info.MaxSize)
	return nil
}

func combineConcurrentScaleUpErrors(errs []errors.AutoscalerError) errors.AutoscalerError {
	if len(errs) == 0 {
		return nil
	}
	if len(errs) == 1 {
		return errs[0]
	}
	uniqueMessages := make(map[string]bool)
	uniqueTypes := make(map[errors.AutoscalerErrorType]bool)
	for _, err := range errs {
		uniqueTypes[err.Type()] = true
		uniqueMessages[err.Error()] = true
	}
	if len(uniqueTypes) == 1 && len(uniqueMessages) == 1 {
		return errs[0]
	}
	// Sort the errors to stabilize the result and make log aggregation easier.
	sort.Slice(errs, func(i, j int) bool {
		errA := errs[i]
		errB := errs[j]
		if errA.Type() == errB.Type() {
			return errs[i].Error() < errs[j].Error()
		}
		return errA.Type() < errB.Type()
	})
	firstErr := errs[0]
	printErrorTypes := len(uniqueTypes) > 1
	message := formatMessageFromConcurrentErrors(errs, printErrorTypes)
	return errors.NewAutoscalerError(firstErr.Type(), message)
}

func formatMessageFromConcurrentErrors(errs []errors.AutoscalerError, printErrorTypes bool) string {
	firstErr := errs[0]
	var builder strings.Builder
	builder.WriteString(firstErr.Error())
	builder.WriteString(" ...and other concurrent errors: [")
	formattedErrs := map[errors.AutoscalerError]bool{
		firstErr: true,
	}
	for _, err := range errs {
		if _, has := formattedErrs[err]; has {
			continue
		}
		formattedErrs[err] = true
		var message string
		if printErrorTypes {
			message = fmt.Sprintf("[%s] %s", err.Type(), err.Error())
		} else {
			message = err.Error()
		}
		if len(formattedErrs) > 2 {
			builder.WriteString(", ")
		}
		builder.WriteString(fmt.Sprintf("%q", message))
	}
	builder.WriteString("]")
	return builder.String()
}
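
Taken together, the two helpers above collapse concurrent failures into one error: its type comes from the first error after sorting, and its message embeds the remaining unique messages (prefixed with their types only when more than one type is present). As a worked example, mirrored by the test table in the next file, combining a cloudProviderError "provider error" with an internalError "internal error" yields a cloudProviderError whose message is

	provider error ...and other concurrent errors: ["[internalError] internal error"]
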

// checkUniqueNodeGroups checks that each node group is scaled up only once.
// Scaling the same group multiple times concurrently may cause problems.
func checkUniqueNodeGroups(scaleUpInfos []nodegroupset.ScaleUpInfo) errors.AutoscalerError {
	uniqueGroups := make(map[string]bool)
	for _, info := range scaleUpInfos {
		if uniqueGroups[info.Group.Id()] {
			return errors.NewAutoscalerError(
				errors.InternalError,
				"assertion failure: detected group double scaling: %s", info.Group.Id(),
			)
		}
		uniqueGroups[info.Group.Id()] = true
	}
	return nil
}

func extractNodeGroups(scaleUpInfos []nodegroupset.ScaleUpInfo) []cloudprovider.NodeGroup {
	groups := make([]cloudprovider.NodeGroup, len(scaleUpInfos))
	for i, info := range scaleUpInfos {
		groups[i] = info.Group
	}
	return groups
}
@@ -0,0 +1,127 @@
/*
Copyright 2016 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package orchestrator

import (
	"testing"

	"k8s.io/autoscaler/cluster-autoscaler/utils/errors"

	"github.com/stretchr/testify/assert"
)

func TestCombinedConcurrentScaleUpErrors(t *testing.T) {
	cloudProviderErr := errors.NewAutoscalerError(errors.CloudProviderError, "provider error")
	internalErr := errors.NewAutoscalerError(errors.InternalError, "internal error")
	testCases := []struct {
		desc        string
		errors      []errors.AutoscalerError
		expectedErr errors.AutoscalerError
	}{
		{
			desc:        "no errors",
			errors:      []errors.AutoscalerError{},
			expectedErr: nil,
		},
		{
			desc:        "single error",
			errors:      []errors.AutoscalerError{internalErr},
			expectedErr: internalErr,
		},
		{
			desc: "two duplicated errors",
			errors: []errors.AutoscalerError{
				internalErr,
				internalErr,
			},
			expectedErr: internalErr,
		},
		{
			desc: "two different errors",
			errors: []errors.AutoscalerError{
				cloudProviderErr,
				internalErr,
			},
			expectedErr: errors.NewAutoscalerError(
				errors.CloudProviderError,
				"provider error ...and other concurrent errors: [\"[internalError] internal error\"]",
			),
		},
		{
			desc: "two different errors - reverse alphabetical order",
			errors: []errors.AutoscalerError{
				internalErr,
				cloudProviderErr,
			},
			expectedErr: errors.NewAutoscalerError(
				errors.CloudProviderError,
				"provider error ...and other concurrent errors: [\"[internalError] internal error\"]",
			),
		},
		{
			desc: "errors with the same type and different messages",
			errors: []errors.AutoscalerError{
				errors.NewAutoscalerError(errors.InternalError, "A"),
				errors.NewAutoscalerError(errors.InternalError, "B"),
				errors.NewAutoscalerError(errors.InternalError, "C"),
			},
			expectedErr: errors.NewAutoscalerError(
				errors.InternalError,
				"A ...and other concurrent errors: [\"B\", \"C\"]"),
		},
		{
			desc: "errors with the same type and some duplicated messages",
			errors: []errors.AutoscalerError{
				errors.NewAutoscalerError(errors.InternalError, "A"),
				errors.NewAutoscalerError(errors.InternalError, "B"),
				errors.NewAutoscalerError(errors.InternalError, "A"),
			},
			expectedErr: errors.NewAutoscalerError(
				errors.InternalError,
				"A ...and other concurrent errors: [\"B\"]"),
		},
		{
			desc: "some duplicated errors",
			errors: []errors.AutoscalerError{
				errors.NewAutoscalerError(errors.CloudProviderError, "A"),
				errors.NewAutoscalerError(errors.CloudProviderError, "A"),
				errors.NewAutoscalerError(errors.CloudProviderError, "B"),
				errors.NewAutoscalerError(errors.InternalError, "A"),
			},
			expectedErr: errors.NewAutoscalerError(
				errors.CloudProviderError,
				"A ...and other concurrent errors: [\"[cloudProviderError] B\", \"[internalError] A\"]"),
		},
		{
			desc: "different errors with quotes in messages",
			errors: []errors.AutoscalerError{
				errors.NewAutoscalerError(errors.InternalError, "\"first\""),
				errors.NewAutoscalerError(errors.InternalError, "\"second\""),
			},
			expectedErr: errors.NewAutoscalerError(
				errors.InternalError,
				"\"first\" ...and other concurrent errors: [\"\\\"second\\\"\"]"),
		},
	}

	for _, testCase := range testCases {
		t.Run(testCase.desc, func(t *testing.T) {
			combinedErr := combineConcurrentScaleUpErrors(testCase.errors)
			assert.Equal(t, testCase.expectedErr, combinedErr)
		})
	}
}
@@ -22,6 +22,9 @@ import (
	appsv1 "k8s.io/api/apps/v1"
	apiv1 "k8s.io/api/core/v1"
	"k8s.io/klog/v2"
	schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"

	"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
	"k8s.io/autoscaler/cluster-autoscaler/clusterstate"
	"k8s.io/autoscaler/cluster-autoscaler/context"

@@ -36,11 +39,8 @@ import (
	"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupset"
	"k8s.io/autoscaler/cluster-autoscaler/processors/status"
	"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
	"k8s.io/autoscaler/cluster-autoscaler/utils/gpu"
	"k8s.io/autoscaler/cluster-autoscaler/utils/klogx"
	"k8s.io/autoscaler/cluster-autoscaler/utils/taints"
	"k8s.io/klog/v2"
	schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
)

// ScaleUpOrchestrator implements scaleup.Orchestrator interface.

@@ -49,6 +49,7 @@ type ScaleUpOrchestrator struct {
	processors           *ca_processors.AutoscalingProcessors
	resourceManager      *resource.Manager
	clusterStateRegistry *clusterstate.ClusterStateRegistry
	scaleUpExecutor      *scaleUpExecutor
	taintConfig          taints.TaintConfig
	initialized          bool
}

@@ -72,6 +73,7 @@ func (o *ScaleUpOrchestrator) Initialize(
	o.clusterStateRegistry = clusterStateRegistry
	o.taintConfig = taintConfig
	o.resourceManager = resource.NewManager(processors.CustomResourcesProcessor)
	o.scaleUpExecutor = newScaleUpExecutor(autoscalingContext, clusterStateRegistry)
	o.initialized = true
}

@@ -284,7 +286,7 @@ func (o *ScaleUpOrchestrator) ScaleUp(
	}

	if len(targetNodeGroups) > 1 {
		var names = []string{}
		names := []string{}
		for _, ng := range targetNodeGroups {
			names = append(names, ng.Id())
		}

@@ -300,11 +302,12 @@ func (o *ScaleUpOrchestrator) ScaleUp(
	}

	klog.V(1).Infof("Final scale-up plan: %v", scaleUpInfos)
	if aErr, failedInfo := o.ExecuteScaleUps(scaleUpInfos, nodeInfos, now); aErr != nil {
	aErr, failedNodeGroups := o.scaleUpExecutor.ExecuteScaleUps(scaleUpInfos, nodeInfos, now)
	if aErr != nil {
		return scaleUpError(
			&status.ScaleUpStatus{
				CreateNodeGroupResults: createNodeGroupResults,
				FailedResizeNodeGroups: []cloudprovider.NodeGroup{failedInfo.Group},
				FailedResizeNodeGroups: failedNodeGroups,
				PodsTriggeredScaleUp:   bestOption.Pods,
			},
			aErr,

@@ -405,10 +408,11 @@ func (o *ScaleUpOrchestrator) ScaleUpToNodeGroupMinSize(
	}

	klog.V(1).Infof("ScaleUpToNodeGroupMinSize: final scale-up plan: %v", scaleUpInfos)
	if aErr, failedInfo := o.ExecuteScaleUps(scaleUpInfos, nodeInfos, now); aErr != nil {
	aErr, failedNodeGroups := o.scaleUpExecutor.ExecuteScaleUps(scaleUpInfos, nodeInfos, now)
	if aErr != nil {
		return scaleUpError(
			&status.ScaleUpStatus{
				FailedResizeNodeGroups: []cloudprovider.NodeGroup{failedInfo.Group},
				FailedResizeNodeGroups: failedNodeGroups,
			},
			aErr,
		)

@@ -553,60 +557,11 @@ func (o *ScaleUpOrchestrator) GetCappedNewNodeCount(newNodeCount, currentNodeCou
	return newNodeCount, nil
}

// ExecuteScaleUps executes the scale ups, based on the provided scale up infos.
// In case of issues returns an error and a scale up info which failed to execute.
func (o *ScaleUpOrchestrator) ExecuteScaleUps(
	scaleUpInfos []nodegroupset.ScaleUpInfo,
	nodeInfos map[string]*schedulerframework.NodeInfo,
	now time.Time,
) (errors.AutoscalerError, *nodegroupset.ScaleUpInfo) {
	availableGPUTypes := o.autoscalingContext.CloudProvider.GetAvailableGPUTypes()
	for _, info := range scaleUpInfos {
		nodeInfo, ok := nodeInfos[info.Group.Id()]
		if !ok {
			klog.Errorf("ExecuteScaleUp: failed to get node info for node group %s", info.Group.Id())
			continue
		}
		gpuConfig := o.autoscalingContext.CloudProvider.GetNodeGpuConfig(nodeInfo.Node())
		gpuResourceName, gpuType := gpu.GetGpuInfoForMetrics(gpuConfig, availableGPUTypes, nodeInfo.Node(), nil)
		if aErr := o.executeScaleUp(info, gpuResourceName, gpuType, now); aErr != nil {
			return aErr, &info
		}
	}
	return nil, nil
}

func (o *ScaleUpOrchestrator) executeScaleUp(
	info nodegroupset.ScaleUpInfo,
	gpuResourceName, gpuType string,
	now time.Time,
) errors.AutoscalerError {
	klog.V(0).Infof("Scale-up: setting group %s size to %d", info.Group.Id(), info.NewSize)
	o.autoscalingContext.LogRecorder.Eventf(apiv1.EventTypeNormal, "ScaledUpGroup",
		"Scale-up: setting group %s size to %d instead of %d (max: %d)", info.Group.Id(), info.NewSize, info.CurrentSize, info.MaxSize)
	increase := info.NewSize - info.CurrentSize
	if err := info.Group.IncreaseSize(increase); err != nil {
		o.autoscalingContext.LogRecorder.Eventf(apiv1.EventTypeWarning, "FailedToScaleUpGroup", "Scale-up failed for group %s: %v", info.Group.Id(), err)
		aerr := errors.ToAutoscalerError(errors.CloudProviderError, err).AddPrefix("failed to increase node group size: ")
		o.clusterStateRegistry.RegisterFailedScaleUp(info.Group, metrics.FailedScaleUpReason(string(aerr.Type())), now)
		return aerr
	}
	o.clusterStateRegistry.RegisterOrUpdateScaleUp(
		info.Group,
		increase,
		time.Now())
	metrics.RegisterScaleUp(increase, gpuResourceName, gpuType)
	o.autoscalingContext.LogRecorder.Eventf(apiv1.EventTypeNormal, "ScaledUpGroup",
		"Scale-up: group %s size set to %d instead of %d (max: %d)", info.Group.Id(), info.NewSize, info.CurrentSize, info.MaxSize)
	return nil
}

func filterNodeGroupsByPods(
	groups []cloudprovider.NodeGroup,
	podsRequiredToFit []*apiv1.Pod,
	expansionOptions map[string]expander.Option,
) []cloudprovider.NodeGroup {

	result := make([]cloudprovider.NodeGroup, 0)

	for _, group := range groups {
@ -22,11 +22,14 @@ import (
|
|||
"net/http/httptest"
|
||||
"regexp"
|
||||
"strings"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
kube_record "k8s.io/client-go/tools/record"
|
||||
"k8s.io/component-base/metrics/legacyregistry"
|
||||
|
||||
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
|
||||
mockprovider "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/mocks"
|
||||
testprovider "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/test"
|
||||
"k8s.io/autoscaler/cluster-autoscaler/clusterstate"
|
||||
"k8s.io/autoscaler/cluster-autoscaler/config"
|
||||
|
|
@ -35,7 +38,6 @@ import (
|
|||
"k8s.io/autoscaler/cluster-autoscaler/core/utils"
|
||||
"k8s.io/autoscaler/cluster-autoscaler/estimator"
|
||||
"k8s.io/autoscaler/cluster-autoscaler/metrics"
|
||||
"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupset"
|
||||
"k8s.io/autoscaler/cluster-autoscaler/processors/nodeinfosprovider"
|
||||
"k8s.io/autoscaler/cluster-autoscaler/processors/status"
|
||||
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
|
||||
|
|
@ -43,8 +45,6 @@ import (
|
|||
"k8s.io/autoscaler/cluster-autoscaler/utils/taints"
|
||||
. "k8s.io/autoscaler/cluster-autoscaler/utils/test"
|
||||
"k8s.io/autoscaler/cluster-autoscaler/utils/units"
|
||||
kube_record "k8s.io/client-go/tools/record"
|
||||
"k8s.io/component-base/metrics/legacyregistry"
|
||||
|
||||
appsv1 "k8s.io/api/apps/v1"
|
||||
apiv1 "k8s.io/api/core/v1"
|
||||
|
|
@ -52,7 +52,6 @@ import (
|
|||
schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"k8s.io/autoscaler/cluster-autoscaler/expander"
|
||||
)
|
||||
|
||||
var defaultOptions = config.AutoscalingOptions{
|
||||
|
|
@ -65,7 +64,7 @@ var defaultOptions = config.AutoscalingOptions{
|
|||
|
||||
// Scale up scenarios.
|
||||
func TestScaleUpOK(t *testing.T) {
|
||||
config := &ScaleTestConfig{
|
||||
config := &ScaleUpTestConfig{
|
||||
Nodes: []NodeConfig{
|
||||
{Name: "n1", Cpu: 100, Memory: 100, Gpu: 0, Ready: true, Group: "ng1"},
|
||||
{Name: "n2", Cpu: 1000, Memory: 1000, Gpu: 0, Ready: true, Group: "ng2"},
|
||||
|
|
@ -77,8 +76,7 @@ func TestScaleUpOK(t *testing.T) {
|
|||
ExtraPods: []PodConfig{
|
||||
{Name: "p-new", Cpu: 500, Memory: 0, Gpu: 0, Node: "", ToleratesGpu: false},
|
||||
},
|
||||
Options: defaultOptions,
|
||||
ExpansionOptionToChoose: GroupSizeChange{GroupName: "ng2", SizeChange: 1},
|
||||
ExpansionOptionToChoose: &GroupSizeChange{GroupName: "ng2", SizeChange: 1},
|
||||
}
|
||||
expectedResults := &ScaleTestResults{
|
||||
FinalOption: GroupSizeChange{GroupName: "ng2", SizeChange: 1},
|
||||
|
|
@ -92,7 +90,7 @@ func TestScaleUpOK(t *testing.T) {
|
|||
|
||||
// There are triggering, remaining & awaiting pods.
|
||||
func TestMixedScaleUp(t *testing.T) {
|
||||
config := &ScaleTestConfig{
|
||||
config := &ScaleUpTestConfig{
|
||||
Nodes: []NodeConfig{
|
||||
{Name: "n1", Cpu: 100, Memory: 1000, Gpu: 0, Ready: true, Group: "ng1"},
|
||||
{Name: "n2", Cpu: 1000, Memory: 100, Gpu: 0, Ready: true, Group: "ng2"},
|
||||
|
|
@ -109,8 +107,7 @@ func TestMixedScaleUp(t *testing.T) {
|
|||
// can only be scheduled on ng1
|
||||
{Name: "awaiting", Cpu: 0, Memory: 200, Gpu: 0, Node: "", ToleratesGpu: false},
|
||||
},
|
||||
Options: defaultOptions,
|
||||
ExpansionOptionToChoose: GroupSizeChange{GroupName: "ng2", SizeChange: 1},
|
||||
ExpansionOptionToChoose: &GroupSizeChange{GroupName: "ng2", SizeChange: 1},
|
||||
}
|
||||
expectedResults := &ScaleTestResults{
|
||||
FinalOption: GroupSizeChange{GroupName: "ng2", SizeChange: 1},
|
||||
|
|
@ -127,7 +124,7 @@ func TestMixedScaleUp(t *testing.T) {
|
|||
func TestScaleUpMaxCoresLimitHit(t *testing.T) {
|
||||
options := defaultOptions
|
||||
options.MaxCoresTotal = 9
|
||||
config := &ScaleTestConfig{
|
||||
config := &ScaleUpTestConfig{
|
||||
Nodes: []NodeConfig{
|
||||
{Name: "n1", Cpu: 2000, Memory: 100, Gpu: 0, Ready: true, Group: "ng1"},
|
||||
{Name: "n2", Cpu: 4000, Memory: 1000, Gpu: 0, Ready: true, Group: "ng2"},
|
||||
|
|
@ -140,8 +137,8 @@ func TestScaleUpMaxCoresLimitHit(t *testing.T) {
|
|||
{Name: "p-new-1", Cpu: 2000, Memory: 0, Gpu: 0, Node: "", ToleratesGpu: false},
|
||||
{Name: "p-new-2", Cpu: 2000, Memory: 0, Gpu: 0, Node: "", ToleratesGpu: false},
|
||||
},
|
||||
ExpansionOptionToChoose: GroupSizeChange{GroupName: "ng1", SizeChange: 2},
|
||||
Options: options,
|
||||
ExpansionOptionToChoose: &GroupSizeChange{GroupName: "ng1", SizeChange: 2},
|
||||
Options: &options,
|
||||
}
|
||||
results := &ScaleTestResults{
|
||||
FinalOption: GroupSizeChange{GroupName: "ng1", SizeChange: 1},
|
||||
|
|
@ -156,7 +153,7 @@ func TestScaleUpMaxCoresLimitHit(t *testing.T) {
|
|||
func TestScaleUpMaxCoresLimitHitWithNotAutoscaledGroup(t *testing.T) {
|
||||
options := defaultOptions
|
||||
options.MaxCoresTotal = 9
|
||||
config := &ScaleTestConfig{
|
||||
config := &ScaleUpTestConfig{
|
||||
Nodes: []NodeConfig{
|
||||
{Name: "n1", Cpu: 2000, Memory: 100, Gpu: 0, Ready: true, Group: "ng1"},
|
||||
{Name: "n2", Cpu: 4000, Memory: 1000, Gpu: 0, Ready: true, Group: ""},
|
||||
|
|
@ -169,8 +166,8 @@ func TestScaleUpMaxCoresLimitHitWithNotAutoscaledGroup(t *testing.T) {
|
|||
{Name: "p-new-1", Cpu: 2000, Memory: 0, Gpu: 0, Node: "", ToleratesGpu: false},
|
||||
{Name: "p-new-2", Cpu: 2000, Memory: 0, Gpu: 0, Node: "", ToleratesGpu: false},
|
||||
},
|
||||
ExpansionOptionToChoose: GroupSizeChange{GroupName: "ng1", SizeChange: 2},
|
||||
Options: options,
|
||||
ExpansionOptionToChoose: &GroupSizeChange{GroupName: "ng1", SizeChange: 2},
|
||||
Options: &options,
|
||||
}
|
||||
results := &ScaleTestResults{
|
||||
FinalOption: GroupSizeChange{GroupName: "ng1", SizeChange: 1},
|
||||
|
|
@ -185,7 +182,7 @@ func TestScaleUpMaxCoresLimitHitWithNotAutoscaledGroup(t *testing.T) {
|
|||
func TestScaleUpMaxMemoryLimitHit(t *testing.T) {
|
||||
options := defaultOptions
|
||||
options.MaxMemoryTotal = 1300 * utils.MiB
|
||||
config := &ScaleTestConfig{
|
||||
config := &ScaleUpTestConfig{
|
||||
Nodes: []NodeConfig{
|
||||
{Name: "n1", Cpu: 2000, Memory: 100 * utils.MiB, Gpu: 0, Ready: true, Group: "ng1"},
|
||||
{Name: "n2", Cpu: 4000, Memory: 1000 * utils.MiB, Gpu: 0, Ready: true, Group: "ng2"},
|
||||
|
|
@ -199,8 +196,8 @@ func TestScaleUpMaxMemoryLimitHit(t *testing.T) {
|
|||
{Name: "p-new-2", Cpu: 2000, Memory: 100 * utils.MiB, Gpu: 0, Node: "", ToleratesGpu: false},
|
||||
{Name: "p-new-3", Cpu: 2000, Memory: 100 * utils.MiB, Gpu: 0, Node: "", ToleratesGpu: false},
|
||||
},
|
||||
ExpansionOptionToChoose: GroupSizeChange{GroupName: "ng1", SizeChange: 3},
|
||||
Options: options,
|
||||
ExpansionOptionToChoose: &GroupSizeChange{GroupName: "ng1", SizeChange: 3},
|
||||
Options: &options,
|
||||
}
|
||||
results := &ScaleTestResults{
|
||||
FinalOption: GroupSizeChange{GroupName: "ng1", SizeChange: 2},
|
||||
|
|
@ -215,7 +212,7 @@ func TestScaleUpMaxMemoryLimitHit(t *testing.T) {
|
|||
func TestScaleUpMaxMemoryLimitHitWithNotAutoscaledGroup(t *testing.T) {
|
||||
options := defaultOptions
|
||||
options.MaxMemoryTotal = 1300 * utils.MiB
|
||||
config := &ScaleTestConfig{
|
||||
config := &ScaleUpTestConfig{
|
||||
Nodes: []NodeConfig{
|
||||
{Name: "n1", Cpu: 2000, Memory: 100 * utils.MiB, Gpu: 0, Ready: true, Group: "ng1"},
|
||||
{Name: "n2", Cpu: 4000, Memory: 1000 * utils.MiB, Gpu: 0, Ready: true, Group: ""},
|
||||
|
|
@ -229,8 +226,8 @@ func TestScaleUpMaxMemoryLimitHitWithNotAutoscaledGroup(t *testing.T) {
|
|||
{Name: "p-new-2", Cpu: 2000, Memory: 100 * utils.MiB, Gpu: 0, Node: "", ToleratesGpu: false},
|
||||
{Name: "p-new-3", Cpu: 2000, Memory: 100 * utils.MiB, Gpu: 0, Node: "", ToleratesGpu: false},
|
||||
},
|
||||
ExpansionOptionToChoose: GroupSizeChange{GroupName: "ng1", SizeChange: 3},
|
||||
Options: options,
|
||||
ExpansionOptionToChoose: &GroupSizeChange{GroupName: "ng1", SizeChange: 3},
|
||||
Options: &options,
|
||||
}
|
||||
results := &ScaleTestResults{
|
||||
FinalOption: GroupSizeChange{GroupName: "ng1", SizeChange: 2},
|
||||
|
|
@ -242,10 +239,140 @@ func TestScaleUpMaxMemoryLimitHitWithNotAutoscaledGroup(t *testing.T) {
|
|||
simpleScaleUpTest(t, config, results)
|
||||
}
|
||||
|
||||
func TestScaleUpTwoGroups(t *testing.T) {
|
||||
options := defaultOptions
|
||||
options.BalanceSimilarNodeGroups = true
|
||||
options.ParallelScaleUp = true
|
||||
config := &ScaleUpTestConfig{
|
||||
Nodes: []NodeConfig{
|
||||
{Name: "ng1-n1", Cpu: 1500, Memory: 1000 * utils.MiB, Ready: true, Group: "ng1"},
|
||||
{Name: "ng2-n1", Cpu: 1500, Memory: 1000 * utils.MiB, Ready: true, Group: "ng2"},
|
||||
},
|
||||
Pods: []PodConfig{
|
||||
{Name: "p1", Cpu: 1400, Node: "ng1-n1"},
|
||||
{Name: "p2", Cpu: 1400, Node: "ng2-n1"},
|
||||
},
|
||||
ExtraPods: []PodConfig{
|
||||
{Name: "p3", Cpu: 1400},
|
||||
{Name: "p4", Cpu: 1400},
|
||||
},
|
||||
Options: &options,
|
||||
}
|
||||
testCases := []struct {
|
||||
desc string
|
||||
parallel bool
|
||||
}{
|
||||
{
|
||||
desc: "synchronous scale up",
|
||||
parallel: false,
|
||||
},
|
||||
{
|
||||
desc: "parallel scale up",
|
||||
parallel: true,
|
||||
},
|
||||
}
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.desc, func(t *testing.T) {
|
||||
config.Options.ParallelScaleUp = tc.parallel
|
||||
result := runSimpleScaleUpTest(t, config)
|
||||
assert.True(t, result.ScaleUpStatus.WasSuccessful())
|
||||
assert.Nil(t, result.ScaleUpError)
|
||||
assert.Equal(t, result.GroupTargetSizes, map[string]int{
|
||||
"ng1": 2,
|
||||
"ng2": 2,
|
||||
})
|
||||
assert.ElementsMatch(t, result.ScaleUpStatus.PodsTriggeredScaleUp, []string{"p3", "p4"})
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestCloudProviderFailingToScaleUpGroups(t *testing.T) {
|
||||
options := defaultOptions
|
||||
options.BalanceSimilarNodeGroups = true
|
||||
config := &ScaleUpTestConfig{
|
||||
Groups: []NodeGroupConfig{
|
||||
{Name: "ng1", MaxSize: 2},
|
||||
{Name: "ng2", MaxSize: 2},
|
||||
},
|
||||
Nodes: []NodeConfig{
|
||||
{Name: "ng1-n1", Cpu: 1500, Memory: 1000 * utils.MiB, Ready: true, Group: "ng1"},
|
||||
{Name: "ng2-n1", Cpu: 1500, Memory: 1000 * utils.MiB, Ready: true, Group: "ng2"},
|
||||
},
|
||||
Pods: []PodConfig{
|
||||
{Name: "p1", Cpu: 1400, Node: "ng1-n1"},
|
||||
{Name: "p2", Cpu: 1400, Node: "ng2-n1"},
|
||||
},
|
||||
ExtraPods: []PodConfig{
|
||||
{Name: "p3", Cpu: 1400},
|
||||
{Name: "p4", Cpu: 1400},
|
||||
},
|
||||
Options: &options,
|
||||
}
|
||||
failAlwaysScaleUp := func(group string, i int) error {
|
||||
return fmt.Errorf("provider error for: %s", group)
|
||||
}
|
||||
failOnceScaleUp := func() testprovider.OnScaleUpFunc {
|
||||
var executed atomic.Bool
|
||||
return func(group string, _ int) error {
|
||||
if !executed.Swap(true) {
|
||||
return fmt.Errorf("provider error for: %s", group)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
}
|
||||
testCases := []struct {
|
||||
desc string
|
||||
parallel bool
|
||||
onScaleUp testprovider.OnScaleUpFunc
|
||||
expectConcurrentErrors bool
|
||||
expectedTotalTargetSizes int
|
||||
}{
|
||||
{
|
||||
desc: "synchronous scale up - two failures",
|
||||
parallel: false,
|
||||
onScaleUp: failAlwaysScaleUp,
|
||||
expectConcurrentErrors: false,
|
||||
expectedTotalTargetSizes: 3, // first error stops scale up process
|
||||
},
|
||||
{
|
||||
desc: "parallel scale up - two failures",
|
||||
parallel: true,
|
||||
onScaleUp: failAlwaysScaleUp,
|
||||
expectConcurrentErrors: true,
|
||||
expectedTotalTargetSizes: 4,
|
||||
},
|
||||
{
|
||||
desc: "synchronous scale up - one failure",
|
||||
parallel: false,
|
||||
onScaleUp: failOnceScaleUp(),
|
||||
expectConcurrentErrors: false,
|
||||
expectedTotalTargetSizes: 3,
|
||||
},
|
||||
{
|
||||
desc: "parallel scale up - one failure",
|
||||
parallel: true,
|
||||
onScaleUp: failOnceScaleUp(),
|
||||
expectConcurrentErrors: false,
|
||||
expectedTotalTargetSizes: 4,
|
||||
},
|
||||
}
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.desc, func(t *testing.T) {
|
||||
config.Options.ParallelScaleUp = tc.parallel
|
||||
config.OnScaleUp = tc.onScaleUp
|
||||
result := runSimpleScaleUpTest(t, config)
|
||||
assert.False(t, result.ScaleUpStatus.WasSuccessful())
|
||||
assert.Equal(t, errors.CloudProviderError, result.ScaleUpError.Type())
|
||||
assert.Equal(t, tc.expectedTotalTargetSizes, result.GroupTargetSizes["ng1"]+result.GroupTargetSizes["ng2"])
|
||||
assert.Equal(t, tc.expectConcurrentErrors, strings.Contains(result.ScaleUpError.Error(), "...and other concurrent errors"))
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestScaleUpCapToMaxTotalNodesLimit(t *testing.T) {
|
||||
options := defaultOptions
|
||||
options.MaxNodesTotal = 3
|
||||
config := &ScaleTestConfig{
|
||||
config := &ScaleUpTestConfig{
|
||||
Nodes: []NodeConfig{
|
||||
{Name: "n1", Cpu: 2000, Memory: 100 * utils.MiB, Gpu: 0, Ready: true, Group: "ng1"},
|
||||
{Name: "n2", Cpu: 4000, Memory: 1000 * utils.MiB, Gpu: 0, Ready: true, Group: "ng2"},
|
||||
|
|
@ -259,8 +386,8 @@ func TestScaleUpCapToMaxTotalNodesLimit(t *testing.T) {
|
|||
{Name: "p-new-2", Cpu: 4000, Memory: 100 * utils.MiB, Gpu: 0, Node: "", ToleratesGpu: false},
|
||||
{Name: "p-new-3", Cpu: 4000, Memory: 100 * utils.MiB, Gpu: 0, Node: "", ToleratesGpu: false},
|
||||
},
|
||||
ExpansionOptionToChoose: GroupSizeChange{GroupName: "ng2", SizeChange: 3},
|
||||
Options: options,
|
||||
ExpansionOptionToChoose: &GroupSizeChange{GroupName: "ng2", SizeChange: 3},
|
||||
Options: &options,
|
||||
}
|
||||
results := &ScaleTestResults{
|
||||
FinalOption: GroupSizeChange{GroupName: "ng2", SizeChange: 1},
|
||||
|
|
@ -275,7 +402,7 @@ func TestScaleUpCapToMaxTotalNodesLimit(t *testing.T) {
|
|||
func TestScaleUpCapToMaxTotalNodesLimitWithNotAutoscaledGroup(t *testing.T) {
|
||||
options := defaultOptions
|
||||
options.MaxNodesTotal = 3
|
||||
config := &ScaleTestConfig{
|
||||
config := &ScaleUpTestConfig{
|
||||
Nodes: []NodeConfig{
|
||||
{Name: "n1", Cpu: 2000, Memory: 100 * utils.MiB, Gpu: 0, Ready: true, Group: ""},
|
||||
{Name: "n2", Cpu: 4000, Memory: 1000 * utils.MiB, Gpu: 0, Ready: true, Group: "ng2"},
|
||||
|
|
@ -289,8 +416,8 @@ func TestScaleUpCapToMaxTotalNodesLimitWithNotAutoscaledGroup(t *testing.T) {
|
|||
{Name: "p-new-2", Cpu: 4000, Memory: 100 * utils.MiB, Gpu: 0, Node: "", ToleratesGpu: false},
|
||||
{Name: "p-new-3", Cpu: 4000, Memory: 100 * utils.MiB, Gpu: 0, Node: "", ToleratesGpu: false},
|
||||
},
|
||||
ExpansionOptionToChoose: GroupSizeChange{GroupName: "ng2", SizeChange: 3},
|
||||
Options: options,
|
||||
ExpansionOptionToChoose: &GroupSizeChange{GroupName: "ng2", SizeChange: 3},
|
||||
Options: &options,
|
||||
}
|
||||
results := &ScaleTestResults{
|
||||
FinalOption: GroupSizeChange{GroupName: "ng2", SizeChange: 1},
|
||||
|
|
@ -305,7 +432,7 @@ func TestScaleUpCapToMaxTotalNodesLimitWithNotAutoscaledGroup(t *testing.T) {
|
|||
func TestWillConsiderGpuAndStandardPoolForPodWhichDoesNotRequireGpu(t *testing.T) {
|
||||
options := defaultOptions
|
||||
options.MaxNodesTotal = 100
|
||||
config := &ScaleTestConfig{
|
||||
config := &ScaleUpTestConfig{
|
||||
Nodes: []NodeConfig{
|
||||
{Name: "gpu-node-1", Cpu: 2000, Memory: 1000 * utils.MiB, Gpu: 1, Ready: true, Group: "gpu-pool"},
|
||||
{Name: "std-node-1", Cpu: 2000, Memory: 1000 * utils.MiB, Gpu: 0, Ready: true, Group: "std-pool"},
|
||||
|
|
@ -317,8 +444,8 @@ func TestWillConsiderGpuAndStandardPoolForPodWhichDoesNotRequireGpu(t *testing.T
|
|||
ExtraPods: []PodConfig{
|
||||
{Name: "extra-std-pod", Cpu: 2000, Memory: 1000 * utils.MiB, Gpu: 0, Node: "", ToleratesGpu: true},
|
||||
},
|
||||
ExpansionOptionToChoose: GroupSizeChange{GroupName: "std-pool", SizeChange: 1},
|
||||
Options: options,
|
||||
ExpansionOptionToChoose: &GroupSizeChange{GroupName: "std-pool", SizeChange: 1},
|
||||
Options: &options,
|
||||
}
|
||||
results := &ScaleTestResults{
|
||||
FinalOption: GroupSizeChange{GroupName: "std-pool", SizeChange: 1},
|
||||
|
|
@ -337,7 +464,7 @@ func TestWillConsiderGpuAndStandardPoolForPodWhichDoesNotRequireGpu(t *testing.T
|
|||
func TestWillConsiderOnlyGpuPoolForPodWhichDoesRequiresGpu(t *testing.T) {
|
||||
options := defaultOptions
|
||||
options.MaxNodesTotal = 100
|
||||
config := &ScaleTestConfig{
|
||||
config := &ScaleUpTestConfig{
|
||||
Nodes: []NodeConfig{
|
||||
{Name: "gpu-node-1", Cpu: 2000, Memory: 1000 * utils.MiB, Gpu: 1, Ready: true, Group: "gpu-pool"},
|
||||
{Name: "std-node-1", Cpu: 2000, Memory: 1000 * utils.MiB, Gpu: 0, Ready: true, Group: "std-pool"},
|
||||
|
|
@ -349,8 +476,8 @@ func TestWillConsiderOnlyGpuPoolForPodWhichDoesRequiresGpu(t *testing.T) {
|
|||
ExtraPods: []PodConfig{
|
||||
{Name: "extra-gpu-pod", Cpu: 2000, Memory: 1000 * utils.MiB, Gpu: 1, Node: "", ToleratesGpu: true},
|
||||
},
|
||||
ExpansionOptionToChoose: GroupSizeChange{GroupName: "gpu-pool", SizeChange: 1},
|
||||
Options: options,
|
||||
ExpansionOptionToChoose: &GroupSizeChange{GroupName: "gpu-pool", SizeChange: 1},
|
||||
Options: &options,
|
||||
}
|
||||
results := &ScaleTestResults{
|
||||
FinalOption: GroupSizeChange{GroupName: "gpu-pool", SizeChange: 1},
|
||||
|
|
@ -368,7 +495,7 @@ func TestWillConsiderOnlyGpuPoolForPodWhichDoesRequiresGpu(t *testing.T) {
|
|||
func TestWillConsiderAllPoolsWhichFitTwoPodsRequiringGpus(t *testing.T) {
|
||||
options := defaultOptions
|
||||
options.MaxNodesTotal = 100
|
||||
config := &ScaleTestConfig{
|
||||
config := &ScaleUpTestConfig{
|
||||
Nodes: []NodeConfig{
|
||||
{Name: "gpu-1-node-1", Cpu: 2000, Memory: 1000 * utils.MiB, Gpu: 1, Ready: true, Group: "gpu-1-pool"},
|
||||
{Name: "gpu-2-node-1", Cpu: 2000, Memory: 1000 * utils.MiB, Gpu: 2, Ready: true, Group: "gpu-2-pool"},
|
||||
|
|
@ -386,8 +513,8 @@ func TestWillConsiderAllPoolsWhichFitTwoPodsRequiringGpus(t *testing.T) {
|
|||
{Name: "extra-gpu-pod-2", Cpu: 1, Memory: 1 * utils.MiB, Gpu: 1, Node: "", ToleratesGpu: true}, // CPU and mem negligible
|
||||
{Name: "extra-gpu-pod-3", Cpu: 1, Memory: 1 * utils.MiB, Gpu: 1, Node: "", ToleratesGpu: true}, // CPU and mem negligible
|
||||
},
|
||||
ExpansionOptionToChoose: GroupSizeChange{GroupName: "gpu-1-pool", SizeChange: 3},
|
||||
Options: options,
|
||||
ExpansionOptionToChoose: &GroupSizeChange{GroupName: "gpu-1-pool", SizeChange: 3},
|
||||
Options: &options,
|
||||
}
|
||||
results := &ScaleTestResults{
|
||||
FinalOption: GroupSizeChange{GroupName: "gpu-1-pool", SizeChange: 3},
|
||||
|
|
@ -409,7 +536,7 @@ func TestNoScaleUpMaxCoresLimitHit(t *testing.T) {
|
|||
options := defaultOptions
|
||||
options.MaxCoresTotal = 7
|
||||
options.MaxMemoryTotal = 1150
|
||||
config := &ScaleTestConfig{
|
||||
config := &ScaleUpTestConfig{
|
||||
Nodes: []NodeConfig{
|
||||
{Name: "n1", Cpu: 2000, Memory: 100, Gpu: 0, Ready: true, Group: "ng1"},
|
||||
{Name: "n2", Cpu: 4000, Memory: 1000, Gpu: 0, Ready: true, Group: "ng2"},
|
||||
|
|
@ -422,7 +549,7 @@ func TestNoScaleUpMaxCoresLimitHit(t *testing.T) {
|
|||
{Name: "p-new-1", Cpu: 2000, Memory: 0, Gpu: 0, Node: "", ToleratesGpu: false},
|
||||
{Name: "p-new-2", Cpu: 2000, Memory: 0, Gpu: 0, Node: "", ToleratesGpu: false},
|
||||
},
|
||||
Options: options,
|
||||
Options: &options,
|
||||
}
|
||||
results := &ScaleTestResults{
|
||||
NoScaleUpReason: "max cluster cpu, memory limit reached",
|
||||
|
|
@ -434,152 +561,46 @@ func TestNoScaleUpMaxCoresLimitHit(t *testing.T) {
|
|||
simpleNoScaleUpTest(t, config, results)
|
||||
}
|
||||
|
||||
// To implement expander.Strategy, BestOption method must have a struct receiver.
|
||||
// This prevents it from modifying fields of reportingStrategy, so we need a thin
|
||||
// pointer wrapper for mutable parts.
|
||||
type expanderResults struct {
|
||||
inputOptions []GroupSizeChange
|
||||
}
|
||||
|
||||
type reportingStrategy struct {
|
||||
initialNodeConfigs []NodeConfig
|
||||
optionToChoose GroupSizeChange
|
||||
results *expanderResults
|
||||
t *testing.T
|
||||
}
|
||||
|
||||
func (r reportingStrategy) BestOption(options []expander.Option, nodeInfo map[string]*schedulerframework.NodeInfo) *expander.Option {
|
||||
r.results.inputOptions = expanderOptionsToGroupSizeChanges(options)
|
||||
for _, option := range options {
|
||||
GroupSizeChange := expanderOptionToGroupSizeChange(option)
|
||||
if GroupSizeChange == r.optionToChoose {
|
||||
return &option
|
||||
}
|
||||
}
|
||||
assert.Fail(r.t, "did not find expansionOptionToChoose %s", r.optionToChoose)
|
||||
return nil
|
||||
}
|
||||
|
||||
func expanderOptionsToGroupSizeChanges(options []expander.Option) []GroupSizeChange {
|
||||
groupSizeChanges := make([]GroupSizeChange, 0, len(options))
|
||||
for _, option := range options {
|
||||
GroupSizeChange := expanderOptionToGroupSizeChange(option)
|
||||
groupSizeChanges = append(groupSizeChanges, GroupSizeChange)
|
||||
}
|
||||
return groupSizeChanges
|
||||
}
|
||||
|
||||
func expanderOptionToGroupSizeChange(option expander.Option) GroupSizeChange {
|
||||
groupName := option.NodeGroup.Id()
|
||||
groupSizeIncrement := option.NodeCount
|
||||
scaleUpOption := GroupSizeChange{GroupName: groupName, SizeChange: groupSizeIncrement}
|
||||
return scaleUpOption
|
||||
}
|
||||
|
||||
func runSimpleScaleUpTest(t *testing.T, config *ScaleTestConfig) *ScaleTestResults {
|
||||
expandedGroups := make(chan GroupSizeChange, 10)
|
||||
now := time.Now()
|
||||
|
||||
groups := make(map[string][]*apiv1.Node)
|
||||
nodes := make([]*apiv1.Node, 0, len(config.Nodes))
|
||||
for _, n := range config.Nodes {
|
||||
node := BuildTestNode(n.Name, n.Cpu, n.Memory)
|
||||
if n.Gpu > 0 {
|
||||
AddGpusToNode(node, n.Gpu)
|
||||
}
|
||||
SetNodeReadyState(node, n.Ready, now.Add(-2*time.Minute))
|
||||
nodes = append(nodes, node)
|
||||
if n.Group != "" {
|
||||
groups[n.Group] = append(groups[n.Group], node)
|
||||
}
|
||||
}
|
||||
|
||||
pods := make([]*apiv1.Pod, 0, len(config.Pods))
|
||||
for _, p := range config.Pods {
|
||||
pod := buildTestPod(p)
|
||||
pods = append(pods, pod)
|
||||
}
|
||||
|
||||
podLister := kube_util.NewTestPodLister(pods)
|
||||
listers := kube_util.NewListerRegistry(nil, nil, podLister, nil, nil, nil, nil, nil, nil, nil)
|
||||
|
||||
provider := testprovider.NewTestCloudProvider(func(nodeGroup string, increase int) error {
|
||||
expandedGroups <- GroupSizeChange{GroupName: nodeGroup, SizeChange: increase}
|
||||
return nil
|
||||
}, nil)
|
||||
|
||||
for name, nodesInGroup := range groups {
|
||||
provider.AddNodeGroup(name, 1, 10, len(nodesInGroup))
|
||||
for _, n := range nodesInGroup {
|
||||
provider.AddNode(name, n)
|
||||
}
|
||||
}
|
||||
|
||||
resourceLimiter := cloudprovider.NewResourceLimiter(
|
||||
map[string]int64{cloudprovider.ResourceNameCores: config.Options.MinCoresTotal, cloudprovider.ResourceNameMemory: config.Options.MinMemoryTotal},
|
||||
map[string]int64{cloudprovider.ResourceNameCores: config.Options.MaxCoresTotal, cloudprovider.ResourceNameMemory: config.Options.MaxMemoryTotal})
|
||||
provider.SetResourceLimiter(resourceLimiter)
|
||||
|
||||
assert.NotNil(t, provider)
|
||||
|
||||
// Create context with non-random expander strategy.
|
||||
context, err := NewScaleTestAutoscalingContext(config.Options, &fake.Clientset{}, listers, provider, nil, nil)
|
||||
assert.NoError(t, err)
|
||||
|
||||
expander := reportingStrategy{
|
||||
initialNodeConfigs: config.Nodes,
|
||||
optionToChoose: config.ExpansionOptionToChoose,
|
||||
results: &expanderResults{},
|
||||
t: t,
|
||||
}
|
||||
context.ExpanderStrategy = expander
|
||||
|
||||
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).Process(&context, nodes, []*appsv1.DaemonSet{}, taints.TaintConfig{}, now)
|
||||
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, NewBackoff(), clusterstate.NewStaticMaxNodeProvisionTimeProvider(15*time.Minute))
|
||||
clusterState.UpdateNodes(nodes, nodeInfos, time.Now())
|
||||
|
||||
extraPods := make([]*apiv1.Pod, 0, len(config.ExtraPods))
|
||||
for _, p := range config.ExtraPods {
|
||||
pod := buildTestPod(p)
|
||||
extraPods = append(extraPods, pod)
|
||||
}
|
||||
|
||||
processors := NewTestProcessors(&context)
|
||||
suOrchestrator := New()
|
||||
suOrchestrator.Initialize(&context, processors, clusterState, taints.TaintConfig{})
|
||||
scaleUpStatus, err := suOrchestrator.ScaleUp(extraPods, nodes, []*appsv1.DaemonSet{}, nodeInfos)
|
||||
processors.ScaleUpStatusProcessor.Process(&context, scaleUpStatus)
|
||||
|
||||
assert.NoError(t, err)
|
||||
|
||||
expandedGroup := getGroupSizeChangeFromChan(expandedGroups)
|
||||
var expandedGroupStruct GroupSizeChange
|
||||
if expandedGroup != nil {
|
||||
expandedGroupStruct = *expandedGroup
|
||||
}
|
||||
|
||||
events := []string{}
|
||||
for eventsLeft := true; eventsLeft; {
|
||||
select {
|
||||
case event := <-context.Recorder.(*kube_record.FakeRecorder).Events:
|
||||
events = append(events, event)
|
||||
default:
|
||||
eventsLeft = false
|
||||
}
|
||||
}
|
||||
|
||||
return &ScaleTestResults{
|
||||
ExpansionOptions: expander.results.inputOptions,
|
||||
FinalOption: expandedGroupStruct,
|
||||
ScaleUpStatus: simplifyScaleUpStatus(scaleUpStatus),
|
||||
Events: events,
|
||||
}
|
||||
}
|
||||
|
||||
func simpleNoScaleUpTest(t *testing.T, config *ScaleTestConfig, expectedResults *ScaleTestResults) {
|
||||
func simpleScaleUpTest(t *testing.T, config *ScaleUpTestConfig, expectedResults *ScaleTestResults) {
|
||||
results := runSimpleScaleUpTest(t, config)
|
||||
assert.NotNil(t, results.GroupSizeChanges[0], "Expected scale up event")
|
||||
assert.Equal(t, expectedResults.FinalOption, results.GroupSizeChanges[0])
|
||||
assert.True(t, results.ScaleUpStatus.WasSuccessful())
|
||||
nodeEventSeen := false
|
||||
for _, event := range results.Events {
|
||||
if strings.Contains(event, "TriggeredScaleUp") && strings.Contains(event, expectedResults.FinalOption.GroupName) {
|
||||
nodeEventSeen = true
|
||||
}
|
||||
if len(expectedResults.ScaleUpStatus.PodsRemainUnschedulable) == 0 {
|
||||
assert.NotRegexp(t, regexp.MustCompile("NotTriggerScaleUp"), event)
|
||||
}
|
||||
}
|
||||
assert.True(t, nodeEventSeen)
|
||||
if len(expectedResults.ExpansionOptions) > 0 {
|
||||
// Empty ExpansionOptions means we do not want to do any assertions
|
||||
// on contents of actual scaleUp options
|
||||
if config.ExpansionOptionToChoose != nil {
|
||||
// Check that option to choose is part of expected options.
|
||||
assert.Contains(t, expectedResults.ExpansionOptions, *config.ExpansionOptionToChoose, "final expected expansion option must be in expected expansion options")
|
||||
assert.Contains(t, results.ExpansionOptions, *config.ExpansionOptionToChoose, "final expected expansion option must be in expected expansion options")
|
||||
}
|
||||
assert.ElementsMatch(t, results.ExpansionOptions, expectedResults.ExpansionOptions,
|
||||
"actual and expected expansion options should be the same")
|
||||
}
|
||||
if expectedResults.GroupTargetSizes != nil {
|
||||
assert.Equal(t, expectedResults.GroupTargetSizes, results.GroupTargetSizes)
|
||||
}
|
||||
assert.ElementsMatch(t, results.ScaleUpStatus.PodsTriggeredScaleUp, expectedResults.ScaleUpStatus.PodsTriggeredScaleUp,
|
||||
"actual and expected triggering pods should be the same")
|
||||
assert.ElementsMatch(t, results.ScaleUpStatus.PodsRemainUnschedulable, expectedResults.ScaleUpStatus.PodsRemainUnschedulable,
|
||||
"actual and expected remaining pods should be the same")
|
||||
assert.ElementsMatch(t, results.ScaleUpStatus.PodsAwaitEvaluation, expectedResults.ScaleUpStatus.PodsAwaitEvaluation,
|
||||
"actual and expected awaiting evaluation pods should be the same")
|
||||
}
|
||||
|
||||
assert.Equal(t, GroupSizeChange{}, results.FinalOption)
|
||||
func simpleNoScaleUpTest(t *testing.T, config *ScaleUpTestConfig, expectedResults *ScaleTestResults) {
|
||||
results := runSimpleScaleUpTest(t, config)
|
||||
assert.Nil(t, results.GroupSizeChanges)
|
||||
assert.False(t, results.ScaleUpStatus.WasSuccessful())
|
||||
noScaleUpEventSeen := false
|
||||
for _, event := range results.Events {
|
||||
|
|
@ -602,49 +623,119 @@ func simpleNoScaleUpTest(t *testing.T, config *ScaleTestConfig, expectedResults
|
|||
"actual and expected awaiting evaluation pods should be the same")
|
||||
}
|
||||
|
||||
func simpleScaleUpTest(t *testing.T, config *ScaleTestConfig, expectedResults *ScaleTestResults) {
|
||||
results := runSimpleScaleUpTest(t, config)
|
||||
func runSimpleScaleUpTest(t *testing.T, config *ScaleUpTestConfig) *ScaleUpTestResult {
|
||||
now := time.Now()
|
||||
groupSizeChangesChannel := make(chan GroupSizeChange, 20)
|
||||
groupNodes := make(map[string][]*apiv1.Node)
|
||||
|
||||
assert.NotNil(t, results.FinalOption, "Expected scale up event")
|
||||
assert.Equal(t, expectedResults.FinalOption, results.FinalOption)
|
||||
assert.True(t, results.ScaleUpStatus.WasSuccessful())
|
||||
nodeEventSeen := false
|
||||
for _, event := range results.Events {
|
||||
if strings.Contains(event, "TriggeredScaleUp") && strings.Contains(event, expectedResults.FinalOption.GroupName) {
|
||||
nodeEventSeen = true
|
||||
// build nodes
|
||||
nodes := make([]*apiv1.Node, 0, len(config.Nodes))
|
||||
for _, n := range config.Nodes {
|
||||
node := BuildTestNode(n.Name, n.Cpu, n.Memory)
|
||||
if n.Gpu > 0 {
|
||||
AddGpusToNode(node, n.Gpu)
|
||||
}
|
||||
if len(expectedResults.ScaleUpStatus.PodsRemainUnschedulable) == 0 {
|
||||
assert.NotRegexp(t, regexp.MustCompile("NotTriggerScaleUp"), event)
|
||||
SetNodeReadyState(node, n.Ready, now.Add(-2*time.Minute))
|
||||
nodes = append(nodes, node)
|
||||
if n.Group != "" {
|
||||
groupNodes[n.Group] = append(groupNodes[n.Group], node)
|
||||
}
|
||||
}
|
||||
assert.True(t, nodeEventSeen)
|
||||
|
||||
if len(expectedResults.ExpansionOptions) > 0 {
|
||||
// Empty ExpansionOptions means we do not want to do any assertions
|
||||
// on contents of actual scaleUp options
|
||||
|
||||
// Check that option to choose is part of expected options.
|
||||
assert.Contains(t, expectedResults.ExpansionOptions, config.ExpansionOptionToChoose, "final expected expansion option must be in expected expansion options")
|
||||
assert.Contains(t, results.ExpansionOptions, config.ExpansionOptionToChoose, "final expected expansion option must be in expected expansion options")
|
||||
|
||||
assert.ElementsMatch(t, results.ExpansionOptions, expectedResults.ExpansionOptions,
|
||||
"actual and expected expansion options should be the same")
|
||||
// build and setup pods
|
||||
pods := make([]*apiv1.Pod, len(config.Pods))
|
||||
for i, p := range config.Pods {
|
||||
pods[i] = buildTestPod(p)
|
||||
}
|
||||
extraPods := make([]*apiv1.Pod, len(config.ExtraPods))
|
||||
for i, p := range config.ExtraPods {
|
||||
extraPods[i] = buildTestPod(p)
|
||||
}
|
||||
podLister := kube_util.NewTestPodLister(pods)
|
||||
listers := kube_util.NewListerRegistry(nil, nil, podLister, nil, nil, nil, nil, nil, nil, nil)
|
||||
|
||||
assert.ElementsMatch(t, results.ScaleUpStatus.PodsTriggeredScaleUp, expectedResults.ScaleUpStatus.PodsTriggeredScaleUp,
|
||||
"actual and expected triggering pods should be the same")
|
||||
assert.ElementsMatch(t, results.ScaleUpStatus.PodsRemainUnschedulable, expectedResults.ScaleUpStatus.PodsRemainUnschedulable,
|
||||
"actual and expected remaining pods should be the same")
|
||||
assert.ElementsMatch(t, results.ScaleUpStatus.PodsAwaitEvaluation, expectedResults.ScaleUpStatus.PodsAwaitEvaluation,
|
||||
"actual and expected awaiting evaluation pods should be the same")
|
||||
}
|
||||
|
||||
func getGroupSizeChangeFromChan(c chan GroupSizeChange) *GroupSizeChange {
|
||||
select {
|
||||
case val := <-c:
|
||||
return &val
|
||||
case <-time.After(100 * time.Millisecond):
|
||||
// setup node groups
|
||||
provider := testprovider.NewTestCloudProvider(func(nodeGroup string, increase int) error {
|
||||
groupSizeChangesChannel <- GroupSizeChange{GroupName: nodeGroup, SizeChange: increase}
|
||||
if config.OnScaleUp != nil {
|
||||
return config.OnScaleUp(nodeGroup, increase)
|
||||
}
|
||||
return nil
|
||||
}, nil)
|
||||
options := defaultOptions
|
||||
if config.Options != nil {
|
||||
options = *config.Options
|
||||
}
|
||||
resourceLimiter := cloudprovider.NewResourceLimiter(
|
||||
map[string]int64{cloudprovider.ResourceNameCores: options.MinCoresTotal, cloudprovider.ResourceNameMemory: options.MinMemoryTotal},
|
||||
map[string]int64{cloudprovider.ResourceNameCores: options.MaxCoresTotal, cloudprovider.ResourceNameMemory: options.MaxMemoryTotal})
|
||||
provider.SetResourceLimiter(resourceLimiter)
|
||||
groupConfigs := make(map[string]*NodeGroupConfig)
|
||||
for _, group := range config.Groups {
|
||||
groupConfigs[group.Name] = &group
|
||||
}
|
||||
	for name, nodesInGroup := range groupNodes {
		groupConfig := groupConfigs[name]
		if groupConfig == nil {
			groupConfig = &NodeGroupConfig{
				Name:    name,
				MinSize: 1,
				MaxSize: 10,
			}
		}
		provider.AddNodeGroup(name, groupConfig.MinSize, groupConfig.MaxSize, len(nodesInGroup))
		for _, n := range nodesInGroup {
			provider.AddNode(name, n)
		}
	}

	// build orchestrator
	context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, listers, provider, nil, nil)
	assert.NoError(t, err)
	nodeInfos, err := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).
		Process(&context, nodes, []*appsv1.DaemonSet{}, taints.TaintConfig{}, now)
	assert.NoError(t, err)
	clusterState := clusterstate.
		NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, NewBackoff(), clusterstate.NewStaticMaxNodeProvisionTimeProvider(15*time.Minute))
	clusterState.UpdateNodes(nodes, nodeInfos, time.Now())
	processors := NewTestProcessors(&context)
	orchestrator := New()
	orchestrator.Initialize(&context, processors, clusterState, taints.TaintConfig{})
	expander := NewMockRepotingStrategy(t, config.ExpansionOptionToChoose)
	context.ExpanderStrategy = expander

	// scale up
	scaleUpStatus, scaleUpErr := orchestrator.ScaleUp(extraPods, nodes, []*appsv1.DaemonSet{}, nodeInfos)
	processors.ScaleUpStatusProcessor.Process(&context, scaleUpStatus)

	// aggregate group size changes
	close(groupSizeChangesChannel)
	var groupSizeChanges []GroupSizeChange
	for change := range groupSizeChangesChannel {
		groupSizeChanges = append(groupSizeChanges, change)
	}

	// aggregate events
	eventsChannel := context.Recorder.(*kube_record.FakeRecorder).Events
	close(eventsChannel)
	events := []string{}
	for event := range eventsChannel {
		events = append(events, event)
	}

	// build target sizes
	targetSizes := make(map[string]int)
	for _, group := range provider.NodeGroups() {
		targetSizes[group.Id()], _ = group.TargetSize()
	}

	return &ScaleUpTestResult{
		ScaleUpError:     scaleUpErr,
		ScaleUpStatus:    simplifyScaleUpStatus(scaleUpStatus),
		GroupSizeChanges: groupSizeChanges,
		Events:           events,
		GroupTargetSizes: targetSizes,
		ExpansionOptions: expander.LastInputOptions(),
	}
}
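For context, here is a minimal sketch of how a test could drive this helper with the new option enabled. It is not part of this diff: the test name, the two-group layout, and the resource numbers are invented for illustration, while runSimpleScaleUpTest, ScaleUpTestConfig, defaultOptions, and the ParallelScaleUp option come from the changes shown above.

// Hypothetical test sketch (not part of this diff); assumes the helper and
// config types added above, plus the defaultOptions value defined in this file.
func TestParallelScaleUpSketch(t *testing.T) {
	options := defaultOptions
	options.ParallelScaleUp = true // option introduced by this PR
	config := &ScaleUpTestConfig{
		Groups: []NodeGroupConfig{
			{Name: "ng1", MaxSize: 5},
			{Name: "ng2", MaxSize: 5},
		},
		Nodes: []NodeConfig{
			{Name: "ng1-n1", Cpu: 1500, Memory: 1000 * utils.MiB, Ready: true, Group: "ng1"},
			{Name: "ng2-n1", Cpu: 1500, Memory: 1000 * utils.MiB, Ready: true, Group: "ng2"},
		},
		ExtraPods: []PodConfig{
			{Name: "p1", Cpu: 1000},
			{Name: "p2", Cpu: 1000},
		},
		Options: &options,
	}
	results := runSimpleScaleUpTest(t, config)
	assert.Nil(t, results.ScaleUpError)
	// With the option enabled, several groups may be resized within one loop iteration.
	assert.NotEmpty(t, results.GroupSizeChanges)
}
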
@@ -1049,24 +1140,33 @@ func TestCheckDeltaWithinLimits(t *testing.T) {
	}
}

func TestAuthError(t *testing.T) {
func TestAuthErrorHandling(t *testing.T) {
	metrics.RegisterAll(false)
	context, err := NewScaleTestAutoscalingContext(config.AutoscalingOptions{}, &fake.Clientset{}, nil, nil, nil, nil)
	assert.NoError(t, err)

	nodeGroup := &mockprovider.NodeGroup{}
	info := nodegroupset.ScaleUpInfo{Group: nodeGroup}
	nodeGroup.On("Id").Return("A")
	nodeGroup.On("IncreaseSize", 0).Return(errors.NewAutoscalerError(errors.AutoscalerErrorType("abcd"), ""))

	processors := NewTestProcessors(&context)
	clusterStateRegistry := clusterstate.NewClusterStateRegistry(nil, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, NewBackoff(), clusterstate.NewStaticMaxNodeProvisionTimeProvider(15*time.Minute))
	suOrchestrator := New()
	suOrchestrator.Initialize(&context, processors, clusterStateRegistry, taints.TaintConfig{})
	scaleUpOrchestrator := suOrchestrator.(*ScaleUpOrchestrator)
	aerr := scaleUpOrchestrator.executeScaleUp(info, "", "", time.Now())
	assert.Error(t, aerr)
	config := &ScaleUpTestConfig{
		Groups: []NodeGroupConfig{
			{Name: "ng1", MaxSize: 2},
		},
		Nodes: []NodeConfig{
			{Name: "ng1-n1", Cpu: 1500, Memory: 1000 * utils.MiB, Ready: true, Group: "ng1"},
		},
		ExtraPods: []PodConfig{
			{Name: "p1", Cpu: 1000},
		},
		OnScaleUp: func(group string, i int) error {
			return errors.NewAutoscalerError(errors.AutoscalerErrorType("authError"), "auth error")
		},
		Options: &defaultOptions,
	}
	results := runSimpleScaleUpTest(t, config)
	expected := errors.NewAutoscalerError(
		errors.AutoscalerErrorType("authError"),
		"failed to increase node group size: auth error",
	)
	assert.Equal(t, expected, results.ScaleUpError)
	assertLegacyRegistryEntry(t, "cluster_autoscaler_failed_scale_ups_total{reason=\"authError\"} 1")
}

func assertLegacyRegistryEntry(t *testing.T, entry string) {
	req, err := http.NewRequest("GET", "/", nil)
	if err != nil {
		t.Fatal(err)
@@ -1074,14 +1174,13 @@ func TestAuthError(t *testing.T) {
	rr := httptest.NewRecorder()
	handler := http.HandlerFunc(legacyregistry.Handler().ServeHTTP)
	handler.ServeHTTP(rr, req)

	// Check that the status code is what we expect.
	if status := rr.Code; status != http.StatusOK {
		t.Errorf("handler returned wrong status code: got %v want %v",
			status, http.StatusOK)
	}
	// Check that the failed scale up reason is set correctly.
	assert.Contains(t, rr.Body.String(), "cluster_autoscaler_failed_scale_ups_total{reason=\"abcd\"} 1")
	assert.Contains(t, rr.Body.String(), entry)
}

func simplifyScaleUpStatus(scaleUpStatus *status.ScaleUpStatus) ScaleUpStatusInfo {

@@ -22,10 +22,6 @@ import (
	"testing"
	"time"

	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/pdb"
	"k8s.io/autoscaler/cluster-autoscaler/debuggingsnapshot"

	"k8s.io/apimachinery/pkg/api/resource"
	"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
	testcloudprovider "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/test"
	"k8s.io/autoscaler/cluster-autoscaler/clusterstate/utils"

@@ -33,7 +29,10 @@ import (
	"k8s.io/autoscaler/cluster-autoscaler/context"
	"k8s.io/autoscaler/cluster-autoscaler/core/podlistprocessor"
	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/deletiontracker"
	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/pdb"
	"k8s.io/autoscaler/cluster-autoscaler/debuggingsnapshot"
	"k8s.io/autoscaler/cluster-autoscaler/estimator"
	"k8s.io/autoscaler/cluster-autoscaler/expander"
	"k8s.io/autoscaler/cluster-autoscaler/expander/random"
	"k8s.io/autoscaler/cluster-autoscaler/metrics"
	"k8s.io/autoscaler/cluster-autoscaler/processors"

@@ -50,14 +49,14 @@ import (
	"k8s.io/autoscaler/cluster-autoscaler/processors/status"
	"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
	"k8s.io/autoscaler/cluster-autoscaler/simulator/predicatechecker"
	"k8s.io/autoscaler/cluster-autoscaler/utils/backoff"
	"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
	kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
	"k8s.io/autoscaler/cluster-autoscaler/utils/labels"

	"github.com/stretchr/testify/assert"

	apiv1 "k8s.io/api/core/v1"
	"k8s.io/autoscaler/cluster-autoscaler/utils/backoff"
	"k8s.io/apimachinery/pkg/api/resource"
	kube_client "k8s.io/client-go/kubernetes"
	kube_record "k8s.io/client-go/tools/record"
	schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"

@@ -102,9 +101,38 @@ type ScaleTestConfig struct {
	ExpectedScaleDownCount int
}

// NodeGroupConfig is a node group config used in tests
type NodeGroupConfig struct {
	Name    string
	MinSize int
	MaxSize int
}

// ScaleUpTestConfig represents the configuration of a scale-up test
type ScaleUpTestConfig struct {
	Groups                  []NodeGroupConfig
	Nodes                   []NodeConfig
	Pods                    []PodConfig
	ExtraPods               []PodConfig
	OnScaleUp               testcloudprovider.OnScaleUpFunc
	ExpansionOptionToChoose *GroupSizeChange
	Options                 *config.AutoscalingOptions
}

// ScaleUpTestResult represents the result of a node group scale-up test
type ScaleUpTestResult struct {
	ScaleUpError     errors.AutoscalerError
	ScaleUpStatus    ScaleUpStatusInfo
	GroupSizeChanges []GroupSizeChange
	ExpansionOptions []GroupSizeChange
	Events           []string
	GroupTargetSizes map[string]int
}

// ScaleTestResults contains results of a scale test
type ScaleTestResults struct {
	ExpansionOptions []GroupSizeChange
	GroupTargetSizes map[string]int
	FinalOption      GroupSizeChange
	NoScaleUpReason  string
	FinalScaleDowns  []string

@@ -159,7 +187,8 @@ func NewTestProcessors(context *context.AutoscalingContext) *processors.Autoscal
func NewScaleTestAutoscalingContext(
	options config.AutoscalingOptions, fakeClient kube_client.Interface,
	listers kube_util.ListerRegistry, provider cloudprovider.CloudProvider,
	processorCallbacks processor_callbacks.ProcessorCallbacks, debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter) (context.AutoscalingContext, error) {
	processorCallbacks processor_callbacks.ProcessorCallbacks, debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter,
) (context.AutoscalingContext, error) {
	// Not enough buffer space causes the test to hang without printing any logs.
	// This is not useful.
	fakeRecorder := kube_record.NewFakeRecorder(100)

@@ -279,8 +308,8 @@ type MockAutoprovisioningNodeGroupListProcessor struct {

// Process extends the list of node groups
func (p *MockAutoprovisioningNodeGroupListProcessor) Process(context *context.AutoscalingContext, nodeGroups []cloudprovider.NodeGroup, nodeInfos map[string]*schedulerframework.NodeInfo,
	unschedulablePods []*apiv1.Pod) ([]cloudprovider.NodeGroup, map[string]*schedulerframework.NodeInfo, error) {

	unschedulablePods []*apiv1.Pod,
) ([]cloudprovider.NodeGroup, map[string]*schedulerframework.NodeInfo, error) {
	machines, err := context.CloudProvider.GetAvailableMachineTypes()
	assert.NoError(p.T, err)

@@ -305,3 +334,67 @@ func NewBackoff() backoff.Backoff {
	return backoff.NewIdBasedExponentialBackoff(5*time.Minute, /*InitialNodeGroupBackoffDuration*/
		30*time.Minute /*MaxNodeGroupBackoffDuration*/, 3*time.Hour /*NodeGroupBackoffResetTimeout*/)
}

// To implement expander.Strategy, the BestOption method must have a struct receiver.
// This prevents it from modifying fields of MockReportingStrategy, so we need a thin
// pointer wrapper for the mutable parts.
type expanderResults struct {
	inputOptions []GroupSizeChange
}

// MockReportingStrategy implements expander.Strategy
type MockReportingStrategy struct {
	defaultStrategy expander.Strategy
	optionToChoose  *GroupSizeChange
	t               *testing.T
	results         *expanderResults
}

// NewMockRepotingStrategy creates an expander strategy with reporting and mocking capabilities.
func NewMockRepotingStrategy(t *testing.T, optionToChoose *GroupSizeChange) *MockReportingStrategy {
	return &MockReportingStrategy{
		defaultStrategy: random.NewStrategy(),
		results:         &expanderResults{},
		optionToChoose:  optionToChoose,
		t:               t,
	}
}

// LastInputOptions returns the expansion options passed as input to the most recent strategy execution.
func (r *MockReportingStrategy) LastInputOptions() []GroupSizeChange {
	return r.results.inputOptions
}

// BestOption satisfies the Strategy interface. Picks the best option from those passed as an argument.
// When parameter optionToChoose is defined, it's picked as the best one.
// Otherwise, a random option is used.
func (r *MockReportingStrategy) BestOption(options []expander.Option, nodeInfo map[string]*schedulerframework.NodeInfo) *expander.Option {
	r.results.inputOptions = expanderOptionsToGroupSizeChanges(options)
	if r.optionToChoose == nil {
		return r.defaultStrategy.BestOption(options, nodeInfo)
	}
	for _, option := range options {
		groupSizeChange := expanderOptionToGroupSizeChange(option)
		if groupSizeChange == *r.optionToChoose {
			return &option
		}
	}
	assert.Fail(r.t, "did not find expansionOptionToChoose %+v", r.optionToChoose)
	return nil
}

func expanderOptionsToGroupSizeChanges(options []expander.Option) []GroupSizeChange {
	groupSizeChanges := make([]GroupSizeChange, 0, len(options))
	for _, option := range options {
		groupSizeChange := expanderOptionToGroupSizeChange(option)
		groupSizeChanges = append(groupSizeChanges, groupSizeChange)
	}
	return groupSizeChanges
}

func expanderOptionToGroupSizeChange(option expander.Option) GroupSizeChange {
	groupName := option.NodeGroup.Id()
	groupSizeIncrement := option.NodeCount
	scaleUpOption := GroupSizeChange{GroupName: groupName, SizeChange: groupSizeIncrement}
	return scaleUpOption
}
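For illustration, here is a minimal usage sketch of the mock strategy on its own. It is not part of this diff: the test name and the single-group setup are assumptions made for the example, while NewMockRepotingStrategy, GroupSizeChange, and the expander.Option fields are the ones defined or used above.

// Hypothetical usage sketch (not part of this diff); relies only on the
// imports already present in this file.
func TestMockReportingStrategySketch(t *testing.T) {
	provider := testcloudprovider.NewTestCloudProvider(nil, nil)
	provider.AddNodeGroup("ng1", 1, 10, 1)
	option := expander.Option{NodeGroup: provider.NodeGroups()[0], NodeCount: 2}

	strategy := NewMockRepotingStrategy(t, &GroupSizeChange{GroupName: "ng1", SizeChange: 2})
	best := strategy.BestOption([]expander.Option{option}, nil)

	// The preconfigured option is returned and the inputs are recorded for later inspection.
	assert.Equal(t, 2, best.NodeCount)
	assert.Equal(t, []GroupSizeChange{{GroupName: "ng1", SizeChange: 2}}, strategy.LastInputOptions())
}
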
@@ -147,7 +147,8 @@ var (
	maxGracefulTerminationFlag = flag.Int("max-graceful-termination-sec", 10*60, "Maximum number of seconds CA waits for pod termination when trying to scale down a node.")
	maxTotalUnreadyPercentage  = flag.Float64("max-total-unready-percentage", 45, "Maximum percentage of unready nodes in the cluster. After this is exceeded, CA halts operations")
	okTotalUnreadyCount        = flag.Int("ok-total-unready-count", 3, "Number of allowed unready nodes, irrespective of max-total-unready-percentage")
	scaleUpFromZero            = flag.Bool("scale-up-from-zero", true, "Should CA scale up when there 0 ready nodes.")
	scaleUpFromZero            = flag.Bool("scale-up-from-zero", true, "Should CA scale up when there are 0 ready nodes.")
	parallelScaleUp            = flag.Bool("parallel-scale-up", false, "Whether to allow parallel node groups scale up. Experimental: may not work on some cloud providers, enable at your own risk.")
	maxNodeProvisionTime       = flag.Duration("max-node-provision-time", 15*time.Minute, "The default maximum time CA waits for node to be provisioned - the value can be overridden per node group")
	maxPodEvictionTime         = flag.Duration("max-pod-eviction-time", 2*time.Minute, "Maximum time CA tries to evict a pod before giving up")
	nodeGroupsFlag             = multiStringFlag(

@@ -265,6 +266,7 @@ func createAutoscalingOptions() config.AutoscalingOptions {
		MaxTotalUnreadyPercentage: *maxTotalUnreadyPercentage,
		OkTotalUnreadyCount:       *okTotalUnreadyCount,
		ScaleUpFromZero:           *scaleUpFromZero,
		ParallelScaleUp:           *parallelScaleUp,
		EstimatorName:             *estimatorFlag,
		ExpanderNames:             *expanderFlag,
		GRPCExpanderCert:          *grpcExpanderCert,