diff --git a/cluster-autoscaler/config/autoscaling_options.go b/cluster-autoscaler/config/autoscaling_options.go index 8fffeac1c1..65d8f4bf0f 100644 --- a/cluster-autoscaler/config/autoscaling_options.go +++ b/cluster-autoscaler/config/autoscaling_options.go @@ -197,4 +197,13 @@ type AutoscalingOptions struct { MaxNodeGroupBinpackingDuration time.Duration // NodeDeletionBatcherInterval is a time for how long CA ScaleDown gather nodes to delete them in batch. NodeDeletionBatcherInterval time.Duration + // SkipNodesWithSystemPods tells whether nodes hosting kube-system pods (other than DaemonSet or mirror pods) should be excluded from scale-down + SkipNodesWithSystemPods bool + // SkipNodesWithLocalStorage tells whether nodes hosting pods that use local storage, e.g. EmptyDir or HostPath, should be excluded from scale-down + SkipNodesWithLocalStorage bool + // MinReplicaCount is the minimum number of replicas that a replica set or replication controller must have + // to allow deletion of its pods during scale-down + MinReplicaCount int + // NodeDeleteDelayAfterTaint is the duration to wait before deleting a node after tainting it + NodeDeleteDelayAfterTaint time.Duration } diff --git a/cluster-autoscaler/core/scaledown/actuation/actuator.go b/cluster-autoscaler/core/scaledown/actuation/actuator.go index 4958223bc9..f791c8b7d7 100644 --- a/cluster-autoscaler/core/scaledown/actuation/actuator.go +++ b/cluster-autoscaler/core/scaledown/actuation/actuator.go @@ -22,6 +22,7 @@ import ( "time" apiv1 "k8s.io/api/core/v1" + policyv1 "k8s.io/api/policy/v1" "k8s.io/klog/v2" "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" @@ -29,12 +30,16 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/context" "k8s.io/autoscaler/cluster-autoscaler/core/scaledown" "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/deletiontracker" + "k8s.io/autoscaler/cluster-autoscaler/core/utils" "k8s.io/autoscaler/cluster-autoscaler/metrics" "k8s.io/autoscaler/cluster-autoscaler/processors/status" + "k8s.io/autoscaler/cluster-autoscaler/simulator" "k8s.io/autoscaler/cluster-autoscaler/simulator/utilization" "k8s.io/autoscaler/cluster-autoscaler/utils/deletetaint" "k8s.io/autoscaler/cluster-autoscaler/utils/errors" "k8s.io/autoscaler/cluster-autoscaler/utils/gpu" + kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes" + "k8s.io/kubernetes/pkg/scheduler/framework" ) // Actuator is responsible for draining and deleting nodes. @@ -44,17 +49,19 @@ type Actuator struct { nodeDeletionTracker *deletiontracker.NodeDeletionTracker nodeDeletionBatcher *NodeDeletionBatcher evictor Evictor + deleteOptions simulator.NodeDeleteOptions } // NewActuator returns a new instance of Actuator.
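The first three fields added above travel through the code as a single simulator.NodeDeleteOptions value, which the updated NewActuator below takes in place of the old batch-interval argument; NodeDeleteDelayAfterTaint and NodeDeletionBatcherInterval are read from the AutoscalingContext instead. A minimal wiring sketch, assuming the cluster-autoscaler module and mirroring the NewStaticAutoscaler change later in this patch (the helper name is illustrative):

```go
// Sketch only: assumes the cluster-autoscaler module; mirrors the NewStaticAutoscaler wiring added later in this patch.
package example

import (
	"time"

	"k8s.io/autoscaler/cluster-autoscaler/clusterstate"
	"k8s.io/autoscaler/cluster-autoscaler/config"
	"k8s.io/autoscaler/cluster-autoscaler/context"
	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/actuation"
	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/deletiontracker"
	"k8s.io/autoscaler/cluster-autoscaler/simulator"
)

// buildActuator packages the new AutoscalingOptions fields into NodeDeleteOptions
// and hands them to the updated NewActuator.
func buildActuator(ctx *context.AutoscalingContext, csr *clusterstate.ClusterStateRegistry, opts config.AutoscalingOptions) *actuation.Actuator {
	deleteOptions := simulator.NodeDeleteOptions{
		SkipNodesWithSystemPods:   opts.SkipNodesWithSystemPods,
		SkipNodesWithLocalStorage: opts.SkipNodesWithLocalStorage,
		MinReplicaCount:           opts.MinReplicaCount,
	}
	ndt := deletiontracker.NewNodeDeletionTracker(0 * time.Second)
	// The batch interval is no longer a parameter; NewActuator reads ctx.NodeDeletionBatcherInterval,
	// and NodeDeleteDelayAfterTaint is consulted later, just before deletions start.
	return actuation.NewActuator(ctx, csr, ndt, deleteOptions)
}
```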
-func NewActuator(ctx *context.AutoscalingContext, csr *clusterstate.ClusterStateRegistry, ndr *deletiontracker.NodeDeletionTracker, batchInterval time.Duration) *Actuator { - nbd := NewNodeDeletionBatcher(ctx, csr, ndr, batchInterval) +func NewActuator(ctx *context.AutoscalingContext, csr *clusterstate.ClusterStateRegistry, ndr *deletiontracker.NodeDeletionTracker, deleteOptions simulator.NodeDeleteOptions) *Actuator { + nbd := NewNodeDeletionBatcher(ctx, csr, ndr, ctx.NodeDeletionBatcherInterval) return &Actuator{ ctx: ctx, clusterState: csr, nodeDeletionTracker: ndr, nodeDeletionBatcher: nbd, - evictor: NewDefaultEvictor(), + evictor: NewDefaultEvictor(deleteOptions), + deleteOptions: deleteOptions, } } @@ -83,26 +90,33 @@ func (a *Actuator) StartDeletion(empty, drain []*apiv1.Node, currentTime time.Ti return scaleDownStatus, nil } - // Taint empty nodes synchronously, and immediately start deletions asynchronously. Because these nodes are empty, there's no risk that a pod from one - // to-be-deleted node gets recreated on another. - emptyScaledDown, err := a.taintSyncDeleteAsyncEmpty(emptyToDelete) - scaleDownStatus.ScaledDownNodes = append(scaleDownStatus.ScaledDownNodes, emptyScaledDown...) - if err != nil { - scaleDownStatus.Result = status.ScaleDownError - return scaleDownStatus, err + if len(emptyToDelete) > 0 { + // Taint all empty nodes synchronously + if err := a.taintNodesSync(emptyToDelete); err != nil { + scaleDownStatus.Result = status.ScaleDownError + return scaleDownStatus, err + } + + emptyScaledDown, err := a.deleteAsyncEmpty(emptyToDelete) + scaleDownStatus.ScaledDownNodes = append(scaleDownStatus.ScaledDownNodes, emptyScaledDown...) + if err != nil { + scaleDownStatus.Result = status.ScaleDownError + return scaleDownStatus, err + } } - // Taint all nodes that need drain synchronously, but don't start any drain/deletion yet. Otherwise, pods evicted from one to-be-deleted node - // could get recreated on another. - err = a.taintNodesSync(drainToDelete) - if err != nil { - scaleDownStatus.Result = status.ScaleDownError - return scaleDownStatus, err - } + if len(drainToDelete) > 0 { + // Taint all nodes that need drain synchronously, but don't start any drain/deletion yet. Otherwise, pods evicted from one to-be-deleted node + // could get recreated on another. + if err := a.taintNodesSync(drainToDelete); err != nil { + scaleDownStatus.Result = status.ScaleDownError + return scaleDownStatus, err + } - // All nodes involved in the scale-down should be tainted now - start draining and deleting nodes asynchronously. - drainScaledDown := a.deleteAsyncDrain(drainToDelete) - scaleDownStatus.ScaledDownNodes = append(scaleDownStatus.ScaledDownNodes, drainScaledDown...) + // All nodes involved in the scale-down should be tainted now - start draining and deleting nodes asynchronously. + drainScaledDown := a.deleteAsyncDrain(drainToDelete) + scaleDownStatus.ScaledDownNodes = append(scaleDownStatus.ScaledDownNodes, drainScaledDown...) + } scaleDownStatus.Result = status.ScaleDownNodeDeleteStarted return scaleDownStatus, nil @@ -136,10 +150,12 @@ func (a *Actuator) cropNodesToBudgets(empty, needDrain []*apiv1.Node) ([]*apiv1. return emptyToDelete, drainToDelete } -// taintSyncDeleteAsyncEmpty synchronously taints the provided empty nodes, and immediately starts deletions asynchronously. +// deleteAsyncEmpty immediately starts deletions asynchronously. // scaledDownNodes return value contains all nodes for which deletion successfully started. 
It's valid and should be consumed // even if err != nil. -func (a *Actuator) taintSyncDeleteAsyncEmpty(empty []*apiv1.Node) (scaledDownNodes []*status.ScaleDownNode, err errors.AutoscalerError) { +func (a *Actuator) deleteAsyncEmpty(empty []*apiv1.Node) (scaledDownNodes []*status.ScaleDownNode, err errors.AutoscalerError) { + var groupIds []string + var validNodes []*apiv1.Node for _, emptyNode := range empty { klog.V(0).Infof("Scale-down: removing empty node %q", emptyNode.Name) a.ctx.LogRecorder.Eventf(apiv1.EventTypeNormal, "ScaleDownEmpty", "Scale-down: removing empty node %q", emptyNode.Name) @@ -150,20 +166,19 @@ func (a *Actuator) taintSyncDeleteAsyncEmpty(empty []*apiv1.Node) (scaledDownNod continue } - err = a.taintNode(emptyNode) - if err != nil { - a.ctx.Recorder.Eventf(emptyNode, apiv1.EventTypeWarning, "ScaleDownFailed", "failed to mark the node as toBeDeleted/unschedulable: %v", err) - return scaledDownNodes, errors.NewAutoscalerError(errors.ApiCallError, "couldn't taint node %q with ToBeDeleted", emptyNode.Name) - } - if sdNode, err := a.scaleDownNodeToReport(emptyNode, false); err == nil { scaledDownNodes = append(scaledDownNodes, sdNode) } else { klog.Errorf("Scale-down: couldn't report scaled down node, err: %v", err) } + a.nodeDeletionTracker.StartDeletion(nodeGroup.Id(), emptyNode.Name) - go a.scheduleDeletion(emptyNode, nodeGroup.Id(), false) + groupIds = append(groupIds, nodeGroup.Id()) + validNodes = append(validNodes, emptyNode) } + + go a.deleteNodesAsync(validNodes, groupIds, false) + return scaledDownNodes, nil } @@ -189,6 +204,8 @@ func (a *Actuator) taintNodesSync(nodes []*apiv1.Node) errors.AutoscalerError { // deleteAsyncDrain asynchronously starts deletions with drain for all provided nodes. scaledDownNodes return value contains all nodes for which // deletion successfully started. 
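StartDeletion (above) now taints each batch synchronously inside its own guarded block and only then launches asynchronous deletions, so a tainting failure aborts before any deletion starts. A caller-side sketch of consuming the returned status, assuming an actuator built as in the earlier sketch (the wrapper function is illustrative, and the error return is assumed to satisfy the standard error interface):

```go
// Sketch only: assumes an Actuator built as in the previous sketch.
package example

import (
	"time"

	apiv1 "k8s.io/api/core/v1"
	"k8s.io/klog/v2"

	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/actuation"
	"k8s.io/autoscaler/cluster-autoscaler/processors/status"
)

// runScaleDown shows how the status returned by the restructured StartDeletion is consumed.
func runScaleDown(actuator *actuation.Actuator, empty, drain []*apiv1.Node) error {
	scaleDownStatus, err := actuator.StartDeletion(empty, drain, time.Now())
	if err != nil {
		// ScaleDownError: tainting failed and any taints applied so far were cleaned up.
		return err
	}
	if scaleDownStatus.Result == status.ScaleDownNodeDeleteStarted {
		for _, sd := range scaleDownStatus.ScaledDownNodes {
			klog.V(2).Infof("scale-down started for node %s", sd.Node.Name)
		}
	}
	return nil
}
```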
func (a *Actuator) deleteAsyncDrain(drain []*apiv1.Node) (scaledDownNodes []*status.ScaleDownNode) { + var groupIds []string + var validNodes []*apiv1.Node for _, drainNode := range drain { if sdNode, err := a.scaleDownNodeToReport(drainNode, true); err == nil { klog.V(0).Infof("Scale-down: removing node %s, utilization: %v, pods to reschedule: %s", drainNode.Name, sdNode.UtilInfo, joinPodNames(sdNode.EvictedPods)) @@ -197,17 +214,89 @@ func (a *Actuator) deleteAsyncDrain(drain []*apiv1.Node) (scaledDownNodes []*sta } else { klog.Errorf("Scale-down: couldn't report scaled down node, err: %v", err) } + nodeGroup, err := a.ctx.CloudProvider.NodeGroupForNode(drainNode) if err != nil || nodeGroup == nil || reflect.ValueOf(nodeGroup).IsNil() { klog.Errorf("Failed to find node group for %s: %v", drainNode.Name, err) continue } + a.nodeDeletionTracker.StartDeletionWithDrain(nodeGroup.Id(), drainNode.Name) - go a.scheduleDeletion(drainNode, nodeGroup.Id(), true) + groupIds = append(groupIds, nodeGroup.Id()) + validNodes = append(validNodes, drainNode) } + + go a.deleteNodesAsync(validNodes, groupIds, true) + return scaledDownNodes } +func (a *Actuator) deleteNodesAsync(nodes []*apiv1.Node, groupIds []string, drain bool) { + var pdbs []*policyv1.PodDisruptionBudget + var registry kube_util.ListerRegistry + + if len(nodes) == 0 { + return + } + + if a.ctx.NodeDeleteDelayAfterTaint > time.Duration(0) { + klog.V(0).Infof("Scale-down: waiting %v before trying to delete nodes", a.ctx.NodeDeleteDelayAfterTaint) + time.Sleep(a.ctx.NodeDeleteDelayAfterTaint) + } + + clusterSnapshot, err := a.createSnapshot(nodes) + + if err != nil { + klog.Errorf("Scale-down: couldn't create delete snapshot, err: %v", err) + nodeDeleteResult := status.NodeDeleteResult{ResultType: status.NodeDeleteErrorInternal, Err: errors.NewAutoscalerError(errors.InternalError, "createSnapshot returned error %v", err)} + for i, node := range nodes { + CleanUpAndRecordFailedScaleDownEvent(a.ctx, node, groupIds[i], drain, a.nodeDeletionTracker, "failed to create delete snapshot", nodeDeleteResult) + } + return + } + + if drain { + pdbs, err = a.ctx.PodDisruptionBudgetLister().List() + if err != nil { + klog.Errorf("Scale-down: couldn't fetch pod disruption budgets, err: %v", err) + nodeDeleteResult := status.NodeDeleteResult{ResultType: status.NodeDeleteErrorInternal, Err: errors.NewAutoscalerError(errors.InternalError, "podDisruptionBudgetLister.List returned error %v", err)} + for i, node := range nodes { + CleanUpAndRecordFailedScaleDownEvent(a.ctx, node, groupIds[i], drain, a.nodeDeletionTracker, "failed to fetch pod disruption budgets", nodeDeleteResult) + } + return + } + + registry = a.ctx.ListerRegistry + } + + for i, node := range nodes { + nodeInfo, err := clusterSnapshot.NodeInfos().Get(node.Name) + if err != nil { + klog.Errorf("Scale-down: can't retrieve node %q from snapshot, err: %v", node.Name, err) + nodeDeleteResult := status.NodeDeleteResult{ResultType: status.NodeDeleteErrorInternal, Err: errors.NewAutoscalerError(errors.InternalError, "nodeInfos.Get for %q returned error: %v", node.Name, err)} + CleanUpAndRecordFailedScaleDownEvent(a.ctx, node, groupIds[i], drain, a.nodeDeletionTracker, "failed to get node info", nodeDeleteResult) + continue + } + + podsToRemove, _, _, err := simulator.GetPodsToMove(nodeInfo, a.deleteOptions, registry, pdbs, time.Now()) + if err != nil { + klog.Errorf("Scale-down: couldn't delete node %q, err: %v", node.Name, err) + nodeDeleteResult := status.NodeDeleteResult{ResultType: 
status.NodeDeleteErrorInternal, Err: errors.NewAutoscalerError(errors.InternalError, "GetPodsToMove for %q returned error: %v", node.Name, err)} + CleanUpAndRecordFailedScaleDownEvent(a.ctx, node, groupIds[i], drain, a.nodeDeletionTracker, "failed to get pods to move on node", nodeDeleteResult) + continue + } + + if !drain && len(podsToRemove) != 0 { + klog.Errorf("Scale-down: couldn't delete empty node %q, new pods got scheduled", node.Name) + nodeDeleteResult := status.NodeDeleteResult{ResultType: status.NodeDeleteErrorInternal, Err: errors.NewAutoscalerError(errors.InternalError, "failed to delete empty node %q, new pods scheduled", node.Name)} + CleanUpAndRecordFailedScaleDownEvent(a.ctx, node, groupIds[i], drain, a.nodeDeletionTracker, "node is not empty", nodeDeleteResult) + continue + } + + go a.scheduleDeletion(nodeInfo, groupIds[i], drain) + } +} + func (a *Actuator) scaleDownNodeToReport(node *apiv1.Node, drain bool) (*status.ScaleDownNode, error) { nodeGroup, err := a.ctx.CloudProvider.NodeGroupForNode(node) if err != nil { @@ -223,10 +312,7 @@ func (a *Actuator) scaleDownNodeToReport(node *apiv1.Node, drain bool) (*status. } var evictedPods []*apiv1.Pod if drain { - _, nonDsPodsToEvict, err := podsToEvict(a.ctx, node.Name) - if err != nil { - return nil, err - } + _, nonDsPodsToEvict := podsToEvict(a.ctx, nodeInfo) evictedPods = nonDsPodsToEvict } return &status.ScaleDownNode{ @@ -247,13 +333,14 @@ func (a *Actuator) taintNode(node *apiv1.Node) error { return nil } -func (a *Actuator) prepareNodeForDeletion(node *apiv1.Node, drain bool) status.NodeDeleteResult { +func (a *Actuator) prepareNodeForDeletion(nodeInfo *framework.NodeInfo, drain bool) status.NodeDeleteResult { + node := nodeInfo.Node() if drain { - if evictionResults, err := a.evictor.DrainNode(a.ctx, node); err != nil { + if evictionResults, err := a.evictor.DrainNode(a.ctx, nodeInfo); err != nil { return status.NodeDeleteResult{ResultType: status.NodeDeleteErrorFailedToEvictPods, Err: err, PodEvictionResults: evictionResults} } } else { - if err := a.evictor.EvictDaemonSetPods(a.ctx, node, time.Now()); err != nil { + if err := a.evictor.EvictDaemonSetPods(a.ctx, nodeInfo, time.Now()); err != nil { // Evicting DS pods is best-effort, so proceed with the deletion even if there are errors. klog.Warningf("Error while evicting DS pods from an empty node %q: %v", node.Name, err) } @@ -265,8 +352,9 @@ func (a *Actuator) prepareNodeForDeletion(node *apiv1.Node, drain bool) status.N } // scheduleDeletion schedule the deletion on of the provided node by adding a node to NodeDeletionBatcher. If drain is true, the node is drained before being deleted. 
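All of the per-node checks above go through simulator.GetPodsToMove, which this patch introduces in place of FastGetPodsToMove/DetailedGetPodsForMove and which takes the NodeDeleteOptions explicitly. A sketch of the two call patterns used throughout the diff (the helper is illustrative; the final return value is assumed to be a plain error):

```go
// Sketch only: the two GetPodsToMove call patterns used in this patch.
package example

import (
	"time"

	policyv1 "k8s.io/api/policy/v1"

	"k8s.io/autoscaler/cluster-autoscaler/simulator"
	kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
	"k8s.io/kubernetes/pkg/scheduler/framework"
)

// podsBlockingRemoval returns how many pods would have to be moved off the node.
func podsBlockingRemoval(nodeInfo *framework.NodeInfo, opts simulator.NodeDeleteOptions,
	listers kube_util.ListerRegistry, pdbs []*policyv1.PodDisruptionBudget, drain bool) (int, error) {
	if drain {
		// Drain path: listers and PDBs are supplied so controller-backed and
		// PDB-protected pods are taken into account.
		pods, _, _, err := simulator.GetPodsToMove(nodeInfo, opts, listers, pdbs, time.Now())
		return len(pods), err
	}
	// Empty-node path: no listers, no PDBs; any pod left to move means the node
	// is no longer empty and its deletion is aborted.
	pods, _, _, err := simulator.GetPodsToMove(nodeInfo, opts, nil, nil, time.Now())
	return len(pods), err
}
```

This is the same check that backs the reworded test case "nodes with pods are not deleted if the node is passed as empty" further down.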
-func (a *Actuator) scheduleDeletion(node *apiv1.Node, nodeGroupId string, drain bool) { - nodeDeleteResult := a.prepareNodeForDeletion(node, drain) +func (a *Actuator) scheduleDeletion(nodeInfo *framework.NodeInfo, nodeGroupId string, drain bool) { + node := nodeInfo.Node() + nodeDeleteResult := a.prepareNodeForDeletion(nodeInfo, drain) if nodeDeleteResult.Err != nil { CleanUpAndRecordFailedScaleDownEvent(a.ctx, node, nodeGroupId, drain, a.nodeDeletionTracker, "prepareNodeForDeletion failed", nodeDeleteResult) return @@ -279,6 +367,36 @@ func (a *Actuator) scheduleDeletion(node *apiv1.Node, nodeGroupId string, drain } } +func (a *Actuator) createSnapshot(nodes []*apiv1.Node) (simulator.ClusterSnapshot, error) { + knownNodes := make(map[string]bool) + snapshot := simulator.NewBasicClusterSnapshot() + + scheduledPods, err := a.ctx.ScheduledPodLister().List() + if err != nil { + return nil, err + } + + nonExpendableScheduledPods := utils.FilterOutExpendablePods(scheduledPods, a.ctx.ExpendablePodsPriorityCutoff) + + for _, node := range nodes { + if err := snapshot.AddNode(node); err != nil { + return nil, err + } + + knownNodes[node.Name] = true + } + + for _, pod := range nonExpendableScheduledPods { + if knownNodes[pod.Spec.NodeName] { + if err := snapshot.AddPod(pod, pod.Spec.NodeName); err != nil { + return nil, err + } + } + } + + return snapshot, nil +} + func min(x, y int) int { if x <= y { return x diff --git a/cluster-autoscaler/core/scaledown/actuation/actuator_test.go b/cluster-autoscaler/core/scaledown/actuation/actuator_test.go index a465bd3735..cdd949681f 100644 --- a/cluster-autoscaler/core/scaledown/actuation/actuator_test.go +++ b/cluster-autoscaler/core/scaledown/actuation/actuator_test.go @@ -24,8 +24,9 @@ import ( "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" - + appsv1 "k8s.io/api/apps/v1" apiv1 "k8s.io/api/core/v1" + policyv1 "k8s.io/api/policy/v1" policyv1beta1 "k8s.io/api/policy/v1beta1" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/resource" @@ -41,6 +42,7 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/deletiontracker" . 
"k8s.io/autoscaler/cluster-autoscaler/core/test" "k8s.io/autoscaler/cluster-autoscaler/processors/status" + "k8s.io/autoscaler/cluster-autoscaler/simulator" "k8s.io/autoscaler/cluster-autoscaler/simulator/utilization" "k8s.io/autoscaler/cluster-autoscaler/utils/deletetaint" kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes" @@ -219,10 +221,17 @@ func TestCropNodesToBudgets(t *testing.T) { t.Run(tn, func(t *testing.T) { ctx := &context.AutoscalingContext{ AutoscalingOptions: config.AutoscalingOptions{ - MaxScaleDownParallelism: 10, - MaxDrainParallelism: 5, + MaxScaleDownParallelism: 10, + MaxDrainParallelism: 5, + NodeDeletionBatcherInterval: 0 * time.Second, + NodeDeleteDelayAfterTaint: 1 * time.Second, }, } + deleteOptions := simulator.NodeDeleteOptions{ + SkipNodesWithSystemPods: true, + SkipNodesWithLocalStorage: true, + MinReplicaCount: 0, + } ndr := deletiontracker.NewNodeDeletionTracker(1 * time.Hour) for i := 0; i < tc.emptyDeletionsInProgress; i++ { ndr.StartDeletion("ng1", fmt.Sprintf("empty-node-%d", i)) @@ -230,7 +239,8 @@ func TestCropNodesToBudgets(t *testing.T) { for i := 0; i < tc.drainDeletionsInProgress; i++ { ndr.StartDeletionWithDrain("ng2", fmt.Sprintf("drain-node-%d", i)) } - actuator := NewActuator(ctx, nil, ndr, 0*time.Second) + + actuator := NewActuator(ctx, nil, ndr, deleteOptions) gotEmpty, gotDrain := actuator.cropNodesToBudgets(tc.emptyNodes, tc.drainNodes) if diff := cmp.Diff(tc.wantEmpty, gotEmpty, cmpopts.EquateEmpty()); diff != "" { t.Errorf("cropNodesToBudgets empty nodes diff (-want +got):\n%s", diff) @@ -303,8 +313,8 @@ func TestStartDeletion(t *testing.T) { "deletion with drain": { drainNodes: generateNodes(2, "drain"), pods: map[string][]*apiv1.Pod{ - "drain-node-0": generatePods(2, "drain-node-0"), - "drain-node-1": generatePods(2, "drain-node-1"), + "drain-node-0": removablePods(2, "drain-node-0"), + "drain-node-1": removablePods(2, "drain-node-1"), }, wantStatus: &status.ScaleDownStatus{ Result: status.ScaleDownNodeDeleteStarted, @@ -312,13 +322,13 @@ func TestStartDeletion(t *testing.T) { { Node: generateNode("drain-node-0"), NodeGroup: testNg, - EvictedPods: generatePods(2, "drain-node-0"), + EvictedPods: removablePods(2, "drain-node-0"), UtilInfo: generateUtilInfo(2./8., 2./8.), }, { Node: generateNode("drain-node-1"), NodeGroup: testNg, - EvictedPods: generatePods(2, "drain-node-1"), + EvictedPods: removablePods(2, "drain-node-1"), UtilInfo: generateUtilInfo(2./8., 2./8.), }, }, @@ -342,8 +352,8 @@ func TestStartDeletion(t *testing.T) { emptyNodes: generateNodes(2, "empty"), drainNodes: generateNodes(2, "drain"), pods: map[string][]*apiv1.Pod{ - "drain-node-0": generatePods(2, "drain-node-0"), - "drain-node-1": generatePods(2, "drain-node-1"), + "drain-node-0": removablePods(2, "drain-node-0"), + "drain-node-1": removablePods(2, "drain-node-1"), }, wantStatus: &status.ScaleDownStatus{ Result: status.ScaleDownNodeDeleteStarted, @@ -363,13 +373,13 @@ func TestStartDeletion(t *testing.T) { { Node: generateNode("drain-node-0"), NodeGroup: testNg, - EvictedPods: generatePods(2, "drain-node-0"), + EvictedPods: removablePods(2, "drain-node-0"), UtilInfo: generateUtilInfo(2./8., 2./8.), }, { Node: generateNode("drain-node-1"), NodeGroup: testNg, - EvictedPods: generatePods(2, "drain-node-1"), + EvictedPods: removablePods(2, "drain-node-1"), UtilInfo: generateUtilInfo(2./8., 2./8.), }, }, @@ -397,53 +407,37 @@ func TestStartDeletion(t *testing.T) { "drain-node-1": {ResultType: status.NodeDeleteOk}, }, }, - "failure to taint empty node 
stops further deletion": { + "failure to taint empty node stops deletion and cleans already applied taints": { emptyNodes: generateNodes(4, "empty"), drainNodes: generateNodes(1, "drain"), pods: map[string][]*apiv1.Pod{ - "drain-node-0": generatePods(2, "drain-node-0"), + "drain-node-0": removablePods(2, "drain-node-0"), }, failedNodeTaint: map[string]bool{"empty-node-2": true}, wantStatus: &status.ScaleDownStatus{ - Result: status.ScaleDownError, - ScaledDownNodes: []*status.ScaleDownNode{ - { - Node: generateNode("empty-node-0"), - NodeGroup: testNg, - EvictedPods: nil, - UtilInfo: generateUtilInfo(0, 0), - }, - { - Node: generateNode("empty-node-1"), - NodeGroup: testNg, - EvictedPods: nil, - UtilInfo: generateUtilInfo(0, 0), - }, - }, + Result: status.ScaleDownError, + ScaledDownNodes: nil, }, - wantDeletedNodes: []string{"empty-node-0", "empty-node-1"}, wantTaintUpdates: map[string][][]apiv1.Taint{ "empty-node-0": { {toBeDeletedTaint}, + {}, }, "empty-node-1": { {toBeDeletedTaint}, + {}, }, }, - wantNodeDeleteResults: map[string]status.NodeDeleteResult{ - "empty-node-0": {ResultType: status.NodeDeleteOk}, - "empty-node-1": {ResultType: status.NodeDeleteOk}, - }, wantErr: cmpopts.AnyError, }, "failure to taint drain node stops further deletion and cleans already applied taints": { emptyNodes: generateNodes(2, "empty"), drainNodes: generateNodes(4, "drain"), pods: map[string][]*apiv1.Pod{ - "drain-node-0": generatePods(2, "drain-node-0"), - "drain-node-1": generatePods(2, "drain-node-1"), - "drain-node-2": generatePods(2, "drain-node-2"), - "drain-node-3": generatePods(2, "drain-node-3"), + "drain-node-0": removablePods(2, "drain-node-0"), + "drain-node-1": removablePods(2, "drain-node-1"), + "drain-node-2": removablePods(2, "drain-node-2"), + "drain-node-3": removablePods(2, "drain-node-3"), }, failedNodeTaint: map[string]bool{"drain-node-2": true}, wantStatus: &status.ScaleDownStatus{ @@ -471,14 +465,6 @@ func TestStartDeletion(t *testing.T) { "empty-node-1": { {toBeDeletedTaint}, }, - "drain-node-0": { - {toBeDeletedTaint}, - {}, - }, - "drain-node-1": { - {toBeDeletedTaint}, - {}, - }, }, wantNodeDeleteResults: map[string]status.NodeDeleteResult{ "empty-node-0": {ResultType: status.NodeDeleteOk}, @@ -489,10 +475,10 @@ func TestStartDeletion(t *testing.T) { "nodes that failed drain are correctly reported in results": { drainNodes: generateNodes(4, "drain"), pods: map[string][]*apiv1.Pod{ - "drain-node-0": generatePods(3, "drain-node-0"), - "drain-node-1": generatePods(3, "drain-node-1"), - "drain-node-2": generatePods(3, "drain-node-2"), - "drain-node-3": generatePods(3, "drain-node-3"), + "drain-node-0": removablePods(3, "drain-node-0"), + "drain-node-1": removablePods(3, "drain-node-1"), + "drain-node-2": removablePods(3, "drain-node-2"), + "drain-node-3": removablePods(3, "drain-node-3"), }, failedPodDrain: map[string]bool{ "drain-node-0-pod-0": true, @@ -505,25 +491,25 @@ func TestStartDeletion(t *testing.T) { { Node: generateNode("drain-node-0"), NodeGroup: testNg, - EvictedPods: generatePods(3, "drain-node-0"), + EvictedPods: removablePods(3, "drain-node-0"), UtilInfo: generateUtilInfo(3./8., 3./8.), }, { Node: generateNode("drain-node-1"), NodeGroup: testNg, - EvictedPods: generatePods(3, "drain-node-1"), + EvictedPods: removablePods(3, "drain-node-1"), UtilInfo: generateUtilInfo(3./8., 3./8.), }, { Node: generateNode("drain-node-2"), NodeGroup: testNg, - EvictedPods: generatePods(3, "drain-node-2"), + EvictedPods: removablePods(3, "drain-node-2"), UtilInfo: 
generateUtilInfo(3./8., 3./8.), }, { Node: generateNode("drain-node-3"), NodeGroup: testNg, - EvictedPods: generatePods(3, "drain-node-3"), + EvictedPods: removablePods(3, "drain-node-3"), UtilInfo: generateUtilInfo(3./8., 3./8.), }, }, @@ -556,9 +542,9 @@ func TestStartDeletion(t *testing.T) { ResultType: status.NodeDeleteErrorFailedToEvictPods, Err: cmpopts.AnyError, PodEvictionResults: map[string]status.PodEvictionResult{ - "drain-node-0-pod-0": {Pod: generatePod("drain-node-0-pod-0"), Err: cmpopts.AnyError, TimedOut: true}, - "drain-node-0-pod-1": {Pod: generatePod("drain-node-0-pod-1"), Err: cmpopts.AnyError, TimedOut: true}, - "drain-node-0-pod-2": {Pod: generatePod("drain-node-0-pod-2")}, + "drain-node-0-pod-0": {Pod: removablePod("drain-node-0-pod-0", "drain-node-0"), Err: cmpopts.AnyError, TimedOut: true}, + "drain-node-0-pod-1": {Pod: removablePod("drain-node-0-pod-1", "drain-node-0"), Err: cmpopts.AnyError, TimedOut: true}, + "drain-node-0-pod-2": {Pod: removablePod("drain-node-0-pod-2", "drain-node-0")}, }, }, "drain-node-1": {ResultType: status.NodeDeleteOk}, @@ -566,9 +552,9 @@ func TestStartDeletion(t *testing.T) { ResultType: status.NodeDeleteErrorFailedToEvictPods, Err: cmpopts.AnyError, PodEvictionResults: map[string]status.PodEvictionResult{ - "drain-node-2-pod-0": {Pod: generatePod("drain-node-2-pod-0")}, - "drain-node-2-pod-1": {Pod: generatePod("drain-node-2-pod-1"), Err: cmpopts.AnyError, TimedOut: true}, - "drain-node-2-pod-2": {Pod: generatePod("drain-node-2-pod-2")}, + "drain-node-2-pod-0": {Pod: removablePod("drain-node-2-pod-0", "drain-node-2")}, + "drain-node-2-pod-1": {Pod: removablePod("drain-node-2-pod-1", "drain-node-2"), Err: cmpopts.AnyError, TimedOut: true}, + "drain-node-2-pod-2": {Pod: removablePod("drain-node-2-pod-2", "drain-node-2")}, }, }, "drain-node-3": {ResultType: status.NodeDeleteOk}, @@ -578,8 +564,8 @@ func TestStartDeletion(t *testing.T) { emptyNodes: generateNodes(2, "empty"), drainNodes: generateNodes(2, "drain"), pods: map[string][]*apiv1.Pod{ - "drain-node-0": generatePods(2, "drain-node-0"), - "drain-node-1": generatePods(2, "drain-node-1"), + "drain-node-0": removablePods(2, "drain-node-0"), + "drain-node-1": removablePods(2, "drain-node-1"), }, failedNodeDeletion: map[string]bool{ "empty-node-1": true, @@ -603,13 +589,13 @@ func TestStartDeletion(t *testing.T) { { Node: generateNode("drain-node-0"), NodeGroup: testNg, - EvictedPods: generatePods(2, "drain-node-0"), + EvictedPods: removablePods(2, "drain-node-0"), UtilInfo: generateUtilInfo(2./8., 2./8.), }, { Node: generateNode("drain-node-1"), NodeGroup: testNg, - EvictedPods: generatePods(2, "drain-node-1"), + EvictedPods: removablePods(2, "drain-node-1"), UtilInfo: generateUtilInfo(2./8., 2./8.), }, }, @@ -645,8 +631,8 @@ func TestStartDeletion(t *testing.T) { "DS pods are evicted from empty nodes, but don't block deletion on error": { emptyNodes: generateNodes(2, "empty"), pods: map[string][]*apiv1.Pod{ - "empty-node-0": {generateDsPod("empty-node-0-ds-pod-0"), generateDsPod("empty-node-0-ds-pod-1")}, - "empty-node-1": {generateDsPod("empty-node-1-ds-pod-0"), generateDsPod("empty-node-1-ds-pod-1")}, + "empty-node-0": {generateDsPod("empty-node-0-ds-pod-0", "empty-node-0"), generateDsPod("empty-node-0-ds-pod-1", "empty-node-0")}, + "empty-node-1": {generateDsPod("empty-node-1-ds-pod-0", "empty-node-1"), generateDsPod("empty-node-1-ds-pod-1", "empty-node-1")}, }, failedPodDrain: map[string]bool{"empty-node-1-ds-pod-0": true}, wantStatus: &status.ScaleDownStatus{ @@ -681,11 +667,11 
@@ func TestStartDeletion(t *testing.T) { "empty-node-1": {ResultType: status.NodeDeleteOk}, }, }, - "pods are not evicted from nodes with pods if the node is passed as empty": { + "nodes with pods are not deleted if the node is passed as empty": { emptyNodes: generateNodes(2, "empty-but-with-pods"), pods: map[string][]*apiv1.Pod{ - "empty-but-with-pods-node-0": generatePods(2, "empty-but-with-pods-node--0"), - "empty-but-with-pods-node-1": generatePods(2, "empty-but-with-pods-node--1"), + "empty-but-with-pods-node-0": removablePods(2, "empty-but-with-pods-node-0"), + "empty-but-with-pods-node-1": removablePods(2, "empty-but-with-pods-node-1"), }, wantStatus: &status.ScaleDownStatus{ Result: status.ScaleDownNodeDeleteStarted, @@ -704,19 +690,21 @@ func TestStartDeletion(t *testing.T) { }, }, }, - wantDeletedNodes: []string{"empty-but-with-pods-node-0", "empty-but-with-pods-node-1"}, + wantDeletedNodes: nil, wantDeletedPods: nil, wantTaintUpdates: map[string][][]apiv1.Taint{ "empty-but-with-pods-node-0": { {toBeDeletedTaint}, + {}, }, "empty-but-with-pods-node-1": { {toBeDeletedTaint}, + {}, }, }, wantNodeDeleteResults: map[string]status.NodeDeleteResult{ - "empty-but-with-pods-node-0": {ResultType: status.NodeDeleteOk}, - "empty-but-with-pods-node-1": {ResultType: status.NodeDeleteOk}, + "empty-but-with-pods-node-0": {ResultType: status.NodeDeleteErrorInternal, Err: cmpopts.AnyError}, + "empty-but-with-pods-node-1": {ResultType: status.NodeDeleteErrorInternal, Err: cmpopts.AnyError}, }, }, } { @@ -744,6 +732,8 @@ func TestStartDeletion(t *testing.T) { deletedNodes := make(chan string, 10) deletedPods := make(chan string, 10) + ds := generateDaemonSet() + // We're faking the whole k8s client, and some of the code needs to get live nodes and pods, so GET on nodes and pods has to be set up. fakeClient.Fake.AddReactor("get", "nodes", func(action core.Action) (bool, runtime.Object, error) { nodesLock.Lock() @@ -819,7 +809,21 @@ func TestStartDeletion(t *testing.T) { MaxPodEvictionTime: 0, DaemonSetEvictionForEmptyNodes: true, } - registry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil, nil, nil, nil, nil) + + allPods := []*apiv1.Pod{} + + for _, pods := range tc.pods { + allPods = append(allPods, pods...) 
+ } + + podLister := kube_util.NewTestPodLister(allPods) + pdbLister := kube_util.NewTestPodDisruptionBudgetLister([]*policyv1.PodDisruptionBudget{}) + dsLister, err := kube_util.NewTestDaemonSetLister([]*appsv1.DaemonSet{ds}) + if err != nil { + t.Fatalf("Couldn't create daemonset lister") + } + + registry := kube_util.NewListerRegistry(nil, nil, podLister, nil, pdbLister, dsLister, nil, nil, nil, nil) ctx, err := NewScaleTestAutoscalingContext(opts, fakeClient, registry, provider, nil, nil) if err != nil { t.Fatalf("Couldn't set up autoscaling context: %v", err) @@ -1046,6 +1050,7 @@ func TestStartDeletionInBatchBasic(t *testing.T) { "test-ng-2": testNg2, "test-ng-3": testNg3, } + for ngName, numNodes := range test.numNodesToDelete { ng := testNg[ngName] provider.InsertNodeGroup(ng) @@ -1064,7 +1069,10 @@ func TestStartDeletionInBatchBasic(t *testing.T) { MaxPodEvictionTime: 0, DaemonSetEvictionForEmptyNodes: true, } - registry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil, nil, nil, nil, nil) + + podLister := kube_util.NewTestPodLister([]*apiv1.Pod{}) + pdbLister := kube_util.NewTestPodDisruptionBudgetLister([]*policyv1.PodDisruptionBudget{}) + registry := kube_util.NewListerRegistry(nil, nil, podLister, nil, pdbLister, nil, nil, nil, nil, nil) ctx, err := NewScaleTestAutoscalingContext(opts, fakeClient, registry, provider, nil, nil) if err != nil { t.Fatalf("Couldn't set up autoscaling context: %v", err) @@ -1076,6 +1084,7 @@ func TestStartDeletionInBatchBasic(t *testing.T) { nodeDeletionBatcher: NewNodeDeletionBatcher(&ctx, csr, ndt, deleteInterval), evictor: Evictor{EvictionRetryTime: 0, DsEvictionRetryTime: 0, DsEvictionEmptyNodeTimeout: 0, PodEvictionHeadroom: DefaultPodEvictionHeadroom}, } + for _, nodes := range deleteNodes { actuator.StartDeletion(nodes, []*apiv1.Node{}, time.Now()) time.Sleep(deleteInterval) @@ -1143,22 +1152,29 @@ func generateNode(name string) *apiv1.Node { } } -func generatePods(count int, prefix string) []*apiv1.Pod { +func removablePods(count int, prefix string) []*apiv1.Pod { var result []*apiv1.Pod for i := 0; i < count; i++ { name := fmt.Sprintf("pod-%d", i) if prefix != "" { name = prefix + "-" + name } - result = append(result, generatePod(name)) + result = append(result, removablePod(name, prefix)) } return result } -func generatePod(name string) *apiv1.Pod { +func removablePod(name string, node string) *apiv1.Pod { return &apiv1.Pod{ - ObjectMeta: metav1.ObjectMeta{Name: name}, + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: "default", + Annotations: map[string]string{ + "cluster-autoscaler.kubernetes.io/safe-to-evict": "true", + }, + }, Spec: apiv1.PodSpec{ + NodeName: node, Containers: []apiv1.Container{ { Name: "test-container", @@ -1174,12 +1190,22 @@ func generatePod(name string) *apiv1.Pod { } } -func generateDsPod(name string) *apiv1.Pod { - pod := generatePod(name) - pod.OwnerReferences = GenerateOwnerReferences(name+"-ds", "DaemonSet", "apps/v1", "some-uid") +func generateDsPod(name string, node string) *apiv1.Pod { + pod := removablePod(name, node) + pod.OwnerReferences = GenerateOwnerReferences("ds", "DaemonSet", "apps/v1", "some-uid") return pod } +func generateDaemonSet() *appsv1.DaemonSet { + return &appsv1.DaemonSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: "ds", + Namespace: "default", + SelfLink: "/apiv1s/apps/v1/namespaces/default/daemonsets/ds", + }, + } +} + func generateUtilInfo(cpuUtil, memUtil float64) utilization.Info { var higherUtilName apiv1.ResourceName var higherUtilVal float64 diff --git 
a/cluster-autoscaler/core/scaledown/actuation/drain.go b/cluster-autoscaler/core/scaledown/actuation/drain.go index 437bb739b3..ad4ac9d20c 100644 --- a/cluster-autoscaler/core/scaledown/actuation/drain.go +++ b/cluster-autoscaler/core/scaledown/actuation/drain.go @@ -36,6 +36,7 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/utils/daemonset" "k8s.io/autoscaler/cluster-autoscaler/utils/errors" pod_util "k8s.io/autoscaler/cluster-autoscaler/utils/pod" + "k8s.io/kubernetes/pkg/scheduler/framework" ) const ( @@ -56,27 +57,26 @@ type Evictor struct { DsEvictionRetryTime time.Duration DsEvictionEmptyNodeTimeout time.Duration PodEvictionHeadroom time.Duration + deleteOptions simulator.NodeDeleteOptions } // NewDefaultEvictor returns an instance of Evictor using the default parameters. -func NewDefaultEvictor() Evictor { +func NewDefaultEvictor(deleteOptions simulator.NodeDeleteOptions) Evictor { return Evictor{ EvictionRetryTime: DefaultEvictionRetryTime, DsEvictionRetryTime: DefaultDsEvictionRetryTime, DsEvictionEmptyNodeTimeout: DefaultDsEvictionEmptyNodeTimeout, PodEvictionHeadroom: DefaultPodEvictionHeadroom, + deleteOptions: deleteOptions, } } // DrainNode works like DrainNodeWithPods, but lists of pods to evict don't have to be provided. All non-mirror, non-DS pods on the // node are evicted. Mirror pods are not evicted. DaemonSet pods are evicted if DaemonSetEvictionForOccupiedNodes is enabled, or // if they have the EnableDsEvictionKey annotation. -func (e Evictor) DrainNode(ctx *acontext.AutoscalingContext, node *apiv1.Node) (map[string]status.PodEvictionResult, error) { - dsPodsToEvict, nonDsPodsToEvict, err := podsToEvict(ctx, node.Name) - if err != nil { - return nil, err - } - return e.DrainNodeWithPods(ctx, node, nonDsPodsToEvict, dsPodsToEvict) +func (e Evictor) DrainNode(ctx *acontext.AutoscalingContext, nodeInfo *framework.NodeInfo) (map[string]status.PodEvictionResult, error) { + dsPodsToEvict, nonDsPodsToEvict := podsToEvict(ctx, nodeInfo) + return e.DrainNodeWithPods(ctx, nodeInfo.Node(), nonDsPodsToEvict, dsPodsToEvict) } // DrainNodeWithPods performs drain logic on the node. Marks the node as unschedulable and later removes all pods, giving @@ -169,12 +169,9 @@ func (e Evictor) DrainNodeWithPods(ctx *acontext.AutoscalingContext, node *apiv1 } // EvictDaemonSetPods creates eviction objects for all DaemonSet pods on the node. 
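DrainNode above, and EvictDaemonSetPods below, now accept a *framework.NodeInfo instead of looking the node up in the snapshot themselves, so callers resolve the NodeInfo once and pass it in. A sketch of the resulting call shape, assuming an initialized AutoscalingContext with a populated ClusterSnapshot (the wrapper function is illustrative):

```go
// Sketch only: assumes an initialized AutoscalingContext with a populated ClusterSnapshot.
package example

import (
	"time"

	apiv1 "k8s.io/api/core/v1"

	acontext "k8s.io/autoscaler/cluster-autoscaler/context"
	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/actuation"
	"k8s.io/autoscaler/cluster-autoscaler/simulator"
)

// evictFromNode resolves the NodeInfo once and hands it to the evictor, as the actuator now does.
func evictFromNode(ctx *acontext.AutoscalingContext, opts simulator.NodeDeleteOptions, node *apiv1.Node, drain bool) error {
	// The evictor carries the delete options instead of reading package-level flags.
	evictor := actuation.NewDefaultEvictor(opts)
	nodeInfo, err := ctx.ClusterSnapshot.NodeInfos().Get(node.Name)
	if err != nil {
		return err
	}
	if drain {
		_, err = evictor.DrainNode(ctx, nodeInfo)
		return err
	}
	// DaemonSet eviction on empty nodes remains best-effort.
	return evictor.EvictDaemonSetPods(ctx, nodeInfo, time.Now())
}
```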
-func (e Evictor) EvictDaemonSetPods(ctx *acontext.AutoscalingContext, nodeToDelete *apiv1.Node, timeNow time.Time) error { - nodeInfo, err := ctx.ClusterSnapshot.NodeInfos().Get(nodeToDelete.Name) - if err != nil { - return fmt.Errorf("failed to get node info for %s", nodeToDelete.Name) - } - _, daemonSetPods, _, err := simulator.FastGetPodsToMove(nodeInfo, true, true, []*policyv1.PodDisruptionBudget{}, timeNow) +func (e Evictor) EvictDaemonSetPods(ctx *acontext.AutoscalingContext, nodeInfo *framework.NodeInfo, timeNow time.Time) error { + nodeToDelete := nodeInfo.Node() + _, daemonSetPods, _, err := simulator.GetPodsToMove(nodeInfo, e.deleteOptions, nil, []*policyv1.PodDisruptionBudget{}, timeNow) if err != nil { return fmt.Errorf("failed to get DaemonSet pods for %s (error: %v)", nodeToDelete.Name, err) } @@ -245,11 +242,7 @@ func evictPod(ctx *acontext.AutoscalingContext, podToEvict *apiv1.Pod, isDaemonS return status.PodEvictionResult{Pod: podToEvict, TimedOut: true, Err: fmt.Errorf("failed to evict pod %s/%s within allowed timeout (last error: %v)", podToEvict.Namespace, podToEvict.Name, lastError)} } -func podsToEvict(ctx *acontext.AutoscalingContext, nodeName string) (dsPods, nonDsPods []*apiv1.Pod, err error) { - nodeInfo, err := ctx.ClusterSnapshot.NodeInfos().Get(nodeName) - if err != nil { - return nil, nil, err - } +func podsToEvict(ctx *acontext.AutoscalingContext, nodeInfo *framework.NodeInfo) (dsPods, nonDsPods []*apiv1.Pod) { for _, podInfo := range nodeInfo.Pods { if pod_util.IsMirrorPod(podInfo.Pod) { continue @@ -260,5 +253,5 @@ func podsToEvict(ctx *acontext.AutoscalingContext, nodeName string) (dsPods, non } } dsPodsToEvict := daemonset.PodsToEvict(dsPods, ctx.DaemonSetEvictionForOccupiedNodes) - return dsPodsToEvict, nonDsPods, nil + return dsPodsToEvict, nonDsPods } diff --git a/cluster-autoscaler/core/scaledown/actuation/drain_test.go b/cluster-autoscaler/core/scaledown/actuation/drain_test.go index 7c1b024592..938a026eec 100644 --- a/cluster-autoscaler/core/scaledown/actuation/drain_test.go +++ b/cluster-autoscaler/core/scaledown/actuation/drain_test.go @@ -51,7 +51,6 @@ func TestDaemonSetEvictionForEmptyNodes(t *testing.T) { testScenarios := []struct { name string dsPods []string - nodeInfoSuccess bool evictionTimeoutExceed bool dsEvictionTimeout time.Duration evictionSuccess bool @@ -63,24 +62,13 @@ func TestDaemonSetEvictionForEmptyNodes(t *testing.T) { { name: "Successful attempt to evict DaemonSet pods", dsPods: []string{"d1", "d2"}, - nodeInfoSuccess: true, dsEvictionTimeout: 5000 * time.Millisecond, evictionSuccess: true, evictByDefault: true, }, - { - name: "Failed to get node info", - dsPods: []string{"d1", "d2"}, - nodeInfoSuccess: false, - dsEvictionTimeout: 5000 * time.Millisecond, - evictionSuccess: true, - err: fmt.Errorf("failed to get node info"), - evictByDefault: true, - }, { name: "Failed to create DaemonSet eviction", dsPods: []string{"d1", "d2"}, - nodeInfoSuccess: true, dsEvictionTimeout: 5000 * time.Millisecond, evictionSuccess: false, err: fmt.Errorf("following DaemonSet pod failed to evict on the"), @@ -89,7 +77,6 @@ func TestDaemonSetEvictionForEmptyNodes(t *testing.T) { { name: "Eviction timeout exceed", dsPods: []string{"d1", "d2", "d3"}, - nodeInfoSuccess: true, evictionTimeoutExceed: true, dsEvictionTimeout: 100 * time.Millisecond, evictionSuccess: true, @@ -99,7 +86,6 @@ func TestDaemonSetEvictionForEmptyNodes(t *testing.T) { { name: "Evict single pod due to annotation", dsPods: []string{"d1", "d2"}, - nodeInfoSuccess: true, 
dsEvictionTimeout: 5000 * time.Millisecond, evictionSuccess: true, extraAnnotationValue: map[string]string{"d1": "true"}, @@ -108,7 +94,6 @@ func TestDaemonSetEvictionForEmptyNodes(t *testing.T) { { name: "Don't evict single pod due to annotation", dsPods: []string{"d1", "d2"}, - nodeInfoSuccess: true, dsEvictionTimeout: 5000 * time.Millisecond, evictionSuccess: true, evictByDefault: true, @@ -172,17 +157,15 @@ func TestDaemonSetEvictionForEmptyNodes(t *testing.T) { context, err := NewScaleTestAutoscalingContext(options, fakeClient, registry, provider, nil, nil) assert.NoError(t, err) - if scenario.nodeInfoSuccess { - simulator.InitializeClusterSnapshotOrDie(t, context.ClusterSnapshot, []*apiv1.Node{n1}, dsPods) - } else { - simulator.InitializeClusterSnapshotOrDie(t, context.ClusterSnapshot, []*apiv1.Node{}, []*apiv1.Pod{}) - } + simulator.InitializeClusterSnapshotOrDie(t, context.ClusterSnapshot, []*apiv1.Node{n1}, dsPods) evictor := Evictor{ DsEvictionEmptyNodeTimeout: scenario.dsEvictionTimeout, DsEvictionRetryTime: waitBetweenRetries, } - err = evictor.EvictDaemonSetPods(&context, n1, timeNow) + nodeInfo, err := context.ClusterSnapshot.NodeInfos().Get(n1.Name) + assert.NoError(t, err) + err = evictor.EvictDaemonSetPods(&context, nodeInfo, timeNow) if scenario.err != nil { assert.NotNil(t, err) assert.Contains(t, err.Error(), scenario.err.Error()) @@ -545,7 +528,6 @@ func TestPodsToEvict(t *testing.T) { dsEvictionDisabled bool wantDsPods []*apiv1.Pod wantNonDsPods []*apiv1.Pod - wantErr error }{ "no pods": { pods: []*apiv1.Pod{}, @@ -588,13 +570,6 @@ func TestPodsToEvict(t *testing.T) { wantDsPods: []*apiv1.Pod{dsPod("ds-pod-1", false), dsPod("ds-pod-2", false)}, wantNonDsPods: []*apiv1.Pod{regularPod("regular-pod-1"), regularPod("regular-pod-2")}, }, - "calling for an unknown node name is an error": { - pods: []*apiv1.Pod{ - regularPod("pod-1"), regularPod("pod-2"), - }, - nodeNameOverwrite: "unknown-node", - wantErr: cmpopts.AnyError, - }, } { t.Run(tn, func(t *testing.T) { snapshot := simulator.NewBasicClusterSnapshot() @@ -613,10 +588,11 @@ func TestPodsToEvict(t *testing.T) { if tc.nodeNameOverwrite != "" { nodeName = tc.nodeNameOverwrite } - gotDsPods, gotNonDsPods, err := podsToEvict(ctx, nodeName) - if diff := cmp.Diff(tc.wantErr, err, cmpopts.EquateErrors()); diff != "" { - t.Errorf("podsToEvict err diff (-want +got):\n%s", diff) + nodeInfo, err := snapshot.NodeInfos().Get(nodeName) + if err != nil { + t.Fatalf("NodeInfos().Get() unexpected error: %v", err) } + gotDsPods, gotNonDsPods := podsToEvict(ctx, nodeInfo) if diff := cmp.Diff(tc.wantDsPods, gotDsPods, cmpopts.EquateEmpty()); diff != "" { t.Errorf("podsToEvict dsPods diff (-want +got):\n%s", diff) } diff --git a/cluster-autoscaler/core/scaledown/legacy/legacy.go b/cluster-autoscaler/core/scaledown/legacy/legacy.go index bd55519f5f..8f30946f99 100644 --- a/cluster-autoscaler/core/scaledown/legacy/legacy.go +++ b/cluster-autoscaler/core/scaledown/legacy/legacy.go @@ -61,9 +61,9 @@ type ScaleDown struct { } // NewScaleDown builds new ScaleDown object. 
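NewScaleDown (below) now receives the same NodeDeleteOptions as the actuator, so the scale-down planner and the actuator apply identical pod-safety rules. A wiring sketch following the pattern used in static_autoscaler.go and the test helpers in this patch (the function name is illustrative):

```go
// Sketch only: follows the wiring in static_autoscaler.go and the test helpers in this patch.
package example

import (
	"time"

	"k8s.io/autoscaler/cluster-autoscaler/clusterstate"
	"k8s.io/autoscaler/cluster-autoscaler/context"
	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/actuation"
	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/deletiontracker"
	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/legacy"
	"k8s.io/autoscaler/cluster-autoscaler/processors"
	"k8s.io/autoscaler/cluster-autoscaler/simulator"
)

// newScaleDownWrapper shares one NodeDeleteOptions value between planner and actuator,
// so what the planner considers removable matches what the actuator will actually evict.
func newScaleDownWrapper(ctx *context.AutoscalingContext, p *processors.AutoscalingProcessors,
	csr *clusterstate.ClusterStateRegistry, deleteOptions simulator.NodeDeleteOptions) *legacy.ScaleDownWrapper {
	ndt := deletiontracker.NewNodeDeletionTracker(0 * time.Second)
	sd := legacy.NewScaleDown(ctx, p, csr, ndt, deleteOptions)
	actuator := actuation.NewActuator(ctx, csr, ndt, deleteOptions)
	return legacy.NewScaleDownWrapper(sd, actuator)
}
```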
-func NewScaleDown(context *context.AutoscalingContext, processors *processors.AutoscalingProcessors, clusterStateRegistry *clusterstate.ClusterStateRegistry, ndt *deletiontracker.NodeDeletionTracker) *ScaleDown { +func NewScaleDown(context *context.AutoscalingContext, processors *processors.AutoscalingProcessors, clusterStateRegistry *clusterstate.ClusterStateRegistry, ndt *deletiontracker.NodeDeletionTracker, deleteOptions simulator.NodeDeleteOptions) *ScaleDown { usageTracker := simulator.NewUsageTracker() - removalSimulator := simulator.NewRemovalSimulator(context.ListerRegistry, context.ClusterSnapshot, context.PredicateChecker, usageTracker, false) + removalSimulator := simulator.NewRemovalSimulator(context.ListerRegistry, context.ClusterSnapshot, context.PredicateChecker, usageTracker, deleteOptions, false) unremovableNodes := unremovable.NewNodes() return &ScaleDown{ context: context, diff --git a/cluster-autoscaler/core/scaledown/legacy/legacy_test.go b/cluster-autoscaler/core/scaledown/legacy/legacy_test.go index 2640bf8858..9bc4dc9318 100644 --- a/cluster-autoscaler/core/scaledown/legacy/legacy_test.go +++ b/cluster-autoscaler/core/scaledown/legacy/legacy_test.go @@ -28,6 +28,7 @@ import ( appsv1 "k8s.io/api/apps/v1" batchv1 "k8s.io/api/batch/v1" apiv1 "k8s.io/api/core/v1" + policyv1 "k8s.io/api/policy/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -129,9 +130,12 @@ func TestFindUnneededNodes(t *testing.T) { provider.AddNode("ng1", n8) provider.AddNode("ng1", n9) + podLister := kube_util.NewTestPodLister([]*apiv1.Pod{p1, p2, p3, p4, p5, p6}) + pdbLister := kube_util.NewTestPodDisruptionBudgetLister([]*policyv1.PodDisruptionBudget{}) + rsLister, err := kube_util.NewTestReplicaSetLister(replicaSets) assert.NoError(t, err) - registry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil, nil, nil, rsLister, nil) + registry := kube_util.NewListerRegistry(nil, nil, podLister, nil, pdbLister, nil, nil, nil, rsLister, nil) options := config.AutoscalingOptions{ NodeGroupDefaults: config.NodeGroupAutoscalingOptions{ @@ -262,9 +266,12 @@ func TestFindUnneededGPUNodes(t *testing.T) { provider.AddNode("ng1", n2) provider.AddNode("ng1", n3) + podLister := kube_util.NewTestPodLister([]*apiv1.Pod{p1, p2, p3}) + pdbLister := kube_util.NewTestPodDisruptionBudgetLister([]*policyv1.PodDisruptionBudget{}) + rsLister, err := kube_util.NewTestReplicaSetLister(replicaSets) assert.NoError(t, err) - registry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil, nil, nil, rsLister, nil) + registry := kube_util.NewListerRegistry(nil, nil, podLister, nil, pdbLister, nil, nil, nil, rsLister, nil) options := config.AutoscalingOptions{ NodeGroupDefaults: config.NodeGroupAutoscalingOptions{ @@ -385,9 +392,12 @@ func TestFindUnneededWithPerNodeGroupThresholds(t *testing.T) { } for tn, tc := range cases { t.Run(tn, func(t *testing.T) { + podLister := kube_util.NewTestPodLister(allPods) + pdbLister := kube_util.NewTestPodDisruptionBudgetLister([]*policyv1.PodDisruptionBudget{}) + rsLister, err := kube_util.NewTestReplicaSetLister(replicaSets) assert.NoError(t, err) - registry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil, nil, nil, rsLister, nil) + registry := kube_util.NewListerRegistry(nil, nil, podLister, nil, pdbLister, nil, nil, nil, rsLister, nil) context, err := NewScaleTestAutoscalingContext(globalOptions, &fake.Clientset{}, registry, provider, nil, nil) assert.NoError(t, err) @@ -461,9 +471,12 @@ func 
TestPodsWithPreemptionsFindUnneededNodes(t *testing.T) { provider.AddNode("ng1", n3) provider.AddNode("ng1", n4) + podLister := kube_util.NewTestPodLister([]*apiv1.Pod{}) + pdbLister := kube_util.NewTestPodDisruptionBudgetLister([]*policyv1.PodDisruptionBudget{}) + rsLister, err := kube_util.NewTestReplicaSetLister(replicaSets) assert.NoError(t, err) - registry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil, nil, nil, rsLister, nil) + registry := kube_util.NewListerRegistry(nil, nil, podLister, nil, pdbLister, nil, nil, nil, rsLister, nil) options := config.AutoscalingOptions{ NodeGroupDefaults: config.NodeGroupAutoscalingOptions{ @@ -524,9 +537,12 @@ func TestFindUnneededMaxCandidates(t *testing.T) { numCandidates := 30 + podLister := kube_util.NewTestPodLister(pods) + pdbLister := kube_util.NewTestPodDisruptionBudgetLister([]*policyv1.PodDisruptionBudget{}) + rsLister, err := kube_util.NewTestReplicaSetLister(replicaSets) assert.NoError(t, err) - registry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil, nil, nil, rsLister, nil) + registry := kube_util.NewListerRegistry(nil, nil, podLister, nil, pdbLister, nil, nil, nil, rsLister, nil) options := config.AutoscalingOptions{ NodeGroupDefaults: config.NodeGroupAutoscalingOptions{ @@ -605,9 +621,12 @@ func TestFindUnneededEmptyNodes(t *testing.T) { numCandidates := 30 + podLister := kube_util.NewTestPodLister(pods) + pdbLister := kube_util.NewTestPodDisruptionBudgetLister([]*policyv1.PodDisruptionBudget{}) + rsLister, err := kube_util.NewTestReplicaSetLister(replicaSets) assert.NoError(t, err) - registry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil, nil, nil, rsLister, nil) + registry := kube_util.NewListerRegistry(nil, nil, podLister, nil, pdbLister, nil, nil, nil, rsLister, nil) options := config.AutoscalingOptions{ NodeGroupDefaults: config.NodeGroupAutoscalingOptions{ @@ -662,9 +681,12 @@ func TestFindUnneededNodePool(t *testing.T) { numCandidates := 30 + podLister := kube_util.NewTestPodLister(pods) + pdbLister := kube_util.NewTestPodDisruptionBudgetLister([]*policyv1.PodDisruptionBudget{}) + rsLister, err := kube_util.NewTestReplicaSetLister(replicaSets) assert.NoError(t, err) - registry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil, nil, nil, rsLister, nil) + registry := kube_util.NewListerRegistry(nil, nil, podLister, nil, pdbLister, nil, nil, nil, rsLister, nil) options := config.AutoscalingOptions{ NodeGroupDefaults: config.NodeGroupAutoscalingOptions{ @@ -759,7 +781,10 @@ func TestScaleDown(t *testing.T) { } jobLister, err := kube_util.NewTestJobLister([]*batchv1.Job{&job}) assert.NoError(t, err) - registry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil, nil, jobLister, nil, nil) + podLister := kube_util.NewTestPodLister([]*apiv1.Pod{p1, p2}) + pdbLister := kube_util.NewTestPodDisruptionBudgetLister([]*policyv1.PodDisruptionBudget{}) + + registry := kube_util.NewListerRegistry(nil, nil, podLister, nil, pdbLister, nil, nil, jobLister, nil, nil) context, err := NewScaleTestAutoscalingContext(options, fakeClient, registry, provider, nil, nil) assert.NoError(t, err) @@ -1015,7 +1040,10 @@ func simpleScaleDownEmpty(t *testing.T, config *ScaleTestConfig) { assert.NotNil(t, provider) - registry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil, nil, nil, nil, nil) + podLister := kube_util.NewTestPodLister([]*apiv1.Pod{}) + pdbLister := kube_util.NewTestPodDisruptionBudgetLister([]*policyv1.PodDisruptionBudget{}) + + registry := kube_util.NewListerRegistry(nil, 
nil, podLister, nil, pdbLister, nil, nil, nil, nil, nil) context, err := NewScaleTestAutoscalingContext(config.Options, fakeClient, registry, provider, nil, nil) assert.NoError(t, err) @@ -1102,7 +1130,11 @@ func TestNoScaleDownUnready(t *testing.T) { }, MaxGracefulTerminationSec: 60, } - registry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil, nil, nil, nil, nil) + + podLister := kube_util.NewTestPodLister([]*apiv1.Pod{p2}) + pdbLister := kube_util.NewTestPodDisruptionBudgetLister([]*policyv1.PodDisruptionBudget{}) + + registry := kube_util.NewListerRegistry(nil, nil, podLister, nil, pdbLister, nil, nil, nil, nil, nil) context, err := NewScaleTestAutoscalingContext(options, fakeClient, registry, provider, nil, nil) assert.NoError(t, err) @@ -1267,10 +1299,18 @@ func generateReplicaSets() []*appsv1.ReplicaSet { func newWrapperForTesting(ctx *context.AutoscalingContext, clusterStateRegistry *clusterstate.ClusterStateRegistry, ndt *deletiontracker.NodeDeletionTracker) *ScaleDownWrapper { ctx.MaxDrainParallelism = 1 ctx.MaxScaleDownParallelism = 10 + ctx.NodeDeletionBatcherInterval = 0 * time.Second + ctx.NodeDeleteDelayAfterTaint = 0 * time.Second if ndt == nil { ndt = deletiontracker.NewNodeDeletionTracker(0 * time.Second) } - sd := NewScaleDown(ctx, NewTestProcessors(), clusterStateRegistry, ndt) - actuator := actuation.NewActuator(ctx, clusterStateRegistry, ndt, 0*time.Second) + + deleteOptions := simulator.NodeDeleteOptions{ + SkipNodesWithSystemPods: true, + SkipNodesWithLocalStorage: true, + MinReplicaCount: 0, + } + sd := NewScaleDown(ctx, NewTestProcessors(), clusterStateRegistry, ndt, deleteOptions) + actuator := actuation.NewActuator(ctx, clusterStateRegistry, ndt, deleteOptions) return NewScaleDownWrapper(sd, actuator) } diff --git a/cluster-autoscaler/core/static_autoscaler.go b/cluster-autoscaler/core/static_autoscaler.go index 9f0a6b010a..ee4a01faf7 100644 --- a/cluster-autoscaler/core/static_autoscaler.go +++ b/cluster-autoscaler/core/static_autoscaler.go @@ -154,9 +154,15 @@ func NewStaticAutoscaler( clusterStateRegistry := clusterstate.NewClusterStateRegistry(autoscalingContext.CloudProvider, clusterStateConfig, autoscalingContext.LogRecorder, backoff) + deleteOptions := simulator.NodeDeleteOptions{ + SkipNodesWithSystemPods: opts.SkipNodesWithSystemPods, + SkipNodesWithLocalStorage: opts.SkipNodesWithLocalStorage, + MinReplicaCount: opts.MinReplicaCount, + } + ndt := deletiontracker.NewNodeDeletionTracker(0 * time.Second) - scaleDown := legacy.NewScaleDown(autoscalingContext, processors, clusterStateRegistry, ndt) - actuator := actuation.NewActuator(autoscalingContext, clusterStateRegistry, ndt, opts.NodeDeletionBatcherInterval) + scaleDown := legacy.NewScaleDown(autoscalingContext, processors, clusterStateRegistry, ndt, deleteOptions) + actuator := actuation.NewActuator(autoscalingContext, clusterStateRegistry, ndt, deleteOptions) scaleDownWrapper := legacy.NewScaleDownWrapper(scaleDown, actuator) processorCallbacks.scaleDownPlanner = scaleDownWrapper diff --git a/cluster-autoscaler/core/static_autoscaler_test.go b/cluster-autoscaler/core/static_autoscaler_test.go index 1774691759..f6eade5caf 100644 --- a/cluster-autoscaler/core/static_autoscaler_test.go +++ b/cluster-autoscaler/core/static_autoscaler_test.go @@ -38,6 +38,7 @@ import ( core_utils "k8s.io/autoscaler/cluster-autoscaler/core/utils" "k8s.io/autoscaler/cluster-autoscaler/estimator" ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors" + "k8s.io/autoscaler/cluster-autoscaler/simulator" 
"k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes" kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes" . "k8s.io/autoscaler/cluster-autoscaler/utils/test" @@ -269,10 +270,10 @@ func TestStaticAutoscalerRunOnce(t *testing.T) { // Scale down. readyNodeLister.SetNodes([]*apiv1.Node{n1, n2}) allNodeLister.SetNodes([]*apiv1.Node{n1, n2}) - scheduledPodMock.On("List").Return([]*apiv1.Pod{p1}, nil).Twice() + scheduledPodMock.On("List").Return([]*apiv1.Pod{p1}, nil).Times(3) unschedulablePodMock.On("List").Return([]*apiv1.Pod{}, nil).Once() daemonSetListerMock.On("List", labels.Everything()).Return([]*appsv1.DaemonSet{}, nil).Once() - podDisruptionBudgetListerMock.On("List").Return([]*policyv1.PodDisruptionBudget{}, nil).Once() + podDisruptionBudgetListerMock.On("List").Return([]*policyv1.PodDisruptionBudget{}, nil).Twice() onScaleDownMock.On("ScaleDown", "ng1", "n2").Return(nil).Once() err = autoscaler.RunOnce(time.Now().Add(3 * time.Hour)) @@ -456,9 +457,9 @@ func TestStaticAutoscalerRunOnceWithAutoprovisionedEnabled(t *testing.T) { // Scale down. readyNodeLister.SetNodes([]*apiv1.Node{n1, n2}) allNodeLister.SetNodes([]*apiv1.Node{n1, n2}) - scheduledPodMock.On("List").Return([]*apiv1.Pod{p1}, nil).Twice() + scheduledPodMock.On("List").Return([]*apiv1.Pod{p1}, nil).Times(3) unschedulablePodMock.On("List").Return([]*apiv1.Pod{}, nil).Once() - podDisruptionBudgetListerMock.On("List").Return([]*policyv1.PodDisruptionBudget{}, nil).Once() + podDisruptionBudgetListerMock.On("List").Return([]*policyv1.PodDisruptionBudget{}, nil).Twice() daemonSetListerMock.On("List", labels.Everything()).Return([]*appsv1.DaemonSet{}, nil).Once() onNodeGroupDeleteMock.On("Delete", "autoprovisioned-"+ "TN1").Return(nil).Once() @@ -743,10 +744,10 @@ func TestStaticAutoscalerRunOncePodsWithPriorities(t *testing.T) { // Scale down. 
readyNodeLister.SetNodes([]*apiv1.Node{n1, n2, n3}) allNodeLister.SetNodes([]*apiv1.Node{n1, n2, n3}) - scheduledPodMock.On("List").Return([]*apiv1.Pod{p1, p2, p3, p4}, nil).Times(2) + scheduledPodMock.On("List").Return([]*apiv1.Pod{p1, p2, p3, p4}, nil).Times(3) unschedulablePodMock.On("List").Return([]*apiv1.Pod{p5}, nil).Once() daemonSetListerMock.On("List", labels.Everything()).Return([]*appsv1.DaemonSet{}, nil).Once() - podDisruptionBudgetListerMock.On("List").Return([]*policyv1.PodDisruptionBudget{}, nil).Once() + podDisruptionBudgetListerMock.On("List").Return([]*policyv1.PodDisruptionBudget{}, nil).Twice() onScaleDownMock.On("ScaleDown", "ng1", "n1").Return(nil).Once() p4.Spec.NodeName = "n2" @@ -1398,9 +1399,16 @@ func waitForDeleteToFinish(t *testing.T, deleteFinished <-chan bool) { func newScaleDownPlannerAndActuator(t *testing.T, ctx *context.AutoscalingContext, p *ca_processors.AutoscalingProcessors, cs *clusterstate.ClusterStateRegistry) (scaledown.Planner, scaledown.Actuator) { ctx.MaxScaleDownParallelism = 10 ctx.MaxDrainParallelism = 1 + ctx.NodeDeletionBatcherInterval = 0 * time.Second + ctx.NodeDeleteDelayAfterTaint = 1 * time.Second + deleteOptions := simulator.NodeDeleteOptions{ + SkipNodesWithSystemPods: true, + SkipNodesWithLocalStorage: true, + MinReplicaCount: 0, + } ndt := deletiontracker.NewNodeDeletionTracker(0 * time.Second) - sd := legacy.NewScaleDown(ctx, p, cs, ndt) - actuator := actuation.NewActuator(ctx, cs, ndt, 0*time.Second) + sd := legacy.NewScaleDown(ctx, p, cs, ndt, deleteOptions) + actuator := actuation.NewActuator(ctx, cs, ndt, deleteOptions) wrapper := legacy.NewScaleDownWrapper(sd, actuator) return wrapper, wrapper } diff --git a/cluster-autoscaler/main.go b/cluster-autoscaler/main.go index 2d0c42abc8..ed95b2e320 100644 --- a/cluster-autoscaler/main.go +++ b/cluster-autoscaler/main.go @@ -206,6 +206,10 @@ var ( recordDuplicatedEvents = flag.Bool("record-duplicated-events", false, "enable duplication of similar events within a 5 minute window.") maxNodesPerScaleUp = flag.Int("max-nodes-per-scaleup", 1000, "Max nodes added in a single scale-up. This is intended strictly for optimizing CA algorithm latency and not a tool to rate-limit scale-up throughput.") maxNodeGroupBinpackingDuration = flag.Duration("max-nodegroup-binpacking-duration", 10*time.Second, "Maximum time that will be spent in binpacking simulation for each NodeGroup.") + skipNodesWithSystemPods = flag.Bool("skip-nodes-with-system-pods", true, "If true cluster autoscaler will never delete nodes with pods from kube-system (except for DaemonSet or mirror pods)") + skipNodesWithLocalStorage = flag.Bool("skip-nodes-with-local-storage", true, "If true cluster autoscaler will never delete nodes with pods with local storage, e.g. 
EmptyDir or HostPath") + minReplicaCount = flag.Int("min-replica-count", 0, "Minimum number or replicas that a replica set or replication controller should have to allow their pods deletion in scale down") + nodeDeleteDelayAfterTaint = flag.Duration("node-delete-delay-after-taint", 5*time.Second, "How long to wait before deleting a node after tainting it") ) func createAutoscalingOptions() config.AutoscalingOptions { @@ -297,6 +301,10 @@ func createAutoscalingOptions() config.AutoscalingOptions { MaxNodesPerScaleUp: *maxNodesPerScaleUp, MaxNodeGroupBinpackingDuration: *maxNodeGroupBinpackingDuration, NodeDeletionBatcherInterval: *nodeDeletionBatcherInterval, + SkipNodesWithSystemPods: *skipNodesWithSystemPods, + SkipNodesWithLocalStorage: *skipNodesWithLocalStorage, + MinReplicaCount: *minReplicaCount, + NodeDeleteDelayAfterTaint: *nodeDeleteDelayAfterTaint, } } diff --git a/cluster-autoscaler/simulator/cluster.go b/cluster-autoscaler/simulator/cluster.go index ab3d0f38c3..e13ad78d6b 100644 --- a/cluster-autoscaler/simulator/cluster.go +++ b/cluster-autoscaler/simulator/cluster.go @@ -17,7 +17,6 @@ limitations under the License. package simulator import ( - "flag" "fmt" "time" @@ -33,17 +32,6 @@ import ( klog "k8s.io/klog/v2" ) -var ( - skipNodesWithSystemPods = flag.Bool("skip-nodes-with-system-pods", true, - "If true cluster autoscaler will never delete nodes with pods from kube-system (except for DaemonSet "+ - "or mirror pods)") - skipNodesWithLocalStorage = flag.Bool("skip-nodes-with-local-storage", true, - "If true cluster autoscaler will never delete nodes with pods with local storage, e.g. EmptyDir or HostPath") - - minReplicaCount = flag.Int("min-replica-count", 0, - "Minimum number or replicas that a replica set or replication controller should have to allow their pods deletion in scale down") -) - // NodeToBeRemoved contain information about a node that can be removed. type NodeToBeRemoved struct { // Node to be removed. @@ -102,16 +90,19 @@ type RemovalSimulator struct { predicateChecker PredicateChecker usageTracker *UsageTracker canPersist bool + deleteOptions NodeDeleteOptions } // NewRemovalSimulator returns a new RemovalSimulator. 
diff --git a/cluster-autoscaler/simulator/cluster_test.go b/cluster-autoscaler/simulator/cluster_test.go
index 79057d068c..1e456afb6e 100644
--- a/cluster-autoscaler/simulator/cluster_test.go
+++ b/cluster-autoscaler/simulator/cluster_test.go
@@ -58,7 +58,7 @@ func TestFindPlaceAllOk(t *testing.T) {
 		[]*apiv1.Node{node1, node2},
 		[]*apiv1.Pod{pod1})

-	err = NewRemovalSimulator(nil, clusterSnapshot, predicateChecker, NewUsageTracker(), false).findPlaceFor(
+	err = NewRemovalSimulator(nil, clusterSnapshot, predicateChecker, NewUsageTracker(), testDeleteOptions(), false).findPlaceFor(
 		"x",
 		[]*apiv1.Pod{new1, new2},
 		destinations,
@@ -96,7 +96,7 @@ func TestFindPlaceAllBas(t *testing.T) {
 		[]*apiv1.Node{node1, node2},
 		[]*apiv1.Pod{pod1})

-	err = NewRemovalSimulator(nil, clusterSnapshot, predicateChecker, NewUsageTracker(), false).findPlaceFor(
+	err = NewRemovalSimulator(nil, clusterSnapshot, predicateChecker, NewUsageTracker(), testDeleteOptions(), false).findPlaceFor(
 		"nbad",
 		[]*apiv1.Pod{new1, new2, new3},
 		destinations,
@@ -129,7 +129,7 @@ func TestFindNone(t *testing.T) {
 		[]*apiv1.Node{node1, node2},
 		[]*apiv1.Pod{pod1})

-	err = NewRemovalSimulator(nil, clusterSnapshot, predicateChecker, NewUsageTracker(), false).findPlaceFor(
+	err = NewRemovalSimulator(nil, clusterSnapshot, predicateChecker, NewUsageTracker(), testDeleteOptions(), false).findPlaceFor(
 		"x",
 		[]*apiv1.Pod{},
 		destinations,
@@ -162,7 +162,7 @@ func TestFindEmptyNodes(t *testing.T) {
 	clusterSnapshot := NewBasicClusterSnapshot()
 	InitializeClusterSnapshotOrDie(t, clusterSnapshot, []*apiv1.Node{nodes[0], nodes[1], nodes[2], nodes[3]}, []*apiv1.Pod{pod1, pod2})
 	testTime := time.Date(2020, time.December, 18, 17, 0, 0, 0, time.UTC)
-	r := NewRemovalSimulator(nil, clusterSnapshot, nil, nil, false)
+	r := NewRemovalSimulator(nil, clusterSnapshot, nil, nil, testDeleteOptions(), false)
 	emptyNodes := r.FindEmptyNodesToRemove(nodeNames, testTime)
 	assert.Equal(t, []string{nodeNames[0], nodeNames[2], nodeNames[3]}, emptyNodes)
 }
@@ -309,7 +309,7 @@ func TestFindNodesToRemove(t *testing.T) {
 				destinations = append(destinations, node.Name)
 			}
 			InitializeClusterSnapshotOrDie(t, clusterSnapshot, test.allNodes, test.pods)
-			r := NewRemovalSimulator(registry, clusterSnapshot, predicateChecker, tracker, false)
+			r := NewRemovalSimulator(registry, clusterSnapshot, predicateChecker, tracker, testDeleteOptions(), false)
 			toRemove, unremovable, _, err := r.FindNodesToRemove(
 				test.candidates, destinations, map[string]string{}, time.Now(),
 				[]*policyv1.PodDisruptionBudget{})
@@ -320,3 +320,11 @@ func TestFindNodesToRemove(t *testing.T) {
 		})
 	}
 }
+
+func testDeleteOptions() NodeDeleteOptions {
+	return NodeDeleteOptions{
+		SkipNodesWithSystemPods:   true,
+		SkipNodesWithLocalStorage: true,
+		MinReplicaCount:           0,
+	}
+}
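Note (illustrative, not part of the patch): because the options are now plain data, a test that wants non-default draining behaviour only needs a small variation of the testDeleteOptions() helper above, for example:

// testDeleteOptionsKeepingLocalStorage is a hypothetical variant of testDeleteOptions()
// (same package) that would let tests cover nodes hosting EmptyDir/HostPath pods.
func testDeleteOptionsKeepingLocalStorage() NodeDeleteOptions {
	opts := testDeleteOptions()
	opts.SkipNodesWithLocalStorage = false
	return opts
}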
diff --git a/cluster-autoscaler/simulator/drain.go b/cluster-autoscaler/simulator/drain.go
index cc57283858..d784c1a08b 100644
--- a/cluster-autoscaler/simulator/drain.go
+++ b/cluster-autoscaler/simulator/drain.go
@@ -29,44 +29,25 @@ import (
 	schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
 )

-// FastGetPodsToMove returns a list of pods that should be moved elsewhere
-// and a list of DaemonSet pods that should be evicted if the node
-// is drained. Raises error if there is an unreplicated pod.
-// Based on kubectl drain code. It makes an assumption that RC, DS, Jobs and RS were deleted
-// along with their pods (no abandoned pods with dangling created-by annotation). Useful for fast
-// checks.
-func FastGetPodsToMove(nodeInfo *schedulerframework.NodeInfo, skipNodesWithSystemPods bool, skipNodesWithLocalStorage bool,
-	pdbs []*policyv1.PodDisruptionBudget, timestamp time.Time) (pods []*apiv1.Pod, daemonSetPods []*apiv1.Pod, blockingPod *drain.BlockingPod, err error) {
-	for _, podInfo := range nodeInfo.Pods {
-		pods = append(pods, podInfo.Pod)
-	}
-	pods, daemonSetPods, blockingPod, err = drain.GetPodsForDeletionOnNodeDrain(
-		pods,
-		pdbs,
-		skipNodesWithSystemPods,
-		skipNodesWithLocalStorage,
-		false,
-		nil,
-		0,
-		timestamp)
-
-	if err != nil {
-		return pods, daemonSetPods, blockingPod, err
-	}
-	if pdbBlockingPod, err := checkPdbs(pods, pdbs); err != nil {
-		return []*apiv1.Pod{}, []*apiv1.Pod{}, pdbBlockingPod, err
-	}
-
-	return pods, daemonSetPods, nil, nil
+// NodeDeleteOptions contains various options to customize how draining will behave
+type NodeDeleteOptions struct {
+	// SkipNodesWithSystemPods tells if nodes with pods from kube-system should be deleted (except for DaemonSet or mirror pods)
+	SkipNodesWithSystemPods bool
+	// SkipNodesWithLocalStorage tells if nodes with pods with local storage, e.g. EmptyDir or HostPath, should be deleted
+	SkipNodesWithLocalStorage bool
+	// MinReplicaCount controls the minimum number of replicas that a replica set or replication controller should have
+	// to allow their pods deletion in scale down
+	MinReplicaCount int
 }

-// DetailedGetPodsForMove returns a list of pods that should be moved elsewhere
+// GetPodsToMove returns a list of pods that should be moved elsewhere
 // and a list of DaemonSet pods that should be evicted if the node
 // is drained. Raises error if there is an unreplicated pod.
-// Based on kubectl drain code. It checks whether RC, DS, Jobs and RS that created these pods
+// Based on kubectl drain code. If listers is nil it makes an assumption that RC, DS, Jobs and RS were deleted
+// along with their pods (no abandoned pods with dangling created-by annotation).
+// If listers is not nil it checks whether RC, DS, Jobs and RS that created these pods
 // still exist.
-func DetailedGetPodsForMove(nodeInfo *schedulerframework.NodeInfo, skipNodesWithSystemPods bool,
-	skipNodesWithLocalStorage bool, listers kube_util.ListerRegistry, minReplicaCount int32,
+func GetPodsToMove(nodeInfo *schedulerframework.NodeInfo, deleteOptions NodeDeleteOptions, listers kube_util.ListerRegistry,
 	pdbs []*policyv1.PodDisruptionBudget, timestamp time.Time) (pods []*apiv1.Pod, daemonSetPods []*apiv1.Pod, blockingPod *drain.BlockingPod, err error) {
 	for _, podInfo := range nodeInfo.Pods {
 		pods = append(pods, podInfo.Pod)
@@ -74,11 +55,10 @@ func GetPodsToMove(nodeInfo *schedulerframework.NodeInfo, skipNodesWith
 	pods, daemonSetPods, blockingPod, err = drain.GetPodsForDeletionOnNodeDrain(
 		pods,
 		pdbs,
-		skipNodesWithSystemPods,
-		skipNodesWithLocalStorage,
-		true,
+		deleteOptions.SkipNodesWithSystemPods,
+		deleteOptions.SkipNodesWithLocalStorage,
 		listers,
-		minReplicaCount,
+		int32(deleteOptions.MinReplicaCount),
 		timestamp)
 	if err != nil {
 		return pods, daemonSetPods, blockingPod, err
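Note (reviewer sketch, not part of the patch): FastGetPodsToMove and DetailedGetPodsForMove collapse into a single GetPodsToMove, and whether controller references are verified is now decided by the listers argument rather than by picking one of two functions. A rough usage sketch, assuming the caller already holds a NodeInfo, a PDB list and, optionally, a ListerRegistry:

package example

import (
	"time"

	apiv1 "k8s.io/api/core/v1"
	policyv1 "k8s.io/api/policy/v1"
	"k8s.io/autoscaler/cluster-autoscaler/simulator"
	kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
	schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
)

// podsToMoveBothWays contrasts the two modes of the merged helper: a nil registry keeps the
// old "fast" semantics (controllers assumed to still exist), a non-nil one the old "detailed" checks.
func podsToMoveBothWays(nodeInfo *schedulerframework.NodeInfo, listers kube_util.ListerRegistry,
	pdbs []*policyv1.PodDisruptionBudget, now time.Time) (fast, detailed []*apiv1.Pod, err error) {
	opts := simulator.NodeDeleteOptions{
		SkipNodesWithSystemPods:   true,
		SkipNodesWithLocalStorage: true,
	}
	// Fast check: no controller-reference verification.
	fast, _, _, err = simulator.GetPodsToMove(nodeInfo, opts, nil, pdbs, now)
	if err != nil {
		return nil, nil, err
	}
	// Detailed check: RC/RS/Job/DS references are verified through the listers.
	detailed, _, _, err = simulator.GetPodsToMove(nodeInfo, opts, listers, pdbs, now)
	return fast, detailed, err
}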
diff --git a/cluster-autoscaler/simulator/drain_test.go b/cluster-autoscaler/simulator/drain_test.go
index 347b978f6c..46ebb7542a 100644
--- a/cluster-autoscaler/simulator/drain_test.go
+++ b/cluster-autoscaler/simulator/drain_test.go
@@ -32,7 +32,7 @@ import (
 	"github.com/stretchr/testify/assert"
 )

-func TestFastGetPodsToMove(t *testing.T) {
+func TestGetPodsToMove(t *testing.T) {
 	testTime := time.Date(2020, time.December, 18, 17, 0, 0, 0, time.UTC)
 	// Unreplicated pod
 	pod1 := &apiv1.Pod{
@@ -41,7 +41,12 @@ func TestFastGetPodsToMove(t *testing.T) {
 			Namespace: "ns",
 		},
 	}
-	_, _, blockingPod, err := FastGetPodsToMove(schedulerframework.NewNodeInfo(pod1), true, true, nil, testTime)
+	deleteOptions := NodeDeleteOptions{
+		SkipNodesWithSystemPods:   true,
+		SkipNodesWithLocalStorage: true,
+		MinReplicaCount:           0,
+	}
+	_, _, blockingPod, err := GetPodsToMove(schedulerframework.NewNodeInfo(pod1), deleteOptions, nil, nil, testTime)
 	assert.Error(t, err)
 	assert.Equal(t, &drain.BlockingPod{Pod: pod1, Reason: drain.NotReplicated}, blockingPod)

@@ -53,7 +58,7 @@
 			OwnerReferences: GenerateOwnerReferences("rs", "ReplicaSet", "extensions/v1beta1", ""),
 		},
 	}
-	r2, _, blockingPod, err := FastGetPodsToMove(schedulerframework.NewNodeInfo(pod2), true, true, nil, testTime)
+	r2, _, blockingPod, err := GetPodsToMove(schedulerframework.NewNodeInfo(pod2), deleteOptions, nil, nil, testTime)
 	assert.NoError(t, err)
 	assert.Nil(t, blockingPod)
 	assert.Equal(t, 1, len(r2))
@@ -69,7 +74,7 @@
 			},
 		},
 	}
-	r3, _, blockingPod, err := FastGetPodsToMove(schedulerframework.NewNodeInfo(pod3), true, true, nil, testTime)
+	r3, _, blockingPod, err := GetPodsToMove(schedulerframework.NewNodeInfo(pod3), deleteOptions, nil, nil, testTime)
 	assert.NoError(t, err)
 	assert.Nil(t, blockingPod)
 	assert.Equal(t, 0, len(r3))
@@ -82,7 +87,7 @@
 			OwnerReferences: GenerateOwnerReferences("ds", "DaemonSet", "extensions/v1beta1", ""),
 		},
 	}
-	r4, _, blockingPod, err := FastGetPodsToMove(schedulerframework.NewNodeInfo(pod2, pod3, pod4), true, true, nil, testTime)
+	r4, _, blockingPod, err := GetPodsToMove(schedulerframework.NewNodeInfo(pod2, pod3, pod4), deleteOptions, nil, nil, testTime)
 	assert.NoError(t, err)
 	assert.Nil(t, blockingPod)
 	assert.Equal(t, 1, len(r4))
@@ -96,7 +101,7 @@
 			OwnerReferences: GenerateOwnerReferences("rs", "ReplicaSet", "extensions/v1beta1", ""),
 		},
 	}
-	_, _, blockingPod, err = FastGetPodsToMove(schedulerframework.NewNodeInfo(pod5), true, true, nil, testTime)
+	_, _, blockingPod, err = GetPodsToMove(schedulerframework.NewNodeInfo(pod5), deleteOptions, nil, nil, testTime)
 	assert.Error(t, err)
 	assert.Equal(t, &drain.BlockingPod{Pod: pod5, Reason: drain.UnmovableKubeSystemPod}, blockingPod)

@@ -117,7 +122,7 @@
 			},
 		},
 	}
-	_, _, blockingPod, err = FastGetPodsToMove(schedulerframework.NewNodeInfo(pod6), true, true, nil, testTime)
+	_, _, blockingPod, err = GetPodsToMove(schedulerframework.NewNodeInfo(pod6), deleteOptions, nil, nil, testTime)
 	assert.Error(t, err)
 	assert.Equal(t, &drain.BlockingPod{Pod: pod6, Reason: drain.LocalStorageRequested}, blockingPod)

@@ -140,7 +145,7 @@
 			},
 		},
 	}
-	r7, _, blockingPod, err := FastGetPodsToMove(schedulerframework.NewNodeInfo(pod7), true, true, nil, testTime)
+	r7, _, blockingPod, err := GetPodsToMove(schedulerframework.NewNodeInfo(pod7), deleteOptions, nil, nil, testTime)
 	assert.NoError(t, err)
 	assert.Nil(t, blockingPod)
 	assert.Equal(t, 1, len(r7))
@@ -176,7 +181,7 @@
 		},
 	}

-	_, _, blockingPod, err = FastGetPodsToMove(schedulerframework.NewNodeInfo(pod8), true, true, []*policyv1.PodDisruptionBudget{pdb8}, testTime)
+	_, _, blockingPod, err = GetPodsToMove(schedulerframework.NewNodeInfo(pod8), deleteOptions, nil, []*policyv1.PodDisruptionBudget{pdb8}, testTime)
 	assert.Error(t, err)
 	assert.Equal(t, &drain.BlockingPod{Pod: pod8, Reason: drain.NotEnoughPdb}, blockingPod)

@@ -210,7 +215,7 @@
 		},
 	}

-	r9, _, blockingPod, err := FastGetPodsToMove(schedulerframework.NewNodeInfo(pod9), true, true, []*policyv1.PodDisruptionBudget{pdb9}, testTime)
+	r9, _, blockingPod, err := GetPodsToMove(schedulerframework.NewNodeInfo(pod9), deleteOptions, nil, []*policyv1.PodDisruptionBudget{pdb9}, testTime)
 	assert.NoError(t, err)
 	assert.Nil(t, blockingPod)
 	assert.Equal(t, 1, len(r9))
@@ -243,7 +248,7 @@
 		},
 	}

-	r10SkipPodsThatShouldBeTerminatedTrue, _, blockingPod, err := FastGetPodsToMove(schedulerframework.NewNodeInfo(pod10, pod10Terminated, pod10Terminating), true, true, nil, testTime)
+	r10SkipPodsThatShouldBeTerminatedTrue, _, blockingPod, err := GetPodsToMove(schedulerframework.NewNodeInfo(pod10, pod10Terminated, pod10Terminating), deleteOptions, nil, nil, testTime)
 	assert.NoError(t, err)
 	assert.Nil(t, blockingPod)
 	assert.ElementsMatch(t, []*apiv1.Pod{pod10, pod10Terminating}, r10SkipPodsThatShouldBeTerminatedTrue)
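Note (hypothetical extra case, not part of the patch): since the options are now an argument, the same test file could also exercise the permissive configuration. This sketch assumes it sits alongside the cases above and reuses the file's existing imports and the GenerateOwnerReferences helper:

// A replicated pod stays evictable when both skip options are disabled, so the
// call must not report a blocking pod.
func TestGetPodsToMoveWithPermissiveOptions(t *testing.T) {
	testTime := time.Date(2020, time.December, 18, 17, 0, 0, 0, time.UTC)
	pod := &apiv1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name:            "pod",
			Namespace:       "ns",
			OwnerReferences: GenerateOwnerReferences("rs", "ReplicaSet", "extensions/v1beta1", ""),
		},
	}
	deleteOptions := NodeDeleteOptions{
		SkipNodesWithSystemPods:   false,
		SkipNodesWithLocalStorage: false,
		MinReplicaCount:           0,
	}
	r, _, blockingPod, err := GetPodsToMove(schedulerframework.NewNodeInfo(pod), deleteOptions, nil, nil, testTime)
	assert.NoError(t, err)
	assert.Nil(t, blockingPod)
	assert.Equal(t, 1, len(r))
}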
diff --git a/cluster-autoscaler/utils/drain/drain.go b/cluster-autoscaler/utils/drain/drain.go
index e1be056653..4dd8efc58f 100644
--- a/cluster-autoscaler/utils/drain/drain.go
+++ b/cluster-autoscaler/utils/drain/drain.go
@@ -78,13 +78,13 @@ func GetPodsForDeletionOnNodeDrain(
 	pdbs []*policyv1.PodDisruptionBudget,
 	skipNodesWithSystemPods bool,
 	skipNodesWithLocalStorage bool,
-	checkReferences bool, // Setting this to true requires client to be not-null.
 	listers kube_util.ListerRegistry,
 	minReplica int32,
 	currentTime time.Time) (pods []*apiv1.Pod, daemonSetPods []*apiv1.Pod, blockingPod *BlockingPod, err error) {

 	pods = []*apiv1.Pod{}
 	daemonSetPods = []*apiv1.Pod{}
+	checkReferences := listers != nil
 	// filter kube-system PDBs to avoid doing it for every kube-system pod
 	kubeSystemPDBs := make([]*policyv1.PodDisruptionBudget, 0)
 	for _, pdb := range pdbs {
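Note (hedged sketch, not part of the patch): callers of the exported drain helper no longer pass checkReferences; reference checking is implied by providing a non-nil ListerRegistry. Under that assumption, a call now looks roughly like this (the wrapper name is invented for illustration):

package example

import (
	"time"

	apiv1 "k8s.io/api/core/v1"
	policyv1 "k8s.io/api/policy/v1"
	"k8s.io/autoscaler/cluster-autoscaler/utils/drain"
	kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
)

// podsForNodeDrain wraps the new call shape; passing registry == nil skips the
// controller-reference checks that checkReferences=true used to enable.
func podsForNodeDrain(podsOnNode []*apiv1.Pod, pdbs []*policyv1.PodDisruptionBudget,
	registry kube_util.ListerRegistry) ([]*apiv1.Pod, *drain.BlockingPod, error) {
	pods, _, blockingPod, err := drain.GetPodsForDeletionOnNodeDrain(
		podsOnNode,
		pdbs,
		true, // skipNodesWithSystemPods
		true, // skipNodesWithLocalStorage
		registry,
		0, // minReplicaCount
		time.Now())
	return pods, blockingPod, err
}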
diff --git a/cluster-autoscaler/utils/drain/drain_test.go b/cluster-autoscaler/utils/drain/drain_test.go
index a02f198f0b..f6e5de7a76 100644
--- a/cluster-autoscaler/utils/drain/drain_test.go
+++ b/cluster-autoscaler/utils/drain/drain_test.go
@@ -646,7 +646,7 @@ func TestDrain(t *testing.T) {

 			registry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, dsLister, rcLister, jobLister, rsLister, ssLister)

-			pods, daemonSetPods, blockingPod, err := GetPodsForDeletionOnNodeDrain(test.pods, test.pdbs, true, true, true, registry, 0, testTime)
+			pods, daemonSetPods, blockingPod, err := GetPodsForDeletionOnNodeDrain(test.pods, test.pdbs, true, true, registry, 0, testTime)

 			if test.expectFatal {
 				assert.Equal(t, test.expectBlockingPod, blockingPod)
diff --git a/cluster-autoscaler/utils/kubernetes/testlisters.go b/cluster-autoscaler/utils/kubernetes/testlisters.go
index db79bcfa0a..571298484a 100644
--- a/cluster-autoscaler/utils/kubernetes/testlisters.go
+++ b/cluster-autoscaler/utils/kubernetes/testlisters.go
@@ -22,6 +22,7 @@ import (
 	appsv1 "k8s.io/api/apps/v1"
 	batchv1 "k8s.io/api/batch/v1"
 	apiv1 "k8s.io/api/core/v1"
+	policyv1 "k8s.io/api/policy/v1"
 	v1appslister "k8s.io/client-go/listers/apps/v1"
 	v1batchlister "k8s.io/client-go/listers/batch/v1"
 	v1lister "k8s.io/client-go/listers/core/v1"
@@ -43,6 +44,21 @@ func NewTestPodLister(pods []*apiv1.Pod) PodLister {
 	return TestPodLister{pods: pods}
 }

+// TestPodDisruptionBudgetLister is used in tests involving listers
+type TestPodDisruptionBudgetLister struct {
+	pdbs []*policyv1.PodDisruptionBudget
+}
+
+// List returns all pdbs in test lister.
+func (lister TestPodDisruptionBudgetLister) List() ([]*policyv1.PodDisruptionBudget, error) {
+	return lister.pdbs, nil
+}
+
+// NewTestPodDisruptionBudgetLister returns a lister that returns provided pod disruption budgets
+func NewTestPodDisruptionBudgetLister(pdbs []*policyv1.PodDisruptionBudget) PodDisruptionBudgetLister {
+	return TestPodDisruptionBudgetLister{pdbs: pdbs}
+}
+
 // TestNodeLister is used in tests involving listers
 type TestNodeLister struct {
 	nodes []*apiv1.Node