Separate ScaleDown logic with a new interface

Daniel Kłobuszewski 2022-04-15 12:58:39 +02:00
parent 5a78f49bc2
commit 7f8b2da9e3
5 changed files with 290 additions and 41 deletions

View File

@@ -24,6 +24,7 @@ import (
)
// NodeDeletionTracker keeps track of node deletions.
// TODO: extend to implement ActuationStatus interface
type NodeDeletionTracker struct {
sync.Mutex
nonEmptyNodeDeleteInProgress bool
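
The TODO above refers to the ActuationStatus interface introduced later in this commit. A hedged sketch of how the tracker could eventually satisfy its DeletionsInProgress method, assuming a hypothetical nodesBeingDeleted field that records node names (today the tracker only keeps a boolean):

// Sketch only: nodesBeingDeleted is an assumed field, not part of this commit.
func (n *NodeDeletionTracker) DeletionsInProgress() []string {
	n.Lock()
	defer n.Unlock()
	names := make([]string, 0, len(n.nodesBeingDeleted))
	for name := range n.nodesBeingDeleted { // assumed map[string]bool
		names = append(names, name)
	}
	return names
}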

View File

@@ -0,0 +1,137 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package legacy
import (
"time"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/deletiontracker"
"k8s.io/autoscaler/cluster-autoscaler/processors/status"
"k8s.io/autoscaler/cluster-autoscaler/simulator"
"k8s.io/autoscaler/cluster-autoscaler/simulator/utilization"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
apiv1 "k8s.io/api/core/v1"
policyv1 "k8s.io/api/policy/v1beta1"
)
// ScaleDownWrapper wraps legacy scaledown logic to satisfy scaledown.Planner &
// scaledown.Actuator interfaces.
type ScaleDownWrapper struct {
sd *ScaleDown
pdbs []*policyv1.PodDisruptionBudget
}
// NewScaleDownWrapper returns a new ScaleDownWrapper.
func NewScaleDownWrapper(sd *ScaleDown) *ScaleDownWrapper {
return &ScaleDownWrapper{
sd: sd,
}
}
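
Go checks interface satisfaction implicitly, so nothing in this file ties the wrapper to the new interfaces by name. A compile-time assertion (a common Go idiom, not part of this commit) would make the dual role explicit and break the build if either method set drifts:

// Verify at compile time that ScaleDownWrapper satisfies both interfaces.
var _ scaledown.Planner = (*ScaleDownWrapper)(nil)
var _ scaledown.Actuator = (*ScaleDownWrapper)(nil)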
// UpdateClusterState updates unneeded nodes in the underlying ScaleDown.
func (p *ScaleDownWrapper) UpdateClusterState(podDestinations, scaleDownCandidates []*apiv1.Node, actuationStatus scaledown.ActuationStatus, pdbs []*policyv1.PodDisruptionBudget, currentTime time.Time) errors.AutoscalerError {
p.sd.CleanUp(currentTime)
p.pdbs = pdbs
return p.sd.UpdateUnneededNodes(podDestinations, scaleDownCandidates, currentTime, pdbs)
}
// CleanUpUnneededNodes cleans up unneeded nodes.
func (p *ScaleDownWrapper) CleanUpUnneededNodes() {
p.sd.CleanUpUnneededNodes()
}
// NodesToDelete lists nodes to delete. The current implementation is a no-op;
// the wrapper relies on state shared with the planning phase instead.
// TODO(x13n): Implement this and get rid of sharing state between planning and
// actuation.
func (p *ScaleDownWrapper) NodesToDelete() (empty, needDrain []*apiv1.Node) {
return nil, nil
}
// UnneededNodes returns a list of unneeded nodes.
func (p *ScaleDownWrapper) UnneededNodes() []*apiv1.Node {
return p.sd.UnneededNodes()
}
// UnremovableNodes returns a list of nodes that cannot be removed.
func (p *ScaleDownWrapper) UnremovableNodes() []*simulator.UnremovableNode {
return p.sd.UnremovableNodes()
}
// NodeUtilizationMap returns information about utilization of individual
// cluster nodes.
func (p *ScaleDownWrapper) NodeUtilizationMap() map[string]utilization.Info {
return p.sd.NodeUtilizationMap()
}
// StartDeletion triggers the actual scale-down logic.
func (p *ScaleDownWrapper) StartDeletion(empty, needDrain []*apiv1.Node, currentTime time.Time) (*status.ScaleDownStatus, errors.AutoscalerError) {
return p.sd.TryToScaleDown(currentTime, p.pdbs)
}
// CheckStatus returns a snapshot of the current deletion status.
func (p *ScaleDownWrapper) CheckStatus() scaledown.ActuationStatus {
// TODO: snapshot information from the tracker instead of keeping a
// live-updated object.
return &actuationStatus{
ndt: p.sd.nodeDeletionTracker,
}
}
// ClearResultsNotNewerThan clears old node deletion results kept by the
// Actuator.
func (p *ScaleDownWrapper) ClearResultsNotNewerThan(t time.Time) {
// TODO: implement this once results are not cleared while being
// fetched.
}
type actuationStatus struct {
ndt *deletiontracker.NodeDeletionTracker
}
// DeletionsInProgress returns the names of nodes currently being deleted.
// The current implementation is not aware of the actual node names, so it
// returns a fake node name instead.
// TODO: Return real node names
func (a *actuationStatus) DeletionsInProgress() []string {
if a.ndt.IsNonEmptyNodeDeleteInProgress() {
return []string{"fake-node-name"}
}
return nil
}
// DeletionsCount returns the total number of ongoing deletions in a given node
// group.
func (a *actuationStatus) DeletionsCount(nodeGroupId string) int {
return a.ndt.GetDeletionsInProgress(nodeGroupId)
}
// RecentEvictions should return a list of recently evicted pods. Since the
// legacy scale-down logic drains at most one node at a time, this safeguard
// is not really needed there, so we can just return an empty list.
func (a *actuationStatus) RecentEvictions() []*apiv1.Pod {
return nil
}
// DeletionResults returns a map of recent node deletion results.
func (a *actuationStatus) DeletionResults() map[string]status.NodeDeleteResult {
// TODO: update nodeDeletionTracker so it doesn't get & clear in the
// same step.
return a.ndt.GetAndClearNodeDeleteResults()
}
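
Taken together, the wrapper exposes the single legacy ScaleDown object as the plan/actuate/observe cycle that the new interfaces describe. A minimal sketch of a loop driving the two halves, assuming one caller owns both (RunOnce in static_autoscaler.go is the real caller; this function and its package are purely illustrative):

package example // hypothetical illustration, not part of the commit

import (
	"time"

	apiv1 "k8s.io/api/core/v1"
	policyv1 "k8s.io/api/policy/v1beta1"

	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown"
	"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
)

func runScaleDownIteration(planner scaledown.Planner, actuator scaledown.Actuator,
	nodes []*apiv1.Node, pdbs []*policyv1.PodDisruptionBudget, now time.Time) errors.AutoscalerError {
	// Feed the last known actuation state back into planning.
	acStatus := actuator.CheckStatus()
	if err := planner.UpdateClusterState(nodes, nodes, acStatus, pdbs, now); err != nil {
		return err
	}
	// Skip actuation while a previous deletion is still in flight.
	if len(acStatus.DeletionsInProgress()) > 0 {
		return nil
	}
	empty, needDrain := planner.NodesToDelete()
	_, err := actuator.StartDeletion(empty, needDrain, now)
	return err
}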

View File

@@ -0,0 +1,84 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package scaledown
import (
"time"
"k8s.io/autoscaler/cluster-autoscaler/processors/status"
"k8s.io/autoscaler/cluster-autoscaler/simulator"
"k8s.io/autoscaler/cluster-autoscaler/simulator/utilization"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
apiv1 "k8s.io/api/core/v1"
policyv1 "k8s.io/api/policy/v1beta1"
)
// Planner is responsible for selecting nodes that should be removed.
type Planner interface {
// UpdateClusterState provides the Planner with information about the cluster.
UpdateClusterState(podDestinations, scaleDownCandidates []*apiv1.Node, as ActuationStatus, pdb []*policyv1.PodDisruptionBudget, currentTime time.Time) errors.AutoscalerError
// CleanUpUnneededNodes resets internal state of the Planner.
CleanUpUnneededNodes()
// NodesToDelete returns a list of nodes that can be deleted right now,
// according to the Planner.
NodesToDelete() (empty, needDrain []*apiv1.Node)
// UnneededNodes returns a list of nodes that either can be deleted
// right now or in the near future, assuming nothing will change in the
// cluster.
UnneededNodes() []*apiv1.Node
// UnremovableNodes returns a list of nodes that cannot be removed.
// TODO(x13n): Add a guarantee that each node is either unneeded or
// unremovable. This is not guaranteed by the current implementation.
UnremovableNodes() []*simulator.UnremovableNode
// NodeUtilizationMap returns information about utilization of
// individual cluster nodes.
NodeUtilizationMap() map[string]utilization.Info
}
// Actuator is responsible for making changes in the cluster: draining and
// deleting nodes.
type Actuator interface {
// StartDeletion triggers a new deletion process. Nodes passed to this
// function are not guaranteed to be deleted; the Actuator may ignore
// some of them, e.g. when the configured maximum level of parallelism
// is reached.
StartDeletion(empty, needDrain []*apiv1.Node, currentTime time.Time) (*status.ScaleDownStatus, errors.AutoscalerError)
// CheckStatus returns an immutable snapshot of ongoing deletions.
CheckStatus() ActuationStatus
// ClearResultsNotNewerThan removes information about deletions finished
// before or exactly at the provided timestamp.
ClearResultsNotNewerThan(time.Time)
}
// ActuationStatus is used for feeding Actuator status back into the Planner.
type ActuationStatus interface {
// DeletionsInProgress returns a list of nodes that are currently
// undergoing deletion.
DeletionsInProgress() (nodeNames []string)
// DeletionsCount returns the total number of ongoing deletions in a given
// node group.
DeletionsCount(nodeGroupId string) int
// RecentEvictions returns a list of pods that were recently removed by
// the Actuator and hence are likely to get recreated elsewhere in the
// cluster.
RecentEvictions() (pods []*apiv1.Pod)
// DeletionResults returns a map of recent node deletion results, keyed
// by the node name. Note: if node deletion was scheduled more than
// once, only the latest result will be present.
DeletionResults() map[string]status.NodeDeleteResult
}
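
Because ActuationStatus is a narrow, read-only interface, tests (or a Planner exercised in isolation) can supply a trivial fake instead of a live deletion tracker. A hedged sketch of such a test double, written as if in the same package:

// fakeActuationStatus is a hypothetical test double, not part of this commit.
type fakeActuationStatus struct {
	deleting []string
	results  map[string]status.NodeDeleteResult
}

func (f *fakeActuationStatus) DeletionsInProgress() []string { return f.deleting }

// The fake ignores node groups and reports all in-flight deletions.
func (f *fakeActuationStatus) DeletionsCount(nodeGroupId string) int { return len(f.deleting) }

func (f *fakeActuationStatus) RecentEvictions() []*apiv1.Pod { return nil }

func (f *fakeActuationStatus) DeletionResults() map[string]status.NodeDeleteResult {
	return f.results
}

var _ ActuationStatus = &fakeActuationStatus{}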

View File

@@ -31,6 +31,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/clusterstate/utils"
"k8s.io/autoscaler/cluster-autoscaler/config"
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/actuation"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/legacy"
core_utils "k8s.io/autoscaler/cluster-autoscaler/core/utils"
@@ -71,7 +72,8 @@ type StaticAutoscaler struct {
lastScaleUpTime time.Time
lastScaleDownDeleteTime time.Time
lastScaleDownFailTime time.Time
scaleDown *legacy.ScaleDown
scaleDownPlanner scaledown.Planner
scaleDownActuator scaledown.Actuator
processors *ca_processors.AutoscalingProcessors
processorCallbacks *staticAutoscalerProcessorCallbacks
initialized bool
@@ -81,11 +83,11 @@ type StaticAutoscaler struct {
type staticAutoscalerProcessorCallbacks struct {
disableScaleDownForLoop bool
extraValues map[string]interface{}
scaleDown *legacy.ScaleDown
scaleDownPlanner scaledown.Planner
}
func (callbacks *staticAutoscalerProcessorCallbacks) ResetUnneededNodes() {
callbacks.scaleDown.CleanUpUnneededNodes()
callbacks.scaleDownPlanner.CleanUpUnneededNodes()
}
func newStaticAutoscalerProcessorCallbacks() *staticAutoscalerProcessorCallbacks {
@@ -152,7 +154,8 @@ func NewStaticAutoscaler(
clusterStateRegistry := clusterstate.NewClusterStateRegistry(autoscalingContext.CloudProvider, clusterStateConfig, autoscalingContext.LogRecorder, backoff)
scaleDown := legacy.NewScaleDown(autoscalingContext, processors, clusterStateRegistry)
processorCallbacks.scaleDown = scaleDown
scaleDownWrapper := legacy.NewScaleDownWrapper(scaleDown)
processorCallbacks.scaleDownPlanner = scaleDownWrapper
// Set the initial scale times to be less than the start time so as to
// not start in cooldown mode.
@@ -162,7 +165,8 @@ func NewStaticAutoscaler(
lastScaleUpTime: initialScaleTime,
lastScaleDownDeleteTime: initialScaleTime,
lastScaleDownFailTime: initialScaleTime,
scaleDown: scaleDown,
scaleDownPlanner: scaleDownWrapper,
scaleDownActuator: scaleDownWrapper,
processors: processors,
processorCallbacks: processorCallbacks,
clusterStateRegistry: clusterStateRegistry,
@@ -231,7 +235,6 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError
unschedulablePodLister := a.UnschedulablePodLister()
scheduledPodLister := a.ScheduledPodLister()
pdbLister := a.PodDisruptionBudgetLister()
scaleDown := a.scaleDown
autoscalingContext := a.AutoscalingContext
klog.V(4).Info("Starting main loop")
@@ -326,7 +329,7 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError
a.processors.ScaleUpStatusProcessor.Process(a.AutoscalingContext, scaleUpStatus)
}
if !scaleDownStatusProcessorAlreadyCalled && a.processors != nil && a.processors.ScaleDownStatusProcessor != nil {
scaleDownStatus.SetUnremovableNodesInfo(scaleDown.UnremovableNodes(), scaleDown.NodeUtilizationMap(), a.CloudProvider)
scaleDownStatus.SetUnremovableNodesInfo(a.scaleDownPlanner.UnremovableNodes(), a.scaleDownPlanner.NodeUtilizationMap(), a.CloudProvider)
a.processors.ScaleDownStatusProcessor.Process(a.AutoscalingContext, scaleDownStatus)
}
@@ -354,7 +357,7 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError
if !a.clusterStateRegistry.IsClusterHealthy() {
klog.Warning("Cluster is not ready for autoscaling")
scaleDown.CleanUpUnneededNodes()
a.scaleDownPlanner.CleanUpUnneededNodes()
autoscalingContext.LogRecorder.Eventf(apiv1.EventTypeWarning, "ClusterUnhealthy", "Cluster is unhealthy")
return nil
}
@@ -480,8 +483,6 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError
klog.V(4).Infof("Calculating unneeded nodes")
scaleDown.CleanUp(currentTime)
var scaleDownCandidates []*apiv1.Node
var podDestinations []*apiv1.Node
@@ -509,9 +510,8 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError
}
}
// We use scheduledPods (not originalScheduledPods) here, so artificial scheduled pods introduced by processors
// (e.g. unscheduled pods with a nominated node name) can block scale down of a given node.
if typedErr := scaleDown.UpdateUnneededNodes(podDestinations, scaleDownCandidates, currentTime, pdbs); typedErr != nil {
actuationStatus := a.scaleDownActuator.CheckStatus()
if typedErr := a.scaleDownPlanner.UpdateClusterState(podDestinations, scaleDownCandidates, actuationStatus, pdbs, currentTime); typedErr != nil {
scaleDownStatus.Result = status.ScaleDownError
klog.Errorf("Failed to scale down: %v", typedErr)
return typedErr
@@ -523,20 +523,22 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError
a.lastScaleUpTime.Add(a.ScaleDownDelayAfterAdd).After(currentTime) ||
a.lastScaleDownFailTime.Add(a.ScaleDownDelayAfterFailure).After(currentTime) ||
a.lastScaleDownDeleteTime.Add(a.ScaleDownDelayAfterDelete).After(currentTime)
// TODO(x13n): Move deletionsInProgress > 0 condition to the legacy scaledown implementation.
deletionsInProgress := len(actuationStatus.DeletionsInProgress())
// In dry run only utilization is updated
calculateUnneededOnly := scaleDownInCooldown || scaleDown.IsNonEmptyNodeDeleteInProgress()
calculateUnneededOnly := scaleDownInCooldown || deletionsInProgress > 0
klog.V(4).Infof("Scale down status: unneededOnly=%v lastScaleUpTime=%s "+
"lastScaleDownDeleteTime=%v lastScaleDownFailTime=%s scaleDownForbidden=%v "+
"isDeleteInProgress=%v scaleDownInCooldown=%v",
"deletionsInProgress=%v scaleDownInCooldown=%v",
calculateUnneededOnly, a.lastScaleUpTime,
a.lastScaleDownDeleteTime, a.lastScaleDownFailTime, a.processorCallbacks.disableScaleDownForLoop,
scaleDown.IsNonEmptyNodeDeleteInProgress(), scaleDownInCooldown)
deletionsInProgress, scaleDownInCooldown)
metrics.UpdateScaleDownInCooldown(scaleDownInCooldown)
if scaleDownInCooldown {
scaleDownStatus.Result = status.ScaleDownInCooldown
} else if scaleDown.IsNonEmptyNodeDeleteInProgress() {
} else if deletionsInProgress > 0 {
scaleDownStatus.Result = status.ScaleDownInProgress
} else {
klog.V(4).Infof("Starting scale down")
@@ -550,9 +552,10 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError
scaleDownStart := time.Now()
metrics.UpdateLastTime(metrics.ScaleDown, scaleDownStart)
scaleDownStatus, typedErr := scaleDown.TryToScaleDown(currentTime, pdbs)
empty, needDrain := a.scaleDownPlanner.NodesToDelete()
scaleDownStatus, typedErr := a.scaleDownActuator.StartDeletion(empty, needDrain, currentTime)
metrics.UpdateDurationFromStart(metrics.ScaleDown, scaleDownStart)
metrics.UpdateUnremovableNodesCount(scaleDown.UnremovableNodesCount())
metrics.UpdateUnremovableNodesCount(countsByReason(a.scaleDownPlanner.UnremovableNodes()))
scaleDownStatus.RemovedNodeGroups = removedNodeGroups
@@ -564,13 +567,13 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError
if (scaleDownStatus.Result == status.ScaleDownNoNodeDeleted ||
scaleDownStatus.Result == status.ScaleDownNoUnneeded) &&
a.AutoscalingContext.AutoscalingOptions.MaxBulkSoftTaintCount != 0 {
taintableNodes := a.scaleDown.UnneededNodes()
taintableNodes := a.scaleDownPlanner.UnneededNodes()
untaintableNodes := subtractNodes(allNodes, taintableNodes)
actuation.UpdateSoftDeletionTaints(a.AutoscalingContext, taintableNodes, untaintableNodes)
}
if a.processors != nil && a.processors.ScaleDownStatusProcessor != nil {
scaleDownStatus.SetUnremovableNodesInfo(scaleDown.UnremovableNodes(), scaleDown.NodeUtilizationMap(), a.CloudProvider)
scaleDownStatus.SetUnremovableNodesInfo(a.scaleDownPlanner.UnremovableNodes(), a.scaleDownPlanner.NodeUtilizationMap(), a.CloudProvider)
a.processors.ScaleDownStatusProcessor.Process(autoscalingContext, scaleDownStatus)
scaleDownStatusProcessorAlreadyCalled = true
}
@@ -762,7 +765,7 @@ func (a *StaticAutoscaler) updateClusterState(allNodes []*apiv1.Node, nodeInfosF
err := a.clusterStateRegistry.UpdateNodes(allNodes, nodeInfosForGroups, currentTime)
if err != nil {
klog.Errorf("Failed to update node registry: %v", err)
a.scaleDown.CleanUpUnneededNodes()
a.scaleDownPlanner.CleanUpUnneededNodes()
return errors.ToAutoscalerError(errors.CloudProviderError, err)
}
core_utils.UpdateClusterStateMetrics(a.clusterStateRegistry)
@@ -820,6 +823,16 @@ func calculateCoresMemoryTotal(nodes []*apiv1.Node, timestamp time.Time) (int64,
return coresTotal, memoryTotal
}
func countsByReason(nodes []*simulator.UnremovableNode) map[simulator.UnremovableReason]int {
counts := make(map[simulator.UnremovableReason]int)
for _, node := range nodes {
counts[node.Reason]++
}
return counts
}
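
countsByReason replaces the aggregate counter the legacy object kept internally (scaleDown.UnremovableNodesCount, removed earlier in this diff): the Planner now returns the raw node list and the caller buckets it by reason before reporting metrics. A small usage sketch (the node values are illustrative; simulator.NoReason is assumed to be one of the existing reason constants):

unremovable := []*simulator.UnremovableNode{
	{Reason: simulator.NoReason},
	{Reason: simulator.NoReason},
}
counts := countsByReason(unremovable) // map[simulator.UnremovableReason]int{simulator.NoReason: 2}
metrics.UpdateUnremovableNodesCount(counts)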
func subtractNodes(a []*apiv1.Node, b []*apiv1.Node) []*apiv1.Node {
var c []*apiv1.Node
namesToDrop := make(map[string]bool)

View File

@@ -30,10 +30,12 @@ import (
clusterstate_utils "k8s.io/autoscaler/cluster-autoscaler/clusterstate/utils"
"k8s.io/autoscaler/cluster-autoscaler/config"
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/legacy"
. "k8s.io/autoscaler/cluster-autoscaler/core/test"
core_utils "k8s.io/autoscaler/cluster-autoscaler/core/utils"
"k8s.io/autoscaler/cluster-autoscaler/estimator"
ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors"
"k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
. "k8s.io/autoscaler/cluster-autoscaler/utils/test"
@@ -202,14 +204,15 @@ func TestStaticAutoscalerRunOnce(t *testing.T) {
processors := NewTestProcessors()
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, context.LogRecorder, NewBackoff())
sd := legacy.NewScaleDown(&context, processors, clusterState)
sdPlanner, sdActuator := newScaleDownPlannerAndActuator(t, &context, processors, clusterState)
autoscaler := &StaticAutoscaler{
AutoscalingContext: &context,
clusterStateRegistry: clusterState,
lastScaleUpTime: time.Now(),
lastScaleDownFailTime: time.Now(),
scaleDown: sd,
scaleDownPlanner: sdPlanner,
scaleDownActuator: sdActuator,
processors: processors,
processorCallbacks: processorCallbacks,
initialized: true,
@@ -268,7 +271,7 @@ func TestStaticAutoscalerRunOnce(t *testing.T) {
onScaleDownMock.On("ScaleDown", "ng1", "n2").Return(nil).Once()
err = autoscaler.RunOnce(time.Now().Add(3 * time.Hour))
waitForDeleteToFinish(t, autoscaler.scaleDown)
waitForDeleteToFinish(t, autoscaler.scaleDownActuator)
assert.NoError(t, err)
mock.AssertExpectationsForObjects(t, scheduledPodMock, unschedulablePodMock,
podDisruptionBudgetListerMock, daemonSetListerMock, onScaleUpMock, onScaleDownMock)
@@ -299,7 +302,7 @@ func TestStaticAutoscalerRunOnce(t *testing.T) {
podDisruptionBudgetListerMock.On("List").Return([]*policyv1.PodDisruptionBudget{}, nil).Once()
err = autoscaler.RunOnce(time.Now().Add(5 * time.Hour))
waitForDeleteToFinish(t, autoscaler.scaleDown)
waitForDeleteToFinish(t, autoscaler.scaleDownActuator)
assert.NoError(t, err)
mock.AssertExpectationsForObjects(t, scheduledPodMock, unschedulablePodMock,
podDisruptionBudgetListerMock, daemonSetListerMock, onScaleUpMock, onScaleDownMock)
@@ -394,14 +397,15 @@ func TestStaticAutoscalerRunOnceWithAutoprovisionedEnabled(t *testing.T) {
}
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, context.LogRecorder, NewBackoff())
sd := legacy.NewScaleDown(&context, processors, clusterState)
sdPlanner, sdActuator := newScaleDownPlannerAndActuator(t, &context, processors, clusterState)
autoscaler := &StaticAutoscaler{
AutoscalingContext: &context,
clusterStateRegistry: clusterState,
lastScaleUpTime: time.Now(),
lastScaleDownFailTime: time.Now(),
scaleDown: sd,
scaleDownPlanner: sdPlanner,
scaleDownActuator: sdActuator,
processors: processors,
processorCallbacks: processorCallbacks,
initialized: true,
@@ -453,7 +457,7 @@ func TestStaticAutoscalerRunOnceWithAutoprovisionedEnabled(t *testing.T) {
onScaleDownMock.On("ScaleDown", "autoprovisioned-TN2", "n2").Return(nil).Once()
err = autoscaler.RunOnce(time.Now().Add(2 * time.Hour))
waitForDeleteToFinish(t, autoscaler.scaleDown)
waitForDeleteToFinish(t, autoscaler.scaleDownActuator)
assert.NoError(t, err)
mock.AssertExpectationsForObjects(t, scheduledPodMock, unschedulablePodMock,
podDisruptionBudgetListerMock, daemonSetListerMock, onScaleUpMock, onScaleDownMock)
@@ -539,14 +543,15 @@ func TestStaticAutoscalerRunOnceWithALongUnregisteredNode(t *testing.T) {
processors := NewTestProcessors()
sd := legacy.NewScaleDown(&context, processors, clusterState)
sdPlanner, sdActuator := newScaleDownPlannerAndActuator(t, &context, processors, clusterState)
autoscaler := &StaticAutoscaler{
AutoscalingContext: &context,
clusterStateRegistry: clusterState,
lastScaleUpTime: time.Now(),
lastScaleDownFailTime: time.Now(),
scaleDown: sd,
scaleDownPlanner: sdPlanner,
scaleDownActuator: sdActuator,
processors: processors,
processorCallbacks: processorCallbacks,
}
@@ -577,7 +582,7 @@ func TestStaticAutoscalerRunOnceWithALongUnregisteredNode(t *testing.T) {
podDisruptionBudgetListerMock.On("List").Return([]*policyv1.PodDisruptionBudget{}, nil).Once()
err = autoscaler.RunOnce(later.Add(2 * time.Hour))
waitForDeleteToFinish(t, autoscaler.scaleDown)
waitForDeleteToFinish(t, autoscaler.scaleDownActuator)
assert.NoError(t, err)
mock.AssertExpectationsForObjects(t, scheduledPodMock, unschedulablePodMock,
podDisruptionBudgetListerMock, daemonSetListerMock, onScaleUpMock, onScaleDownMock)
@@ -679,14 +684,15 @@ func TestStaticAutoscalerRunOncePodsWithPriorities(t *testing.T) {
processors := NewTestProcessors()
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, context.LogRecorder, NewBackoff())
sd := legacy.NewScaleDown(&context, processors, clusterState)
sdPlanner, sdActuator := newScaleDownPlannerAndActuator(t, &context, processors, clusterState)
autoscaler := &StaticAutoscaler{
AutoscalingContext: &context,
clusterStateRegistry: clusterState,
lastScaleUpTime: time.Now(),
lastScaleDownFailTime: time.Now(),
scaleDown: sd,
scaleDownPlanner: sdPlanner,
scaleDownActuator: sdActuator,
processors: processors,
processorCallbacks: processorCallbacks,
}
@@ -731,7 +737,7 @@ func TestStaticAutoscalerRunOncePodsWithPriorities(t *testing.T) {
p4.Spec.NodeName = "n2"
err = autoscaler.RunOnce(time.Now().Add(3 * time.Hour))
waitForDeleteToFinish(t, autoscaler.scaleDown)
waitForDeleteToFinish(t, autoscaler.scaleDownActuator)
assert.NoError(t, err)
mock.AssertExpectationsForObjects(t, scheduledPodMock, unschedulablePodMock,
podDisruptionBudgetListerMock, daemonSetListerMock, onScaleUpMock, onScaleDownMock)
@@ -807,14 +813,15 @@ func TestStaticAutoscalerRunOnceWithFilteringOnBinPackingEstimator(t *testing.T)
processors := NewTestProcessors()
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, context.LogRecorder, NewBackoff())
sd := legacy.NewScaleDown(&context, processors, clusterState)
sdPlanner, sdActuator := newScaleDownPlannerAndActuator(t, &context, processors, clusterState)
autoscaler := &StaticAutoscaler{
AutoscalingContext: &context,
clusterStateRegistry: clusterState,
lastScaleUpTime: time.Now(),
lastScaleDownFailTime: time.Now(),
scaleDown: sd,
scaleDownPlanner: sdPlanner,
scaleDownActuator: sdActuator,
processors: processors,
processorCallbacks: processorCallbacks,
}
@@ -903,14 +910,15 @@ func TestStaticAutoscalerRunOnceWithFilteringOnUpcomingNodesEnabledNoScaleUp(t *
processors := NewTestProcessors()
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, context.LogRecorder, NewBackoff())
sd := legacy.NewScaleDown(&context, processors, clusterState)
sdPlanner, sdActuator := newScaleDownPlannerAndActuator(t, &context, processors, clusterState)
autoscaler := &StaticAutoscaler{
AutoscalingContext: &context,
clusterStateRegistry: clusterState,
lastScaleUpTime: time.Now(),
lastScaleDownFailTime: time.Now(),
scaleDown: sd,
scaleDownPlanner: sdPlanner,
scaleDownActuator: sdActuator,
processors: processors,
processorCallbacks: processorCallbacks,
}
@@ -1321,11 +1329,17 @@ func nodeNames(ns []*apiv1.Node) []string {
return names
}
func waitForDeleteToFinish(t *testing.T, sd *legacy.ScaleDown) {
func waitForDeleteToFinish(t *testing.T, sda scaledown.Actuator) {
for start := time.Now(); time.Since(start) < 20*time.Second; time.Sleep(100 * time.Millisecond) {
if !sd.IsNonEmptyNodeDeleteInProgress() {
if len(sda.CheckStatus().DeletionsInProgress()) == 0 {
return
}
}
t.Fatalf("Node delete not finished")
}
func newScaleDownPlannerAndActuator(t *testing.T, ctx *context.AutoscalingContext, p *ca_processors.AutoscalingProcessors, cs *clusterstate.ClusterStateRegistry) (scaledown.Planner, scaledown.Actuator) {
sd := legacy.NewScaleDown(ctx, p, cs)
wrapper := legacy.NewScaleDownWrapper(sd)
return wrapper, wrapper
}