Merge pull request https://github.com/kubernetes/contrib/pull/1992 from mwielgus/bulkdelete
Automatic merge from submit-queue

Cluster-autoscaler: delete empty nodes in bulk

Regular nodes are deleted one by one. However, if a node is completely empty (it runs only DaemonSet or manifest-run pods), such nodes can easily be deleted in bulk. With this PR, scale down will first try to delete empty nodes in bulk and, if that fails, proceed with regular node deletion.

cc: @fgrzadkowski @piosz @jszczepkowski
Commit d37a94c85d
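Before the diff itself, the control flow this change introduces can be summarized: scale-down now collects the empty candidates first, deletes them concurrently (bounded by the new --max-empty-bulk-delete flag), and only falls back to the existing drain-one-node path when no empty node is found. Below is a minimal, self-contained sketch of that ordering; all names (Node, findEmptyNodes, deleteNode, drainAndDeleteOneNode) are illustrative stand-ins, not the autoscaler's real API.

package main

import (
	"errors"
	"fmt"
)

// Node is a stand-in for the cluster node type used by the autoscaler.
type Node struct{ Name string }

// scaleDownSketch mirrors the ordering added by this PR: bulk-delete empty nodes
// first, fall back to single-node deletion otherwise.
func scaleDownSketch(
	candidates []Node,
	maxEmptyBulkDelete int,
	findEmptyNodes func([]Node, int) []Node, // hypothetical: returns up to max empty nodes
	deleteNode func(Node) error, // hypothetical: removes one node via the cloud provider
	drainAndDeleteOneNode func([]Node) error, // hypothetical: the pre-existing path
) error {
	empty := findEmptyNodes(candidates, maxEmptyBulkDelete)
	if len(empty) > 0 {
		// Fan out one deletion goroutine per empty node and collect errors on a
		// buffered channel so no goroutine blocks on send.
		confirmation := make(chan error, len(empty))
		for _, n := range empty {
			go func(n Node) { confirmation <- deleteNode(n) }(n)
		}
		var finalErr error
		for range empty {
			if err := <-confirmation; err != nil {
				finalErr = err
			}
		}
		if finalErr != nil {
			return fmt.Errorf("failed to delete at least one empty node: %v", finalErr)
		}
		return nil
	}
	// No empty nodes: proceed exactly as before this PR.
	return drainAndDeleteOneNode(candidates)
}

func main() {
	nodes := []Node{{"n1"}, {"n2"}, {"n3"}}
	err := scaleDownSketch(nodes, 10,
		func(ns []Node, max int) []Node { // pretend the first two nodes are empty
			if max < 2 {
				return ns[:max]
			}
			return ns[:2]
		},
		func(n Node) error {
			fmt.Println("bulk-deleting empty node", n.Name)
			return nil
		},
		func(ns []Node) error { return errors.New("unused in this run") },
	)
	fmt.Println("scale-down result:", err)
}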
@@ -79,9 +79,10 @@ var (
 		"Node utilization level, defined as sum of requested resources divided by capacity, below which a node can be considered for scale down")
 	scaleDownTrialInterval = flag.Duration("scale-down-trial-interval", 1*time.Minute,
 		"How often scale down possiblity is check")
 	scanInterval      = flag.Duration("scan-interval", 10*time.Second, "How often cluster is reevaluated for scale up or down")
 	maxNodesTotal     = flag.Int("max-nodes-total", 0, "Maximum number of nodes in all node groups. Cluster autoscaler will not grow the cluster beyond this number.")
 	cloudProviderFlag = flag.String("cloud-provider", "gce", "Cloud provider type. Allowed values: gce, aws")
+	maxEmptyBulkDeleteFlag = flag.Int("max-empty-bulk-delete", 10, "Maximum number of empty nodes that can be deleted at the same time.")

 	// AvailableEstimators is a list of available estimators.
 	AvailableEstimators = []string{BasicEstimatorName, BinpackingEstimatorName}
@@ -319,7 +320,8 @@ func run(_ <-chan struct{}) {
 					predicateChecker,
 					podLocationHints,
 					usageTracker,
-					recorder)
+					recorder,
+					*maxEmptyBulkDeleteFlag)

 				updateDuration("scaledown", scaleDownStart)

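The new flag is a plain *int from the standard flag package, which is why the call site above passes *maxEmptyBulkDeleteFlag rather than the pointer itself. A tiny standalone illustration of that plumbing (the flag name is reused from the diff, everything else is hypothetical):

package main

import (
	"flag"
	"fmt"
)

// flag.Int returns *int; the value is only populated after flag.Parse().
var maxEmptyBulkDelete = flag.Int("max-empty-bulk-delete", 10,
	"Maximum number of empty nodes that can be deleted at the same time.")

// scaleDown takes the plain int, so callers dereference the flag pointer.
func scaleDown(maxEmptyBulkDelete int) {
	fmt.Println("bulk-delete limit:", maxEmptyBulkDelete)
}

func main() {
	flag.Parse()
	// Dereference the pointer when handing the value over, as in the diff above.
	scaleDown(*maxEmptyBulkDelete)
}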
@@ -120,7 +120,8 @@ func ScaleDown(
 	predicateChecker *simulator.PredicateChecker,
 	oldHints map[string]string,
 	usageTracker *simulator.UsageTracker,
-	recorder kube_record.EventRecorder) (ScaleDownResult, error) {
+	recorder kube_record.EventRecorder,
+	maxEmptyBulkDelete int) (ScaleDownResult, error) {

 	now := time.Now()
 	candidates := make([]*kube_api.Node, 0)
@@ -163,6 +164,32 @@ func ScaleDown(
 		return ScaleDownNoUnneeded, nil
 	}

+	// Trying to delete empty nodes in bulk. If there are no empty nodes then CA will
+	// try to delete not-so-empty nodes, possibly killing some pods and allowing them
+	// to recreate on other nodes.
+	emptyNodes := getEmptyNodes(candidates, pods, maxEmptyBulkDelete, cloudProvider)
+	if len(emptyNodes) > 0 {
+		confirmation := make(chan error, len(emptyNodes))
+		for _, node := range emptyNodes {
+			glog.V(0).Infof("Scale-down: removing empty node %s", node.Name)
+			simulator.RemoveNodeFromTracker(usageTracker, node.Name, unneededNodes)
+			go func(nodeToDelete *kube_api.Node) {
+				confirmation <- deleteNodeFromCloudProvider(nodeToDelete, cloudProvider, recorder)
+			}(node)
+		}
+		var finalError error
+		for range emptyNodes {
+			if err := <-confirmation; err != nil {
+				glog.Errorf("Problem with empty node deletion: %v", err)
+				finalError = err
+			}
+		}
+		if finalError == nil {
+			return ScaleDownNodeDeleted, nil
+		}
+		return ScaleDownError, fmt.Errorf("failed to delete at least one empty node: %v", finalError)
+	}
+
 	// We look for only 1 node so new hints may be incomplete.
 	nodesToRemove, _, err := simulator.FindNodesToRemove(candidates, nodes, pods, client, predicateChecker, 1, false,
 		oldHints, usageTracker, time.Now())
@@ -183,23 +210,67 @@ func ScaleDown(
 	glog.V(0).Infof("Scale-down: removing node %s, utilization: %v, pods to reschedule: %s", toRemove.Node.Name, utilization,
 		strings.Join(podNames, ","))

-	nodeGroup, err := cloudProvider.NodeGroupForNode(toRemove.Node)
-	if err != nil {
-		return ScaleDownError, fmt.Errorf("failed to node group for %s: %v", toRemove.Node.Name, err)
-	}
-	if nodeGroup == nil || reflect.ValueOf(nodeGroup).IsNil() {
-		return ScaleDownError, fmt.Errorf("picked node that doesn't belong to a node group: %s", toRemove.Node.Name)
-	}
-
-	err = nodeGroup.DeleteNodes([]*kube_api.Node{toRemove.Node})
 	simulator.RemoveNodeFromTracker(usageTracker, toRemove.Node.Name, unneededNodes)
+
+	err = deleteNodeFromCloudProvider(toRemove.Node, cloudProvider, recorder)
 	if err != nil {
 		return ScaleDownError, fmt.Errorf("Failed to delete %s: %v", toRemove.Node.Name, err)
 	}

-	recorder.Eventf(toRemove.Node, kube_api.EventTypeNormal, "ScaleDown",
-		"node removed by cluster autoscaler")
-
 	return ScaleDownNodeDeleted, nil
 }

+// getEmptyNodes finds empty nodes among the passed candidates and returns a list of empty nodes
+// that can be deleted at the same time.
+func getEmptyNodes(candidates []*kube_api.Node, pods []*kube_api.Pod, maxEmptyBulkDelete int, cloudProvider cloudprovider.CloudProvider) []*kube_api.Node {
+	emptyNodes := simulator.FindEmptyNodesToRemove(candidates, pods)
+	availabilityMap := make(map[string]int)
+	result := make([]*kube_api.Node, 0)
+	for _, node := range emptyNodes {
+		nodeGroup, err := cloudProvider.NodeGroupForNode(node)
+		if err != nil {
+			glog.Errorf("Failed to get group for %s", node.Name)
+			continue
+		}
+		if nodeGroup == nil || reflect.ValueOf(nodeGroup).IsNil() {
+			continue
+		}
+		var available int
+		var found bool
+		if available, found = availabilityMap[nodeGroup.Id()]; !found {
+			size, err := nodeGroup.TargetSize()
+			if err != nil {
+				glog.Errorf("Failed to get size for %s: %v", nodeGroup.Id(), err)
+				continue
+			}
+			available = size - nodeGroup.MinSize()
+			if available < 0 {
+				available = 0
+			}
+			availabilityMap[nodeGroup.Id()] = available
+		}
+		if available > 0 {
+			available -= 1
+			availabilityMap[nodeGroup.Id()] = available
+			result = append(result, node)
+		}
+	}
+	limit := maxEmptyBulkDelete
+	if len(result) < limit {
+		limit = len(result)
+	}
+	return result[:limit]
+}
+
+func deleteNodeFromCloudProvider(node *kube_api.Node, cloudProvider cloudprovider.CloudProvider, recorder kube_record.EventRecorder) error {
+	nodeGroup, err := cloudProvider.NodeGroupForNode(node)
+	if err != nil {
+		return fmt.Errorf("failed to find node group for %s: %v", node.Name, err)
+	}
+	if nodeGroup == nil || reflect.ValueOf(nodeGroup).IsNil() {
+		return fmt.Errorf("picked node that doesn't belong to a node group: %s", node.Name)
+	}
+	if err = nodeGroup.DeleteNodes([]*kube_api.Node{node}); err != nil {
+		return fmt.Errorf("failed to delete %s: %v", node.Name, err)
+	}
+	recorder.Eventf(node, kube_api.EventTypeNormal, "ScaleDown", "node removed by cluster autoscaler")
+	return nil
+}
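getEmptyNodes above caps how many nodes may be taken from each node group: the first time a group is seen, its budget is set to TargetSize() minus MinSize(), and every selected node decrements it, before the overall list is truncated to maxEmptyBulkDelete. A reduced, runnable sketch of just that bookkeeping follows; the nodeGroup and node types and the sizes used are made up for illustration.

package main

import "fmt"

// nodeGroup is a made-up stand-in for cloudprovider.NodeGroup.
type nodeGroup struct {
	id              string
	targetSize, min int
}

type node struct {
	name  string
	group nodeGroup
}

// selectEmptyNodes keeps at most (targetSize - min) nodes per group,
// then truncates the result to maxEmptyBulkDelete, mirroring getEmptyNodes.
func selectEmptyNodes(empty []node, maxEmptyBulkDelete int) []node {
	availability := make(map[string]int)
	result := make([]node, 0)
	for _, n := range empty {
		available, found := availability[n.group.id]
		if !found {
			available = n.group.targetSize - n.group.min
			if available < 0 {
				available = 0
			}
			availability[n.group.id] = available
		}
		if available > 0 {
			availability[n.group.id] = available - 1
			result = append(result, n)
		}
	}
	if len(result) < maxEmptyBulkDelete {
		return result
	}
	return result[:maxEmptyBulkDelete]
}

func main() {
	grp := nodeGroup{id: "group-a", targetSize: 4, min: 3} // only one node above the minimum
	empty := []node{{"n1", grp}, {"n2", grp}, {"n3", grp}}
	for _, n := range selectEmptyNodes(empty, 10) {
		fmt.Println("can bulk-delete:", n.name) // prints only n1
	}
}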
@@ -111,6 +111,30 @@ candidateloop:
 	return result, newHints, nil
 }

+// FindEmptyNodesToRemove finds empty nodes that can be removed.
+func FindEmptyNodesToRemove(candidates []*kube_api.Node, pods []*kube_api.Pod) []*kube_api.Node {
+	nodeNameToNodeInfo := schedulercache.CreateNodeNameToInfoMap(pods)
+	for _, node := range candidates {
+		if nodeInfo, found := nodeNameToNodeInfo[node.Name]; found {
+			nodeInfo.SetNode(node)
+		}
+	}
+	result := make([]*kube_api.Node, 0)
+	for _, node := range candidates {
+		if nodeInfo, found := nodeNameToNodeInfo[node.Name]; found {
+			// Should block on all pods.
+			podsToRemove, err := FastGetPodsToMove(nodeInfo, true, true)
+			if err == nil && len(podsToRemove) == 0 {
+				result = append(result, node)
+			}
+		} else {
+			// Node without pods.
+			result = append(result, node)
+		}
+	}
+	return result
+}
+
 // CalculateUtilization calculates utilization of a node, defined as total amount of requested resources divided by capacity.
 func CalculateUtilization(node *kube_api.Node, nodeInfo *schedulercache.NodeInfo) (float64, error) {
 	cpu, err := calculateUtilizationOfResource(node, nodeInfo, kube_api.ResourceCPU)
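Per the PR description, a node counts as empty when it runs only DaemonSet or manifest-run (mirror) pods; FindEmptyNodesToRemove above delegates that check to FastGetPodsToMove via the scheduler cache. A simplified stand-alone version of the classification, covering just the mirror-pod case with plain structs (all names here are hypothetical), gives the same result as the test added below:

package main

import "fmt"

// pod is a simplified stand-in for kube_api.Pod; only the fields the check needs.
type pod struct {
	name     string
	nodeName string
	mirror   bool // true for manifest-run (mirror) pods, which don't block deletion
}

// findEmptyNodes returns the nodes whose pods are all ignorable, or that have no pods.
func findEmptyNodes(nodes []string, pods []pod) []string {
	blocking := make(map[string]int)
	for _, p := range pods {
		if !p.mirror {
			blocking[p.nodeName]++
		}
	}
	result := make([]string, 0)
	for _, n := range nodes {
		if blocking[n] == 0 {
			result = append(result, n)
		}
	}
	return result
}

func main() {
	pods := []pod{
		{name: "p1", nodeName: "n1"},               // regular pod: n1 is not empty
		{name: "p2", nodeName: "n2", mirror: true}, // mirror pod only: n2 still counts as empty
	}
	fmt.Println(findEmptyNodes([]string{"n1", "n2", "n3", "n4"}, pods)) // [n2 n3 n4]
}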
@@ -23,6 +23,7 @@ import (
 	. "k8s.io/contrib/cluster-autoscaler/utils/test"

 	kube_api "k8s.io/kubernetes/pkg/api"
+	"k8s.io/kubernetes/pkg/kubelet/types"
 	"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"

 	"github.com/stretchr/testify/assert"
@@ -150,3 +151,21 @@ func TestShuffleNodes(t *testing.T) {
 	}
 	assert.True(t, gotPermutation)
 }
+
+func TestFindEmptyNodes(t *testing.T) {
+	pod1 := BuildTestPod("p1", 300, 500000)
+	pod1.Spec.NodeName = "n1"
+	pod2 := BuildTestPod("p2", 300, 500000)
+	pod2.Spec.NodeName = "n2"
+	pod2.Annotations = map[string]string{
+		types.ConfigMirrorAnnotationKey: "",
+	}
+
+	node1 := BuildTestNode("n1", 1000, 2000000)
+	node2 := BuildTestNode("n2", 1000, 2000000)
+	node3 := BuildTestNode("n3", 1000, 2000000)
+	node4 := BuildTestNode("n4", 1000, 2000000)
+
+	emptyNodes := FindEmptyNodesToRemove([]*kube_api.Node{node1, node2, node3, node4}, []*kube_api.Pod{pod1, pod2})
+	assert.Equal(t, []*kube_api.Node{node2, node3, node4}, emptyNodes)
+}