diff --git a/cluster-autoscaler/core/scaledown/deletiontracker/nodedeletiontracker.go b/cluster-autoscaler/core/scaledown/deletiontracker/nodedeletiontracker.go
index 2756d723b9..0a6fa6f749 100644
--- a/cluster-autoscaler/core/scaledown/deletiontracker/nodedeletiontracker.go
+++ b/cluster-autoscaler/core/scaledown/deletiontracker/nodedeletiontracker.go
@@ -24,6 +24,7 @@ import (
 )
 
 // NodeDeletionTracker keeps track of node deletions.
+// TODO: extend to implement ActuationStatus interface
 type NodeDeletionTracker struct {
 	sync.Mutex
 	nonEmptyNodeDeleteInProgress bool
diff --git a/cluster-autoscaler/core/scaledown/legacy/wrapper.go b/cluster-autoscaler/core/scaledown/legacy/wrapper.go
new file mode 100644
index 0000000000..eef2b85a45
--- /dev/null
+++ b/cluster-autoscaler/core/scaledown/legacy/wrapper.go
@@ -0,0 +1,137 @@
+/*
+Copyright 2016 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package legacy
+
+import (
+	"time"
+
+	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown"
+	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/deletiontracker"
+	"k8s.io/autoscaler/cluster-autoscaler/processors/status"
+	"k8s.io/autoscaler/cluster-autoscaler/simulator"
+	"k8s.io/autoscaler/cluster-autoscaler/simulator/utilization"
+	"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
+
+	apiv1 "k8s.io/api/core/v1"
+	policyv1 "k8s.io/api/policy/v1beta1"
+)
+
+// ScaleDownWrapper wraps legacy scaledown logic to satisfy scaledown.Planner &
+// scaledown.Actuator interfaces.
+type ScaleDownWrapper struct {
+	sd   *ScaleDown
+	pdbs []*policyv1.PodDisruptionBudget
+}
+
+// NewScaleDownWrapper returns a new ScaleDownWrapper.
+func NewScaleDownWrapper(sd *ScaleDown) *ScaleDownWrapper {
+	return &ScaleDownWrapper{
+		sd: sd,
+	}
+}
+
+// UpdateClusterState updates unneeded nodes in the underlying ScaleDown.
+func (p *ScaleDownWrapper) UpdateClusterState(podDestinations, scaleDownCandidates []*apiv1.Node, actuationStatus scaledown.ActuationStatus, pdbs []*policyv1.PodDisruptionBudget, currentTime time.Time) errors.AutoscalerError {
+	p.sd.CleanUp(currentTime)
+	p.pdbs = pdbs
+	return p.sd.UpdateUnneededNodes(podDestinations, scaleDownCandidates, currentTime, pdbs)
+}
+
+// CleanUpUnneededNodes cleans up unneeded nodes.
+func (p *ScaleDownWrapper) CleanUpUnneededNodes() {
+	p.sd.CleanUpUnneededNodes()
+}
+
+// NodesToDelete lists nodes to delete. The current implementation is a no-op;
+// the wrapper leverages shared state instead.
+// TODO(x13n): Implement this and get rid of sharing state between planning and
+// actuation.
+func (p *ScaleDownWrapper) NodesToDelete() (empty, needDrain []*apiv1.Node) {
+	return nil, nil
+}
+
+// UnneededNodes returns a list of unneeded nodes.
+func (p *ScaleDownWrapper) UnneededNodes() []*apiv1.Node {
+	return p.sd.UnneededNodes()
+}
+
+// UnremovableNodes returns a list of nodes that cannot be removed.
+func (p *ScaleDownWrapper) UnremovableNodes() []*simulator.UnremovableNode {
+	return p.sd.UnremovableNodes()
+}
+
+// NodeUtilizationMap returns information about utilization of individual
+// cluster nodes.
+func (p *ScaleDownWrapper) NodeUtilizationMap() map[string]utilization.Info {
+	return p.sd.NodeUtilizationMap()
+}
+
+// StartDeletion triggers the actual scale down logic.
+func (p *ScaleDownWrapper) StartDeletion(empty, needDrain []*apiv1.Node, currentTime time.Time) (*status.ScaleDownStatus, errors.AutoscalerError) {
+	return p.sd.TryToScaleDown(currentTime, p.pdbs)
+}
+
+// CheckStatus snapshots the current deletion status.
+func (p *ScaleDownWrapper) CheckStatus() scaledown.ActuationStatus {
+	// TODO: snapshot information from the tracker instead of keeping a live,
+	// updated object.
+	return &actuationStatus{
+		ndt: p.sd.nodeDeletionTracker,
+	}
+}
+
+// ClearResultsNotNewerThan clears old node deletion results kept by the
+// Actuator.
+func (p *ScaleDownWrapper) ClearResultsNotNewerThan(t time.Time) {
+	// TODO: implement this once results are not cleared while being
+	// fetched.
+}
+
+type actuationStatus struct {
+	ndt *deletiontracker.NodeDeletionTracker
+}
+
+// DeletionsInProgress returns the names of nodes that are currently being deleted.
+// The current implementation is not aware of the actual node names, so it returns
+// a fake node name instead.
+// TODO: Return real node names
+func (a *actuationStatus) DeletionsInProgress() []string {
+	if a.ndt.IsNonEmptyNodeDeleteInProgress() {
+		return []string{"fake-node-name"}
+	}
+	return nil
+}
+
+// DeletionsCount returns the total number of ongoing deletions in a given node
+// group.
+func (a *actuationStatus) DeletionsCount(nodeGroupId string) int {
+	return a.ndt.GetDeletionsInProgress(nodeGroupId)
+}
+
+// RecentEvictions should return a list of recently evicted pods. Since the legacy
+// scale down logic only drains at most one node at a time, this safeguard is
+// not really needed there, so we can just return an empty list.
+func (a *actuationStatus) RecentEvictions() []*apiv1.Pod {
+	return nil
+}
+
+// DeletionResults returns a map of recent node deletion results.
+func (a *actuationStatus) DeletionResults() map[string]status.NodeDeleteResult {
+	// TODO: update nodeDeletionTracker so it doesn't get & clear in the
+	// same step.
+	return a.ndt.GetAndClearNodeDeleteResults()
+}
diff --git a/cluster-autoscaler/core/scaledown/scaledown.go b/cluster-autoscaler/core/scaledown/scaledown.go
new file mode 100644
index 0000000000..291e70db9c
--- /dev/null
+++ b/cluster-autoscaler/core/scaledown/scaledown.go
@@ -0,0 +1,84 @@
+/*
+Copyright 2016 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package scaledown
+
+import (
+	"time"
+
+	"k8s.io/autoscaler/cluster-autoscaler/processors/status"
+	"k8s.io/autoscaler/cluster-autoscaler/simulator"
+	"k8s.io/autoscaler/cluster-autoscaler/simulator/utilization"
+	"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
+
+	apiv1 "k8s.io/api/core/v1"
+	policyv1 "k8s.io/api/policy/v1beta1"
+)
+
+// Planner is responsible for selecting nodes that should be removed.
+type Planner interface {
+	// UpdateClusterState provides the Planner with information about the cluster.
+	UpdateClusterState(podDestinations, scaleDownCandidates []*apiv1.Node, as ActuationStatus, pdb []*policyv1.PodDisruptionBudget, currentTime time.Time) errors.AutoscalerError
+	// CleanUpUnneededNodes resets internal state of the Planner.
+	CleanUpUnneededNodes()
+	// NodesToDelete returns lists of nodes that can be deleted right now,
+	// according to the Planner.
+	NodesToDelete() (empty, needDrain []*apiv1.Node)
+	// UnneededNodes returns a list of nodes that either can be deleted
+	// right now or in the near future, assuming nothing will change in the
+	// cluster.
+	UnneededNodes() []*apiv1.Node
+	// UnremovableNodes returns a list of nodes that cannot be removed.
+	// TODO(x13n): Add a guarantee that each node is either unneeded or
+	// unremovable. This is not guaranteed by the current implementation.
+	UnremovableNodes() []*simulator.UnremovableNode
+	// NodeUtilizationMap returns information about utilization of
+	// individual cluster nodes.
+	NodeUtilizationMap() map[string]utilization.Info
+}
+
+// Actuator is responsible for making changes in the cluster: draining and
+// deleting nodes.
+type Actuator interface {
+	// StartDeletion triggers a new deletion process. Nodes passed to this
+	// function are not guaranteed to be deleted; the Actuator may ignore
+	// some of them, e.g. when the configured maximum level of parallelism
+	// is reached.
+	StartDeletion(empty, needDrain []*apiv1.Node, currentTime time.Time) (*status.ScaleDownStatus, errors.AutoscalerError)
+	// CheckStatus returns an immutable snapshot of ongoing deletions.
+	CheckStatus() ActuationStatus
+	// ClearResultsNotNewerThan removes information about deletions finished
+	// before or exactly at the provided timestamp.
+	ClearResultsNotNewerThan(time.Time)
+}
+
+// ActuationStatus is used for feeding the Actuator status back into the Planner.
+type ActuationStatus interface {
+	// DeletionsInProgress returns a list of nodes that are currently
+	// undergoing deletion.
+	DeletionsInProgress() (nodeNames []string)
+	// DeletionsCount returns the total number of ongoing deletions in a given
+	// node group.
+	DeletionsCount(nodeGroupId string) int
+	// RecentEvictions returns a list of pods that were recently removed by
+	// the Actuator and hence are likely to get recreated elsewhere in the
+	// cluster.
+	RecentEvictions() (pods []*apiv1.Pod)
+	// DeletionResults returns a map of recent node deletion results, keyed
+	// by the node name. Note: if a node deletion was scheduled more than
+	// once, only the latest result will be present.
+	DeletionResults() map[string]status.NodeDeleteResult
+}
diff --git a/cluster-autoscaler/core/static_autoscaler.go b/cluster-autoscaler/core/static_autoscaler.go
index 8df28058c5..934a39975b 100644
--- a/cluster-autoscaler/core/static_autoscaler.go
+++ b/cluster-autoscaler/core/static_autoscaler.go
@@ -31,6 +31,7 @@ import (
 	"k8s.io/autoscaler/cluster-autoscaler/clusterstate/utils"
 	"k8s.io/autoscaler/cluster-autoscaler/config"
 	"k8s.io/autoscaler/cluster-autoscaler/context"
+	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown"
 	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/actuation"
 	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/legacy"
 	core_utils "k8s.io/autoscaler/cluster-autoscaler/core/utils"
@@ -71,7 +72,8 @@ type StaticAutoscaler struct {
 	lastScaleUpTime         time.Time
 	lastScaleDownDeleteTime time.Time
 	lastScaleDownFailTime   time.Time
-	scaleDown               *legacy.ScaleDown
+	scaleDownPlanner        scaledown.Planner
+	scaleDownActuator       scaledown.Actuator
 	processors              *ca_processors.AutoscalingProcessors
 	processorCallbacks      *staticAutoscalerProcessorCallbacks
 	initialized             bool
@@ -81,11 +83,11 @@ type StaticAutoscaler struct {
 type staticAutoscalerProcessorCallbacks struct {
 	disableScaleDownForLoop bool
 	extraValues             map[string]interface{}
-	scaleDown               *legacy.ScaleDown
+	scaleDownPlanner        scaledown.Planner
 }
 
 func (callbacks *staticAutoscalerProcessorCallbacks) ResetUnneededNodes() {
-	callbacks.scaleDown.CleanUpUnneededNodes()
+	callbacks.scaleDownPlanner.CleanUpUnneededNodes()
 }
 
 func newStaticAutoscalerProcessorCallbacks() *staticAutoscalerProcessorCallbacks {
@@ -152,7 +154,8 @@ func NewStaticAutoscaler(
 	clusterStateRegistry := clusterstate.NewClusterStateRegistry(autoscalingContext.CloudProvider, clusterStateConfig, autoscalingContext.LogRecorder, backoff)
 
 	scaleDown := legacy.NewScaleDown(autoscalingContext, processors, clusterStateRegistry)
-	processorCallbacks.scaleDown = scaleDown
+	scaleDownWrapper := legacy.NewScaleDownWrapper(scaleDown)
+	processorCallbacks.scaleDownPlanner = scaleDownWrapper
 
 	// Set the initial scale times to be less than the start time so as to
 	// not start in cooldown mode.
@@ -162,7 +165,8 @@ func NewStaticAutoscaler(
 		lastScaleUpTime:         initialScaleTime,
 		lastScaleDownDeleteTime: initialScaleTime,
 		lastScaleDownFailTime:   initialScaleTime,
-		scaleDown:               scaleDown,
+		scaleDownPlanner:        scaleDownWrapper,
+		scaleDownActuator:       scaleDownWrapper,
 		processors:              processors,
 		processorCallbacks:      processorCallbacks,
 		clusterStateRegistry:    clusterStateRegistry,
@@ -231,7 +235,6 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError
 	unschedulablePodLister := a.UnschedulablePodLister()
 	scheduledPodLister := a.ScheduledPodLister()
 	pdbLister := a.PodDisruptionBudgetLister()
-	scaleDown := a.scaleDown
 	autoscalingContext := a.AutoscalingContext
 
 	klog.V(4).Info("Starting main loop")
@@ -326,7 +329,7 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError
 			a.processors.ScaleUpStatusProcessor.Process(a.AutoscalingContext, scaleUpStatus)
 		}
 		if !scaleDownStatusProcessorAlreadyCalled && a.processors != nil && a.processors.ScaleDownStatusProcessor != nil {
-			scaleDownStatus.SetUnremovableNodesInfo(scaleDown.UnremovableNodes(), scaleDown.NodeUtilizationMap(), a.CloudProvider)
+			scaleDownStatus.SetUnremovableNodesInfo(a.scaleDownPlanner.UnremovableNodes(), a.scaleDownPlanner.NodeUtilizationMap(), a.CloudProvider)
 			a.processors.ScaleDownStatusProcessor.Process(a.AutoscalingContext, scaleDownStatus)
 		}
 
@@ -354,7 +357,7 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError
 
 	if !a.clusterStateRegistry.IsClusterHealthy() {
 		klog.Warning("Cluster is not ready for autoscaling")
-		scaleDown.CleanUpUnneededNodes()
+		a.scaleDownPlanner.CleanUpUnneededNodes()
 		autoscalingContext.LogRecorder.Eventf(apiv1.EventTypeWarning, "ClusterUnhealthy", "Cluster is unhealthy")
 		return nil
 	}
@@ -480,8 +483,6 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError
 
 		klog.V(4).Infof("Calculating unneeded nodes")
 
-		scaleDown.CleanUp(currentTime)
-
 		var scaleDownCandidates []*apiv1.Node
 		var podDestinations []*apiv1.Node
 
@@ -509,9 +510,8 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError
 			}
 		}
 
-		// We use scheduledPods (not originalScheduledPods) here, so artificial scheduled pods introduced by processors
-		// (e.g unscheduled pods with nominated node name) can block scaledown of given node.
-		if typedErr := scaleDown.UpdateUnneededNodes(podDestinations, scaleDownCandidates, currentTime, pdbs); typedErr != nil {
+		actuationStatus := a.scaleDownActuator.CheckStatus()
+		if typedErr := a.scaleDownPlanner.UpdateClusterState(podDestinations, scaleDownCandidates, actuationStatus, pdbs, currentTime); typedErr != nil {
 			scaleDownStatus.Result = status.ScaleDownError
 			klog.Errorf("Failed to scale down: %v", typedErr)
 			return typedErr
@@ -523,20 +523,22 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError
 			a.lastScaleUpTime.Add(a.ScaleDownDelayAfterAdd).After(currentTime) ||
 			a.lastScaleDownFailTime.Add(a.ScaleDownDelayAfterFailure).After(currentTime) ||
 			a.lastScaleDownDeleteTime.Add(a.ScaleDownDelayAfterDelete).After(currentTime)
+		// TODO(x13n): Move deletionsInProgress > 0 condition to the legacy scaledown implementation.
+		deletionsInProgress := len(actuationStatus.DeletionsInProgress())
 		// In dry run only utilization is updated
-		calculateUnneededOnly := scaleDownInCooldown || scaleDown.IsNonEmptyNodeDeleteInProgress()
+		calculateUnneededOnly := scaleDownInCooldown || deletionsInProgress > 0
 
 		klog.V(4).Infof("Scale down status: unneededOnly=%v lastScaleUpTime=%s "+
 			"lastScaleDownDeleteTime=%v lastScaleDownFailTime=%s scaleDownForbidden=%v "+
-			"isDeleteInProgress=%v scaleDownInCooldown=%v",
+			"deletionsInProgress=%v scaleDownInCooldown=%v",
 			calculateUnneededOnly, a.lastScaleUpTime, a.lastScaleDownDeleteTime, a.lastScaleDownFailTime, a.processorCallbacks.disableScaleDownForLoop,
-			scaleDown.IsNonEmptyNodeDeleteInProgress(), scaleDownInCooldown)
+			deletionsInProgress, scaleDownInCooldown)
 
 		metrics.UpdateScaleDownInCooldown(scaleDownInCooldown)
 
 		if scaleDownInCooldown {
 			scaleDownStatus.Result = status.ScaleDownInCooldown
-		} else if scaleDown.IsNonEmptyNodeDeleteInProgress() {
+		} else if deletionsInProgress > 0 {
 			scaleDownStatus.Result = status.ScaleDownInProgress
 		} else {
 			klog.V(4).Infof("Starting scale down")
@@ -550,9 +552,10 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError
 
 			scaleDownStart := time.Now()
 			metrics.UpdateLastTime(metrics.ScaleDown, scaleDownStart)
-			scaleDownStatus, typedErr := scaleDown.TryToScaleDown(currentTime, pdbs)
+			empty, needDrain := a.scaleDownPlanner.NodesToDelete()
+			scaleDownStatus, typedErr := a.scaleDownActuator.StartDeletion(empty, needDrain, currentTime)
 			metrics.UpdateDurationFromStart(metrics.ScaleDown, scaleDownStart)
-			metrics.UpdateUnremovableNodesCount(scaleDown.UnremovableNodesCount())
+			metrics.UpdateUnremovableNodesCount(countsByReason(a.scaleDownPlanner.UnremovableNodes()))
 
 			scaleDownStatus.RemovedNodeGroups = removedNodeGroups
 
@@ -564,13 +567,13 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError
 		if (scaleDownStatus.Result == status.ScaleDownNoNodeDeleted ||
 			scaleDownStatus.Result == status.ScaleDownNoUnneeded) &&
 			a.AutoscalingContext.AutoscalingOptions.MaxBulkSoftTaintCount != 0 {
-			taintableNodes := a.scaleDown.UnneededNodes()
+			taintableNodes := a.scaleDownPlanner.UnneededNodes()
 			untaintableNodes := subtractNodes(allNodes, taintableNodes)
 			actuation.UpdateSoftDeletionTaints(a.AutoscalingContext, taintableNodes, untaintableNodes)
 		}
 
 		if a.processors != nil && a.processors.ScaleDownStatusProcessor != nil {
-			scaleDownStatus.SetUnremovableNodesInfo(scaleDown.UnremovableNodes(), scaleDown.NodeUtilizationMap(), a.CloudProvider)
+			scaleDownStatus.SetUnremovableNodesInfo(a.scaleDownPlanner.UnremovableNodes(), a.scaleDownPlanner.NodeUtilizationMap(), a.CloudProvider)
 			a.processors.ScaleDownStatusProcessor.Process(autoscalingContext, scaleDownStatus)
 			scaleDownStatusProcessorAlreadyCalled = true
 		}
@@ -762,7 +765,7 @@ func (a *StaticAutoscaler) updateClusterState(allNodes []*apiv1.Node, nodeInfosF
 	err := a.clusterStateRegistry.UpdateNodes(allNodes, nodeInfosForGroups, currentTime)
 	if err != nil {
 		klog.Errorf("Failed to update node registry: %v", err)
-		a.scaleDown.CleanUpUnneededNodes()
+		a.scaleDownPlanner.CleanUpUnneededNodes()
 		return errors.ToAutoscalerError(errors.CloudProviderError, err)
 	}
 	core_utils.UpdateClusterStateMetrics(a.clusterStateRegistry)
@@ -820,6 +823,16 @@ func calculateCoresMemoryTotal(nodes []*apiv1.Node, timestamp time.Time) (int64,
 	return coresTotal, memoryTotal
 }
 
+func countsByReason(nodes []*simulator.UnremovableNode) map[simulator.UnremovableReason]int {
+	counts := make(map[simulator.UnremovableReason]int)
+
+	for _, node := range nodes {
+		counts[node.Reason]++
+	}
+
+	return counts
+}
+
 func subtractNodes(a []*apiv1.Node, b []*apiv1.Node) []*apiv1.Node {
 	var c []*apiv1.Node
 	namesToDrop := make(map[string]bool)
diff --git a/cluster-autoscaler/core/static_autoscaler_test.go b/cluster-autoscaler/core/static_autoscaler_test.go
index b042368dc0..67c7878dc1 100644
--- a/cluster-autoscaler/core/static_autoscaler_test.go
+++ b/cluster-autoscaler/core/static_autoscaler_test.go
@@ -30,10 +30,12 @@ import (
 	clusterstate_utils "k8s.io/autoscaler/cluster-autoscaler/clusterstate/utils"
 	"k8s.io/autoscaler/cluster-autoscaler/config"
 	"k8s.io/autoscaler/cluster-autoscaler/context"
+	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown"
 	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/legacy"
 	. "k8s.io/autoscaler/cluster-autoscaler/core/test"
 	core_utils "k8s.io/autoscaler/cluster-autoscaler/core/utils"
 	"k8s.io/autoscaler/cluster-autoscaler/estimator"
+	ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors"
 	"k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
 	kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
 	. "k8s.io/autoscaler/cluster-autoscaler/utils/test"
@@ -202,14 +204,15 @@ func TestStaticAutoscalerRunOnce(t *testing.T) {
 
 	processors := NewTestProcessors()
 	clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, context.LogRecorder, NewBackoff())
-	sd := legacy.NewScaleDown(&context, processors, clusterState)
+	sdPlanner, sdActuator := newScaleDownPlannerAndActuator(t, &context, processors, clusterState)
 
 	autoscaler := &StaticAutoscaler{
 		AutoscalingContext:    &context,
 		clusterStateRegistry:  clusterState,
 		lastScaleUpTime:       time.Now(),
 		lastScaleDownFailTime: time.Now(),
-		scaleDown:             sd,
+		scaleDownPlanner:      sdPlanner,
+		scaleDownActuator:     sdActuator,
 		processors:            processors,
 		processorCallbacks:    processorCallbacks,
 		initialized:           true,
@@ -268,7 +271,7 @@ func TestStaticAutoscalerRunOnce(t *testing.T) {
 	onScaleDownMock.On("ScaleDown", "ng1", "n2").Return(nil).Once()
 
 	err = autoscaler.RunOnce(time.Now().Add(3 * time.Hour))
-	waitForDeleteToFinish(t, autoscaler.scaleDown)
+	waitForDeleteToFinish(t, autoscaler.scaleDownActuator)
 	assert.NoError(t, err)
 	mock.AssertExpectationsForObjects(t, scheduledPodMock, unschedulablePodMock,
 		podDisruptionBudgetListerMock, daemonSetListerMock, onScaleUpMock, onScaleDownMock)
@@ -299,7 +302,7 @@ func TestStaticAutoscalerRunOnce(t *testing.T) {
 	podDisruptionBudgetListerMock.On("List").Return([]*policyv1.PodDisruptionBudget{}, nil).Once()
 
 	err = autoscaler.RunOnce(time.Now().Add(5 * time.Hour))
-	waitForDeleteToFinish(t, autoscaler.scaleDown)
+	waitForDeleteToFinish(t, autoscaler.scaleDownActuator)
 	assert.NoError(t, err)
 	mock.AssertExpectationsForObjects(t, scheduledPodMock, unschedulablePodMock,
 		podDisruptionBudgetListerMock, daemonSetListerMock, onScaleUpMock, onScaleDownMock)
@@ -394,14 +397,15 @@ func TestStaticAutoscalerRunOnceWithAutoprovisionedEnabled(t *testing.T) {
 	}
 	clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, context.LogRecorder, NewBackoff())
 
-	sd := legacy.NewScaleDown(&context, processors, clusterState)
+	sdPlanner, sdActuator := newScaleDownPlannerAndActuator(t, &context, processors, clusterState)
 
 	autoscaler := &StaticAutoscaler{
 		AutoscalingContext:    &context,
 		clusterStateRegistry:  clusterState,
 		lastScaleUpTime:       time.Now(),
 		lastScaleDownFailTime: time.Now(),
-		scaleDown:             sd,
+		scaleDownPlanner:      sdPlanner,
+		scaleDownActuator:     sdActuator,
 		processors:            processors,
 		processorCallbacks:    processorCallbacks,
 		initialized:           true,
@@ -453,7 +457,7 @@ func TestStaticAutoscalerRunOnceWithAutoprovisionedEnabled(t *testing.T) {
 	onScaleDownMock.On("ScaleDown", "autoprovisioned-TN2", "n2").Return(nil).Once()
 
 	err = autoscaler.RunOnce(time.Now().Add(2 * time.Hour))
-	waitForDeleteToFinish(t, autoscaler.scaleDown)
+	waitForDeleteToFinish(t, autoscaler.scaleDownActuator)
 	assert.NoError(t, err)
 	mock.AssertExpectationsForObjects(t, scheduledPodMock, unschedulablePodMock,
 		podDisruptionBudgetListerMock, daemonSetListerMock, onScaleUpMock, onScaleDownMock)
@@ -539,14 +543,15 @@ func TestStaticAutoscalerRunOnceWithALongUnregisteredNode(t *testing.T) {
 
 	processors := NewTestProcessors()
 
-	sd := legacy.NewScaleDown(&context, processors, clusterState)
+	sdPlanner, sdActuator := newScaleDownPlannerAndActuator(t, &context, processors, clusterState)
 
 	autoscaler := &StaticAutoscaler{
 		AutoscalingContext:    &context,
 		clusterStateRegistry:  clusterState,
 		lastScaleUpTime:       time.Now(),
 		lastScaleDownFailTime: time.Now(),
-		scaleDown:             sd,
+		scaleDownPlanner:      sdPlanner,
+		scaleDownActuator:     sdActuator,
 		processors:            processors,
 		processorCallbacks:    processorCallbacks,
 	}
@@ -577,7 +582,7 @@ func TestStaticAutoscalerRunOnceWithALongUnregisteredNode(t *testing.T) {
 	podDisruptionBudgetListerMock.On("List").Return([]*policyv1.PodDisruptionBudget{}, nil).Once()
 
 	err = autoscaler.RunOnce(later.Add(2 * time.Hour))
-	waitForDeleteToFinish(t, autoscaler.scaleDown)
+	waitForDeleteToFinish(t, autoscaler.scaleDownActuator)
 	assert.NoError(t, err)
 	mock.AssertExpectationsForObjects(t, scheduledPodMock, unschedulablePodMock,
 		podDisruptionBudgetListerMock, daemonSetListerMock, onScaleUpMock, onScaleDownMock)
@@ -679,14 +684,15 @@ func TestStaticAutoscalerRunOncePodsWithPriorities(t *testing.T) {
 
 	processors := NewTestProcessors()
 	clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, context.LogRecorder, NewBackoff())
-	sd := legacy.NewScaleDown(&context, processors, clusterState)
+	sdPlanner, sdActuator := newScaleDownPlannerAndActuator(t, &context, processors, clusterState)
 
 	autoscaler := &StaticAutoscaler{
 		AutoscalingContext:    &context,
 		clusterStateRegistry:  clusterState,
 		lastScaleUpTime:       time.Now(),
 		lastScaleDownFailTime: time.Now(),
-		scaleDown:             sd,
+		scaleDownPlanner:      sdPlanner,
+		scaleDownActuator:     sdActuator,
 		processors:            processors,
 		processorCallbacks:    processorCallbacks,
 	}
@@ -731,7 +737,7 @@ func TestStaticAutoscalerRunOncePodsWithPriorities(t *testing.T) {
 	p4.Spec.NodeName = "n2"
 
 	err = autoscaler.RunOnce(time.Now().Add(3 * time.Hour))
-	waitForDeleteToFinish(t, autoscaler.scaleDown)
+	waitForDeleteToFinish(t, autoscaler.scaleDownActuator)
 	assert.NoError(t, err)
 	mock.AssertExpectationsForObjects(t, scheduledPodMock, unschedulablePodMock,
 		podDisruptionBudgetListerMock, daemonSetListerMock, onScaleUpMock, onScaleDownMock)
@@ -807,14 +813,15 @@ func TestStaticAutoscalerRunOnceWithFilteringOnBinPackingEstimator(t *testing.T)
 
 	processors := NewTestProcessors()
 	clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, context.LogRecorder, NewBackoff())
-	sd := legacy.NewScaleDown(&context, processors, clusterState)
+	sdPlanner, sdActuator := newScaleDownPlannerAndActuator(t, &context, processors, clusterState)
 
 	autoscaler := &StaticAutoscaler{
 		AutoscalingContext:    &context,
 		clusterStateRegistry:  clusterState,
 		lastScaleUpTime:       time.Now(),
 		lastScaleDownFailTime: time.Now(),
-		scaleDown:             sd,
+		scaleDownPlanner:      sdPlanner,
+		scaleDownActuator:     sdActuator,
 		processors:            processors,
 		processorCallbacks:    processorCallbacks,
 	}
@@ -903,14 +910,15 @@ func TestStaticAutoscalerRunOnceWithFilteringOnUpcomingNodesEnabledNoScaleUp(t *
 
 	processors := NewTestProcessors()
 	clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, context.LogRecorder, NewBackoff())
-	sd := legacy.NewScaleDown(&context, processors, clusterState)
+	sdPlanner, sdActuator := newScaleDownPlannerAndActuator(t, &context, processors, clusterState)
 
 	autoscaler := &StaticAutoscaler{
 		AutoscalingContext:    &context,
 		clusterStateRegistry:  clusterState,
 		lastScaleUpTime:       time.Now(),
 		lastScaleDownFailTime: time.Now(),
-		scaleDown:             sd,
+		scaleDownPlanner:      sdPlanner,
+		scaleDownActuator:     sdActuator,
 		processors:            processors,
 		processorCallbacks:    processorCallbacks,
 	}
@@ -1321,11 +1329,17 @@ func nodeNames(ns []*apiv1.Node) []string {
 	return names
 }
 
-func waitForDeleteToFinish(t *testing.T, sd *legacy.ScaleDown) {
+func waitForDeleteToFinish(t *testing.T, sda scaledown.Actuator) {
 	for start := time.Now(); time.Since(start) < 20*time.Second; time.Sleep(100 * time.Millisecond) {
-		if !sd.IsNonEmptyNodeDeleteInProgress() {
+		if len(sda.CheckStatus().DeletionsInProgress()) == 0 {
 			return
 		}
 	}
	t.Fatalf("Node delete not finished")
 }
+
+func newScaleDownPlannerAndActuator(t *testing.T, ctx *context.AutoscalingContext, p *ca_processors.AutoscalingProcessors, cs *clusterstate.ClusterStateRegistry) (scaledown.Planner, scaledown.Actuator) {
+	sd := legacy.NewScaleDown(ctx, p, cs)
+	wrapper := legacy.NewScaleDownWrapper(sd)
+	return wrapper, wrapper
+}
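
Reviewer note (not part of the patch): the intended calling sequence for the new scaledown.Planner / scaledown.Actuator pair can be easier to see outside of the RunOnce diff context. The sketch below mirrors the ordering introduced above: read the actuation status first, feed it into the planner, and only start deletions when not in cooldown and nothing is already being deleted. The function name runScaleDown and its parameters are illustrative only, not code from this change.

```go
package example

import (
	"time"

	apiv1 "k8s.io/api/core/v1"
	policyv1 "k8s.io/api/policy/v1beta1"

	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown"
	"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
)

// runScaleDown is an illustrative sketch of how a caller is expected to drive
// the Planner/Actuator pair introduced in this change.
func runScaleDown(
	planner scaledown.Planner,
	actuator scaledown.Actuator,
	podDestinations, candidates []*apiv1.Node,
	pdbs []*policyv1.PodDisruptionBudget,
	inCooldown bool,
	now time.Time,
) errors.AutoscalerError {
	// Snapshot the actuation status first so that planning sees ongoing deletions.
	actuationStatus := actuator.CheckStatus()
	if err := planner.UpdateClusterState(podDestinations, candidates, actuationStatus, pdbs, now); err != nil {
		return err
	}
	// Dry run: in cooldown, or while deletions are still in flight, only the
	// unneeded-node bookkeeping above is refreshed.
	if inCooldown || len(actuationStatus.DeletionsInProgress()) > 0 {
		return nil
	}
	// Otherwise ask the planner what to remove and hand it to the actuator.
	empty, needDrain := planner.NodesToDelete()
	if _, err := actuator.StartDeletion(empty, needDrain, now); err != nil {
		return err
	}
	return nil
}
```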
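A second illustrative sketch (again, not included in the change): since ActuationStatus is now the only feedback channel from actuation into planning, a Planner implementation could be unit-tested against a hand-rolled fake instead of a live NodeDeletionTracker. The fakeActuationStatus type below is hypothetical and only shows what satisfying the interface takes.

```go
package example

import (
	apiv1 "k8s.io/api/core/v1"

	"k8s.io/autoscaler/cluster-autoscaler/processors/status"
)

// fakeActuationStatus is a hypothetical, static implementation of
// scaledown.ActuationStatus for feeding canned data into a Planner in tests.
type fakeActuationStatus struct {
	deletions map[string][]string                // node group id -> names of nodes being deleted
	evictions []*apiv1.Pod                       // recently evicted pods
	results   map[string]status.NodeDeleteResult // node name -> most recent deletion result
}

// DeletionsInProgress flattens the per-group map into a list of node names.
func (f *fakeActuationStatus) DeletionsInProgress() []string {
	var names []string
	for _, nodes := range f.deletions {
		names = append(names, nodes...)
	}
	return names
}

// DeletionsCount reports how many deletions are ongoing in a given node group.
func (f *fakeActuationStatus) DeletionsCount(nodeGroupId string) int {
	return len(f.deletions[nodeGroupId])
}

// RecentEvictions returns the canned list of recently evicted pods.
func (f *fakeActuationStatus) RecentEvictions() []*apiv1.Pod {
	return f.evictions
}

// DeletionResults returns the canned per-node deletion results.
func (f *fakeActuationStatus) DeletionResults() map[string]status.NodeDeleteResult {
	return f.results
}
```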