ClusterAutoscaler: context object to keep non-changing objects

Marcin Wielgus 2016-12-16 12:15:11 +01:00
parent d5d82b3276
commit df5bc303b7
5 changed files with 101 additions and 73 deletions

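At a high level, the commit gathers the objects and tunables that stay constant across loop iterations (cloud provider, clientset, event recorder, predicate checker, and the scale-up/scale-down flags) into a single AutoscalingContext value built once in run(), then threads that value through ScaleUp, ScaleDown, and FindUnneededNodes instead of long parameter lists. Below is a minimal, self-contained Go sketch of the pattern; the stand-in types and the scaleUp helper are illustrative only, not the repository's real types.

package main

import "fmt"

// Stand-ins for the real dependencies (cloud provider, clientset, recorder,
// predicate checker); the real types live in the cluster-autoscaler packages.
type CloudProvider interface{ Name() string }

type fakeProvider struct{}

func (fakeProvider) Name() string { return "gce" }

// AutoscalingContext bundles objects and tunables that do not change between
// loop iterations, mirroring the struct added in this commit.
type AutoscalingContext struct {
	CloudProvider             CloudProvider
	MaxNodesTotal             int
	MaxEmptyBulkDelete        int
	MaxGratefulTerminationSec int
}

// Before this commit the invariants were passed one by one; now a single
// context value rides along with the per-iteration arguments.
func scaleUp(ctx AutoscalingContext, pendingPods int) {
	fmt.Printf("scale up on %s for %d pods (cluster cap %d nodes)\n",
		ctx.CloudProvider.Name(), pendingPods, ctx.MaxNodesTotal)
}

func main() {
	ctx := AutoscalingContext{
		CloudProvider:             fakeProvider{},
		MaxNodesTotal:             100,
		MaxEmptyBulkDelete:        10,
		MaxGratefulTerminationSec: 60,
	}
	scaleUp(ctx, 3)
}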

@@ -81,10 +81,11 @@ var (
"Node utilization level, defined as sum of requested resources divided by capacity, below which a node can be considered for scale down")
scaleDownTrialInterval = flag.Duration("scale-down-trial-interval", 1*time.Minute,
"How often scale down possiblity is check")
scanInterval = flag.Duration("scan-interval", 10*time.Second, "How often cluster is reevaluated for scale up or down")
maxNodesTotal = flag.Int("max-nodes-total", 0, "Maximum number of nodes in all node groups. Cluster autoscaler will not grow the cluster beyond this number.")
cloudProviderFlag = flag.String("cloud-provider", "gce", "Cloud provider type. Allowed values: gce, aws")
maxEmptyBulkDeleteFlag = flag.Int("max-empty-bulk-delete", 10, "Maximum number of empty nodes that can be deleted at the same time.")
scanInterval = flag.Duration("scan-interval", 10*time.Second, "How often cluster is reevaluated for scale up or down")
maxNodesTotal = flag.Int("max-nodes-total", 0, "Maximum number of nodes in all node groups. Cluster autoscaler will not grow the cluster beyond this number.")
cloudProviderFlag = flag.String("cloud-provider", "gce", "Cloud provider type. Allowed values: gce, aws")
maxEmptyBulkDeleteFlag = flag.Int("max-empty-bulk-delete", 10, "Maximum number of empty nodes that can be deleted at the same time.")
maxGratefulTerminationFlag = flag.Int("max-grateful-termination-sec", 60, "Maximum number of seconds CA waits for pod termination when trying to scale down a node.")
// AvailableEstimators is a list of available estimators.
AvailableEstimators = []string{BasicEstimatorName, BinpackingEstimatorName}
@@ -134,8 +135,6 @@ func run(_ <-chan struct{}) {
nodeUtilizationMap := make(map[string]float64)
usageTracker := simulator.NewUsageTracker()
recorder := createEventRecorder(kubeClient)
var cloudProvider cloudprovider.CloudProvider
if *cloudProviderFlag == "gce" {
@@ -183,6 +182,19 @@ func run(_ <-chan struct{}) {
}
}
autoscalingContext := AutoscalingContext{
CloudProvider: cloudProvider,
ClientSet: kubeClient,
Recorder: createEventRecorder(kubeClient),
PredicateChecker: predicateChecker,
MaxEmptyBulkDelete: *maxEmptyBulkDeleteFlag,
ScaleDownUtilizationThreshold: *scaleDownUtilizationThreshold,
ScaleDownUnneededTime: *scaleDownUnneededTime,
MaxNodesTotal: *maxNodesTotal,
EstimatorName: *estimatorFlag,
MaxGratefulTerminationSec: *maxGratefulTerminationFlag,
}
for {
select {
case <-time.After(*scanInterval):
@@ -200,13 +212,13 @@ func run(_ <-chan struct{}) {
continue
}
if err := CheckGroupsAndNodes(nodes, cloudProvider); err != nil {
if err := CheckGroupsAndNodes(nodes, autoscalingContext.CloudProvider); err != nil {
glog.Warningf("Cluster is not ready for autoscaling: %v", err)
continue
}
// CA can die at any time. Removing taints that might have been left from the previous run.
if err := cleanToBeDeleted(nodes, kubeClient, recorder); err != nil {
if err := cleanToBeDeleted(nodes, kubeClient, autoscalingContext.Recorder); err != nil {
glog.Warningf("Failed to clean ToBeDeleted information: %v", err)
continue
}
@@ -227,7 +239,7 @@ func run(_ <-chan struct{}) {
// the newest node became available for the scheduler.
allNodesAvailableTime := GetAllNodesAvailableTime(nodes)
podsToReset, unschedulablePodsToHelp := SlicePodsByPodScheduledTime(allUnschedulablePods, allNodesAvailableTime)
ResetPodScheduledCondition(kubeClient, podsToReset)
ResetPodScheduledCondition(autoscalingContext.ClientSet, podsToReset)
// We need to check whether pods marked as unschedulable are actually unschedulable.
// This should prevent from adding unnecessary nodes. Example of such situation:
@@ -245,7 +257,8 @@ func run(_ <-chan struct{}) {
// in the described situation.
schedulablePodsPresent := false
if *verifyUnschedulablePods {
newUnschedulablePodsToHelp := FilterOutSchedulable(unschedulablePodsToHelp, nodes, allScheduled, predicateChecker)
newUnschedulablePodsToHelp := FilterOutSchedulable(unschedulablePodsToHelp, nodes, allScheduled,
autoscalingContext.PredicateChecker)
if len(newUnschedulablePodsToHelp) != len(unschedulablePodsToHelp) {
glog.V(2).Info("Schedulable pods present")
@@ -261,9 +274,7 @@ func run(_ <-chan struct{}) {
} else {
scaleUpStart := time.Now()
updateLastTime("scaleup")
scaledUp, err := ScaleUp(unschedulablePodsToHelp, nodes, cloudProvider, kubeClient, predicateChecker, recorder,
*maxNodesTotal, *estimatorFlag)
scaledUp, err := ScaleUp(autoscalingContext, unschedulablePodsToHelp, nodes)
updateDuration("scaleup", scaleUpStart)
if err != nil {
@@ -295,11 +306,10 @@ func run(_ <-chan struct{}) {
usageTracker.CleanUp(time.Now().Add(-(*scaleDownUnneededTime)))
unneededNodes, podLocationHints, nodeUtilizationMap = FindUnneededNodes(
autoscalingContext,
nodes,
unneededNodes,
*scaleDownUtilizationThreshold,
allScheduled,
predicateChecker,
podLocationHints,
usageTracker, time.Now())
@@ -318,18 +328,13 @@ func run(_ <-chan struct{}) {
updateLastTime("scaledown")
result, err := ScaleDown(
autoscalingContext,
nodes,
nodeUtilizationMap,
unneededNodes,
*scaleDownUnneededTime,
allScheduled,
cloudProvider,
kubeClient,
predicateChecker,
podLocationHints,
usageTracker,
recorder,
*maxEmptyBulkDeleteFlag)
usageTracker)
updateDuration("scaledown", scaleDownStart)

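One subtlety in the run() loop above: pods already marked unschedulable before the newest node became available may simply not have been retried by the scheduler yet, so they only get their PodScheduled condition reset instead of triggering a scale-up. A self-contained sketch of that split, as the comments in the hunk suggest; the pod type and helper name here are illustrative, not the repository's SlicePodsByPodScheduledTime.

package main

import (
	"fmt"
	"time"
)

type pod struct {
	name          string
	markedPending time.Time // when the pod was judged unschedulable
}

// splitPodsByScheduledTime mimics the split performed in run(): pods judged
// before allNodesAvailableTime only get their PodScheduled condition reset so
// the scheduler retries them; newer ones are candidates for scale-up help.
func splitPodsByScheduledTime(pods []pod, allNodesAvailable time.Time) (toReset, toHelp []pod) {
	for _, p := range pods {
		if p.markedPending.Before(allNodesAvailable) {
			toReset = append(toReset, p)
		} else {
			toHelp = append(toHelp, p)
		}
	}
	return toReset, toHelp
}

func main() {
	cutoff := time.Now()
	pods := []pod{
		{"old-pod", cutoff.Add(-time.Minute)},
		{"new-pod", cutoff.Add(time.Second)},
	}
	toReset, toHelp := splitPodsByScheduledTime(pods, cutoff)
	fmt.Printf("%d to reset, %d to help\n", len(toReset), len(toHelp)) // 1 to reset, 1 to help
}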

@@ -51,19 +51,16 @@ const (
const (
// ToBeDeletedTaint is a taint used to make the node unschedulable.
ToBeDeletedTaint = "ToBeDeletedByClusterAutoscaler"
// MaxGracefulTerminationTime is the max graceful termination time used by CA.
MaxGracefulTerminationTime = time.Minute
)
// FindUnneededNodes calculates which nodes are not needed, i.e. all pods can be scheduled somewhere else,
// and updates the unneededNodes map accordingly. It also returns information about where pods can be rescheduled and
// node utilization level.
func FindUnneededNodes(nodes []*apiv1.Node,
func FindUnneededNodes(
context AutoscalingContext,
nodes []*apiv1.Node,
unneededNodes map[string]time.Time,
utilizationThreshold float64,
pods []*apiv1.Pod,
predicateChecker *simulator.PredicateChecker,
oldHints map[string]string,
tracker *simulator.UsageTracker,
timestamp time.Time) (unnededTimeMap map[string]time.Time, podReschedulingHints map[string]string, utilizationMap map[string]float64) {
@@ -87,7 +84,7 @@ func FindUnneededNodes(nodes []*apiv1.Node,
glog.V(4).Infof("Node %s - utilization %f", node.Name, utilization)
utilizationMap[node.Name] = utilization
if utilization >= utilizationThreshold {
if utilization >= context.ScaleDownUtilizationThreshold {
glog.V(4).Infof("Node %s is not suitable for removal - utilization too big (%f)", node.Name, utilization)
continue
}
@@ -96,7 +93,7 @@ func FindUnneededNodes(nodes []*apiv1.Node,
// Phase2 - check which nodes can be probably removed using fast drain.
nodesToRemove, newHints, err := simulator.FindNodesToRemove(currentlyUnneededNodes, nodes, pods,
nil, predicateChecker,
nil, context.PredicateChecker,
len(currentlyUnneededNodes), true, oldHints, tracker, timestamp)
if err != nil {
glog.Errorf("Error while simulating node drains: %v", err)
@@ -120,18 +117,14 @@ func FindUnneededNodes(nodes []*apiv1.Node,
// ScaleDown tries to scale down the cluster. It returns ScaleDownResult indicating if any node was
// removed and an error if one occurred.
func ScaleDown(
context AutoscalingContext,
nodes []*apiv1.Node,
lastUtilizationMap map[string]float64,
unneededNodes map[string]time.Time,
unneededTime time.Duration,
pods []*apiv1.Pod,
cloudProvider cloudprovider.CloudProvider,
client kube_client.Interface,
predicateChecker *simulator.PredicateChecker,
oldHints map[string]string,
usageTracker *simulator.UsageTracker,
recorder kube_record.EventRecorder,
maxEmptyBulkDelete int) (ScaleDownResult, error) {
) (ScaleDownResult, error) {
now := time.Now()
candidates := make([]*apiv1.Node, 0)
@@ -141,11 +134,11 @@ func ScaleDown(
glog.V(2).Infof("%s was unneeded for %s", node.Name, now.Sub(val).String())
// Check how long the node was underutilized.
if !val.Add(unneededTime).Before(now) {
if !val.Add(context.ScaleDownUnneededTime).Before(now) {
continue
}
nodeGroup, err := cloudProvider.NodeGroupForNode(node)
nodeGroup, err := context.CloudProvider.NodeGroupForNode(node)
if err != nil {
glog.Errorf("Error while checking node group for %s: %v", node.Name, err)
continue
@@ -177,14 +170,14 @@ func ScaleDown(
// Trying to delete empty nodes in bulk. If there are no empty nodes then CA will
// try to delete not-so-empty nodes, possibly killing some pods and allowing them
// to recreate on other nodes.
emptyNodes := getEmptyNodes(candidates, pods, maxEmptyBulkDelete, cloudProvider)
emptyNodes := getEmptyNodes(candidates, pods, context.MaxEmptyBulkDelete, context.CloudProvider)
if len(emptyNodes) > 0 {
confirmation := make(chan error, len(emptyNodes))
for _, node := range emptyNodes {
glog.V(0).Infof("Scale-down: removing empty node %s", node.Name)
simulator.RemoveNodeFromTracker(usageTracker, node.Name, unneededNodes)
go func(nodeToDelete *apiv1.Node) {
confirmation <- deleteNodeFromCloudProvider(nodeToDelete, cloudProvider, recorder)
confirmation <- deleteNodeFromCloudProvider(nodeToDelete, context.CloudProvider, context.Recorder)
}(node)
}
var finalError error
@@ -201,7 +194,8 @@ func ScaleDown(
}
// We look for only 1 node so new hints may be incomplete.
nodesToRemove, _, err := simulator.FindNodesToRemove(candidates, nodes, pods, client, predicateChecker, 1, false,
nodesToRemove, _, err := simulator.FindNodesToRemove(candidates, nodes, pods, context.ClientSet,
context.PredicateChecker, 1, false,
oldHints, usageTracker, time.Now())
if err != nil {
@@ -222,8 +216,7 @@ func ScaleDown(
// Nothing super-bad should happen if the node is removed from tracker prematurely.
simulator.RemoveNodeFromTracker(usageTracker, toRemove.Node.Name, unneededNodes)
err = deleteNode(toRemove.Node, toRemove.PodsToReschedule, client, cloudProvider, recorder)
err = deleteNode(context, toRemove.Node, toRemove.PodsToReschedule)
if err != nil {
return ScaleDownError, fmt.Errorf("Failed to delete %s: %v", toRemove.Node.Name, err)
}
@@ -273,25 +266,26 @@ func getEmptyNodes(candidates []*apiv1.Node, pods []*apiv1.Pod, maxEmptyBulkDele
return result[:limit]
}
func deleteNode(node *apiv1.Node, pods []*apiv1.Pod, client kube_client.Interface, cloudProvider cloudprovider.CloudProvider, recorder kube_record.EventRecorder) error {
if err := drainNode(node, pods, client, recorder); err != nil {
func deleteNode(context AutoscalingContext, node *apiv1.Node, pods []*apiv1.Pod) error {
if err := drainNode(node, pods, context.ClientSet, context.Recorder, context.MaxGratefulTerminationSec); err != nil {
return err
}
return deleteNodeFromCloudProvider(node, cloudProvider, recorder)
return deleteNodeFromCloudProvider(node, context.CloudProvider, context.Recorder)
}
// Performs drain logic on the node. Marks the node as unschedulable and later removes all pods, giving
// them up to maxGratefulTerminationSec seconds to finish.
func drainNode(node *apiv1.Node, pods []*apiv1.Pod, client kube_client.Interface, recorder kube_record.EventRecorder) error {
func drainNode(node *apiv1.Node, pods []*apiv1.Pod, client kube_client.Interface, recorder kube_record.EventRecorder,
maxGratefulTerminationSec int) error {
if err := markToBeDeleted(node, client, recorder); err != nil {
return err
}
seconds := int64(MaxGracefulTerminationTime.Seconds())
maxGraceful64 := int64(maxGratefulTerminationSec)
for _, pod := range pods {
recorder.Eventf(pod, apiv1.EventTypeNormal, "ScaleDown", "deleting pod for node scale down")
err := client.Core().Pods(pod.Namespace).Delete(pod.Name, &apiv1.DeleteOptions{
GracePeriodSeconds: &seconds,
GracePeriodSeconds: &maxGraceful64,
})
if err != nil {
glog.Errorf("Failed to delete %s/%s: %v", pod.Namespace, pod.Name, err)
@@ -300,7 +294,7 @@ func drainNode(node *apiv1.Node, pods []*apiv1.Pod, client kube_client.Interface
allGone := true
// Wait up to maxGratefulTerminationSec seconds.
for start := time.Now(); time.Now().Sub(start) < MaxGracefulTerminationTime; time.Sleep(5 * time.Second) {
for start := time.Now(); time.Now().Sub(start) < time.Duration(maxGratefulTerminationSec)*time.Second; time.Sleep(5 * time.Second) {
allGone = true
for _, pod := range pods {
podreturned, err := client.Core().Pods(pod.Namespace).Get(pod.Name)

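The drain path above now derives both the pod deletion grace period and the wait deadline from the same max-grateful-termination-sec flag rather than the removed MaxGracefulTerminationTime constant. A self-contained sketch of those two conversions, with illustrative helper names; the real code deletes pods through the Kubernetes clientset and polls with a 5-second sleep.

package main

import (
	"fmt"
	"time"
)

// gracePeriodSeconds mimics turning the int flag into the *int64 value that
// the pod delete options expect.
func gracePeriodSeconds(maxGratefulTerminationSec int) *int64 {
	seconds := int64(maxGratefulTerminationSec)
	return &seconds
}

// waitForPodsGone mimics the bounded polling loop in drainNode: give pods up
// to maxGratefulTerminationSec seconds to disappear, checking every interval.
func waitForPodsGone(maxGratefulTerminationSec int, interval time.Duration, podsGone func() bool) bool {
	deadline := time.Duration(maxGratefulTerminationSec) * time.Second
	for start := time.Now(); time.Since(start) < deadline; time.Sleep(interval) {
		if podsGone() {
			return true
		}
	}
	return podsGone()
}

func main() {
	fmt.Println("grace period:", *gracePeriodSeconds(60), "seconds")
	checks := 0
	gone := waitForPodsGone(60, 10*time.Millisecond, func() bool { checks++; return checks > 2 })
	fmt.Println("pods gone:", gone)
}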

@@ -59,8 +59,13 @@ func TestFindUnneededNodes(t *testing.T) {
n3 := BuildTestNode("n3", 1000, 10)
n4 := BuildTestNode("n4", 10000, 10)
result, hints, utilization := FindUnneededNodes([]*apiv1.Node{n1, n2, n3, n4}, map[string]time.Time{}, 0.35,
[]*apiv1.Pod{p1, p2, p3, p4}, simulator.NewTestPredicateChecker(), make(map[string]string),
context := AutoscalingContext{
PredicateChecker: simulator.NewTestPredicateChecker(),
ScaleDownUtilizationThreshold: 0.35,
}
result, hints, utilization := FindUnneededNodes(context, []*apiv1.Node{n1, n2, n3, n4}, map[string]time.Time{},
[]*apiv1.Pod{p1, p2, p3, p4}, make(map[string]string),
simulator.NewUsageTracker(), time.Now())
assert.Equal(t, 1, len(result))
@@ -70,8 +75,8 @@ func TestFindUnneededNodes(t *testing.T) {
assert.Equal(t, 4, len(utilization))
result["n1"] = time.Now()
result2, hints, utilization := FindUnneededNodes([]*apiv1.Node{n1, n2, n3, n4}, result, 0.35,
[]*apiv1.Pod{p1, p2, p3, p4}, simulator.NewTestPredicateChecker(), hints,
result2, hints, utilization := FindUnneededNodes(context, []*apiv1.Node{n1, n2, n3, n4}, result,
[]*apiv1.Pod{p1, p2, p3, p4}, hints,
simulator.NewUsageTracker(), time.Now())
assert.Equal(t, 1, len(result2))
@@ -110,7 +115,7 @@ func TestDrainNode(t *testing.T) {
updatedNodes <- obj.Name
return true, obj, nil
})
err := drainNode(n1, []*apiv1.Pod{p1, p2}, fakeClient, createEventRecorder(fakeClient))
err := drainNode(n1, []*apiv1.Pod{p1, p2}, fakeClient, createEventRecorder(fakeClient), 20)
assert.NoError(t, err)
assert.Equal(t, p1.Name, getStringFromChan(deletedPods))
assert.Equal(t, p2.Name, getStringFromChan(deletedPods))


@@ -21,10 +21,7 @@ import (
"k8s.io/contrib/cluster-autoscaler/cloudprovider"
"k8s.io/contrib/cluster-autoscaler/estimator"
"k8s.io/contrib/cluster-autoscaler/simulator"
apiv1 "k8s.io/kubernetes/pkg/api/v1"
kube_client "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_5"
kube_record "k8s.io/kubernetes/pkg/client/record"
"github.com/golang/glog"
)
@@ -40,9 +37,7 @@ type ExpansionOption struct {
// ScaleUp tries to scale the cluster up. Returns true if it found a way to increase the size,
// false if it didn't, and an error if one occurred. Assumes that all nodes in the cluster are
// ready and in sync with instance groups.
func ScaleUp(unschedulablePods []*apiv1.Pod, nodes []*apiv1.Node, cloudProvider cloudprovider.CloudProvider, kubeClient kube_client.Interface,
predicateChecker *simulator.PredicateChecker, recorder kube_record.EventRecorder, maxNodesTotal int,
estimatorName string) (bool, error) {
func ScaleUp(context AutoscalingContext, unschedulablePods []*apiv1.Pod, nodes []*apiv1.Node) (bool, error) {
// From now on we only care about unschedulable pods that were marked after the newest
// node became available for the scheduler.
@@ -56,13 +51,13 @@ func ScaleUp(unschedulablePods []*apiv1.Pod, nodes []*apiv1.Node, cloudProvider
}
expansionOptions := make([]ExpansionOption, 0)
nodeInfos, err := GetNodeInfosForGroups(nodes, cloudProvider, kubeClient)
nodeInfos, err := GetNodeInfosForGroups(nodes, context.CloudProvider, context.ClientSet)
if err != nil {
return false, fmt.Errorf("failed to build node infos for node groups: %v", err)
}
podsRemainUnshedulable := make(map[*apiv1.Pod]struct{})
for _, nodeGroup := range cloudProvider.NodeGroups() {
for _, nodeGroup := range context.CloudProvider.NodeGroups() {
currentSize, err := nodeGroup.TargetSize()
if err != nil {
@@ -87,7 +82,7 @@ func ScaleUp(unschedulablePods []*apiv1.Pod, nodes []*apiv1.Node, cloudProvider
}
for _, pod := range unschedulablePods {
err = predicateChecker.CheckPredicates(pod, nodeInfo)
err = context.PredicateChecker.CheckPredicates(pod, nodeInfo)
if err == nil {
option.pods = append(option.pods, pod)
} else {
@@ -96,17 +91,17 @@ func ScaleUp(unschedulablePods []*apiv1.Pod, nodes []*apiv1.Node, cloudProvider
}
}
if len(option.pods) > 0 {
if estimatorName == BinpackingEstimatorName {
binpackingEstimator := estimator.NewBinpackingNodeEstimator(predicateChecker)
if context.EstimatorName == BinpackingEstimatorName {
binpackingEstimator := estimator.NewBinpackingNodeEstimator(context.PredicateChecker)
option.nodeCount = binpackingEstimator.Estimate(option.pods, nodeInfo)
} else if estimatorName == BasicEstimatorName {
} else if context.EstimatorName == BasicEstimatorName {
basicEstimator := estimator.NewBasicNodeEstimator()
for _, pod := range option.pods {
basicEstimator.Add(pod)
}
option.nodeCount, option.debug = basicEstimator.Estimate(nodeInfo.Node())
} else {
glog.Fatalf("Unrecognized estimator: %s", estimatorName)
glog.Fatalf("Unrecognized estimator: %s", context.EstimatorName)
}
expansionOptions = append(expansionOptions, option)
}
@@ -131,9 +126,9 @@ func ScaleUp(unschedulablePods []*apiv1.Pod, nodes []*apiv1.Node, cloudProvider
newSize = bestOption.nodeGroup.MaxSize()
}
if maxNodesTotal > 0 && len(nodes)+(newSize-currentSize) > maxNodesTotal {
glog.V(1).Infof("Capping size to max cluster total size (%d)", maxNodesTotal)
newSize = maxNodesTotal - len(nodes) + currentSize
if context.MaxNodesTotal > 0 && len(nodes)+(newSize-currentSize) > context.MaxNodesTotal {
glog.V(1).Infof("Capping size to max cluster total size (%d)", context.MaxNodesTotal)
newSize = context.MaxNodesTotal - len(nodes) + currentSize
if newSize < currentSize {
return false, fmt.Errorf("max node total count already reached")
}
@@ -146,14 +141,14 @@ func ScaleUp(unschedulablePods []*apiv1.Pod, nodes []*apiv1.Node, cloudProvider
}
for _, pod := range bestOption.pods {
recorder.Eventf(pod, apiv1.EventTypeNormal, "TriggeredScaleUp",
context.Recorder.Eventf(pod, apiv1.EventTypeNormal, "TriggeredScaleUp",
"pod triggered scale-up, group: %s, sizes (current/new): %d/%d", bestOption.nodeGroup.Id(), currentSize, newSize)
}
return true, nil
}
for pod := range podsRemainUnshedulable {
recorder.Event(pod, apiv1.EventTypeNormal, "NotTriggerScaleUp",
context.Recorder.Event(pod, apiv1.EventTypeNormal, "NotTriggerScaleUp",
"pod didn't trigger scale-up (it wouldn't fit if a new node is added)")
}

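The MaxNodesTotal cap in ScaleUp is plain arithmetic over the current cluster size, the node group's current size, and the proposed new size. A self-contained worked sketch of that calculation; capNewSize is an illustrative name, not a function in the repository.

package main

import (
	"errors"
	"fmt"
)

// capNewSize applies the MaxNodesTotal limit the way the scale-up path does:
// if growing this node group to newSize would push the whole cluster over the
// cap, shrink the increase; if even the current size leaves no room, give up.
func capNewSize(newSize, currentSize, clusterNodes, maxNodesTotal int) (int, error) {
	if maxNodesTotal > 0 && clusterNodes+(newSize-currentSize) > maxNodesTotal {
		newSize = maxNodesTotal - clusterNodes + currentSize
		if newSize < currentSize {
			return 0, errors.New("max node total count already reached")
		}
	}
	return newSize, nil
}

func main() {
	// Cluster has 9 nodes, this group has 3, the estimator asks for 6, and the
	// cap is 10: only one more node fits, so the group is capped at 4.
	size, err := capNewSize(6, 3, 9, 10)
	fmt.Println(size, err) // 4 <nil>
}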

@@ -28,11 +28,40 @@ import (
apiv1 "k8s.io/kubernetes/pkg/api/v1"
metav1 "k8s.io/kubernetes/pkg/apis/meta/v1"
kube_client "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_5"
kube_record "k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
"github.com/golang/glog"
)
// AutoscalingContext contains user-configurable constants and configuration-related objects passed to
// scale up/scale down functions.
type AutoscalingContext struct {
// CloudProvider used in CA.
CloudProvider cloudprovider.CloudProvider
// ClientSet interface.
ClientSet kube_client.Interface
// Recorder for recording events.
Recorder kube_record.EventRecorder
// PredicateChecker to check if a pod can fit into a node.
PredicateChecker *simulator.PredicateChecker
// MaxEmptyBulkDelete is a number of empty nodes that can be removed at the same time.
MaxEmptyBulkDelete int
// ScaleDownUtilizationThreshold sets the threshold for nodes to be considered for scale down.
// Well-utilized nodes are not touched.
ScaleDownUtilizationThreshold float64
// ScaleDownUnneededTime sets the duration CA expects a node to be unneeded/eligible for removal
// before scaling down the node.
ScaleDownUnneededTime time.Duration
// MaxNodesTotal sets the maximum number of nodes in the whole cluster
MaxNodesTotal int
// EstimatorName is the estimator used to estimate the number of needed nodes in scale up.
EstimatorName string
// MaxGratefulTerminationSec is the maximum number of seconds scale down waits for pods to terminate before
// removing the node from cloud provider.
MaxGratefulTerminationSec int
}
// GetAllNodesAvailableTime returns the time when the newest node became available for the scheduler.
// TODO: This function should use LastTransitionTime from NodeReady condition.
func GetAllNodesAvailableTime(nodes []*apiv1.Node) time.Time {