From 1cea91895df91d21329433a7f39c026df32b32bc Mon Sep 17 00:00:00 2001
From: Marcin Wielgus
Date: Sat, 5 Nov 2016 19:10:52 +0100
Subject: [PATCH] Cluster-autoscaler: delete empty nodes in bulk

---
 cluster-autoscaler/cluster_autoscaler.go     | 10 ++--
 cluster-autoscaler/scale_down.go             | 62 +++++++++++++++-----
 cluster-autoscaler/simulator/cluster.go      | 24 ++++++++
 cluster-autoscaler/simulator/cluster_test.go | 19 ++++++
 4 files changed, 96 insertions(+), 19 deletions(-)

diff --git a/cluster-autoscaler/cluster_autoscaler.go b/cluster-autoscaler/cluster_autoscaler.go
index 39c0c559ab..a522fce24a 100644
--- a/cluster-autoscaler/cluster_autoscaler.go
+++ b/cluster-autoscaler/cluster_autoscaler.go
@@ -79,9 +79,10 @@ var (
 		"Node utilization level, defined as sum of requested resources divided by capacity, below which a node can be considered for scale down")
 	scaleDownTrialInterval = flag.Duration("scale-down-trial-interval", 1*time.Minute,
 		"How often scale down possiblity is check")
-	scanInterval      = flag.Duration("scan-interval", 10*time.Second, "How often cluster is reevaluated for scale up or down")
-	maxNodesTotal     = flag.Int("max-nodes-total", 0, "Maximum number of nodes in all node groups. Cluster autoscaler will not grow the cluster beyond this number.")
-	cloudProviderFlag = flag.String("cloud-provider", "gce", "Cloud provider type. Allowed values: gce, aws")
+	scanInterval           = flag.Duration("scan-interval", 10*time.Second, "How often cluster is reevaluated for scale up or down")
+	maxNodesTotal          = flag.Int("max-nodes-total", 0, "Maximum number of nodes in all node groups. Cluster autoscaler will not grow the cluster beyond this number.")
+	cloudProviderFlag      = flag.String("cloud-provider", "gce", "Cloud provider type. Allowed values: gce, aws")
+	maxEmptyBulkDeleteFlag = flag.Int("max-empty-bulk-delete", 10, "Maximum number of empty nodes that can be deleted at the same time.")

 	// AvailableEstimators is a list of available estimators.
 	AvailableEstimators = []string{BasicEstimatorName, BinpackingEstimatorName}
@@ -319,7 +320,8 @@ func run(_ <-chan struct{}) {
 					predicateChecker,
 					podLocationHints,
 					usageTracker,
-					recorder)
+					recorder,
+					*maxEmptyBulkDeleteFlag)

 				updateDuration("scaledown", scaleDownStart)

diff --git a/cluster-autoscaler/scale_down.go b/cluster-autoscaler/scale_down.go
index b3bbd9f76b..51a0f7f6a6 100644
--- a/cluster-autoscaler/scale_down.go
+++ b/cluster-autoscaler/scale_down.go
@@ -120,7 +120,8 @@ func ScaleDown(
 	predicateChecker *simulator.PredicateChecker,
 	oldHints map[string]string,
 	usageTracker *simulator.UsageTracker,
-	recorder kube_record.EventRecorder) (ScaleDownResult, error) {
+	recorder kube_record.EventRecorder,
+	maxEmptyBulkDelete int) (ScaleDownResult, error) {

 	now := time.Now()
 	candidates := make([]*kube_api.Node, 0)
@@ -163,6 +164,35 @@ func ScaleDown(
 		return ScaleDownNoUnneeded, nil
 	}

+	emptyNodes := simulator.FindEmptyNodesToRemove(candidates, pods)
+	if len(emptyNodes) > 0 {
+		limit := maxEmptyBulkDelete
+		if len(emptyNodes) < limit {
+			limit = len(emptyNodes)
+		}
+		emptyNodes = emptyNodes[:limit]
+		confirmation := make(chan error, len(emptyNodes))
+		for _, node := range emptyNodes {
+			glog.V(0).Infof("Scale-down: removing empty node %s", node.Name)
+			simulator.RemoveNodeFromTracker(usageTracker, node.Name, unneededNodes)
+			go func(nodeToDelete *kube_api.Node) {
+				confirmation <- deleteNodeFromCloudProvider(nodeToDelete, cloudProvider, recorder)
+			}(node)
+		}
+		var finalError error
+		for range emptyNodes {
+			if err := <-confirmation; err != nil {
+				glog.Errorf("Problem with empty node deletion: %v", err)
+				finalError = err
+			}
+		}
+		if finalError == nil {
+			return ScaleDownNodeDeleted, nil
+		} else {
+			return ScaleDownError, fmt.Errorf("failed to delete at least one empty node: %v", finalError)
+		}
+	}
+	// We look for only 1 node so new hints may be incomplete.
 	nodesToRemove, _, err := simulator.FindNodesToRemove(candidates, nodes, pods, client, predicateChecker,
 		1, false, oldHints, usageTracker, time.Now())
@@ -183,23 +213,25 @@ func ScaleDown(
 	glog.V(0).Infof("Scale-down: removing node %s, utilization: %v, pods to reschedule: %s", toRemove.Node.Name, utilization,
 		strings.Join(podNames, ","))

-	nodeGroup, err := cloudProvider.NodeGroupForNode(toRemove.Node)
-	if err != nil {
-		return ScaleDownError, fmt.Errorf("failed to node group for %s: %v", toRemove.Node.Name, err)
-	}
-	if nodeGroup == nil || reflect.ValueOf(nodeGroup).IsNil() {
-		return ScaleDownError, fmt.Errorf("picked node that doesn't belong to a node group: %s", toRemove.Node.Name)
-	}
-
-	err = nodeGroup.DeleteNodes([]*kube_api.Node{toRemove.Node})
 	simulator.RemoveNodeFromTracker(usageTracker, toRemove.Node.Name, unneededNodes)
-
+	err = deleteNodeFromCloudProvider(toRemove.Node, cloudProvider, recorder)
 	if err != nil {
 		return ScaleDownError, fmt.Errorf("Failed to delete %s: %v", toRemove.Node.Name, err)
 	}
-
-	recorder.Eventf(toRemove.Node, kube_api.EventTypeNormal, "ScaleDown",
-		"node removed by cluster autoscaler")
-
 	return ScaleDownNodeDeleted, nil
 }
+
+func deleteNodeFromCloudProvider(node *kube_api.Node, cloudProvider cloudprovider.CloudProvider, recorder kube_record.EventRecorder) error {
+	nodeGroup, err := cloudProvider.NodeGroupForNode(node)
+	if err != nil {
+		return fmt.Errorf("failed to get node group for %s: %v", node.Name, err)
+	}
+	if nodeGroup == nil || reflect.ValueOf(nodeGroup).IsNil() {
+		return fmt.Errorf("picked node that doesn't belong to a node group: %s", node.Name)
+	}
+	if err = nodeGroup.DeleteNodes([]*kube_api.Node{node}); err != nil {
+		return fmt.Errorf("failed to delete %s: %v", node.Name, err)
+	}
+	recorder.Eventf(node, kube_api.EventTypeNormal, "ScaleDown", "node removed by cluster autoscaler")
+	return nil
+}
diff --git a/cluster-autoscaler/simulator/cluster.go b/cluster-autoscaler/simulator/cluster.go
index 50c1629f99..6c331870d2 100644
--- a/cluster-autoscaler/simulator/cluster.go
+++ b/cluster-autoscaler/simulator/cluster.go
@@ -111,6 +111,30 @@ candidateloop:
 	return result, newHints, nil
 }

+// FindEmptyNodesToRemove finds empty nodes that can be removed.
+func FindEmptyNodesToRemove(candidates []*kube_api.Node, pods []*kube_api.Pod) []*kube_api.Node {
+	nodeNameToNodeInfo := schedulercache.CreateNodeNameToInfoMap(pods)
+	for _, node := range candidates {
+		if nodeInfo, found := nodeNameToNodeInfo[node.Name]; found {
+			nodeInfo.SetNode(node)
+		}
+	}
+	result := make([]*kube_api.Node, 0)
+	for _, node := range candidates {
+		if nodeInfo, found := nodeNameToNodeInfo[node.Name]; found {
+			// Should block on all pods.
+			podsToRemove, err := FastGetPodsToMove(nodeInfo, true, true)
+			if err == nil && len(podsToRemove) == 0 {
+				result = append(result, node)
+			}
+		} else {
+			// Node without pods.
+			result = append(result, node)
+		}
+	}
+	return result
+}
+
 // CalculateUtilization calculates utilization of a node, defined as total amount of requested resources divided by capacity.
 func CalculateUtilization(node *kube_api.Node, nodeInfo *schedulercache.NodeInfo) (float64, error) {
 	cpu, err := calculateUtilizationOfResource(node, nodeInfo, kube_api.ResourceCPU)
diff --git a/cluster-autoscaler/simulator/cluster_test.go b/cluster-autoscaler/simulator/cluster_test.go
index 94e610e6a7..7cc4791780 100644
--- a/cluster-autoscaler/simulator/cluster_test.go
+++ b/cluster-autoscaler/simulator/cluster_test.go
@@ -23,6 +23,7 @@ import (
"k8s.io/contrib/cluster-autoscaler/utils/test" kube_api "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/kubelet/types" "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache" "github.com/stretchr/testify/assert" @@ -150,3 +151,21 @@ func TestShuffleNodes(t *testing.T) { } assert.True(t, gotPermutation) } + +func TestFindEmptyNodes(t *testing.T) { + pod1 := BuildTestPod("p1", 300, 500000) + pod1.Spec.NodeName = "n1" + pod2 := BuildTestPod("p2", 300, 500000) + pod2.Spec.NodeName = "n2" + pod2.Annotations = map[string]string{ + types.ConfigMirrorAnnotationKey: "", + } + + node1 := BuildTestNode("n1", 1000, 2000000) + node2 := BuildTestNode("n2", 1000, 2000000) + node3 := BuildTestNode("n3", 1000, 2000000) + node4 := BuildTestNode("n4", 1000, 2000000) + + emptyNodes := FindEmptyNodesToRemove([]*kube_api.Node{node1, node2, node3, node4}, []*kube_api.Pod{pod1, pod2}) + assert.Equal(t, []*kube_api.Node{node2, node3, node4}, emptyNodes) +}