feat: support `--scale-down-delay-after-*` per nodegroup

Signed-off-by: vadasambar <surajrbanakar@gmail.com>
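
Where this series ends up: with `--scale-down-delay-type-local` enabled, each `--scale-down-delay-after-*` duration is tracked per nodegroup instead of globally. A minimal sketch of the equivalent options (field names are taken from the config diff further down; the values are illustrative only):

package main

import (
	"fmt"
	"time"

	"k8s.io/autoscaler/cluster-autoscaler/config"
)

func main() {
	// With ScaleDownDelayTypeLocal set, every --scale-down-delay-after-*
	// duration is enforced per nodegroup rather than cluster-wide.
	opts := config.AutoscalingOptions{
		ScaleDownDelayTypeLocal:    true,
		ScaleDownDelayAfterAdd:     5 * time.Minute,
		ScaleDownDelayAfterDelete:  5 * time.Minute,
		ScaleDownDelayAfterFailure: 5 * time.Minute,
	}
	fmt.Printf("%+v\n", opts)
}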

feat: update scale down status after every scale up
- move scaledown delay status to cluster state/registry
- enable scale down if `ScaleDownDelayTypeLocal` is enabled
- add new funcs on cluster state to get and update scale down delay status
- use timestamp instead of booleans to track scale down delay status
Signed-off-by: vadasambar <surajrbanakar@gmail.com>

refactor: use existing fields on clusterstate
- use `scaleUpRequests`, `scaleDownRequests` and `scaleUpFailures` instead of `ScaleUpDelayStatus`
- tweak the above fields slightly to make them more convenient to use
- move initialization of the scale down delay processor to static autoscaler (because clusterstate is not available in main.go)
Signed-off-by: vadasambar <surajrbanakar@gmail.com>

refactor: remove note saying only `--scale-down-delay-after-add` is supported
- because we now support all the `--scale-down-delay-after-*` flags
Signed-off-by: vadasambar <surajrbanakar@gmail.com>

fix: evaluate `scaleDownInCooldown` the old way only if `ScaleDownDelayTypeLocal` is set to `false`
Signed-off-by: vadasambar <surajrbanakar@gmail.com>

refactor: remove line saying `--scale-down-delay-type-local` is only supported for `--scale-down-delay-after-add`
- because it is not true anymore
- we are supporting all `--scale-down-delay-after-*` flags per nodegroup
Signed-off-by: vadasambar <surajrbanakar@gmail.com>

test: fix failing clusterstate tests
Signed-off-by: vadasambar <surajrbanakar@gmail.com>

refactor: move processor initialization logic back from static autoscaler to main
- we don't want to initialize processors in static autoscaler because anyone implementing an alternative to static_autoscaler would then have to initialize the processors themselves
- initializing specific processors also makes static autoscaler aware of an implementation detail, which might not be the best practice
Signed-off-by: vadasambar <surajrbanakar@gmail.com>

refactor: revert changes related to `clusterstate`
- since I am going with observer pattern
Signed-off-by: vadasambar <surajrbanakar@gmail.com>

feat: add observer interface for state of scaling
- to implement the observer pattern for tracking the state of scale-ups/downs (as opposed to using clusterstate for the same)
- refactor `ScaleDownCandidatesDelayProcessor` to use fields from the new observer
Signed-off-by: vadasambar <surajrbanakar@gmail.com>
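
In its final shape in this series (full definition in the `observers/nodegroupchange` diff at the bottom), any component can implement the observer interface to be notified of scale events. A hypothetical, illustration-only implementation:

package example

import (
	"time"

	"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
	klog "k8s.io/klog/v2"
)

// loggingObserver is a hypothetical observer that only logs the scale
// events it is notified about.
type loggingObserver struct{}

func (o *loggingObserver) RegisterScaleUp(ng cloudprovider.NodeGroup, delta int, currentTime time.Time) {
	klog.Infof("scale-up of %d node(s) in %s at %v", delta, ng.Id(), currentTime)
}

func (o *loggingObserver) RegisterScaleDown(ng cloudprovider.NodeGroup, nodeName string, currentTime time.Time, expectedDeleteTime time.Time) {
	klog.Infof("scale-down of node %s in %s at %v", nodeName, ng.Id(), currentTime)
}

func (o *loggingObserver) RegisterFailedScaleUp(ng cloudprovider.NodeGroup, reason string, errMsg string, gpuResourceName, gpuType string, currentTime time.Time) {
	klog.Warningf("failed scale-up in %s: %s (%s)", ng.Id(), reason, errMsg)
}

func (o *loggingObserver) RegisterFailedScaleDown(ng cloudprovider.NodeGroup, reason string, currentTime time.Time) {
	klog.Warningf("failed scale-down in %s: %s", ng.Id(), reason)
}

Registering it then comes down to scaleStateNotifier.Register(&loggingObserver{}), exactly like clusterstate is registered in the diffs below.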

refactor: remove params passed to `clearScaleUpFailures`
- not needed anymore
Signed-off-by: vadasambar <surajrbanakar@gmail.com>

refactor: revert clusterstate tests
- approach has changed
- I am not making any changes in clusterstate now
Signed-off-by: vadasambar <surajrbanakar@gmail.com>

refactor: add accidentally deleted lines for clusterstate test
Signed-off-by: vadasambar <surajrbanakar@gmail.com>

feat: implement `Add` fn for scale state observer
- to easily add new observers
- re-word comments
- remove redundant params from `NewDefaultScaleDownCandidatesProcessor`
Signed-off-by: vadasambar <surajrbanakar@gmail.com>

fix: CI complaints about missing comments on fn definitions
Signed-off-by: vadasambar <surajrbanakar@gmail.com>

feat: initialize parent `ScaleDownCandidatesProcessor`
- instead of `ScaleDownCandidatesSortingProcessor` and `ScaleDownCandidatesDelayProcessor` separately
Signed-off-by: vadasambar <surajrbanakar@gmail.com>

refactor: add scale state notifier to list of default processors
- initialize processors for `NewDefaultScaleDownCandidatesProcessor` outside and pass them to the fn
- this allows more flexibility
Signed-off-by: vadasambar <surajrbanakar@gmail.com>

refactor: add observer interface
- create a separate observer directory
- implement `RegisterScaleUp` function in the clusterstate
- TODO: resolve syntax errors
Signed-off-by: vadasambar <surajrbanakar@gmail.com>

feat: use `scaleStateNotifier` in place of `clusterstate`
- delete leftover `scale_state_observer.go` (new one is already present in `observers` directory)
- register `clusterstate` with `scaleStateNotifier`
- use `Register` instead of `Add` function in `scaleStateNotifier`
- fix `go build`
- wip: fixing tests
Signed-off-by: vadasambar <surajrbanakar@gmail.com>

test: fix syntax errors
- add utils package `pointers` for converting `time` to pointer (without having to initialize a new variable)
Signed-off-by: vadasambar <surajrbanakar@gmail.com>
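
The helper presumably reduces to a one-liner; a sketch (the actual name in the `pointers` package may differ):

package pointers

import "time"

// TimePtr returns a pointer to t, so callers can take the address of a
// time.Time expression without declaring a throwaway variable first.
func TimePtr(t time.Time) *time.Time {
	return &t
}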

feat: wip track scale down failures along with scale up failures
- I was tracking scale up failures but not scale down failures
- fix copyright year 2017 -> 2023 for the new `pointers` package
Signed-off-by: vadasambar <surajrbanakar@gmail.com>

feat: register failed scale down with scale state notifier
- wip writing tests for `scale_down_candidates_delay_processor`
- fix CI lint errors
- remove test file for `scale_down_candidates_processor` (there is not much to test as of now)
Signed-off-by: vadasambar <surajrbanakar@gmail.com>

test: wip tests for `ScaleDownCandidatesDelayProcessor`
Signed-off-by: vadasambar <surajrbanakar@gmail.com>

test: add unit tests for `ScaleDownCandidatesDelayProcessor`
Signed-off-by: vadasambar <surajrbanakar@gmail.com>

refactor: don't track scale up failures in `ScaleDownCandidatesDelayProcessor`
- not needed
Signed-off-by: vadasambar <surajrbanakar@gmail.com>

test: better doc comments for `TestGetScaleDownCandidates`
Signed-off-by: vadasambar <surajrbanakar@gmail.com>

refactor: don't ignore error in `NGChangeObserver`
- return it instead and let the caller decide what to do with it
Signed-off-by: vadasambar <surajrbanakar@gmail.com>

refactor: change pointers to values in `NGChangeObserver` interface
- easier to work with
- remove `expectedAddTime` param from `RegisterScaleUp` (not needed for now)
- add tests for clusterstate's `RegisterScaleUp`
Signed-off-by: vadasambar <surajrbanakar@gmail.com>

refactor: conditions in `GetScaleDownCandidates`
- set scale down in cooldown if the number of scale down candidates is 0
Signed-off-by: vadasambar <surajrbanakar@gmail.com>

test: use `ng1` instead of `ng2` in existing test
Signed-off-by: vadasambar <surajrbanakar@gmail.com>

feat: wip static autoscaler tests
Signed-off-by: vadasambar <surajrbanakar@gmail.com>

refactor: assign directly instead of using `sdProcessor` variable
- variable is not needed
Signed-off-by: vadasambar <surajrbanakar@gmail.com>

test: first working test for static autoscaler
Signed-off-by: vadasambar <surajrbanakar@gmail.com>

test: continue working on static autoscaler tests
Signed-off-by: vadasambar <surajrbanakar@gmail.com>

test: wip second static autoscaler test
Signed-off-by: vadasambar <surajrbanakar@gmail.com>

refactor: remove `Println` used for debugging
Signed-off-by: vadasambar <surajrbanakar@gmail.com>

test: add static_autoscaler tests for scale down delay per nodegroup flags
Signed-off-by: vadasambar <surajrbanakar@gmail.com>

chore: rebase off the latest `master`
- change scale state observer interface's `RegisterFailedScaleUp` to reflect latest changes around clusterstate's `RegisterFailedScaleUp` in `master`
Signed-off-by: vadasambar <surajrbanakar@gmail.com>

test: fix failing clusterstate test
Signed-off-by: vadasambar <surajrbanakar@gmail.com>

test: fix failing orchestrator test
Signed-off-by: vadasambar <surajrbanakar@gmail.com>

refactor: rename `defaultScaleDownCandidatesProcessor` -> `combinedScaleDownCandidatesProcessor`
- describes the processor better
Signed-off-by: vadasambar <surajrbanakar@gmail.com>

refactor: replace `NGChangeObserver` -> `NodeGroupChangeObserver`
- makes it easier to understand for someone not familiar with the codebase
Signed-off-by: vadasambar <surajrbanakar@gmail.com>

docs: reword code comment `after` -> `for which`
Signed-off-by: vadasambar <surajrbanakar@gmail.com>

refactor: don't return error from `RegisterScaleDown`
- not needed as of now (no implementation of this function returns a non-nil error)
Signed-off-by: vadasambar <surajrbanakar@gmail.com>

refactor: address review comments around ng change observer interface
- change dir structure of nodegroup change observer package
- stop returning errors wherever they are not needed in the nodegroup change observer interface
- rename `NGChangeObserver` -> `NodeGroupChangeObserver` interface (makes it easier to understand)
Signed-off-by: vadasambar <surajrbanakar@gmail.com>

refactor: make nodegroupchange observer thread-safe
Signed-off-by: vadasambar <surajrbanakar@gmail.com>

docs: add TODO to consider using multiple mutexes in nodegroupchange observer
Signed-off-by: vadasambar <surajrbanakar@gmail.com>

refactor: use `time.Now()` directly instead of assigning a variable to it
Signed-off-by: vadasambar <surajrbanakar@gmail.com>

refactor: share code for checking if there was a recent scale-up/down/failure
Signed-off-by: vadasambar <surajrbanakar@gmail.com>
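
The shared check presumably boils down to a single time comparison; a hedged sketch (the names here are mine, not necessarily the PR's):

package scaledowncandidates

import "time"

// recent reports whether a scale event recorded at eventTime still blocks
// scale down at now, given the delay configured for that event type.
func recent(eventTime time.Time, delay time.Duration, now time.Time) bool {
	return eventTime.Add(delay).After(now)
}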

test: convert `ScaleDownCandidatesDelayProcessor` into table tests
Signed-off-by: vadasambar <surajrbanakar@gmail.com>
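
For illustration, a table-driven test over the hypothetical `recent` helper sketched above (the real tests exercise `GetScaleDownCandidates` instead):

package scaledowncandidates

import (
	"testing"
	"time"
)

func TestRecent(t *testing.T) {
	now := time.Now()
	testCases := []struct {
		name      string
		eventTime time.Time
		delay     time.Duration
		want      bool
	}{
		{name: "event inside the delay window blocks scale down", eventTime: now.Add(-time.Minute), delay: 5 * time.Minute, want: true},
		{name: "event outside the delay window does not block", eventTime: now.Add(-10 * time.Minute), delay: 5 * time.Minute, want: false},
		{name: "zero timestamp never blocks", delay: 5 * time.Minute, want: false},
	}
	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			if got := recent(tc.eventTime, tc.delay, now); got != tc.want {
				t.Errorf("recent(%v, %v, now) = %v, want %v", tc.eventTime, tc.delay, got, tc.want)
			}
		})
	}
}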

refactor: change scale state notifier's `Register()` -> `RegisterForNotifications()`
- makes it easier to understand what the function does
Signed-off-by: vadasambar <surajrbanakar@gmail.com>

test: replace scale state notifier `Register` -> `RegisterForNotifications` in test
- to fix syntax errors since it is already renamed in the actual code
Signed-off-by: vadasambar <surajrbanakar@gmail.com>

refactor: remove `clusterStateRegistry` from `delete_in_batch` tests
- not needed anymore since we have `scaleStateNotifier`
Signed-off-by: vadasambar <surajrbanakar@gmail.com>

refactor: address PR review comments
Signed-off-by: vadasambar <surajrbanakar@gmail.com>

fix: add empty `RegisterFailedScaleDown` for clusterstate
- fix syntax error in static autoscaler test
Signed-off-by: vadasambar <surajrbanakar@gmail.com>
Commit 5de49a11fb (parent cdb24d8371) by vadasambar, 2023-05-05 10:05:52 +05:30
20 changed files with 798 additions and 90 deletions

@@ -188,13 +188,8 @@ func (csr *ClusterStateRegistry) Stop() {
close(csr.interrupt)
}
// RegisterOrUpdateScaleUp registers scale-up for give node group or changes requested node increase
// count.
// If delta is positive then number of new nodes requested is increased; Time and expectedAddTime
// are reset.
// If delta is negative the number of new nodes requested is decreased; Time and expectedAddTime are
// left intact.
func (csr *ClusterStateRegistry) RegisterOrUpdateScaleUp(nodeGroup cloudprovider.NodeGroup, delta int, currentTime time.Time) {
// RegisterScaleUp registers scale-up for the given node group
func (csr *ClusterStateRegistry) RegisterScaleUp(nodeGroup cloudprovider.NodeGroup, delta int, currentTime time.Time) {
csr.Lock()
defer csr.Unlock()
csr.registerOrUpdateScaleUpNoLock(nodeGroup, delta, currentTime)
@@ -246,7 +241,14 @@ func (csr *ClusterStateRegistry) registerOrUpdateScaleUpNoLock(nodeGroup cloudpr
}
// RegisterScaleDown registers node scale down.
func (csr *ClusterStateRegistry) RegisterScaleDown(request *ScaleDownRequest) {
func (csr *ClusterStateRegistry) RegisterScaleDown(nodeGroup cloudprovider.NodeGroup,
nodeName string, currentTime time.Time, expectedDeleteTime time.Time) {
request := &ScaleDownRequest{
NodeGroup: nodeGroup,
NodeName: nodeName,
Time: currentTime,
ExpectedDeleteTime: expectedDeleteTime,
}
csr.Lock()
defer csr.Unlock()
csr.scaleDownRequests = append(csr.scaleDownRequests, request)
@@ -310,16 +312,21 @@ func (csr *ClusterStateRegistry) backoffNodeGroup(nodeGroup cloudprovider.NodeGr
// RegisterFailedScaleUp should be called after getting error from cloudprovider
// when trying to scale-up node group. It will mark this group as not safe to autoscale
// for some time.
func (csr *ClusterStateRegistry) RegisterFailedScaleUp(nodeGroup cloudprovider.NodeGroup, reason metrics.FailedScaleUpReason, errorMessage, gpuResourceName, gpuType string, currentTime time.Time) {
func (csr *ClusterStateRegistry) RegisterFailedScaleUp(nodeGroup cloudprovider.NodeGroup, reason string, errorMessage, gpuResourceName, gpuType string, currentTime time.Time) {
csr.Lock()
defer csr.Unlock()
csr.registerFailedScaleUpNoLock(nodeGroup, reason, cloudprovider.InstanceErrorInfo{
csr.registerFailedScaleUpNoLock(nodeGroup, metrics.FailedScaleUpReason(reason), cloudprovider.InstanceErrorInfo{
ErrorClass: cloudprovider.OtherErrorClass,
ErrorCode: string(reason),
ErrorMessage: errorMessage,
}, gpuResourceName, gpuType, currentTime)
}
// RegisterFailedScaleDown records failed scale-down for a nodegroup.
// We don't need to implement this function for cluster state registry
func (csr *ClusterStateRegistry) RegisterFailedScaleDown(_ cloudprovider.NodeGroup, _ string, _ time.Time) {
}
func (csr *ClusterStateRegistry) registerFailedScaleUpNoLock(nodeGroup cloudprovider.NodeGroup, reason metrics.FailedScaleUpReason, errorInfo cloudprovider.InstanceErrorInfo, gpuResourceName, gpuType string, currentTime time.Time) {
csr.scaleUpFailures[nodeGroup.Id()] = append(csr.scaleUpFailures[nodeGroup.Id()], ScaleUpFailure{NodeGroup: nodeGroup, Reason: reason, Time: currentTime})
metrics.RegisterFailedScaleUp(reason, gpuResourceName, gpuType)

@@ -31,6 +31,7 @@ import (
testprovider "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/test"
"k8s.io/autoscaler/cluster-autoscaler/clusterstate/api"
"k8s.io/autoscaler/cluster-autoscaler/clusterstate/utils"
"k8s.io/autoscaler/cluster-autoscaler/utils/taints"
. "k8s.io/autoscaler/cluster-autoscaler/utils/test"
"k8s.io/client-go/kubernetes/fake"
@@ -75,7 +76,7 @@ func TestOKWithScaleUp(t *testing.T) {
MaxTotalUnreadyPercentage: 10,
OkTotalUnreadyCount: 1,
}, fakeLogRecorder, newBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: time.Minute}))
clusterstate.RegisterOrUpdateScaleUp(provider.GetNodeGroup("ng1"), 4, time.Now())
clusterstate.RegisterScaleUp(provider.GetNodeGroup("ng1"), 4, time.Now())
err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng2_1}, nil, now)
assert.NoError(t, err)
assert.True(t, clusterstate.IsClusterHealthy())
@@ -122,7 +123,7 @@ func TestEmptyOK(t *testing.T) {
assert.False(t, clusterstate.HasNodeGroupStartedScaleUp("ng1"))
provider.AddNodeGroup("ng1", 0, 10, 3)
clusterstate.RegisterOrUpdateScaleUp(provider.GetNodeGroup("ng1"), 3, now.Add(-3*time.Second))
clusterstate.RegisterScaleUp(provider.GetNodeGroup("ng1"), 3, now.Add(-3*time.Second))
// clusterstate.scaleUpRequests["ng1"].Time = now.Add(-3 * time.Second)
// clusterstate.scaleUpRequests["ng1"].ExpectedAddTime = now.Add(1 * time.Minute)
err = clusterstate.UpdateNodes([]*apiv1.Node{}, nil, now)
@@ -161,7 +162,7 @@ func TestHasNodeGroupStartedScaleUp(t *testing.T) {
assert.False(t, clusterstate.HasNodeGroupStartedScaleUp("ng1"))
provider.AddNodeGroup("ng1", 0, 5, tc.initialSize+tc.delta)
clusterstate.RegisterOrUpdateScaleUp(provider.GetNodeGroup("ng1"), tc.delta, now.Add(-3*time.Second))
clusterstate.RegisterScaleUp(provider.GetNodeGroup("ng1"), tc.delta, now.Add(-3*time.Second))
err = clusterstate.UpdateNodes([]*apiv1.Node{}, nil, now)
assert.NoError(t, err)
assert.True(t, clusterstate.IsNodeGroupScalingUp("ng1"))
@@ -450,7 +451,7 @@ func TestExpiredScaleUp(t *testing.T) {
MaxTotalUnreadyPercentage: 10,
OkTotalUnreadyCount: 1,
}, fakeLogRecorder, newBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 2 * time.Minute}))
clusterstate.RegisterOrUpdateScaleUp(provider.GetNodeGroup("ng1"), 4, now.Add(-3*time.Minute))
clusterstate.RegisterScaleUp(provider.GetNodeGroup("ng1"), 4, now.Add(-3*time.Minute))
err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1}, nil, now)
assert.NoError(t, err)
assert.True(t, clusterstate.IsClusterHealthy())
@@ -476,13 +477,7 @@ func TestRegisterScaleDown(t *testing.T) {
OkTotalUnreadyCount: 1,
}, fakeLogRecorder, newBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}))
now := time.Now()
clusterstate.RegisterScaleDown(&ScaleDownRequest{
NodeGroup: provider.GetNodeGroup("ng1"),
NodeName: "ng1-1",
ExpectedDeleteTime: now.Add(time.Minute),
Time: now,
})
clusterstate.RegisterScaleDown(provider.GetNodeGroup("ng1"), "ng1-1", now.Add(time.Minute), now)
assert.Equal(t, 1, len(clusterstate.scaleDownRequests))
clusterstate.updateScaleRequests(now.Add(5 * time.Minute))
assert.Equal(t, 0, len(clusterstate.scaleDownRequests))
@@ -794,7 +789,7 @@ func TestScaleUpBackoff(t *testing.T) {
}, fakeLogRecorder, newBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 120 * time.Second}))
// After failed scale-up, node group should still be healthy, but should backoff from scale-ups
clusterstate.RegisterOrUpdateScaleUp(provider.GetNodeGroup("ng1"), 1, now.Add(-180*time.Second))
clusterstate.RegisterScaleUp(provider.GetNodeGroup("ng1"), 1, now.Add(-180*time.Second))
err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng1_2, ng1_3}, nil, now)
assert.NoError(t, err)
assert.True(t, clusterstate.IsClusterHealthy())
@@ -826,7 +821,7 @@ func TestScaleUpBackoff(t *testing.T) {
assert.Equal(t, NodeGroupScalingSafety{SafeToScale: true, Healthy: true}, clusterstate.NodeGroupScaleUpSafety(ng1, now))
// Another failed scale up should cause longer backoff
clusterstate.RegisterOrUpdateScaleUp(provider.GetNodeGroup("ng1"), 1, now.Add(-121*time.Second))
clusterstate.RegisterScaleUp(provider.GetNodeGroup("ng1"), 1, now.Add(-121*time.Second))
err = clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng1_2, ng1_3}, nil, now)
assert.NoError(t, err)
@@ -860,7 +855,7 @@ func TestScaleUpBackoff(t *testing.T) {
}, clusterstate.NodeGroupScaleUpSafety(ng1, now))
// The backoff should be cleared after a successful scale-up
clusterstate.RegisterOrUpdateScaleUp(provider.GetNodeGroup("ng1"), 1, now)
clusterstate.RegisterScaleUp(provider.GetNodeGroup("ng1"), 1, now)
ng1_4 := BuildTestNode("ng1-4", 1000, 1000)
SetNodeReadyState(ng1_4, true, now.Add(-1*time.Minute))
provider.AddNode("ng1", ng1_4)
@@ -935,6 +930,7 @@ func TestUpdateScaleUp(t *testing.T) {
provider := testprovider.NewTestCloudProvider(nil, nil)
provider.AddNodeGroup("ng1", 1, 10, 5)
provider.AddNodeGroup("ng2", 1, 10, 5)
fakeClient := &fake.Clientset{}
fakeLogRecorder, _ := utils.NewStatusMapRecorder(fakeClient, "kube-system", kube_record.NewFakeRecorder(5), false, "my-cool-configmap")
clusterstate := NewClusterStateRegistry(
@@ -948,29 +944,30 @@ func TestUpdateScaleUp(t *testing.T) {
nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 10 * time.Second}),
)
clusterstate.RegisterOrUpdateScaleUp(provider.GetNodeGroup("ng1"), 100, now)
// Test cases for `RegisterScaleUp`
clusterstate.RegisterScaleUp(provider.GetNodeGroup("ng1"), 100, now)
assert.Equal(t, clusterstate.scaleUpRequests["ng1"].Increase, 100)
assert.Equal(t, clusterstate.scaleUpRequests["ng1"].Time, now)
assert.Equal(t, clusterstate.scaleUpRequests["ng1"].ExpectedAddTime, now.Add(10*time.Second))
// expect no change of times on negative delta
clusterstate.RegisterOrUpdateScaleUp(provider.GetNodeGroup("ng1"), -20, later)
clusterstate.RegisterScaleUp(provider.GetNodeGroup("ng1"), -20, later)
assert.Equal(t, clusterstate.scaleUpRequests["ng1"].Increase, 80)
assert.Equal(t, clusterstate.scaleUpRequests["ng1"].Time, now)
assert.Equal(t, clusterstate.scaleUpRequests["ng1"].ExpectedAddTime, now.Add(10*time.Second))
// update times on positive delta
clusterstate.RegisterOrUpdateScaleUp(provider.GetNodeGroup("ng1"), 30, later)
clusterstate.RegisterScaleUp(provider.GetNodeGroup("ng1"), 30, later)
assert.Equal(t, clusterstate.scaleUpRequests["ng1"].Increase, 110)
assert.Equal(t, clusterstate.scaleUpRequests["ng1"].Time, later)
assert.Equal(t, clusterstate.scaleUpRequests["ng1"].ExpectedAddTime, later.Add(10*time.Second))
// if we get below 0, the scale-up is deleted
clusterstate.RegisterOrUpdateScaleUp(provider.GetNodeGroup("ng1"), -200, now)
clusterstate.RegisterScaleUp(provider.GetNodeGroup("ng1"), -200, now)
assert.Nil(t, clusterstate.scaleUpRequests["ng1"])
// If a new scale-up is registered with a negative delta, nothing should happen
clusterstate.RegisterOrUpdateScaleUp(provider.GetNodeGroup("ng1"), -200, now)
clusterstate.RegisterScaleUp(provider.GetNodeGroup("ng1"), -200, now)
assert.Nil(t, clusterstate.scaleUpRequests["ng1"])
}
@@ -986,9 +983,9 @@ func TestScaleUpFailures(t *testing.T) {
fakeLogRecorder, _ := utils.NewStatusMapRecorder(fakeClient, "kube-system", kube_record.NewFakeRecorder(5), false, "my-cool-configmap")
clusterstate := NewClusterStateRegistry(provider, ClusterStateRegistryConfig{}, fakeLogRecorder, newBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}))
clusterstate.RegisterFailedScaleUp(provider.GetNodeGroup("ng1"), metrics.Timeout, "", "", "", now)
clusterstate.RegisterFailedScaleUp(provider.GetNodeGroup("ng2"), metrics.Timeout, "", "", "", now)
clusterstate.RegisterFailedScaleUp(provider.GetNodeGroup("ng1"), metrics.APIError, "", "", "", now.Add(time.Minute))
clusterstate.RegisterFailedScaleUp(provider.GetNodeGroup("ng1"), string(metrics.Timeout), "", "", "", now)
clusterstate.RegisterFailedScaleUp(provider.GetNodeGroup("ng2"), string(metrics.Timeout), "", "", "", now)
clusterstate.RegisterFailedScaleUp(provider.GetNodeGroup("ng1"), string(metrics.APIError), "", "", "", now.Add(time.Minute))
failures := clusterstate.GetScaleUpFailures()
assert.Equal(t, map[string][]ScaleUpFailure{

@@ -158,6 +158,9 @@ type AutoscalingOptions struct {
ScaleDownDelayAfterDelete time.Duration
// ScaleDownDelayAfterFailure sets the duration before the next scale down attempt if scale down results in an error
ScaleDownDelayAfterFailure time.Duration
// ScaleDownDelayTypeLocal sets whether the --scale-down-delay-after-* flags should be applied locally per nodegroup
// or globally across all nodegroups
ScaleDownDelayTypeLocal bool
// ScaleDownNonEmptyCandidatesCount is the maximum number of non empty nodes
// considered at once as candidates for scale down.
ScaleDownNonEmptyCandidatesCount int

@@ -40,7 +40,8 @@ const (
DefaultMaxNodeProvisionTimeKey = "maxnodeprovisiontime"
// DefaultIgnoreDaemonSetsUtilizationKey identifies IgnoreDaemonSetsUtilization autoscaling option
DefaultIgnoreDaemonSetsUtilizationKey = "ignoredaemonsetsutilization"
// DefaultScaleDownUnneededTime identifies ScaleDownUnneededTime autoscaling option
// DefaultScaleDownUnneededTime is the default time duration for which CA waits before deleting an unneeded node
DefaultScaleDownUnneededTime = 10 * time.Minute
// DefaultScaleDownUnreadyTime identifies ScaleDownUnreadyTime autoscaling option
DefaultScaleDownUnreadyTime = 20 * time.Minute
@@ -48,4 +49,8 @@ const (
DefaultScaleDownUtilizationThreshold = 0.5
// DefaultScaleDownGpuUtilizationThreshold identifies ScaleDownGpuUtilizationThreshold autoscaling option
DefaultScaleDownGpuUtilizationThreshold = 0.5
// DefaultScaleDownDelayAfterFailure is the default value for ScaleDownDelayAfterFailure autoscaling option
DefaultScaleDownDelayAfterFailure = 3 * time.Minute
// DefaultScanInterval is the default scan interval for CA
DefaultScanInterval = 10 * time.Second
)

@@ -22,7 +22,6 @@ import (
apiv1 "k8s.io/api/core/v1"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
"k8s.io/autoscaler/cluster-autoscaler/clusterstate"
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/budgets"
@@ -31,6 +30,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/status"
"k8s.io/autoscaler/cluster-autoscaler/core/utils"
"k8s.io/autoscaler/cluster-autoscaler/metrics"
"k8s.io/autoscaler/cluster-autoscaler/observers/nodegroupchange"
"k8s.io/autoscaler/cluster-autoscaler/simulator"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
"k8s.io/autoscaler/cluster-autoscaler/simulator/drainability/rules"
@@ -45,7 +45,6 @@ import (
// Actuator is responsible for draining and deleting nodes.
type Actuator struct {
ctx *context.AutoscalingContext
clusterState *clusterstate.ClusterStateRegistry
nodeDeletionTracker *deletiontracker.NodeDeletionTracker
nodeDeletionScheduler *GroupDeletionScheduler
deleteOptions options.NodeDeleteOptions
@@ -66,8 +65,8 @@ type actuatorNodeGroupConfigGetter interface {
}
// NewActuator returns a new instance of Actuator.
func NewActuator(ctx *context.AutoscalingContext, csr *clusterstate.ClusterStateRegistry, ndt *deletiontracker.NodeDeletionTracker, deleteOptions options.NodeDeleteOptions, drainabilityRules rules.Rules, configGetter actuatorNodeGroupConfigGetter) *Actuator {
ndb := NewNodeDeletionBatcher(ctx, csr, ndt, ctx.NodeDeletionBatcherInterval)
func NewActuator(ctx *context.AutoscalingContext, scaleStateNotifier nodegroupchange.NodeGroupChangeObserver, ndt *deletiontracker.NodeDeletionTracker, deleteOptions options.NodeDeleteOptions, drainabilityRules rules.Rules, configGetter actuatorNodeGroupConfigGetter) *Actuator {
ndb := NewNodeDeletionBatcher(ctx, scaleStateNotifier, ndt, ctx.NodeDeletionBatcherInterval)
legacyFlagDrainConfig := SingleRuleDrainConfig(ctx.MaxGracefulTerminationSec)
var evictor Evictor
if len(ctx.DrainPriorityConfig) > 0 {
@@ -77,7 +76,6 @@ func NewActuator(ctx *context.AutoscalingContext, csr *clusterstate.ClusterState
}
return &Actuator{
ctx: ctx,
clusterState: csr,
nodeDeletionTracker: ndt,
nodeDeletionScheduler: NewGroupDeletionScheduler(ctx, ndt, ndb, evictor),
budgetProcessor: budgets.NewScaleDownBudgetProcessor(ctx),
@@ -102,7 +100,7 @@ func (a *Actuator) ClearResultsNotNewerThan(t time.Time) {
func (a *Actuator) StartDeletion(empty, drain []*apiv1.Node) (*status.ScaleDownStatus, errors.AutoscalerError) {
a.nodeDeletionScheduler.ReportMetrics()
deletionStartTime := time.Now()
defer func() { metrics.UpdateDuration(metrics.ScaleDownNodeDeletion, time.Now().Sub(deletionStartTime)) }()
defer func() { metrics.UpdateDuration(metrics.ScaleDownNodeDeletion, time.Since(deletionStartTime)) }()
results, ts := a.nodeDeletionTracker.DeletionResults()
scaleDownStatus := &status.ScaleDownStatus{NodeDeleteResults: results, NodeDeleteResultsAsOf: ts}

@@ -40,6 +40,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/deletiontracker"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/status"
. "k8s.io/autoscaler/cluster-autoscaler/core/test"
"k8s.io/autoscaler/cluster-autoscaler/observers/nodegroupchange"
"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupconfig"
"k8s.io/autoscaler/cluster-autoscaler/simulator/utilization"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
@@ -1186,13 +1187,16 @@ func TestStartDeletion(t *testing.T) {
wantScaleDownStatus.ScaledDownNodes = append(wantScaleDownStatus.ScaledDownNodes, statusScaledDownNode)
}
scaleStateNotifier := nodegroupchange.NewNodeGroupChangeObserversList()
scaleStateNotifier.Register(csr)
// Create Actuator, run StartDeletion, and verify the error.
ndt := deletiontracker.NewNodeDeletionTracker(0)
ndb := NewNodeDeletionBatcher(&ctx, csr, ndt, 0*time.Second)
ndb := NewNodeDeletionBatcher(&ctx, scaleStateNotifier, ndt, 0*time.Second)
legacyFlagDrainConfig := SingleRuleDrainConfig(ctx.MaxGracefulTerminationSec)
evictor := Evictor{EvictionRetryTime: 0, PodEvictionHeadroom: DefaultPodEvictionHeadroom, shutdownGracePeriodByPodPriority: legacyFlagDrainConfig, fullDsEviction: false}
actuator := Actuator{
ctx: &ctx, clusterState: csr, nodeDeletionTracker: ndt,
ctx: &ctx, nodeDeletionTracker: ndt,
nodeDeletionScheduler: NewGroupDeletionScheduler(&ctx, ndt, ndb, evictor),
budgetProcessor: budgets.NewScaleDownBudgetProcessor(&ctx),
configGetter: nodegroupconfig.NewDefaultNodeGroupConfigProcessor(ctx.NodeGroupDefaults),
@@ -1424,12 +1428,14 @@ func TestStartDeletionInBatchBasic(t *testing.T) {
t.Fatalf("Couldn't set up autoscaling context: %v", err)
}
csr := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, ctx.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}))
scaleStateNotifier := nodegroupchange.NewNodeGroupChangeObserversList()
scaleStateNotifier.Register(csr)
ndt := deletiontracker.NewNodeDeletionTracker(0)
ndb := NewNodeDeletionBatcher(&ctx, csr, ndt, deleteInterval)
ndb := NewNodeDeletionBatcher(&ctx, scaleStateNotifier, ndt, deleteInterval)
legacyFlagDrainConfig := SingleRuleDrainConfig(ctx.MaxGracefulTerminationSec)
evictor := Evictor{EvictionRetryTime: 0, PodEvictionHeadroom: DefaultPodEvictionHeadroom, shutdownGracePeriodByPodPriority: legacyFlagDrainConfig}
actuator := Actuator{
ctx: &ctx, clusterState: csr, nodeDeletionTracker: ndt,
ctx: &ctx, nodeDeletionTracker: ndt,
nodeDeletionScheduler: NewGroupDeletionScheduler(&ctx, ndt, ndb, evictor),
budgetProcessor: budgets.NewScaleDownBudgetProcessor(&ctx),
}

@@ -25,6 +25,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/deletiontracker"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/status"
"k8s.io/autoscaler/cluster-autoscaler/metrics"
"k8s.io/autoscaler/cluster-autoscaler/observers/nodegroupchange"
"k8s.io/autoscaler/cluster-autoscaler/utils/gpu"
"k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
"k8s.io/autoscaler/cluster-autoscaler/utils/taints"
@@ -32,7 +33,6 @@ import (
apiv1 "k8s.io/api/core/v1"
"k8s.io/autoscaler/cluster-autoscaler/clusterstate"
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
)
@@ -48,7 +48,7 @@ const (
type NodeDeletionBatcher struct {
sync.Mutex
ctx *context.AutoscalingContext
clusterState *clusterstate.ClusterStateRegistry
scaleStateNotifier nodegroupchange.NodeGroupChangeObserver
nodeDeletionTracker *deletiontracker.NodeDeletionTracker
deletionsPerNodeGroup map[string][]*apiv1.Node
deleteInterval time.Duration
@@ -56,14 +56,14 @@ type NodeDeletionBatcher struct {
}
// NewNodeDeletionBatcher returns a new NodeDeletionBatcher
func NewNodeDeletionBatcher(ctx *context.AutoscalingContext, csr *clusterstate.ClusterStateRegistry, nodeDeletionTracker *deletiontracker.NodeDeletionTracker, deleteInterval time.Duration) *NodeDeletionBatcher {
func NewNodeDeletionBatcher(ctx *context.AutoscalingContext, scaleStateNotifier nodegroupchange.NodeGroupChangeObserver, nodeDeletionTracker *deletiontracker.NodeDeletionTracker, deleteInterval time.Duration) *NodeDeletionBatcher {
return &NodeDeletionBatcher{
ctx: ctx,
clusterState: csr,
nodeDeletionTracker: nodeDeletionTracker,
deletionsPerNodeGroup: make(map[string][]*apiv1.Node),
deleteInterval: deleteInterval,
drainedNodeDeletions: make(map[string]bool),
scaleStateNotifier: scaleStateNotifier,
}
}
@@ -85,13 +85,13 @@ func (d *NodeDeletionBatcher) AddNodes(nodes []*apiv1.Node, nodeGroup cloudprovi
}
func (d *NodeDeletionBatcher) deleteNodesAndRegisterStatus(nodes []*apiv1.Node, drain bool) {
nodeGroup, err := deleteNodesFromCloudProvider(d.ctx, nodes)
nodeGroup, err := deleteNodesFromCloudProvider(d.ctx, d.scaleStateNotifier, nodes)
for _, node := range nodes {
if err != nil {
result := status.NodeDeleteResult{ResultType: status.NodeDeleteErrorFailedToDelete, Err: err}
CleanUpAndRecordFailedScaleDownEvent(d.ctx, node, nodeGroup.Id(), drain, d.nodeDeletionTracker, "", result)
} else {
RegisterAndRecordSuccessfulScaleDownEvent(d.ctx, d.clusterState, node, nodeGroup, drain, d.nodeDeletionTracker)
RegisterAndRecordSuccessfulScaleDownEvent(d.ctx, d.scaleStateNotifier, node, nodeGroup, drain, d.nodeDeletionTracker)
}
}
}
@@ -129,14 +129,14 @@ func (d *NodeDeletionBatcher) remove(nodeGroupId string) error {
go func(nodes []*apiv1.Node, drainedNodeDeletions map[string]bool) {
var result status.NodeDeleteResult
nodeGroup, err := deleteNodesFromCloudProvider(d.ctx, nodes)
nodeGroup, err := deleteNodesFromCloudProvider(d.ctx, d.scaleStateNotifier, nodes)
for _, node := range nodes {
drain := drainedNodeDeletions[node.Name]
if err != nil {
result = status.NodeDeleteResult{ResultType: status.NodeDeleteErrorFailedToDelete, Err: err}
CleanUpAndRecordFailedScaleDownEvent(d.ctx, node, nodeGroupId, drain, d.nodeDeletionTracker, "", result)
} else {
RegisterAndRecordSuccessfulScaleDownEvent(d.ctx, d.clusterState, node, nodeGroup, drain, d.nodeDeletionTracker)
RegisterAndRecordSuccessfulScaleDownEvent(d.ctx, d.scaleStateNotifier, node, nodeGroup, drain, d.nodeDeletionTracker)
}
}
}(nodes, drainedNodeDeletions)
@@ -145,12 +145,15 @@ func (d *NodeDeletionBatcher) remove(nodeGroupId string) error {
// deleteNodesFromCloudProvider removes the given nodes from the cloud provider. No extra pre-deletion actions are executed on
// the Kubernetes side.
func deleteNodesFromCloudProvider(ctx *context.AutoscalingContext, nodes []*apiv1.Node) (cloudprovider.NodeGroup, error) {
func deleteNodesFromCloudProvider(ctx *context.AutoscalingContext, scaleStateNotifier nodegroupchange.NodeGroupChangeObserver, nodes []*apiv1.Node) (cloudprovider.NodeGroup, error) {
nodeGroup, err := ctx.CloudProvider.NodeGroupForNode(nodes[0])
if err != nil {
return nodeGroup, errors.NewAutoscalerError(errors.CloudProviderError, "failed to find node group for %s: %v", nodes[0].Name, err)
}
if err := nodeGroup.DeleteNodes(nodes); err != nil {
scaleStateNotifier.RegisterFailedScaleDown(nodeGroup,
string(errors.CloudProviderError),
time.Now())
return nodeGroup, errors.NewAutoscalerError(errors.CloudProviderError, "failed to delete nodes from group %s: %v", nodeGroup.Id(), err)
}
return nodeGroup, nil
@@ -193,14 +196,11 @@ func CleanUpAndRecordFailedScaleDownEvent(ctx *context.AutoscalingContext, node
}
// RegisterAndRecordSuccessfulScaleDownEvent registers a scale down and records a successful scale down event.
func RegisterAndRecordSuccessfulScaleDownEvent(ctx *context.AutoscalingContext, csr *clusterstate.ClusterStateRegistry, node *apiv1.Node, nodeGroup cloudprovider.NodeGroup, drain bool, nodeDeletionTracker *deletiontracker.NodeDeletionTracker) {
func RegisterAndRecordSuccessfulScaleDownEvent(ctx *context.AutoscalingContext, scaleStateNotifier nodegroupchange.NodeGroupChangeObserver, node *apiv1.Node, nodeGroup cloudprovider.NodeGroup, drain bool, nodeDeletionTracker *deletiontracker.NodeDeletionTracker) {
ctx.Recorder.Eventf(node, apiv1.EventTypeNormal, "ScaleDown", "nodes removed by cluster autoscaler")
csr.RegisterScaleDown(&clusterstate.ScaleDownRequest{
NodeGroup: nodeGroup,
NodeName: node.Name,
Time: time.Now(),
ExpectedDeleteTime: time.Now().Add(MaxCloudProviderNodeDeletionTime),
})
currentTime := time.Now()
expectedDeleteTime := time.Now().Add(MaxCloudProviderNodeDeletionTime)
scaleStateNotifier.RegisterScaleDown(nodeGroup, node.Name, currentTime, expectedDeleteTime)
gpuConfig := ctx.CloudProvider.GetNodeGpuConfig(node)
metricResourceName, metricGpuType := gpu.GetGpuInfoForMetrics(gpuConfig, ctx.CloudProvider.GetAvailableGPUTypes(), node, nodeGroup)
metrics.RegisterScaleDown(1, metricResourceName, metricGpuType, nodeScaleDownReason(node, drain))

@@ -24,16 +24,13 @@ import (
apiv1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
testprovider "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/test"
"k8s.io/autoscaler/cluster-autoscaler/clusterstate"
clusterstate_utils "k8s.io/autoscaler/cluster-autoscaler/clusterstate/utils"
"k8s.io/autoscaler/cluster-autoscaler/config"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/deletiontracker"
. "k8s.io/autoscaler/cluster-autoscaler/core/test"
"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupconfig"
"k8s.io/autoscaler/cluster-autoscaler/observers/nodegroupchange"
"k8s.io/autoscaler/cluster-autoscaler/utils/taints"
"k8s.io/client-go/kubernetes/fake"
core "k8s.io/client-go/testing"
kube_record "k8s.io/client-go/tools/record"
)
func TestAddNodeToBucket(t *testing.T) {
@@ -85,7 +82,7 @@ func TestAddNodeToBucket(t *testing.T) {
for _, test := range testcases {
d := NodeDeletionBatcher{
ctx: &ctx,
clusterState: nil,
scaleStateNotifier: nil,
nodeDeletionTracker: nil,
deletionsPerNodeGroup: make(map[string][]*apiv1.Node),
drainedNodeDeletions: make(map[string]bool),
@@ -139,7 +136,6 @@ func TestRemove(t *testing.T) {
t.Run(test.name, func(t *testing.T) {
test := test
fakeClient := &fake.Clientset{}
fakeLogRecorder, _ := clusterstate_utils.NewStatusMapRecorder(fakeClient, "kube-system", kube_record.NewFakeRecorder(5), false, "my-cool-configmap")
failedNodeDeletion := make(map[string]bool)
deletedNodes := make(chan string, 10)
@@ -163,20 +159,21 @@ func TestRemove(t *testing.T) {
})
ctx, err := NewScaleTestAutoscalingContext(config.AutoscalingOptions{}, fakeClient, nil, provider, nil, nil)
clusterStateRegistry := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, fakeLogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}))
if err != nil {
t.Fatalf("Couldn't set up autoscaling context: %v", err)
}
scaleStateNotifier := nodegroupchange.NewNodeGroupChangeObserversList()
ng := "ng"
provider.AddNodeGroup(ng, 1, 10, test.numNodes)
nodeGroup := provider.GetNodeGroup(ng)
d := NodeDeletionBatcher{
ctx: &ctx,
clusterState: clusterStateRegistry,
nodeDeletionTracker: deletiontracker.NewNodeDeletionTracker(1 * time.Minute),
deletionsPerNodeGroup: make(map[string][]*apiv1.Node),
scaleStateNotifier: scaleStateNotifier,
drainedNodeDeletions: make(map[string]bool),
}
nodes := generateNodes(0, test.numNodes, ng)

@@ -28,9 +28,9 @@ import (
schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
"k8s.io/autoscaler/cluster-autoscaler/clusterstate"
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/metrics"
"k8s.io/autoscaler/cluster-autoscaler/observers/nodegroupchange"
"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupset"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
"k8s.io/autoscaler/cluster-autoscaler/utils/gpu"
@@ -38,18 +38,18 @@ import (
// ScaleUpExecutor scales up node groups.
type scaleUpExecutor struct {
autoscalingContext *context.AutoscalingContext
clusterStateRegistry *clusterstate.ClusterStateRegistry
autoscalingContext *context.AutoscalingContext
scaleStateNotifier nodegroupchange.NodeGroupChangeObserver
}
// New returns new instance of scale up executor.
func newScaleUpExecutor(
autoscalingContext *context.AutoscalingContext,
clusterStateRegistry *clusterstate.ClusterStateRegistry,
scaleStateNotifier nodegroupchange.NodeGroupChangeObserver,
) *scaleUpExecutor {
return &scaleUpExecutor{
autoscalingContext: autoscalingContext,
clusterStateRegistry: clusterStateRegistry,
autoscalingContext: autoscalingContext,
scaleStateNotifier: scaleStateNotifier,
}
}
@@ -151,13 +151,13 @@ func (e *scaleUpExecutor) executeScaleUp(
if err := info.Group.IncreaseSize(increase); err != nil {
e.autoscalingContext.LogRecorder.Eventf(apiv1.EventTypeWarning, "FailedToScaleUpGroup", "Scale-up failed for group %s: %v", info.Group.Id(), err)
aerr := errors.ToAutoscalerError(errors.CloudProviderError, err).AddPrefix("failed to increase node group size: ")
e.clusterStateRegistry.RegisterFailedScaleUp(info.Group, metrics.FailedScaleUpReason(string(aerr.Type())), aerr.Error(), gpuResourceName, gpuType, now)
e.scaleStateNotifier.RegisterFailedScaleUp(info.Group, string(aerr.Type()), aerr.Error(), gpuResourceName, gpuType, now)
return aerr
}
e.clusterStateRegistry.RegisterOrUpdateScaleUp(
info.Group,
increase,
time.Now())
if increase < 0 {
return errors.NewAutoscalerError(errors.InternalError, fmt.Sprintf("increase in number of nodes cannot be negative, got: %v", increase))
}
e.scaleStateNotifier.RegisterScaleUp(info.Group, increase, time.Now())
metrics.RegisterScaleUp(increase, gpuResourceName, gpuType)
e.autoscalingContext.LogRecorder.Eventf(apiv1.EventTypeNormal, "ScaledUpGroup",
"Scale-up: group %s size set to %d instead of %d (max: %d)", info.Group.Id(), info.NewSize, info.CurrentSize, info.MaxSize)

@@ -74,7 +74,7 @@ func (o *ScaleUpOrchestrator) Initialize(
o.clusterStateRegistry = clusterStateRegistry
o.taintConfig = taintConfig
o.resourceManager = resource.NewManager(processors.CustomResourcesProcessor)
o.scaleUpExecutor = newScaleUpExecutor(autoscalingContext, clusterStateRegistry)
o.scaleUpExecutor = newScaleUpExecutor(autoscalingContext, processors.ScaleStateNotifier)
o.initialized = true
}

@@ -973,6 +973,7 @@ func runSimpleScaleUpTest(t *testing.T, config *ScaleUpTestConfig) *ScaleUpTestR
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(options.NodeGroupDefaults))
clusterState.UpdateNodes(nodes, nodeInfos, time.Now())
processors := NewTestProcessors(&context)
processors.ScaleStateNotifier.Register(clusterState)
orchestrator := New()
orchestrator.Initialize(&context, processors, clusterState, taints.TaintConfig{})
expander := NewMockRepotingStrategy(t, config.ExpansionOptionToChoose)

@@ -167,12 +167,13 @@ func NewStaticAutoscaler(
taintConfig := taints.NewTaintConfig(opts)
processors.ScaleDownCandidatesNotifier.Register(clusterStateRegistry)
processors.ScaleStateNotifier.Register(clusterStateRegistry)
// TODO: Populate the ScaleDownActuator/Planner fields in AutoscalingContext
// during the struct creation rather than here.
ndt := deletiontracker.NewNodeDeletionTracker(0 * time.Second)
scaleDown := legacy.NewScaleDown(autoscalingContext, processors, ndt, deleteOptions, drainabilityRules)
actuator := actuation.NewActuator(autoscalingContext, clusterStateRegistry, ndt, deleteOptions, drainabilityRules, processors.NodeGroupConfigProcessor)
actuator := actuation.NewActuator(autoscalingContext, processors.ScaleStateNotifier, ndt, deleteOptions, drainabilityRules, processors.NodeGroupConfigProcessor)
autoscalingContext.ScaleDownActuator = actuator
var scaleDownPlanner scaledown.Planner
@@ -609,17 +610,12 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr
metrics.UpdateDurationFromStart(metrics.FindUnneeded, unneededStart)
scaleDownInCooldown := a.processorCallbacks.disableScaleDownForLoop ||
a.lastScaleUpTime.Add(a.ScaleDownDelayAfterAdd).After(currentTime) ||
a.lastScaleDownFailTime.Add(a.ScaleDownDelayAfterFailure).After(currentTime) ||
a.lastScaleDownDeleteTime.Add(a.ScaleDownDelayAfterDelete).After(currentTime)
scaleDownInCooldown := a.isScaleDownInCooldown(currentTime, scaleDownCandidates)
klog.V(4).Infof("Scale down status: lastScaleUpTime=%s lastScaleDownDeleteTime=%v "+
"lastScaleDownFailTime=%s scaleDownForbidden=%v scaleDownInCooldown=%v",
a.lastScaleUpTime, a.lastScaleDownDeleteTime, a.lastScaleDownFailTime,
a.processorCallbacks.disableScaleDownForLoop, scaleDownInCooldown)
metrics.UpdateScaleDownInCooldown(scaleDownInCooldown)
// We want to delete unneeded Node Groups only if here is no current delete
// in progress.
_, drained := scaleDownActuationStatus.DeletionsInProgress()
@@ -689,6 +685,18 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr
return nil
}
func (a *StaticAutoscaler) isScaleDownInCooldown(currentTime time.Time, scaleDownCandidates []*apiv1.Node) bool {
scaleDownInCooldown := a.processorCallbacks.disableScaleDownForLoop || len(scaleDownCandidates) == 0
if a.ScaleDownDelayTypeLocal {
return scaleDownInCooldown
}
return scaleDownInCooldown ||
a.lastScaleUpTime.Add(a.ScaleDownDelayAfterAdd).After(currentTime) ||
a.lastScaleDownFailTime.Add(a.ScaleDownDelayAfterFailure).After(currentTime) ||
a.lastScaleDownDeleteTime.Add(a.ScaleDownDelayAfterDelete).After(currentTime)
}
// Sets the target size of node groups to the current number of nodes in them
// if the difference was constant for a prolonged time. Returns true if managed
// to fix something.

@@ -44,11 +44,13 @@ import (
ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors"
"k8s.io/autoscaler/cluster-autoscaler/processors/callbacks"
"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupconfig"
"k8s.io/autoscaler/cluster-autoscaler/processors/scaledowncandidates"
"k8s.io/autoscaler/cluster-autoscaler/simulator"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
"k8s.io/autoscaler/cluster-autoscaler/simulator/drainability/rules"
"k8s.io/autoscaler/cluster-autoscaler/simulator/options"
"k8s.io/autoscaler/cluster-autoscaler/simulator/utilization"
"k8s.io/autoscaler/cluster-autoscaler/utils/drain"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
"k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
@@ -473,6 +475,221 @@ func TestStaticAutoscalerRunOnce(t *testing.T) {
mock.AssertExpectationsForObjects(t, onScaleUpMock)
}
func TestStaticAutoscalerRunOnceWithScaleDownDelayPerNG(t *testing.T) {
readyNodeLister := kubernetes.NewTestNodeLister(nil)
allNodeLister := kubernetes.NewTestNodeLister(nil)
allPodListerMock := &podListerMock{}
podDisruptionBudgetListerMock := &podDisruptionBudgetListerMock{}
daemonSetListerMock := &daemonSetListerMock{}
onScaleUpMock := &onScaleUpMock{}
onScaleDownMock := &onScaleDownMock{}
deleteFinished := make(chan bool, 1)
n1 := BuildTestNode("n1", 1000, 1000)
SetNodeReadyState(n1, true, time.Now())
n2 := BuildTestNode("n2", 1000, 1000)
SetNodeReadyState(n2, true, time.Now())
tn := BuildTestNode("tn", 1000, 1000)
tni := schedulerframework.NewNodeInfo()
tni.SetNode(tn)
provider := testprovider.NewTestAutoprovisioningCloudProvider(
func(id string, delta int) error {
return onScaleUpMock.ScaleUp(id, delta)
}, func(id string, name string) error {
ret := onScaleDownMock.ScaleDown(id, name)
deleteFinished <- true
return ret
},
nil, nil,
nil, map[string]*schedulerframework.NodeInfo{"ng1": tni, "ng2": tni})
assert.NotNil(t, provider)
provider.AddNodeGroup("ng1", 0, 10, 1)
ng1 := reflect.ValueOf(provider.GetNodeGroup("ng1")).Interface().(*testprovider.TestNodeGroup)
assert.NotNil(t, ng1)
provider.AddNodeGroup("ng2", 0, 10, 1)
ng2 := reflect.ValueOf(provider.GetNodeGroup("ng2")).Interface().(*testprovider.TestNodeGroup)
assert.NotNil(t, ng2)
// Create context with mocked lister registry.
options := config.AutoscalingOptions{
NodeGroupDefaults: config.NodeGroupAutoscalingOptions{
ScaleDownUnneededTime: config.DefaultScaleDownUnneededTime,
ScaleDownUnreadyTime: time.Minute,
ScaleDownUtilizationThreshold: 0.5,
MaxNodeProvisionTime: 10 * time.Second,
},
EstimatorName: estimator.BinpackingEstimatorName,
EnforceNodeGroupMinSize: true,
ScaleDownEnabled: true,
MaxNodesTotal: 1,
MaxCoresTotal: 10,
MaxMemoryTotal: 100000,
ScaleDownDelayTypeLocal: true,
ScaleDownDelayAfterAdd: 5 * time.Minute,
ScaleDownDelayAfterDelete: 5 * time.Minute,
ScaleDownDelayAfterFailure: 5 * time.Minute,
}
processorCallbacks := newStaticAutoscalerProcessorCallbacks()
context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, nil, provider, processorCallbacks, nil)
assert.NoError(t, err)
setUpScaleDownActuator(&context, options)
listerRegistry := kube_util.NewListerRegistry(allNodeLister, readyNodeLister, allPodListerMock, podDisruptionBudgetListerMock, daemonSetListerMock,
nil, nil, nil, nil)
context.ListerRegistry = listerRegistry
clusterStateConfig := clusterstate.ClusterStateRegistryConfig{
OkTotalUnreadyCount: 1,
}
processors := NewTestProcessors(&context)
sddProcessor := scaledowncandidates.NewScaleDownCandidatesDelayProcessor()
processors.ScaleStateNotifier.Register(sddProcessor)
scaleDownCandidatesComparers := []scaledowncandidates.CandidatesComparer{}
cp := scaledowncandidates.NewCombinedScaleDownCandidatesProcessor()
cp.Register(scaledowncandidates.NewScaleDownCandidatesSortingProcessor(scaleDownCandidatesComparers))
cp.Register(sddProcessor)
processors.ScaleDownNodeProcessor = cp
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(options.NodeGroupDefaults))
processors.ScaleStateNotifier.Register(clusterState)
sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&context, processors, clusterState)
suOrchestrator := orchestrator.New()
suOrchestrator.Initialize(&context, processors, clusterState, taints.TaintConfig{})
autoscaler := &StaticAutoscaler{
AutoscalingContext: &context,
clusterStateRegistry: clusterState,
lastScaleUpTime: time.Now(),
lastScaleDownFailTime: time.Now(),
scaleDownPlanner: sdPlanner,
scaleDownActuator: sdActuator,
scaleUpOrchestrator: suOrchestrator,
processors: processors,
processorCallbacks: processorCallbacks,
initialized: true,
}
p1 := BuildTestPod("p1", 400, 100)
p1.Annotations[drain.PodSafeToEvictKey] = "true"
p1.Spec.NodeName = "n1"
p2 := BuildTestPod("p2", 400, 100)
p2.Annotations[drain.PodSafeToEvictKey] = "true"
p2.Spec.NodeName = "n2"
testCases := []struct {
description string
beforeTest func(processors *ca_processors.AutoscalingProcessors)
expectedScaleDownNG string
expectedScaleDownNode string
afterTest func(processors *ca_processors.AutoscalingProcessors)
}{
// Case 1:
// ng1 scaled up recently
// both ng1 and ng2 have under-utilized nodes
// expectation: under-utilized node in ng2 should be scaled down
{
description: "ng1 scaled up recently - both ng1 and ng2 have under-utilized nodes",
beforeTest: func(processors *ca_processors.AutoscalingProcessors) {
// make CA think ng1 scaled up recently
processors.ScaleStateNotifier.RegisterScaleUp(ng1, 1, time.Now().Add(-3*time.Minute))
},
expectedScaleDownNG: "ng2",
expectedScaleDownNode: "n2",
afterTest: func(processors *ca_processors.AutoscalingProcessors) {
// reset scale up in ng1 so that it doesn't block scale down in the next test
// scale down is always recorded relative to time.Now(), no matter
// what currentTime is passed to RunOnce()
processors.ScaleStateNotifier.RegisterScaleUp(ng1, 1, time.Time{})
},
},
// Case 2:
// ng2 scaled down recently
// both ng1 and ng2 have under-utilized nodes
// expectation: under-utilized node in ng1 should be scaled down
{
description: "ng2 scaled down recently - both ng1 and ng2 have under-utilized nodes",
beforeTest: func(processors *ca_processors.AutoscalingProcessors) {
// make CA think ng2 scaled down recently
processors.ScaleStateNotifier.RegisterScaleDown(ng2, "n3", time.Now().Add(-3*time.Minute), time.Now())
},
expectedScaleDownNG: "ng1",
expectedScaleDownNode: "n1",
afterTest: func(processors *ca_processors.AutoscalingProcessors) {
// reset scale down in ng1 and ng2 so that it doesn't block scale down in the next test
// scale down is always recorded relative to time.Now(), no matter
// what currentTime is passed to RunOnce()
processors.ScaleStateNotifier.RegisterScaleDown(ng2, "n3", time.Time{}, time.Time{})
processors.ScaleStateNotifier.RegisterScaleDown(ng1, "n1", time.Time{}, time.Time{})
},
},
// Case 3:
// ng1 had a scale down failure
// both ng1 and ng2 have under-utilized nodes
// expectation: under-utilized node in ng2 should be scaled down
{
description: "ng1 had scale-down failure - both ng1 and ng2 have under-utilized nodes",
beforeTest: func(processors *ca_processors.AutoscalingProcessors) {
// Make CA think scale down failed in ng1
processors.ScaleStateNotifier.RegisterFailedScaleDown(ng1, "scale down failed", time.Now().Add(-3*time.Minute))
},
expectedScaleDownNG: "ng2",
expectedScaleDownNode: "n2",
afterTest: func(processors *ca_processors.AutoscalingProcessors) {
},
},
}
for _, tc := range testCases {
t.Run(tc.description, func(t *testing.T) {
tc.beforeTest(processors)
provider.AddNode("ng1", n1)
provider.AddNode("ng2", n2)
ng1.SetTargetSize(1)
ng2.SetTargetSize(1)
// Mark unneeded nodes.
readyNodeLister.SetNodes([]*apiv1.Node{n1, n2})
allNodeLister.SetNodes([]*apiv1.Node{n1, n2})
allPodListerMock.On("List").Return([]*apiv1.Pod{p1}, nil).Twice()
daemonSetListerMock.On("List", labels.Everything()).Return([]*appsv1.DaemonSet{}, nil).Once()
podDisruptionBudgetListerMock.On("List").Return([]*policyv1.PodDisruptionBudget{}, nil).Once()
err = autoscaler.RunOnce(time.Now())
assert.NoError(t, err)
mock.AssertExpectationsForObjects(t, allPodListerMock,
podDisruptionBudgetListerMock, daemonSetListerMock, onScaleUpMock, onScaleDownMock)
// Scale down nodegroup
readyNodeLister.SetNodes([]*apiv1.Node{n1, n2})
allNodeLister.SetNodes([]*apiv1.Node{n1, n2})
allPodListerMock.On("List").Return([]*apiv1.Pod{p1}, nil).Times(3)
daemonSetListerMock.On("List", labels.Everything()).Return([]*appsv1.DaemonSet{}, nil).Once()
podDisruptionBudgetListerMock.On("List").Return([]*policyv1.PodDisruptionBudget{}, nil).Twice()
onScaleDownMock.On("ScaleDown", tc.expectedScaleDownNG, tc.expectedScaleDownNode).Return(nil).Once()
err = autoscaler.RunOnce(time.Now().Add(config.DefaultScaleDownUnneededTime))
waitForDeleteToFinish(t, deleteFinished)
assert.NoError(t, err)
mock.AssertExpectationsForObjects(t, allPodListerMock,
podDisruptionBudgetListerMock, daemonSetListerMock, onScaleUpMock, onScaleDownMock)
tc.afterTest(processors)
})
}
}
func TestStaticAutoscalerRunOnceWithAutoprovisionedEnabled(t *testing.T) {
readyNodeLister := kubernetes.NewTestNodeLister(nil)
allNodeLister := kubernetes.NewTestNodeLister(nil)

@@ -35,6 +35,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/expander"
"k8s.io/autoscaler/cluster-autoscaler/expander/random"
"k8s.io/autoscaler/cluster-autoscaler/metrics"
"k8s.io/autoscaler/cluster-autoscaler/observers/nodegroupchange"
"k8s.io/autoscaler/cluster-autoscaler/processors"
"k8s.io/autoscaler/cluster-autoscaler/processors/actionablecluster"
"k8s.io/autoscaler/cluster-autoscaler/processors/binpacking"
@@ -193,6 +194,7 @@ func NewTestProcessors(context *context.AutoscalingContext) *processors.Autoscal
CustomResourcesProcessor: customresources.NewDefaultCustomResourcesProcessor(),
ActionableClusterProcessor: actionablecluster.NewDefaultActionableClusterProcessor(),
ScaleDownCandidatesNotifier: scaledowncandidates.NewObserversList(),
ScaleStateNotifier: nodegroupchange.NewNodeGroupChangeObserversList(),
}
}

@@ -113,9 +113,11 @@ var (
scaleDownUnreadyEnabled = flag.Bool("scale-down-unready-enabled", true, "Should CA scale down unready nodes of the cluster")
scaleDownDelayAfterAdd = flag.Duration("scale-down-delay-after-add", 10*time.Minute,
"How long after scale up that scale down evaluation resumes")
scaleDownDelayTypeLocal = flag.Bool("scale-down-delay-type-local", false,
"Should --scale-down-delay-after-* flags be applied locally per nodegroup or globally across all nodegroups")
scaleDownDelayAfterDelete = flag.Duration("scale-down-delay-after-delete", 0,
"How long after node deletion that scale down evaluation resumes, defaults to scanInterval")
scaleDownDelayAfterFailure = flag.Duration("scale-down-delay-after-failure", 3*time.Minute,
scaleDownDelayAfterFailure = flag.Duration("scale-down-delay-after-failure", config.DefaultScaleDownDelayAfterFailure,
"How long after scale down failure that scale down evaluation resumes")
scaleDownUnneededTime = flag.Duration("scale-down-unneeded-time", config.DefaultScaleDownUnneededTime,
"How long a node should be unneeded before it is eligible for scale down")
@@ -145,7 +147,7 @@ var (
schedulerConfigFile = flag.String(config.SchedulerConfigFileFlag, "", "scheduler-config allows changing configuration of in-tree scheduler plugins acting on PreFilter and Filter extension points")
nodeDeletionDelayTimeout = flag.Duration("node-deletion-delay-timeout", 2*time.Minute, "Maximum time CA waits for removing delay-deletion.cluster-autoscaler.kubernetes.io/ annotations before deleting the node.")
nodeDeletionBatcherInterval = flag.Duration("node-deletion-batcher-interval", 0*time.Second, "How long CA ScaleDown gathers nodes to delete them in batch.")
scanInterval = flag.Duration("scan-interval", 10*time.Second, "How often cluster is reevaluated for scale up or down")
scanInterval = flag.Duration("scan-interval", config.DefaultScanInterval, "How often cluster is reevaluated for scale up or down")
maxNodesTotal = flag.Int("max-nodes-total", 0, "Maximum number of nodes in all node groups. Cluster autoscaler will not grow the cluster beyond this number.")
coresTotal = flag.String("cores-total", minMaxFlagString(0, config.DefaultMaxClusterCores), "Minimum and maximum number of cores in cluster, in the format <min>:<max>. Cluster autoscaler will not scale the cluster beyond these numbers.")
memoryTotal = flag.String("memory-total", minMaxFlagString(0, config.DefaultMaxClusterMemory), "Minimum and maximum number of gigabytes of memory in cluster, in the format <min>:<max>. Cluster autoscaler will not scale the cluster beyond these numbers.")
@@ -359,6 +361,7 @@ func createAutoscalingOptions() config.AutoscalingOptions {
NodeGroups: *nodeGroupsFlag,
EnforceNodeGroupMinSize: *enforceNodeGroupMinSize,
ScaleDownDelayAfterAdd: *scaleDownDelayAfterAdd,
ScaleDownDelayTypeLocal: *scaleDownDelayTypeLocal,
ScaleDownDelayAfterDelete: *scaleDownDelayAfterDelete,
ScaleDownDelayAfterFailure: *scaleDownDelayAfterFailure,
ScaleDownEnabled: *scaleDownEnabled,
@@ -492,8 +495,17 @@ func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter
}
opts.Processors.ScaleDownCandidatesNotifier.Register(sdCandidatesSorting)
}
sdProcessor := scaledowncandidates.NewScaleDownCandidatesSortingProcessor(scaleDownCandidatesComparers)
opts.Processors.ScaleDownNodeProcessor = sdProcessor
cp := scaledowncandidates.NewCombinedScaleDownCandidatesProcessor()
cp.Register(scaledowncandidates.NewScaleDownCandidatesSortingProcessor(scaleDownCandidatesComparers))
if autoscalingOptions.ScaleDownDelayTypeLocal {
sdp := scaledowncandidates.NewScaleDownCandidatesDelayProcessor()
cp.Register(sdp)
opts.Processors.ScaleStateNotifier.Register(sdp)
}
opts.Processors.ScaleDownNodeProcessor = cp
var nodeInfoComparator nodegroupset.NodeInfoComparator
if len(autoscalingOptions.BalancingLabels) > 0 {

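For illustration, with the flag plumbing above in place, turning on per-nodegroup cooldowns could look like the following invocation (flag values are examples, not recommendations):

cluster-autoscaler \
    --scale-down-delay-type-local=true \
    --scale-down-delay-after-add=10m \
    --scale-down-delay-after-delete=0s \
    --scale-down-delay-after-failure=3m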
View File

@@ -0,0 +1,100 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package nodegroupchange
import (
"sync"
"time"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
)
// NodeGroupChangeObserver is an observer of:
// * scale-up(s) for a nodegroup
// * scale-down(s) for a nodegroup
// * scale-up failure(s) for a nodegroup
// * scale-down failure(s) for a nodegroup
type NodeGroupChangeObserver interface {
// RegisterScaleUp records scale up for a nodegroup.
RegisterScaleUp(nodeGroup cloudprovider.NodeGroup, delta int, currentTime time.Time)
// RegisterScaleDown records scale-down for a nodegroup.
RegisterScaleDown(nodeGroup cloudprovider.NodeGroup, nodeName string, currentTime time.Time, expectedDeleteTime time.Time)
// RegisterFailedScaleUp records failed scale-up for a nodegroup.
// reason denotes an optional reason for the failed scale-up.
// errMsg denotes the actual error message.
RegisterFailedScaleUp(nodeGroup cloudprovider.NodeGroup, reason string, errMsg string, gpuResourceName, gpuType string, currentTime time.Time)
// RegisterFailedScaleDown records failed scale-down for a nodegroup.
RegisterFailedScaleDown(nodeGroup cloudprovider.NodeGroup, reason string, currentTime time.Time)
}
// NodeGroupChangeObserversList is a thread-safe list of observers
// of scale up/down state in the cluster.
type NodeGroupChangeObserversList struct {
observers []NodeGroupChangeObserver
// TODO(vadasambar): consider using separate mutexes for functions not related to each other
mutex sync.Mutex
}
// Register adds a new observer to the list.
func (l *NodeGroupChangeObserversList) Register(o NodeGroupChangeObserver) {
l.mutex.Lock()
defer l.mutex.Unlock()
l.observers = append(l.observers, o)
}
// RegisterScaleUp calls RegisterScaleUp for each observer.
func (l *NodeGroupChangeObserversList) RegisterScaleUp(nodeGroup cloudprovider.NodeGroup,
delta int, currentTime time.Time) {
l.mutex.Lock()
defer l.mutex.Unlock()
for _, observer := range l.observers {
observer.RegisterScaleUp(nodeGroup, delta, currentTime)
}
}
// RegisterScaleDown calls RegisterScaleDown for each observer.
func (l *NodeGroupChangeObserversList) RegisterScaleDown(nodeGroup cloudprovider.NodeGroup,
nodeName string, currentTime time.Time, expectedDeleteTime time.Time) {
l.mutex.Lock()
defer l.mutex.Unlock()
for _, observer := range l.observers {
observer.RegisterScaleDown(nodeGroup, nodeName, currentTime, expectedDeleteTime)
}
}
// RegisterFailedScaleUp calls RegisterFailedScaleUp for each observer.
func (l *NodeGroupChangeObserversList) RegisterFailedScaleUp(nodeGroup cloudprovider.NodeGroup,
reason string, errMsg, gpuResourceName, gpuType string, currentTime time.Time) {
l.mutex.Lock()
defer l.mutex.Unlock()
for _, observer := range l.observers {
observer.RegisterFailedScaleUp(nodeGroup, reason, errMsg, gpuResourceName, gpuType, currentTime)
}
}
// RegisterFailedScaleDown calls RegisterFailedScaleDown for each observer.
func (l *NodeGroupChangeObserversList) RegisterFailedScaleDown(nodeGroup cloudprovider.NodeGroup,
reason string, currentTime time.Time) {
l.mutex.Lock()
defer l.mutex.Unlock()
for _, observer := range l.observers {
observer.RegisterFailedScaleDown(nodeGroup, reason, currentTime)
}
}
// NewNodeGroupChangeObserversList returns an empty list of scale state observers.
func NewNodeGroupChangeObserversList() *NodeGroupChangeObserversList {
return &NodeGroupChangeObserversList{}
}
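As a sketch of how this list is meant to be consumed (the logging observer below is hypothetical and not part of this change; it assumes a klog import in this package): any type implementing NodeGroupChangeObserver can be attached via Register and will then be notified of every scale event.

// hypothetical observer that logs scale-ups; for illustration only
type loggingObserver struct{}

func (o *loggingObserver) RegisterScaleUp(ng cloudprovider.NodeGroup, delta int, t time.Time) {
    klog.Infof("nodegroup %s scaled up by %d at %v", ng.Id(), delta, t)
}
func (o *loggingObserver) RegisterScaleDown(ng cloudprovider.NodeGroup, nodeName string, t, expectedDeleteTime time.Time) {
}
func (o *loggingObserver) RegisterFailedScaleUp(ng cloudprovider.NodeGroup, reason, errMsg, gpuResourceName, gpuType string, t time.Time) {
}
func (o *loggingObserver) RegisterFailedScaleDown(ng cloudprovider.NodeGroup, reason string, t time.Time) {
}

// wiring: register once, then every notification fans out
// list := NewNodeGroupChangeObserversList()
// list.Register(&loggingObserver{})
// list.RegisterScaleUp(ng, 1, time.Now())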

View File

@@ -18,6 +18,7 @@ package processors
import (
"k8s.io/autoscaler/cluster-autoscaler/config"
"k8s.io/autoscaler/cluster-autoscaler/observers/nodegroupchange"
"k8s.io/autoscaler/cluster-autoscaler/processors/actionablecluster"
"k8s.io/autoscaler/cluster-autoscaler/processors/binpacking"
"k8s.io/autoscaler/cluster-autoscaler/processors/customresources"
@@ -67,6 +68,12 @@ type AutoscalingProcessors struct {
ActionableClusterProcessor actionablecluster.ActionableClusterProcessor
// ScaleDownCandidatesNotifier is used to Update and Register new scale down candidates observer.
ScaleDownCandidatesNotifier *scaledowncandidates.ObserversList
// ScaleStateNotifier is used to notify
// * scale-ups per nodegroup
// * scale-downs per nodegroup
// * scale-up failures per nodegroup
// * scale-down failures per nodegroup
ScaleStateNotifier *nodegroupchange.NodeGroupChangeObserversList
}
// DefaultProcessors returns default set of processors.
@@ -97,6 +104,7 @@ func DefaultProcessors(options config.AutoscalingOptions) *AutoscalingProcessors
ActionableClusterProcessor: actionablecluster.NewDefaultActionableClusterProcessor(),
TemplateNodeInfoProvider: nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false),
ScaleDownCandidatesNotifier: scaledowncandidates.NewObserversList(),
ScaleStateNotifier: nodegroupchange.NewNodeGroupChangeObserversList(),
}
}
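To sketch the intended call pattern (the call site below is hypothetical; ng stands for some cloudprovider.NodeGroup): scale-up and scale-down code paths notify the shared list once, and every registered observer sees the event.

// hypothetical notification from a scale-up code path
p := DefaultProcessors(options)
p.ScaleStateNotifier.RegisterScaleUp(ng, 1 /* delta */, time.Now())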

View File

@@ -0,0 +1,124 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package scaledowncandidates
import (
"reflect"
"time"
apiv1 "k8s.io/api/core/v1"
"k8s.io/klog/v2"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
)
// ScaleDownCandidatesDelayProcessor is a processor to filter out
// nodes according to scale down delay per nodegroup
type ScaleDownCandidatesDelayProcessor struct {
scaleUps map[string]time.Time
scaleDowns map[string]time.Time
scaleDownFailures map[string]time.Time
}
// GetPodDestinationCandidates returns the nodes as is; no processing is required here.
func (p *ScaleDownCandidatesDelayProcessor) GetPodDestinationCandidates(ctx *context.AutoscalingContext,
nodes []*apiv1.Node) ([]*apiv1.Node, errors.AutoscalerError) {
return nodes, nil
}
// GetScaleDownCandidates filters out nodes whose nodegroup is still in a per-nodegroup scale down cooldown.
func (p *ScaleDownCandidatesDelayProcessor) GetScaleDownCandidates(ctx *context.AutoscalingContext,
nodes []*apiv1.Node) ([]*apiv1.Node, errors.AutoscalerError) {
result := []*apiv1.Node{}
for _, node := range nodes {
nodeGroup, err := ctx.CloudProvider.NodeGroupForNode(node)
if err != nil {
klog.Warningf("Error while checking node group for %s: %v", node.Name, err)
continue
}
if nodeGroup == nil || reflect.ValueOf(nodeGroup).IsNil() {
klog.V(4).Infof("Node %s should not be processed by cluster autoscaler (no node group config)", node.Name)
continue
}
currentTime := time.Now()
// recent reports whether the event recorded in m for this nodegroup
// happened within the last d, i.e. the nodegroup is still in cooldown.
recent := func(m map[string]time.Time, d time.Duration, msg string) bool {
if !m[nodeGroup.Id()].IsZero() && m[nodeGroup.Id()].Add(d).After(currentTime) {
klog.V(4).Infof("Skipping scale down on node group %s because it %s recently at %v",
nodeGroup.Id(), msg, m[nodeGroup.Id()])
return true
}
return false
}
if recent(p.scaleUps, ctx.ScaleDownDelayAfterAdd, "scaled up") {
continue
}
if recent(p.scaleDowns, ctx.ScaleDownDelayAfterDelete, "scaled down") {
continue
}
if recent(p.scaleDownFailures, ctx.ScaleDownDelayAfterFailure, "failed to scale down") {
continue
}
result = append(result, node)
}
return result, nil
}
// CleanUp is called at CA termination.
func (p *ScaleDownCandidatesDelayProcessor) CleanUp() {
}
// RegisterScaleUp records when the last scale up happened for a nodegroup.
func (p *ScaleDownCandidatesDelayProcessor) RegisterScaleUp(nodeGroup cloudprovider.NodeGroup,
_ int, currentTime time.Time) {
p.scaleUps[nodeGroup.Id()] = currentTime
}
// RegisterScaleDown records when the last scale down happened for a nodegroup.
func (p *ScaleDownCandidatesDelayProcessor) RegisterScaleDown(nodeGroup cloudprovider.NodeGroup,
nodeName string, currentTime time.Time, _ time.Time) {
p.scaleDowns[nodeGroup.Id()] = currentTime
}
// RegisterFailedScaleUp is a no-op here: failed scale-ups do not put a nodegroup into scale down cooldown.
func (p *ScaleDownCandidatesDelayProcessor) RegisterFailedScaleUp(_ cloudprovider.NodeGroup,
_ string, _ string, _ string, _ string, _ time.Time) {
}
// RegisterFailedScaleDown records when the last scale down failed for a nodegroup.
func (p *ScaleDownCandidatesDelayProcessor) RegisterFailedScaleDown(nodeGroup cloudprovider.NodeGroup,
reason string, currentTime time.Time) {
p.scaleDownFailures[nodeGroup.Id()] = currentTime
}
// NewScaleDownCandidatesDelayProcessor returns a new ScaleDownCandidatesDelayProcessor.
func NewScaleDownCandidatesDelayProcessor() *ScaleDownCandidatesDelayProcessor {
return &ScaleDownCandidatesDelayProcessor{
scaleUps: make(map[string]time.Time),
scaleDowns: make(map[string]time.Time),
scaleDownFailures: make(map[string]time.Time),
}
}
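A minimal, self-contained sketch of the cooldown arithmetic behind the recent helper above (the 10m/5m values are arbitrary examples): a nodegroup stays filtered out while lastEvent + delay is still in the future.

package main

import (
    "fmt"
    "time"
)

func main() {
    delay := 10 * time.Minute                       // e.g. --scale-down-delay-after-add
    lastScaleUp := time.Now().Add(-5 * time.Minute) // nodegroup scaled up 5m ago
    // same predicate as in GetScaleDownCandidates; a zero time means "never happened"
    inCooldown := !lastScaleUp.IsZero() && lastScaleUp.Add(delay).After(time.Now())
    fmt.Println(inCooldown) // true: only 5m of the 10m delay have elapsed
}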

View File

@@ -0,0 +1,151 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package scaledowncandidates
import (
"testing"
"time"
"github.com/stretchr/testify/assert"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider/test"
testprovider "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/test"
"k8s.io/autoscaler/cluster-autoscaler/config"
"k8s.io/autoscaler/cluster-autoscaler/context"
)
func TestGetScaleDownCandidates(t *testing.T) {
n1 := &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "n1",
},
}
n2 := &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "n2",
},
}
n3 := &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "n3",
},
}
ctx := context.AutoscalingContext{
AutoscalingOptions: config.AutoscalingOptions{
ScaleDownDelayAfterAdd: time.Minute * 10,
ScaleDownDelayAfterDelete: time.Minute * 10,
ScaleDownDelayAfterFailure: time.Minute * 10,
ScaleDownDelayTypeLocal: true,
},
}
testCases := map[string]struct {
autoscalingContext context.AutoscalingContext
candidates []*v1.Node
expected []*v1.Node
setupProcessor func(p *ScaleDownCandidatesDelayProcessor) *ScaleDownCandidatesDelayProcessor
}{
// Expectation: no nodegroups should be filtered out
"no scale ups - no scale downs - no scale down failures": {
autoscalingContext: ctx,
candidates: []*v1.Node{n1, n2, n3},
expected: []*v1.Node{n1, n2, n3},
setupProcessor: nil,
},
// Expectation: only nodegroups in cool-down should be filtered out
"no scale ups - 2 scale downs - no scale down failures": {
autoscalingContext: ctx,
candidates: []*v1.Node{n1, n2, n3},
expected: []*v1.Node{n1, n3},
setupProcessor: func(p *ScaleDownCandidatesDelayProcessor) *ScaleDownCandidatesDelayProcessor {
// fake nodegroups for calling `RegisterScaleDown`
ng2 := testprovider.NewTestNodeGroup("ng-2", 0, 0, 0, false, false, "", nil, nil)
ng3 := testprovider.NewTestNodeGroup("ng-3", 0, 0, 0, false, false, "", nil, nil)
// in cool down
p.RegisterScaleDown(ng2, "n2", time.Now().Add(-time.Minute*5), time.Time{})
// not in cool down anymore
p.RegisterScaleDown(ng3, "n3", time.Now().Add(-time.Minute*11), time.Time{})
return p
},
},
// Expectation: only nodegroups in cool-down should be filtered out
"1 scale up - no scale down - no scale down failures": {
autoscalingContext: ctx,
candidates: []*v1.Node{n1, n2, n3},
expected: []*v1.Node{n1, n3},
setupProcessor: func(p *ScaleDownCandidatesDelayProcessor) *ScaleDownCandidatesDelayProcessor {
// fake nodegroups for calling `RegisterScaleUp`
ng2 := testprovider.NewTestNodeGroup("ng-2", 0, 0, 0, false, false, "", nil, nil)
ng3 := testprovider.NewTestNodeGroup("ng-3", 0, 0, 0, false, false, "", nil, nil)
// in cool down
p.RegisterScaleUp(ng2, 0, time.Now().Add(-time.Minute*5))
// not in cool down anymore
p.RegisterScaleUp(ng3, 0, time.Now().Add(-time.Minute*11))
return p
},
},
// Expectation: only nodegroups in cool-down should be filtered out
"no scale up - no scale down - 1 scale down failure": {
autoscalingContext: ctx,
candidates: []*v1.Node{n1, n2, n3},
expected: []*v1.Node{n1, n3},
setupProcessor: func(p *ScaleDownCandidatesDelayProcessor) *ScaleDownCandidatesDelayProcessor {
// fake nodegroups for calling `RegisterFailedScaleDown`
ng2 := testprovider.NewTestNodeGroup("ng-2", 0, 0, 0, false, false, "", nil, nil)
ng3 := testprovider.NewTestNodeGroup("ng-3", 0, 0, 0, false, false, "", nil, nil)
// in cool down
p.RegisterFailedScaleDown(ng2, "", time.Now().Add(-time.Minute*5))
// not in cool down anymore
p.RegisterFailedScaleDown(ng3, "", time.Now().Add(-time.Minute*11))
return p
},
},
}
for description, testCase := range testCases {
t.Run(description, func(t *testing.T) {
provider := testprovider.NewTestCloudProvider(nil, nil)
p := NewScaleDownCandidatesDelayProcessor()
if testCase.setupProcessor != nil {
p = testCase.setupProcessor(p)
}
provider.AddNodeGroup("ng-1", 1, 3, 2)
provider.AddNode("ng-1", n1)
provider.AddNodeGroup("ng-2", 1, 3, 2)
provider.AddNode("ng-2", n2)
provider.AddNodeGroup("ng-3", 1, 3, 2)
provider.AddNode("ng-3", n3)
testCase.autoscalingContext.CloudProvider = provider
no, err := p.GetScaleDownCandidates(&testCase.autoscalingContext, testCase.candidates)
assert.NoError(t, err)
assert.Equal(t, testCase.expected, no)
})
}
}
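Assuming the usual repository layout, these tests should run with something like go test ./processors/scaledowncandidates/... from the cluster-autoscaler module root (path assumed, not verified here).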

View File

@@ -0,0 +1,72 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package scaledowncandidates
import (
apiv1 "k8s.io/api/core/v1"
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/processors/nodes"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
)
type combinedScaleDownCandidatesProcessor struct {
processors []nodes.ScaleDownNodeProcessor
}
// NewCombinedScaleDownCandidatesProcessor returns a default implementation of the scale down candidates
// processor, which wraps and sequentially runs other sub-processors.
func NewCombinedScaleDownCandidatesProcessor() *combinedScaleDownCandidatesProcessor {
return &combinedScaleDownCandidatesProcessor{}
}
// Register registers a new ScaleDownNodeProcessor
func (p *combinedScaleDownCandidatesProcessor) Register(np nodes.ScaleDownNodeProcessor) {
p.processors = append(p.processors, np)
}
// GetPodDestinationCandidates returns nodes that potentially could act as destinations for pods
// that would become unscheduled after a scale down.
func (p *combinedScaleDownCandidatesProcessor) GetPodDestinationCandidates(ctx *context.AutoscalingContext, nodes []*apiv1.Node) ([]*apiv1.Node, errors.AutoscalerError) {
var err errors.AutoscalerError
for _, processor := range p.processors {
nodes, err = processor.GetPodDestinationCandidates(ctx, nodes)
if err != nil {
return nil, err
}
}
return nodes, nil
}
// GetScaleDownCandidates returns nodes that potentially could be scaled down.
func (p *combinedScaleDownCandidatesProcessor) GetScaleDownCandidates(ctx *context.AutoscalingContext, nodes []*apiv1.Node) ([]*apiv1.Node, errors.AutoscalerError) {
var err errors.AutoscalerError
for _, processor := range p.processors {
nodes, err = processor.GetScaleDownCandidates(ctx, nodes)
if err != nil {
return nil, err
}
}
return nodes, nil
}
// CleanUp is called at CA termination
func (p *combinedScaleDownCandidatesProcessor) CleanUp() {
for _, processor := range p.processors {
processor.CleanUp()
}
}
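To recap the wiring done in buildAutoscaler above, a minimal usage sketch (comparers stands in for the scaleDownCandidatesComparers built in main.go): sub-processors run in registration order and each receives the previous one's output, so the delay processor only ever sees candidates that the sorting processor passed through.

// minimal wiring sketch, mirroring the buildAutoscaler change above
cp := scaledowncandidates.NewCombinedScaleDownCandidatesProcessor()
cp.Register(scaledowncandidates.NewScaleDownCandidatesSortingProcessor(comparers))
cp.Register(scaledowncandidates.NewScaleDownCandidatesDelayProcessor())
// candidates, err := cp.GetScaleDownCandidates(ctx, nodes)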