diff --git a/cluster-autoscaler/config/autoscaling_options.go b/cluster-autoscaler/config/autoscaling_options.go index 5d809514ad..d2fce2ca42 100644 --- a/cluster-autoscaler/config/autoscaling_options.go +++ b/cluster-autoscaler/config/autoscaling_options.go @@ -103,6 +103,8 @@ type AutoscalingOptions struct { // The formula to calculate additional candidates number is following: // max(#nodes * ScaleDownCandidatesPoolRatio, ScaleDownCandidatesPoolMinCount) ScaleDownCandidatesPoolMinCount int + // NodeDeletionDelayTimeout is the maximum time CA waits for removing delay-deletion.cluster-autoscaler.kubernetes.io/ annotations before deleting the node. + NodeDeletionDelayTimeout time.Duration // WriteStatusConfigMap tells if the status information should be written to a ConfigMap WriteStatusConfigMap bool // BalanceSimilarNodeGroups enables logic that identifies node groups with similar machines and tries to balance node count between them. diff --git a/cluster-autoscaler/core/scale_down.go b/cluster-autoscaler/core/scale_down.go index dd5b6dc92b..fa47e0e185 100644 --- a/cluster-autoscaler/core/scale_down.go +++ b/cluster-autoscaler/core/scale_down.go @@ -31,6 +31,7 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/simulator" "k8s.io/autoscaler/cluster-autoscaler/utils/deletetaint" "k8s.io/autoscaler/cluster-autoscaler/utils/errors" + "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes" kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes" scheduler_util "k8s.io/autoscaler/cluster-autoscaler/utils/scheduler" @@ -42,6 +43,7 @@ import ( kube_record "k8s.io/client-go/tools/record" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/autoscaler/cluster-autoscaler/processors/status" "k8s.io/autoscaler/cluster-autoscaler/utils/gpu" "k8s.io/klog" @@ -50,6 +52,9 @@ import ( const ( // ScaleDownDisabledKey is the name of annotation marking node as not eligible for scale down. ScaleDownDisabledKey = "cluster-autoscaler.kubernetes.io/scale-down-disabled" + // DelayDeletionAnnotationPrefix is the prefix of annotations marking a node that needs to wait + // for other K8s components before the node is deleted. + DelayDeletionAnnotationPrefix = "delay-deletion.cluster-autoscaler.kubernetes.io/" ) const ( @@ -66,42 +71,78 @@ const ( PodEvictionHeadroom = 30 * time.Second ) -// NodeDeleteStatus tells whether a node is being deleted right now. -type NodeDeleteStatus struct { +// NodeDeletionTracker keeps track of node deletions. +type NodeDeletionTracker struct { sync.Mutex - deleteInProgress bool + nonEmptyNodeDeleteInProgress bool // A map of node delete results by node name. It's being constantly emptied into ScaleDownStatus // objects in order to notify the ScaleDownStatusProcessor that the node drain has ended or that // an error occurred during the deletion process. nodeDeleteResults map[string]status.NodeDeleteResult + // A map which keeps track of deletions in progress for node pools. + // Key is a node group id and value is the number of node deletions in progress. + deletionsInProgress map[string]int } // Get current time. Proxy for unit tests. var now func() time.Time = time.Now -// IsDeleteInProgress returns true if a node is being deleted. -func (n *NodeDeleteStatus) IsDeleteInProgress() bool { - n.Lock() - defer n.Unlock() - return n.deleteInProgress +// NewNodeDeletionTracker creates a new NodeDeletionTracker.
+func NewNodeDeletionTracker() *NodeDeletionTracker { + return &NodeDeletionTracker{ + nodeDeleteResults: make(map[string]status.NodeDeleteResult), + deletionsInProgress: make(map[string]int), + } } -// SetDeleteInProgress sets deletion process status -func (n *NodeDeleteStatus) SetDeleteInProgress(status bool) { +// IsNonEmptyNodeDeleteInProgress returns true if a non-empty node is being deleted. +func (n *NodeDeletionTracker) IsNonEmptyNodeDeleteInProgress() bool { n.Lock() defer n.Unlock() - n.deleteInProgress = status + return n.nonEmptyNodeDeleteInProgress +} + +// SetNonEmptyNodeDeleteInProgress sets the non-empty node deletion in progress status. +func (n *NodeDeletionTracker) SetNonEmptyNodeDeleteInProgress(status bool) { + n.Lock() + defer n.Unlock() + n.nonEmptyNodeDeleteInProgress = status +} + +// StartDeletion increments the node deletion in progress counter for the given node group. +func (n *NodeDeletionTracker) StartDeletion(nodeGroupId string) { + n.Lock() + defer n.Unlock() + n.deletionsInProgress[nodeGroupId]++ +} + +// EndDeletion decrements the node deletion in progress counter for the given node group. +func (n *NodeDeletionTracker) EndDeletion(nodeGroupId string) { + n.Lock() + defer n.Unlock() + n.deletionsInProgress[nodeGroupId]-- + if n.deletionsInProgress[nodeGroupId] < 0 { + delete(n.deletionsInProgress, nodeGroupId) + klog.Errorf("This should never happen, counter for %s in NodeDeletionTracker is below 0", nodeGroupId) + } +} + +// GetDeletionsInProgress returns the number of deletions in progress for the given node group. +func (n *NodeDeletionTracker) GetDeletionsInProgress(nodeGroupId string) int { + n.Lock() + defer n.Unlock() + return n.deletionsInProgress[nodeGroupId] } // AddNodeDeleteResult adds a node delete result to the result map. -func (n *NodeDeleteStatus) AddNodeDeleteResult(nodeName string, result status.NodeDeleteResult) { +func (n *NodeDeletionTracker) AddNodeDeleteResult(nodeName string, result status.NodeDeleteResult) { n.Lock() defer n.Unlock() n.nodeDeleteResults[nodeName] = result } // GetAndClearNodeDeleteResults returns the whole result map and replaces it with a new empty one. -func (n *NodeDeleteStatus) GetAndClearNodeDeleteResults() map[string]status.NodeDeleteResult { +func (n *NodeDeletionTracker) GetAndClearNodeDeleteResults() map[string]status.NodeDeleteResult { n.Lock() defer n.Unlock() results := n.nodeDeleteResults @@ -311,7 +352,7 @@ type ScaleDown struct { podLocationHints map[string]string nodeUtilizationMap map[string]simulator.UtilizationInfo usageTracker *simulator.UsageTracker - nodeDeleteStatus *NodeDeleteStatus + nodeDeletionTracker *NodeDeletionTracker } // NewScaleDown builds new ScaleDown object.
@@ -325,7 +366,7 @@ func NewScaleDown(context *context.AutoscalingContext, clusterStateRegistry *clu nodeUtilizationMap: make(map[string]simulator.UtilizationInfo), usageTracker: simulator.NewUsageTracker(), unneededNodesList: make([]*apiv1.Node, 0), - nodeDeleteStatus: &NodeDeleteStatus{nodeDeleteResults: make(map[string]status.NodeDeleteResult)}, + nodeDeletionTracker: NewNodeDeletionTracker(), } } @@ -419,7 +460,7 @@ func (sd *ScaleDown) UpdateUnneededNodes( emptyNodes := make(map[string]bool) - emptyNodesList := getEmptyNodesNoResourceLimits(currentlyUnneededNodes, pods, len(currentlyUnneededNodes), sd.context.CloudProvider) + emptyNodesList := sd.getEmptyNodesNoResourceLimits(currentlyUnneededNodes, pods, len(currentlyUnneededNodes)) for _, node := range emptyNodesList { emptyNodes[node.Name] = true } @@ -636,7 +677,7 @@ func (sd *ScaleDown) SoftTaintUnneededNodes(allNodes []*apiv1.Node) (errors []er // TryToScaleDown tries to scale down the cluster. It returns a result inside a ScaleDownStatus indicating if any node was // removed and error if such occurred. func (sd *ScaleDown) TryToScaleDown(allNodes []*apiv1.Node, pods []*apiv1.Pod, pdbs []*policyv1.PodDisruptionBudget, currentTime time.Time) (*status.ScaleDownStatus, errors.AutoscalerError) { - scaleDownStatus := &status.ScaleDownStatus{NodeDeleteResults: sd.nodeDeleteStatus.GetAndClearNodeDeleteResults()} + scaleDownStatus := &status.ScaleDownStatus{NodeDeleteResults: sd.nodeDeletionTracker.GetAndClearNodeDeleteResults()} nodeDeletionDuration := time.Duration(0) findNodesToRemoveDuration := time.Duration(0) defer updateScaleDownMetrics(time.Now(), &findNodesToRemoveDuration, &nodeDeletionDuration) @@ -697,7 +738,7 @@ func (sd *ScaleDown) TryToScaleDown(allNodes []*apiv1.Node, pods []*apiv1.Pod, p continue } - if size <= nodeGroup.MinSize() { + if size-sd.nodeDeletionTracker.GetDeletionsInProgress(nodeGroup.Id()) <= nodeGroup.MinSize() { klog.V(1).Infof("Skipping %s - node group min size reached", node.Name) continue } @@ -727,26 +768,23 @@ func (sd *ScaleDown) TryToScaleDown(allNodes []*apiv1.Node, pods []*apiv1.Pod, p // Trying to delete empty nodes in bulk. If there are no empty nodes then CA will // try to delete not-so-empty nodes, possibly killing some pods and allowing them // to recreate on other nodes. - emptyNodes := getEmptyNodes(candidates, pods, sd.context.MaxEmptyBulkDelete, scaleDownResourcesLeft, sd.context.CloudProvider) + emptyNodes := sd.getEmptyNodes(candidates, pods, sd.context.MaxEmptyBulkDelete, scaleDownResourcesLeft) if len(emptyNodes) > 0 { nodeDeletionStart := time.Now() - confirmation := make(chan nodeDeletionConfirmation, len(emptyNodes)) - sd.scheduleDeleteEmptyNodes(emptyNodes, sd.context.CloudProvider, sd.context.ClientSet, sd.context.Recorder, readinessMap, candidateNodeGroups, confirmation) - deletedNodes, err := sd.waitForEmptyNodesDeleted(emptyNodes, confirmation) + deletedNodes, err := sd.scheduleDeleteEmptyNodes(emptyNodes, sd.context.ClientSet, sd.context.Recorder, readinessMap, candidateNodeGroups) nodeDeletionDuration = time.Now().Sub(nodeDeletionStart) // TODO: Give the processor some information about the nodes that failed to be deleted. 
scaleDownStatus.ScaledDownNodes = sd.mapNodesToStatusScaleDownNodes(deletedNodes, candidateNodeGroups, make(map[string][]*apiv1.Pod)) if len(deletedNodes) > 0 { - scaleDownStatus.Result = status.ScaleDownNodeDeleted + scaleDownStatus.Result = status.ScaleDownNodeDeleteStarted } else { scaleDownStatus.Result = status.ScaleDownError } - - if err == nil { - return scaleDownStatus, nil + if err != nil { + return scaleDownStatus, err.AddPrefix("failed to delete at least one empty node: ") } - return scaleDownStatus, err.AddPrefix("failed to delete at least one empty node: ") + return scaleDownStatus, nil } findNodesToRemoveStart := time.Now() @@ -784,19 +822,24 @@ func (sd *ScaleDown) TryToScaleDown(allNodes []*apiv1.Node, pods []*apiv1.Pod, p // Starting deletion. nodeDeletionDuration = time.Now().Sub(nodeDeletionStart) - sd.nodeDeleteStatus.SetDeleteInProgress(true) + sd.nodeDeletionTracker.SetNonEmptyNodeDeleteInProgress(true) go func() { // Finishing the delete process once this goroutine is over. var result status.NodeDeleteResult - defer func() { sd.nodeDeleteStatus.AddNodeDeleteResult(toRemove.Node.Name, result) }() - defer sd.nodeDeleteStatus.SetDeleteInProgress(false) - result = sd.deleteNode(toRemove.Node, toRemove.PodsToReschedule) + defer func() { sd.nodeDeletionTracker.AddNodeDeleteResult(toRemove.Node.Name, result) }() + defer sd.nodeDeletionTracker.SetNonEmptyNodeDeleteInProgress(false) + nodeGroup, found := candidateNodeGroups[toRemove.Node.Name] + if !found { + result = status.NodeDeleteResult{ResultType: status.NodeDeleteErrorFailedToDelete, Err: errors.NewAutoscalerError( + errors.CloudProviderError, "failed to find node group for %s", toRemove.Node.Name)} + return + } + result = sd.deleteNode(toRemove.Node, toRemove.PodsToReschedule, nodeGroup) if result.ResultType != status.NodeDeleteOk { klog.Errorf("Failed to delete %s: %v", toRemove.Node.Name, result.Err) return } - nodeGroup := candidateNodeGroups[toRemove.Node.Name] if readinessMap[toRemove.Node.Name] { metrics.RegisterScaleDown(1, gpu.GetGpuTypeForMetrics(gpuLabel, availableGPUTypes, toRemove.Node, nodeGroup), metrics.Underutilized) } else { @@ -819,24 +862,22 @@ func updateScaleDownMetrics(scaleDownStart time.Time, findNodesToRemoveDuration metrics.UpdateDuration(metrics.ScaleDownMiscOperations, miscDuration) } -func getEmptyNodesNoResourceLimits(candidates []*apiv1.Node, pods []*apiv1.Pod, maxEmptyBulkDelete int, - cloudProvider cloudprovider.CloudProvider) []*apiv1.Node { - return getEmptyNodes(candidates, pods, maxEmptyBulkDelete, noScaleDownLimitsOnResources(), cloudProvider) +func (sd *ScaleDown) getEmptyNodesNoResourceLimits(candidates []*apiv1.Node, pods []*apiv1.Pod, maxEmptyBulkDelete int) []*apiv1.Node { + return sd.getEmptyNodes(candidates, pods, maxEmptyBulkDelete, noScaleDownLimitsOnResources()) } // This functions finds empty nodes among passed candidates and returns a list of empty nodes // that can be deleted at the same time. 
-func getEmptyNodes(candidates []*apiv1.Node, pods []*apiv1.Pod, maxEmptyBulkDelete int, - resourcesLimits scaleDownResourcesLimits, cloudProvider cloudprovider.CloudProvider) []*apiv1.Node { +func (sd *ScaleDown) getEmptyNodes(candidates []*apiv1.Node, pods []*apiv1.Pod, maxEmptyBulkDelete int, + resourcesLimits scaleDownResourcesLimits) []*apiv1.Node { emptyNodes := simulator.FindEmptyNodesToRemove(candidates, pods) availabilityMap := make(map[string]int) result := make([]*apiv1.Node, 0) resourcesLimitsCopy := copyScaleDownResourcesLimits(resourcesLimits) // we do not want to modify input parameter resourcesNames := sets.StringKeySet(resourcesLimits).List() - for _, node := range emptyNodes { - nodeGroup, err := cloudProvider.NodeGroupForNode(node) + nodeGroup, err := sd.context.CloudProvider.NodeGroupForNode(node) if err != nil { klog.Errorf("Failed to get group for %s", node.Name) continue @@ -853,14 +894,14 @@ func getEmptyNodes(candidates []*apiv1.Node, pods []*apiv1.Pod, maxEmptyBulkDele klog.Errorf("Failed to get size for %s: %v ", nodeGroup.Id(), err) continue } - available = size - nodeGroup.MinSize() + available = size - nodeGroup.MinSize() - sd.nodeDeletionTracker.GetDeletionsInProgress(nodeGroup.Id()) if available < 0 { available = 0 } availabilityMap[nodeGroup.Id()] = available } if available > 0 { - resourcesDelta, err := computeScaleDownResourcesDelta(cloudProvider, node, nodeGroup, resourcesNames) + resourcesDelta, err := computeScaleDownResourcesDelta(sd.context.CloudProvider, node, nodeGroup, resourcesNames) if err != nil { klog.Errorf("Error: %v", err) continue @@ -881,25 +922,30 @@ func getEmptyNodes(candidates []*apiv1.Node, pods []*apiv1.Pod, maxEmptyBulkDele return result[:limit] } -type nodeDeletionConfirmation struct { - node *apiv1.Node - err errors.AutoscalerError -} - -func (sd *ScaleDown) scheduleDeleteEmptyNodes(emptyNodes []*apiv1.Node, cp cloudprovider.CloudProvider, client kube_client.Interface, +func (sd *ScaleDown) scheduleDeleteEmptyNodes(emptyNodes []*apiv1.Node, client kube_client.Interface, recorder kube_record.EventRecorder, readinessMap map[string]bool, - candidateNodeGroups map[string]cloudprovider.NodeGroup, confirmation chan nodeDeletionConfirmation) { + candidateNodeGroups map[string]cloudprovider.NodeGroup) ([]*apiv1.Node, errors.AutoscalerError) { + deletedNodes := []*apiv1.Node{} for _, node := range emptyNodes { klog.V(0).Infof("Scale-down: removing empty node %s", node.Name) sd.context.LogRecorder.Eventf(apiv1.EventTypeNormal, "ScaleDownEmpty", "Scale-down: removing empty node %s", node.Name) simulator.RemoveNodeFromTracker(sd.usageTracker, node.Name, sd.unneededNodes) - go func(nodeToDelete *apiv1.Node) { - taintErr := deletetaint.MarkToBeDeleted(nodeToDelete, client) - if taintErr != nil { - recorder.Eventf(nodeToDelete, apiv1.EventTypeWarning, "ScaleDownFailed", "failed to mark the node as toBeDeleted/unschedulable: %v", taintErr) - confirmation <- nodeDeletionConfirmation{node: nodeToDelete, err: errors.ToAutoscalerError(errors.ApiCallError, taintErr)} - return - } + nodeGroup, found := candidateNodeGroups[node.Name] + if !found { + return deletedNodes, errors.NewAutoscalerError( + errors.CloudProviderError, "failed to find node group for %s", node.Name) + } + taintErr := deletetaint.MarkToBeDeleted(node, client) + if taintErr != nil { + recorder.Eventf(node, apiv1.EventTypeWarning, "ScaleDownFailed", "failed to mark the node as toBeDeleted/unschedulable: %v", taintErr) + return deletedNodes, 
errors.ToAutoscalerError(errors.ApiCallError, taintErr) + } + deletedNodes = append(deletedNodes, node) + go func(nodeToDelete *apiv1.Node, nodeGroupForDeletedNode cloudprovider.NodeGroup) { + sd.nodeDeletionTracker.StartDeletion(nodeGroupForDeletedNode.Id()) + defer sd.nodeDeletionTracker.EndDeletion(nodeGroupForDeletedNode.Id()) + var result status.NodeDeleteResult + defer func() { sd.nodeDeletionTracker.AddNodeDeleteResult(nodeToDelete.Name, result) }() var deleteErr errors.AutoscalerError // If we fail to delete the node we want to remove delete taint @@ -912,47 +958,32 @@ func (sd *ScaleDown) scheduleDeleteEmptyNodes(emptyNodes []*apiv1.Node, cp cloud } }() + deleteErr = waitForDelayDeletion(nodeToDelete, sd.context.ListerRegistry.AllNodeLister(), sd.context.AutoscalingOptions.NodeDeletionDelayTimeout) + if deleteErr != nil { + klog.Errorf("Problem with empty node deletion: %v", deleteErr) + result = status.NodeDeleteResult{ResultType: status.NodeDeleteErrorFailedToDelete, Err: deleteErr} + return + } deleteErr = deleteNodeFromCloudProvider(nodeToDelete, sd.context.CloudProvider, sd.context.Recorder, sd.clusterStateRegistry) - if deleteErr == nil { - nodeGroup := candidateNodeGroups[nodeToDelete.Name] - if readinessMap[nodeToDelete.Name] { - metrics.RegisterScaleDown(1, gpu.GetGpuTypeForMetrics(cp.GPULabel(), cp.GetAvailableGPUTypes(), nodeToDelete, nodeGroup), metrics.Empty) - } else { - metrics.RegisterScaleDown(1, gpu.GetGpuTypeForMetrics(cp.GPULabel(), cp.GetAvailableGPUTypes(), nodeToDelete, nodeGroup), metrics.Unready) - } + if deleteErr != nil { + klog.Errorf("Problem with empty node deletion: %v", deleteErr) + result = status.NodeDeleteResult{ResultType: status.NodeDeleteErrorFailedToDelete, Err: deleteErr} + return } - confirmation <- nodeDeletionConfirmation{node: nodeToDelete, err: deleteErr} - }(node) - } -} - -func (sd *ScaleDown) waitForEmptyNodesDeleted(emptyNodes []*apiv1.Node, confirmation chan nodeDeletionConfirmation) (deletedNodes []*apiv1.Node, finalError errors.AutoscalerError) { - deletedNodes = make([]*apiv1.Node, 0) - - startTime := time.Now() - for range emptyNodes { - timeElapsed := time.Now().Sub(startTime) - timeLeft := MaxCloudProviderNodeDeletionTime - timeElapsed - if timeLeft < 0 { - return deletedNodes, errors.NewAutoscalerError(errors.TransientError, "Failed to delete nodes in time") - } - select { - case conf := <-confirmation: - if conf.err != nil { - klog.Errorf("Problem with empty node deletion: %v", conf.err) - finalError = conf.err + if readinessMap[nodeToDelete.Name] { + metrics.RegisterScaleDown(1, gpu.GetGpuTypeForMetrics(sd.context.CloudProvider.GPULabel(), sd.context.CloudProvider.GetAvailableGPUTypes(), nodeToDelete, nodeGroupForDeletedNode), metrics.Empty) } else { - deletedNodes = append(deletedNodes, conf.node) + metrics.RegisterScaleDown(1, gpu.GetGpuTypeForMetrics(sd.context.CloudProvider.GPULabel(), sd.context.CloudProvider.GetAvailableGPUTypes(), nodeToDelete, nodeGroupForDeletedNode), metrics.Unready) } - case <-time.After(timeLeft): - finalError = errors.NewAutoscalerError(errors.TransientError, "Failed to delete nodes in time") - } + result = status.NodeDeleteResult{ResultType: status.NodeDeleteOk} + }(node, nodeGroup) } - return deletedNodes, finalError + return deletedNodes, nil } -func (sd *ScaleDown) deleteNode(node *apiv1.Node, pods []*apiv1.Pod) status.NodeDeleteResult { +func (sd *ScaleDown) deleteNode(node *apiv1.Node, pods []*apiv1.Pod, + nodeGroup cloudprovider.NodeGroup) status.NodeDeleteResult { deleteSuccessful := 
false drainSuccessful := false @@ -961,6 +992,9 @@ func (sd *ScaleDown) deleteNode(node *apiv1.Node, pods []*apiv1.Pod) status.Node return status.NodeDeleteResult{ResultType: status.NodeDeleteErrorFailedToMarkToBeDeleted, Err: errors.ToAutoscalerError(errors.ApiCallError, err)} } + sd.nodeDeletionTracker.StartDeletion(nodeGroup.Id()) + defer sd.nodeDeletionTracker.EndDeletion(nodeGroup.Id()) + // If we fail to evict all the pods from the node we want to remove delete taint defer func() { if !deleteSuccessful { @@ -982,9 +1016,14 @@ func (sd *ScaleDown) deleteNode(node *apiv1.Node, pods []*apiv1.Pod) status.Node } drainSuccessful = true + if typedErr := waitForDelayDeletion(node, sd.context.ListerRegistry.AllNodeLister(), sd.context.AutoscalingOptions.NodeDeletionDelayTimeout); typedErr != nil { + return status.NodeDeleteResult{ResultType: status.NodeDeleteErrorFailedToDelete, Err: typedErr} + } + // attempt delete from cloud provider - if err := deleteNodeFromCloudProvider(node, sd.context.CloudProvider, sd.context.Recorder, sd.clusterStateRegistry); err != nil { - return status.NodeDeleteResult{ResultType: status.NodeDeleteErrorFailedToDelete, Err: err} + + if typedErr := deleteNodeFromCloudProvider(node, sd.context.CloudProvider, sd.context.Recorder, sd.clusterStateRegistry); typedErr != nil { + return status.NodeDeleteResult{ResultType: status.NodeDeleteErrorFailedToDelete, Err: typedErr} } deleteSuccessful = true // Let the deferred function know there is no need to cleanup @@ -1129,6 +1168,38 @@ func deleteNodeFromCloudProvider(node *apiv1.Node, cloudProvider cloudprovider.C return nil } +func waitForDelayDeletion(node *apiv1.Node, nodeLister kubernetes.NodeLister, timeout time.Duration) errors.AutoscalerError { + if hasDelayDeletionAnnotation(node) { + klog.V(1).Infof("Waiting for %s annotations to be removed from node %v", DelayDeletionAnnotationPrefix, node.Name) + err := wait.Poll(5*time.Second, timeout, func() (bool, error) { + klog.V(2).Infof("Still waiting for %s annotations to be removed from node %v", DelayDeletionAnnotationPrefix, node.Name) + freshNode, err := nodeLister.Get(node.Name) + if err != nil || freshNode == nil { + return false, fmt.Errorf("failed to get node %v: %v", node.Name, err) + } + return !hasDelayDeletionAnnotation(freshNode), nil + }) + if err != nil && err != wait.ErrWaitTimeout { + return errors.ToAutoscalerError(errors.ApiCallError, err) + } + if err == wait.ErrWaitTimeout { + klog.Warningf("Delayed node deletion timed out for node %v: the delay-deletion annotation wasn't removed within %v; this might slow down scale down.", node.Name, timeout) + } else { + klog.V(2).Infof("All %s annotations removed from node %v", DelayDeletionAnnotationPrefix, node.Name) + } + } + return nil +} + +func hasDelayDeletionAnnotation(node *apiv1.Node) bool { + for annotation := range node.Annotations { + if strings.HasPrefix(annotation, DelayDeletionAnnotationPrefix) { + return true + } + } + return false +} + func hasNoScaleDownAnnotation(node *apiv1.Node) bool { return node.Annotations[ScaleDownDisabledKey] == "true" } diff --git a/cluster-autoscaler/core/scale_down_test.go b/cluster-autoscaler/core/scale_down_test.go index d3b3e9bc94..e7455426b1 100644 --- a/cluster-autoscaler/core/scale_down_test.go +++ b/cluster-autoscaler/core/scale_down_test.go @@ -22,6 +22,7 @@ import ( "testing" "time" + "github.com/stretchr/testify/mock" batchv1 "k8s.io/api/batch/v1" apiv1 "k8s.io/api/core/v1" policyv1 "k8s.io/api/policy/v1beta1" @@ -599,13 +600,14 @@ func TestDeleteNode(t *testing.T) {
fakeClient.Fake.AddReactor("get", "pods", podNotFoundFunc) // build context - context := NewScaleTestAutoscalingContext(config.AutoscalingOptions{}, fakeClient, nil, provider, nil) + registry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil, nil, nil, nil, nil) + context := NewScaleTestAutoscalingContext(config.AutoscalingOptions{}, fakeClient, registry, provider, nil) clusterStateRegistry := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff()) sd := NewScaleDown(&context, clusterStateRegistry) // attempt delete - result := sd.deleteNode(n1, pods) + result := sd.deleteNode(n1, pods, provider.GetNodeGroup("ng1")) // verify if scenario.expectedDeletion { @@ -950,7 +952,7 @@ func TestScaleDown(t *testing.T) { func waitForDeleteToFinish(t *testing.T, sd *ScaleDown) { for start := time.Now(); time.Since(start) < 20*time.Second; time.Sleep(100 * time.Millisecond) { - if !sd.nodeDeleteStatus.IsDeleteInProgress() { + if !sd.nodeDeletionTracker.IsNonEmptyNodeDeleteInProgress() { return } } @@ -1088,6 +1090,24 @@ func TestScaleDownEmptyMinGroupSizeLimitHit(t *testing.T) { simpleScaleDownEmpty(t, config) } +func TestScaleDownEmptyMinGroupSizeLimitHitWhenOneNodeIsBeingDeleted(t *testing.T) { + nodeDeletionTracker := NewNodeDeletionTracker() + nodeDeletionTracker.StartDeletion("ng1") + nodeDeletionTracker.StartDeletion("ng1") + options := defaultScaleDownOptions + config := &scaleTestConfig{ + nodes: []nodeConfig{ + {"n1", 2000, 1000, 0, true, "ng1"}, + {"n2", 2000, 1000, 0, true, "ng1"}, + {"n3", 2000, 1000, 0, true, "ng1"}, + }, + options: options, + expectedScaleDowns: []string{}, + nodeDeletionTracker: nodeDeletionTracker, + } + simpleScaleDownEmpty(t, config) +} + func simpleScaleDownEmpty(t *testing.T, config *scaleTestConfig) { updatedNodes := make(chan string, 10) deletedNodes := make(chan string, 10) @@ -1147,22 +1167,23 @@ func simpleScaleDownEmpty(t *testing.T, config *scaleTestConfig) { assert.NotNil(t, provider) - context := NewScaleTestAutoscalingContext(config.options, fakeClient, nil, provider, nil) + registry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil, nil, nil, nil, nil) + context := NewScaleTestAutoscalingContext(config.options, fakeClient, registry, provider, nil) clusterStateRegistry := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff()) scaleDown := NewScaleDown(&context, clusterStateRegistry) + if config.nodeDeletionTracker != nil { + scaleDown.nodeDeletionTracker = config.nodeDeletionTracker + } scaleDown.UpdateUnneededNodes(nodes, nodes, []*apiv1.Pod{}, time.Now().Add(-5*time.Minute), nil) scaleDownStatus, err := scaleDown.TryToScaleDown(nodes, []*apiv1.Pod{}, nil, time.Now()) - waitForDeleteToFinish(t, scaleDown) - // This helps to verify that TryToScaleDown doesn't attempt to remove anything - // after delete in progress status is gone. - close(deletedNodes) + assert.False(t, scaleDown.nodeDeletionTracker.IsNonEmptyNodeDeleteInProgress()) assert.NoError(t, err) var expectedScaleDownResult status.ScaleDownResult if len(config.expectedScaleDowns) > 0 { - expectedScaleDownResult = status.ScaleDownNodeDeleted + expectedScaleDownResult = status.ScaleDownNodeDeleteStarted } else { expectedScaleDownResult = status.ScaleDownNoUnneeded } @@ -1172,8 +1193,8 @@ func simpleScaleDownEmpty(t *testing.T, config *scaleTestConfig) { // Report only up to 10 extra nodes found. 
deleted := make([]string, 0, len(config.expectedScaleDowns)+10) for i := 0; i < len(config.expectedScaleDowns)+10; i++ { - d := getStringFromChanImmediately(deletedNodes) - if d == "" { // a closed channel yields empty value + d := getStringFromChan(deletedNodes) + if d == nothingReturned { // no more nodes were deleted break } deleted = append(deleted, d) @@ -1222,7 +1243,8 @@ func TestNoScaleDownUnready(t *testing.T) { ScaleDownUnreadyTime: time.Hour, MaxGracefulTerminationSec: 60, } - context := NewScaleTestAutoscalingContext(options, fakeClient, nil, provider, nil) + registry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil, nil, nil, nil, nil) + context := NewScaleTestAutoscalingContext(options, fakeClient, registry, provider, nil) // N1 is unready so it requires a bigger unneeded time. clusterStateRegistry := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff()) @@ -1730,3 +1752,55 @@ func TestSoftTaintTimeLimit(t *testing.T) { assert.Empty(t, errs) assert.Equal(t, 0, countDeletionCandidateTaints(t, fakeClient)) } + +func TestWaitForDelayDeletion(t *testing.T) { + type testcase struct { + name string + addAnnotation bool + removeAnnotation bool + expectCallingGetNode bool + } + tests := []testcase{ + { + name: "annotation not set", + addAnnotation: false, + removeAnnotation: false, + }, + { + name: "annotation set and removed", + addAnnotation: true, + removeAnnotation: true, + }, + { + name: "annotation set but not removed", + addAnnotation: true, + removeAnnotation: false, + }, + } + + for _, test := range tests { + test := test + t.Run(test.name, func(t *testing.T) { + t.Parallel() + node := BuildTestNode("n1", 1000, 10) + nodeWithAnnotation := BuildTestNode("n1", 1000, 10) + nodeWithAnnotation.Annotations = map[string]string{DelayDeletionAnnotationPrefix + "ingress": "true"} + allNodeListerMock := &nodeListerMock{} + if test.addAnnotation { + if test.removeAnnotation { + allNodeListerMock.On("Get").Return(node, nil).Once() + } else { + allNodeListerMock.On("Get").Return(nodeWithAnnotation, nil).Twice() + } + } + var err error + if test.addAnnotation { + err = waitForDelayDeletion(nodeWithAnnotation, allNodeListerMock, 6*time.Second) + } else { + err = waitForDelayDeletion(node, allNodeListerMock, 6*time.Second) + } + assert.NoError(t, err) + mock.AssertExpectationsForObjects(t, allNodeListerMock) + }) + } +} diff --git a/cluster-autoscaler/core/scale_test_common.go b/cluster-autoscaler/core/scale_test_common.go index 5faddbbe70..0848a92894 100644 --- a/cluster-autoscaler/core/scale_test_common.go +++ b/cluster-autoscaler/core/scale_test_common.go @@ -75,6 +75,7 @@ type scaleTestConfig struct { expectedFinalScaleUp groupSizeChange // we expect this to be delivered via scale-up event expectedScaleDowns []string options config.AutoscalingOptions + nodeDeletionTracker *NodeDeletionTracker } // NewScaleTestAutoscalingContext creates a new test autoscaling context for scaling tests.
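For context, a minimal sketch (not part of this patch) of how an external component is expected to consume the new delay-deletion.cluster-autoscaler.kubernetes.io/ contract: CA taints and drains the node via deletetaint.MarkToBeDeleted, then waitForDelayDeletion (added above) polls until every annotation with that prefix is gone before calling the cloud provider. The component name "my-ingress-controller", the releaseNode helper, and the context-aware Patch signature from recent client-go are assumptions made for illustration only.

```go
// Package nodedelay sketches an external component that delays CA node deletion
// until its own cleanup (e.g. load-balancer deregistration) has finished.
package nodedelay

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/kubernetes"
)

// The component publishes its own key under the prefix introduced in this patch.
const myDelayAnnotation = "delay-deletion.cluster-autoscaler.kubernetes.io/my-ingress-controller"

// releaseNode removes this component's delay-deletion annotation once cleanup
// for the node is done, which lets CA's waitForDelayDeletion poll succeed and
// the node deletion proceed. Setting the key to null in a JSON merge patch
// deletes the annotation.
func releaseNode(ctx context.Context, client kubernetes.Interface, nodeName string) error {
	patch := []byte(fmt.Sprintf(`{"metadata":{"annotations":{%q:null}}}`, myDelayAnnotation))
	_, err := client.CoreV1().Nodes().Patch(ctx, nodeName, types.MergePatchType, patch, metav1.PatchOptions{})
	return err
}
```

A real controller would typically call something like releaseNode after observing the to-be-deleted taint that CA places on the node during scale-down.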
diff --git a/cluster-autoscaler/core/static_autoscaler.go b/cluster-autoscaler/core/static_autoscaler.go index ba383d8739..0272137a23 100644 --- a/cluster-autoscaler/core/static_autoscaler.go +++ b/cluster-autoscaler/core/static_autoscaler.go @@ -398,16 +398,16 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError a.lastScaleDownFailTime.Add(a.ScaleDownDelayAfterFailure).After(currentTime) || a.lastScaleDownDeleteTime.Add(a.ScaleDownDelayAfterDelete).After(currentTime) // In dry run only utilization is updated - calculateUnneededOnly := scaleDownInCooldown || scaleDown.nodeDeleteStatus.IsDeleteInProgress() + calculateUnneededOnly := scaleDownInCooldown || scaleDown.nodeDeletionTracker.IsNonEmptyNodeDeleteInProgress() klog.V(4).Infof("Scale down status: unneededOnly=%v lastScaleUpTime=%s "+ "lastScaleDownDeleteTime=%v lastScaleDownFailTime=%s scaleDownForbidden=%v isDeleteInProgress=%v", calculateUnneededOnly, a.lastScaleUpTime, a.lastScaleDownDeleteTime, a.lastScaleDownFailTime, - a.processorCallbacks.disableScaleDownForLoop, scaleDown.nodeDeleteStatus.IsDeleteInProgress()) + a.processorCallbacks.disableScaleDownForLoop, scaleDown.nodeDeletionTracker.IsNonEmptyNodeDeleteInProgress()) if scaleDownInCooldown { scaleDownStatus.Result = status.ScaleDownInCooldown - } else if scaleDown.nodeDeleteStatus.IsDeleteInProgress() { + } else if scaleDown.nodeDeletionTracker.IsNonEmptyNodeDeleteInProgress() { scaleDownStatus.Result = status.ScaleDownInProgress } else { klog.V(4).Infof("Starting scale down") diff --git a/cluster-autoscaler/core/static_autoscaler_test.go b/cluster-autoscaler/core/static_autoscaler_test.go index 167dd2b162..6d3ba58563 100644 --- a/cluster-autoscaler/core/static_autoscaler_test.go +++ b/cluster-autoscaler/core/static_autoscaler_test.go @@ -53,6 +53,10 @@ func (l *nodeListerMock) List() ([]*apiv1.Node, error) { args := l.Called() return args.Get(0).([]*apiv1.Node), args.Error(1) } +func (l *nodeListerMock) Get(name string) (*apiv1.Node, error) { + args := l.Called() + return args.Get(0).(*apiv1.Node), args.Error(1) +} type podListerMock struct { mock.Mock diff --git a/cluster-autoscaler/expander/price/preferred_test.go b/cluster-autoscaler/expander/price/preferred_test.go index deeae7cc7c..59aa1241fa 100644 --- a/cluster-autoscaler/expander/price/preferred_test.go +++ b/cluster-autoscaler/expander/price/preferred_test.go @@ -33,6 +33,10 @@ func (n *testNodeLister) List() ([]*apiv1.Node, error) { return n.list, nil } +func (n *testNodeLister) Get(name string) (*apiv1.Node, error) { + return nil, nil +} + func testPreferredNodeSingleCase(t *testing.T, currentNodes int, expectedNodeSize int) { nodes := []*apiv1.Node{} for i := 1; i <= currentNodes; i++ { diff --git a/cluster-autoscaler/main.go b/cluster-autoscaler/main.go index 35419178e2..8849a664c0 100644 --- a/cluster-autoscaler/main.go +++ b/cluster-autoscaler/main.go @@ -114,12 +114,13 @@ var ( "for scale down when some candidates from previous iteration are no longer valid."+ "When calculating the pool size for additional candidates we take"+ "max(#nodes * scale-down-candidates-pool-ratio, scale-down-candidates-pool-min-count).") - scanInterval = flag.Duration("scan-interval", 10*time.Second, "How often cluster is reevaluated for scale up or down") - maxNodesTotal = flag.Int("max-nodes-total", 0, "Maximum number of nodes in all node groups. 
Cluster autoscaler will not grow the cluster beyond this number.") - coresTotal = flag.String("cores-total", minMaxFlagString(0, config.DefaultMaxClusterCores), "Minimum and maximum number of cores in cluster, in the format <min>:<max>. Cluster autoscaler will not scale the cluster beyond these numbers.") - memoryTotal = flag.String("memory-total", minMaxFlagString(0, config.DefaultMaxClusterMemory), "Minimum and maximum number of gigabytes of memory in cluster, in the format <min>:<max>. Cluster autoscaler will not scale the cluster beyond these numbers.") - gpuTotal = multiStringFlag("gpu-total", "Minimum and maximum number of different GPUs in cluster, in the format <gpu_type>:<min>:<max>. Cluster autoscaler will not scale the cluster beyond these numbers. Can be passed multiple times. CURRENTLY THIS FLAG ONLY WORKS ON GKE.") - cloudProviderFlag = flag.String("cloud-provider", cloudBuilder.DefaultCloudProvider, + nodeDeletionDelayTimeout = flag.Duration("node-deletion-delay-timeout", 2*time.Minute, "Maximum time CA waits for removing delay-deletion.cluster-autoscaler.kubernetes.io/ annotations before deleting the node.") + scanInterval = flag.Duration("scan-interval", 10*time.Second, "How often cluster is reevaluated for scale up or down") + maxNodesTotal = flag.Int("max-nodes-total", 0, "Maximum number of nodes in all node groups. Cluster autoscaler will not grow the cluster beyond this number.") + coresTotal = flag.String("cores-total", minMaxFlagString(0, config.DefaultMaxClusterCores), "Minimum and maximum number of cores in cluster, in the format <min>:<max>. Cluster autoscaler will not scale the cluster beyond these numbers.") + memoryTotal = flag.String("memory-total", minMaxFlagString(0, config.DefaultMaxClusterMemory), "Minimum and maximum number of gigabytes of memory in cluster, in the format <min>:<max>. Cluster autoscaler will not scale the cluster beyond these numbers.") + gpuTotal = multiStringFlag("gpu-total", "Minimum and maximum number of different GPUs in cluster, in the format <gpu_type>:<min>:<max>. Cluster autoscaler will not scale the cluster beyond these numbers. Can be passed multiple times. CURRENTLY THIS FLAG ONLY WORKS ON GKE.") + cloudProviderFlag = flag.String("cloud-provider", cloudBuilder.DefaultCloudProvider, "Cloud provider type. Available values: ["+strings.Join(cloudBuilder.AvailableCloudProviders, ",")+"]") maxBulkSoftTaintCount = flag.Int("max-bulk-soft-taint-count", 10, "Maximum number of nodes that can be tainted/untainted PreferNoSchedule at the same time.
Set to 0 to turn off such tainting.") maxBulkSoftTaintTime = flag.Duration("max-bulk-soft-taint-time", 3*time.Second, "Maximum duration of tainting/untainting nodes as PreferNoSchedule at the same time.") @@ -185,7 +186,6 @@ func createAutoscalingOptions() config.AutoscalingOptions { if err != nil { klog.Fatalf("Failed to parse flags: %v", err) } - return config.AutoscalingOptions{ CloudConfig: *cloudConfig, CloudProviderName: *cloudProviderFlag, @@ -231,6 +231,7 @@ func createAutoscalingOptions() config.AutoscalingOptions { NewPodScaleUpDelay: *newPodScaleUpDelay, FilterOutSchedulablePodsUsesPacking: *filterOutSchedulablePodsUsesPacking, IgnoredTaints: *ignoreTaintsFlag, + NodeDeletionDelayTimeout: *nodeDeletionDelayTimeout, } } diff --git a/cluster-autoscaler/utils/kubernetes/listers.go b/cluster-autoscaler/utils/kubernetes/listers.go index f4874c1418..f5b0d8d7d0 100644 --- a/cluster-autoscaler/utils/kubernetes/listers.go +++ b/cluster-autoscaler/utils/kubernetes/listers.go @@ -223,6 +223,7 @@ func NewScheduledPodLister(kubeClient client.Interface, stopchannel <-chan struc // NodeLister lists nodes. type NodeLister interface { List() ([]*apiv1.Node, error) + Get(name string) (*apiv1.Node, error) } // ReadyNodeLister lists ready nodes. @@ -245,6 +246,15 @@ func (readyNodeLister *ReadyNodeLister) List() ([]*apiv1.Node, error) { return readyNodes, nil } +// Get returns the node with the given name. +func (readyNodeLister *ReadyNodeLister) Get(name string) (*apiv1.Node, error) { + node, err := readyNodeLister.nodeLister.Get(name) + if err != nil { + return nil, err + } + return node, nil +} + // NewReadyNodeLister builds a node lister. func NewReadyNodeLister(kubeClient client.Interface, stopChannel <-chan struct{}) NodeLister { listWatcher := cache.NewListWatchFromClient(kubeClient.CoreV1().RESTClient(), "nodes", apiv1.NamespaceAll, fields.Everything()) @@ -272,6 +282,15 @@ func (allNodeLister *AllNodeLister) List() ([]*apiv1.Node, error) { return allNodes, nil } +// Get returns the node with the given name. +func (allNodeLister *AllNodeLister) Get(name string) (*apiv1.Node, error) { + node, err := allNodeLister.nodeLister.Get(name) + if err != nil { + return nil, err + } + return node, nil +} + // NewAllNodeLister builds a node lister that returns all nodes (ready and unready) func NewAllNodeLister(kubeClient client.Interface, stopchannel <-chan struct{}) NodeLister { listWatcher := cache.NewListWatchFromClient(kubeClient.CoreV1().RESTClient(), "nodes", apiv1.NamespaceAll, fields.Everything())
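Finally, a test-style sketch (not part of the patch; assumed to live in package core next to the new type, with fmt imported) of the accounting NodeDeletionTracker enables: deletions already in flight are subtracted from a node group's current size before the MinSize comparison, mirroring the new checks in TryToScaleDown and getEmptyNodes.

```go
// ExampleNodeDeletionTracker is a hypothetical runnable example illustrating
// why the tracker exists: with two deletions already started in ng1, a group
// of size 3 with MinSize 1 must not accept a third concurrent deletion.
func ExampleNodeDeletionTracker() {
	tracker := NewNodeDeletionTracker()
	tracker.StartDeletion("ng1")
	tracker.StartDeletion("ng1")

	size, minSize := 3, 1
	if size-tracker.GetDeletionsInProgress("ng1") <= minSize {
		// 3 - 2 <= 1, so the candidate is skipped and the group cannot be
		// shrunk below its minimum by deletions that are still in flight.
		fmt.Println("skipping ng1: node group min size reached")
	}

	tracker.EndDeletion("ng1")
	tracker.EndDeletion("ng1")
	// Output: skipping ng1: node group min size reached
}
```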