Refactor StartDeletion usage patterns and enforce periodic scaledown status processor calls.

This commit is contained in:
Maksym Fuhol 2024-02-27 12:51:37 +00:00
parent b8506aff98
commit bed505891c
9 changed files with 278 additions and 84 deletions

View File

@ -269,8 +269,8 @@ type mockActuator struct {
status *mockActuationStatus
}
func (m *mockActuator) StartDeletion(_, _ []*apiv1.Node) (*status.ScaleDownStatus, errors.AutoscalerError) {
return nil, nil
func (m *mockActuator) StartDeletion(_, _ []*apiv1.Node) (status.ScaleDownResult, []*status.ScaleDownNode, errors.AutoscalerError) {
return status.ScaleDownError, []*status.ScaleDownNode{}, nil
}
func (m *mockActuator) CheckStatus() scaledown.ActuationStatus {
@ -281,6 +281,10 @@ func (m *mockActuator) ClearResultsNotNewerThan(time.Time) {
}
func (m *mockActuator) DeletionResults() (map[string]status.NodeDeleteResult, time.Time) {
return map[string]status.NodeDeleteResult{}, time.Now()
}
type mockActuationStatus struct {
drainedNodes []string
}

View File

@ -96,47 +96,47 @@ func (a *Actuator) ClearResultsNotNewerThan(t time.Time) {
a.nodeDeletionTracker.ClearResultsNotNewerThan(t)
}
// DeletionResults returns deletion results since the last ClearResultsNotNewerThan call
// in a map form, along with the timestamp of last result.
func (a *Actuator) DeletionResults() (map[string]status.NodeDeleteResult, time.Time) {
return a.nodeDeletionTracker.DeletionResults()
}
// StartDeletion triggers a new deletion process.
func (a *Actuator) StartDeletion(empty, drain []*apiv1.Node) (*status.ScaleDownStatus, errors.AutoscalerError) {
func (a *Actuator) StartDeletion(empty, drain []*apiv1.Node) (status.ScaleDownResult, []*status.ScaleDownNode, errors.AutoscalerError) {
a.nodeDeletionScheduler.ResetAndReportMetrics()
deletionStartTime := time.Now()
defer func() { metrics.UpdateDuration(metrics.ScaleDownNodeDeletion, time.Since(deletionStartTime)) }()
results, ts := a.nodeDeletionTracker.DeletionResults()
scaleDownStatus := &status.ScaleDownStatus{NodeDeleteResults: results, NodeDeleteResultsAsOf: ts}
scaledDownNodes := make([]*status.ScaleDownNode, 0)
emptyToDelete, drainToDelete := a.budgetProcessor.CropNodes(a.nodeDeletionTracker, empty, drain)
if len(emptyToDelete) == 0 && len(drainToDelete) == 0 {
scaleDownStatus.Result = status.ScaleDownNoNodeDeleted
return scaleDownStatus, nil
return status.ScaleDownNoNodeDeleted, nil, nil
}
if len(emptyToDelete) > 0 {
// Taint all empty nodes synchronously
if err := a.taintNodesSync(emptyToDelete); err != nil {
scaleDownStatus.Result = status.ScaleDownError
return scaleDownStatus, err
return status.ScaleDownError, scaledDownNodes, err
}
emptyScaledDown := a.deleteAsyncEmpty(emptyToDelete)
scaleDownStatus.ScaledDownNodes = append(scaleDownStatus.ScaledDownNodes, emptyScaledDown...)
scaledDownNodes = append(scaledDownNodes, emptyScaledDown...)
}
if len(drainToDelete) > 0 {
// Taint all nodes that need drain synchronously, but don't start any drain/deletion yet. Otherwise, pods evicted from one to-be-deleted node
// could get recreated on another.
if err := a.taintNodesSync(drainToDelete); err != nil {
scaleDownStatus.Result = status.ScaleDownError
return scaleDownStatus, err
return status.ScaleDownError, scaledDownNodes, err
}
// All nodes involved in the scale-down should be tainted now - start draining and deleting nodes asynchronously.
drainScaledDown := a.deleteAsyncDrain(drainToDelete)
scaleDownStatus.ScaledDownNodes = append(scaleDownStatus.ScaledDownNodes, drainScaledDown...)
scaledDownNodes = append(scaledDownNodes, drainScaledDown...)
}
scaleDownStatus.Result = status.ScaleDownNodeDeleteStarted
return scaleDownStatus, nil
return status.ScaleDownNodeDeleteStarted, scaledDownNodes, nil
}
// deleteAsyncEmpty immediately starts deletions asynchronously.

View File

@ -1174,9 +1174,7 @@ func TestStartDeletion(t *testing.T) {
}
}
wantScaleDownStatus := &status.ScaleDownStatus{
Result: tc.wantStatus.result,
}
wantScaleDownNodes := []*status.ScaleDownNode{}
for _, scaleDownNodeInfo := range tc.wantStatus.scaledDownNodes {
statusScaledDownNode := &status.ScaleDownNode{
Node: generateNode(scaleDownNodeInfo.name),
@ -1184,7 +1182,7 @@ func TestStartDeletion(t *testing.T) {
EvictedPods: scaleDownNodeInfo.evictedPods,
UtilInfo: scaleDownNodeInfo.utilInfo,
}
wantScaleDownStatus.ScaledDownNodes = append(wantScaleDownStatus.ScaledDownNodes, statusScaledDownNode)
wantScaleDownNodes = append(wantScaleDownNodes, statusScaledDownNode)
}
scaleStateNotifier := nodegroupchange.NewNodeGroupChangeObserversList()
@ -1201,18 +1199,22 @@ func TestStartDeletion(t *testing.T) {
budgetProcessor: budgets.NewScaleDownBudgetProcessor(&ctx),
configGetter: nodegroupconfig.NewDefaultNodeGroupConfigProcessor(ctx.NodeGroupDefaults),
}
gotStatus, gotErr := actuator.StartDeletion(allEmptyNodes, allDrainNodes)
gotResult, gotScaleDownNodes, gotErr := actuator.StartDeletion(allEmptyNodes, allDrainNodes)
if diff := cmp.Diff(tc.wantErr, gotErr, cmpopts.EquateErrors()); diff != "" {
t.Errorf("StartDeletion error diff (-want +got):\n%s", diff)
}
// Verify ScaleDownStatus looks as expected.
// Verify ScaleDownResult looks as expected.
if diff := cmp.Diff(tc.wantStatus.result, gotResult); diff != "" {
t.Errorf("StartDeletion result diff (-want +got):\n%s", diff)
}
// Verify ScaleDownNodes looks as expected.
ignoreSdNodeOrder := cmpopts.SortSlices(func(a, b *status.ScaleDownNode) bool { return a.Node.Name < b.Node.Name })
ignoreTimestamps := cmpopts.IgnoreFields(status.ScaleDownStatus{}, "NodeDeleteResultsAsOf")
cmpNg := cmp.Comparer(func(a, b *testprovider.TestNodeGroup) bool { return a.Id() == b.Id() })
statusCmpOpts := cmp.Options{ignoreSdNodeOrder, ignoreTimestamps, cmpNg, cmpopts.EquateEmpty()}
if diff := cmp.Diff(wantScaleDownStatus, gotStatus, statusCmpOpts); diff != "" {
t.Errorf("StartDeletion status diff (-want +got):\n%s", diff)
statusCmpOpts := cmp.Options{ignoreSdNodeOrder, cmpNg, cmpopts.EquateEmpty()}
if diff := cmp.Diff(wantScaleDownNodes, gotScaleDownNodes, statusCmpOpts); diff != "" {
t.Errorf("StartDeletion scaled down nodes diff (-want +got):\n%s", diff)
}
// Verify that all expected nodes were deleted using the cloud provider hook.
@ -1278,13 +1280,9 @@ func TestStartDeletion(t *testing.T) {
t.Errorf("Timeout while waiting for node deletion results")
}
// Run StartDeletion again to gather node deletion results for deletions started in the previous call, and verify
// that they look as expected.
gotNextStatus, gotNextErr := actuator.StartDeletion(nil, nil)
if gotNextErr != nil {
t.Errorf("StartDeletion unexpected error: %v", gotNextErr)
}
if diff := cmp.Diff(tc.wantNodeDeleteResults, gotNextStatus.NodeDeleteResults, cmpopts.EquateEmpty(), cmpopts.EquateErrors()); diff != "" {
// Gather node deletion results for deletions started in the previous call, and verify that they look as expected.
nodeDeleteResults, _ := actuator.DeletionResults()
if diff := cmp.Diff(tc.wantNodeDeleteResults, nodeDeleteResults, cmpopts.EquateEmpty(), cmpopts.EquateErrors()); diff != "" {
t.Errorf("NodeDeleteResults diff (-want +got):\n%s", diff)
}
})

View File

@ -148,7 +148,8 @@ func (n *NodeDeletionTracker) DeletionsCount(nodeGroupId string) int {
return n.deletionsPerNodeGroup[nodeGroupId]
}
// DeletionResults returns deletion results in a map form, along with the timestamp of last result.
// DeletionResults returns deletion results since the last ClearResultsNotNewerThan call
// in a map form, along with the timestamp of last result.
func (n *NodeDeletionTracker) DeletionResults() (map[string]status.NodeDeleteResult, time.Time) {
n.Lock()
defer n.Unlock()

View File

@ -779,10 +779,10 @@ func TestScaleDown(t *testing.T) {
autoscalererr = wrapper.UpdateClusterState(nodes, nodes, nil, time.Now().Add(-5*time.Minute))
assert.NoError(t, autoscalererr)
empty, drain := wrapper.NodesToDelete(time.Now())
scaleDownStatus, err := wrapper.StartDeletion(empty, drain)
scaleDownResult, _, err := wrapper.StartDeletion(empty, drain)
waitForDeleteToFinish(t, wrapper)
assert.NoError(t, err)
assert.Equal(t, status.ScaleDownNodeDeleteStarted, scaleDownStatus.Result)
assert.Equal(t, status.ScaleDownNodeDeleteStarted, scaleDownResult)
assert.Equal(t, n1.Name, utils.GetStringFromChan(deletedNodes))
assert.Equal(t, n1.Name, utils.GetStringFromChan(updatedNodes))
}
@ -1036,7 +1036,7 @@ func simpleScaleDownEmpty(t *testing.T, config *ScaleTestConfig) {
autoscalererr = wrapper.UpdateClusterState(nodes, nodes, nil, time.Now().Add(-5*time.Minute))
assert.NoError(t, autoscalererr)
empty, drain := wrapper.NodesToDelete(time.Now())
scaleDownStatus, err := wrapper.StartDeletion(empty, drain)
scaleDownResult, _, err := wrapper.StartDeletion(empty, drain)
assert.NoError(t, err)
var expectedScaleDownResult status.ScaleDownResult
@ -1045,7 +1045,7 @@ func simpleScaleDownEmpty(t *testing.T, config *ScaleTestConfig) {
} else {
expectedScaleDownResult = status.ScaleDownNoUnneeded
}
assert.Equal(t, expectedScaleDownResult, scaleDownStatus.Result)
assert.Equal(t, expectedScaleDownResult, scaleDownResult)
expectedScaleDownCount := config.ExpectedScaleDownCount
if config.ExpectedScaleDownCount == 0 {
@ -1131,11 +1131,11 @@ func TestNoScaleDownUnready(t *testing.T) {
autoscalererr = wrapper.UpdateClusterState(nodes, nodes, nil, time.Now().Add(-5*time.Minute))
assert.NoError(t, autoscalererr)
empty, drain := wrapper.NodesToDelete(time.Now())
scaleDownStatus, err := wrapper.StartDeletion(empty, drain)
scaleDownResult, _, err := wrapper.StartDeletion(empty, drain)
waitForDeleteToFinish(t, wrapper)
assert.NoError(t, err)
assert.Equal(t, status.ScaleDownNoUnneeded, scaleDownStatus.Result)
assert.Equal(t, status.ScaleDownNoUnneeded, scaleDownResult)
deletedNodes := make(chan string, 10)
@ -1155,11 +1155,11 @@ func TestNoScaleDownUnready(t *testing.T) {
autoscalererr = wrapper.UpdateClusterState(nodes, nodes, nil, time.Now().Add(-2*time.Hour))
assert.NoError(t, autoscalererr)
empty, drain = wrapper.NodesToDelete(time.Now())
scaleDownStatus, err = wrapper.StartDeletion(empty, drain)
scaleDownResult, _, err = wrapper.StartDeletion(empty, drain)
waitForDeleteToFinish(t, wrapper)
assert.NoError(t, err)
assert.Equal(t, status.ScaleDownNodeDeleteStarted, scaleDownStatus.Result)
assert.Equal(t, status.ScaleDownNodeDeleteStarted, scaleDownResult)
assert.Equal(t, n1.Name, utils.GetStringFromChan(deletedNodes))
}
@ -1245,11 +1245,11 @@ func TestScaleDownNoMove(t *testing.T) {
autoscalererr = wrapper.UpdateClusterState(nodes, nodes, nil, time.Now().Add(-5*time.Minute))
assert.NoError(t, autoscalererr)
empty, drain := wrapper.NodesToDelete(time.Now())
scaleDownStatus, err := wrapper.StartDeletion(empty, drain)
scaleDownResult, _, err := wrapper.StartDeletion(empty, drain)
waitForDeleteToFinish(t, wrapper)
assert.NoError(t, err)
assert.Equal(t, status.ScaleDownNoUnneeded, scaleDownStatus.Result)
assert.Equal(t, status.ScaleDownNoUnneeded, scaleDownResult)
}
func getCountOfChan(c chan string) int {

View File

@ -89,20 +89,12 @@ func (p *ScaleDownWrapper) NodesToDelete(currentTime time.Time) (empty, needDrai
}
// StartDeletion triggers an actual scale down logic.
func (p *ScaleDownWrapper) StartDeletion(empty, needDrain []*apiv1.Node) (*status.ScaleDownStatus, errors.AutoscalerError) {
func (p *ScaleDownWrapper) StartDeletion(empty, needDrain []*apiv1.Node) (status.ScaleDownResult, []*status.ScaleDownNode, errors.AutoscalerError) {
// Done to preserve legacy behavior, see comment on NodesToDelete.
if p.lastNodesToDeleteErr != nil || p.lastNodesToDeleteResult != status.ScaleDownNodeDeleteStarted {
// When there is no need for scale-down, p.lastNodesToDeleteResult is set to ScaleDownNoUnneeded. We have to still report node delete
// results in this case, otherwise they wouldn't get reported until the next call to actuator.StartDeletion (i.e. until the next scale-down
// attempt).
// Run actuator.StartDeletion with no nodes just to grab the delete results.
origStatus, _ := p.actuator.StartDeletion(nil, nil)
return &status.ScaleDownStatus{
Result: p.lastNodesToDeleteResult,
NodeDeleteResults: origStatus.NodeDeleteResults,
NodeDeleteResultsAsOf: origStatus.NodeDeleteResultsAsOf,
}, p.lastNodesToDeleteErr
return p.lastNodesToDeleteResult, []*status.ScaleDownNode{}, p.lastNodesToDeleteErr
}
return p.actuator.StartDeletion(empty, needDrain)
}
@ -116,3 +108,9 @@ func (p *ScaleDownWrapper) CheckStatus() scaledown.ActuationStatus {
func (p *ScaleDownWrapper) ClearResultsNotNewerThan(t time.Time) {
p.actuator.ClearResultsNotNewerThan(t)
}
// DeletionResults returns deletion results since the last ClearResultsNotNewerThan call
// in a map form, along with the timestamp of last result.
func (p *ScaleDownWrapper) DeletionResults() (map[string]status.NodeDeleteResult, time.Time) {
return p.actuator.DeletionResults()
}

View File

@ -56,12 +56,15 @@ type Actuator interface {
// function are not guaranteed to be deleted, it is possible for the
// Actuator to ignore some of them e.g. if max configured level of
// parallelism is reached.
StartDeletion(empty, needDrain []*apiv1.Node) (*status.ScaleDownStatus, errors.AutoscalerError)
StartDeletion(empty, needDrain []*apiv1.Node) (status.ScaleDownResult, []*status.ScaleDownNode, errors.AutoscalerError)
// CheckStatus returns an immutable snapshot of ongoing deletions.
CheckStatus() ActuationStatus
// ClearResultsNotNewerThan removes information about deletions finished
// before or exactly at the provided timestamp.
ClearResultsNotNewerThan(time.Time)
// DeletionResults returns deletion results since the last ClearResultsNotNewerThan call
// in a map form, along with the timestamp of last result.
DeletionResults() (map[string]status.NodeDeleteResult, time.Time)
}
// ActuationStatus is used for feeding Actuator status back into Planner

View File

@ -388,7 +388,6 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr
scaleUpStatus := &status.ScaleUpStatus{Result: status.ScaleUpNotTried}
scaleUpStatusProcessorAlreadyCalled := false
scaleDownStatus := &scaledownstatus.ScaleDownStatus{Result: scaledownstatus.ScaleDownNotTried}
scaleDownStatusProcessorAlreadyCalled := false
defer func() {
// Update status information when the loop is done (regardless of reason)
@ -403,14 +402,22 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr
if !scaleUpStatusProcessorAlreadyCalled && a.processors != nil && a.processors.ScaleUpStatusProcessor != nil {
a.processors.ScaleUpStatusProcessor.Process(a.AutoscalingContext, scaleUpStatus)
}
if !scaleDownStatusProcessorAlreadyCalled && a.processors != nil && a.processors.ScaleDownStatusProcessor != nil {
if a.processors != nil && a.processors.ScaleDownStatusProcessor != nil {
// Gather status before scaledown status processor invocation
nodeDeletionResults, nodeDeletionResultsAsOf := a.scaleDownActuator.DeletionResults()
scaleDownStatus.NodeDeleteResults = nodeDeletionResults
scaleDownStatus.NodeDeleteResultsAsOf = nodeDeletionResultsAsOf
a.scaleDownActuator.ClearResultsNotNewerThan(scaleDownStatus.NodeDeleteResultsAsOf)
scaleDownStatus.SetUnremovableNodesInfo(a.scaleDownPlanner.UnremovableNodes(), a.scaleDownPlanner.NodeUtilizationMap(), a.CloudProvider)
a.processors.ScaleDownStatusProcessor.Process(a.AutoscalingContext, scaleDownStatus)
}
err := a.processors.AutoscalingStatusProcessor.Process(a.AutoscalingContext, a.clusterStateRegistry, currentTime)
if err != nil {
klog.Errorf("AutoscalingStatusProcessor error: %v.", err)
if a.processors != nil && a.processors.AutoscalingStatusProcessor != nil {
err := a.processors.AutoscalingStatusProcessor.Process(a.AutoscalingContext, a.clusterStateRegistry, currentTime)
if err != nil {
klog.Errorf("AutoscalingStatusProcessor error: %v.", err)
}
}
}()
@ -637,17 +644,15 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr
if scaleDownInCooldown {
scaleDownStatus.Result = scaledownstatus.ScaleDownInCooldown
if len(removedNodeGroups) > 0 {
a.processors.ScaleDownStatusProcessor.Process(autoscalingContext, scaleDownStatus)
}
} else {
klog.V(4).Infof("Starting scale down")
scaleDownStart := time.Now()
metrics.UpdateLastTime(metrics.ScaleDown, scaleDownStart)
empty, needDrain := a.scaleDownPlanner.NodesToDelete(currentTime)
scaleDownStatus, typedErr := a.scaleDownActuator.StartDeletion(empty, needDrain)
a.scaleDownActuator.ClearResultsNotNewerThan(scaleDownStatus.NodeDeleteResultsAsOf)
scaleDownResult, scaledDownNodes, typedErr := a.scaleDownActuator.StartDeletion(empty, needDrain)
scaleDownStatus.Result = scaleDownResult
scaleDownStatus.ScaledDownNodes = scaledDownNodes
metrics.UpdateDurationFromStart(metrics.ScaleDown, scaleDownStart)
metrics.UpdateUnremovableNodesCount(countsByReason(a.scaleDownPlanner.UnremovableNodes()))
@ -673,12 +678,6 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr
actuation.UpdateSoftDeletionTaints(a.AutoscalingContext, taintableNodes, untaintableNodes)
}
if a.processors != nil && a.processors.ScaleDownStatusProcessor != nil {
scaleDownStatus.SetUnremovableNodesInfo(a.scaleDownPlanner.UnremovableNodes(), a.scaleDownPlanner.NodeUtilizationMap(), a.CloudProvider)
a.processors.ScaleDownStatusProcessor.Process(autoscalingContext, scaleDownStatus)
scaleDownStatusProcessorAlreadyCalled = true
}
if typedErr != nil {
klog.Errorf("Failed to scale down: %v", typedErr)
a.lastScaleDownFailTime = currentTime

View File

@ -38,6 +38,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/actuation"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/deletiontracker"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/legacy"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/status"
"k8s.io/autoscaler/cluster-autoscaler/core/scaleup/orchestrator"
. "k8s.io/autoscaler/cluster-autoscaler/core/test"
core_utils "k8s.io/autoscaler/cluster-autoscaler/core/utils"
@ -70,6 +71,8 @@ import (
v1appslister "k8s.io/client-go/listers/apps/v1"
schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
klog "k8s.io/klog/v2"
@ -173,12 +176,26 @@ type scaleCall struct {
delta int
}
type scaleDownStatusProcessorMock struct {
called int
scaleDownStatus *status.ScaleDownStatus
}
func (p *scaleDownStatusProcessorMock) Process(_ *context.AutoscalingContext, st *status.ScaleDownStatus) {
p.called += 1
p.scaleDownStatus = st
}
func (p *scaleDownStatusProcessorMock) CleanUp() {
}
type commonMocks struct {
readyNodeLister *kube_util.TestNodeLister
allNodeLister *kube_util.TestNodeLister
allPodLister *podListerMock
podDisruptionBudgetLister *podDisruptionBudgetListerMock
daemonSetLister *daemonSetListerMock
nodeDeletionTracker *deletiontracker.NodeDeletionTracker
onScaleUp *onScaleUpMock
onScaleDown *onScaleDownMock
@ -270,7 +287,7 @@ func setupAutoscaler(config *autoscalerSetupConfig) (*StaticAutoscaler, error) {
processors := NewTestProcessors(&context)
sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&context, processors, clusterState)
sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&context, processors, clusterState, config.mocks.nodeDeletionTracker)
suOrchestrator := orchestrator.New()
suOrchestrator.Initialize(&context, processors, clusterState, newEstimatorBuilder(), taints.TaintConfig{})
@ -363,7 +380,7 @@ func TestStaticAutoscalerRunOnce(t *testing.T) {
}
processors := NewTestProcessors(&context)
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(options.NodeGroupDefaults))
sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&context, processors, clusterState)
sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&context, processors, clusterState, nil)
suOrchestrator := orchestrator.New()
suOrchestrator.Initialize(&context, processors, clusterState, newEstimatorBuilder(), taints.TaintConfig{})
@ -563,7 +580,7 @@ func TestStaticAutoscalerRunOnceWithScaleDownDelayPerNG(t *testing.T) {
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(options.NodeGroupDefaults))
processors.ScaleStateNotifier.Register(clusterState)
sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&context, processors, clusterState)
sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&context, processors, clusterState, nil)
suOrchestrator := orchestrator.New()
suOrchestrator.Initialize(&context, processors, clusterState, newEstimatorBuilder(), taints.TaintConfig{})
@ -789,7 +806,7 @@ func TestStaticAutoscalerRunOnceWithAutoprovisionedEnabled(t *testing.T) {
}
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(options.NodeGroupDefaults))
sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&context, processors, clusterState)
sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&context, processors, clusterState, nil)
suOrchestrator := orchestrator.New()
suOrchestrator.Initialize(&context, processors, clusterState, newEstimatorBuilder(), taints.TaintConfig{})
@ -940,7 +957,7 @@ func TestStaticAutoscalerRunOnceWithALongUnregisteredNode(t *testing.T) {
processors := NewTestProcessors(&context)
sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&context, processors, clusterState)
sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&context, processors, clusterState, nil)
suOrchestrator := orchestrator.New()
suOrchestrator.Initialize(&context, processors, clusterState, newEstimatorBuilder(), taints.TaintConfig{})
@ -1089,7 +1106,7 @@ func TestStaticAutoscalerRunOncePodsWithPriorities(t *testing.T) {
processors := NewTestProcessors(&context)
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(options.NodeGroupDefaults))
sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&context, processors, clusterState)
sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&context, processors, clusterState, nil)
suOrchestrator := orchestrator.New()
suOrchestrator.Initialize(&context, processors, clusterState, newEstimatorBuilder(), taints.TaintConfig{})
@ -1221,7 +1238,7 @@ func TestStaticAutoscalerRunOnceWithFilteringOnBinPackingEstimator(t *testing.T)
processors := NewTestProcessors(&context)
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(options.NodeGroupDefaults))
sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&context, processors, clusterState)
sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&context, processors, clusterState, nil)
autoscaler := &StaticAutoscaler{
AutoscalingContext: &context,
@ -1320,7 +1337,7 @@ func TestStaticAutoscalerRunOnceWithFilteringOnUpcomingNodesEnabledNoScaleUp(t *
processors := NewTestProcessors(&context)
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(options.NodeGroupDefaults))
sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&context, processors, clusterState)
sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&context, processors, clusterState, nil)
autoscaler := &StaticAutoscaler{
AutoscalingContext: &context,
@ -2396,6 +2413,177 @@ func TestFilterOutYoungPods(t *testing.T) {
}
}
func TestStaticAutoscalerRunOnceInvokesScaleDownStatusProcessor(t *testing.T) {
options := config.AutoscalingOptions{
NodeGroupDefaults: config.NodeGroupAutoscalingOptions{
ScaleDownUnneededTime: -1 * time.Nanosecond, // enforce immediate scaledown/drain for ready
ScaleDownUnreadyTime: -1 * time.Nanosecond, // enforce immediate scaledown/drain for unready
ScaleDownUtilizationThreshold: 0.5,
MaxNodeProvisionTime: 10 * time.Second,
},
EstimatorName: estimator.BinpackingEstimatorName,
ScaleDownEnabled: true,
MaxNodesTotal: 10,
MaxCoresTotal: 10,
MaxMemoryTotal: 100000,
}
now := time.Now()
n1 := BuildTestNode("n1", 1000, 1000)
SetNodeReadyState(n1, true, now)
n2 := BuildTestNode("n2", 1000, 1000)
SetNodeReadyState(n2, true, now)
underUtilizedPod := BuildTestPod("p1", 20, 20, WithNodeName("n1"))
utilizedPod := BuildTestPod("p1", 800, 800, WithNodeName("n1"))
testCases := map[string]struct {
pods []*apiv1.Pod
nodes []*apiv1.Node
fakeDeletionResults map[string]status.NodeDeleteResult
fakeDeletionResultsNodeGroup string
expectedStatus *status.ScaleDownStatus
}{
"no candidates": {
pods: []*apiv1.Pod{utilizedPod},
nodes: []*apiv1.Node{n1},
expectedStatus: &status.ScaleDownStatus{
Result: status.ScaleDownNoUnneeded,
ScaledDownNodes: []*status.ScaleDownNode{},
UnremovableNodes: []*status.UnremovableNode{
{
Node: n1,
BlockingPod: nil,
Reason: simulator.NotUnderutilized,
},
},
RemovedNodeGroups: []cloudprovider.NodeGroup{},
NodeDeleteResults: map[string]status.NodeDeleteResult{},
NodeDeleteResultsAsOf: time.Time{},
},
},
"scaledown": {
pods: []*apiv1.Pod{underUtilizedPod},
nodes: []*apiv1.Node{n1, n2},
expectedStatus: &status.ScaleDownStatus{
Result: status.ScaleDownNodeDeleteStarted,
ScaledDownNodes: []*status.ScaleDownNode{
{
Node: n2,
},
},
UnremovableNodes: []*status.UnremovableNode{
{
Node: n1,
Reason: simulator.BlockedByPod,
BlockingPod: &drain.BlockingPod{
Pod: underUtilizedPod,
Reason: drain.NotReplicated,
},
},
},
RemovedNodeGroups: []cloudprovider.NodeGroup{},
NodeDeleteResults: map[string]status.NodeDeleteResult{},
NodeDeleteResultsAsOf: time.Time{},
},
},
"no candidates, node deleted": {
pods: []*apiv1.Pod{utilizedPod},
nodes: []*apiv1.Node{n1},
fakeDeletionResults: map[string]status.NodeDeleteResult{"n1": {
Err: nil,
ResultType: status.NodeDeleteOk,
}},
fakeDeletionResultsNodeGroup: "ng1",
expectedStatus: &status.ScaleDownStatus{
Result: status.ScaleDownNoUnneeded,
ScaledDownNodes: []*status.ScaleDownNode{},
UnremovableNodes: []*status.UnremovableNode{
{
Node: n1,
BlockingPod: nil,
Reason: simulator.NotUnderutilized,
},
},
RemovedNodeGroups: []cloudprovider.NodeGroup{},
NodeDeleteResults: map[string]status.NodeDeleteResult{"n1": {
Err: nil,
ResultType: status.NodeDeleteOk,
}},
NodeDeleteResultsAsOf: time.Time{},
},
},
}
for testName, test := range testCases {
// prevent issues with scoping, we should be able to get rid of that with Go 1.22
test := test
t.Run(testName, func(t *testing.T) {
t.Parallel()
mocks := newCommonMocks()
if test.fakeDeletionResults != nil {
tracker := deletiontracker.NewNodeDeletionTracker(time.Second * 0)
for node, result := range test.fakeDeletionResults {
tracker.StartDeletion(test.fakeDeletionResultsNodeGroup, node)
tracker.EndDeletion(test.fakeDeletionResultsNodeGroup, node, result)
}
mocks.nodeDeletionTracker = tracker
}
setupConfig := &autoscalerSetupConfig{
autoscalingOptions: options,
nodeGroups: []*nodeGroup{{
name: "ng1",
min: 0,
max: 10,
nodes: test.nodes,
}},
nodeStateUpdateTime: now,
mocks: mocks,
clusterStateConfig: clusterstate.ClusterStateRegistryConfig{
OkTotalUnreadyCount: 1,
},
}
autoscaler, err := setupAutoscaler(setupConfig)
assert.NoError(t, err)
statusProcessor := &scaleDownStatusProcessorMock{}
autoscaler.processors.ScaleDownStatusProcessor = statusProcessor
setupConfig.mocks.readyNodeLister.SetNodes(test.nodes)
setupConfig.mocks.allNodeLister.SetNodes(test.nodes)
setupConfig.mocks.allPodLister.On("List").Return(test.pods, nil)
setupConfig.mocks.daemonSetLister.On("List", labels.Everything()).Return([]*appsv1.DaemonSet{}, nil)
setupConfig.mocks.podDisruptionBudgetLister.On("List").Return([]*policyv1.PodDisruptionBudget{}, nil)
setupConfig.mocks.onScaleDown.On("ScaleDown", "ng1", "n2").Return(nil).Maybe()
err = autoscaler.RunOnce(now.Add(time.Hour))
assert.NoError(t, err)
assert.Equal(t, statusProcessor.called, 1)
opts := cmp.Options{
// These fields are not important for this check and may clutter the whole plot
cmpopts.IgnoreFields(status.UnremovableNode{}, "NodeGroup", "UtilInfo"),
cmpopts.IgnoreFields(status.ScaleDownNode{}, "NodeGroup", "UtilInfo"),
cmpopts.IgnoreFields(status.ScaleDownStatus{}, "NodeDeleteResultsAsOf"),
cmpopts.EquateEmpty(),
}
if diff := cmp.Diff(test.expectedStatus, statusProcessor.scaleDownStatus, opts); diff != "" {
t.Errorf("ScaleDownStatusProcessor.Process(...): err diff (-want +got):\n%s", diff)
}
mock.AssertExpectationsForObjects(t,
setupConfig.mocks.allPodLister,
setupConfig.mocks.podDisruptionBudgetLister,
setupConfig.mocks.daemonSetLister,
setupConfig.mocks.onScaleUp,
setupConfig.mocks.onScaleDown,
)
})
}
}
func waitForDeleteToFinish(t *testing.T, deleteFinished <-chan bool) {
select {
case <-deleteFinished:
@ -2405,7 +2593,7 @@ func waitForDeleteToFinish(t *testing.T, deleteFinished <-chan bool) {
}
}
func newScaleDownPlannerAndActuator(ctx *context.AutoscalingContext, p *ca_processors.AutoscalingProcessors, cs *clusterstate.ClusterStateRegistry) (scaledown.Planner, scaledown.Actuator) {
func newScaleDownPlannerAndActuator(ctx *context.AutoscalingContext, p *ca_processors.AutoscalingProcessors, cs *clusterstate.ClusterStateRegistry, nodeDeletionTracker *deletiontracker.NodeDeletionTracker) (scaledown.Planner, scaledown.Actuator) {
ctx.MaxScaleDownParallelism = 10
ctx.MaxDrainParallelism = 1
ctx.NodeDeletionBatcherInterval = 0 * time.Second
@ -2415,9 +2603,12 @@ func newScaleDownPlannerAndActuator(ctx *context.AutoscalingContext, p *ca_proce
SkipNodesWithLocalStorage: true,
SkipNodesWithCustomControllerPods: true,
}
ndt := deletiontracker.NewNodeDeletionTracker(0 * time.Second)
sd := legacy.NewScaleDown(ctx, p, ndt, deleteOptions, nil)
actuator := actuation.NewActuator(ctx, cs, ndt, deleteOptions, nil, p.NodeGroupConfigProcessor)
if nodeDeletionTracker == nil {
nodeDeletionTracker = deletiontracker.NewNodeDeletionTracker(0 * time.Second)
}
sd := legacy.NewScaleDown(ctx, p, nodeDeletionTracker, deleteOptions, nil)
actuator := actuation.NewActuator(ctx, cs, nodeDeletionTracker, deleteOptions, nil, p.NodeGroupConfigProcessor)
wrapper := legacy.NewScaleDownWrapper(sd, actuator)
return wrapper, wrapper
}