implement blocking scale up beyond max cores & memory

Aleksandra Malinowska 2017-09-01 10:19:41 +02:00
parent f9200da0ba
commit d43029c180
5 changed files with 357 additions and 40 deletions

View File

@@ -67,6 +67,14 @@ type AutoscalingOptions struct {
ScaleDownUnreadyTime time.Duration
// MaxNodesTotal sets the maximum number of nodes in the whole cluster
MaxNodesTotal int
// MaxCoresTotal sets the maximum number of cores in the whole cluster
MaxCoresTotal int64
// MinCoresTotal sets the minimum number of cores in the whole cluster
MinCoresTotal int64
// MaxMemoryTotal sets the maximum memory (in megabytes) in the whole cluster
MaxMemoryTotal int64
// MinMemoryTotal sets the minimum memory (in megabytes) in the whole cluster
MinMemoryTotal int64
// NodeGroupAutoDiscovery represents one or more definition(s) of node group auto-discovery
NodeGroupAutoDiscovery string
// UnregisteredNodeRemovalTime represents how long CA waits before removing nodes that are not registered in Kubernetes
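A minimal sketch of how these new limits end up populated (all numeric values below are illustrative, not defaults; the memory fields are kept in megabytes, as the comments state):

opts := AutoscalingOptions{
    MaxNodesTotal:  100,
    MinCoresTotal:  8,
    MaxCoresTotal:  64,
    MinMemoryTotal: 32 * 1024,  // 32 GB expressed in MB
    MaxMemoryTotal: 256 * 1024, // 256 GB expressed in MB
}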

View File

@@ -18,6 +18,8 @@ package core
import (
"bytes"
"fmt"
"math"
"time"
apiv1 "k8s.io/api/core/v1"
@@ -59,6 +61,11 @@ func ScaleUp(context *AutoscalingContext, unschedulablePods []*apiv1.Pod, nodes
return false, err.AddPrefix("failed to build node infos for node groups: ")
}
nodeGroups := context.CloudProvider.NodeGroups()
// calculate current total of cores & memory (in megabytes) in the cluster
coresTotal, memoryTotal := calculateCPUAndMemory(nodeGroups, nodeInfos)
upcomingNodes := make([]*schedulercache.NodeInfo, 0)
for nodeGroup, numberOfNodes := range context.ClusterStateRegistry.GetUpcomingNodes() {
nodeTemplate, found := nodeInfos[nodeGroup]
@@ -78,8 +85,6 @@ func ScaleUp(context *AutoscalingContext, unschedulablePods []*apiv1.Pod, nodes
podsRemainUnschedulable := make(map[*apiv1.Pod]bool)
expansionOptions := make([]expander.Option, 0)
- nodeGroups := context.CloudProvider.NodeGroups()
if context.AutoscalingOptions.NodeAutoprovisioningEnabled {
nodeGroups, nodeInfos = addAutoprovisionedCandidates(context, nodeGroups, nodeInfos, unschedulablePods)
}
@@ -90,28 +95,43 @@ func ScaleUp(context *AutoscalingContext, unschedulablePods []*apiv1.Pod, nodes
continue
}
- currentSize, err := nodeGroup.TargetSize()
currentTargetSize, err := nodeGroup.TargetSize()
if err != nil {
glog.Errorf("Failed to get node group size: %v", err)
continue
}
- if currentSize >= nodeGroup.MaxSize() {
if currentTargetSize >= nodeGroup.MaxSize() {
// skip this node group.
glog.V(4).Infof("Skipping node group %s - max size reached", nodeGroup.Id())
continue
}
- option := expander.Option{
- NodeGroup: nodeGroup,
- Pods: make([]*apiv1.Pod, 0),
- }
nodeInfo, found := nodeInfos[nodeGroup.Id()]
if !found {
glog.Errorf("No node info for: %s", nodeGroup.Id())
continue
}
nodeCPU, nodeMemory, err := getNodeCPUAndMemory(nodeInfo)
if err != nil {
glog.Errorf("Failed to get node resources: %v", err)
}
if nodeCPU > (context.MaxCoresTotal - coresTotal) {
// skip this node group
glog.V(4).Infof("Skipping node group %s - not enough cores limit left", nodeGroup.Id())
continue
}
if nodeMemory > (context.MaxMemoryTotal - memoryTotal) {
// skip this node group
glog.V(4).Infof("Skipping node group %s - not enough memory limit left", nodeGroup.Id())
continue
}
option := expander.Option{
NodeGroup: nodeGroup,
Pods: make([]*apiv1.Pod, 0),
}
for _, pod := range unschedulablePods {
err = context.PredicateChecker.CheckPredicates(pod, nil, nodeInfo, simulator.ReturnVerboseError)
if err == nil {
@@ -172,6 +192,7 @@ func ScaleUp(context *AutoscalingContext, unschedulablePods []*apiv1.Pod, nodes
glog.V(1).Infof("Estimated %d nodes needed in %s", bestOption.NodeCount, bestOption.NodeGroup.Id())
newNodes := bestOption.NodeCount
if context.MaxNodesTotal > 0 && len(nodes)+newNodes > context.MaxNodesTotal {
glog.V(1).Infof("Capping size to max cluster total size (%d)", context.MaxNodesTotal)
newNodes = context.MaxNodesTotal - len(nodes)
@@ -190,6 +211,22 @@ func ScaleUp(context *AutoscalingContext, unschedulablePods []*apiv1.Pod, nodes
}
}
nodeInfo, found := nodeInfos[bestOption.NodeGroup.Id()]
if !found {
// This should never happen, as we should already have retrieved
// a nodeInfo for any considered node group.
glog.Errorf("No node info for: %s", bestOption.NodeGroup.Id())
return false, errors.NewAutoscalerError(
errors.CloudProviderError,
"No node info for best expansion option!")
}
// apply upper limits for CPU and memory
newNodes, err = applyCPUAndMemoryLimit(newNodes, coresTotal, memoryTotal, context.MaxCoresTotal, context.MaxMemoryTotal, nodeInfo)
if err != nil {
return false, err
}
targetNodeGroups := []cloudprovider.NodeGroup{bestOption.NodeGroup}
if context.BalanceSimilarNodeGroups {
similarNodeGroups, typedErr := nodegroupset.FindSimilarNodeGroups(bestOption.NodeGroup, context.CloudProvider, nodeInfos)
@@ -332,3 +369,96 @@ func addAutoprovisionedCandidates(context *AutoscalingContext, nodeGroups []clou
}
return nodeGroups, nodeInfos
}
func calculateCPUAndMemory(nodeGroups []cloudprovider.NodeGroup, nodeInfos map[string]*schedulercache.NodeInfo) (int64, int64) {
var coresTotal int64
var memoryTotal int64
for _, nodeGroup := range nodeGroups {
currentSize, err := nodeGroup.TargetSize()
if err != nil {
glog.Errorf("Failed to get node group size of %v: %v", nodeGroup.Id(), err)
continue
}
nodeInfo, found := nodeInfos[nodeGroup.Id()]
if !found {
glog.Errorf("No node info for: %s", nodeGroup.Id())
continue
}
if currentSize > 0 {
nodeCPU, nodeMemory, err := getNodeCPUAndMemory(nodeInfo)
if err != nil {
glog.Errorf("Failed to get node resources: %v", err)
continue
}
coresTotal = coresTotal + int64(currentSize)*nodeCPU
memoryTotal = memoryTotal + int64(currentSize)*nodeMemory
}
}
return coresTotal, memoryTotal
}
func applyCPUAndMemoryLimit(newNodes int, coresTotal, memoryTotal, maxCoresTotal, maxMemoryTotal int64, nodeInfo *schedulercache.NodeInfo) (int, errors.AutoscalerError) {
newNodeCPU, newNodeMemory, err := getNodeCPUAndMemory(nodeInfo)
if err != nil {
// This is not very elegant, but it allows us to proceed even if we're
// unable to compute cpu/memory limits (not breaking current functionality)
glog.Errorf("Failed to get node resources: %v", err)
return newNodes, nil
}
if coresTotal+newNodeCPU*int64(newNodes) > maxCoresTotal {
glog.V(1).Infof("Capping size to max cluster cores (%d)", maxCoresTotal)
newNodes = int((maxCoresTotal - coresTotal) / newNodeCPU)
if newNodes < 1 {
// This should never happen, as we already check that
// at least one node will fit when considering nodegroup
return 0, errors.NewAutoscalerError(
errors.TransientError,
"max cores already reached")
}
}
if memoryTotal+newNodeMemory*int64(newNodes) > maxMemoryTotal {
glog.V(1).Infof("Capping size to max cluster memory allowed (%d)", maxMemoryTotal)
newNodes = int((maxMemoryTotal - memoryTotal) / newNodeMemory)
if newNodes < 1 {
// This should never happen, as we already check that
// at least one node will fit when considering nodegroup
return 0, errors.NewAutoscalerError(
errors.TransientError,
"max memory already reached")
}
}
return newNodes, nil
}
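To make the capping arithmetic above concrete (numbers are illustrative): with maxCoresTotal = 64, coresTotal = 60 and a 2-core node template, a request for newNodes = 4 would overshoot (60 + 2*4 = 68 > 64), so newNodes is reduced to (64 - 60) / 2 = 2; the same integer-division cap is then applied against maxMemoryTotal, and only if either result drops below 1 does the function return an error.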
const (
// Megabyte is 2^20 bytes.
Megabyte float64 = 1024 * 1024
)
func getNodeCPUAndMemory(nodeInfo *schedulercache.NodeInfo) (int64, int64, error) {
nodeCPU, err := getNodeResource(nodeInfo, apiv1.ResourceCPU)
if err != nil {
return 0, 0, err
}
nodeMemory, err := getNodeResource(nodeInfo, apiv1.ResourceMemory)
if err != nil {
return 0, 0, err
}
if nodeCPU <= 0 || nodeMemory <= 0 {
return 0, 0, fmt.Errorf("Invalid node CPU/memory values - cpu %v, memory %v", nodeCPU, nodeMemory)
}
nodeMemoryMb := math.Ceil(float64(nodeMemory) / Megabyte)
return nodeCPU, int64(nodeMemoryMb), nil
}
func getNodeResource(nodeInfo *schedulercache.NodeInfo, resource apiv1.ResourceName) (int64, error) {
nodeCapacity, found := nodeInfo.Node().Status.Capacity[resource]
if !found {
return 0, fmt.Errorf("Failed to get %v for node %v", resource, nodeInfo.Node().Name)
}
return nodeCapacity.Value(), nil
}
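As an illustration of the unit handling (capacity values assumed): a node reporting cpu: 4 and memory: 7.5Gi comes back from getNodeCPUAndMemory as (4, 7680), because the raw byte capacity is divided by Megabyte (2^20) and rounded up with math.Ceil before the cast to int64.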

View File

@@ -44,28 +44,135 @@ import (
"github.com/stretchr/testify/assert"
)
type nodeConfig struct {
name string
cpu int64
memory int64
ready bool
group string
}
type podConfig struct {
name string
cpu int64
memory int64
node string
}
type scaleUpConfig struct {
nodes []nodeConfig
pods []podConfig
extraPods []podConfig
expectedScaleUp string
expectedScaleUpGroup string
options AutoscalingOptions
}
var defaultOptions = AutoscalingOptions{
EstimatorName: estimator.BinpackingEstimatorName,
MaxCoresTotal: 5000 * 64,
MaxMemoryTotal: 5000 * 64 * 20,
}
func TestScaleUpOK(t *testing.T) {
config := &scaleUpConfig{
nodes: []nodeConfig{
{"n1", 100, 100, true, "ng1"},
{"n2", 1000, 1000, true, "ng2"},
},
pods: []podConfig{
{"p1", 80, 0, "n1"},
{"p2", 800, 0, "n2"},
},
extraPods: []podConfig{
{"p-new", 500, 0, ""},
},
expectedScaleUp: "ng2-1",
expectedScaleUpGroup: "ng2",
options: defaultOptions,
}
simpleScaleUpTest(t, config)
}
func TestScaleUpMaxCoresLimitHit(t *testing.T) {
options := defaultOptions
options.MaxCoresTotal = 9
config := &scaleUpConfig{
nodes: []nodeConfig{
{"n1", 2000, 100, true, "ng1"},
{"n2", 4000, 1000, true, "ng2"},
},
pods: []podConfig{
{"p1", 1000, 0, "n1"},
{"p2", 3000, 0, "n2"},
},
extraPods: []podConfig{
{"p-new-1", 2000, 0, ""},
{"p-new-2", 2000, 0, ""},
},
expectedScaleUp: "ng1-1",
expectedScaleUpGroup: "ng1",
options: options,
}
simpleScaleUpTest(t, config)
}
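The expectation follows from the new limit math: the existing nodes already use 2 + 4 = 6 cores of the 9 allowed, so ng2's 4-core template no longer fits; scheduling both pending 2-core pods would take two more ng1 nodes, but 6 + 2*2 > 9, so the request is capped to (9 - 6) / 2 = 1 node, matching the expected "ng1-1".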
const MB = 1024 * 1024
func TestScaleUpMaxMemoryLimitHit(t *testing.T) {
options := defaultOptions
options.MaxMemoryTotal = 1300 // limit is expressed in MB
config := &scaleUpConfig{
nodes: []nodeConfig{
{"n1", 2000, 100 * MB, true, "ng1"},
{"n2", 4000, 1000 * MB, true, "ng2"},
},
pods: []podConfig{
{"p1", 1000, 0, "n1"},
{"p2", 3000, 0, "n2"},
},
extraPods: []podConfig{
{"p-new-1", 2000, 100 * MB, ""},
{"p-new-2", 2000, 100 * MB, ""},
{"p-new-3", 2000, 100 * MB, ""},
},
expectedScaleUp: "ng1-2",
expectedScaleUpGroup: "ng1",
options: options,
}
simpleScaleUpTest(t, config)
}
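Here the cap comes from memory: the existing nodes account for 100 + 1000 = 1100 MB of the 1300 MB limit, so ng2's 1000 MB template is skipped; the three pending pods would call for three 100 MB ng1 nodes, but 1100 + 3*100 > 1300, so the scale-up is capped to (1300 - 1100) / 100 = 2 nodes, matching the expected "ng1-2".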
func simpleScaleUpTest(t *testing.T, config *scaleUpConfig) {
expandedGroups := make(chan string, 10)
fakeClient := &fake.Clientset{}
n1 := BuildTestNode("n1", 100, 1000) groups := make(map[string][]*apiv1.Node)
SetNodeReadyState(n1, true, time.Now()) nodes := make([]*apiv1.Node, len(config.nodes))
n2 := BuildTestNode("n2", 1000, 1000) for i, n := range config.nodes {
SetNodeReadyState(n2, true, time.Now()) node := BuildTestNode(n.name, n.cpu, n.memory)
SetNodeReadyState(node, n.ready, time.Now())
nodes[i] = node
groups[n.group] = append(groups[n.group], node)
}
p1 := BuildTestPod("p1", 80, 0) pods := make(map[string][]apiv1.Pod)
p2 := BuildTestPod("p2", 800, 0) for _, p := range config.pods {
p1.Spec.NodeName = "n1" pod := *BuildTestPod(p.name, p.cpu, p.memory)
p2.Spec.NodeName = "n2" pod.Spec.NodeName = p.node
pods[p.node] = append(pods[p.node], pod)
}
fakeClient.Fake.AddReactor("list", "pods", func(action core.Action) (bool, runtime.Object, error) {
list := action.(core.ListAction)
fieldstring := list.GetListRestrictions().Fields.String()
if strings.Contains(fieldstring, "n1") { for _, node := range nodes {
return true, &apiv1.PodList{Items: []apiv1.Pod{*p1}}, nil if strings.Contains(fieldstring, node.Name) {
return true, &apiv1.PodList{Items: pods[node.Name]}, nil
} }
if strings.Contains(fieldstring, "n2") {
return true, &apiv1.PodList{Items: []apiv1.Pod{*p2}}, nil
} }
return true, nil, fmt.Errorf("Failed to list: %v", list)
})
@@ -74,21 +181,23 @@ func TestScaleUpOK(t *testing.T) {
expandedGroups <- fmt.Sprintf("%s-%d", nodeGroup, increase)
return nil
}, nil)
- provider.AddNodeGroup("ng1", 1, 10, 1)
- provider.AddNodeGroup("ng2", 1, 10, 1)
- provider.AddNode("ng1", n1)
- provider.AddNode("ng2", n2)
for name, nodesInGroup := range groups {
provider.AddNodeGroup(name, 1, 10, len(nodesInGroup))
for _, n := range nodesInGroup {
provider.AddNode(name, n)
}
}
assert.NotNil(t, provider)
fakeRecorder := kube_record.NewFakeRecorder(5)
fakeLogRecorder, _ := utils.NewStatusMapRecorder(fakeClient, "kube-system", kube_record.NewFakeRecorder(5), false)
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, fakeLogRecorder)
- clusterState.UpdateNodes([]*apiv1.Node{n1, n2}, time.Now())
clusterState.UpdateNodes(nodes, time.Now())
context := &AutoscalingContext{
- AutoscalingOptions: AutoscalingOptions{
- EstimatorName: estimator.BinpackingEstimatorName,
- },
AutoscalingOptions: config.options,
PredicateChecker: simulator.NewTestPredicateChecker(),
CloudProvider: provider,
ClientSet: fakeClient,
@@ -97,17 +206,24 @@ func TestScaleUpOK(t *testing.T) {
ClusterStateRegistry: clusterState,
LogRecorder: fakeLogRecorder,
}
- p3 := BuildTestPod("p-new", 500, 0)
- result, err := ScaleUp(context, []*apiv1.Pod{p3}, []*apiv1.Node{n1, n2}, []*extensionsv1.DaemonSet{})
extraPods := make([]*apiv1.Pod, len(config.extraPods))
for i, p := range config.extraPods {
pod := BuildTestPod(p.name, p.cpu, p.memory)
extraPods[i] = pod
}
result, err := ScaleUp(context, extraPods, nodes, []*extensionsv1.DaemonSet{})
assert.NoError(t, err)
assert.True(t, result)
- assert.Equal(t, "ng2-1", getStringFromChan(expandedGroups))
assert.Equal(t, config.expectedScaleUp, getStringFromChan(expandedGroups))
nodeEventSeen := false
for eventsLeft := true; eventsLeft; {
select {
case event := <-fakeRecorder.Events:
- if strings.Contains(event, "TriggeredScaleUp") && strings.Contains(event, "ng2") {
if strings.Contains(event, "TriggeredScaleUp") && strings.Contains(event, config.expectedScaleUpGroup) {
nodeEventSeen = true
}
assert.NotRegexp(t, regexp.MustCompile("NotTriggerScaleUp"), event)
@@ -165,6 +281,8 @@ func TestScaleUpNodeComingNoScale(t *testing.T) {
context := &AutoscalingContext{
AutoscalingOptions: AutoscalingOptions{
EstimatorName: estimator.BinpackingEstimatorName,
MaxCoresTotal: 5000 * 64,
MaxMemoryTotal: 5000 * 64 * 20,
},
PredicateChecker: simulator.NewTestPredicateChecker(),
CloudProvider: provider,
@@ -228,9 +346,7 @@ func TestScaleUpNodeComingHasScale(t *testing.T) {
clusterState.UpdateNodes([]*apiv1.Node{n1, n2}, time.Now())
context := &AutoscalingContext{
- AutoscalingOptions: AutoscalingOptions{
- EstimatorName: estimator.BinpackingEstimatorName,
- },
AutoscalingOptions: defaultOptions,
PredicateChecker: simulator.NewTestPredicateChecker(),
CloudProvider: provider,
ClientSet: fakeClient,
@@ -288,6 +404,8 @@ func TestScaleUpUnhealthy(t *testing.T) {
context := &AutoscalingContext{
AutoscalingOptions: AutoscalingOptions{
EstimatorName: estimator.BinpackingEstimatorName,
MaxCoresTotal: 5000 * 64,
MaxMemoryTotal: 5000 * 64 * 20,
},
PredicateChecker: simulator.NewTestPredicateChecker(),
CloudProvider: provider,
@@ -337,6 +455,8 @@ func TestScaleUpNoHelp(t *testing.T) {
context := &AutoscalingContext{
AutoscalingOptions: AutoscalingOptions{
EstimatorName: estimator.BinpackingEstimatorName,
MaxCoresTotal: 5000 * 64,
MaxMemoryTotal: 5000 * 64 * 20,
},
PredicateChecker: simulator.NewTestPredicateChecker(),
CloudProvider: provider,
@@ -416,6 +536,8 @@ func TestScaleUpBalanceGroups(t *testing.T) {
AutoscalingOptions: AutoscalingOptions{
EstimatorName: estimator.BinpackingEstimatorName,
BalanceSimilarNodeGroups: true,
MaxCoresTotal: 5000 * 64,
MaxMemoryTotal: 5000 * 64 * 20,
},
PredicateChecker: simulator.NewTestPredicateChecker(),
CloudProvider: provider,

View File

@@ -18,10 +18,12 @@ package main
import (
"flag"
"fmt"
"net/http"
"net/url"
"os"
"os/signal"
"strconv"
"strings"
"syscall"
"time"
@@ -101,6 +103,8 @@ var (
"max(#nodes * scale-down-candidates-pool-ratio, scale-down-candidates-pool-min-count).")
scanInterval = flag.Duration("scan-interval", 10*time.Second, "How often cluster is reevaluated for scale up or down")
maxNodesTotal = flag.Int("max-nodes-total", 0, "Maximum number of nodes in all node groups. Cluster autoscaler will not grow the cluster beyond this number.")
coresTotal = flag.String("cores-total", "0:320000", "Minimum and maximum number of cores in cluster, in the format <min>:<max>. Cluster autoscaler will not scale the cluster beyond these numbers.")
memoryTotal = flag.String("memory-total", "0:6400000", "Minimum and maximum number of gigabytes of memory in cluster, in the format <min>:<max>. Cluster autoscaler will not scale the cluster beyond these numbers.")
cloudProviderFlag = flag.String("cloud-provider", "gce", "Cloud provider type. Allowed values: gce, aws, kubemark")
maxEmptyBulkDeleteFlag = flag.Int("max-empty-bulk-delete", 10, "Maximum number of empty nodes that can be deleted at the same time.")
maxGracefulTerminationFlag = flag.Int("max-graceful-termination-sec", 10*60, "Maximum number of seconds CA waits for pod termination when trying to scale down a node.")
@@ -124,6 +128,18 @@ var (
)
func createAutoscalerOptions() core.AutoscalerOptions {
minCoresTotal, maxCoresTotal, err := parseMinMaxFlag(*coresTotal)
if err != nil {
glog.Fatalf("Failed to parse flags: %v", err)
}
minMemoryTotal, maxMemoryTotal, err := parseMinMaxFlag(*memoryTotal)
if err != nil {
glog.Fatalf("Failed to parse flags: %v", err)
}
// Convert memory limits from gigabytes to megabytes.
minMemoryTotal = minMemoryTotal * 1024
maxMemoryTotal = maxMemoryTotal * 1024
autoscalingOpts := core.AutoscalingOptions{
CloudConfig: *cloudConfig,
CloudProviderName: *cloudProviderFlag,
@@ -136,6 +152,10 @@ func createAutoscalerOptions() core.AutoscalerOptions {
MaxGracefulTerminationSec: *maxGracefulTerminationFlag,
MaxNodeProvisionTime: *maxNodeProvisionTime,
MaxNodesTotal: *maxNodesTotal,
MaxCoresTotal: maxCoresTotal,
MinCoresTotal: minCoresTotal,
MaxMemoryTotal: maxMemoryTotal,
MinMemoryTotal: minMemoryTotal,
NodeGroups: nodeGroupsFlag,
UnregisteredNodeRemovalTime: *unregisteredNodeRemovalTime,
ScaleDownDelay: *scaleDownDelay,
@@ -365,3 +385,37 @@ const (
defaultRenewDeadline = 10 * time.Second
defaultRetryPeriod = 2 * time.Second
)
func parseMinMaxFlag(flag string) (int64, int64, error) {
tokens := strings.SplitN(flag, ":", 2)
if len(tokens) != 2 {
return 0, 0, fmt.Errorf("wrong nodes configuration: %s", flag)
}
min, err := strconv.ParseInt(tokens[0], 10, 64)
if err != nil {
return 0, 0, fmt.Errorf("failed to set min size: %s, expected integer, err: %v", tokens[0], err)
}
max, err := strconv.ParseInt(tokens[1], 10, 64)
if err != nil {
return 0, 0, fmt.Errorf("failed to set max size: %s, expected integer, err: %v", tokens[1], err)
}
err = validateMinMaxFlag(min, max)
if err != nil {
return 0, 0, err
}
return min, max, nil
}
func validateMinMaxFlag(min, max int64) error {
if min < 0 {
return fmt.Errorf("min size must be greater or equal to 0")
}
if max < min {
return fmt.Errorf("max size must be greater or equal to min size")
}
return nil
}
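As a usage sketch of the new flags (values illustrative): --cores-total=8:64 --memory-total=32:256 parses via parseMinMaxFlag into MinCoresTotal=8, MaxCoresTotal=64 and, after the *1024 conversion in createAutoscalerOptions, MinMemoryTotal=32768 and MaxMemoryTotal=262144 megabytes; a value without a colon, such as --cores-total=64, is rejected with a "wrong nodes configuration" error.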

View File

@@ -73,6 +73,9 @@ func IsNodeInfoSimilar(n1, n2 *schedulercache.NodeInfo) bool {
free[res] = append(free[res], freeRes)
}
}
// For capacity we require exact match.
// If this is ever changed, enforcing MaxCoresTotal and MaxMemoryTotal limits
// as it is now may no longer work.
for _, qtyList := range capacity {
if len(qtyList) != 2 || qtyList[0].Cmp(qtyList[1]) != 0 {
return false