From df5bc303b74d2c20dd39b0d4b94a6e667e07cc8d Mon Sep 17 00:00:00 2001
From: Marcin Wielgus
Date: Fri, 16 Dec 2016 12:15:11 +0100
Subject: [PATCH] ClusterAutoscaler: context object to keep non-changing
 objects

---
 cluster-autoscaler/cluster_autoscaler.go | 49 ++++++++++++-----------
 cluster-autoscaler/scale_down.go         | 50 +++++++++++-------------
 cluster-autoscaler/scale_down_test.go    | 15 ++++---
 cluster-autoscaler/scale_up.go           | 31 ++++++---------
 cluster-autoscaler/utils.go              | 29 ++++++++++++++
 5 files changed, 101 insertions(+), 73 deletions(-)

diff --git a/cluster-autoscaler/cluster_autoscaler.go b/cluster-autoscaler/cluster_autoscaler.go
index 08b8f28692..96b53d6d0e 100644
--- a/cluster-autoscaler/cluster_autoscaler.go
+++ b/cluster-autoscaler/cluster_autoscaler.go
@@ -81,10 +81,11 @@ var (
 		"Node utilization level, defined as sum of requested resources divided by capacity, below which a node can be considered for scale down")
 	scaleDownTrialInterval = flag.Duration("scale-down-trial-interval", 1*time.Minute,
 		"How often scale down possiblity is check")
-	scanInterval           = flag.Duration("scan-interval", 10*time.Second, "How often cluster is reevaluated for scale up or down")
-	maxNodesTotal          = flag.Int("max-nodes-total", 0, "Maximum number of nodes in all node groups. Cluster autoscaler will not grow the cluster beyond this number.")
-	cloudProviderFlag      = flag.String("cloud-provider", "gce", "Cloud provider type. Allowed values: gce, aws")
-	maxEmptyBulkDeleteFlag = flag.Int("max-empty-bulk-delete", 10, "Maximum number of empty nodes that can be deleted at the same time.")
+	scanInterval               = flag.Duration("scan-interval", 10*time.Second, "How often cluster is reevaluated for scale up or down")
+	maxNodesTotal              = flag.Int("max-nodes-total", 0, "Maximum number of nodes in all node groups. Cluster autoscaler will not grow the cluster beyond this number.")
+	cloudProviderFlag          = flag.String("cloud-provider", "gce", "Cloud provider type. Allowed values: gce, aws")
+	maxEmptyBulkDeleteFlag     = flag.Int("max-empty-bulk-delete", 10, "Maximum number of empty nodes that can be deleted at the same time.")
+	maxGracefulTerminationFlag = flag.Int("max-graceful-termination-sec", 60, "Maximum number of seconds CA waits for pod termination when trying to scale down a node.")

 	// AvailableEstimators is a list of available estimators.
 	AvailableEstimators = []string{BasicEstimatorName, BinpackingEstimatorName}
@@ -134,8 +135,6 @@ func run(_ <-chan struct{}) {
 	nodeUtilizationMap := make(map[string]float64)
 	usageTracker := simulator.NewUsageTracker()

-	recorder := createEventRecorder(kubeClient)
-
 	var cloudProvider cloudprovider.CloudProvider

 	if *cloudProviderFlag == "gce" {
@@ -183,6 +182,19 @@ func run(_ <-chan struct{}) {
 		}
 	}

+	autoscalingContext := AutoscalingContext{
+		CloudProvider:                 cloudProvider,
+		ClientSet:                     kubeClient,
+		Recorder:                      createEventRecorder(kubeClient),
+		PredicateChecker:              predicateChecker,
+		MaxEmptyBulkDelete:            *maxEmptyBulkDeleteFlag,
+		ScaleDownUtilizationThreshold: *scaleDownUtilizationThreshold,
+		ScaleDownUnneededTime:         *scaleDownUnneededTime,
+		MaxNodesTotal:                 *maxNodesTotal,
+		EstimatorName:                 *estimatorFlag,
+		MaxGracefulTerminationSec:     *maxGracefulTerminationFlag,
+	}
+
 	for {
 		select {
 		case <-time.After(*scanInterval):
@@ -200,13 +212,13 @@ func run(_ <-chan struct{}) {
 				continue
 			}

-			if err := CheckGroupsAndNodes(nodes, cloudProvider); err != nil {
+			if err := CheckGroupsAndNodes(nodes, autoscalingContext.CloudProvider); err != nil {
 				glog.Warningf("Cluster is not ready for autoscaling: %v", err)
 				continue
 			}

 			// CA can die at any time. Removing taints that might have been left from the previous run.
-			if err := cleanToBeDeleted(nodes, kubeClient, recorder); err != nil {
+			if err := cleanToBeDeleted(nodes, kubeClient, autoscalingContext.Recorder); err != nil {
 				glog.Warningf("Failed to clean ToBeDeleted information: %v", err)
 				continue
 			}
@@ -227,7 +239,7 @@ func run(_ <-chan struct{}) {
 			// the newest node became available for the scheduler.
 			allNodesAvailableTime := GetAllNodesAvailableTime(nodes)
 			podsToReset, unschedulablePodsToHelp := SlicePodsByPodScheduledTime(allUnschedulablePods, allNodesAvailableTime)
-			ResetPodScheduledCondition(kubeClient, podsToReset)
+			ResetPodScheduledCondition(autoscalingContext.ClientSet, podsToReset)

 			// We need to check whether pods marked as unschedulable are actually unschedulable.
 			// This should prevent from adding unnecessary nodes. Example of such situation:
@@ -245,7 +257,8 @@ func run(_ <-chan struct{}) {
 			// in the describe situation.
 			schedulablePodsPresent := false
 			if *verifyUnschedulablePods {
-				newUnschedulablePodsToHelp := FilterOutSchedulable(unschedulablePodsToHelp, nodes, allScheduled, predicateChecker)
+				newUnschedulablePodsToHelp := FilterOutSchedulable(unschedulablePodsToHelp, nodes, allScheduled,
+					autoscalingContext.PredicateChecker)

 				if len(newUnschedulablePodsToHelp) != len(unschedulablePodsToHelp) {
 					glog.V(2).Info("Schedulable pods present")
@@ -261,9 +274,7 @@
 			} else {
 				scaleUpStart := time.Now()
 				updateLastTime("scaleup")
-				scaledUp, err := ScaleUp(unschedulablePodsToHelp, nodes, cloudProvider, kubeClient, predicateChecker, recorder,
-					*maxNodesTotal, *estimatorFlag)
-
+				scaledUp, err := ScaleUp(autoscalingContext, unschedulablePodsToHelp, nodes)
 				updateDuration("scaleup", scaleUpStart)

 				if err != nil {
@@ -295,11 +306,10 @@
 					usageTracker.CleanUp(time.Now().Add(-(*scaleDownUnneededTime)))
 					unneededNodes, podLocationHints, nodeUtilizationMap = FindUnneededNodes(
+						autoscalingContext,
 						nodes,
 						unneededNodes,
-						*scaleDownUtilizationThreshold,
 						allScheduled,
-						predicateChecker,
 						podLocationHints,
 						usageTracker,
 						time.Now())
@@ -318,18 +328,13 @@
 						updateLastTime("scaledown")
 						result, err := ScaleDown(
+							autoscalingContext,
 							nodes,
 							nodeUtilizationMap,
 							unneededNodes,
-							*scaleDownUnneededTime,
 							allScheduled,
-							cloudProvider,
-							kubeClient,
-							predicateChecker,
 							podLocationHints,
-							usageTracker,
-							recorder,
-							*maxEmptyBulkDeleteFlag)
+							usageTracker)

 						updateDuration("scaledown", scaleDownStart)

diff --git a/cluster-autoscaler/scale_down.go b/cluster-autoscaler/scale_down.go
index b1f8de182a..8004a66996 100644
--- a/cluster-autoscaler/scale_down.go
+++ b/cluster-autoscaler/scale_down.go
@@ -51,19 +51,16 @@ const (
 	// ToBeDeletedTaint is a taint used to make the node unschedulable.
 	ToBeDeletedTaint = "ToBeDeletedByClusterAutoscaler"
-
-	// MaxGracefulTerminationTime is max gracefull termination time used by CA.
-	MaxGracefulTerminationTime = time.Minute
 )

 // FindUnneededNodes calculates which nodes are not needed, i.e. all pods can be scheduled somewhere else,
 // and updates unneededNodes map accordingly. It also returns information where pods can be rescheduld and
 // node utilization level.
-func FindUnneededNodes(nodes []*apiv1.Node,
+func FindUnneededNodes(
+	context AutoscalingContext,
+	nodes []*apiv1.Node,
 	unneededNodes map[string]time.Time,
-	utilizationThreshold float64,
 	pods []*apiv1.Pod,
-	predicateChecker *simulator.PredicateChecker,
 	oldHints map[string]string,
 	tracker *simulator.UsageTracker,
 	timestamp time.Time) (unnededTimeMap map[string]time.Time, podReschedulingHints map[string]string, utilizationMap map[string]float64) {
@@ -87,7 +84,7 @@
 		glog.V(4).Infof("Node %s - utilization %f", node.Name, utilization)
 		utilizationMap[node.Name] = utilization

-		if utilization >= utilizationThreshold {
+		if utilization >= context.ScaleDownUtilizationThreshold {
 			glog.V(4).Infof("Node %s is not suitable for removal - utilization too big (%f)", node.Name, utilization)
 			continue
 		}
@@ -96,7 +93,7 @@

 	// Phase2 - check which nodes can be probably removed using fast drain.
 	nodesToRemove, newHints, err := simulator.FindNodesToRemove(currentlyUnneededNodes, nodes, pods,
-		nil, predicateChecker,
+		nil, context.PredicateChecker,
 		len(currentlyUnneededNodes), true, oldHints, tracker, timestamp)
 	if err != nil {
 		glog.Errorf("Error while simulating node drains: %v", err)
@@ -120,18 +117,14 @@
 // ScaleDown tries to scale down the cluster. It returns ScaleDownResult indicating if any node was
 // removed and error if such occured.
 func ScaleDown(
+	context AutoscalingContext,
 	nodes []*apiv1.Node,
 	lastUtilizationMap map[string]float64,
 	unneededNodes map[string]time.Time,
-	unneededTime time.Duration,
 	pods []*apiv1.Pod,
-	cloudProvider cloudprovider.CloudProvider,
-	client kube_client.Interface,
-	predicateChecker *simulator.PredicateChecker,
 	oldHints map[string]string,
 	usageTracker *simulator.UsageTracker,
-	recorder kube_record.EventRecorder,
-	maxEmptyBulkDelete int) (ScaleDownResult, error) {
+) (ScaleDownResult, error) {

 	now := time.Now()
 	candidates := make([]*apiv1.Node, 0)
@@ -141,11 +134,11 @@ func ScaleDown(
 			glog.V(2).Infof("%s was unneeded for %s", node.Name, now.Sub(val).String())

 			// Check how long the node was underutilized.
-			if !val.Add(unneededTime).Before(now) {
+			if !val.Add(context.ScaleDownUnneededTime).Before(now) {
 				continue
 			}

-			nodeGroup, err := cloudProvider.NodeGroupForNode(node)
+			nodeGroup, err := context.CloudProvider.NodeGroupForNode(node)
 			if err != nil {
 				glog.Errorf("Error while checking node group for %s: %v", node.Name, err)
 				continue
@@ -177,14 +170,14 @@ func ScaleDown(
 	// Trying to delete empty nodes in bulk. If there are no empty nodes then CA will
 	// try to delete not-so-empty nodes, possibly killing some pods and allowing them
 	// to recreate on other nodes.
-	emptyNodes := getEmptyNodes(candidates, pods, maxEmptyBulkDelete, cloudProvider)
+	emptyNodes := getEmptyNodes(candidates, pods, context.MaxEmptyBulkDelete, context.CloudProvider)
 	if len(emptyNodes) > 0 {
 		confirmation := make(chan error, len(emptyNodes))
 		for _, node := range emptyNodes {
 			glog.V(0).Infof("Scale-down: removing empty node %s", node.Name)
 			simulator.RemoveNodeFromTracker(usageTracker, node.Name, unneededNodes)
 			go func(nodeToDelete *apiv1.Node) {
-				confirmation <- deleteNodeFromCloudProvider(nodeToDelete, cloudProvider, recorder)
+				confirmation <- deleteNodeFromCloudProvider(nodeToDelete, context.CloudProvider, context.Recorder)
 			}(node)
 		}
 		var finalError error
@@ -201,7 +194,8 @@ func ScaleDown(
 	}

 	// We look for only 1 node so new hints may be incomplete.
-	nodesToRemove, _, err := simulator.FindNodesToRemove(candidates, nodes, pods, client, predicateChecker, 1, false,
+	nodesToRemove, _, err := simulator.FindNodesToRemove(candidates, nodes, pods, context.ClientSet,
+		context.PredicateChecker, 1, false,
 		oldHints, usageTracker, time.Now())

 	if err != nil {
@@ -222,8 +216,7 @@ func ScaleDown(
 		// Nothing super-bad should happen if the node is removed from tracker prematurely.
 		simulator.RemoveNodeFromTracker(usageTracker, toRemove.Node.Name, unneededNodes)
-
-		err = deleteNode(toRemove.Node, toRemove.PodsToReschedule, client, cloudProvider, recorder)
+		err = deleteNode(context, toRemove.Node, toRemove.PodsToReschedule)
 		if err != nil {
 			return ScaleDownError, fmt.Errorf("Failed to delete %s: %v", toRemove.Node.Name, err)
 		}
@@ -273,25 +266,26 @@ func getEmptyNodes(candidates []*apiv1.Node, pods []*apiv1.Pod, maxEmptyBulkDele
 	return result[:limit]
 }

-func deleteNode(node *apiv1.Node, pods []*apiv1.Pod, client kube_client.Interface, cloudProvider cloudprovider.CloudProvider, recorder kube_record.EventRecorder) error {
-	if err := drainNode(node, pods, client, recorder); err != nil {
+func deleteNode(context AutoscalingContext, node *apiv1.Node, pods []*apiv1.Pod) error {
+	if err := drainNode(node, pods, context.ClientSet, context.Recorder, context.MaxGracefulTerminationSec); err != nil {
 		return err
 	}
-	return deleteNodeFromCloudProvider(node, cloudProvider, recorder)
+	return deleteNodeFromCloudProvider(node, context.CloudProvider, context.Recorder)
 }

 // Performs drain logic on the node. Marks the node as unschedulable and later removes all pods, giving
-// them up to MaxGracefulTerminationTime to finish.
-func drainNode(node *apiv1.Node, pods []*apiv1.Pod, client kube_client.Interface, recorder kube_record.EventRecorder) error {
+// them up to maxGracefulTerminationSec to finish.
+func drainNode(node *apiv1.Node, pods []*apiv1.Pod, client kube_client.Interface, recorder kube_record.EventRecorder,
+	maxGracefulTerminationSec int) error {
 	if err := markToBeDeleted(node, client, recorder); err != nil {
 		return err
 	}

-	seconds := int64(MaxGracefulTerminationTime.Seconds())
+	maxGraceful64 := int64(maxGracefulTerminationSec)
 	for _, pod := range pods {
 		recorder.Eventf(pod, apiv1.EventTypeNormal, "ScaleDown", "deleting pod for node scale down")
 		err := client.Core().Pods(pod.Namespace).Delete(pod.Name, &apiv1.DeleteOptions{
-			GracePeriodSeconds: &seconds,
+			GracePeriodSeconds: &maxGraceful64,
 		})
 		if err != nil {
 			glog.Errorf("Failed to delete %s/%s: %v", pod.Namespace, pod.Name, err)
@@ -300,7 +294,7 @@ func drainNode(node *apiv1.Node, pods []*apiv1.Pod, client kube_client.Interface

 	allGone := true
-	// Wait up to MaxGracefulTerminationTime.
+	// Wait up to maxGracefulTerminationSec.
-	for start := time.Now(); time.Now().Sub(start) < MaxGracefulTerminationTime; time.Sleep(5 * time.Second) {
+	for start := time.Now(); time.Now().Sub(start) < time.Duration(maxGracefulTerminationSec)*time.Second; time.Sleep(5 * time.Second) {
 		allGone = true
 		for _, pod := range pods {
 			podreturned, err := client.Core().Pods(pod.Namespace).Get(pod.Name)
diff --git a/cluster-autoscaler/scale_down_test.go b/cluster-autoscaler/scale_down_test.go
index d01e453013..c681892907 100644
--- a/cluster-autoscaler/scale_down_test.go
+++ b/cluster-autoscaler/scale_down_test.go
@@ -59,8 +59,13 @@ func TestFindUnneededNodes(t *testing.T) {
 	n3 := BuildTestNode("n3", 1000, 10)
 	n4 := BuildTestNode("n4", 10000, 10)

-	result, hints, utilization := FindUnneededNodes([]*apiv1.Node{n1, n2, n3, n4}, map[string]time.Time{}, 0.35,
-		[]*apiv1.Pod{p1, p2, p3, p4}, simulator.NewTestPredicateChecker(), make(map[string]string),
+	context := AutoscalingContext{
+		PredicateChecker:              simulator.NewTestPredicateChecker(),
+		ScaleDownUtilizationThreshold: 0.35,
+	}
+
+	result, hints, utilization := FindUnneededNodes(context, []*apiv1.Node{n1, n2, n3, n4}, map[string]time.Time{},
+		[]*apiv1.Pod{p1, p2, p3, p4}, make(map[string]string),
 		simulator.NewUsageTracker(), time.Now())

 	assert.Equal(t, 1, len(result))
@@ -70,8 +75,8 @@ func TestFindUnneededNodes(t *testing.T) {
 	assert.Equal(t, 4, len(utilization))

 	result["n1"] = time.Now()
-	result2, hints, utilization := FindUnneededNodes([]*apiv1.Node{n1, n2, n3, n4}, result, 0.35,
-		[]*apiv1.Pod{p1, p2, p3, p4}, simulator.NewTestPredicateChecker(), hints,
+	result2, hints, utilization := FindUnneededNodes(context, []*apiv1.Node{n1, n2, n3, n4}, result,
+		[]*apiv1.Pod{p1, p2, p3, p4}, hints,
 		simulator.NewUsageTracker(), time.Now())

 	assert.Equal(t, 1, len(result2))
@@ -110,7 +115,7 @@ func TestDrainNode(t *testing.T) {
 		updatedNodes <- obj.Name
 		return true, obj, nil
 	})
-	err := drainNode(n1, []*apiv1.Pod{p1, p2}, fakeClient, createEventRecorder(fakeClient))
+	err := drainNode(n1, []*apiv1.Pod{p1, p2}, fakeClient, createEventRecorder(fakeClient), 20)
 	assert.NoError(t, err)
 	assert.Equal(t, p1.Name, getStringFromChan(deletedPods))
 	assert.Equal(t, p2.Name, getStringFromChan(deletedPods))
diff --git a/cluster-autoscaler/scale_up.go b/cluster-autoscaler/scale_up.go
index 0e4d70af0f..64e4d9b088 100644
--- a/cluster-autoscaler/scale_up.go
+++ b/cluster-autoscaler/scale_up.go
@@ -21,10 +21,7 @@ import (

 	"k8s.io/contrib/cluster-autoscaler/cloudprovider"
 	"k8s.io/contrib/cluster-autoscaler/estimator"
-	"k8s.io/contrib/cluster-autoscaler/simulator"
 	apiv1 "k8s.io/kubernetes/pkg/api/v1"
-	kube_client "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_5"
-	kube_record "k8s.io/kubernetes/pkg/client/record"

 	"github.com/golang/glog"
 )
@@ -40,9 +37,7 @@
 // ScaleUp tries to scale the cluster up. Return true if it found a way to increase the size,
 // false if it didn't and error if an error occured. Assumes that all nodes in the cluster are
 // ready and in sync with instance groups.
-func ScaleUp(unschedulablePods []*apiv1.Pod, nodes []*apiv1.Node, cloudProvider cloudprovider.CloudProvider, kubeClient kube_client.Interface,
-	predicateChecker *simulator.PredicateChecker, recorder kube_record.EventRecorder, maxNodesTotal int,
-	estimatorName string) (bool, error) {
+func ScaleUp(context AutoscalingContext, unschedulablePods []*apiv1.Pod, nodes []*apiv1.Node) (bool, error) {

 	// From now on we only care about unschedulable pods that were marked after the newest
 	// node became available for the scheduler.
@@ -56,13 +51,13 @@
 	}

 	expansionOptions := make([]ExpansionOption, 0)
-	nodeInfos, err := GetNodeInfosForGroups(nodes, cloudProvider, kubeClient)
+	nodeInfos, err := GetNodeInfosForGroups(nodes, context.CloudProvider, context.ClientSet)
 	if err != nil {
 		return false, fmt.Errorf("failed to build node infos for node groups: %v", err)
 	}

 	podsRemainUnshedulable := make(map[*apiv1.Pod]struct{})
-	for _, nodeGroup := range cloudProvider.NodeGroups() {
+	for _, nodeGroup := range context.CloudProvider.NodeGroups() {

 		currentSize, err := nodeGroup.TargetSize()
 		if err != nil {
@@ -87,7 +82,7 @@
 		}

 		for _, pod := range unschedulablePods {
-			err = predicateChecker.CheckPredicates(pod, nodeInfo)
+			err = context.PredicateChecker.CheckPredicates(pod, nodeInfo)
 			if err == nil {
 				option.pods = append(option.pods, pod)
 			} else {
@@ -96,17 +91,17 @@
 			}
 		}
 		if len(option.pods) > 0 {
-			if estimatorName == BinpackingEstimatorName {
-				binpackingEstimator := estimator.NewBinpackingNodeEstimator(predicateChecker)
+			if context.EstimatorName == BinpackingEstimatorName {
+				binpackingEstimator := estimator.NewBinpackingNodeEstimator(context.PredicateChecker)
 				option.nodeCount = binpackingEstimator.Estimate(option.pods, nodeInfo)
-			} else if estimatorName == BasicEstimatorName {
+			} else if context.EstimatorName == BasicEstimatorName {
 				basicEstimator := estimator.NewBasicNodeEstimator()
 				for _, pod := range option.pods {
 					basicEstimator.Add(pod)
 				}
 				option.nodeCount, option.debug = basicEstimator.Estimate(nodeInfo.Node())
 			} else {
-				glog.Fatalf("Unrecognized estimator: %s", estimatorName)
+				glog.Fatalf("Unrecognized estimator: %s", context.EstimatorName)
 			}
 			expansionOptions = append(expansionOptions, option)
 		}
@@ -131,9 +126,9 @@
 			newSize = bestOption.nodeGroup.MaxSize()
 		}

-		if maxNodesTotal > 0 && len(nodes)+(newSize-currentSize) > maxNodesTotal {
-			glog.V(1).Infof("Capping size to max cluster total size (%d)", maxNodesTotal)
-			newSize = maxNodesTotal - len(nodes) + currentSize
+		if context.MaxNodesTotal > 0 && len(nodes)+(newSize-currentSize) > context.MaxNodesTotal {
+			glog.V(1).Infof("Capping size to max cluster total size (%d)", context.MaxNodesTotal)
+			newSize = context.MaxNodesTotal - len(nodes) + currentSize
 			if newSize < currentSize {
 				return false, fmt.Errorf("max node total count already reached")
 			}
@@ -146,14 +141,14 @@
 		}

 		for _, pod := range bestOption.pods {
-			recorder.Eventf(pod, apiv1.EventTypeNormal, "TriggeredScaleUp",
+			context.Recorder.Eventf(pod, apiv1.EventTypeNormal, "TriggeredScaleUp",
 				"pod triggered scale-up, group: %s, sizes (current/new): %d/%d",
 				bestOption.nodeGroup.Id(), currentSize, newSize)
 		}
 		return true, nil
 	}

 	for pod := range podsRemainUnshedulable {
-		recorder.Event(pod, apiv1.EventTypeNormal, "NotTriggerScaleUp",
+		context.Recorder.Event(pod, apiv1.EventTypeNormal, "NotTriggerScaleUp",
 			"pod didn't trigger scale-up (it wouldn't fit if a new node is added)")
 	}

diff --git a/cluster-autoscaler/utils.go b/cluster-autoscaler/utils.go
index 22095132b4..7cba65c9de 100644
--- a/cluster-autoscaler/utils.go
+++ b/cluster-autoscaler/utils.go
@@ -28,11 +28,40 @@ import (
 	apiv1 "k8s.io/kubernetes/pkg/api/v1"
 	metav1 "k8s.io/kubernetes/pkg/apis/meta/v1"
 	kube_client "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_5"
+	kube_record "k8s.io/kubernetes/pkg/client/record"
 	"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"

 	"github.com/golang/glog"
 )

+// AutoscalingContext contains user-configurable constants and configuration-related objects passed to
+// scale up/scale down functions.
+type AutoscalingContext struct {
+	// CloudProvider used in CA.
+	CloudProvider cloudprovider.CloudProvider
+	// ClientSet interface.
+	ClientSet kube_client.Interface
+	// Recorder for recording events.
+	Recorder kube_record.EventRecorder
+	// PredicateChecker to check if a pod can fit into a node.
+	PredicateChecker *simulator.PredicateChecker
+	// MaxEmptyBulkDelete is the maximum number of empty nodes that can be removed at the same time.
+	MaxEmptyBulkDelete int
+	// ScaleDownUtilizationThreshold sets the threshold for nodes to be considered for scale down.
+	// Well-utilized nodes are not touched.
+	ScaleDownUtilizationThreshold float64
+	// ScaleDownUnneededTime sets the duration CA expects a node to be unneeded/eligible for removal
+	// before scaling down the node.
+	ScaleDownUnneededTime time.Duration
+	// MaxNodesTotal sets the maximum number of nodes in the whole cluster.
+	MaxNodesTotal int
+	// EstimatorName is the estimator used to estimate the number of needed nodes in scale up.
+	EstimatorName string
+	// MaxGracefulTerminationSec is the maximum number of seconds scale down waits for pods to terminate before
+	// removing the node from the cloud provider.
+	MaxGracefulTerminationSec int
+}
+
 // GetAllNodesAvailableTime returns time when the newest node became available for scheduler.
 // TODO: This function should use LastTransitionTime from NodeReady condition.
 func GetAllNodesAvailableTime(nodes []*apiv1.Node) time.Time {
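
Reviewer note (not part of the patch): a useful property of the new struct is that callers only populate the fields their code path reads, as TestFindUnneededNodes does above with PredicateChecker and ScaleDownUtilizationThreshold. A minimal standalone sketch of that pattern, using a trimmed-down stand-in type rather than the real AutoscalingContext:

```go
package main

import (
	"fmt"
	"time"
)

// autoscalingContext is a toy stand-in for the patch's AutoscalingContext,
// used only to illustrate partial initialization: unset fields stay at their
// zero values, which is what the tests rely on.
type autoscalingContext struct {
	ScaleDownUtilizationThreshold float64
	ScaleDownUnneededTime         time.Duration
	MaxNodesTotal                 int
}

func main() {
	// Populate only the field under test, as scale_down_test.go does.
	ctx := autoscalingContext{ScaleDownUtilizationThreshold: 0.35}
	fmt.Printf("threshold=%v unneededTime=%v maxNodes=%v\n",
		ctx.ScaleDownUtilizationThreshold, ctx.ScaleDownUnneededTime, ctx.MaxNodesTotal)
	// Output: threshold=0.35 unneededTime=0s maxNodes=0
}
```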
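The MaxNodesTotal branch that ScaleUp now reads from the context can be checked in isolation. A sketch of the same arithmetic; capNewSize is a hypothetical helper name, not a function from the patch:

```go
package main

import "fmt"

// capNewSize mirrors the MaxNodesTotal branch in ScaleUp: if growing a node
// group from currentSize to newSize would push the whole cluster past
// maxNodesTotal, shrink newSize so the total lands exactly on the cap; if even
// that falls below currentSize, the cap has already been reached.
func capNewSize(newSize, currentSize, clusterSize, maxNodesTotal int) (int, error) {
	if maxNodesTotal > 0 && clusterSize+(newSize-currentSize) > maxNodesTotal {
		newSize = maxNodesTotal - clusterSize + currentSize
		if newSize < currentSize {
			return 0, fmt.Errorf("max node total count already reached")
		}
	}
	return newSize, nil
}

func main() {
	// 9 nodes in the cluster, group would grow 3 -> 7, cap 12: a delta of 4
	// would give 13 nodes total, so newSize is trimmed to 6 (total exactly 12).
	fmt.Println(capNewSize(7, 3, 9, 12))
}
```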
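Similarly, the drainNode change replaces the package-level MaxGracefulTerminationTime constant with a per-context number of seconds, converted via time.Duration(maxGracefulTerminationSec) * time.Second. The poll-until-deadline loop it drives can be sketched standalone; waitForPodsGone and podsGone below are hypothetical names standing in for the real "are all pods deleted?" check against the API server:

```go
package main

import (
	"fmt"
	"time"
)

// waitForPodsGone mirrors the wait loop in drainNode: poll every 5 seconds
// until podsGone reports true or the graceful-termination window (given in
// seconds, like the new MaxGracefulTerminationSec field) expires.
func waitForPodsGone(podsGone func() bool, maxGracefulTerminationSec int) bool {
	deadline := time.Duration(maxGracefulTerminationSec) * time.Second
	for start := time.Now(); time.Since(start) < deadline; time.Sleep(5 * time.Second) {
		if podsGone() {
			return true
		}
	}
	return false
}

func main() {
	// With a stub that reports pods gone immediately, the loop exits on the
	// first iteration without sleeping.
	fmt.Println("all pods gone:", waitForPodsGone(func() bool { return true }, 60))
}
```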