CA: switch legacy ScaleDown to use the new Actuator

NodeDeletionTracker is now incremented asynchronously
for drained nodes, instead of synchronously. This shouldn't
change any actual behavior, but some tests depended on the
synchronous updates, so they had to be adapted.

The switch is intended to be mostly a semantic no-op,
with the following exceptions:
* Nodes that fail to be tainted won't be included in
  NodeDeleteResults, since they are now tainted
  synchronously.
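
For reference, after this change the scale-down step in the static
autoscaler splits planning from actuation roughly as follows (a
simplified sketch taken from the diff below; error handling and
metrics are omitted):

  // Planning: select the nodes to delete. Any legacy error is cached in the
  // ScaleDownWrapper and surfaced by StartDeletion (see the wrapper changes below).
  empty, needDrain := a.scaleDownPlanner.NodesToDelete(currentTime)
  // Actuation: taint the selected nodes synchronously, then drain/delete them asynchronously.
  scaleDownStatus, typedErr := a.scaleDownActuator.StartDeletion(empty, needDrain, currentTime)
  // Clear old node deletion results kept by the Actuator.
  a.scaleDownActuator.ClearResultsNotNewerThan(scaleDownStatus.NodeDeleteResultsAsOf)
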
Kuba Tużnik 2022-05-20 18:05:44 +02:00
parent b228f789dd
commit 6bd2432894
6 changed files with 148 additions and 436 deletions


@ -19,7 +19,6 @@ package legacy
import (
"math"
"reflect"
"strings"
"time"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
@ -36,8 +35,6 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/simulator"
"k8s.io/autoscaler/cluster-autoscaler/simulator/utilization"
"k8s.io/autoscaler/cluster-autoscaler/utils"
"k8s.io/autoscaler/cluster-autoscaler/utils/daemonset"
"k8s.io/autoscaler/cluster-autoscaler/utils/deletetaint"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
"k8s.io/autoscaler/cluster-autoscaler/utils/gpu"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
@ -46,8 +43,6 @@ import (
policyv1 "k8s.io/api/policy/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
kube_client "k8s.io/client-go/kubernetes"
kube_record "k8s.io/client-go/tools/record"
klog "k8s.io/klog/v2"
schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
)
@ -254,7 +249,7 @@ type ScaleDown struct {
}
// NewScaleDown builds new ScaleDown object.
func NewScaleDown(context *context.AutoscalingContext, processors *processors.AutoscalingProcessors, clusterStateRegistry *clusterstate.ClusterStateRegistry) *ScaleDown {
func NewScaleDown(context *context.AutoscalingContext, processors *processors.AutoscalingProcessors, clusterStateRegistry *clusterstate.ClusterStateRegistry, ndt *deletiontracker.NodeDeletionTracker) *ScaleDown {
usageTracker := simulator.NewUsageTracker()
removalSimulator := simulator.NewRemovalSimulator(context.ListerRegistry, context.ClusterSnapshot, context.PredicateChecker, usageTracker)
return &ScaleDown{
@ -267,7 +262,7 @@ func NewScaleDown(context *context.AutoscalingContext, processors *processors.Au
nodeUtilizationMap: make(map[string]utilization.Info),
usageTracker: usageTracker,
unneededNodesList: make([]*apiv1.Node, 0),
nodeDeletionTracker: deletiontracker.NewNodeDeletionTracker(0 * time.Second),
nodeDeletionTracker: ndt,
removalSimulator: removalSimulator,
}
}
@ -594,28 +589,20 @@ func (sd *ScaleDown) mapNodesToStatusScaleDownNodes(nodes []*apiv1.Node, nodeGro
return result
}
// TryToScaleDown tries to scale down the cluster. It returns a result inside a ScaleDownStatus indicating if any node was
// removed and error if such occurred.
func (sd *ScaleDown) TryToScaleDown(
currentTime time.Time,
pdbs []*policyv1.PodDisruptionBudget,
) (*status.ScaleDownStatus, errors.AutoscalerError) {
// NodesToDelete selects the nodes to delete for scale down.
func (sd *ScaleDown) NodesToDelete(currentTime time.Time, pdbs []*policyv1.PodDisruptionBudget) (empty, drain []*apiv1.Node, res status.ScaleDownResult, err errors.AutoscalerError) {
_, drained := sd.nodeDeletionTracker.DeletionsInProgress()
if len(drained) > 0 {
return &status.ScaleDownStatus{
Result: status.ScaleDownInProgress,
}, nil
return nil, nil, status.ScaleDownInProgress, nil
}
ndr, ts := sd.nodeDeletionTracker.DeletionResults()
scaleDownStatus := &status.ScaleDownStatus{NodeDeleteResults: ndr, NodeDeleteResultsAsOf: ts}
nodeDeletionDuration := time.Duration(0)
findNodesToRemoveDuration := time.Duration(0)
defer updateScaleDownMetrics(time.Now(), &findNodesToRemoveDuration, &nodeDeletionDuration)
defer updateScaleDownMetrics(time.Now(), &findNodesToRemoveDuration)
allNodeInfos, errSnapshot := sd.context.ClusterSnapshot.NodeInfos().List()
if errSnapshot != nil {
// This should never happen, List() returns err only because scheduler interface requires it.
return scaleDownStatus, errors.ToAutoscalerError(errors.InternalError, errSnapshot)
return nil, nil, status.ScaleDownError, errors.ToAutoscalerError(errors.InternalError, errSnapshot)
}
nodesWithoutMaster := filterOutMasters(allNodeInfos)
@ -627,13 +614,10 @@ func (sd *ScaleDown) TryToScaleDown(
candidateNames := make([]string, 0)
readinessMap := make(map[string]bool)
candidateNodeGroups := make(map[string]cloudprovider.NodeGroup)
gpuLabel := sd.context.CloudProvider.GPULabel()
availableGPUTypes := sd.context.CloudProvider.GetAvailableGPUTypes()
resourceLimiter, errCP := sd.context.CloudProvider.GetResourceLimiter()
if errCP != nil {
scaleDownStatus.Result = status.ScaleDownError
return scaleDownStatus, errors.ToAutoscalerError(errors.CloudProviderError, errCP)
return nil, nil, status.ScaleDownError, errors.ToAutoscalerError(errors.CloudProviderError, errCP)
}
scaleDownResourcesLeft := sd.computeScaleDownResourcesLeftLimits(nodesWithoutMaster, resourceLimiter, sd.context.CloudProvider, currentTime)
@ -731,8 +715,7 @@ func (sd *ScaleDown) TryToScaleDown(
if len(candidateNames) == 0 {
klog.V(1).Infof("No candidates for scale down")
scaleDownStatus.Result = status.ScaleDownNoUnneeded
return scaleDownStatus, nil
return nil, nil, status.ScaleDownNoUnneeded, nil
}
// Trying to delete empty nodes in bulk. If there are no empty nodes then CA will
@ -741,25 +724,16 @@ func (sd *ScaleDown) TryToScaleDown(
emptyNodesToRemove := sd.getEmptyNodesToRemove(candidateNames, scaleDownResourcesLeft, currentTime)
emptyNodesToRemove = sd.processors.ScaleDownSetProcessor.GetNodesToRemove(sd.context, emptyNodesToRemove, sd.context.MaxEmptyBulkDelete)
if len(emptyNodesToRemove) > 0 {
nodeDeletionStart := time.Now()
deletedNodes, err := sd.scheduleDeleteEmptyNodes(emptyNodesToRemove, sd.context.ClientSet, sd.context.Recorder, readinessMap, candidateNodeGroups)
nodeDeletionDuration = time.Now().Sub(nodeDeletionStart)
// TODO: Give the processor some information about the nodes that failed to be deleted.
scaleDownStatus.ScaledDownNodes = sd.mapNodesToStatusScaleDownNodes(deletedNodes, candidateNodeGroups, make(map[string][]*apiv1.Pod))
if len(deletedNodes) > 0 {
scaleDownStatus.Result = status.ScaleDownNodeDeleteStarted
} else {
scaleDownStatus.Result = status.ScaleDownError
var nodes []*apiv1.Node
for _, node := range emptyNodesToRemove {
// Nothing super-bad should happen if the node is removed from tracker prematurely.
simulator.RemoveNodeFromTracker(sd.usageTracker, node.Node.Name, sd.unneededNodes)
nodes = append(nodes, node.Node)
}
if err != nil {
return scaleDownStatus, err.AddPrefix("failed to delete at least one empty node: ")
}
return scaleDownStatus, nil
return nodes, nil, status.ScaleDownNodeDeleteStarted, nil
}
findNodesToRemoveStart := time.Now()
// We look for only 1 node so new hints may be incomplete.
nodesToRemove, unremovable, _, err := sd.removalSimulator.FindNodesToRemove(
candidateNames,
@ -772,67 +746,26 @@ func (sd *ScaleDown) TryToScaleDown(
for _, unremovableNode := range unremovable {
sd.unremovableNodes.Add(unremovableNode)
}
if err != nil {
scaleDownStatus.Result = status.ScaleDownError
return scaleDownStatus, err.AddPrefix("Find node to remove failed: ")
return nil, nil, status.ScaleDownError, err.AddPrefix("Find node to remove failed: ")
}
nodesToRemove = sd.processors.ScaleDownSetProcessor.GetNodesToRemove(sd.context, nodesToRemove, 1)
if len(nodesToRemove) == 0 {
klog.V(1).Infof("No node to remove")
scaleDownStatus.Result = status.ScaleDownNoNodeDeleted
return scaleDownStatus, nil
return nil, nil, status.ScaleDownNoNodeDeleted, nil
}
toRemove := nodesToRemove[0]
utilization := sd.nodeUtilizationMap[toRemove.Node.Name]
podNames := make([]string, 0, len(toRemove.PodsToReschedule))
for _, pod := range toRemove.PodsToReschedule {
podNames = append(podNames, pod.Namespace+"/"+pod.Name)
}
klog.V(0).Infof("Scale-down: removing node %s, utilization: %v, pods to reschedule: %s", toRemove.Node.Name, utilization,
strings.Join(podNames, ","))
sd.context.LogRecorder.Eventf(apiv1.EventTypeNormal, "ScaleDown", "Scale-down: removing node %s, utilization: %v, pods to reschedule: %s",
toRemove.Node.Name, utilization, strings.Join(podNames, ","))
// Nothing super-bad should happen if the node is removed from tracker prematurely.
simulator.RemoveNodeFromTracker(sd.usageTracker, toRemove.Node.Name, sd.unneededNodes)
nodeDeletionStart := time.Now()
// Starting deletion.
nodeDeletionDuration = time.Now().Sub(nodeDeletionStart)
nodeGroup, found := candidateNodeGroups[toRemove.Node.Name]
if !found {
return scaleDownStatus, errors.NewAutoscalerError(errors.InternalError, "failed to find node group for %s", toRemove.Node.Name)
}
sd.nodeDeletionTracker.StartDeletionWithDrain(nodeGroup.Id(), toRemove.Node.Name)
go func() {
// Finishing the delete process once this goroutine is over.
var result status.NodeDeleteResult
defer func() { sd.nodeDeletionTracker.EndDeletion(nodeGroup.Id(), toRemove.Node.Name, result) }()
result = sd.deleteNode(toRemove.Node, toRemove.PodsToReschedule, toRemove.DaemonSetPods, nodeGroup)
if result.ResultType != status.NodeDeleteOk {
klog.Errorf("Failed to delete %s: %v", toRemove.Node.Name, result.Err)
return
}
if readinessMap[toRemove.Node.Name] {
metrics.RegisterScaleDown(1, gpu.GetGpuTypeForMetrics(gpuLabel, availableGPUTypes, toRemove.Node, nodeGroup), metrics.Underutilized)
} else {
metrics.RegisterScaleDown(1, gpu.GetGpuTypeForMetrics(gpuLabel, availableGPUTypes, toRemove.Node, nodeGroup), metrics.Unready)
}
}()
scaleDownStatus.ScaledDownNodes = sd.mapNodesToStatusScaleDownNodes([]*apiv1.Node{toRemove.Node}, candidateNodeGroups, map[string][]*apiv1.Pod{toRemove.Node.Name: toRemove.PodsToReschedule})
scaleDownStatus.Result = status.ScaleDownNodeDeleteStarted
return scaleDownStatus, nil
return nil, []*apiv1.Node{toRemove.Node}, status.ScaleDownNodeDeleteStarted, nil
}
// updateScaleDownMetrics registers duration of different parts of scale down.
// Separates time spent on finding nodes to remove and other operations.
func updateScaleDownMetrics(scaleDownStart time.Time, findNodesToRemoveDuration *time.Duration, nodeDeletionDuration *time.Duration) {
func updateScaleDownMetrics(scaleDownStart time.Time, findNodesToRemoveDuration *time.Duration) {
stop := time.Now()
miscDuration := stop.Sub(scaleDownStart) - *nodeDeletionDuration - *findNodesToRemoveDuration
metrics.UpdateDuration(metrics.ScaleDownNodeDeletion, *nodeDeletionDuration)
miscDuration := stop.Sub(scaleDownStart) - *findNodesToRemoveDuration
metrics.UpdateDuration(metrics.ScaleDownFindNodesToRemove, *findNodesToRemoveDuration)
metrics.UpdateDuration(metrics.ScaleDownMiscOperations, miscDuration)
}
@ -903,113 +836,6 @@ func (sd *ScaleDown) getEmptyNodesToRemove(candidates []string, resourcesLimits
return nodesToRemove
}
func (sd *ScaleDown) scheduleDeleteEmptyNodes(emptyNodesToRemove []simulator.NodeToBeRemoved, client kube_client.Interface,
recorder kube_record.EventRecorder, readinessMap map[string]bool,
candidateNodeGroups map[string]cloudprovider.NodeGroup) ([]*apiv1.Node, errors.AutoscalerError) {
deletedNodes := []*apiv1.Node{}
for _, empty := range emptyNodesToRemove {
klog.V(0).Infof("Scale-down: removing empty node %s", empty.Node.Name)
sd.context.LogRecorder.Eventf(apiv1.EventTypeNormal, "ScaleDownEmpty", "Scale-down: removing empty node %s", empty.Node.Name)
simulator.RemoveNodeFromTracker(sd.usageTracker, empty.Node.Name, sd.unneededNodes)
nodeGroup, found := candidateNodeGroups[empty.Node.Name]
if !found {
return deletedNodes, errors.NewAutoscalerError(
errors.CloudProviderError, "failed to find node group for %s", empty.Node.Name)
}
taintErr := deletetaint.MarkToBeDeleted(empty.Node, client, sd.context.CordonNodeBeforeTerminate)
if taintErr != nil {
recorder.Eventf(empty.Node, apiv1.EventTypeWarning, "ScaleDownFailed", "failed to mark the node as toBeDeleted/unschedulable: %v", taintErr)
return deletedNodes, errors.ToAutoscalerError(errors.ApiCallError, taintErr)
}
deletedNodes = append(deletedNodes, empty.Node)
go func(nodeToDelete *apiv1.Node, nodeGroupForDeletedNode cloudprovider.NodeGroup, evictByDefault bool) {
sd.nodeDeletionTracker.StartDeletion(nodeGroupForDeletedNode.Id(), nodeToDelete.Name)
var result status.NodeDeleteResult
defer func() { sd.nodeDeletionTracker.EndDeletion(nodeGroupForDeletedNode.Id(), nodeToDelete.Name, result) }()
var deleteErr errors.AutoscalerError
// If we fail to delete the node we want to remove delete taint
defer func() {
if deleteErr != nil {
deletetaint.CleanToBeDeleted(nodeToDelete, client, sd.context.CordonNodeBeforeTerminate)
recorder.Eventf(nodeToDelete, apiv1.EventTypeWarning, "ScaleDownFailed", "failed to delete empty node: %v", deleteErr)
} else {
sd.context.LogRecorder.Eventf(apiv1.EventTypeNormal, "ScaleDownEmpty", "Scale-down: empty node %s removed", nodeToDelete.Name)
}
}()
if err := actuation.EvictDaemonSetPods(sd.context, nodeToDelete, time.Now(), actuation.DaemonSetEvictionEmptyNodeTimeout, actuation.DeamonSetTimeBetweenEvictionRetries); err != nil {
klog.Warningf("error while evicting DS pods from an empty node: %v", err)
}
deleteErr = actuation.WaitForDelayDeletion(nodeToDelete, sd.context.ListerRegistry.AllNodeLister(), sd.context.AutoscalingOptions.NodeDeletionDelayTimeout)
if deleteErr != nil {
klog.Errorf("Problem with empty node deletion: %v", deleteErr)
result = status.NodeDeleteResult{ResultType: status.NodeDeleteErrorFailedToDelete, Err: deleteErr}
return
}
deleteErr = actuation.DeleteNodeFromCloudProvider(sd.context, nodeToDelete, sd.clusterStateRegistry)
if deleteErr != nil {
klog.Errorf("Problem with empty node deletion: %v", deleteErr)
result = status.NodeDeleteResult{ResultType: status.NodeDeleteErrorFailedToDelete, Err: deleteErr}
return
}
if readinessMap[nodeToDelete.Name] {
metrics.RegisterScaleDown(1, gpu.GetGpuTypeForMetrics(sd.context.CloudProvider.GPULabel(), sd.context.CloudProvider.GetAvailableGPUTypes(), nodeToDelete, nodeGroupForDeletedNode), metrics.Empty)
} else {
metrics.RegisterScaleDown(1, gpu.GetGpuTypeForMetrics(sd.context.CloudProvider.GPULabel(), sd.context.CloudProvider.GetAvailableGPUTypes(), nodeToDelete, nodeGroupForDeletedNode), metrics.Unready)
}
result = status.NodeDeleteResult{ResultType: status.NodeDeleteOk}
}(empty.Node, nodeGroup, sd.context.DaemonSetEvictionForEmptyNodes)
}
return deletedNodes, nil
}
func (sd *ScaleDown) deleteNode(node *apiv1.Node, pods []*apiv1.Pod, daemonSetPods []*apiv1.Pod,
nodeGroup cloudprovider.NodeGroup) status.NodeDeleteResult {
deleteSuccessful := false
drainSuccessful := false
if err := deletetaint.MarkToBeDeleted(node, sd.context.ClientSet, sd.context.CordonNodeBeforeTerminate); err != nil {
sd.context.Recorder.Eventf(node, apiv1.EventTypeWarning, "ScaleDownFailed", "failed to mark the node as toBeDeleted/unschedulable: %v", err)
return status.NodeDeleteResult{ResultType: status.NodeDeleteErrorFailedToMarkToBeDeleted, Err: errors.ToAutoscalerError(errors.ApiCallError, err)}
}
// If we fail to evict all the pods from the node we want to remove delete taint
defer func() {
if !deleteSuccessful {
deletetaint.CleanToBeDeleted(node, sd.context.ClientSet, sd.context.CordonNodeBeforeTerminate)
if !drainSuccessful {
sd.context.Recorder.Eventf(node, apiv1.EventTypeWarning, "ScaleDownFailed", "failed to drain the node, aborting ScaleDown")
} else {
sd.context.Recorder.Eventf(node, apiv1.EventTypeWarning, "ScaleDownFailed", "failed to delete the node")
}
}
}()
sd.context.Recorder.Eventf(node, apiv1.EventTypeNormal, "ScaleDown", "marked the node as toBeDeleted/unschedulable")
daemonSetPods = daemonset.PodsToEvict(daemonSetPods, sd.context.DaemonSetEvictionForOccupiedNodes)
// attempt drain
evictionResults, err := actuation.DrainNodeWithPods(sd.context, node, pods, daemonSetPods, actuation.EvictionRetryTime, actuation.PodEvictionHeadroom)
if err != nil {
return status.NodeDeleteResult{ResultType: status.NodeDeleteErrorFailedToEvictPods, Err: err, PodEvictionResults: evictionResults}
}
drainSuccessful = true
if typedErr := actuation.WaitForDelayDeletion(node, sd.context.ListerRegistry.AllNodeLister(), sd.context.AutoscalingOptions.NodeDeletionDelayTimeout); typedErr != nil {
return status.NodeDeleteResult{ResultType: status.NodeDeleteErrorFailedToDelete, Err: typedErr}
}
// attempt delete from cloud provider
if typedErr := actuation.DeleteNodeFromCloudProvider(sd.context, node, sd.clusterStateRegistry); typedErr != nil {
return status.NodeDeleteResult{ResultType: status.NodeDeleteErrorFailedToDelete, Err: typedErr}
}
deleteSuccessful = true // Let the deferred function know there is no need to cleanup
return status.NodeDeleteResult{ResultType: status.NodeDeleteOk}
}
func hasNoScaleDownAnnotation(node *apiv1.Node) bool {
return node.Annotations[ScaleDownDisabledKey] == "true"
}


@ -29,7 +29,6 @@ import (
appsv1 "k8s.io/api/apps/v1"
batchv1 "k8s.io/api/batch/v1"
apiv1 "k8s.io/api/core/v1"
policyv1 "k8s.io/api/policy/v1beta1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
@ -37,6 +36,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/clusterstate"
"k8s.io/autoscaler/cluster-autoscaler/config"
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/actuation"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/deletiontracker"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/unremovable"
. "k8s.io/autoscaler/cluster-autoscaler/core/test"
@ -143,7 +143,8 @@ func TestFindUnneededNodes(t *testing.T) {
assert.NoError(t, err)
clusterStateRegistry := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, NewBackoff())
sd := newScaleDownForTesting(&context, clusterStateRegistry)
wrapper := newWrapperForTesting(&context, clusterStateRegistry, nil)
sd := wrapper.sd
allNodes := []*apiv1.Node{n1, n2, n3, n4, n5, n7, n8, n9}
simulator.InitializeClusterSnapshotOrDie(t, context.ClusterSnapshot, allNodes, []*apiv1.Pod{p1, p2, p3, p4, p5, p6})
@ -276,7 +277,8 @@ func TestFindUnneededGPUNodes(t *testing.T) {
assert.NoError(t, err)
clusterStateRegistry := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, NewBackoff())
sd := newScaleDownForTesting(&context, clusterStateRegistry)
wrapper := newWrapperForTesting(&context, clusterStateRegistry, nil)
sd := wrapper.sd
allNodes := []*apiv1.Node{n1, n2, n3}
simulator.InitializeClusterSnapshotOrDie(t, context.ClusterSnapshot, allNodes, []*apiv1.Pod{p1, p2, p3})
@ -390,7 +392,8 @@ func TestFindUnneededWithPerNodeGroupThresholds(t *testing.T) {
context, err := NewScaleTestAutoscalingContext(globalOptions, &fake.Clientset{}, registry, provider, nil, nil)
assert.NoError(t, err)
clusterStateRegistry := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, NewBackoff())
sd := newScaleDownForTesting(&context, clusterStateRegistry)
wrapper := newWrapperForTesting(&context, clusterStateRegistry, nil)
sd := wrapper.sd
simulator.InitializeClusterSnapshotOrDie(t, context.ClusterSnapshot, allNodes, allPods)
ng1 := provider.GetNodeGroup("n1").(*testprovider.TestNodeGroup)
@ -471,7 +474,8 @@ func TestPodsWithPreemptionsFindUnneededNodes(t *testing.T) {
assert.NoError(t, err)
clusterStateRegistry := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, NewBackoff())
sd := newScaleDownForTesting(&context, clusterStateRegistry)
wrapper := newWrapperForTesting(&context, clusterStateRegistry, nil)
sd := wrapper.sd
allNodes := []*apiv1.Node{n1, n2, n3, n4}
simulator.InitializeClusterSnapshotOrDie(t, context.ClusterSnapshot, allNodes, []*apiv1.Pod{p1, p2, p3, p4})
@ -536,7 +540,8 @@ func TestFindUnneededMaxCandidates(t *testing.T) {
assert.NoError(t, err)
clusterStateRegistry := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, NewBackoff())
sd := newScaleDownForTesting(&context, clusterStateRegistry)
wrapper := newWrapperForTesting(&context, clusterStateRegistry, nil)
sd := wrapper.sd
simulator.InitializeClusterSnapshotOrDie(t, context.ClusterSnapshot, nodes, pods)
autoscalererr = sd.UpdateUnneededNodes(nodes, nodes, time.Now(), nil)
@ -616,7 +621,9 @@ func TestFindUnneededEmptyNodes(t *testing.T) {
assert.NoError(t, err)
clusterStateRegistry := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, NewBackoff())
sd := newScaleDownForTesting(&context, clusterStateRegistry)
wrapper := newWrapperForTesting(&context, clusterStateRegistry, nil)
sd := wrapper.sd
simulator.InitializeClusterSnapshotOrDie(t, context.ClusterSnapshot, nodes, pods)
autoscalererr = sd.UpdateUnneededNodes(nodes, nodes, time.Now(), nil)
assert.NoError(t, autoscalererr)
@ -671,176 +678,14 @@ func TestFindUnneededNodePool(t *testing.T) {
assert.NoError(t, err)
clusterStateRegistry := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, NewBackoff())
sd := newScaleDownForTesting(&context, clusterStateRegistry)
wrapper := newWrapperForTesting(&context, clusterStateRegistry, nil)
sd := wrapper.sd
simulator.InitializeClusterSnapshotOrDie(t, context.ClusterSnapshot, nodes, pods)
autoscalererr = sd.UpdateUnneededNodes(nodes, nodes, time.Now(), nil)
assert.NoError(t, autoscalererr)
assert.NotEmpty(t, sd.unneededNodes)
}
func TestDeleteNode(t *testing.T) {
// common parameters
nodeDeleteFailedFunc :=
func(string, string) error {
return fmt.Errorf("won't remove node")
}
podNotFoundFunc :=
func(action core.Action) (bool, runtime.Object, error) {
return true, nil, errors.NewNotFound(apiv1.Resource("pod"), "whatever")
}
// scenarios
testScenarios := []struct {
name string
pods []string
drainSuccess bool
nodeDeleteSuccess bool
expectedDeletion bool
expectedResultType status.NodeDeleteResultType
}{
{
name: "successful attempt to delete node with pods",
pods: []string{"p1", "p2"},
drainSuccess: true,
nodeDeleteSuccess: true,
expectedDeletion: true,
expectedResultType: status.NodeDeleteOk,
},
/* Temporarily disabled as it takes several minutes due to hardcoded timeout.
* TODO(aleksandra-malinowska): move MaxPodEvictionTime to AutoscalingContext.
{
name: "failed on drain",
pods: []string{"p1", "p2"},
drainSuccess: false,
nodeDeleteSuccess: true,
expectedDeletion: false,
expectedResultType: status.NodeDeleteErrorFailedToEvictPods,
},
*/
{
name: "failed on node delete",
pods: []string{"p1", "p2"},
drainSuccess: true,
nodeDeleteSuccess: false,
expectedDeletion: false,
expectedResultType: status.NodeDeleteErrorFailedToDelete,
},
{
name: "successful attempt to delete empty node",
pods: []string{},
drainSuccess: true,
nodeDeleteSuccess: true,
expectedDeletion: true,
expectedResultType: status.NodeDeleteOk,
},
{
name: "failed attempt to delete empty node",
pods: []string{},
drainSuccess: true,
nodeDeleteSuccess: false,
expectedDeletion: false,
expectedResultType: status.NodeDeleteErrorFailedToDelete,
},
}
for _, scenario := range testScenarios {
// run each scenario as an independent test
t.Run(scenario.name, func(t *testing.T) {
// set up test channels
updatedNodes := make(chan string, 10)
deletedNodes := make(chan string, 10)
deletedPods := make(chan string, 10)
// set up test data
n1 := BuildTestNode("n1", 1000, 1000)
SetNodeReadyState(n1, true, time.Time{})
pods := make([]*apiv1.Pod, len(scenario.pods))
for i, podName := range scenario.pods {
pod := BuildTestPod(podName, 100, 0)
pods[i] = pod
}
// set up fake provider
deleteNodeHandler := nodeDeleteFailedFunc
if scenario.nodeDeleteSuccess {
deleteNodeHandler =
func(nodeGroup string, node string) error {
deletedNodes <- node
return nil
}
}
provider := testprovider.NewTestCloudProvider(nil, deleteNodeHandler)
provider.AddNodeGroup("ng1", 1, 100, 100)
provider.AddNode("ng1", n1)
// set up fake client
fakeClient := &fake.Clientset{}
fakeNode := n1.DeepCopy()
fakeClient.Fake.AddReactor("get", "nodes", func(action core.Action) (bool, runtime.Object, error) {
return true, fakeNode.DeepCopy(), nil
})
fakeClient.Fake.AddReactor("update", "nodes",
func(action core.Action) (bool, runtime.Object, error) {
update := action.(core.UpdateAction)
obj := update.GetObject().(*apiv1.Node)
taints := make([]string, 0, len(obj.Spec.Taints))
for _, taint := range obj.Spec.Taints {
taints = append(taints, taint.Key)
}
updatedNodes <- fmt.Sprintf("%s-%s", obj.Name, taints)
fakeNode = obj.DeepCopy()
return true, obj, nil
})
fakeClient.Fake.AddReactor("create", "pods",
func(action core.Action) (bool, runtime.Object, error) {
if !scenario.drainSuccess {
return true, nil, fmt.Errorf("won't evict")
}
createAction := action.(core.CreateAction)
if createAction == nil {
return false, nil, nil
}
eviction := createAction.GetObject().(*policyv1.Eviction)
if eviction == nil {
return false, nil, nil
}
deletedPods <- eviction.Name
return true, nil, nil
})
fakeClient.Fake.AddReactor("get", "pods", podNotFoundFunc)
// build context
registry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
context, err := NewScaleTestAutoscalingContext(config.AutoscalingOptions{}, fakeClient, registry, provider, nil, nil)
assert.NoError(t, err)
clusterStateRegistry := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, NewBackoff())
sd := newScaleDownForTesting(&context, clusterStateRegistry)
// attempt delete
result := sd.deleteNode(n1, pods, []*apiv1.Pod{}, provider.GetNodeGroup("ng1"))
// verify
if scenario.expectedDeletion {
assert.NoError(t, result.Err)
assert.Equal(t, n1.Name, utils.GetStringFromChanImmediately(deletedNodes))
} else {
assert.NotNil(t, result.Err)
}
assert.Equal(t, utils.NothingReturned, utils.GetStringFromChanImmediately(deletedNodes))
assert.Equal(t, scenario.expectedResultType, result.ResultType)
taintedUpdate := fmt.Sprintf("%s-%s", n1.Name, []string{deletetaint.ToBeDeletedTaint})
assert.Equal(t, taintedUpdate, utils.GetStringFromChan(updatedNodes))
if !scenario.expectedDeletion {
untaintedUpdate := fmt.Sprintf("%s-%s", n1.Name, []string{})
assert.Equal(t, untaintedUpdate, utils.GetStringFromChanImmediately(updatedNodes))
}
assert.Equal(t, utils.NothingReturned, utils.GetStringFromChanImmediately(updatedNodes))
})
}
}
func TestScaleDown(t *testing.T) {
var autoscalererr autoscaler_errors.AutoscalerError
@ -921,21 +766,22 @@ func TestScaleDown(t *testing.T) {
nodes := []*apiv1.Node{n1, n2}
clusterStateRegistry := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, NewBackoff())
scaleDown := newScaleDownForTesting(&context, clusterStateRegistry)
wrapper := newWrapperForTesting(&context, clusterStateRegistry, nil)
simulator.InitializeClusterSnapshotOrDie(t, context.ClusterSnapshot, nodes, []*apiv1.Pod{p1, p2})
autoscalererr = scaleDown.UpdateUnneededNodes(nodes, nodes, time.Now().Add(-5*time.Minute), nil)
autoscalererr = wrapper.UpdateClusterState(nodes, nodes, nil, nil, time.Now().Add(-5*time.Minute))
assert.NoError(t, autoscalererr)
scaleDownStatus, err := scaleDown.TryToScaleDown(time.Now(), nil)
waitForDeleteToFinish(t, scaleDown)
empty, drain := wrapper.NodesToDelete(time.Now())
scaleDownStatus, err := wrapper.StartDeletion(empty, drain, time.Now())
waitForDeleteToFinish(t, wrapper)
assert.NoError(t, err)
assert.Equal(t, status.ScaleDownNodeDeleteStarted, scaleDownStatus.Result)
assert.Equal(t, n1.Name, utils.GetStringFromChan(deletedNodes))
assert.Equal(t, n1.Name, utils.GetStringFromChan(updatedNodes))
}
func waitForDeleteToFinish(t *testing.T, sd *ScaleDown) {
func waitForDeleteToFinish(t *testing.T, wrapper *ScaleDownWrapper) {
for start := time.Now(); time.Since(start) < 20*time.Second; time.Sleep(100 * time.Millisecond) {
_, drained := sd.nodeDeletionTracker.DeletionsInProgress()
_, drained := wrapper.CheckStatus().DeletionsInProgress()
if len(drained) == 0 {
return
}
@ -1174,15 +1020,12 @@ func simpleScaleDownEmpty(t *testing.T, config *ScaleTestConfig) {
assert.NoError(t, err)
clusterStateRegistry := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, NewBackoff())
scaleDown := newScaleDownForTesting(&context, clusterStateRegistry)
if config.NodeDeletionTracker != nil {
scaleDown.nodeDeletionTracker = config.NodeDeletionTracker
}
wrapper := newWrapperForTesting(&context, clusterStateRegistry, config.NodeDeletionTracker)
simulator.InitializeClusterSnapshotOrDie(t, context.ClusterSnapshot, nodes, []*apiv1.Pod{})
autoscalererr = scaleDown.UpdateUnneededNodes(nodes, nodes, time.Now().Add(-5*time.Minute), nil)
autoscalererr = wrapper.UpdateClusterState(nodes, nodes, nil, nil, time.Now().Add(-5*time.Minute))
assert.NoError(t, autoscalererr)
scaleDownStatus, err := scaleDown.TryToScaleDown(time.Now(), nil)
empty, drain := wrapper.NodesToDelete(time.Now())
scaleDownStatus, err := wrapper.StartDeletion(empty, drain, time.Now())
assert.NoError(t, err)
var expectedScaleDownResult status.ScaleDownResult
@ -1212,7 +1055,7 @@ func simpleScaleDownEmpty(t *testing.T, config *ScaleTestConfig) {
assert.Equal(t, expectedScaleDownCount, len(deleted))
assert.Subset(t, config.ExpectedScaleDowns, deleted)
_, nonEmptyDeletions := scaleDown.nodeDeletionTracker.DeletionsInProgress()
_, nonEmptyDeletions := wrapper.CheckStatus().DeletionsInProgress()
assert.Equal(t, 0, len(nonEmptyDeletions))
}
@ -1267,12 +1110,13 @@ func TestNoScaleDownUnready(t *testing.T) {
// N1 is unready so it requires a bigger unneeded time.
clusterStateRegistry := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, NewBackoff())
scaleDown := newScaleDownForTesting(&context, clusterStateRegistry)
wrapper := newWrapperForTesting(&context, clusterStateRegistry, nil)
simulator.InitializeClusterSnapshotOrDie(t, context.ClusterSnapshot, nodes, []*apiv1.Pod{p2})
autoscalererr = scaleDown.UpdateUnneededNodes(nodes, nodes, time.Now().Add(-5*time.Minute), nil)
autoscalererr = wrapper.UpdateClusterState(nodes, nodes, nil, nil, time.Now().Add(-5*time.Minute))
assert.NoError(t, autoscalererr)
scaleDownStatus, err := scaleDown.TryToScaleDown(time.Now(), nil)
waitForDeleteToFinish(t, scaleDown)
empty, drain := wrapper.NodesToDelete(time.Now())
scaleDownStatus, err := wrapper.StartDeletion(empty, drain, time.Now())
waitForDeleteToFinish(t, wrapper)
assert.NoError(t, err)
assert.Equal(t, status.ScaleDownNoUnneeded, scaleDownStatus.Result)
@ -1290,12 +1134,13 @@ func TestNoScaleDownUnready(t *testing.T) {
// N1 has been unready for 2 hours, ok to delete.
context.CloudProvider = provider
scaleDown = newScaleDownForTesting(&context, clusterStateRegistry)
wrapper = newWrapperForTesting(&context, clusterStateRegistry, nil)
simulator.InitializeClusterSnapshotOrDie(t, context.ClusterSnapshot, nodes, []*apiv1.Pod{p2})
autoscalererr = scaleDown.UpdateUnneededNodes(nodes, nodes, time.Now().Add(-2*time.Hour), nil)
autoscalererr = wrapper.UpdateClusterState(nodes, nodes, nil, nil, time.Now().Add(-2*time.Hour))
assert.NoError(t, autoscalererr)
scaleDownStatus, err = scaleDown.TryToScaleDown(time.Now(), nil)
waitForDeleteToFinish(t, scaleDown)
empty, drain = wrapper.NodesToDelete(time.Now())
scaleDownStatus, err = wrapper.StartDeletion(empty, drain, time.Now())
waitForDeleteToFinish(t, wrapper)
assert.NoError(t, err)
assert.Equal(t, status.ScaleDownNodeDeleteStarted, scaleDownStatus.Result)
@ -1379,12 +1224,13 @@ func TestScaleDownNoMove(t *testing.T) {
nodes := []*apiv1.Node{n1, n2}
clusterStateRegistry := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, NewBackoff())
scaleDown := newScaleDownForTesting(&context, clusterStateRegistry)
wrapper := newWrapperForTesting(&context, clusterStateRegistry, nil)
simulator.InitializeClusterSnapshotOrDie(t, context.ClusterSnapshot, nodes, []*apiv1.Pod{p1, p2})
autoscalererr = scaleDown.UpdateUnneededNodes(nodes, nodes, time.Now().Add(-5*time.Minute), nil)
autoscalererr = wrapper.UpdateClusterState(nodes, nodes, nil, nil, time.Now().Add(-5*time.Minute))
assert.NoError(t, autoscalererr)
scaleDownStatus, err := scaleDown.TryToScaleDown(time.Now(), nil)
waitForDeleteToFinish(t, scaleDown)
empty, drain := wrapper.NodesToDelete(time.Now())
scaleDownStatus, err := wrapper.StartDeletion(empty, drain, time.Now())
waitForDeleteToFinish(t, wrapper)
assert.NoError(t, err)
assert.Equal(t, status.ScaleDownNoUnneeded, scaleDownStatus.Result)
@ -1551,6 +1397,13 @@ func generateReplicaSets() []*appsv1.ReplicaSet {
}
}
func newScaleDownForTesting(context *context.AutoscalingContext, clusterStateRegistry *clusterstate.ClusterStateRegistry) *ScaleDown {
return NewScaleDown(context, NewTestProcessors(), clusterStateRegistry)
func newWrapperForTesting(ctx *context.AutoscalingContext, clusterStateRegistry *clusterstate.ClusterStateRegistry, ndt *deletiontracker.NodeDeletionTracker) *ScaleDownWrapper {
ctx.MaxDrainParallelism = 1
ctx.MaxScaleDownParallelism = 10
if ndt == nil {
ndt = deletiontracker.NewNodeDeletionTracker(0 * time.Second)
}
sd := NewScaleDown(ctx, NewTestProcessors(), clusterStateRegistry, ndt)
actuator := actuation.NewActuator(ctx, clusterStateRegistry, ndt)
return NewScaleDownWrapper(sd, actuator)
}


@ -20,6 +20,7 @@ import (
"time"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/actuation"
"k8s.io/autoscaler/cluster-autoscaler/processors/status"
"k8s.io/autoscaler/cluster-autoscaler/simulator"
"k8s.io/autoscaler/cluster-autoscaler/simulator/utilization"
@ -32,14 +33,18 @@ import (
// ScaleDownWrapper wraps legacy scaledown logic to satisfy scaledown.Planner &
// scaledown.Actuator interfaces.
type ScaleDownWrapper struct {
sd *ScaleDown
pdbs []*policyv1.PodDisruptionBudget
sd *ScaleDown
pdbs []*policyv1.PodDisruptionBudget
actuator *actuation.Actuator
lastNodesToDeleteResult status.ScaleDownResult
lastNodesToDeleteErr errors.AutoscalerError
}
// NewScaleDownWrapper returns a new ScaleDownWrapper
func NewScaleDownWrapper(sd *ScaleDown) *ScaleDownWrapper {
func NewScaleDownWrapper(sd *ScaleDown, actuator *actuation.Actuator) *ScaleDownWrapper {
return &ScaleDownWrapper{
sd: sd,
sd: sd,
actuator: actuator,
}
}
@ -55,14 +60,6 @@ func (p *ScaleDownWrapper) CleanUpUnneededNodes() {
p.sd.CleanUpUnneededNodes()
}
// NodesToDelete lists nodes to delete. Current implementation is a no-op, the
// wrapper leverages shared state instead.
// TODO(x13n): Implement this and get rid of sharing state between planning and
// actuation.
func (p *ScaleDownWrapper) NodesToDelete() (empty, needDrain []*apiv1.Node) {
return nil, nil
}
// UnneededNodes returns a list of unneeded nodes.
func (p *ScaleDownWrapper) UnneededNodes() []*apiv1.Node {
return p.sd.UnneededNodes()
@ -79,20 +76,37 @@ func (p *ScaleDownWrapper) NodeUtilizationMap() map[string]utilization.Info {
return p.sd.NodeUtilizationMap()
}
// NodesToDelete lists nodes to delete.
//
// The legacy implementation used a single status for both selecting nodes to delete and actually deleting them, so
// some of the status.Result values are specific to NodesToDelete. In order not to break processors that might depend
// on these values, the Result is still passed from NodesToDelete to StartDeletion. The legacy implementation would
// also short-circuit on any error, while the current NodesToDelete doesn't return one. To preserve that behavior, the
// error returned by the legacy TryToScaleDown (now called NodesToDelete) is also passed on to StartDeletion.
// TODO: Evaluate if we can get rid of the last bits of shared state.
func (p *ScaleDownWrapper) NodesToDelete(currentTime time.Time) (empty, needDrain []*apiv1.Node) {
empty, drain, result, err := p.sd.NodesToDelete(currentTime, p.pdbs)
p.lastNodesToDeleteResult = result
p.lastNodesToDeleteErr = err
return empty, drain
}
// StartDeletion triggers the actual scale down logic.
func (p *ScaleDownWrapper) StartDeletion(empty, needDrain []*apiv1.Node, currentTime time.Time) (*status.ScaleDownStatus, errors.AutoscalerError) {
return p.sd.TryToScaleDown(currentTime, p.pdbs)
// Done to preserve legacy behavior, see comment on NodesToDelete.
if p.lastNodesToDeleteErr != nil || p.lastNodesToDeleteResult != status.ScaleDownNodeDeleteStarted {
return &status.ScaleDownStatus{Result: p.lastNodesToDeleteResult}, p.lastNodesToDeleteErr
}
return p.actuator.StartDeletion(empty, needDrain, currentTime)
}
// CheckStatus snapshots current deletion status
func (p *ScaleDownWrapper) CheckStatus() scaledown.ActuationStatus {
// TODO: snapshot information from the tracker instead of keeping live
// updated object.
return p.sd.nodeDeletionTracker
return p.actuator.CheckStatus()
}
// ClearResultsNotNewerThan clears old node deletion results kept by the
// Actuator.
func (p *ScaleDownWrapper) ClearResultsNotNewerThan(t time.Time) {
p.sd.nodeDeletionTracker.ClearResultsNotNewerThan(t)
p.actuator.ClearResultsNotNewerThan(t)
}
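
A minimal usage sketch of the wrapper, mirroring how the updated tests drive
it (it assumes a ScaleDown and an Actuator built around a shared
NodeDeletionTracker, and that UpdateClusterState has already been called;
variable names are illustrative):

  wrapper := NewScaleDownWrapper(sd, actuator)
  empty, drain := wrapper.NodesToDelete(time.Now())
  // StartDeletion either replays the cached legacy result/error or delegates to the Actuator.
  scaleDownStatus, err := wrapper.StartDeletion(empty, drain, time.Now())
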


@ -36,7 +36,7 @@ type Planner interface {
CleanUpUnneededNodes()
// NodesToDelete returns a list of nodes that can be deleted right now,
// according to the Planner.
NodesToDelete() (empty, needDrain []*apiv1.Node)
NodesToDelete(currentTime time.Time) (empty, needDrain []*apiv1.Node)
// UnneededNodes returns a list of nodes that either can be deleted
// right now or in a near future, assuming nothing will change in the
// cluster.

View File

@ -33,6 +33,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/actuation"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/deletiontracker"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/legacy"
core_utils "k8s.io/autoscaler/cluster-autoscaler/core/utils"
"k8s.io/autoscaler/cluster-autoscaler/estimator"
@ -153,8 +154,10 @@ func NewStaticAutoscaler(
clusterStateRegistry := clusterstate.NewClusterStateRegistry(autoscalingContext.CloudProvider, clusterStateConfig, autoscalingContext.LogRecorder, backoff)
scaleDown := legacy.NewScaleDown(autoscalingContext, processors, clusterStateRegistry)
scaleDownWrapper := legacy.NewScaleDownWrapper(scaleDown)
ndt := deletiontracker.NewNodeDeletionTracker(0 * time.Second)
scaleDown := legacy.NewScaleDown(autoscalingContext, processors, clusterStateRegistry, ndt)
actuator := actuation.NewActuator(autoscalingContext, clusterStateRegistry, ndt)
scaleDownWrapper := legacy.NewScaleDownWrapper(scaleDown, actuator)
processorCallbacks.scaleDownPlanner = scaleDownWrapper
// Set the initial scale times to be less than the start time so as to
@ -549,7 +552,7 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError
scaleDownStart := time.Now()
metrics.UpdateLastTime(metrics.ScaleDown, scaleDownStart)
empty, needDrain := a.scaleDownPlanner.NodesToDelete()
empty, needDrain := a.scaleDownPlanner.NodesToDelete(currentTime)
scaleDownStatus, typedErr := a.scaleDownActuator.StartDeletion(empty, needDrain, currentTime)
a.scaleDownActuator.ClearResultsNotNewerThan(scaleDownStatus.NodeDeleteResultsAsOf)
metrics.UpdateDurationFromStart(metrics.ScaleDown, scaleDownStart)


@ -31,6 +31,8 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/config"
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/actuation"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/deletiontracker"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/legacy"
. "k8s.io/autoscaler/cluster-autoscaler/core/test"
core_utils "k8s.io/autoscaler/cluster-autoscaler/core/utils"
@ -145,6 +147,7 @@ func TestStaticAutoscalerRunOnce(t *testing.T) {
daemonSetListerMock := &daemonSetListerMock{}
onScaleUpMock := &onScaleUpMock{}
onScaleDownMock := &onScaleDownMock{}
deleteFinished := make(chan bool, 1)
n1 := BuildTestNode("n1", 1000, 1000)
SetNodeReadyState(n1, true, time.Now())
@ -164,7 +167,9 @@ func TestStaticAutoscalerRunOnce(t *testing.T) {
func(id string, delta int) error {
return onScaleUpMock.ScaleUp(id, delta)
}, func(id string, name string) error {
return onScaleDownMock.ScaleDown(id, name)
ret := onScaleDownMock.ScaleDown(id, name)
deleteFinished <- true
return ret
},
nil, nil,
nil, map[string]*schedulerframework.NodeInfo{"ng1": tni, "ng2": tni})
@ -271,7 +276,7 @@ func TestStaticAutoscalerRunOnce(t *testing.T) {
onScaleDownMock.On("ScaleDown", "ng1", "n2").Return(nil).Once()
err = autoscaler.RunOnce(time.Now().Add(3 * time.Hour))
waitForDeleteToFinish(t, autoscaler.scaleDownActuator)
waitForDeleteToFinish(t, deleteFinished)
assert.NoError(t, err)
mock.AssertExpectationsForObjects(t, scheduledPodMock, unschedulablePodMock,
podDisruptionBudgetListerMock, daemonSetListerMock, onScaleUpMock, onScaleDownMock)
@ -302,7 +307,7 @@ func TestStaticAutoscalerRunOnce(t *testing.T) {
podDisruptionBudgetListerMock.On("List").Return([]*policyv1.PodDisruptionBudget{}, nil).Once()
err = autoscaler.RunOnce(time.Now().Add(5 * time.Hour))
waitForDeleteToFinish(t, autoscaler.scaleDownActuator)
waitForDeleteToFinish(t, deleteFinished)
assert.NoError(t, err)
mock.AssertExpectationsForObjects(t, scheduledPodMock, unschedulablePodMock,
podDisruptionBudgetListerMock, daemonSetListerMock, onScaleUpMock, onScaleDownMock)
@ -321,6 +326,7 @@ func TestStaticAutoscalerRunOnceWithAutoprovisionedEnabled(t *testing.T) {
onNodeGroupDeleteMock := &onNodeGroupDeleteMock{}
nodeGroupManager := &MockAutoprovisioningNodeGroupManager{t, 0}
nodeGroupListProcessor := &MockAutoprovisioningNodeGroupListProcessor{t}
deleteFinished := make(chan bool, 1)
n1 := BuildTestNode("n1", 100, 1000)
SetNodeReadyState(n1, true, time.Now())
@ -348,7 +354,9 @@ func TestStaticAutoscalerRunOnceWithAutoprovisionedEnabled(t *testing.T) {
func(id string, delta int) error {
return onScaleUpMock.ScaleUp(id, delta)
}, func(id string, name string) error {
return onScaleDownMock.ScaleDown(id, name)
ret := onScaleDownMock.ScaleDown(id, name)
deleteFinished <- true
return ret
}, func(id string) error {
return onNodeGroupCreateMock.Create(id)
}, func(id string) error {
@ -457,7 +465,7 @@ func TestStaticAutoscalerRunOnceWithAutoprovisionedEnabled(t *testing.T) {
onScaleDownMock.On("ScaleDown", "autoprovisioned-TN2", "n2").Return(nil).Once()
err = autoscaler.RunOnce(time.Now().Add(2 * time.Hour))
waitForDeleteToFinish(t, autoscaler.scaleDownActuator)
waitForDeleteToFinish(t, deleteFinished)
assert.NoError(t, err)
mock.AssertExpectationsForObjects(t, scheduledPodMock, unschedulablePodMock,
podDisruptionBudgetListerMock, daemonSetListerMock, onScaleUpMock, onScaleDownMock)
@ -472,6 +480,7 @@ func TestStaticAutoscalerRunOnceWithALongUnregisteredNode(t *testing.T) {
daemonSetListerMock := &daemonSetListerMock{}
onScaleUpMock := &onScaleUpMock{}
onScaleDownMock := &onScaleDownMock{}
deleteFinished := make(chan bool, 1)
now := time.Now()
later := now.Add(1 * time.Minute)
@ -489,7 +498,9 @@ func TestStaticAutoscalerRunOnceWithALongUnregisteredNode(t *testing.T) {
func(id string, delta int) error {
return onScaleUpMock.ScaleUp(id, delta)
}, func(id string, name string) error {
return onScaleDownMock.ScaleDown(id, name)
ret := onScaleDownMock.ScaleDown(id, name)
deleteFinished <- true
return ret
})
provider.AddNodeGroup("ng1", 2, 10, 2)
provider.AddNode("ng1", n1)
@ -535,7 +546,7 @@ func TestStaticAutoscalerRunOnceWithALongUnregisteredNode(t *testing.T) {
// broken node detected as unregistered
nodes := []*apiv1.Node{n1}
//nodeInfos, _ := getNodeInfosForGroups(nodes, provider, listerRegistry, []*appsv1.DaemonSet{}, context.PredicateChecker)
// nodeInfos, _ := getNodeInfosForGroups(nodes, provider, listerRegistry, []*appsv1.DaemonSet{}, context.PredicateChecker)
clusterState.UpdateNodes(nodes, nil, now)
// broken node failed to register in time
@ -582,7 +593,7 @@ func TestStaticAutoscalerRunOnceWithALongUnregisteredNode(t *testing.T) {
podDisruptionBudgetListerMock.On("List").Return([]*policyv1.PodDisruptionBudget{}, nil).Once()
err = autoscaler.RunOnce(later.Add(2 * time.Hour))
waitForDeleteToFinish(t, autoscaler.scaleDownActuator)
waitForDeleteToFinish(t, deleteFinished)
assert.NoError(t, err)
mock.AssertExpectationsForObjects(t, scheduledPodMock, unschedulablePodMock,
podDisruptionBudgetListerMock, daemonSetListerMock, onScaleUpMock, onScaleDownMock)
@ -597,6 +608,7 @@ func TestStaticAutoscalerRunOncePodsWithPriorities(t *testing.T) {
daemonSetListerMock := &daemonSetListerMock{}
onScaleUpMock := &onScaleUpMock{}
onScaleDownMock := &onScaleDownMock{}
deleteFinished := make(chan bool, 1)
n1 := BuildTestNode("n1", 100, 1000)
SetNodeReadyState(n1, true, time.Now())
@ -642,7 +654,9 @@ func TestStaticAutoscalerRunOncePodsWithPriorities(t *testing.T) {
func(id string, delta int) error {
return onScaleUpMock.ScaleUp(id, delta)
}, func(id string, name string) error {
return onScaleDownMock.ScaleDown(id, name)
ret := onScaleDownMock.ScaleDown(id, name)
deleteFinished <- true
return ret
})
provider.AddNodeGroup("ng1", 0, 10, 1)
provider.AddNodeGroup("ng2", 0, 10, 2)
@ -737,7 +751,7 @@ func TestStaticAutoscalerRunOncePodsWithPriorities(t *testing.T) {
p4.Spec.NodeName = "n2"
err = autoscaler.RunOnce(time.Now().Add(3 * time.Hour))
waitForDeleteToFinish(t, autoscaler.scaleDownActuator)
waitForDeleteToFinish(t, deleteFinished)
assert.NoError(t, err)
mock.AssertExpectationsForObjects(t, scheduledPodMock, unschedulablePodMock,
podDisruptionBudgetListerMock, daemonSetListerMock, onScaleUpMock, onScaleDownMock)
@ -1329,19 +1343,21 @@ func nodeNames(ns []*apiv1.Node) []string {
return names
}
func waitForDeleteToFinish(t *testing.T, sda scaledown.Actuator) {
for start := time.Now(); time.Since(start) < 20*time.Second; time.Sleep(100 * time.Millisecond) {
_, dip := sda.CheckStatus().DeletionsInProgress()
klog.Infof("Non empty deletions in progress: %v", dip)
if len(dip) == 0 {
return
}
func waitForDeleteToFinish(t *testing.T, deleteFinished <-chan bool) {
select {
case <-deleteFinished:
return
case <-time.After(20 * time.Second):
t.Fatalf("Node delete not finished")
}
t.Fatalf("Node delete not finished")
}
func newScaleDownPlannerAndActuator(t *testing.T, ctx *context.AutoscalingContext, p *ca_processors.AutoscalingProcessors, cs *clusterstate.ClusterStateRegistry) (scaledown.Planner, scaledown.Actuator) {
sd := legacy.NewScaleDown(ctx, p, cs)
wrapper := legacy.NewScaleDownWrapper(sd)
ctx.MaxScaleDownParallelism = 10
ctx.MaxDrainParallelism = 1
ndt := deletiontracker.NewNodeDeletionTracker(0 * time.Second)
sd := legacy.NewScaleDown(ctx, p, cs, ndt)
actuator := actuation.NewActuator(ctx, cs, ndt)
wrapper := legacy.NewScaleDownWrapper(sd, actuator)
return wrapper, wrapper
}