Merge pull request https://github.com/kubernetes/contrib/pull/2161 from mwielgus/context

Automatic merge from submit-queue

ClusterAutoscaler: context object to keep non-changing objects

Also adds a flag setting the maximum graceful termination time used when scaling down a node.

cc: @fgrzadkowski @piosz @jszepkowski
commit 979c64fe84
Kubernetes Submit Queue, 2016-12-16 08:37:36 -08:00, committed by GitHub
5 changed files with 101 additions and 73 deletions
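For orientation before the diff: a minimal, hypothetical sketch of the pattern this PR applies, where non-changing dependencies and settings are bundled into a single context value passed to the scale-up/scale-down code instead of being threaded through ever-longer parameter lists. The names below (ScalerContext, scaleDown) are illustrative only; the real struct is the AutoscalingContext introduced in the diff.

package main

import (
    "fmt"
    "time"
)

// ScalerContext bundles objects and settings that do not change between loop iterations.
type ScalerContext struct {
    MaxNodesTotal             int
    ScaleDownUnneededTime     time.Duration
    MaxGracefulTerminationSec int
}

// scaleDown receives the whole context instead of a long list of parameters.
func scaleDown(ctx ScalerContext, candidates []string) {
    fmt.Printf("would consider %d nodes, waiting up to %ds for pod termination\n",
        len(candidates), ctx.MaxGracefulTerminationSec)
}

func main() {
    ctx := ScalerContext{
        MaxNodesTotal:             100,
        ScaleDownUnneededTime:     10 * time.Minute,
        MaxGracefulTerminationSec: 60,
    }
    scaleDown(ctx, []string{"node-1", "node-2"})
}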

View File

@@ -85,6 +85,7 @@ var (
    maxNodesTotal = flag.Int("max-nodes-total", 0, "Maximum number of nodes in all node groups. Cluster autoscaler will not grow the cluster beyond this number.")
    cloudProviderFlag = flag.String("cloud-provider", "gce", "Cloud provider type. Allowed values: gce, aws")
    maxEmptyBulkDeleteFlag = flag.Int("max-empty-bulk-delete", 10, "Maximum number of empty nodes that can be deleted at the same time.")
+   maxGratefulTerminationFlag = flag.Int("max-grateful-termination-sec", 60, "Maximum number of seconds CA waits for pod termination when trying to scale down a node.")

    // AvailableEstimators is a list of available estimators.
    AvailableEstimators = []string{BasicEstimatorName, BinpackingEstimatorName}
@@ -134,8 +135,6 @@ func run(_ <-chan struct{}) {
    nodeUtilizationMap := make(map[string]float64)
    usageTracker := simulator.NewUsageTracker()
-   recorder := createEventRecorder(kubeClient)

    var cloudProvider cloudprovider.CloudProvider
    if *cloudProviderFlag == "gce" {
@@ -183,6 +182,19 @@ func run(_ <-chan struct{}) {
        }
    }

+   autoscalingContext := AutoscalingContext{
+       CloudProvider:                 cloudProvider,
+       ClientSet:                     kubeClient,
+       Recorder:                      createEventRecorder(kubeClient),
+       PredicateChecker:              predicateChecker,
+       MaxEmptyBulkDelete:            *maxEmptyBulkDeleteFlag,
+       ScaleDownUtilizationThreshold: *scaleDownUtilizationThreshold,
+       ScaleDownUnneededTime:         *scaleDownUnneededTime,
+       MaxNodesTotal:                 *maxNodesTotal,
+       EstimatorName:                 *estimatorFlag,
+       MaxGratefulTerminationSec:     *maxGratefulTerminationFlag,
+   }
+
    for {
        select {
        case <-time.After(*scanInterval):
@@ -200,13 +212,13 @@ func run(_ <-chan struct{}) {
                continue
            }

-           if err := CheckGroupsAndNodes(nodes, cloudProvider); err != nil {
+           if err := CheckGroupsAndNodes(nodes, autoscalingContext.CloudProvider); err != nil {
                glog.Warningf("Cluster is not ready for autoscaling: %v", err)
                continue
            }

            // CA can die at any time. Removing taints that might have been left from the previous run.
-           if err := cleanToBeDeleted(nodes, kubeClient, recorder); err != nil {
+           if err := cleanToBeDeleted(nodes, kubeClient, autoscalingContext.Recorder); err != nil {
                glog.Warningf("Failed to clean ToBeDeleted information: %v", err)
                continue
            }
@@ -227,7 +239,7 @@ func run(_ <-chan struct{}) {
            // the newest node became available for the scheduler.
            allNodesAvailableTime := GetAllNodesAvailableTime(nodes)
            podsToReset, unschedulablePodsToHelp := SlicePodsByPodScheduledTime(allUnschedulablePods, allNodesAvailableTime)
-           ResetPodScheduledCondition(kubeClient, podsToReset)
+           ResetPodScheduledCondition(autoscalingContext.ClientSet, podsToReset)

            // We need to check whether pods marked as unschedulable are actually unschedulable.
            // This should prevent from adding unnecessary nodes. Example of such situation:
@@ -245,7 +257,8 @@ func run(_ <-chan struct{}) {
            // in the describe situation.
            schedulablePodsPresent := false
            if *verifyUnschedulablePods {
-               newUnschedulablePodsToHelp := FilterOutSchedulable(unschedulablePodsToHelp, nodes, allScheduled, predicateChecker)
+               newUnschedulablePodsToHelp := FilterOutSchedulable(unschedulablePodsToHelp, nodes, allScheduled,
+                   autoscalingContext.PredicateChecker)

                if len(newUnschedulablePodsToHelp) != len(unschedulablePodsToHelp) {
                    glog.V(2).Info("Schedulable pods present")
@@ -261,9 +274,7 @@ func run(_ <-chan struct{}) {
            } else {
                scaleUpStart := time.Now()
                updateLastTime("scaleup")
-               scaledUp, err := ScaleUp(unschedulablePodsToHelp, nodes, cloudProvider, kubeClient, predicateChecker, recorder,
-                   *maxNodesTotal, *estimatorFlag)
+               scaledUp, err := ScaleUp(autoscalingContext, unschedulablePodsToHelp, nodes)

                updateDuration("scaleup", scaleUpStart)
                if err != nil {
@@ -295,11 +306,10 @@ func run(_ <-chan struct{}) {
            usageTracker.CleanUp(time.Now().Add(-(*scaleDownUnneededTime)))
            unneededNodes, podLocationHints, nodeUtilizationMap = FindUnneededNodes(
+               autoscalingContext,
                nodes,
                unneededNodes,
-               *scaleDownUtilizationThreshold,
                allScheduled,
-               predicateChecker,
                podLocationHints,
                usageTracker, time.Now())
@@ -318,18 +328,13 @@ func run(_ <-chan struct{}) {
            updateLastTime("scaledown")
            result, err := ScaleDown(
+               autoscalingContext,
                nodes,
                nodeUtilizationMap,
                unneededNodes,
-               *scaleDownUnneededTime,
                allScheduled,
-               cloudProvider,
-               kubeClient,
-               predicateChecker,
                podLocationHints,
-               usageTracker,
-               recorder,
-               *maxEmptyBulkDeleteFlag)
+               usageTracker)

            updateDuration("scaledown", scaleDownStart)

View File

@@ -51,19 +51,16 @@ const (
const (
    // ToBeDeletedTaint is a taint used to make the node unschedulable.
    ToBeDeletedTaint = "ToBeDeletedByClusterAutoscaler"
-   // MaxGracefulTerminationTime is max gracefull termination time used by CA.
-   MaxGracefulTerminationTime = time.Minute
)

// FindUnneededNodes calculates which nodes are not needed, i.e. all pods can be scheduled somewhere else,
// and updates unneededNodes map accordingly. It also returns information where pods can be rescheduld and
// node utilization level.
-func FindUnneededNodes(nodes []*apiv1.Node,
+func FindUnneededNodes(
+   context AutoscalingContext,
+   nodes []*apiv1.Node,
    unneededNodes map[string]time.Time,
-   utilizationThreshold float64,
    pods []*apiv1.Pod,
-   predicateChecker *simulator.PredicateChecker,
    oldHints map[string]string,
    tracker *simulator.UsageTracker,
    timestamp time.Time) (unnededTimeMap map[string]time.Time, podReschedulingHints map[string]string, utilizationMap map[string]float64) {
@@ -87,7 +84,7 @@ func FindUnneededNodes(nodes []*apiv1.Node,
        glog.V(4).Infof("Node %s - utilization %f", node.Name, utilization)
        utilizationMap[node.Name] = utilization

-       if utilization >= utilizationThreshold {
+       if utilization >= context.ScaleDownUtilizationThreshold {
            glog.V(4).Infof("Node %s is not suitable for removal - utilization too big (%f)", node.Name, utilization)
            continue
        }
@@ -96,7 +93,7 @@ func FindUnneededNodes(nodes []*apiv1.Node,
    // Phase2 - check which nodes can be probably removed using fast drain.
    nodesToRemove, newHints, err := simulator.FindNodesToRemove(currentlyUnneededNodes, nodes, pods,
-       nil, predicateChecker,
+       nil, context.PredicateChecker,
        len(currentlyUnneededNodes), true, oldHints, tracker, timestamp)
    if err != nil {
        glog.Errorf("Error while simulating node drains: %v", err)
@@ -120,18 +117,14 @@ func FindUnneededNodes(nodes []*apiv1.Node,
// ScaleDown tries to scale down the cluster. It returns ScaleDownResult indicating if any node was
// removed and error if such occured.
func ScaleDown(
+   context AutoscalingContext,
    nodes []*apiv1.Node,
    lastUtilizationMap map[string]float64,
    unneededNodes map[string]time.Time,
-   unneededTime time.Duration,
    pods []*apiv1.Pod,
-   cloudProvider cloudprovider.CloudProvider,
-   client kube_client.Interface,
-   predicateChecker *simulator.PredicateChecker,
    oldHints map[string]string,
    usageTracker *simulator.UsageTracker,
-   recorder kube_record.EventRecorder,
-   maxEmptyBulkDelete int) (ScaleDownResult, error) {
+) (ScaleDownResult, error) {

    now := time.Now()
    candidates := make([]*apiv1.Node, 0)
@@ -141,11 +134,11 @@ func ScaleDown(
        glog.V(2).Infof("%s was unneeded for %s", node.Name, now.Sub(val).String())

        // Check how long the node was underutilized.
-       if !val.Add(unneededTime).Before(now) {
+       if !val.Add(context.ScaleDownUnneededTime).Before(now) {
            continue
        }

-       nodeGroup, err := cloudProvider.NodeGroupForNode(node)
+       nodeGroup, err := context.CloudProvider.NodeGroupForNode(node)
        if err != nil {
            glog.Errorf("Error while checking node group for %s: %v", node.Name, err)
            continue
@@ -177,14 +170,14 @@ func ScaleDown(
    // Trying to delete empty nodes in bulk. If there are no empty nodes then CA will
    // try to delete not-so-empty nodes, possibly killing some pods and allowing them
    // to recreate on other nodes.
-   emptyNodes := getEmptyNodes(candidates, pods, maxEmptyBulkDelete, cloudProvider)
+   emptyNodes := getEmptyNodes(candidates, pods, context.MaxEmptyBulkDelete, context.CloudProvider)
    if len(emptyNodes) > 0 {
        confirmation := make(chan error, len(emptyNodes))
        for _, node := range emptyNodes {
            glog.V(0).Infof("Scale-down: removing empty node %s", node.Name)
            simulator.RemoveNodeFromTracker(usageTracker, node.Name, unneededNodes)
            go func(nodeToDelete *apiv1.Node) {
-               confirmation <- deleteNodeFromCloudProvider(nodeToDelete, cloudProvider, recorder)
+               confirmation <- deleteNodeFromCloudProvider(nodeToDelete, context.CloudProvider, context.Recorder)
            }(node)
        }
        var finalError error
@@ -201,7 +194,8 @@ func ScaleDown(
    }

    // We look for only 1 node so new hints may be incomplete.
-   nodesToRemove, _, err := simulator.FindNodesToRemove(candidates, nodes, pods, client, predicateChecker, 1, false,
+   nodesToRemove, _, err := simulator.FindNodesToRemove(candidates, nodes, pods, context.ClientSet,
+       context.PredicateChecker, 1, false,
        oldHints, usageTracker, time.Now())
    if err != nil {
@@ -222,8 +216,7 @@ func ScaleDown(
        // Nothing super-bad should happen if the node is removed from tracker prematurely.
        simulator.RemoveNodeFromTracker(usageTracker, toRemove.Node.Name, unneededNodes)

-       err = deleteNode(toRemove.Node, toRemove.PodsToReschedule, client, cloudProvider, recorder)
+       err = deleteNode(context, toRemove.Node, toRemove.PodsToReschedule)
        if err != nil {
            return ScaleDownError, fmt.Errorf("Failed to delete %s: %v", toRemove.Node.Name, err)
        }
@@ -273,25 +266,26 @@ func getEmptyNodes(candidates []*apiv1.Node, pods []*apiv1.Pod, maxEmptyBulkDele
    return result[:limit]
}

-func deleteNode(node *apiv1.Node, pods []*apiv1.Pod, client kube_client.Interface, cloudProvider cloudprovider.CloudProvider, recorder kube_record.EventRecorder) error {
-   if err := drainNode(node, pods, client, recorder); err != nil {
+func deleteNode(context AutoscalingContext, node *apiv1.Node, pods []*apiv1.Pod) error {
+   if err := drainNode(node, pods, context.ClientSet, context.Recorder, context.MaxGratefulTerminationSec); err != nil {
        return err
    }
-   return deleteNodeFromCloudProvider(node, cloudProvider, recorder)
+   return deleteNodeFromCloudProvider(node, context.CloudProvider, context.Recorder)
}

// Performs drain logic on the node. Marks the node as unschedulable and later removes all pods, giving
// them up to MaxGracefulTerminationTime to finish.
-func drainNode(node *apiv1.Node, pods []*apiv1.Pod, client kube_client.Interface, recorder kube_record.EventRecorder) error {
+func drainNode(node *apiv1.Node, pods []*apiv1.Pod, client kube_client.Interface, recorder kube_record.EventRecorder,
+   maxGratefulTerminationSec int) error {
    if err := markToBeDeleted(node, client, recorder); err != nil {
        return err
    }

-   seconds := int64(MaxGracefulTerminationTime.Seconds())
+   maxGraceful64 := int64(maxGratefulTerminationSec)
    for _, pod := range pods {
        recorder.Eventf(pod, apiv1.EventTypeNormal, "ScaleDown", "deleting pod for node scale down")
        err := client.Core().Pods(pod.Namespace).Delete(pod.Name, &apiv1.DeleteOptions{
-           GracePeriodSeconds: &seconds,
+           GracePeriodSeconds: &maxGraceful64,
        })
        if err != nil {
            glog.Errorf("Failed to delete %s/%s: %v", pod.Namespace, pod.Name, err)
@@ -300,7 +294,7 @@ func drainNode(node *apiv1.Node, pods []*apiv1.Pod, client kube_client.Interface
    allGone := true

    // Wait up to MaxGracefulTerminationTime.
-   for start := time.Now(); time.Now().Sub(start) < MaxGracefulTerminationTime; time.Sleep(5 * time.Second) {
+   for start := time.Now(); time.Now().Sub(start) < time.Duration(maxGratefulTerminationSec)*time.Second; time.Sleep(5 * time.Second) {
        allGone = true
        for _, pod := range pods {
            podreturned, err := client.Core().Pods(pod.Namespace).Get(pod.Name)
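The drainNode changes above wire the new flag into the drain loop: each pod is deleted with the configured grace period, and the function then polls until the pods are gone or the window expires. A simplified, self-contained sketch of that pattern follows; drain and podsGone are hypothetical stand-ins, not the actual cluster-autoscaler code.

package main

import (
    "fmt"
    "time"
)

// drain deletes the named pods (simulated here by a print) and then waits up to
// maxGracefulTerminationSec seconds for podsGone to report that they have terminated.
func drain(podNames []string, maxGracefulTerminationSec int, podsGone func() bool) error {
    for _, p := range podNames {
        fmt.Printf("deleting pod %s with grace period %ds\n", p, maxGracefulTerminationSec)
    }
    deadline := time.Now().Add(time.Duration(maxGracefulTerminationSec) * time.Second)
    for time.Now().Before(deadline) {
        if podsGone() {
            return nil
        }
        time.Sleep(1 * time.Second)
    }
    return fmt.Errorf("pods still running after %d seconds", maxGracefulTerminationSec)
}

func main() {
    start := time.Now()
    err := drain([]string{"pod-a", "pod-b"}, 3, func() bool {
        return time.Since(start) > 2*time.Second // pretend the pods terminate after ~2s
    })
    fmt.Println("drain result:", err)
}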

View File

@@ -59,8 +59,13 @@ func TestFindUnneededNodes(t *testing.T) {
    n3 := BuildTestNode("n3", 1000, 10)
    n4 := BuildTestNode("n4", 10000, 10)

-   result, hints, utilization := FindUnneededNodes([]*apiv1.Node{n1, n2, n3, n4}, map[string]time.Time{}, 0.35,
-       []*apiv1.Pod{p1, p2, p3, p4}, simulator.NewTestPredicateChecker(), make(map[string]string),
+   context := AutoscalingContext{
+       PredicateChecker:              simulator.NewTestPredicateChecker(),
+       ScaleDownUtilizationThreshold: 0.35,
+   }
+
+   result, hints, utilization := FindUnneededNodes(context, []*apiv1.Node{n1, n2, n3, n4}, map[string]time.Time{},
+       []*apiv1.Pod{p1, p2, p3, p4}, make(map[string]string),
        simulator.NewUsageTracker(), time.Now())

    assert.Equal(t, 1, len(result))
@@ -70,8 +75,8 @@ func TestFindUnneededNodes(t *testing.T) {
    assert.Equal(t, 4, len(utilization))

    result["n1"] = time.Now()
-   result2, hints, utilization := FindUnneededNodes([]*apiv1.Node{n1, n2, n3, n4}, result, 0.35,
-       []*apiv1.Pod{p1, p2, p3, p4}, simulator.NewTestPredicateChecker(), hints,
+   result2, hints, utilization := FindUnneededNodes(context, []*apiv1.Node{n1, n2, n3, n4}, result,
+       []*apiv1.Pod{p1, p2, p3, p4}, hints,
        simulator.NewUsageTracker(), time.Now())

    assert.Equal(t, 1, len(result2))
@@ -110,7 +115,7 @@ func TestDrainNode(t *testing.T) {
        updatedNodes <- obj.Name
        return true, obj, nil
    })

-   err := drainNode(n1, []*apiv1.Pod{p1, p2}, fakeClient, createEventRecorder(fakeClient))
+   err := drainNode(n1, []*apiv1.Pod{p1, p2}, fakeClient, createEventRecorder(fakeClient), 20)
    assert.NoError(t, err)
    assert.Equal(t, p1.Name, getStringFromChan(deletedPods))
    assert.Equal(t, p2.Name, getStringFromChan(deletedPods))

View File

@@ -21,10 +21,7 @@ import (
    "k8s.io/contrib/cluster-autoscaler/cloudprovider"
    "k8s.io/contrib/cluster-autoscaler/estimator"
-   "k8s.io/contrib/cluster-autoscaler/simulator"

    apiv1 "k8s.io/kubernetes/pkg/api/v1"
-   kube_client "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_5"
-   kube_record "k8s.io/kubernetes/pkg/client/record"

    "github.com/golang/glog"
)
@@ -40,9 +37,7 @@ type ExpansionOption struct {
// ScaleUp tries to scale the cluster up. Return true if it found a way to increase the size,
// false if it didn't and error if an error occured. Assumes that all nodes in the cluster are
// ready and in sync with instance groups.
-func ScaleUp(unschedulablePods []*apiv1.Pod, nodes []*apiv1.Node, cloudProvider cloudprovider.CloudProvider, kubeClient kube_client.Interface,
-   predicateChecker *simulator.PredicateChecker, recorder kube_record.EventRecorder, maxNodesTotal int,
-   estimatorName string) (bool, error) {
+func ScaleUp(context AutoscalingContext, unschedulablePods []*apiv1.Pod, nodes []*apiv1.Node) (bool, error) {

    // From now on we only care about unschedulable pods that were marked after the newest
    // node became available for the scheduler.
@@ -56,13 +51,13 @@ func ScaleUp(unschedulablePods []*apiv1.Pod, nodes []*apiv1.Node, cloudProvider
    }

    expansionOptions := make([]ExpansionOption, 0)
-   nodeInfos, err := GetNodeInfosForGroups(nodes, cloudProvider, kubeClient)
+   nodeInfos, err := GetNodeInfosForGroups(nodes, context.CloudProvider, context.ClientSet)
    if err != nil {
        return false, fmt.Errorf("failed to build node infos for node groups: %v", err)
    }

    podsRemainUnshedulable := make(map[*apiv1.Pod]struct{})
-   for _, nodeGroup := range cloudProvider.NodeGroups() {
+   for _, nodeGroup := range context.CloudProvider.NodeGroups() {
        currentSize, err := nodeGroup.TargetSize()
        if err != nil {
@@ -87,7 +82,7 @@ func ScaleUp(unschedulablePods []*apiv1.Pod, nodes []*apiv1.Node, cloudProvider
        }

        for _, pod := range unschedulablePods {
-           err = predicateChecker.CheckPredicates(pod, nodeInfo)
+           err = context.PredicateChecker.CheckPredicates(pod, nodeInfo)
            if err == nil {
                option.pods = append(option.pods, pod)
            } else {
@@ -96,17 +91,17 @@ func ScaleUp(unschedulablePods []*apiv1.Pod, nodes []*apiv1.Node, cloudProvider
            }
        }

        if len(option.pods) > 0 {
-           if estimatorName == BinpackingEstimatorName {
-               binpackingEstimator := estimator.NewBinpackingNodeEstimator(predicateChecker)
+           if context.EstimatorName == BinpackingEstimatorName {
+               binpackingEstimator := estimator.NewBinpackingNodeEstimator(context.PredicateChecker)
                option.nodeCount = binpackingEstimator.Estimate(option.pods, nodeInfo)
-           } else if estimatorName == BasicEstimatorName {
+           } else if context.EstimatorName == BasicEstimatorName {
                basicEstimator := estimator.NewBasicNodeEstimator()
                for _, pod := range option.pods {
                    basicEstimator.Add(pod)
                }
                option.nodeCount, option.debug = basicEstimator.Estimate(nodeInfo.Node())
            } else {
-               glog.Fatalf("Unrecognized estimator: %s", estimatorName)
+               glog.Fatalf("Unrecognized estimator: %s", context.EstimatorName)
            }
            expansionOptions = append(expansionOptions, option)
        }
@@ -131,9 +126,9 @@ func ScaleUp(unschedulablePods []*apiv1.Pod, nodes []*apiv1.Node, cloudProvider
            newSize = bestOption.nodeGroup.MaxSize()
        }

-       if maxNodesTotal > 0 && len(nodes)+(newSize-currentSize) > maxNodesTotal {
-           glog.V(1).Infof("Capping size to max cluster total size (%d)", maxNodesTotal)
-           newSize = maxNodesTotal - len(nodes) + currentSize
+       if context.MaxNodesTotal > 0 && len(nodes)+(newSize-currentSize) > context.MaxNodesTotal {
+           glog.V(1).Infof("Capping size to max cluster total size (%d)", context.MaxNodesTotal)
+           newSize = context.MaxNodesTotal - len(nodes) + currentSize
            if newSize < currentSize {
                return false, fmt.Errorf("max node total count already reached")
            }
@@ -146,14 +141,14 @@ func ScaleUp(unschedulablePods []*apiv1.Pod, nodes []*apiv1.Node, cloudProvider
        }

        for _, pod := range bestOption.pods {
-           recorder.Eventf(pod, apiv1.EventTypeNormal, "TriggeredScaleUp",
+           context.Recorder.Eventf(pod, apiv1.EventTypeNormal, "TriggeredScaleUp",
                "pod triggered scale-up, group: %s, sizes (current/new): %d/%d", bestOption.nodeGroup.Id(), currentSize, newSize)
        }
        return true, nil
    }

    for pod := range podsRemainUnshedulable {
-       recorder.Event(pod, apiv1.EventTypeNormal, "NotTriggerScaleUp",
+       context.Recorder.Event(pod, apiv1.EventTypeNormal, "NotTriggerScaleUp",
            "pod didn't trigger scale-up (it wouldn't fit if a new node is added)")
    }
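The max-nodes-total cap in ScaleUp above trims the requested group size so the cluster-wide node count never exceeds the configured limit. A small, hypothetical illustration of that arithmetic (capNewSize is not a function from this PR):

package main

import "fmt"

// capNewSize reduces newSize so that clusterNodes + (newSize - currentSize) stays within maxNodesTotal.
func capNewSize(newSize, currentSize, clusterNodes, maxNodesTotal int) int {
    if maxNodesTotal > 0 && clusterNodes+(newSize-currentSize) > maxNodesTotal {
        newSize = maxNodesTotal - clusterNodes + currentSize
    }
    return newSize
}

func main() {
    // 9 nodes in the cluster, a group growing from 3 to 6, limit 10: the growth is capped to 4,
    // leaving the cluster at exactly 10 nodes.
    fmt.Println(capNewSize(6, 3, 9, 10)) // 4
}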

View File

@@ -28,11 +28,40 @@ import (
    apiv1 "k8s.io/kubernetes/pkg/api/v1"
    metav1 "k8s.io/kubernetes/pkg/apis/meta/v1"
    kube_client "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_5"
+   kube_record "k8s.io/kubernetes/pkg/client/record"
    "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"

    "github.com/golang/glog"
)

+// AutoscalingContext contains user-configurable constants and configuration-related objects passed to
+// scale up/scale down functions.
+type AutoscalingContext struct {
+   // CloudProvider used in CA.
+   CloudProvider cloudprovider.CloudProvider
+   // ClientSet interface.
+   ClientSet kube_client.Interface
+   // Recorder for recording events.
+   Recorder kube_record.EventRecorder
+   // PredicateChecker to check if a pod can fit into a node.
+   PredicateChecker *simulator.PredicateChecker
+   // MaxEmptyBulkDelete is a number of empty nodes that can be removed at the same time.
+   MaxEmptyBulkDelete int
+   // ScaleDownUtilizationThreshold sets the threshold for nodes to be considered for scale down.
+   // Well-utilized nodes are not touched.
+   ScaleDownUtilizationThreshold float64
+   // ScaleDownUnneededTime sets the duration CA expects a node to be unneeded/eligible for removal
+   // before scaling down the node.
+   ScaleDownUnneededTime time.Duration
+   // MaxNodesTotal sets the maximum number of nodes in the whole cluster.
+   MaxNodesTotal int
+   // EstimatorName is the estimator used to estimate the number of needed nodes in scale up.
+   EstimatorName string
+   // MaxGratefulTerminationSec is the maximum number of seconds scale down waits for pods to terminate before
+   // removing the node from cloud provider.
+   MaxGratefulTerminationSec int
+}
+
// GetAllNodesAvailableTime returns time when the newest node became available for scheduler.
// TODO: This function should use LastTransitionTime from NodeReady condition.
func GetAllNodesAvailableTime(nodes []*apiv1.Node) time.Time {