From 1cea91895df91d21329433a7f39c026df32b32bc Mon Sep 17 00:00:00 2001
From: Marcin Wielgus
Date: Sat, 5 Nov 2016 19:10:52 +0100
Subject: [PATCH 1/2] Cluster-autoscaler: delete empty nodes in bulk

---
 cluster-autoscaler/cluster_autoscaler.go     | 10 ++--
 cluster-autoscaler/scale_down.go             | 62 +++++++++++++++-----
 cluster-autoscaler/simulator/cluster.go      | 24 ++++++++
 cluster-autoscaler/simulator/cluster_test.go | 19 ++++++
 4 files changed, 96 insertions(+), 19 deletions(-)

diff --git a/cluster-autoscaler/cluster_autoscaler.go b/cluster-autoscaler/cluster_autoscaler.go
index 39c0c559ab..a522fce24a 100644
--- a/cluster-autoscaler/cluster_autoscaler.go
+++ b/cluster-autoscaler/cluster_autoscaler.go
@@ -79,9 +79,10 @@
 		"Node utilization level, defined as sum of requested resources divided by capacity, below which a node can be considered for scale down")
 	scaleDownTrialInterval = flag.Duration("scale-down-trial-interval", 1*time.Minute,
 		"How often scale down possibility is checked")
-	scanInterval      = flag.Duration("scan-interval", 10*time.Second, "How often cluster is reevaluated for scale up or down")
-	maxNodesTotal     = flag.Int("max-nodes-total", 0, "Maximum number of nodes in all node groups. Cluster autoscaler will not grow the cluster beyond this number.")
-	cloudProviderFlag = flag.String("cloud-provider", "gce", "Cloud provider type. Allowed values: gce, aws")
+	scanInterval           = flag.Duration("scan-interval", 10*time.Second, "How often cluster is reevaluated for scale up or down")
+	maxNodesTotal          = flag.Int("max-nodes-total", 0, "Maximum number of nodes in all node groups. Cluster autoscaler will not grow the cluster beyond this number.")
+	cloudProviderFlag      = flag.String("cloud-provider", "gce", "Cloud provider type. Allowed values: gce, aws")
+	maxEmptyBulkDeleteFlag = flag.Int("max-empty-bulk-delete", 10, "Maximum number of empty nodes that can be deleted at the same time.")

 	// AvailableEstimators is a list of available estimators.
 	AvailableEstimators = []string{BasicEstimatorName, BinpackingEstimatorName}
@@ -319,7 +320,8 @@ func run(_ <-chan struct{}) {
 				predicateChecker,
 				podLocationHints,
 				usageTracker,
-				recorder)
+				recorder,
+				*maxEmptyBulkDeleteFlag)

 			updateDuration("scaledown", scaleDownStart)

diff --git a/cluster-autoscaler/scale_down.go b/cluster-autoscaler/scale_down.go
index b3bbd9f76b..51a0f7f6a6 100644
--- a/cluster-autoscaler/scale_down.go
+++ b/cluster-autoscaler/scale_down.go
@@ -120,7 +120,8 @@ func ScaleDown(
 	predicateChecker *simulator.PredicateChecker,
 	oldHints map[string]string,
 	usageTracker *simulator.UsageTracker,
-	recorder kube_record.EventRecorder) (ScaleDownResult, error) {
+	recorder kube_record.EventRecorder,
+	maxEmptyBulkDelete int) (ScaleDownResult, error) {

 	now := time.Now()
 	candidates := make([]*kube_api.Node, 0)
@@ -163,6 +164,35 @@ func ScaleDown(
 		return ScaleDownNoUnneeded, nil
 	}

+	emptyNodes := simulator.FindEmptyNodesToRemove(candidates, pods)
+	if len(emptyNodes) > 0 {
+		limit := maxEmptyBulkDelete
+		if len(emptyNodes) < limit {
+			limit = len(emptyNodes)
+		}
+		emptyNodes = emptyNodes[:limit]
+		confirmation := make(chan error, len(emptyNodes))
+		for _, node := range emptyNodes {
+			glog.V(0).Infof("Scale-down: removing empty node %s", node.Name)
+			simulator.RemoveNodeFromTracker(usageTracker, node.Name, unneededNodes)
+			go func(nodeToDelete *kube_api.Node) {
+				confirmation <- deleteNodeFromCloudProvider(nodeToDelete, cloudProvider, recorder)
+			}(node)
+		}
+		var finalError error
+		for range emptyNodes {
+			if err := <-confirmation; err != nil {
+				glog.Errorf("Problem with empty node deletion: %v", err)
+				finalError = err
+			}
+		}
+		if finalError == nil {
+			return ScaleDownNodeDeleted, nil
+		} else {
+			return ScaleDownError, fmt.Errorf("failed to delete at least one empty node: %v", finalError)
+		}
+	}
+
 	// We look for only 1 node so new hints may be incomplete.
 	nodesToRemove, _, err := simulator.FindNodesToRemove(candidates, nodes, pods, client,
 		predicateChecker, 1, false, oldHints, usageTracker, time.Now())
@@ -183,23 +213,25 @@
 	glog.V(0).Infof("Scale-down: removing node %s, utilization: %v, pods to reschedule: ",
 		toRemove.Node.Name, utilization, strings.Join(podNames, ","))

-	nodeGroup, err := cloudProvider.NodeGroupForNode(toRemove.Node)
-	if err != nil {
-		return ScaleDownError, fmt.Errorf("failed to node group for %s: %v", toRemove.Node.Name, err)
-	}
-	if nodeGroup == nil || reflect.ValueOf(nodeGroup).IsNil() {
-		return ScaleDownError, fmt.Errorf("picked node that doesn't belong to a node group: %s", toRemove.Node.Name)
-	}
-
-	err = nodeGroup.DeleteNodes([]*kube_api.Node{toRemove.Node})
 	simulator.RemoveNodeFromTracker(usageTracker, toRemove.Node.Name, unneededNodes)
-
+	err = deleteNodeFromCloudProvider(toRemove.Node, cloudProvider, recorder)
 	if err != nil {
 		return ScaleDownError, fmt.Errorf("Failed to delete %s: %v", toRemove.Node.Name, err)
 	}
-
-	recorder.Eventf(toRemove.Node, kube_api.EventTypeNormal, "ScaleDown",
-		"node removed by cluster autoscaler")
-
 	return ScaleDownNodeDeleted, nil
 }
+
+func deleteNodeFromCloudProvider(node *kube_api.Node, cloudProvider cloudprovider.CloudProvider, recorder kube_record.EventRecorder) error {
+	nodeGroup, err := cloudProvider.NodeGroupForNode(node)
+	if err != nil {
+		return fmt.Errorf("failed to get node group for %s: %v", node.Name, err)
+	}
+	if nodeGroup == nil || reflect.ValueOf(nodeGroup).IsNil() {
+		return fmt.Errorf("picked node that doesn't belong to a node group: %s", node.Name)
+	}
+	if err = nodeGroup.DeleteNodes([]*kube_api.Node{node}); err != nil {
+		return fmt.Errorf("failed to delete %s: %v", node.Name, err)
+	}
+	recorder.Eventf(node, kube_api.EventTypeNormal, "ScaleDown", "node removed by cluster autoscaler")
+	return nil
+}
diff --git a/cluster-autoscaler/simulator/cluster.go b/cluster-autoscaler/simulator/cluster.go
index 50c1629f99..6c331870d2 100644
--- a/cluster-autoscaler/simulator/cluster.go
+++ b/cluster-autoscaler/simulator/cluster.go
@@ -111,6 +111,30 @@ candidateloop:
 	return result, newHints, nil
 }

+// FindEmptyNodesToRemove finds empty nodes that can be removed.
+func FindEmptyNodesToRemove(candidates []*kube_api.Node, pods []*kube_api.Pod) []*kube_api.Node {
+	nodeNameToNodeInfo := schedulercache.CreateNodeNameToInfoMap(pods)
+	for _, node := range candidates {
+		if nodeInfo, found := nodeNameToNodeInfo[node.Name]; found {
+			nodeInfo.SetNode(node)
+		}
+	}
+	result := make([]*kube_api.Node, 0)
+	for _, node := range candidates {
+		if nodeInfo, found := nodeNameToNodeInfo[node.Name]; found {
+			// Should block on all pods.
+			podsToRemove, err := FastGetPodsToMove(nodeInfo, true, true)
+			if err == nil && len(podsToRemove) == 0 {
+				result = append(result, node)
+			}
+		} else {
+			// Node without pods.
+			result = append(result, node)
+		}
+	}
+	return result
+}
+
 // CalculateUtilization calculates utilization of a node, defined as total amount of requested resources divided by capacity.
 func CalculateUtilization(node *kube_api.Node, nodeInfo *schedulercache.NodeInfo) (float64, error) {
 	cpu, err := calculateUtilizationOfResource(node, nodeInfo, kube_api.ResourceCPU)
diff --git a/cluster-autoscaler/simulator/cluster_test.go b/cluster-autoscaler/simulator/cluster_test.go
index 94e610e6a7..7cc4791780 100644
--- a/cluster-autoscaler/simulator/cluster_test.go
+++ b/cluster-autoscaler/simulator/cluster_test.go
@@ -23,6 +23,7 @@ import (
 	. "k8s.io/contrib/cluster-autoscaler/utils/test"
"k8s.io/contrib/cluster-autoscaler/utils/test" kube_api "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/kubelet/types" "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache" "github.com/stretchr/testify/assert" @@ -150,3 +151,21 @@ func TestShuffleNodes(t *testing.T) { } assert.True(t, gotPermutation) } + +func TestFindEmptyNodes(t *testing.T) { + pod1 := BuildTestPod("p1", 300, 500000) + pod1.Spec.NodeName = "n1" + pod2 := BuildTestPod("p2", 300, 500000) + pod2.Spec.NodeName = "n2" + pod2.Annotations = map[string]string{ + types.ConfigMirrorAnnotationKey: "", + } + + node1 := BuildTestNode("n1", 1000, 2000000) + node2 := BuildTestNode("n2", 1000, 2000000) + node3 := BuildTestNode("n3", 1000, 2000000) + node4 := BuildTestNode("n4", 1000, 2000000) + + emptyNodes := FindEmptyNodesToRemove([]*kube_api.Node{node1, node2, node3, node4}, []*kube_api.Pod{pod1, pod2}) + assert.Equal(t, []*kube_api.Node{node2, node3, node4}, emptyNodes) +} From 1a0f80dd2fc2861cedcf06fc122dd541df5e8a03 Mon Sep 17 00:00:00 2001 From: Marcin Wielgus Date: Sun, 6 Nov 2016 00:33:28 +0100 Subject: [PATCH 2/2] Cluster-autoscaler: empty node accounting --- cluster-autoscaler/scale_down.go | 55 +++++++++++++++++++++++++++----- 1 file changed, 47 insertions(+), 8 deletions(-) diff --git a/cluster-autoscaler/scale_down.go b/cluster-autoscaler/scale_down.go index 51a0f7f6a6..582e500768 100644 --- a/cluster-autoscaler/scale_down.go +++ b/cluster-autoscaler/scale_down.go @@ -164,13 +164,11 @@ func ScaleDown( return ScaleDownNoUnneeded, nil } - emptyNodes := simulator.FindEmptyNodesToRemove(candidates, pods) + // Trying to delete empty nodes in bulk. If there are no empty nodes then CA will + // try to delete not-so-empty nodes, possibly killing some pods and allowing them + // to recreate on other nodes. + emptyNodes := getEmptyNodes(candidates, pods, maxEmptyBulkDelete, cloudProvider) if len(emptyNodes) > 0 { - limit := maxEmptyBulkDelete - if len(emptyNodes) < limit { - limit = len(emptyNodes) - } - emptyNodes = emptyNodes[:limit] confirmation := make(chan error, len(emptyNodes)) for _, node := range emptyNodes { glog.V(0).Infof("Scale-down: removing empty node %s", node.Name) @@ -188,9 +186,8 @@ func ScaleDown( } if finalError == nil { return ScaleDownNodeDeleted, nil - } else { - return ScaleDownError, fmt.Errorf("failed to delete at least one empty node: %v", finalError) } + return ScaleDownError, fmt.Errorf("failed to delete at least one empty node: %v", finalError) } // We look for only 1 node so new hints may be incomplete. @@ -221,6 +218,48 @@ func ScaleDown( return ScaleDownNodeDeleted, nil } +// This functions finds empty nodes among passed candidates and returns a list of empty nodes +// that can be deleted at the same time. 
+func getEmptyNodes(candidates []*kube_api.Node, pods []*kube_api.Pod, maxEmptyBulkDelete int, cloudProvider cloudprovider.CloudProvider) []*kube_api.Node {
+	emptyNodes := simulator.FindEmptyNodesToRemove(candidates, pods)
+	availabilityMap := make(map[string]int)
+	result := make([]*kube_api.Node, 0)
+	for _, node := range emptyNodes {
+		nodeGroup, err := cloudProvider.NodeGroupForNode(node)
+		if err != nil {
+			glog.Errorf("Failed to get group for %s: %v", node.Name, err)
+			continue
+		}
+		if nodeGroup == nil || reflect.ValueOf(nodeGroup).IsNil() {
+			continue
+		}
+		var available int
+		var found bool
+		if available, found = availabilityMap[nodeGroup.Id()]; !found {
+			size, err := nodeGroup.TargetSize()
+			if err != nil {
+				glog.Errorf("Failed to get size for %s: %v", nodeGroup.Id(), err)
+				continue
+			}
+			available = size - nodeGroup.MinSize()
+			if available < 0 {
+				available = 0
+			}
+			availabilityMap[nodeGroup.Id()] = available
+		}
+		if available > 0 {
+			available--
+			availabilityMap[nodeGroup.Id()] = available
+			result = append(result, node)
+		}
+	}
+	limit := maxEmptyBulkDelete
+	if len(result) < limit {
+		limit = len(result)
+	}
+	return result[:limit]
+}
+
 func deleteNodeFromCloudProvider(node *kube_api.Node, cloudProvider cloudprovider.CloudProvider, recorder kube_record.EventRecorder) error {
 	nodeGroup, err := cloudProvider.NodeGroupForNode(node)
 	if err != nil {
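
The bulk deletion introduced in PATCH 1/2 boils down to a fan-out/fan-in idiom: one goroutine per empty node, a confirmation channel buffered to the batch size, and a drain loop that keeps the last error so a single failure fails the whole scale-down. Here is a minimal standalone sketch of just that shape, assuming nothing beyond the standard library; the node type and deleteNode stub are hypothetical stand-ins for *kube_api.Node and deleteNodeFromCloudProvider, not code from the patches.

package main

import (
	"errors"
	"fmt"
)

// node and deleteNode are hypothetical stand-ins for *kube_api.Node and
// deleteNodeFromCloudProvider; only the concurrency shape mirrors the patch.
type node struct{ Name string }

func deleteNode(n *node) error {
	if n.Name == "n2" {
		return errors.New("cloud provider refused")
	}
	return nil
}

// bulkDelete fans out one goroutine per node, then drains the buffered
// confirmation channel, remembering the last error it saw.
func bulkDelete(emptyNodes []*node) error {
	confirmation := make(chan error, len(emptyNodes))
	for _, n := range emptyNodes {
		// The loop variable is passed as an argument so each goroutine
		// captures its own node (the patches do the same).
		go func(nodeToDelete *node) {
			confirmation <- deleteNode(nodeToDelete)
		}(n)
	}
	var finalError error
	for range emptyNodes {
		if err := <-confirmation; err != nil {
			finalError = err
		}
	}
	return finalError
}

func main() {
	if err := bulkDelete([]*node{{"n1"}, {"n2"}, {"n3"}}); err != nil {
		fmt.Printf("failed to delete at least one empty node: %v\n", err)
	}
}

Because the buffer equals len(emptyNodes), every goroutine can complete its send even if the receiver stopped early, so none of them can leak waiting on the channel.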
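The accounting that PATCH 2/2 adds in getEmptyNodes can also be checked with plain integers. The sketch below is a hypothetical rework of that bookkeeping, with a toy group type standing in for cloudprovider.NodeGroup; it reproduces the two caps from the patch: at most TargetSize() - MinSize() deletions per node group, and at most maxEmptyBulkDelete deletions overall.

package main

import "fmt"

// group is a toy stand-in for cloudprovider.NodeGroup, carrying only the
// fields the accounting consults.
type group struct {
	id          string
	target, min int
}

// capEmptyNodes mirrors the bookkeeping of getEmptyNodes: every group may
// lose at most target-min nodes, and the batch is capped at maxEmptyBulkDelete.
func capEmptyNodes(groupOf map[string]*group, emptyNodes []string, maxEmptyBulkDelete int) []string {
	availability := make(map[string]int)
	result := make([]string, 0)
	for _, name := range emptyNodes {
		g, ok := groupOf[name]
		if !ok {
			continue // a node outside any group is never bulk-deleted
		}
		if _, found := availability[g.id]; !found {
			avail := g.target - g.min
			if avail < 0 {
				avail = 0
			}
			availability[g.id] = avail
		}
		if availability[g.id] > 0 {
			availability[g.id]--
			result = append(result, name)
		}
	}
	if len(result) > maxEmptyBulkDelete {
		result = result[:maxEmptyBulkDelete]
	}
	return result
}

func main() {
	mig := &group{id: "mig1", target: 3, min: 2}
	groupOf := map[string]*group{"n1": mig, "n2": mig}
	fmt.Println(capEmptyNodes(groupOf, []string{"n1", "n2"}, 10)) // prints [n1]
}

Only n1 makes the cut in this example: the group's target size of 3 is just one above its minimum of 2, so deleting both empty nodes would shrink the group below MinSize().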