Add option to wait for a period of time after node tainting/cordoning

Node state is refreshed and checked again before deleting the node.
This gives kube-scheduler time to acknowledge that the nodes' state has
changed and to stop scheduling pods on them.
Alexandru Matei 2022-10-06 11:09:30 +03:00
parent c65a3a3cf5
commit 0ee2a359e7
17 changed files with 450 additions and 267 deletions
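
For orientation before the per-file diff: the sketch below shows roughly how the pieces introduced by this commit fit together. It mirrors the wiring added to static_autoscaler.go further down; opts, autoscalingContext, processors and clusterStateRegistry are assumed to be in scope as they are in that file, so treat it as an illustrative assembly rather than code copied from the commit.

// Sketch only: names and signatures taken from the changed files below.
deleteOptions := simulator.NodeDeleteOptions{
    SkipNodesWithSystemPods:   opts.SkipNodesWithSystemPods,   // --skip-nodes-with-system-pods (default true)
    SkipNodesWithLocalStorage: opts.SkipNodesWithLocalStorage, // --skip-nodes-with-local-storage (default true)
    MinReplicaCount:           opts.MinReplicaCount,           // --min-replica-count (default 0)
}
ndt := deletiontracker.NewNodeDeletionTracker(0 * time.Second)
scaleDown := legacy.NewScaleDown(autoscalingContext, processors, clusterStateRegistry, ndt, deleteOptions)
actuator := actuation.NewActuator(autoscalingContext, clusterStateRegistry, ndt, deleteOptions)
// The actuator reads ctx.NodeDeleteDelayAfterTaint (--node-delete-delay-after-taint, default 5s),
// sleeps for that long after tainting, then re-checks the nodes against a fresh snapshot
// before scheduling the actual deletions.
scaleDownWrapper := legacy.NewScaleDownWrapper(scaleDown, actuator)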


@ -197,4 +197,13 @@ type AutoscalingOptions struct {
MaxNodeGroupBinpackingDuration time.Duration
// NodeDeletionBatcherInterval is the interval for which CA ScaleDown gathers nodes before deleting them in a batch.
NodeDeletionBatcherInterval time.Duration
// SkipNodesWithSystemPods tells if nodes with pods from kube-system should be deleted (except for DaemonSet or mirror pods)
SkipNodesWithSystemPods bool
// SkipNodesWithLocalStorage tells if nodes with pods with local storage, e.g. EmptyDir or HostPath, should be deleted
SkipNodesWithLocalStorage bool
// MinReplicaCount controls the minimum number of replicas that a replica set or replication controller should have
// to allow their pods deletion in scale down
MinReplicaCount int
// NodeDeleteDelayAfterTaint is the duration to wait before deleting a node after tainting it
NodeDeleteDelayAfterTaint time.Duration
}


@ -22,6 +22,7 @@ import (
"time"
apiv1 "k8s.io/api/core/v1"
policyv1 "k8s.io/api/policy/v1"
"k8s.io/klog/v2"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
@ -29,12 +30,16 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/deletiontracker"
"k8s.io/autoscaler/cluster-autoscaler/core/utils"
"k8s.io/autoscaler/cluster-autoscaler/metrics"
"k8s.io/autoscaler/cluster-autoscaler/processors/status"
"k8s.io/autoscaler/cluster-autoscaler/simulator"
"k8s.io/autoscaler/cluster-autoscaler/simulator/utilization"
"k8s.io/autoscaler/cluster-autoscaler/utils/deletetaint"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
"k8s.io/autoscaler/cluster-autoscaler/utils/gpu"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
"k8s.io/kubernetes/pkg/scheduler/framework"
)
// Actuator is responsible for draining and deleting nodes.
@ -44,17 +49,19 @@ type Actuator struct {
nodeDeletionTracker *deletiontracker.NodeDeletionTracker
nodeDeletionBatcher *NodeDeletionBatcher
evictor Evictor
deleteOptions simulator.NodeDeleteOptions
}
// NewActuator returns a new instance of Actuator.
func NewActuator(ctx *context.AutoscalingContext, csr *clusterstate.ClusterStateRegistry, ndr *deletiontracker.NodeDeletionTracker, batchInterval time.Duration) *Actuator {
nbd := NewNodeDeletionBatcher(ctx, csr, ndr, batchInterval)
func NewActuator(ctx *context.AutoscalingContext, csr *clusterstate.ClusterStateRegistry, ndr *deletiontracker.NodeDeletionTracker, deleteOptions simulator.NodeDeleteOptions) *Actuator {
nbd := NewNodeDeletionBatcher(ctx, csr, ndr, ctx.NodeDeletionBatcherInterval)
return &Actuator{
ctx: ctx,
clusterState: csr,
nodeDeletionTracker: ndr,
nodeDeletionBatcher: nbd,
evictor: NewDefaultEvictor(),
evictor: NewDefaultEvictor(deleteOptions),
deleteOptions: deleteOptions,
}
}
@ -83,26 +90,33 @@ func (a *Actuator) StartDeletion(empty, drain []*apiv1.Node, currentTime time.Ti
return scaleDownStatus, nil
}
// Taint empty nodes synchronously, and immediately start deletions asynchronously. Because these nodes are empty, there's no risk that a pod from one
// to-be-deleted node gets recreated on another.
emptyScaledDown, err := a.taintSyncDeleteAsyncEmpty(emptyToDelete)
scaleDownStatus.ScaledDownNodes = append(scaleDownStatus.ScaledDownNodes, emptyScaledDown...)
if err != nil {
scaleDownStatus.Result = status.ScaleDownError
return scaleDownStatus, err
if len(emptyToDelete) > 0 {
// Taint all empty nodes synchronously
if err := a.taintNodesSync(emptyToDelete); err != nil {
scaleDownStatus.Result = status.ScaleDownError
return scaleDownStatus, err
}
emptyScaledDown, err := a.deleteAsyncEmpty(emptyToDelete)
scaleDownStatus.ScaledDownNodes = append(scaleDownStatus.ScaledDownNodes, emptyScaledDown...)
if err != nil {
scaleDownStatus.Result = status.ScaleDownError
return scaleDownStatus, err
}
}
// Taint all nodes that need drain synchronously, but don't start any drain/deletion yet. Otherwise, pods evicted from one to-be-deleted node
// could get recreated on another.
err = a.taintNodesSync(drainToDelete)
if err != nil {
scaleDownStatus.Result = status.ScaleDownError
return scaleDownStatus, err
}
if len(drainToDelete) > 0 {
// Taint all nodes that need drain synchronously, but don't start any drain/deletion yet. Otherwise, pods evicted from one to-be-deleted node
// could get recreated on another.
if err := a.taintNodesSync(drainToDelete); err != nil {
scaleDownStatus.Result = status.ScaleDownError
return scaleDownStatus, err
}
// All nodes involved in the scale-down should be tainted now - start draining and deleting nodes asynchronously.
drainScaledDown := a.deleteAsyncDrain(drainToDelete)
scaleDownStatus.ScaledDownNodes = append(scaleDownStatus.ScaledDownNodes, drainScaledDown...)
// All nodes involved in the scale-down should be tainted now - start draining and deleting nodes asynchronously.
drainScaledDown := a.deleteAsyncDrain(drainToDelete)
scaleDownStatus.ScaledDownNodes = append(scaleDownStatus.ScaledDownNodes, drainScaledDown...)
}
scaleDownStatus.Result = status.ScaleDownNodeDeleteStarted
return scaleDownStatus, nil
@ -136,10 +150,12 @@ func (a *Actuator) cropNodesToBudgets(empty, needDrain []*apiv1.Node) ([]*apiv1.
return emptyToDelete, drainToDelete
}
// taintSyncDeleteAsyncEmpty synchronously taints the provided empty nodes, and immediately starts deletions asynchronously.
// deleteAsyncEmpty immediately starts deletions asynchronously.
// scaledDownNodes return value contains all nodes for which deletion successfully started. It's valid and should be consumed
// even if err != nil.
func (a *Actuator) taintSyncDeleteAsyncEmpty(empty []*apiv1.Node) (scaledDownNodes []*status.ScaleDownNode, err errors.AutoscalerError) {
func (a *Actuator) deleteAsyncEmpty(empty []*apiv1.Node) (scaledDownNodes []*status.ScaleDownNode, err errors.AutoscalerError) {
var groupIds []string
var validNodes []*apiv1.Node
for _, emptyNode := range empty {
klog.V(0).Infof("Scale-down: removing empty node %q", emptyNode.Name)
a.ctx.LogRecorder.Eventf(apiv1.EventTypeNormal, "ScaleDownEmpty", "Scale-down: removing empty node %q", emptyNode.Name)
@ -150,20 +166,19 @@ func (a *Actuator) taintSyncDeleteAsyncEmpty(empty []*apiv1.Node) (scaledDownNod
continue
}
err = a.taintNode(emptyNode)
if err != nil {
a.ctx.Recorder.Eventf(emptyNode, apiv1.EventTypeWarning, "ScaleDownFailed", "failed to mark the node as toBeDeleted/unschedulable: %v", err)
return scaledDownNodes, errors.NewAutoscalerError(errors.ApiCallError, "couldn't taint node %q with ToBeDeleted", emptyNode.Name)
}
if sdNode, err := a.scaleDownNodeToReport(emptyNode, false); err == nil {
scaledDownNodes = append(scaledDownNodes, sdNode)
} else {
klog.Errorf("Scale-down: couldn't report scaled down node, err: %v", err)
}
a.nodeDeletionTracker.StartDeletion(nodeGroup.Id(), emptyNode.Name)
go a.scheduleDeletion(emptyNode, nodeGroup.Id(), false)
groupIds = append(groupIds, nodeGroup.Id())
validNodes = append(validNodes, emptyNode)
}
go a.deleteNodesAsync(validNodes, groupIds, false)
return scaledDownNodes, nil
}
@ -189,6 +204,8 @@ func (a *Actuator) taintNodesSync(nodes []*apiv1.Node) errors.AutoscalerError {
// deleteAsyncDrain asynchronously starts deletions with drain for all provided nodes. scaledDownNodes return value contains all nodes for which
// deletion successfully started.
func (a *Actuator) deleteAsyncDrain(drain []*apiv1.Node) (scaledDownNodes []*status.ScaleDownNode) {
var groupIds []string
var validNodes []*apiv1.Node
for _, drainNode := range drain {
if sdNode, err := a.scaleDownNodeToReport(drainNode, true); err == nil {
klog.V(0).Infof("Scale-down: removing node %s, utilization: %v, pods to reschedule: %s", drainNode.Name, sdNode.UtilInfo, joinPodNames(sdNode.EvictedPods))
@ -197,17 +214,89 @@ func (a *Actuator) deleteAsyncDrain(drain []*apiv1.Node) (scaledDownNodes []*sta
} else {
klog.Errorf("Scale-down: couldn't report scaled down node, err: %v", err)
}
nodeGroup, err := a.ctx.CloudProvider.NodeGroupForNode(drainNode)
if err != nil || nodeGroup == nil || reflect.ValueOf(nodeGroup).IsNil() {
klog.Errorf("Failed to find node group for %s: %v", drainNode.Name, err)
continue
}
a.nodeDeletionTracker.StartDeletionWithDrain(nodeGroup.Id(), drainNode.Name)
go a.scheduleDeletion(drainNode, nodeGroup.Id(), true)
groupIds = append(groupIds, nodeGroup.Id())
validNodes = append(validNodes, drainNode)
}
go a.deleteNodesAsync(validNodes, groupIds, true)
return scaledDownNodes
}
func (a *Actuator) deleteNodesAsync(nodes []*apiv1.Node, groupIds []string, drain bool) {
var pdbs []*policyv1.PodDisruptionBudget
var registry kube_util.ListerRegistry
if len(nodes) == 0 {
return
}
if a.ctx.NodeDeleteDelayAfterTaint > time.Duration(0) {
klog.V(0).Infof("Scale-down: waiting %v before trying to delete nodes", a.ctx.NodeDeleteDelayAfterTaint)
time.Sleep(a.ctx.NodeDeleteDelayAfterTaint)
}
clusterSnapshot, err := a.createSnapshot(nodes)
if err != nil {
klog.Errorf("Scale-down: couldn't create delete snapshot, err: %v", err)
nodeDeleteResult := status.NodeDeleteResult{ResultType: status.NodeDeleteErrorInternal, Err: errors.NewAutoscalerError(errors.InternalError, "createSnapshot returned error %v", err)}
for i, node := range nodes {
CleanUpAndRecordFailedScaleDownEvent(a.ctx, node, groupIds[i], drain, a.nodeDeletionTracker, "failed to create delete snapshot", nodeDeleteResult)
}
return
}
if drain {
pdbs, err = a.ctx.PodDisruptionBudgetLister().List()
if err != nil {
klog.Errorf("Scale-down: couldn't fetch pod disruption budgets, err: %v", err)
nodeDeleteResult := status.NodeDeleteResult{ResultType: status.NodeDeleteErrorInternal, Err: errors.NewAutoscalerError(errors.InternalError, "podDisruptionBudgetLister.List returned error %v", err)}
for i, node := range nodes {
CleanUpAndRecordFailedScaleDownEvent(a.ctx, node, groupIds[i], drain, a.nodeDeletionTracker, "failed to fetch pod disruption budgets", nodeDeleteResult)
}
return
}
registry = a.ctx.ListerRegistry
}
for i, node := range nodes {
nodeInfo, err := clusterSnapshot.NodeInfos().Get(node.Name)
if err != nil {
klog.Errorf("Scale-down: can't retrieve node %q from snapshot, err: %v", node.Name, err)
nodeDeleteResult := status.NodeDeleteResult{ResultType: status.NodeDeleteErrorInternal, Err: errors.NewAutoscalerError(errors.InternalError, "nodeInfos.Get for %q returned error: %v", node.Name, err)}
CleanUpAndRecordFailedScaleDownEvent(a.ctx, node, groupIds[i], drain, a.nodeDeletionTracker, "failed to get node info", nodeDeleteResult)
continue
}
podsToRemove, _, _, err := simulator.GetPodsToMove(nodeInfo, a.deleteOptions, registry, pdbs, time.Now())
if err != nil {
klog.Errorf("Scale-down: couldn't delete node %q, err: %v", node.Name, err)
nodeDeleteResult := status.NodeDeleteResult{ResultType: status.NodeDeleteErrorInternal, Err: errors.NewAutoscalerError(errors.InternalError, "GetPodsToMove for %q returned error: %v", node.Name, err)}
CleanUpAndRecordFailedScaleDownEvent(a.ctx, node, groupIds[i], drain, a.nodeDeletionTracker, "failed to get pods to move on node", nodeDeleteResult)
continue
}
if !drain && len(podsToRemove) != 0 {
klog.Errorf("Scale-down: couldn't delete empty node %q, new pods got scheduled", node.Name)
nodeDeleteResult := status.NodeDeleteResult{ResultType: status.NodeDeleteErrorInternal, Err: errors.NewAutoscalerError(errors.InternalError, "failed to delete empty node %q, new pods scheduled", node.Name)}
CleanUpAndRecordFailedScaleDownEvent(a.ctx, node, groupIds[i], drain, a.nodeDeletionTracker, "node is not empty", nodeDeleteResult)
continue
}
go a.scheduleDeletion(nodeInfo, groupIds[i], drain)
}
}
func (a *Actuator) scaleDownNodeToReport(node *apiv1.Node, drain bool) (*status.ScaleDownNode, error) {
nodeGroup, err := a.ctx.CloudProvider.NodeGroupForNode(node)
if err != nil {
@ -223,10 +312,7 @@ func (a *Actuator) scaleDownNodeToReport(node *apiv1.Node, drain bool) (*status.
}
var evictedPods []*apiv1.Pod
if drain {
_, nonDsPodsToEvict, err := podsToEvict(a.ctx, node.Name)
if err != nil {
return nil, err
}
_, nonDsPodsToEvict := podsToEvict(a.ctx, nodeInfo)
evictedPods = nonDsPodsToEvict
}
return &status.ScaleDownNode{
@ -247,13 +333,14 @@ func (a *Actuator) taintNode(node *apiv1.Node) error {
return nil
}
func (a *Actuator) prepareNodeForDeletion(node *apiv1.Node, drain bool) status.NodeDeleteResult {
func (a *Actuator) prepareNodeForDeletion(nodeInfo *framework.NodeInfo, drain bool) status.NodeDeleteResult {
node := nodeInfo.Node()
if drain {
if evictionResults, err := a.evictor.DrainNode(a.ctx, node); err != nil {
if evictionResults, err := a.evictor.DrainNode(a.ctx, nodeInfo); err != nil {
return status.NodeDeleteResult{ResultType: status.NodeDeleteErrorFailedToEvictPods, Err: err, PodEvictionResults: evictionResults}
}
} else {
if err := a.evictor.EvictDaemonSetPods(a.ctx, node, time.Now()); err != nil {
if err := a.evictor.EvictDaemonSetPods(a.ctx, nodeInfo, time.Now()); err != nil {
// Evicting DS pods is best-effort, so proceed with the deletion even if there are errors.
klog.Warningf("Error while evicting DS pods from an empty node %q: %v", node.Name, err)
}
@ -265,8 +352,9 @@ func (a *Actuator) prepareNodeForDeletion(node *apiv1.Node, drain bool) status.N
}
// scheduleDeletion schedules the deletion of the provided node by adding it to the NodeDeletionBatcher. If drain is true, the node is drained before being deleted.
func (a *Actuator) scheduleDeletion(node *apiv1.Node, nodeGroupId string, drain bool) {
nodeDeleteResult := a.prepareNodeForDeletion(node, drain)
func (a *Actuator) scheduleDeletion(nodeInfo *framework.NodeInfo, nodeGroupId string, drain bool) {
node := nodeInfo.Node()
nodeDeleteResult := a.prepareNodeForDeletion(nodeInfo, drain)
if nodeDeleteResult.Err != nil {
CleanUpAndRecordFailedScaleDownEvent(a.ctx, node, nodeGroupId, drain, a.nodeDeletionTracker, "prepareNodeForDeletion failed", nodeDeleteResult)
return
@ -279,6 +367,36 @@ func (a *Actuator) scheduleDeletion(node *apiv1.Node, nodeGroupId string, drain
}
}
func (a *Actuator) createSnapshot(nodes []*apiv1.Node) (simulator.ClusterSnapshot, error) {
knownNodes := make(map[string]bool)
snapshot := simulator.NewBasicClusterSnapshot()
scheduledPods, err := a.ctx.ScheduledPodLister().List()
if err != nil {
return nil, err
}
nonExpendableScheduledPods := utils.FilterOutExpendablePods(scheduledPods, a.ctx.ExpendablePodsPriorityCutoff)
for _, node := range nodes {
if err := snapshot.AddNode(node); err != nil {
return nil, err
}
knownNodes[node.Name] = true
}
for _, pod := range nonExpendableScheduledPods {
if knownNodes[pod.Spec.NodeName] {
if err := snapshot.AddPod(pod, pod.Spec.NodeName); err != nil {
return nil, err
}
}
}
return snapshot, nil
}
func min(x, y int) int {
if x <= y {
return x


@ -24,8 +24,9 @@ import (
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
appsv1 "k8s.io/api/apps/v1"
apiv1 "k8s.io/api/core/v1"
policyv1 "k8s.io/api/policy/v1"
policyv1beta1 "k8s.io/api/policy/v1beta1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
@ -41,6 +42,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/deletiontracker"
. "k8s.io/autoscaler/cluster-autoscaler/core/test"
"k8s.io/autoscaler/cluster-autoscaler/processors/status"
"k8s.io/autoscaler/cluster-autoscaler/simulator"
"k8s.io/autoscaler/cluster-autoscaler/simulator/utilization"
"k8s.io/autoscaler/cluster-autoscaler/utils/deletetaint"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
@ -219,10 +221,17 @@ func TestCropNodesToBudgets(t *testing.T) {
t.Run(tn, func(t *testing.T) {
ctx := &context.AutoscalingContext{
AutoscalingOptions: config.AutoscalingOptions{
MaxScaleDownParallelism: 10,
MaxDrainParallelism: 5,
MaxScaleDownParallelism: 10,
MaxDrainParallelism: 5,
NodeDeletionBatcherInterval: 0 * time.Second,
NodeDeleteDelayAfterTaint: 1 * time.Second,
},
}
deleteOptions := simulator.NodeDeleteOptions{
SkipNodesWithSystemPods: true,
SkipNodesWithLocalStorage: true,
MinReplicaCount: 0,
}
ndr := deletiontracker.NewNodeDeletionTracker(1 * time.Hour)
for i := 0; i < tc.emptyDeletionsInProgress; i++ {
ndr.StartDeletion("ng1", fmt.Sprintf("empty-node-%d", i))
@ -230,7 +239,8 @@ func TestCropNodesToBudgets(t *testing.T) {
for i := 0; i < tc.drainDeletionsInProgress; i++ {
ndr.StartDeletionWithDrain("ng2", fmt.Sprintf("drain-node-%d", i))
}
actuator := NewActuator(ctx, nil, ndr, 0*time.Second)
actuator := NewActuator(ctx, nil, ndr, deleteOptions)
gotEmpty, gotDrain := actuator.cropNodesToBudgets(tc.emptyNodes, tc.drainNodes)
if diff := cmp.Diff(tc.wantEmpty, gotEmpty, cmpopts.EquateEmpty()); diff != "" {
t.Errorf("cropNodesToBudgets empty nodes diff (-want +got):\n%s", diff)
@ -303,8 +313,8 @@ func TestStartDeletion(t *testing.T) {
"deletion with drain": {
drainNodes: generateNodes(2, "drain"),
pods: map[string][]*apiv1.Pod{
"drain-node-0": generatePods(2, "drain-node-0"),
"drain-node-1": generatePods(2, "drain-node-1"),
"drain-node-0": removablePods(2, "drain-node-0"),
"drain-node-1": removablePods(2, "drain-node-1"),
},
wantStatus: &status.ScaleDownStatus{
Result: status.ScaleDownNodeDeleteStarted,
@ -312,13 +322,13 @@ func TestStartDeletion(t *testing.T) {
{
Node: generateNode("drain-node-0"),
NodeGroup: testNg,
EvictedPods: generatePods(2, "drain-node-0"),
EvictedPods: removablePods(2, "drain-node-0"),
UtilInfo: generateUtilInfo(2./8., 2./8.),
},
{
Node: generateNode("drain-node-1"),
NodeGroup: testNg,
EvictedPods: generatePods(2, "drain-node-1"),
EvictedPods: removablePods(2, "drain-node-1"),
UtilInfo: generateUtilInfo(2./8., 2./8.),
},
},
@ -342,8 +352,8 @@ func TestStartDeletion(t *testing.T) {
emptyNodes: generateNodes(2, "empty"),
drainNodes: generateNodes(2, "drain"),
pods: map[string][]*apiv1.Pod{
"drain-node-0": generatePods(2, "drain-node-0"),
"drain-node-1": generatePods(2, "drain-node-1"),
"drain-node-0": removablePods(2, "drain-node-0"),
"drain-node-1": removablePods(2, "drain-node-1"),
},
wantStatus: &status.ScaleDownStatus{
Result: status.ScaleDownNodeDeleteStarted,
@ -363,13 +373,13 @@ func TestStartDeletion(t *testing.T) {
{
Node: generateNode("drain-node-0"),
NodeGroup: testNg,
EvictedPods: generatePods(2, "drain-node-0"),
EvictedPods: removablePods(2, "drain-node-0"),
UtilInfo: generateUtilInfo(2./8., 2./8.),
},
{
Node: generateNode("drain-node-1"),
NodeGroup: testNg,
EvictedPods: generatePods(2, "drain-node-1"),
EvictedPods: removablePods(2, "drain-node-1"),
UtilInfo: generateUtilInfo(2./8., 2./8.),
},
},
@ -397,53 +407,37 @@ func TestStartDeletion(t *testing.T) {
"drain-node-1": {ResultType: status.NodeDeleteOk},
},
},
"failure to taint empty node stops further deletion": {
"failure to taint empty node stops deletion and cleans already applied taints": {
emptyNodes: generateNodes(4, "empty"),
drainNodes: generateNodes(1, "drain"),
pods: map[string][]*apiv1.Pod{
"drain-node-0": generatePods(2, "drain-node-0"),
"drain-node-0": removablePods(2, "drain-node-0"),
},
failedNodeTaint: map[string]bool{"empty-node-2": true},
wantStatus: &status.ScaleDownStatus{
Result: status.ScaleDownError,
ScaledDownNodes: []*status.ScaleDownNode{
{
Node: generateNode("empty-node-0"),
NodeGroup: testNg,
EvictedPods: nil,
UtilInfo: generateUtilInfo(0, 0),
},
{
Node: generateNode("empty-node-1"),
NodeGroup: testNg,
EvictedPods: nil,
UtilInfo: generateUtilInfo(0, 0),
},
},
Result: status.ScaleDownError,
ScaledDownNodes: nil,
},
wantDeletedNodes: []string{"empty-node-0", "empty-node-1"},
wantTaintUpdates: map[string][][]apiv1.Taint{
"empty-node-0": {
{toBeDeletedTaint},
{},
},
"empty-node-1": {
{toBeDeletedTaint},
{},
},
},
wantNodeDeleteResults: map[string]status.NodeDeleteResult{
"empty-node-0": {ResultType: status.NodeDeleteOk},
"empty-node-1": {ResultType: status.NodeDeleteOk},
},
wantErr: cmpopts.AnyError,
},
"failure to taint drain node stops further deletion and cleans already applied taints": {
emptyNodes: generateNodes(2, "empty"),
drainNodes: generateNodes(4, "drain"),
pods: map[string][]*apiv1.Pod{
"drain-node-0": generatePods(2, "drain-node-0"),
"drain-node-1": generatePods(2, "drain-node-1"),
"drain-node-2": generatePods(2, "drain-node-2"),
"drain-node-3": generatePods(2, "drain-node-3"),
"drain-node-0": removablePods(2, "drain-node-0"),
"drain-node-1": removablePods(2, "drain-node-1"),
"drain-node-2": removablePods(2, "drain-node-2"),
"drain-node-3": removablePods(2, "drain-node-3"),
},
failedNodeTaint: map[string]bool{"drain-node-2": true},
wantStatus: &status.ScaleDownStatus{
@ -471,14 +465,6 @@ func TestStartDeletion(t *testing.T) {
"empty-node-1": {
{toBeDeletedTaint},
},
"drain-node-0": {
{toBeDeletedTaint},
{},
},
"drain-node-1": {
{toBeDeletedTaint},
{},
},
},
wantNodeDeleteResults: map[string]status.NodeDeleteResult{
"empty-node-0": {ResultType: status.NodeDeleteOk},
@ -489,10 +475,10 @@ func TestStartDeletion(t *testing.T) {
"nodes that failed drain are correctly reported in results": {
drainNodes: generateNodes(4, "drain"),
pods: map[string][]*apiv1.Pod{
"drain-node-0": generatePods(3, "drain-node-0"),
"drain-node-1": generatePods(3, "drain-node-1"),
"drain-node-2": generatePods(3, "drain-node-2"),
"drain-node-3": generatePods(3, "drain-node-3"),
"drain-node-0": removablePods(3, "drain-node-0"),
"drain-node-1": removablePods(3, "drain-node-1"),
"drain-node-2": removablePods(3, "drain-node-2"),
"drain-node-3": removablePods(3, "drain-node-3"),
},
failedPodDrain: map[string]bool{
"drain-node-0-pod-0": true,
@ -505,25 +491,25 @@ func TestStartDeletion(t *testing.T) {
{
Node: generateNode("drain-node-0"),
NodeGroup: testNg,
EvictedPods: generatePods(3, "drain-node-0"),
EvictedPods: removablePods(3, "drain-node-0"),
UtilInfo: generateUtilInfo(3./8., 3./8.),
},
{
Node: generateNode("drain-node-1"),
NodeGroup: testNg,
EvictedPods: generatePods(3, "drain-node-1"),
EvictedPods: removablePods(3, "drain-node-1"),
UtilInfo: generateUtilInfo(3./8., 3./8.),
},
{
Node: generateNode("drain-node-2"),
NodeGroup: testNg,
EvictedPods: generatePods(3, "drain-node-2"),
EvictedPods: removablePods(3, "drain-node-2"),
UtilInfo: generateUtilInfo(3./8., 3./8.),
},
{
Node: generateNode("drain-node-3"),
NodeGroup: testNg,
EvictedPods: generatePods(3, "drain-node-3"),
EvictedPods: removablePods(3, "drain-node-3"),
UtilInfo: generateUtilInfo(3./8., 3./8.),
},
},
@ -556,9 +542,9 @@ func TestStartDeletion(t *testing.T) {
ResultType: status.NodeDeleteErrorFailedToEvictPods,
Err: cmpopts.AnyError,
PodEvictionResults: map[string]status.PodEvictionResult{
"drain-node-0-pod-0": {Pod: generatePod("drain-node-0-pod-0"), Err: cmpopts.AnyError, TimedOut: true},
"drain-node-0-pod-1": {Pod: generatePod("drain-node-0-pod-1"), Err: cmpopts.AnyError, TimedOut: true},
"drain-node-0-pod-2": {Pod: generatePod("drain-node-0-pod-2")},
"drain-node-0-pod-0": {Pod: removablePod("drain-node-0-pod-0", "drain-node-0"), Err: cmpopts.AnyError, TimedOut: true},
"drain-node-0-pod-1": {Pod: removablePod("drain-node-0-pod-1", "drain-node-0"), Err: cmpopts.AnyError, TimedOut: true},
"drain-node-0-pod-2": {Pod: removablePod("drain-node-0-pod-2", "drain-node-0")},
},
},
"drain-node-1": {ResultType: status.NodeDeleteOk},
@ -566,9 +552,9 @@ func TestStartDeletion(t *testing.T) {
ResultType: status.NodeDeleteErrorFailedToEvictPods,
Err: cmpopts.AnyError,
PodEvictionResults: map[string]status.PodEvictionResult{
"drain-node-2-pod-0": {Pod: generatePod("drain-node-2-pod-0")},
"drain-node-2-pod-1": {Pod: generatePod("drain-node-2-pod-1"), Err: cmpopts.AnyError, TimedOut: true},
"drain-node-2-pod-2": {Pod: generatePod("drain-node-2-pod-2")},
"drain-node-2-pod-0": {Pod: removablePod("drain-node-2-pod-0", "drain-node-2")},
"drain-node-2-pod-1": {Pod: removablePod("drain-node-2-pod-1", "drain-node-2"), Err: cmpopts.AnyError, TimedOut: true},
"drain-node-2-pod-2": {Pod: removablePod("drain-node-2-pod-2", "drain-node-2")},
},
},
"drain-node-3": {ResultType: status.NodeDeleteOk},
@ -578,8 +564,8 @@ func TestStartDeletion(t *testing.T) {
emptyNodes: generateNodes(2, "empty"),
drainNodes: generateNodes(2, "drain"),
pods: map[string][]*apiv1.Pod{
"drain-node-0": generatePods(2, "drain-node-0"),
"drain-node-1": generatePods(2, "drain-node-1"),
"drain-node-0": removablePods(2, "drain-node-0"),
"drain-node-1": removablePods(2, "drain-node-1"),
},
failedNodeDeletion: map[string]bool{
"empty-node-1": true,
@ -603,13 +589,13 @@ func TestStartDeletion(t *testing.T) {
{
Node: generateNode("drain-node-0"),
NodeGroup: testNg,
EvictedPods: generatePods(2, "drain-node-0"),
EvictedPods: removablePods(2, "drain-node-0"),
UtilInfo: generateUtilInfo(2./8., 2./8.),
},
{
Node: generateNode("drain-node-1"),
NodeGroup: testNg,
EvictedPods: generatePods(2, "drain-node-1"),
EvictedPods: removablePods(2, "drain-node-1"),
UtilInfo: generateUtilInfo(2./8., 2./8.),
},
},
@ -645,8 +631,8 @@ func TestStartDeletion(t *testing.T) {
"DS pods are evicted from empty nodes, but don't block deletion on error": {
emptyNodes: generateNodes(2, "empty"),
pods: map[string][]*apiv1.Pod{
"empty-node-0": {generateDsPod("empty-node-0-ds-pod-0"), generateDsPod("empty-node-0-ds-pod-1")},
"empty-node-1": {generateDsPod("empty-node-1-ds-pod-0"), generateDsPod("empty-node-1-ds-pod-1")},
"empty-node-0": {generateDsPod("empty-node-0-ds-pod-0", "empty-node-0"), generateDsPod("empty-node-0-ds-pod-1", "empty-node-0")},
"empty-node-1": {generateDsPod("empty-node-1-ds-pod-0", "empty-node-1"), generateDsPod("empty-node-1-ds-pod-1", "empty-node-1")},
},
failedPodDrain: map[string]bool{"empty-node-1-ds-pod-0": true},
wantStatus: &status.ScaleDownStatus{
@ -681,11 +667,11 @@ func TestStartDeletion(t *testing.T) {
"empty-node-1": {ResultType: status.NodeDeleteOk},
},
},
"pods are not evicted from nodes with pods if the node is passed as empty": {
"nodes with pods are not deleted if the node is passed as empty": {
emptyNodes: generateNodes(2, "empty-but-with-pods"),
pods: map[string][]*apiv1.Pod{
"empty-but-with-pods-node-0": generatePods(2, "empty-but-with-pods-node--0"),
"empty-but-with-pods-node-1": generatePods(2, "empty-but-with-pods-node--1"),
"empty-but-with-pods-node-0": removablePods(2, "empty-but-with-pods-node-0"),
"empty-but-with-pods-node-1": removablePods(2, "empty-but-with-pods-node-1"),
},
wantStatus: &status.ScaleDownStatus{
Result: status.ScaleDownNodeDeleteStarted,
@ -704,19 +690,21 @@ func TestStartDeletion(t *testing.T) {
},
},
},
wantDeletedNodes: []string{"empty-but-with-pods-node-0", "empty-but-with-pods-node-1"},
wantDeletedNodes: nil,
wantDeletedPods: nil,
wantTaintUpdates: map[string][][]apiv1.Taint{
"empty-but-with-pods-node-0": {
{toBeDeletedTaint},
{},
},
"empty-but-with-pods-node-1": {
{toBeDeletedTaint},
{},
},
},
wantNodeDeleteResults: map[string]status.NodeDeleteResult{
"empty-but-with-pods-node-0": {ResultType: status.NodeDeleteOk},
"empty-but-with-pods-node-1": {ResultType: status.NodeDeleteOk},
"empty-but-with-pods-node-0": {ResultType: status.NodeDeleteErrorInternal, Err: cmpopts.AnyError},
"empty-but-with-pods-node-1": {ResultType: status.NodeDeleteErrorInternal, Err: cmpopts.AnyError},
},
},
} {
@ -744,6 +732,8 @@ func TestStartDeletion(t *testing.T) {
deletedNodes := make(chan string, 10)
deletedPods := make(chan string, 10)
ds := generateDaemonSet()
// We're faking the whole k8s client, and some of the code needs to get live nodes and pods, so GET on nodes and pods has to be set up.
fakeClient.Fake.AddReactor("get", "nodes", func(action core.Action) (bool, runtime.Object, error) {
nodesLock.Lock()
@ -819,7 +809,21 @@ func TestStartDeletion(t *testing.T) {
MaxPodEvictionTime: 0,
DaemonSetEvictionForEmptyNodes: true,
}
registry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
allPods := []*apiv1.Pod{}
for _, pods := range tc.pods {
allPods = append(allPods, pods...)
}
podLister := kube_util.NewTestPodLister(allPods)
pdbLister := kube_util.NewTestPodDisruptionBudgetLister([]*policyv1.PodDisruptionBudget{})
dsLister, err := kube_util.NewTestDaemonSetLister([]*appsv1.DaemonSet{ds})
if err != nil {
t.Fatalf("Couldn't create daemonset lister")
}
registry := kube_util.NewListerRegistry(nil, nil, podLister, nil, pdbLister, dsLister, nil, nil, nil, nil)
ctx, err := NewScaleTestAutoscalingContext(opts, fakeClient, registry, provider, nil, nil)
if err != nil {
t.Fatalf("Couldn't set up autoscaling context: %v", err)
@ -1046,6 +1050,7 @@ func TestStartDeletionInBatchBasic(t *testing.T) {
"test-ng-2": testNg2,
"test-ng-3": testNg3,
}
for ngName, numNodes := range test.numNodesToDelete {
ng := testNg[ngName]
provider.InsertNodeGroup(ng)
@ -1064,7 +1069,10 @@ func TestStartDeletionInBatchBasic(t *testing.T) {
MaxPodEvictionTime: 0,
DaemonSetEvictionForEmptyNodes: true,
}
registry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
podLister := kube_util.NewTestPodLister([]*apiv1.Pod{})
pdbLister := kube_util.NewTestPodDisruptionBudgetLister([]*policyv1.PodDisruptionBudget{})
registry := kube_util.NewListerRegistry(nil, nil, podLister, nil, pdbLister, nil, nil, nil, nil, nil)
ctx, err := NewScaleTestAutoscalingContext(opts, fakeClient, registry, provider, nil, nil)
if err != nil {
t.Fatalf("Couldn't set up autoscaling context: %v", err)
@ -1076,6 +1084,7 @@ func TestStartDeletionInBatchBasic(t *testing.T) {
nodeDeletionBatcher: NewNodeDeletionBatcher(&ctx, csr, ndt, deleteInterval),
evictor: Evictor{EvictionRetryTime: 0, DsEvictionRetryTime: 0, DsEvictionEmptyNodeTimeout: 0, PodEvictionHeadroom: DefaultPodEvictionHeadroom},
}
for _, nodes := range deleteNodes {
actuator.StartDeletion(nodes, []*apiv1.Node{}, time.Now())
time.Sleep(deleteInterval)
@ -1143,22 +1152,29 @@ func generateNode(name string) *apiv1.Node {
}
}
func generatePods(count int, prefix string) []*apiv1.Pod {
func removablePods(count int, prefix string) []*apiv1.Pod {
var result []*apiv1.Pod
for i := 0; i < count; i++ {
name := fmt.Sprintf("pod-%d", i)
if prefix != "" {
name = prefix + "-" + name
}
result = append(result, generatePod(name))
result = append(result, removablePod(name, prefix))
}
return result
}
func generatePod(name string) *apiv1.Pod {
func removablePod(name string, node string) *apiv1.Pod {
return &apiv1.Pod{
ObjectMeta: metav1.ObjectMeta{Name: name},
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: "default",
Annotations: map[string]string{
"cluster-autoscaler.kubernetes.io/safe-to-evict": "true",
},
},
Spec: apiv1.PodSpec{
NodeName: node,
Containers: []apiv1.Container{
{
Name: "test-container",
@ -1174,12 +1190,22 @@ func generatePod(name string) *apiv1.Pod {
}
}
func generateDsPod(name string) *apiv1.Pod {
pod := generatePod(name)
pod.OwnerReferences = GenerateOwnerReferences(name+"-ds", "DaemonSet", "apps/v1", "some-uid")
func generateDsPod(name string, node string) *apiv1.Pod {
pod := removablePod(name, node)
pod.OwnerReferences = GenerateOwnerReferences("ds", "DaemonSet", "apps/v1", "some-uid")
return pod
}
func generateDaemonSet() *appsv1.DaemonSet {
return &appsv1.DaemonSet{
ObjectMeta: metav1.ObjectMeta{
Name: "ds",
Namespace: "default",
SelfLink: "/apiv1s/apps/v1/namespaces/default/daemonsets/ds",
},
}
}
func generateUtilInfo(cpuUtil, memUtil float64) utilization.Info {
var higherUtilName apiv1.ResourceName
var higherUtilVal float64


@ -36,6 +36,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/utils/daemonset"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
pod_util "k8s.io/autoscaler/cluster-autoscaler/utils/pod"
"k8s.io/kubernetes/pkg/scheduler/framework"
)
const (
@ -56,27 +57,26 @@ type Evictor struct {
DsEvictionRetryTime time.Duration
DsEvictionEmptyNodeTimeout time.Duration
PodEvictionHeadroom time.Duration
deleteOptions simulator.NodeDeleteOptions
}
// NewDefaultEvictor returns an instance of Evictor using the default parameters.
func NewDefaultEvictor() Evictor {
func NewDefaultEvictor(deleteOptions simulator.NodeDeleteOptions) Evictor {
return Evictor{
EvictionRetryTime: DefaultEvictionRetryTime,
DsEvictionRetryTime: DefaultDsEvictionRetryTime,
DsEvictionEmptyNodeTimeout: DefaultDsEvictionEmptyNodeTimeout,
PodEvictionHeadroom: DefaultPodEvictionHeadroom,
deleteOptions: deleteOptions,
}
}
// DrainNode works like DrainNodeWithPods, but lists of pods to evict don't have to be provided. All non-mirror, non-DS pods on the
// node are evicted. Mirror pods are not evicted. DaemonSet pods are evicted if DaemonSetEvictionForOccupiedNodes is enabled, or
// if they have the EnableDsEvictionKey annotation.
func (e Evictor) DrainNode(ctx *acontext.AutoscalingContext, node *apiv1.Node) (map[string]status.PodEvictionResult, error) {
dsPodsToEvict, nonDsPodsToEvict, err := podsToEvict(ctx, node.Name)
if err != nil {
return nil, err
}
return e.DrainNodeWithPods(ctx, node, nonDsPodsToEvict, dsPodsToEvict)
func (e Evictor) DrainNode(ctx *acontext.AutoscalingContext, nodeInfo *framework.NodeInfo) (map[string]status.PodEvictionResult, error) {
dsPodsToEvict, nonDsPodsToEvict := podsToEvict(ctx, nodeInfo)
return e.DrainNodeWithPods(ctx, nodeInfo.Node(), nonDsPodsToEvict, dsPodsToEvict)
}
// DrainNodeWithPods performs drain logic on the node. Marks the node as unschedulable and later removes all pods, giving
@ -169,12 +169,9 @@ func (e Evictor) DrainNodeWithPods(ctx *acontext.AutoscalingContext, node *apiv1
}
// EvictDaemonSetPods creates eviction objects for all DaemonSet pods on the node.
func (e Evictor) EvictDaemonSetPods(ctx *acontext.AutoscalingContext, nodeToDelete *apiv1.Node, timeNow time.Time) error {
nodeInfo, err := ctx.ClusterSnapshot.NodeInfos().Get(nodeToDelete.Name)
if err != nil {
return fmt.Errorf("failed to get node info for %s", nodeToDelete.Name)
}
_, daemonSetPods, _, err := simulator.FastGetPodsToMove(nodeInfo, true, true, []*policyv1.PodDisruptionBudget{}, timeNow)
func (e Evictor) EvictDaemonSetPods(ctx *acontext.AutoscalingContext, nodeInfo *framework.NodeInfo, timeNow time.Time) error {
nodeToDelete := nodeInfo.Node()
_, daemonSetPods, _, err := simulator.GetPodsToMove(nodeInfo, e.deleteOptions, nil, []*policyv1.PodDisruptionBudget{}, timeNow)
if err != nil {
return fmt.Errorf("failed to get DaemonSet pods for %s (error: %v)", nodeToDelete.Name, err)
}
@ -245,11 +242,7 @@ func evictPod(ctx *acontext.AutoscalingContext, podToEvict *apiv1.Pod, isDaemonS
return status.PodEvictionResult{Pod: podToEvict, TimedOut: true, Err: fmt.Errorf("failed to evict pod %s/%s within allowed timeout (last error: %v)", podToEvict.Namespace, podToEvict.Name, lastError)}
}
func podsToEvict(ctx *acontext.AutoscalingContext, nodeName string) (dsPods, nonDsPods []*apiv1.Pod, err error) {
nodeInfo, err := ctx.ClusterSnapshot.NodeInfos().Get(nodeName)
if err != nil {
return nil, nil, err
}
func podsToEvict(ctx *acontext.AutoscalingContext, nodeInfo *framework.NodeInfo) (dsPods, nonDsPods []*apiv1.Pod) {
for _, podInfo := range nodeInfo.Pods {
if pod_util.IsMirrorPod(podInfo.Pod) {
continue
@ -260,5 +253,5 @@ func podsToEvict(ctx *acontext.AutoscalingContext, nodeName string) (dsPods, non
}
}
dsPodsToEvict := daemonset.PodsToEvict(dsPods, ctx.DaemonSetEvictionForOccupiedNodes)
return dsPodsToEvict, nonDsPods, nil
return dsPodsToEvict, nonDsPods
}


@ -51,7 +51,6 @@ func TestDaemonSetEvictionForEmptyNodes(t *testing.T) {
testScenarios := []struct {
name string
dsPods []string
nodeInfoSuccess bool
evictionTimeoutExceed bool
dsEvictionTimeout time.Duration
evictionSuccess bool
@ -63,24 +62,13 @@ func TestDaemonSetEvictionForEmptyNodes(t *testing.T) {
{
name: "Successful attempt to evict DaemonSet pods",
dsPods: []string{"d1", "d2"},
nodeInfoSuccess: true,
dsEvictionTimeout: 5000 * time.Millisecond,
evictionSuccess: true,
evictByDefault: true,
},
{
name: "Failed to get node info",
dsPods: []string{"d1", "d2"},
nodeInfoSuccess: false,
dsEvictionTimeout: 5000 * time.Millisecond,
evictionSuccess: true,
err: fmt.Errorf("failed to get node info"),
evictByDefault: true,
},
{
name: "Failed to create DaemonSet eviction",
dsPods: []string{"d1", "d2"},
nodeInfoSuccess: true,
dsEvictionTimeout: 5000 * time.Millisecond,
evictionSuccess: false,
err: fmt.Errorf("following DaemonSet pod failed to evict on the"),
@ -89,7 +77,6 @@ func TestDaemonSetEvictionForEmptyNodes(t *testing.T) {
{
name: "Eviction timeout exceed",
dsPods: []string{"d1", "d2", "d3"},
nodeInfoSuccess: true,
evictionTimeoutExceed: true,
dsEvictionTimeout: 100 * time.Millisecond,
evictionSuccess: true,
@ -99,7 +86,6 @@ func TestDaemonSetEvictionForEmptyNodes(t *testing.T) {
{
name: "Evict single pod due to annotation",
dsPods: []string{"d1", "d2"},
nodeInfoSuccess: true,
dsEvictionTimeout: 5000 * time.Millisecond,
evictionSuccess: true,
extraAnnotationValue: map[string]string{"d1": "true"},
@ -108,7 +94,6 @@ func TestDaemonSetEvictionForEmptyNodes(t *testing.T) {
{
name: "Don't evict single pod due to annotation",
dsPods: []string{"d1", "d2"},
nodeInfoSuccess: true,
dsEvictionTimeout: 5000 * time.Millisecond,
evictionSuccess: true,
evictByDefault: true,
@ -172,17 +157,15 @@ func TestDaemonSetEvictionForEmptyNodes(t *testing.T) {
context, err := NewScaleTestAutoscalingContext(options, fakeClient, registry, provider, nil, nil)
assert.NoError(t, err)
if scenario.nodeInfoSuccess {
simulator.InitializeClusterSnapshotOrDie(t, context.ClusterSnapshot, []*apiv1.Node{n1}, dsPods)
} else {
simulator.InitializeClusterSnapshotOrDie(t, context.ClusterSnapshot, []*apiv1.Node{}, []*apiv1.Pod{})
}
simulator.InitializeClusterSnapshotOrDie(t, context.ClusterSnapshot, []*apiv1.Node{n1}, dsPods)
evictor := Evictor{
DsEvictionEmptyNodeTimeout: scenario.dsEvictionTimeout,
DsEvictionRetryTime: waitBetweenRetries,
}
err = evictor.EvictDaemonSetPods(&context, n1, timeNow)
nodeInfo, err := context.ClusterSnapshot.NodeInfos().Get(n1.Name)
assert.NoError(t, err)
err = evictor.EvictDaemonSetPods(&context, nodeInfo, timeNow)
if scenario.err != nil {
assert.NotNil(t, err)
assert.Contains(t, err.Error(), scenario.err.Error())
@ -545,7 +528,6 @@ func TestPodsToEvict(t *testing.T) {
dsEvictionDisabled bool
wantDsPods []*apiv1.Pod
wantNonDsPods []*apiv1.Pod
wantErr error
}{
"no pods": {
pods: []*apiv1.Pod{},
@ -588,13 +570,6 @@ func TestPodsToEvict(t *testing.T) {
wantDsPods: []*apiv1.Pod{dsPod("ds-pod-1", false), dsPod("ds-pod-2", false)},
wantNonDsPods: []*apiv1.Pod{regularPod("regular-pod-1"), regularPod("regular-pod-2")},
},
"calling for an unknown node name is an error": {
pods: []*apiv1.Pod{
regularPod("pod-1"), regularPod("pod-2"),
},
nodeNameOverwrite: "unknown-node",
wantErr: cmpopts.AnyError,
},
} {
t.Run(tn, func(t *testing.T) {
snapshot := simulator.NewBasicClusterSnapshot()
@ -613,10 +588,11 @@ func TestPodsToEvict(t *testing.T) {
if tc.nodeNameOverwrite != "" {
nodeName = tc.nodeNameOverwrite
}
gotDsPods, gotNonDsPods, err := podsToEvict(ctx, nodeName)
if diff := cmp.Diff(tc.wantErr, err, cmpopts.EquateErrors()); diff != "" {
t.Errorf("podsToEvict err diff (-want +got):\n%s", diff)
nodeInfo, err := snapshot.NodeInfos().Get(nodeName)
if err != nil {
t.Fatalf("NodeInfos().Get() unexpected error: %v", err)
}
gotDsPods, gotNonDsPods := podsToEvict(ctx, nodeInfo)
if diff := cmp.Diff(tc.wantDsPods, gotDsPods, cmpopts.EquateEmpty()); diff != "" {
t.Errorf("podsToEvict dsPods diff (-want +got):\n%s", diff)
}


@ -61,9 +61,9 @@ type ScaleDown struct {
}
// NewScaleDown builds new ScaleDown object.
func NewScaleDown(context *context.AutoscalingContext, processors *processors.AutoscalingProcessors, clusterStateRegistry *clusterstate.ClusterStateRegistry, ndt *deletiontracker.NodeDeletionTracker) *ScaleDown {
func NewScaleDown(context *context.AutoscalingContext, processors *processors.AutoscalingProcessors, clusterStateRegistry *clusterstate.ClusterStateRegistry, ndt *deletiontracker.NodeDeletionTracker, deleteOptions simulator.NodeDeleteOptions) *ScaleDown {
usageTracker := simulator.NewUsageTracker()
removalSimulator := simulator.NewRemovalSimulator(context.ListerRegistry, context.ClusterSnapshot, context.PredicateChecker, usageTracker, false)
removalSimulator := simulator.NewRemovalSimulator(context.ListerRegistry, context.ClusterSnapshot, context.PredicateChecker, usageTracker, deleteOptions, false)
unremovableNodes := unremovable.NewNodes()
return &ScaleDown{
context: context,


@ -28,6 +28,7 @@ import (
appsv1 "k8s.io/api/apps/v1"
batchv1 "k8s.io/api/batch/v1"
apiv1 "k8s.io/api/core/v1"
policyv1 "k8s.io/api/policy/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
@ -129,9 +130,12 @@ func TestFindUnneededNodes(t *testing.T) {
provider.AddNode("ng1", n8)
provider.AddNode("ng1", n9)
podLister := kube_util.NewTestPodLister([]*apiv1.Pod{p1, p2, p3, p4, p5, p6})
pdbLister := kube_util.NewTestPodDisruptionBudgetLister([]*policyv1.PodDisruptionBudget{})
rsLister, err := kube_util.NewTestReplicaSetLister(replicaSets)
assert.NoError(t, err)
registry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil, nil, nil, rsLister, nil)
registry := kube_util.NewListerRegistry(nil, nil, podLister, nil, pdbLister, nil, nil, nil, rsLister, nil)
options := config.AutoscalingOptions{
NodeGroupDefaults: config.NodeGroupAutoscalingOptions{
@ -262,9 +266,12 @@ func TestFindUnneededGPUNodes(t *testing.T) {
provider.AddNode("ng1", n2)
provider.AddNode("ng1", n3)
podLister := kube_util.NewTestPodLister([]*apiv1.Pod{p1, p2, p3})
pdbLister := kube_util.NewTestPodDisruptionBudgetLister([]*policyv1.PodDisruptionBudget{})
rsLister, err := kube_util.NewTestReplicaSetLister(replicaSets)
assert.NoError(t, err)
registry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil, nil, nil, rsLister, nil)
registry := kube_util.NewListerRegistry(nil, nil, podLister, nil, pdbLister, nil, nil, nil, rsLister, nil)
options := config.AutoscalingOptions{
NodeGroupDefaults: config.NodeGroupAutoscalingOptions{
@ -385,9 +392,12 @@ func TestFindUnneededWithPerNodeGroupThresholds(t *testing.T) {
}
for tn, tc := range cases {
t.Run(tn, func(t *testing.T) {
podLister := kube_util.NewTestPodLister(allPods)
pdbLister := kube_util.NewTestPodDisruptionBudgetLister([]*policyv1.PodDisruptionBudget{})
rsLister, err := kube_util.NewTestReplicaSetLister(replicaSets)
assert.NoError(t, err)
registry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil, nil, nil, rsLister, nil)
registry := kube_util.NewListerRegistry(nil, nil, podLister, nil, pdbLister, nil, nil, nil, rsLister, nil)
context, err := NewScaleTestAutoscalingContext(globalOptions, &fake.Clientset{}, registry, provider, nil, nil)
assert.NoError(t, err)
@ -461,9 +471,12 @@ func TestPodsWithPreemptionsFindUnneededNodes(t *testing.T) {
provider.AddNode("ng1", n3)
provider.AddNode("ng1", n4)
podLister := kube_util.NewTestPodLister([]*apiv1.Pod{})
pdbLister := kube_util.NewTestPodDisruptionBudgetLister([]*policyv1.PodDisruptionBudget{})
rsLister, err := kube_util.NewTestReplicaSetLister(replicaSets)
assert.NoError(t, err)
registry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil, nil, nil, rsLister, nil)
registry := kube_util.NewListerRegistry(nil, nil, podLister, nil, pdbLister, nil, nil, nil, rsLister, nil)
options := config.AutoscalingOptions{
NodeGroupDefaults: config.NodeGroupAutoscalingOptions{
@ -524,9 +537,12 @@ func TestFindUnneededMaxCandidates(t *testing.T) {
numCandidates := 30
podLister := kube_util.NewTestPodLister(pods)
pdbLister := kube_util.NewTestPodDisruptionBudgetLister([]*policyv1.PodDisruptionBudget{})
rsLister, err := kube_util.NewTestReplicaSetLister(replicaSets)
assert.NoError(t, err)
registry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil, nil, nil, rsLister, nil)
registry := kube_util.NewListerRegistry(nil, nil, podLister, nil, pdbLister, nil, nil, nil, rsLister, nil)
options := config.AutoscalingOptions{
NodeGroupDefaults: config.NodeGroupAutoscalingOptions{
@ -605,9 +621,12 @@ func TestFindUnneededEmptyNodes(t *testing.T) {
numCandidates := 30
podLister := kube_util.NewTestPodLister(pods)
pdbLister := kube_util.NewTestPodDisruptionBudgetLister([]*policyv1.PodDisruptionBudget{})
rsLister, err := kube_util.NewTestReplicaSetLister(replicaSets)
assert.NoError(t, err)
registry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil, nil, nil, rsLister, nil)
registry := kube_util.NewListerRegistry(nil, nil, podLister, nil, pdbLister, nil, nil, nil, rsLister, nil)
options := config.AutoscalingOptions{
NodeGroupDefaults: config.NodeGroupAutoscalingOptions{
@ -662,9 +681,12 @@ func TestFindUnneededNodePool(t *testing.T) {
numCandidates := 30
podLister := kube_util.NewTestPodLister(pods)
pdbLister := kube_util.NewTestPodDisruptionBudgetLister([]*policyv1.PodDisruptionBudget{})
rsLister, err := kube_util.NewTestReplicaSetLister(replicaSets)
assert.NoError(t, err)
registry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil, nil, nil, rsLister, nil)
registry := kube_util.NewListerRegistry(nil, nil, podLister, nil, pdbLister, nil, nil, nil, rsLister, nil)
options := config.AutoscalingOptions{
NodeGroupDefaults: config.NodeGroupAutoscalingOptions{
@ -759,7 +781,10 @@ func TestScaleDown(t *testing.T) {
}
jobLister, err := kube_util.NewTestJobLister([]*batchv1.Job{&job})
assert.NoError(t, err)
registry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil, nil, jobLister, nil, nil)
podLister := kube_util.NewTestPodLister([]*apiv1.Pod{p1, p2})
pdbLister := kube_util.NewTestPodDisruptionBudgetLister([]*policyv1.PodDisruptionBudget{})
registry := kube_util.NewListerRegistry(nil, nil, podLister, nil, pdbLister, nil, nil, jobLister, nil, nil)
context, err := NewScaleTestAutoscalingContext(options, fakeClient, registry, provider, nil, nil)
assert.NoError(t, err)
@ -1015,7 +1040,10 @@ func simpleScaleDownEmpty(t *testing.T, config *ScaleTestConfig) {
assert.NotNil(t, provider)
registry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
podLister := kube_util.NewTestPodLister([]*apiv1.Pod{})
pdbLister := kube_util.NewTestPodDisruptionBudgetLister([]*policyv1.PodDisruptionBudget{})
registry := kube_util.NewListerRegistry(nil, nil, podLister, nil, pdbLister, nil, nil, nil, nil, nil)
context, err := NewScaleTestAutoscalingContext(config.Options, fakeClient, registry, provider, nil, nil)
assert.NoError(t, err)
@ -1102,7 +1130,11 @@ func TestNoScaleDownUnready(t *testing.T) {
},
MaxGracefulTerminationSec: 60,
}
registry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
podLister := kube_util.NewTestPodLister([]*apiv1.Pod{p2})
pdbLister := kube_util.NewTestPodDisruptionBudgetLister([]*policyv1.PodDisruptionBudget{})
registry := kube_util.NewListerRegistry(nil, nil, podLister, nil, pdbLister, nil, nil, nil, nil, nil)
context, err := NewScaleTestAutoscalingContext(options, fakeClient, registry, provider, nil, nil)
assert.NoError(t, err)
@ -1267,10 +1299,18 @@ func generateReplicaSets() []*appsv1.ReplicaSet {
func newWrapperForTesting(ctx *context.AutoscalingContext, clusterStateRegistry *clusterstate.ClusterStateRegistry, ndt *deletiontracker.NodeDeletionTracker) *ScaleDownWrapper {
ctx.MaxDrainParallelism = 1
ctx.MaxScaleDownParallelism = 10
ctx.NodeDeletionBatcherInterval = 0 * time.Second
ctx.NodeDeleteDelayAfterTaint = 0 * time.Second
if ndt == nil {
ndt = deletiontracker.NewNodeDeletionTracker(0 * time.Second)
}
sd := NewScaleDown(ctx, NewTestProcessors(), clusterStateRegistry, ndt)
actuator := actuation.NewActuator(ctx, clusterStateRegistry, ndt, 0*time.Second)
deleteOptions := simulator.NodeDeleteOptions{
SkipNodesWithSystemPods: true,
SkipNodesWithLocalStorage: true,
MinReplicaCount: 0,
}
sd := NewScaleDown(ctx, NewTestProcessors(), clusterStateRegistry, ndt, deleteOptions)
actuator := actuation.NewActuator(ctx, clusterStateRegistry, ndt, deleteOptions)
return NewScaleDownWrapper(sd, actuator)
}


@ -154,9 +154,15 @@ func NewStaticAutoscaler(
clusterStateRegistry := clusterstate.NewClusterStateRegistry(autoscalingContext.CloudProvider, clusterStateConfig, autoscalingContext.LogRecorder, backoff)
deleteOptions := simulator.NodeDeleteOptions{
SkipNodesWithSystemPods: opts.SkipNodesWithSystemPods,
SkipNodesWithLocalStorage: opts.SkipNodesWithLocalStorage,
MinReplicaCount: opts.MinReplicaCount,
}
ndt := deletiontracker.NewNodeDeletionTracker(0 * time.Second)
scaleDown := legacy.NewScaleDown(autoscalingContext, processors, clusterStateRegistry, ndt)
actuator := actuation.NewActuator(autoscalingContext, clusterStateRegistry, ndt, opts.NodeDeletionBatcherInterval)
scaleDown := legacy.NewScaleDown(autoscalingContext, processors, clusterStateRegistry, ndt, deleteOptions)
actuator := actuation.NewActuator(autoscalingContext, clusterStateRegistry, ndt, deleteOptions)
scaleDownWrapper := legacy.NewScaleDownWrapper(scaleDown, actuator)
processorCallbacks.scaleDownPlanner = scaleDownWrapper


@ -38,6 +38,7 @@ import (
core_utils "k8s.io/autoscaler/cluster-autoscaler/core/utils"
"k8s.io/autoscaler/cluster-autoscaler/estimator"
ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors"
"k8s.io/autoscaler/cluster-autoscaler/simulator"
"k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
. "k8s.io/autoscaler/cluster-autoscaler/utils/test"
@ -269,10 +270,10 @@ func TestStaticAutoscalerRunOnce(t *testing.T) {
// Scale down.
readyNodeLister.SetNodes([]*apiv1.Node{n1, n2})
allNodeLister.SetNodes([]*apiv1.Node{n1, n2})
scheduledPodMock.On("List").Return([]*apiv1.Pod{p1}, nil).Twice()
scheduledPodMock.On("List").Return([]*apiv1.Pod{p1}, nil).Times(3)
unschedulablePodMock.On("List").Return([]*apiv1.Pod{}, nil).Once()
daemonSetListerMock.On("List", labels.Everything()).Return([]*appsv1.DaemonSet{}, nil).Once()
podDisruptionBudgetListerMock.On("List").Return([]*policyv1.PodDisruptionBudget{}, nil).Once()
podDisruptionBudgetListerMock.On("List").Return([]*policyv1.PodDisruptionBudget{}, nil).Twice()
onScaleDownMock.On("ScaleDown", "ng1", "n2").Return(nil).Once()
err = autoscaler.RunOnce(time.Now().Add(3 * time.Hour))
@ -456,9 +457,9 @@ func TestStaticAutoscalerRunOnceWithAutoprovisionedEnabled(t *testing.T) {
// Scale down.
readyNodeLister.SetNodes([]*apiv1.Node{n1, n2})
allNodeLister.SetNodes([]*apiv1.Node{n1, n2})
scheduledPodMock.On("List").Return([]*apiv1.Pod{p1}, nil).Twice()
scheduledPodMock.On("List").Return([]*apiv1.Pod{p1}, nil).Times(3)
unschedulablePodMock.On("List").Return([]*apiv1.Pod{}, nil).Once()
podDisruptionBudgetListerMock.On("List").Return([]*policyv1.PodDisruptionBudget{}, nil).Once()
podDisruptionBudgetListerMock.On("List").Return([]*policyv1.PodDisruptionBudget{}, nil).Twice()
daemonSetListerMock.On("List", labels.Everything()).Return([]*appsv1.DaemonSet{}, nil).Once()
onNodeGroupDeleteMock.On("Delete", "autoprovisioned-"+
"TN1").Return(nil).Once()
@ -743,10 +744,10 @@ func TestStaticAutoscalerRunOncePodsWithPriorities(t *testing.T) {
// Scale down.
readyNodeLister.SetNodes([]*apiv1.Node{n1, n2, n3})
allNodeLister.SetNodes([]*apiv1.Node{n1, n2, n3})
scheduledPodMock.On("List").Return([]*apiv1.Pod{p1, p2, p3, p4}, nil).Times(2)
scheduledPodMock.On("List").Return([]*apiv1.Pod{p1, p2, p3, p4}, nil).Times(3)
unschedulablePodMock.On("List").Return([]*apiv1.Pod{p5}, nil).Once()
daemonSetListerMock.On("List", labels.Everything()).Return([]*appsv1.DaemonSet{}, nil).Once()
podDisruptionBudgetListerMock.On("List").Return([]*policyv1.PodDisruptionBudget{}, nil).Once()
podDisruptionBudgetListerMock.On("List").Return([]*policyv1.PodDisruptionBudget{}, nil).Twice()
onScaleDownMock.On("ScaleDown", "ng1", "n1").Return(nil).Once()
p4.Spec.NodeName = "n2"
@ -1398,9 +1399,16 @@ func waitForDeleteToFinish(t *testing.T, deleteFinished <-chan bool) {
func newScaleDownPlannerAndActuator(t *testing.T, ctx *context.AutoscalingContext, p *ca_processors.AutoscalingProcessors, cs *clusterstate.ClusterStateRegistry) (scaledown.Planner, scaledown.Actuator) {
ctx.MaxScaleDownParallelism = 10
ctx.MaxDrainParallelism = 1
ctx.NodeDeletionBatcherInterval = 0 * time.Second
ctx.NodeDeleteDelayAfterTaint = 1 * time.Second
deleteOptions := simulator.NodeDeleteOptions{
SkipNodesWithSystemPods: true,
SkipNodesWithLocalStorage: true,
MinReplicaCount: 0,
}
ndt := deletiontracker.NewNodeDeletionTracker(0 * time.Second)
sd := legacy.NewScaleDown(ctx, p, cs, ndt)
actuator := actuation.NewActuator(ctx, cs, ndt, 0*time.Second)
sd := legacy.NewScaleDown(ctx, p, cs, ndt, deleteOptions)
actuator := actuation.NewActuator(ctx, cs, ndt, deleteOptions)
wrapper := legacy.NewScaleDownWrapper(sd, actuator)
return wrapper, wrapper
}


@ -206,6 +206,10 @@ var (
recordDuplicatedEvents = flag.Bool("record-duplicated-events", false, "enable duplication of similar events within a 5 minute window.")
maxNodesPerScaleUp = flag.Int("max-nodes-per-scaleup", 1000, "Max nodes added in a single scale-up. This is intended strictly for optimizing CA algorithm latency and not a tool to rate-limit scale-up throughput.")
maxNodeGroupBinpackingDuration = flag.Duration("max-nodegroup-binpacking-duration", 10*time.Second, "Maximum time that will be spent in binpacking simulation for each NodeGroup.")
skipNodesWithSystemPods = flag.Bool("skip-nodes-with-system-pods", true, "If true cluster autoscaler will never delete nodes with pods from kube-system (except for DaemonSet or mirror pods)")
skipNodesWithLocalStorage = flag.Bool("skip-nodes-with-local-storage", true, "If true cluster autoscaler will never delete nodes with pods with local storage, e.g. EmptyDir or HostPath")
minReplicaCount = flag.Int("min-replica-count", 0, "Minimum number of replicas that a replica set or replication controller should have to allow their pods deletion in scale down")
nodeDeleteDelayAfterTaint = flag.Duration("node-delete-delay-after-taint", 5*time.Second, "How long to wait before deleting a node after tainting it")
)
func createAutoscalingOptions() config.AutoscalingOptions {
@ -297,6 +301,10 @@ func createAutoscalingOptions() config.AutoscalingOptions {
MaxNodesPerScaleUp: *maxNodesPerScaleUp,
MaxNodeGroupBinpackingDuration: *maxNodeGroupBinpackingDuration,
NodeDeletionBatcherInterval: *nodeDeletionBatcherInterval,
SkipNodesWithSystemPods: *skipNodesWithSystemPods,
SkipNodesWithLocalStorage: *skipNodesWithLocalStorage,
MinReplicaCount: *minReplicaCount,
NodeDeleteDelayAfterTaint: *nodeDeleteDelayAfterTaint,
}
}


@ -17,7 +17,6 @@ limitations under the License.
package simulator
import (
"flag"
"fmt"
"time"
@ -33,17 +32,6 @@ import (
klog "k8s.io/klog/v2"
)
var (
skipNodesWithSystemPods = flag.Bool("skip-nodes-with-system-pods", true,
"If true cluster autoscaler will never delete nodes with pods from kube-system (except for DaemonSet "+
"or mirror pods)")
skipNodesWithLocalStorage = flag.Bool("skip-nodes-with-local-storage", true,
"If true cluster autoscaler will never delete nodes with pods with local storage, e.g. EmptyDir or HostPath")
minReplicaCount = flag.Int("min-replica-count", 0,
"Minimum number or replicas that a replica set or replication controller should have to allow their pods deletion in scale down")
)
// NodeToBeRemoved contains information about a node that can be removed.
type NodeToBeRemoved struct {
// Node to be removed.
@ -102,16 +90,19 @@ type RemovalSimulator struct {
predicateChecker PredicateChecker
usageTracker *UsageTracker
canPersist bool
deleteOptions NodeDeleteOptions
}
// NewRemovalSimulator returns a new RemovalSimulator.
func NewRemovalSimulator(listers kube_util.ListerRegistry, clusterSnapshot ClusterSnapshot, predicateChecker PredicateChecker, usageTracker *UsageTracker, persistSuccessfulSimulations bool) *RemovalSimulator {
func NewRemovalSimulator(listers kube_util.ListerRegistry, clusterSnapshot ClusterSnapshot, predicateChecker PredicateChecker,
usageTracker *UsageTracker, deleteOptions NodeDeleteOptions, persistSuccessfulSimulations bool) *RemovalSimulator {
return &RemovalSimulator{
listers: listers,
clusterSnapshot: clusterSnapshot,
predicateChecker: predicateChecker,
usageTracker: usageTracker,
canPersist: persistSuccessfulSimulations,
deleteOptions: deleteOptions,
}
}
@ -166,8 +157,7 @@ func (r *RemovalSimulator) CheckNodeRemoval(
return nil, &UnremovableNode{Node: nodeInfo.Node(), Reason: UnexpectedError}
}
podsToRemove, daemonSetPods, blockingPod, err := DetailedGetPodsForMove(nodeInfo, *skipNodesWithSystemPods,
*skipNodesWithLocalStorage, r.listers, int32(*minReplicaCount), pdbs, timestamp)
podsToRemove, daemonSetPods, blockingPod, err := GetPodsToMove(nodeInfo, r.deleteOptions, r.listers, pdbs, timestamp)
if err != nil {
klog.V(2).Infof("node %s cannot be removed: %v", nodeName, err)
if blockingPod != nil {
@ -200,8 +190,8 @@ func (r *RemovalSimulator) FindEmptyNodesToRemove(candidates []string, timestamp
klog.Errorf("Can't retrieve node %s from snapshot, err: %v", node, err)
continue
}
// Should block on all pods.
podsToRemove, _, _, err := FastGetPodsToMove(nodeInfo, true, true, nil, timestamp)
// Should block on all pods
podsToRemove, _, _, err := GetPodsToMove(nodeInfo, r.deleteOptions, nil, nil, timestamp)
if err == nil && len(podsToRemove) == 0 {
result = append(result, node)
}

View File

@ -58,7 +58,7 @@ func TestFindPlaceAllOk(t *testing.T) {
[]*apiv1.Node{node1, node2},
[]*apiv1.Pod{pod1})
err = NewRemovalSimulator(nil, clusterSnapshot, predicateChecker, NewUsageTracker(), false).findPlaceFor(
err = NewRemovalSimulator(nil, clusterSnapshot, predicateChecker, NewUsageTracker(), testDeleteOptions(), false).findPlaceFor(
"x",
[]*apiv1.Pod{new1, new2},
destinations,
@ -96,7 +96,7 @@ func TestFindPlaceAllBas(t *testing.T) {
[]*apiv1.Node{node1, node2},
[]*apiv1.Pod{pod1})
err = NewRemovalSimulator(nil, clusterSnapshot, predicateChecker, NewUsageTracker(), false).findPlaceFor(
err = NewRemovalSimulator(nil, clusterSnapshot, predicateChecker, NewUsageTracker(), testDeleteOptions(), false).findPlaceFor(
"nbad",
[]*apiv1.Pod{new1, new2, new3},
destinations,
@ -129,7 +129,7 @@ func TestFindNone(t *testing.T) {
[]*apiv1.Node{node1, node2},
[]*apiv1.Pod{pod1})
err = NewRemovalSimulator(nil, clusterSnapshot, predicateChecker, NewUsageTracker(), false).findPlaceFor(
err = NewRemovalSimulator(nil, clusterSnapshot, predicateChecker, NewUsageTracker(), testDeleteOptions(), false).findPlaceFor(
"x",
[]*apiv1.Pod{},
destinations,
@ -162,7 +162,7 @@ func TestFindEmptyNodes(t *testing.T) {
clusterSnapshot := NewBasicClusterSnapshot()
InitializeClusterSnapshotOrDie(t, clusterSnapshot, []*apiv1.Node{nodes[0], nodes[1], nodes[2], nodes[3]}, []*apiv1.Pod{pod1, pod2})
testTime := time.Date(2020, time.December, 18, 17, 0, 0, 0, time.UTC)
r := NewRemovalSimulator(nil, clusterSnapshot, nil, nil, false)
r := NewRemovalSimulator(nil, clusterSnapshot, nil, nil, testDeleteOptions(), false)
emptyNodes := r.FindEmptyNodesToRemove(nodeNames, testTime)
assert.Equal(t, []string{nodeNames[0], nodeNames[2], nodeNames[3]}, emptyNodes)
}
@ -309,7 +309,7 @@ func TestFindNodesToRemove(t *testing.T) {
destinations = append(destinations, node.Name)
}
InitializeClusterSnapshotOrDie(t, clusterSnapshot, test.allNodes, test.pods)
r := NewRemovalSimulator(registry, clusterSnapshot, predicateChecker, tracker, false)
r := NewRemovalSimulator(registry, clusterSnapshot, predicateChecker, tracker, testDeleteOptions(), false)
toRemove, unremovable, _, err := r.FindNodesToRemove(
test.candidates, destinations, map[string]string{},
time.Now(), []*policyv1.PodDisruptionBudget{})
@ -320,3 +320,11 @@ func TestFindNodesToRemove(t *testing.T) {
})
}
}
func testDeleteOptions() NodeDeleteOptions {
return NodeDeleteOptions{
SkipNodesWithSystemPods: true,
SkipNodesWithLocalStorage: true,
MinReplicaCount: 0,
}
}

View File

@ -29,44 +29,25 @@ import (
schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
)
// FastGetPodsToMove returns a list of pods that should be moved elsewhere
// and a list of DaemonSet pods that should be evicted if the node
// is drained. Raises error if there is an unreplicated pod.
// Based on kubectl drain code. It makes an assumption that RC, DS, Jobs and RS were deleted
// along with their pods (no abandoned pods with dangling created-by annotation). Useful for fast
// checks.
func FastGetPodsToMove(nodeInfo *schedulerframework.NodeInfo, skipNodesWithSystemPods bool, skipNodesWithLocalStorage bool,
pdbs []*policyv1.PodDisruptionBudget, timestamp time.Time) (pods []*apiv1.Pod, daemonSetPods []*apiv1.Pod, blockingPod *drain.BlockingPod, err error) {
for _, podInfo := range nodeInfo.Pods {
pods = append(pods, podInfo.Pod)
}
pods, daemonSetPods, blockingPod, err = drain.GetPodsForDeletionOnNodeDrain(
pods,
pdbs,
skipNodesWithSystemPods,
skipNodesWithLocalStorage,
false,
nil,
0,
timestamp)
if err != nil {
return pods, daemonSetPods, blockingPod, err
}
if pdbBlockingPod, err := checkPdbs(pods, pdbs); err != nil {
return []*apiv1.Pod{}, []*apiv1.Pod{}, pdbBlockingPod, err
}
return pods, daemonSetPods, nil, nil
// NodeDeleteOptions contains various options to customize how draining will behave
type NodeDeleteOptions struct {
// SkipNodesWithSystemPods tells if nodes with pods from kube-system should be deleted (except for DaemonSet or mirror pods)
SkipNodesWithSystemPods bool
// SkipNodesWithLocalStorage tells if nodes with pods with local storage, e.g. EmptyDir or HostPath, should be deleted
SkipNodesWithLocalStorage bool
// MinReplicaCount controls the minimum number of replicas that a replica set or replication controller should have
// to allow their pods deletion in scale down
MinReplicaCount int
}
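
NodeDeleteOptions mirrors the fields added to AutoscalingOptions earlier in this commit, so wiring configuration into the simulator is a field-for-field copy. A small sketch, written as if inside the simulator package; the helper name is assumed rather than taken from this diff.

// Hypothetical glue code; assumes `import "k8s.io/autoscaler/cluster-autoscaler/config"`.
func nodeDeleteOptionsFromConfig(opts config.AutoscalingOptions) NodeDeleteOptions {
	return NodeDeleteOptions{
		SkipNodesWithSystemPods:   opts.SkipNodesWithSystemPods,
		SkipNodesWithLocalStorage: opts.SkipNodesWithLocalStorage,
		MinReplicaCount:           opts.MinReplicaCount,
	}
}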
// DetailedGetPodsForMove returns a list of pods that should be moved elsewhere
// GetPodsToMove returns a list of pods that should be moved elsewhere
// and a list of DaemonSet pods that should be evicted if the node
// is drained. Returns an error if there is an unreplicated pod.
// Based on kubectl drain code. It checks whether RC, DS, Jobs and RS that created these pods
// Based on kubectl drain code. If listers is nil it makes an assumption that RC, DS, Jobs and RS were deleted
// along with their pods (no abandoned pods with dangling created-by annotation).
// If listers is not nil it checks whether RC, DS, Jobs and RS that created these pods
// still exist.
func DetailedGetPodsForMove(nodeInfo *schedulerframework.NodeInfo, skipNodesWithSystemPods bool,
skipNodesWithLocalStorage bool, listers kube_util.ListerRegistry, minReplicaCount int32,
func GetPodsToMove(nodeInfo *schedulerframework.NodeInfo, deleteOptions NodeDeleteOptions, listers kube_util.ListerRegistry,
pdbs []*policyv1.PodDisruptionBudget, timestamp time.Time) (pods []*apiv1.Pod, daemonSetPods []*apiv1.Pod, blockingPod *drain.BlockingPod, err error) {
for _, podInfo := range nodeInfo.Pods {
pods = append(pods, podInfo.Pod)
@ -74,11 +55,10 @@ func DetailedGetPodsForMove(nodeInfo *schedulerframework.NodeInfo, skipNodesWith
pods, daemonSetPods, blockingPod, err = drain.GetPodsForDeletionOnNodeDrain(
pods,
pdbs,
skipNodesWithSystemPods,
skipNodesWithLocalStorage,
true,
deleteOptions.SkipNodesWithSystemPods,
deleteOptions.SkipNodesWithLocalStorage,
listers,
minReplicaCount,
int32(deleteOptions.MinReplicaCount),
timestamp)
if err != nil {
return pods, daemonSetPods, blockingPod, err
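
GetPodsToMove folds the removed FastGetPodsToMove and DetailedGetPodsForMove into a single entry point: a nil lister registry reproduces the fast behaviour (owner controllers are assumed to still exist), while a non-nil registry reproduces the detailed one (owners are looked up). A hedged caller-side sketch; the wrapper name is assumed.

package example

import (
	"time"

	apiv1 "k8s.io/api/core/v1"
	policyv1 "k8s.io/api/policy/v1"
	"k8s.io/autoscaler/cluster-autoscaler/simulator"
	kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
	schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
)

// movablePods is an illustrative wrapper, not part of this commit. Pass listers == nil
// for the fast, no-reference-check variant; pass a real registry to verify that the
// RC/RS/DS/Job owners of each pod still exist before treating it as movable.
func movablePods(nodeInfo *schedulerframework.NodeInfo, listers kube_util.ListerRegistry,
	pdbs []*policyv1.PodDisruptionBudget) ([]*apiv1.Pod, error) {
	opts := simulator.NodeDeleteOptions{
		SkipNodesWithSystemPods:   true,
		SkipNodesWithLocalStorage: true,
		MinReplicaCount:           0,
	}
	pods, _, _, err := simulator.GetPodsToMove(nodeInfo, opts, listers, pdbs, time.Now())
	return pods, err
}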

View File

@ -32,7 +32,7 @@ import (
"github.com/stretchr/testify/assert"
)
func TestFastGetPodsToMove(t *testing.T) {
func TestGetPodsToMove(t *testing.T) {
testTime := time.Date(2020, time.December, 18, 17, 0, 0, 0, time.UTC)
// Unreplicated pod
pod1 := &apiv1.Pod{
@ -41,7 +41,12 @@ func TestFastGetPodsToMove(t *testing.T) {
Namespace: "ns",
},
}
_, _, blockingPod, err := FastGetPodsToMove(schedulerframework.NewNodeInfo(pod1), true, true, nil, testTime)
deleteOptions := NodeDeleteOptions{
SkipNodesWithSystemPods: true,
SkipNodesWithLocalStorage: true,
MinReplicaCount: 0,
}
_, _, blockingPod, err := GetPodsToMove(schedulerframework.NewNodeInfo(pod1), deleteOptions, nil, nil, testTime)
assert.Error(t, err)
assert.Equal(t, &drain.BlockingPod{Pod: pod1, Reason: drain.NotReplicated}, blockingPod)
@ -53,7 +58,7 @@ func TestFastGetPodsToMove(t *testing.T) {
OwnerReferences: GenerateOwnerReferences("rs", "ReplicaSet", "extensions/v1beta1", ""),
},
}
r2, _, blockingPod, err := FastGetPodsToMove(schedulerframework.NewNodeInfo(pod2), true, true, nil, testTime)
r2, _, blockingPod, err := GetPodsToMove(schedulerframework.NewNodeInfo(pod2), deleteOptions, nil, nil, testTime)
assert.NoError(t, err)
assert.Nil(t, blockingPod)
assert.Equal(t, 1, len(r2))
@ -69,7 +74,7 @@ func TestFastGetPodsToMove(t *testing.T) {
},
},
}
r3, _, blockingPod, err := FastGetPodsToMove(schedulerframework.NewNodeInfo(pod3), true, true, nil, testTime)
r3, _, blockingPod, err := GetPodsToMove(schedulerframework.NewNodeInfo(pod3), deleteOptions, nil, nil, testTime)
assert.NoError(t, err)
assert.Nil(t, blockingPod)
assert.Equal(t, 0, len(r3))
@ -82,7 +87,7 @@ func TestFastGetPodsToMove(t *testing.T) {
OwnerReferences: GenerateOwnerReferences("ds", "DaemonSet", "extensions/v1beta1", ""),
},
}
r4, _, blockingPod, err := FastGetPodsToMove(schedulerframework.NewNodeInfo(pod2, pod3, pod4), true, true, nil, testTime)
r4, _, blockingPod, err := GetPodsToMove(schedulerframework.NewNodeInfo(pod2, pod3, pod4), deleteOptions, nil, nil, testTime)
assert.NoError(t, err)
assert.Nil(t, blockingPod)
assert.Equal(t, 1, len(r4))
@ -96,7 +101,7 @@ func TestFastGetPodsToMove(t *testing.T) {
OwnerReferences: GenerateOwnerReferences("rs", "ReplicaSet", "extensions/v1beta1", ""),
},
}
_, _, blockingPod, err = FastGetPodsToMove(schedulerframework.NewNodeInfo(pod5), true, true, nil, testTime)
_, _, blockingPod, err = GetPodsToMove(schedulerframework.NewNodeInfo(pod5), deleteOptions, nil, nil, testTime)
assert.Error(t, err)
assert.Equal(t, &drain.BlockingPod{Pod: pod5, Reason: drain.UnmovableKubeSystemPod}, blockingPod)
@ -117,7 +122,7 @@ func TestFastGetPodsToMove(t *testing.T) {
},
},
}
_, _, blockingPod, err = FastGetPodsToMove(schedulerframework.NewNodeInfo(pod6), true, true, nil, testTime)
_, _, blockingPod, err = GetPodsToMove(schedulerframework.NewNodeInfo(pod6), deleteOptions, nil, nil, testTime)
assert.Error(t, err)
assert.Equal(t, &drain.BlockingPod{Pod: pod6, Reason: drain.LocalStorageRequested}, blockingPod)
@ -140,7 +145,7 @@ func TestFastGetPodsToMove(t *testing.T) {
},
},
}
r7, _, blockingPod, err := FastGetPodsToMove(schedulerframework.NewNodeInfo(pod7), true, true, nil, testTime)
r7, _, blockingPod, err := GetPodsToMove(schedulerframework.NewNodeInfo(pod7), deleteOptions, nil, nil, testTime)
assert.NoError(t, err)
assert.Nil(t, blockingPod)
assert.Equal(t, 1, len(r7))
@ -176,7 +181,7 @@ func TestFastGetPodsToMove(t *testing.T) {
},
}
_, _, blockingPod, err = FastGetPodsToMove(schedulerframework.NewNodeInfo(pod8), true, true, []*policyv1.PodDisruptionBudget{pdb8}, testTime)
_, _, blockingPod, err = GetPodsToMove(schedulerframework.NewNodeInfo(pod8), deleteOptions, nil, []*policyv1.PodDisruptionBudget{pdb8}, testTime)
assert.Error(t, err)
assert.Equal(t, &drain.BlockingPod{Pod: pod8, Reason: drain.NotEnoughPdb}, blockingPod)
@ -210,7 +215,7 @@ func TestFastGetPodsToMove(t *testing.T) {
},
}
r9, _, blockingPod, err := FastGetPodsToMove(schedulerframework.NewNodeInfo(pod9), true, true, []*policyv1.PodDisruptionBudget{pdb9}, testTime)
r9, _, blockingPod, err := GetPodsToMove(schedulerframework.NewNodeInfo(pod9), deleteOptions, nil, []*policyv1.PodDisruptionBudget{pdb9}, testTime)
assert.NoError(t, err)
assert.Nil(t, blockingPod)
assert.Equal(t, 1, len(r9))
@ -243,7 +248,7 @@ func TestFastGetPodsToMove(t *testing.T) {
},
}
r10SkipPodsThatShouldBeTerminatedTrue, _, blockingPod, err := FastGetPodsToMove(schedulerframework.NewNodeInfo(pod10, pod10Terminated, pod10Terminating), true, true, nil, testTime)
r10SkipPodsThatShouldBeTerminatedTrue, _, blockingPod, err := GetPodsToMove(schedulerframework.NewNodeInfo(pod10, pod10Terminated, pod10Terminating), deleteOptions, nil, nil, testTime)
assert.NoError(t, err)
assert.Nil(t, blockingPod)
assert.ElementsMatch(t, []*apiv1.Pod{pod10, pod10Terminating}, r10SkipPodsThatShouldBeTerminatedTrue)

View File

@ -78,13 +78,13 @@ func GetPodsForDeletionOnNodeDrain(
pdbs []*policyv1.PodDisruptionBudget,
skipNodesWithSystemPods bool,
skipNodesWithLocalStorage bool,
checkReferences bool, // Setting this to true requires client to be not-null.
listers kube_util.ListerRegistry,
minReplica int32,
currentTime time.Time) (pods []*apiv1.Pod, daemonSetPods []*apiv1.Pod, blockingPod *BlockingPod, err error) {
pods = []*apiv1.Pod{}
daemonSetPods = []*apiv1.Pod{}
checkReferences := listers != nil
// filter kube-system PDBs to avoid doing it for every kube-system pod
kubeSystemPDBs := make([]*policyv1.PodDisruptionBudget, 0)
for _, pdb := range pdbs {

View File

@ -646,7 +646,7 @@ func TestDrain(t *testing.T) {
registry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, dsLister, rcLister, jobLister, rsLister, ssLister)
pods, daemonSetPods, blockingPod, err := GetPodsForDeletionOnNodeDrain(test.pods, test.pdbs, true, true, true, registry, 0, testTime)
pods, daemonSetPods, blockingPod, err := GetPodsForDeletionOnNodeDrain(test.pods, test.pdbs, true, true, registry, 0, testTime)
if test.expectFatal {
assert.Equal(t, test.expectBlockingPod, blockingPod)

View File

@ -22,6 +22,7 @@ import (
appsv1 "k8s.io/api/apps/v1"
batchv1 "k8s.io/api/batch/v1"
apiv1 "k8s.io/api/core/v1"
policyv1 "k8s.io/api/policy/v1"
v1appslister "k8s.io/client-go/listers/apps/v1"
v1batchlister "k8s.io/client-go/listers/batch/v1"
v1lister "k8s.io/client-go/listers/core/v1"
@ -43,6 +44,21 @@ func NewTestPodLister(pods []*apiv1.Pod) PodLister {
return TestPodLister{pods: pods}
}
// TestPodDisruptionBudgetLister is used in tests involving listers
type TestPodDisruptionBudgetLister struct {
pdbs []*policyv1.PodDisruptionBudget
}
// List returns all pdbs in test lister.
func (lister TestPodDisruptionBudgetLister) List() ([]*policyv1.PodDisruptionBudget, error) {
return lister.pdbs, nil
}
// NewTestPodDisruptionBudgetLister returns a lister that returns provided pod disruption budgets
func NewTestPodDisruptionBudgetLister(pdbs []*policyv1.PodDisruptionBudget) PodDisruptionBudgetLister {
return TestPodDisruptionBudgetLister{pdbs: pdbs}
}
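
A hypothetical test, not part of this commit, showing the new PDB test lister in isolation; it simply hands back the slice it was constructed with.

package example

import (
	"testing"

	"github.com/stretchr/testify/assert"
	policyv1 "k8s.io/api/policy/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
)

func TestPdbTestListerReturnsProvidedPdbs(t *testing.T) {
	pdb := &policyv1.PodDisruptionBudget{ObjectMeta: metav1.ObjectMeta{Name: "pdb", Namespace: "ns"}}
	lister := kube_util.NewTestPodDisruptionBudgetLister([]*policyv1.PodDisruptionBudget{pdb})
	pdbs, err := lister.List() // static slice, no API server involved
	assert.NoError(t, err)
	assert.Equal(t, []*policyv1.PodDisruptionBudget{pdb}, pdbs)
}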
// TestNodeLister is used in tests involving listers
type TestNodeLister struct {
nodes []*apiv1.Node