Introduce NodeDeleterBatcher to ScaleDown actuator

Yaroslava Serdiuk 2022-07-25 14:12:13 +00:00
parent db5e2f2e5d
commit 65b0d78e6e
10 changed files with 677 additions and 137 deletions

View File

@ -192,4 +192,6 @@ type AutoscalingOptions struct {
// MaxNodeGroupBinpackingDuration is a maximum time that can be spent binpacking a single NodeGroup. If the threshold
// is exceeded binpacking will be cut short and a partial scale-up will be performed.
MaxNodeGroupBinpackingDuration time.Duration
// NodeDeletionBatcherInterval is the time for which CA ScaleDown gathers nodes before deleting them in a batch.
NodeDeletionBatcherInterval time.Duration
}
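
For illustration only, a minimal sketch of setting the new option (a hypothetical helper, not part of this commit; every other AutoscalingOptions field keeps its zero value):

package example

import (
    "time"

    "k8s.io/autoscaler/cluster-autoscaler/config"
)

// exampleOptions shows the new knob: candidates gathered for 30s are deleted
// per node group in a single batch.
func exampleOptions() config.AutoscalingOptions {
    return config.AutoscalingOptions{
        NodeDeletionBatcherInterval: 30 * time.Second,
    }
}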

View File

@ -24,6 +24,7 @@ import (
apiv1 "k8s.io/api/core/v1"
"k8s.io/klog/v2"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
"k8s.io/autoscaler/cluster-autoscaler/clusterstate"
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown"
@ -34,7 +35,6 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/utils/deletetaint"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
"k8s.io/autoscaler/cluster-autoscaler/utils/gpu"
"k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
)
// Actuator is responsible for draining and deleting nodes.
@ -42,15 +42,18 @@ type Actuator struct {
ctx *context.AutoscalingContext
clusterState *clusterstate.ClusterStateRegistry
nodeDeletionTracker *deletiontracker.NodeDeletionTracker
nodeDeletionBatcher *NodeDeletionBatcher
evictor Evictor
}
// NewActuator returns a new instance of Actuator.
func NewActuator(ctx *context.AutoscalingContext, csr *clusterstate.ClusterStateRegistry, ndr *deletiontracker.NodeDeletionTracker) *Actuator {
func NewActuator(ctx *context.AutoscalingContext, csr *clusterstate.ClusterStateRegistry, ndr *deletiontracker.NodeDeletionTracker, batchInterval time.Duration) *Actuator {
nbd := NewNodeDeletionBatcher(ctx, csr, ndr, batchInterval)
return &Actuator{
ctx: ctx,
clusterState: csr,
nodeDeletionTracker: ndr,
nodeDeletionBatcher: nbd,
evictor: NewDefaultEvictor(),
}
}
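
For reference, a hedged sketch of the new constructor signature (hypothetical helper; the autoscaler itself passes opts.NodeDeletionBatcherInterval, as shown in static_autoscaler.go further down). A zero interval keeps the previous delete-immediately behaviour:

package example

import (
    "time"

    "k8s.io/autoscaler/cluster-autoscaler/clusterstate"
    "k8s.io/autoscaler/cluster-autoscaler/context"
    "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/actuation"
    "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/deletiontracker"
)

// newBatchingActuator builds an Actuator whose node deletions are batched over 30s.
func newBatchingActuator(ctx *context.AutoscalingContext, csr *clusterstate.ClusterStateRegistry) *actuation.Actuator {
    ndt := deletiontracker.NewNodeDeletionTracker(0 * time.Second)
    return actuation.NewActuator(ctx, csr, ndt, 30*time.Second)
}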
@ -141,7 +144,13 @@ func (a *Actuator) taintSyncDeleteAsyncEmpty(empty []*apiv1.Node) (scaledDownNod
klog.V(0).Infof("Scale-down: removing empty node %q", emptyNode.Name)
a.ctx.LogRecorder.Eventf(apiv1.EventTypeNormal, "ScaleDownEmpty", "Scale-down: removing empty node %q", emptyNode.Name)
err := a.taintNode(emptyNode)
nodeGroup, err := a.ctx.CloudProvider.NodeGroupForNode(emptyNode)
if err != nil || nodeGroup == nil || reflect.ValueOf(nodeGroup).IsNil() {
klog.Errorf("Failed to find node group for %s: %v", emptyNode.Name, err)
continue
}
err = a.taintNode(emptyNode)
if err != nil {
a.ctx.Recorder.Eventf(emptyNode, apiv1.EventTypeWarning, "ScaleDownFailed", "failed to mark the node as toBeDeleted/unschedulable: %v", err)
return scaledDownNodes, errors.NewAutoscalerError(errors.ApiCallError, "couldn't taint node %q with ToBeDeleted", emptyNode.Name)
@ -152,17 +161,8 @@ func (a *Actuator) taintSyncDeleteAsyncEmpty(empty []*apiv1.Node) (scaledDownNod
} else {
klog.Errorf("Scale-down: couldn't report scaled down node, err: %v", err)
}
go func(node *apiv1.Node) {
result := a.deleteNode(node, false)
if result.Err == nil {
a.ctx.LogRecorder.Eventf(apiv1.EventTypeNormal, "ScaleDownEmpty", "Scale-down: empty node %s removed", node.Name)
} else {
klog.Errorf("Scale-down: couldn't delete empty node, err: %v", err)
a.ctx.Recorder.Eventf(node, apiv1.EventTypeWarning, "ScaleDownFailed", "failed to delete empty node: %v", result.Err)
_, _ = deletetaint.CleanToBeDeleted(node, a.ctx.ClientSet, a.ctx.CordonNodeBeforeTerminate)
}
}(emptyNode)
a.nodeDeletionTracker.StartDeletion(nodeGroup.Id(), emptyNode.Name)
go a.scheduleDeletion(emptyNode, nodeGroup.Id(), false)
}
return scaledDownNodes, nil
}
@ -197,17 +197,13 @@ func (a *Actuator) deleteAsyncDrain(drain []*apiv1.Node) (scaledDownNodes []*sta
} else {
klog.Errorf("Scale-down: couldn't report scaled down node, err: %v", err)
}
go func(node *apiv1.Node) {
result := a.deleteNode(node, true)
if result.Err == nil {
a.ctx.LogRecorder.Eventf(apiv1.EventTypeNormal, "ScaleDown", "Scale-down: node %s removed with drain", node.Name)
} else {
klog.Errorf("Scale-down: couldn't delete node %q with drain, err: %v", node.Name, result.Err)
a.ctx.Recorder.Eventf(node, apiv1.EventTypeWarning, "ScaleDownFailed", "failed to drain and delete node: %v", result.Err)
_, _ = deletetaint.CleanToBeDeleted(node, a.ctx.ClientSet, a.ctx.CordonNodeBeforeTerminate)
}
}(drainNode)
nodeGroup, err := a.ctx.CloudProvider.NodeGroupForNode(drainNode)
if err != nil || nodeGroup == nil || reflect.ValueOf(nodeGroup).IsNil() {
klog.Errorf("Failed to find node group for %s: %v", drainNode.Name, err)
continue
}
a.nodeDeletionTracker.StartDeletionWithDrain(nodeGroup.Id(), drainNode.Name)
go a.scheduleDeletion(drainNode, nodeGroup.Id(), true)
}
return scaledDownNodes
}
@ -251,43 +247,38 @@ func (a *Actuator) taintNode(node *apiv1.Node) error {
return nil
}
// deleteNode performs the deletion of the provided node. If drain is true, the node is drained before being deleted.
func (a *Actuator) deleteNode(node *apiv1.Node, drain bool) (result status.NodeDeleteResult) {
nodeGroup, err := a.ctx.CloudProvider.NodeGroupForNode(node)
if err != nil {
return status.NodeDeleteResult{ResultType: status.NodeDeleteErrorInternal, Err: errors.NewAutoscalerError(errors.CloudProviderError, "failed to find node group for %s: %v", node.Name, err)}
}
if nodeGroup == nil || reflect.ValueOf(nodeGroup).IsNil() {
return status.NodeDeleteResult{ResultType: status.NodeDeleteErrorInternal, Err: errors.NewAutoscalerError(errors.InternalError, "picked node that doesn't belong to a node group: %s", node.Name)}
}
defer func() { a.nodeDeletionTracker.EndDeletion(nodeGroup.Id(), node.Name, result) }()
func (a *Actuator) prepareNodeForDeletion(node *apiv1.Node, drain bool) status.NodeDeleteResult {
if drain {
a.nodeDeletionTracker.StartDeletionWithDrain(nodeGroup.Id(), node.Name)
if evictionResults, err := a.evictor.DrainNode(a.ctx, node); err != nil {
return status.NodeDeleteResult{ResultType: status.NodeDeleteErrorFailedToEvictPods, Err: err, PodEvictionResults: evictionResults}
}
} else {
a.nodeDeletionTracker.StartDeletion(nodeGroup.Id(), node.Name)
if err := a.evictor.EvictDaemonSetPods(a.ctx, node, time.Now()); err != nil {
// Evicting DS pods is best-effort, so proceed with the deletion even if there are errors.
klog.Warningf("Error while evicting DS pods from an empty node %q: %v", node.Name, err)
}
}
if err := WaitForDelayDeletion(node, a.ctx.ListerRegistry.AllNodeLister(), a.ctx.AutoscalingOptions.NodeDeletionDelayTimeout); err != nil {
return status.NodeDeleteResult{ResultType: status.NodeDeleteErrorFailedToDelete, Err: err}
}
if err := DeleteNodeFromCloudProvider(a.ctx, node, a.clusterState); err != nil {
return status.NodeDeleteResult{ResultType: status.NodeDeleteErrorFailedToDelete, Err: err}
}
metrics.RegisterScaleDown(1, gpu.GetGpuTypeForMetrics(a.ctx.CloudProvider.GPULabel(), a.ctx.CloudProvider.GetAvailableGPUTypes(), node, nodeGroup), nodeScaleDownReason(node, drain))
return status.NodeDeleteResult{ResultType: status.NodeDeleteOk}
}
// scheduleDeletion schedules the deletion of the provided node by adding it to the NodeDeletionBatcher. If drain is true, the node is drained before being deleted.
func (a *Actuator) scheduleDeletion(node *apiv1.Node, nodeGroupId string, drain bool) {
nodeDeleteResult := a.prepareNodeForDeletion(node, drain)
if nodeDeleteResult.Err != nil {
CleanUpAndRecordFailedScaleDownEvent(a.ctx, node, nodeGroupId, drain, a.nodeDeletionTracker, "prepareNodeForDeletion failed", nodeDeleteResult)
return
}
err := a.nodeDeletionBatcher.AddNode(node, drain)
if err != nil {
klog.Errorf("Couldn't add node to nodeDeletionBatcher, err: %v", err)
nodeDeleteResult := status.NodeDeleteResult{ResultType: status.NodeDeleteErrorInternal, Err: errors.NewAutoscalerError(errors.InternalError, "nodeDeletionBatcher.AddNode for %s returned error: %v", node.Name, err)}
CleanUpAndRecordFailedScaleDownEvent(a.ctx, node, nodeGroupId, drain, a.nodeDeletionTracker, "failed to add node to the nodeDeletionBatcher", nodeDeleteResult)
}
}
func min(x, y int) int {
if x <= y {
return x
@ -303,18 +294,34 @@ func joinPodNames(pods []*apiv1.Pod) string {
return strings.Join(names, ",")
}
func nodeScaleDownReason(node *apiv1.Node, drain bool) metrics.NodeScaleDownReason {
readiness, err := kubernetes.GetNodeReadiness(node)
if err != nil {
klog.Errorf("Couldn't determine node %q readiness while scaling down - assuming unready: %v", node.Name, err)
return metrics.Unready
}
if !readiness.Ready {
return metrics.Unready
}
// Node is ready.
if drain {
return metrics.Underutilized
}
return metrics.Empty
}
// CleanUpAndRecordFailedScaleDownEvent records a failed scale-down event and logs an error.
func CleanUpAndRecordFailedScaleDownEvent(ctx *context.AutoscalingContext, node *apiv1.Node, nodeGroupId string, drain bool, nodeDeletionTracker *deletiontracker.NodeDeletionTracker, errMsg string, status status.NodeDeleteResult) {
if drain {
klog.Errorf("Scale-down: couldn't delete node %q with drain, %v, status error: %v", node.Name, errMsg, status.Err)
ctx.Recorder.Eventf(node, apiv1.EventTypeWarning, "ScaleDownFailed", "failed to drain and delete node: %v", status.Err)
} else {
klog.Errorf("Scale-down: couldn't delete empty node, %v, status error: %v", errMsg, status.Err)
ctx.Recorder.Eventf(node, apiv1.EventTypeWarning, "ScaleDownFailed", "failed to delete empty node: %v", status.Err)
}
deletetaint.CleanToBeDeleted(node, ctx.ClientSet, ctx.CordonNodeBeforeTerminate)
nodeDeletionTracker.EndDeletion(nodeGroupId, node.Name, status)
}
// RegisterAndRecordSuccessfulScaleDownEvent registers the scale-down in CSR and records a successful scale-down event.
func RegisterAndRecordSuccessfulScaleDownEvent(ctx *context.AutoscalingContext, csr *clusterstate.ClusterStateRegistry, node *apiv1.Node, nodeGroup cloudprovider.NodeGroup, drain bool, nodeDeletionTracker *deletiontracker.NodeDeletionTracker) {
ctx.Recorder.Eventf(node, apiv1.EventTypeNormal, "ScaleDown", "nodes removed by cluster autoscaler")
csr.RegisterScaleDown(&clusterstate.ScaleDownRequest{
NodeGroup: nodeGroup,
NodeName: node.Name,
Time: time.Now(),
ExpectedDeleteTime: time.Now().Add(MaxCloudProviderNodeDeletionTime),
})
metrics.RegisterScaleDown(1, gpu.GetGpuTypeForMetrics(ctx.CloudProvider.GPULabel(), ctx.CloudProvider.GetAvailableGPUTypes(), node, nodeGroup), nodeScaleDownReason(node, drain))
if drain {
ctx.LogRecorder.Eventf(apiv1.EventTypeNormal, "ScaleDown", "Scale-down: node %s removed with drain", node.Name)
} else {
ctx.LogRecorder.Eventf(apiv1.EventTypeNormal, "ScaleDownEmpty", "Scale-down: empty node %s removed", node.Name)
}
nodeDeletionTracker.EndDeletion(nodeGroup.Id(), node.Name, status.NodeDeleteResult{ResultType: status.NodeDeleteOk})
}
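
To show how these two helpers are meant to be paired, here is a hypothetical caller that brackets a single cloud-provider delete with them (it mirrors NodeDeletionBatcher.AddNode in the new file below; the helper itself is not part of the commit):

package example

import (
    apiv1 "k8s.io/api/core/v1"
    "k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
    "k8s.io/autoscaler/cluster-autoscaler/clusterstate"
    "k8s.io/autoscaler/cluster-autoscaler/context"
    "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/actuation"
    "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/deletiontracker"
    "k8s.io/autoscaler/cluster-autoscaler/processors/status"
)

// deleteAndRecord deletes one node and records either the failure (cleaning the
// ToBeDeleted taint) or the success (registering the scale-down in CSR).
func deleteAndRecord(ctx *context.AutoscalingContext, csr *clusterstate.ClusterStateRegistry, ndt *deletiontracker.NodeDeletionTracker, nodeGroup cloudprovider.NodeGroup, node *apiv1.Node, drain bool) {
    if err := nodeGroup.DeleteNodes([]*apiv1.Node{node}); err != nil {
        result := status.NodeDeleteResult{ResultType: status.NodeDeleteErrorFailedToDelete, Err: err}
        actuation.CleanUpAndRecordFailedScaleDownEvent(ctx, node, nodeGroup.Id(), drain, ndt, "cloud provider call failed", result)
        return
    }
    actuation.RegisterAndRecordSuccessfulScaleDownEvent(ctx, csr, node, nodeGroup, drain, ndt)
}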

View File

@ -230,7 +230,7 @@ func TestCropNodesToBudgets(t *testing.T) {
for i := 0; i < tc.drainDeletionsInProgress; i++ {
ndr.StartDeletionWithDrain("ng2", fmt.Sprintf("drain-node-%d", i))
}
actuator := NewActuator(ctx, nil, ndr)
actuator := NewActuator(ctx, nil, ndr, 0*time.Second)
gotEmpty, gotDrain := actuator.cropNodesToBudgets(tc.emptyNodes, tc.drainNodes)
if diff := cmp.Diff(tc.wantEmpty, gotEmpty, cmpopts.EquateEmpty()); diff != "" {
t.Errorf("cropNodesToBudgets empty nodes diff (-want +got):\n%s", diff)
@ -843,9 +843,11 @@ func TestStartDeletion(t *testing.T) {
}
// Create Actuator, run StartDeletion, and verify the error.
ndt := deletiontracker.NewNodeDeletionTracker(0)
actuator := Actuator{
ctx: &ctx, clusterState: csr, nodeDeletionTracker: deletiontracker.NewNodeDeletionTracker(0),
evictor: Evictor{EvictionRetryTime: 0, DsEvictionRetryTime: 0, DsEvictionEmptyNodeTimeout: 0, PodEvictionHeadroom: DefaultPodEvictionHeadroom},
ctx: &ctx, clusterState: csr, nodeDeletionTracker: ndt,
nodeDeletionBatcher: NewNodeDeletionBatcher(&ctx, csr, ndt, 0*time.Second),
evictor: Evictor{EvictionRetryTime: 0, DsEvictionRetryTime: 0, DsEvictionEmptyNodeTimeout: 0, PodEvictionHeadroom: DefaultPodEvictionHeadroom},
}
gotStatus, gotErr := actuator.StartDeletion(tc.emptyNodes, tc.drainNodes, time.Now())
if diff := cmp.Diff(tc.wantErr, gotErr, cmpopts.EquateErrors()); diff != "" {
@ -937,6 +939,166 @@ func TestStartDeletion(t *testing.T) {
}
}
func TestStartDeletionInBatchBasic(t *testing.T) {
testNg1 := testprovider.NewTestNodeGroup("test-ng-1", 0, 100, 3, true, false, "n1-standard-2", nil, nil)
testNg2 := testprovider.NewTestNodeGroup("test-ng-2", 0, 100, 3, true, false, "n1-standard-2", nil, nil)
testNg3 := testprovider.NewTestNodeGroup("test-ng-3", 0, 100, 3, true, false, "n1-standard-2", nil, nil)
deleteInterval := 1 * time.Second
for _, test := range []struct {
name string
deleteCalls int
numNodesToDelete map[*testprovider.TestNodeGroup][]int //per node group and per call
failedRequests map[string]bool //per node group
wantSuccessfulDeletion map[string]int //per node group
}{
{
name: "Successful deletion for all node groups",
deleteCalls: 1,
numNodesToDelete: map[*testprovider.TestNodeGroup][]int{
testNg1: {4},
testNg2: {5},
testNg3: {1},
},
wantSuccessfulDeletion: map[string]int{
"test-ng-1": 4,
"test-ng-2": 5,
"test-ng-3": 1,
},
},
{
name: "Node deletion failed for one group",
deleteCalls: 1,
numNodesToDelete: map[*testprovider.TestNodeGroup][]int{
testNg1: {4},
testNg2: {5},
testNg3: {1},
},
failedRequests: map[string]bool{
"test-ng-1": true,
},
wantSuccessfulDeletion: map[string]int{
"test-ng-1": 0,
"test-ng-2": 5,
"test-ng-3": 1,
},
},
{
name: "Node deletion failed for one group two times",
deleteCalls: 2,
numNodesToDelete: map[*testprovider.TestNodeGroup][]int{
testNg1: {4, 3},
testNg2: {5},
testNg3: {1},
},
failedRequests: map[string]bool{
"test-ng-1": true,
},
wantSuccessfulDeletion: map[string]int{
"test-ng-1": 0,
"test-ng-2": 5,
"test-ng-3": 1,
},
},
{
name: "Node deletion failed for all groups",
deleteCalls: 2,
numNodesToDelete: map[*testprovider.TestNodeGroup][]int{
testNg1: {4, 3},
testNg2: {5},
testNg3: {1},
},
failedRequests: map[string]bool{
"test-ng-1": true,
"test-ng-2": true,
"test-ng-3": true,
},
wantSuccessfulDeletion: map[string]int{
"test-ng-1": 0,
"test-ng-2": 0,
"test-ng-3": 0,
},
},
} {
t.Run(test.name, func(t *testing.T) {
test := test
gotFailedRequest := func(nodeGroupId string) bool {
val, _ := test.failedRequests[nodeGroupId]
return val
}
deletedResult := make(chan string)
fakeClient := &fake.Clientset{}
provider := testprovider.NewTestCloudProvider(nil, func(nodeGroupId string, node string) error {
if gotFailedRequest(nodeGroupId) {
return fmt.Errorf("SIMULATED ERROR: won't remove node")
}
deletedResult <- nodeGroupId
return nil
})
// 2D array representing the waves of nodes pushed for deletion.
deleteNodes := [][]*apiv1.Node{}
for i := 0; i < test.deleteCalls; i++ {
deleteNodes = append(deleteNodes, []*apiv1.Node{})
}
for ng, numNodes := range test.numNodesToDelete {
provider.InsertNodeGroup(ng)
ng.SetCloudProvider(provider)
for i, num := range numNodes {
nodes := generateNodes(num, ng.Id())
deleteNodes[i] = append(deleteNodes[i], nodes...)
for _, node := range nodes {
provider.AddNode(ng.Id(), node)
}
}
}
opts := config.AutoscalingOptions{
MaxScaleDownParallelism: 10,
MaxDrainParallelism: 5,
MaxPodEvictionTime: 0,
DaemonSetEvictionForEmptyNodes: true,
}
registry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
ctx, err := NewScaleTestAutoscalingContext(opts, fakeClient, registry, provider, nil, nil)
if err != nil {
t.Fatalf("Couldn't set up autoscaling context: %v", err)
}
csr := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, ctx.LogRecorder, NewBackoff())
ndt := deletiontracker.NewNodeDeletionTracker(0)
actuator := Actuator{
ctx: &ctx, clusterState: csr, nodeDeletionTracker: ndt,
nodeDeletionBatcher: NewNodeDeletionBatcher(&ctx, csr, ndt, deleteInterval),
evictor: Evictor{EvictionRetryTime: 0, DsEvictionRetryTime: 0, DsEvictionEmptyNodeTimeout: 0, PodEvictionHeadroom: DefaultPodEvictionHeadroom},
}
for _, nodes := range deleteNodes {
actuator.StartDeletion(nodes, []*apiv1.Node{}, time.Now())
time.Sleep(deleteInterval)
}
wantDeletedNodes := 0
for _, num := range test.wantSuccessfulDeletion {
wantDeletedNodes += num
}
gotDeletedNodes := map[string]int{
"test-ng-1": 0,
"test-ng-2": 0,
"test-ng-3": 0,
}
for i := 0; i < wantDeletedNodes; i++ {
select {
case ngId := <-deletedResult:
gotDeletedNodes[ngId]++
case <-time.After(1 * time.Second):
t.Errorf("Timeout while waiting for deleted nodes.")
break
}
}
if diff := cmp.Diff(test.wantSuccessfulDeletion, gotDeletedNodes); diff != "" {
t.Errorf("Successful deleteions per node group diff (-want +got):\n%s", diff)
}
})
}
}
func generateNodes(count int, prefix string) []*apiv1.Node {
var result []*apiv1.Node
for i := 0; i < count; i++ {
@ -949,6 +1111,20 @@ func generateNodes(count int, prefix string) []*apiv1.Node {
return result
}
func generateNodesAndNodeGroupMap(count int, prefix string) map[string]*testprovider.TestNodeGroup {
result := make(map[string]*testprovider.TestNodeGroup)
for i := 0; i < count; i++ {
name := fmt.Sprintf("node-%d", i)
ngName := fmt.Sprintf("test-ng-%v", i)
if prefix != "" {
name = prefix + "-" + name
ngName = prefix + "-" + ngName
}
result[name] = testprovider.NewTestNodeGroup(ngName, 0, 100, 3, true, false, "n1-standard-2", nil, nil)
}
return result
}
func generateNode(name string) *apiv1.Node {
return &apiv1.Node{
ObjectMeta: metav1.ObjectMeta{Name: name},

View File

@ -1,66 +0,0 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package actuation
import (
"reflect"
"time"
"k8s.io/autoscaler/cluster-autoscaler/utils/deletetaint"
apiv1 "k8s.io/api/core/v1"
"k8s.io/autoscaler/cluster-autoscaler/clusterstate"
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
)
const (
// MaxKubernetesEmptyNodeDeletionTime is the maximum time needed by Kubernetes to delete an empty node.
MaxKubernetesEmptyNodeDeletionTime = 3 * time.Minute
// MaxCloudProviderNodeDeletionTime is the maximum time needed by cloud provider to delete a node.
MaxCloudProviderNodeDeletionTime = 5 * time.Minute
)
// DeleteNodeFromCloudProvider removes the given node from cloud provider. No extra pre-deletion actions are executed on
// the Kubernetes side. If successful, the deletion is recorded in CSR, and an event is emitted on the node.
func DeleteNodeFromCloudProvider(ctx *context.AutoscalingContext, node *apiv1.Node, registry *clusterstate.ClusterStateRegistry) errors.AutoscalerError {
nodeGroup, err := ctx.CloudProvider.NodeGroupForNode(node)
if err != nil {
return errors.NewAutoscalerError(errors.CloudProviderError, "failed to find node group for %s: %v", node.Name, err)
}
if nodeGroup == nil || reflect.ValueOf(nodeGroup).IsNil() {
return errors.NewAutoscalerError(errors.InternalError, "picked node that doesn't belong to a node group: %s", node.Name)
}
if err = nodeGroup.DeleteNodes([]*apiv1.Node{node}); err != nil {
return errors.NewAutoscalerError(errors.CloudProviderError, "failed to delete %s: %v", node.Name, err)
}
ctx.Recorder.Eventf(node, apiv1.EventTypeNormal, "ScaleDown", "node removed by cluster autoscaler")
registry.RegisterScaleDown(&clusterstate.ScaleDownRequest{
NodeGroup: nodeGroup,
NodeName: node.Name,
Time: time.Now(),
ExpectedDeleteTime: time.Now().Add(MaxCloudProviderNodeDeletionTime),
})
return nil
}
// IsNodeBeingDeleted returns true iff a given node is being deleted.
func IsNodeBeingDeleted(node *apiv1.Node, timestamp time.Time) bool {
deleteTime, _ := deletetaint.GetToBeDeletedTime(node)
return deleteTime != nil && (timestamp.Sub(*deleteTime) < MaxCloudProviderNodeDeletionTime || timestamp.Sub(*deleteTime) < MaxKubernetesEmptyNodeDeletionTime)
}

View File

@ -0,0 +1,182 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package actuation
import (
"fmt"
"reflect"
"sync"
"time"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/deletiontracker"
"k8s.io/autoscaler/cluster-autoscaler/metrics"
"k8s.io/autoscaler/cluster-autoscaler/processors/status"
"k8s.io/autoscaler/cluster-autoscaler/utils/deletetaint"
"k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
"k8s.io/klog/v2"
apiv1 "k8s.io/api/core/v1"
"k8s.io/autoscaler/cluster-autoscaler/clusterstate"
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
)
const (
// MaxKubernetesEmptyNodeDeletionTime is the maximum time needed by Kubernetes to delete an empty node.
MaxKubernetesEmptyNodeDeletionTime = 3 * time.Minute
// MaxCloudProviderNodeDeletionTime is the maximum time needed by cloud provider to delete a node.
MaxCloudProviderNodeDeletionTime = 5 * time.Minute
)
// NodeDeletionBatcher batches scale-down candidates per node group and removes them together.
type NodeDeletionBatcher struct {
sync.Mutex
ctx *context.AutoscalingContext
clusterState *clusterstate.ClusterStateRegistry
nodeDeletionTracker *deletiontracker.NodeDeletionTracker
deletionsPerNodeGroup map[string][]*apiv1.Node
deleteInterval time.Duration
drainedNodeDeletions map[string]bool
}
// NewNodeDeletionBatcher returns a new NodeDeletionBatcher.
func NewNodeDeletionBatcher(ctx *context.AutoscalingContext, csr *clusterstate.ClusterStateRegistry, nodeDeletionTracker *deletiontracker.NodeDeletionTracker, deleteInterval time.Duration) *NodeDeletionBatcher {
return &NodeDeletionBatcher{
ctx: ctx,
clusterState: csr,
nodeDeletionTracker: nodeDeletionTracker,
deletionsPerNodeGroup: make(map[string][]*apiv1.Node),
deleteInterval: deleteInterval,
drainedNodeDeletions: make(map[string]bool),
}
}
// AddNode adds a node to the delete candidates and schedules its deletion.
func (d *NodeDeletionBatcher) AddNode(node *apiv1.Node, drain bool) error {
// If the delete interval is 0, start the node deletion instantly.
if d.deleteInterval == 0 {
nodeGroup, err := deleteNodesFromCloudProvider(d.ctx, []*apiv1.Node{node})
if err != nil {
result := status.NodeDeleteResult{ResultType: status.NodeDeleteErrorFailedToDelete, Err: err}
CleanUpAndRecordFailedScaleDownEvent(d.ctx, node, nodeGroup.Id(), drain, d.nodeDeletionTracker, "", result)
} else {
RegisterAndRecordSuccessfulScaleDownEvent(d.ctx, d.clusterState, node, nodeGroup, drain, d.nodeDeletionTracker)
}
return nil
}
nodeGroupId, first, err := d.addNodeToBucket(node, drain)
if err != nil {
return err
}
if first {
go func(nodeGroupId string) {
time.Sleep(d.deleteInterval)
d.remove(nodeGroupId)
}(nodeGroupId)
}
return nil
}
// addNodeToBucket adds a node to the delete candidates and returns whether it is the first node in its node group.
func (d *NodeDeletionBatcher) addNodeToBucket(node *apiv1.Node, drain bool) (string, bool, error) {
d.Lock()
defer d.Unlock()
nodeGroup, err := d.ctx.CloudProvider.NodeGroupForNode(node)
if err != nil {
return "", false, err
}
d.drainedNodeDeletions[node.Name] = drain
val, ok := d.deletionsPerNodeGroup[nodeGroup.Id()]
if !ok || len(val) == 0 {
d.deletionsPerNodeGroup[nodeGroup.Id()] = []*apiv1.Node{node}
return nodeGroup.Id(), true, nil
}
d.deletionsPerNodeGroup[nodeGroup.Id()] = append(d.deletionsPerNodeGroup[nodeGroup.Id()], node)
return nodeGroup.Id(), false, nil
}
// remove deletes the nodes of a given node group; if successful, the deletion is recorded in CSR and an event is emitted on each node.
func (d *NodeDeletionBatcher) remove(nodeGroupId string) error {
d.Lock()
defer d.Unlock()
nodes, ok := d.deletionsPerNodeGroup[nodeGroupId]
if !ok {
return fmt.Errorf("Node Group %s is not present in the batch deleter", nodeGroupId)
}
delete(d.deletionsPerNodeGroup, nodeGroupId)
drainedNodeDeletions := make(map[string]bool)
for _, node := range nodes {
drainedNodeDeletions[node.Name] = d.drainedNodeDeletions[node.Name]
delete(d.drainedNodeDeletions, node.Name)
}
go func(nodes []*apiv1.Node, drainedNodeDeletions map[string]bool) {
var result status.NodeDeleteResult
nodeGroup, err := deleteNodesFromCloudProvider(d.ctx, nodes)
for _, node := range nodes {
drain := drainedNodeDeletions[node.Name]
if err != nil {
result = status.NodeDeleteResult{ResultType: status.NodeDeleteErrorFailedToDelete, Err: err}
CleanUpAndRecordFailedScaleDownEvent(d.ctx, node, nodeGroup.Id(), drain, d.nodeDeletionTracker, "", result)
} else {
RegisterAndRecordSuccessfulScaleDownEvent(d.ctx, d.clusterState, node, nodeGroup, drain, d.nodeDeletionTracker)
}
}
}(nodes, drainedNodeDeletions)
return nil
}
// deleteNodesFromCloudProvider removes the given nodes from the cloud provider. No extra pre-deletion actions are executed on
// the Kubernetes side.
func deleteNodesFromCloudProvider(ctx *context.AutoscalingContext, nodes []*apiv1.Node) (cloudprovider.NodeGroup, error) {
nodeGroup, err := ctx.CloudProvider.NodeGroupForNode(nodes[0])
if err != nil {
return nodeGroup, errors.NewAutoscalerError(errors.CloudProviderError, "failed to find node group for %s: %v", nodes[0].Name, err)
}
if nodeGroup == nil || reflect.ValueOf(nodeGroup).IsNil() {
return nodeGroup, errors.NewAutoscalerError(errors.InternalError, "picked node that doesn't belong to a node group: %s", nodes[0].Name)
}
if err = nodeGroup.DeleteNodes(nodes); err != nil {
return nodeGroup, errors.NewAutoscalerError(errors.CloudProviderError, "failed to delete %s: %v", nodes[0].Name, err)
}
return nodeGroup, nil
}
func nodeScaleDownReason(node *apiv1.Node, drain bool) metrics.NodeScaleDownReason {
readiness, err := kubernetes.GetNodeReadiness(node)
if err != nil {
klog.Errorf("Couldn't determine node %q readiness while scaling down - assuming unready: %v", node.Name, err)
return metrics.Unready
}
if !readiness.Ready {
return metrics.Unready
}
// Node is ready.
if drain {
return metrics.Underutilized
}
return metrics.Empty
}
// IsNodeBeingDeleted returns true iff a given node is being deleted.
func IsNodeBeingDeleted(node *apiv1.Node, timestamp time.Time) bool {
deleteTime, _ := deletetaint.GetToBeDeletedTime(node)
return deleteTime != nil && (timestamp.Sub(*deleteTime) < MaxCloudProviderNodeDeletionTime || timestamp.Sub(*deleteTime) < MaxKubernetesEmptyNodeDeletionTime)
}
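
To make the batching behaviour concrete, a minimal sketch of driving the batcher directly (hypothetical caller; in the autoscaler the batcher is only fed through Actuator.scheduleDeletion). Nodes added within the same 5s window are grouped by node group, and each group is removed with one DeleteNodes call:

package example

import (
    "time"

    apiv1 "k8s.io/api/core/v1"
    "k8s.io/autoscaler/cluster-autoscaler/clusterstate"
    "k8s.io/autoscaler/cluster-autoscaler/context"
    "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/actuation"
    "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/deletiontracker"
    "k8s.io/klog/v2"
)

// batchDelete feeds empty (non-drained) nodes to a 5s NodeDeletionBatcher.
func batchDelete(ctx *context.AutoscalingContext, csr *clusterstate.ClusterStateRegistry, nodes []*apiv1.Node) {
    ndt := deletiontracker.NewNodeDeletionTracker(0 * time.Second)
    batcher := actuation.NewNodeDeletionBatcher(ctx, csr, ndt, 5*time.Second)
    for _, node := range nodes {
        if err := batcher.AddNode(node, false); err != nil {
            // AddNode fails only if the node's node group cannot be resolved.
            klog.Errorf("Couldn't add node %s to the batcher: %v", node.Name, err)
        }
    }
}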

View File

@ -0,0 +1,236 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package actuation
import (
"fmt"
"testing"
"time"
apiv1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
testprovider "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/test"
"k8s.io/autoscaler/cluster-autoscaler/clusterstate"
clusterstate_utils "k8s.io/autoscaler/cluster-autoscaler/clusterstate/utils"
"k8s.io/autoscaler/cluster-autoscaler/config"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/deletiontracker"
. "k8s.io/autoscaler/cluster-autoscaler/core/test"
"k8s.io/autoscaler/cluster-autoscaler/utils/deletetaint"
"k8s.io/client-go/kubernetes/fake"
core "k8s.io/client-go/testing"
kube_record "k8s.io/client-go/tools/record"
)
func TestAddNodeToBucket(t *testing.T) {
provider := testprovider.NewTestCloudProvider(nil, nil)
ctx, err := NewScaleTestAutoscalingContext(config.AutoscalingOptions{}, nil, nil, provider, nil, nil)
if err != nil {
t.Fatalf("Couldn't set up autoscaling context: %v", err)
}
nodeGroup1 := "ng-1"
nodeGroup2 := "ng-2"
nodes1 := generateNodes(5, "ng-1")
nodes2 := generateNodes(5, "ng-2")
provider.AddNodeGroup(nodeGroup1, 1, 10, 5)
provider.AddNodeGroup(nodeGroup2, 1, 10, 5)
for _, node := range nodes1 {
provider.AddNode(nodeGroup1, node)
}
for _, node := range nodes2 {
provider.AddNode(nodeGroup2, node)
}
testcases := []struct {
name string
nodes []*apiv1.Node
wantBatches int
drained bool
}{
{
name: "Add 1 node",
nodes: []*apiv1.Node{nodes1[0]},
wantBatches: 1,
},
{
name: "Add nodes that belong to one nodeGroup",
nodes: nodes1,
wantBatches: 1,
},
{
name: "Add 3 nodes that belong to 2 nodeGroups",
nodes: []*apiv1.Node{nodes1[0], nodes2[0], nodes2[1]},
wantBatches: 2,
},
{
name: "Add 3 nodes that belong to 2 nodeGroups, all nodes are drained",
nodes: []*apiv1.Node{nodes1[0], nodes2[0], nodes2[1]},
wantBatches: 2,
drained: true,
},
}
for _, test := range testcases {
d := NodeDeletionBatcher{
ctx: &ctx,
clusterState: nil,
nodeDeletionTracker: nil,
deletionsPerNodeGroup: make(map[string][]*apiv1.Node),
drainedNodeDeletions: make(map[string]bool),
}
batchCount := 0
for _, node := range test.nodes {
_, first, err := d.addNodeToBucket(node, test.drained)
if err != nil {
t.Errorf("addNodeToBucket return error %q when addidng node %v", err, node)
}
if first {
batchCount += 1
}
}
if batchCount != test.wantBatches {
t.Errorf("Want %d batches, got %d batches", test.wantBatches, batchCount)
}
}
}
func TestRemove(t *testing.T) {
testCases := []struct {
name string
err bool
numNodes int
failedDeletion int
addNgToBucket bool
}{
{
name: "Remove NodeGroup that is not present in bucket",
err: true,
addNgToBucket: false,
},
{
name: "Regular successful remove",
err: false,
numNodes: 5,
failedDeletion: 0,
addNgToBucket: true,
},
{
name: "Unsuccessful remove",
numNodes: 5,
failedDeletion: 1,
addNgToBucket: true,
},
}
for _, test := range testCases {
t.Run(test.name, func(t *testing.T) {
test := test
fakeClient := &fake.Clientset{}
fakeLogRecorder, _ := clusterstate_utils.NewStatusMapRecorder(fakeClient, "kube-system", kube_record.NewFakeRecorder(5), false, "my-cool-configmap")
failedNodeDeletion := make(map[string]bool)
deletedNodes := make(chan string, 10)
notDeletedNodes := make(chan string, 10)
// Hook node deletion at the level of cloud provider, to gather which nodes were deleted, and to fail the deletion for
// certain nodes to simulate errors.
provider := testprovider.NewTestCloudProvider(nil, func(nodeGroup string, node string) error {
if failedNodeDeletion[node] {
notDeletedNodes <- node
return fmt.Errorf("SIMULATED ERROR: won't remove node")
}
deletedNodes <- node
return nil
})
fakeClient.Fake.AddReactor("update", "nodes",
func(action core.Action) (bool, runtime.Object, error) {
update := action.(core.UpdateAction)
obj := update.GetObject().(*apiv1.Node)
return true, obj, nil
})
ctx, err := NewScaleTestAutoscalingContext(config.AutoscalingOptions{}, fakeClient, nil, provider, nil, nil)
clusterStateRegistry := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, fakeLogRecorder, NewBackoff())
if err != nil {
t.Fatalf("Couldn't set up autoscaling context: %v", err)
}
ng := "ng"
provider.AddNodeGroup(ng, 1, 10, test.numNodes)
d := NodeDeletionBatcher{
ctx: &ctx,
clusterState: clusterStateRegistry,
nodeDeletionTracker: deletiontracker.NewNodeDeletionTracker(1 * time.Minute),
deletionsPerNodeGroup: make(map[string][]*apiv1.Node),
drainedNodeDeletions: make(map[string]bool),
}
nodes := generateNodes(test.numNodes, ng)
failedDeletion := test.failedDeletion
for _, node := range nodes {
if failedDeletion > 0 {
failedNodeDeletion[node.Name] = true
failedDeletion -= 1
}
provider.AddNode(ng, node)
}
if test.addNgToBucket {
for _, node := range nodes {
node.Spec.Taints = append(node.Spec.Taints, apiv1.Taint{
Key: deletetaint.ToBeDeletedTaint,
Effect: apiv1.TaintEffectNoSchedule,
})
_, _, err := d.addNodeToBucket(node, true)
if err != nil {
t.Errorf("addNodeToBucket return error %q when addidng node %v", err, node)
}
}
}
err = d.remove(ng)
if test.err {
if err == nil {
t.Errorf("remove() should return error, but return nil")
}
return
}
if err != nil {
t.Errorf("remove() return error, but shouldn't")
}
if test.failedDeletion == 0 {
gotDeletedNodes := []string{}
for i := 0; i < test.numNodes; i++ {
select {
case deletedNode := <-deletedNodes:
gotDeletedNodes = append(gotDeletedNodes, deletedNode)
case <-time.After(4 * time.Second):
t.Errorf("Timeout while waiting for deleted nodes.")
}
}
} else {
select {
case <-notDeletedNodes:
case <-time.After(4 * time.Second):
t.Errorf("Timeout while waiting for deleted nodes.")
}
}
if len(d.deletionsPerNodeGroup) > 0 {
t.Errorf("Number of bathces hasn't reach 0 after remove(), got: %v", len(d.deletionsPerNodeGroup))
}
if len(d.drainedNodeDeletions) > 0 {
t.Errorf(" Drained node map is not empty, got: %v", len(d.drainedNodeDeletions))
}
})
}
}

View File

@ -1404,6 +1404,6 @@ func newWrapperForTesting(ctx *context.AutoscalingContext, clusterStateRegistry
ndt = deletiontracker.NewNodeDeletionTracker(0 * time.Second)
}
sd := NewScaleDown(ctx, NewTestProcessors(), clusterStateRegistry, ndt)
actuator := actuation.NewActuator(ctx, clusterStateRegistry, ndt)
actuator := actuation.NewActuator(ctx, clusterStateRegistry, ndt, 0*time.Second)
return NewScaleDownWrapper(sd, actuator)
}

View File

@ -156,7 +156,7 @@ func NewStaticAutoscaler(
ndt := deletiontracker.NewNodeDeletionTracker(0 * time.Second)
scaleDown := legacy.NewScaleDown(autoscalingContext, processors, clusterStateRegistry, ndt)
actuator := actuation.NewActuator(autoscalingContext, clusterStateRegistry, ndt)
actuator := actuation.NewActuator(autoscalingContext, clusterStateRegistry, ndt, opts.NodeDeletionBatcherInterval)
scaleDownWrapper := legacy.NewScaleDownWrapper(scaleDown, actuator)
processorCallbacks.scaleDownPlanner = scaleDownWrapper

View File

@ -680,6 +680,7 @@ func TestStaticAutoscalerRunOncePodsWithPriorities(t *testing.T) {
MaxCoresTotal: 10,
MaxMemoryTotal: 100000,
ExpendablePodsPriorityCutoff: 10,
NodeDeletionBatcherInterval: 0 * time.Second,
}
processorCallbacks := newStaticAutoscalerProcessorCallbacks()
@ -1399,7 +1400,7 @@ func newScaleDownPlannerAndActuator(t *testing.T, ctx *context.AutoscalingContex
ctx.MaxDrainParallelism = 1
ndt := deletiontracker.NewNodeDeletionTracker(0 * time.Second)
sd := legacy.NewScaleDown(ctx, p, cs, ndt)
actuator := actuation.NewActuator(ctx, cs, ndt)
actuator := actuation.NewActuator(ctx, cs, ndt, 0*time.Second)
wrapper := legacy.NewScaleDownWrapper(sd, actuator)
return wrapper, wrapper
}

View File

@ -124,13 +124,14 @@ var (
"for scale down when some candidates from previous iteration are no longer valid."+
"When calculating the pool size for additional candidates we take"+
"max(#nodes * scale-down-candidates-pool-ratio, scale-down-candidates-pool-min-count).")
nodeDeletionDelayTimeout = flag.Duration("node-deletion-delay-timeout", 2*time.Minute, "Maximum time CA waits for removing delay-deletion.cluster-autoscaler.kubernetes.io/ annotations before deleting the node.")
scanInterval = flag.Duration("scan-interval", 10*time.Second, "How often cluster is reevaluated for scale up or down")
maxNodesTotal = flag.Int("max-nodes-total", 0, "Maximum number of nodes in all node groups. Cluster autoscaler will not grow the cluster beyond this number.")
coresTotal = flag.String("cores-total", minMaxFlagString(0, config.DefaultMaxClusterCores), "Minimum and maximum number of cores in cluster, in the format <min>:<max>. Cluster autoscaler will not scale the cluster beyond these numbers.")
memoryTotal = flag.String("memory-total", minMaxFlagString(0, config.DefaultMaxClusterMemory), "Minimum and maximum number of gigabytes of memory in cluster, in the format <min>:<max>. Cluster autoscaler will not scale the cluster beyond these numbers.")
gpuTotal = multiStringFlag("gpu-total", "Minimum and maximum number of different GPUs in cluster, in the format <gpu_type>:<min>:<max>. Cluster autoscaler will not scale the cluster beyond these numbers. Can be passed multiple times. CURRENTLY THIS FLAG ONLY WORKS ON GKE.")
cloudProviderFlag = flag.String("cloud-provider", cloudBuilder.DefaultCloudProvider,
nodeDeletionDelayTimeout = flag.Duration("node-deletion-delay-timeout", 2*time.Minute, "Maximum time CA waits for removing delay-deletion.cluster-autoscaler.kubernetes.io/ annotations before deleting the node.")
nodeDeletionBatcherInterval = flag.Duration("node-deletion-batcher-interval", 0*time.Second, "How long CA ScaleDown gathers nodes before deleting them in a batch.")
scanInterval = flag.Duration("scan-interval", 10*time.Second, "How often cluster is reevaluated for scale up or down")
maxNodesTotal = flag.Int("max-nodes-total", 0, "Maximum number of nodes in all node groups. Cluster autoscaler will not grow the cluster beyond this number.")
coresTotal = flag.String("cores-total", minMaxFlagString(0, config.DefaultMaxClusterCores), "Minimum and maximum number of cores in cluster, in the format <min>:<max>. Cluster autoscaler will not scale the cluster beyond these numbers.")
memoryTotal = flag.String("memory-total", minMaxFlagString(0, config.DefaultMaxClusterMemory), "Minimum and maximum number of gigabytes of memory in cluster, in the format <min>:<max>. Cluster autoscaler will not scale the cluster beyond these numbers.")
gpuTotal = multiStringFlag("gpu-total", "Minimum and maximum number of different GPUs in cluster, in the format <gpu_type>:<min>:<max>. Cluster autoscaler will not scale the cluster beyond these numbers. Can be passed multiple times. CURRENTLY THIS FLAG ONLY WORKS ON GKE.")
cloudProviderFlag = flag.String("cloud-provider", cloudBuilder.DefaultCloudProvider,
"Cloud provider type. Available values: ["+strings.Join(cloudBuilder.AvailableCloudProviders, ",")+"]")
maxBulkSoftTaintCount = flag.Int("max-bulk-soft-taint-count", 10, "Maximum number of nodes that can be tainted/untainted PreferNoSchedule at the same time. Set to 0 to turn off such tainting.")
maxBulkSoftTaintTime = flag.Duration("max-bulk-soft-taint-time", 3*time.Second, "Maximum duration of tainting/untainting nodes as PreferNoSchedule at the same time.")
@ -293,6 +294,7 @@ func createAutoscalingOptions() config.AutoscalingOptions {
RecordDuplicatedEvents: *recordDuplicatedEvents,
MaxNodesPerScaleUp: *maxNodesPerScaleUp,
MaxNodeGroupBinpackingDuration: *maxNodeGroupBinpackingDuration,
NodeDeletionBatcherInterval: *nodeDeletionBatcherInterval,
}
}