Cluster-autoscaler: delete empty nodes in bulk

Marcin Wielgus 2016-11-05 19:10:52 +01:00
parent f76a966bf7
commit 1cea91895d
4 changed files with 96 additions and 19 deletions
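
The heart of the change is in ScaleDown: instead of removing at most one drained node per loop, it now deletes every eligible empty node in a single pass, capped by the new --max-empty-bulk-delete flag. Below is a minimal, self-contained sketch of that fan-out/collect pattern; the node names and the deleteNode stub are illustrative only, while the real deletion goes through deleteNodeFromCloudProvider in the hunks that follow.

package main

import (
	"fmt"
	"time"
)

// deleteNode stands in for a cloud-provider delete call; the commit's real work
// happens in deleteNodeFromCloudProvider (see the scale_down.go hunk below).
func deleteNode(name string) error {
	time.Sleep(10 * time.Millisecond)
	return nil
}

func main() {
	emptyNodes := []string{"n1", "n2", "n3", "n4", "n5"}

	// Cap the batch, mirroring the new --max-empty-bulk-delete flag (default 10).
	maxEmptyBulkDelete := 3
	if len(emptyNodes) > maxEmptyBulkDelete {
		emptyNodes = emptyNodes[:maxEmptyBulkDelete]
	}

	// Fan out one goroutine per node. The channel is buffered to len(emptyNodes),
	// so every goroutine can send its result without blocking.
	confirmation := make(chan error, len(emptyNodes))
	for _, name := range emptyNodes {
		go func(n string) {
			confirmation <- deleteNode(n)
		}(name)
	}

	// Collect exactly one result per goroutine; keep the last error seen, as
	// ScaleDown does, and report success only if none of the deletions failed.
	var finalError error
	for range emptyNodes {
		if err := <-confirmation; err != nil {
			finalError = err
		}
	}
	fmt.Println("bulk delete finished, error:", finalError)
}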

View File

@@ -79,9 +79,10 @@ var (
"Node utilization level, defined as sum of requested resources divided by capacity, below which a node can be considered for scale down")
scaleDownTrialInterval = flag.Duration("scale-down-trial-interval", 1*time.Minute,
"How often scale down possiblity is check")
scanInterval = flag.Duration("scan-interval", 10*time.Second, "How often cluster is reevaluated for scale up or down")
maxNodesTotal = flag.Int("max-nodes-total", 0, "Maximum number of nodes in all node groups. Cluster autoscaler will not grow the cluster beyond this number.")
cloudProviderFlag = flag.String("cloud-provider", "gce", "Cloud provider type. Allowed values: gce, aws")
maxEmptyBulkDeleteFlag = flag.Int("max-empty-bulk-delete", 10, "Maximum number of empty nodes that can be deleted at the same time.")
// AvailableEstimators is a list of available estimators.
AvailableEstimators = []string{BasicEstimatorName, BinpackingEstimatorName}
@@ -319,7 +320,8 @@ func run(_ <-chan struct{}) {
predicateChecker,
podLocationHints,
usageTracker,
recorder)
recorder,
*maxEmptyBulkDeleteFlag)
updateDuration("scaledown", scaleDownStart)
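
For readers less familiar with Go's flag package: flag.Int returns a *int, which is why the value is dereferenced as *maxEmptyBulkDeleteFlag when handed to ScaleDown above. A standalone sketch of that wiring (an illustrative program, not part of the commit):

package main

import (
	"flag"
	"fmt"
)

// Same shape as the flag added by the commit; the default of 10 bounds how many
// empty nodes a single scale-down pass may delete at once.
var maxEmptyBulkDeleteFlag = flag.Int("max-empty-bulk-delete", 10,
	"Maximum number of empty nodes that can be deleted at the same time.")

func main() {
	flag.Parse()
	// Dereference the pointer when passing the value on, e.g. to ScaleDown(...).
	fmt.Println("empty-node deletions allowed per pass:", *maxEmptyBulkDeleteFlag)
}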

View File

@@ -120,7 +120,8 @@ func ScaleDown(
predicateChecker *simulator.PredicateChecker,
oldHints map[string]string,
usageTracker *simulator.UsageTracker,
recorder kube_record.EventRecorder) (ScaleDownResult, error) {
recorder kube_record.EventRecorder,
maxEmptyBulkDelete int) (ScaleDownResult, error) {
now := time.Now()
candidates := make([]*kube_api.Node, 0)
@@ -163,6 +164,35 @@ func ScaleDown(
return ScaleDownNoUnneeded, nil
}
emptyNodes := simulator.FindEmptyNodesToRemove(candidates, pods)
if len(emptyNodes) > 0 {
limit := maxEmptyBulkDelete
if len(emptyNodes) < limit {
limit = len(emptyNodes)
}
emptyNodes = emptyNodes[:limit]
confirmation := make(chan error, len(emptyNodes))
for _, node := range emptyNodes {
glog.V(0).Infof("Scale-down: removing empty node %s", node.Name)
simulator.RemoveNodeFromTracker(usageTracker, node.Name, unneededNodes)
go func(nodeToDelete *kube_api.Node) {
confirmation <- deleteNodeFromCloudProvider(nodeToDelete, cloudProvider, recorder)
}(node)
}
var finalError error
for range emptyNodes {
if err := <-confirmation; err != nil {
glog.Errorf("Problem with empty node deletion: %v", err)
finalError = err
}
}
if finalError == nil {
return ScaleDownNodeDeleted, nil
} else {
return ScaleDownError, fmt.Errorf("failed to delete at least one empty node: %v", finalError)
}
}
// We look for only 1 node so new hints may be incomplete.
nodesToRemove, _, err := simulator.FindNodesToRemove(candidates, nodes, pods, client, predicateChecker, 1, false,
oldHints, usageTracker, time.Now())
@@ -183,23 +213,25 @@ func ScaleDown(
glog.V(0).Infof("Scale-down: removing node %s, utilization: %v, pods to reschedule: ", toRemove.Node.Name, utilization,
strings.Join(podNames, ","))
nodeGroup, err := cloudProvider.NodeGroupForNode(toRemove.Node)
if err != nil {
return ScaleDownError, fmt.Errorf("failed to node group for %s: %v", toRemove.Node.Name, err)
}
if nodeGroup == nil || reflect.ValueOf(nodeGroup).IsNil() {
return ScaleDownError, fmt.Errorf("picked node that doesn't belong to a node group: %s", toRemove.Node.Name)
}
err = nodeGroup.DeleteNodes([]*kube_api.Node{toRemove.Node})
simulator.RemoveNodeFromTracker(usageTracker, toRemove.Node.Name, unneededNodes)
err = deleteNodeFromCloudProvider(toRemove.Node, cloudProvider, recorder)
if err != nil {
return ScaleDownError, fmt.Errorf("Failed to delete %s: %v", toRemove.Node.Name, err)
}
recorder.Eventf(toRemove.Node, kube_api.EventTypeNormal, "ScaleDown",
"node removed by cluster autoscaler")
return ScaleDownNodeDeleted, nil
}
func deleteNodeFromCloudProvider(node *kube_api.Node, cloudProvider cloudprovider.CloudProvider, recorder kube_record.EventRecorder) error {
nodeGroup, err := cloudProvider.NodeGroupForNode(node)
if err != nil {
return fmt.Errorf("failed to node group for %s: %v", node.Name, err)
}
if nodeGroup == nil || reflect.ValueOf(nodeGroup).IsNil() {
return fmt.Errorf("picked node that doesn't belong to a node group: %s", node.Name)
}
if err = nodeGroup.DeleteNodes([]*kube_api.Node{node}); err != nil {
return fmt.Errorf("failed to delete %s: %v", node.Name, err)
}
recorder.Eventf(node, kube_api.EventTypeNormal, "ScaleDown", "node removed by cluster autoscaler")
return nil
}

View File

@@ -111,6 +111,30 @@ candidateloop:
return result, newHints, nil
}
// FindEmptyNodesToRemove finds empty nodes that can be removed.
func FindEmptyNodesToRemove(candidates []*kube_api.Node, pods []*kube_api.Pod) []*kube_api.Node {
nodeNameToNodeInfo := schedulercache.CreateNodeNameToInfoMap(pods)
for _, node := range candidates {
if nodeInfo, found := nodeNameToNodeInfo[node.Name]; found {
nodeInfo.SetNode(node)
}
}
result := make([]*kube_api.Node, 0)
for _, node := range candidates {
if nodeInfo, found := nodeNameToNodeInfo[node.Name]; found {
// Should block on all pods.
podsToRemove, err := FastGetPodsToMove(nodeInfo, true, true)
if err == nil && len(podsToRemove) == 0 {
result = append(result, node)
}
} else {
// Node without pods.
result = append(result, node)
}
}
return result
}
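
"Empty" above does not mean literally pod-free: a node qualifies when FastGetPodsToMove, told to block on all pods (the two boolean arguments), reports nothing that would need rescheduling, so a node running only mirror pods still counts as empty (the test in the last file depends on this). The rule can be restated as a small wrapper; this is an illustrative helper that assumes it sits in the same simulator package, not code from the commit.

// isNodeEmpty restates the check inside FindEmptyNodesToRemove: no error and no
// pods to move means the node can be deleted without draining anything.
func isNodeEmpty(nodeInfo *schedulercache.NodeInfo) bool {
	podsToRemove, err := FastGetPodsToMove(nodeInfo, true, true)
	return err == nil && len(podsToRemove) == 0
}
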
// CalculateUtilization calculates utilization of a node, defined as total amount of requested resources divided by capacity.
func CalculateUtilization(node *kube_api.Node, nodeInfo *schedulercache.NodeInfo) (float64, error) {
cpu, err := calculateUtilizationOfResource(node, nodeInfo, kube_api.ResourceCPU)

View File

@@ -23,6 +23,7 @@ import (
. "k8s.io/contrib/cluster-autoscaler/utils/test"
kube_api "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
"github.com/stretchr/testify/assert"
@@ -150,3 +151,21 @@ func TestShuffleNodes(t *testing.T) {
}
assert.True(t, gotPermutation)
}
func TestFindEmptyNodes(t *testing.T) {
pod1 := BuildTestPod("p1", 300, 500000)
pod1.Spec.NodeName = "n1"
pod2 := BuildTestPod("p2", 300, 500000)
pod2.Spec.NodeName = "n2"
pod2.Annotations = map[string]string{
types.ConfigMirrorAnnotationKey: "",
}
node1 := BuildTestNode("n1", 1000, 2000000)
node2 := BuildTestNode("n2", 1000, 2000000)
node3 := BuildTestNode("n3", 1000, 2000000)
node4 := BuildTestNode("n4", 1000, 2000000)
emptyNodes := FindEmptyNodesToRemove([]*kube_api.Node{node1, node2, node3, node4}, []*kube_api.Pod{pod1, pod2})
assert.Equal(t, []*kube_api.Node{node2, node3, node4}, emptyNodes)
}