respect minimum cores/memory limit during scale down

This commit is contained in:
Aleksandra Malinowska 2017-09-11 12:32:58 +02:00
parent 809da5fdbc
commit 197b05b180
8 changed files with 403 additions and 154 deletions

View File

@ -0,0 +1,24 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package config
const (
// DefaultMaxClusterCores is the default maximum number of cores in the cluster.
DefaultMaxClusterCores = 5000 * 64
// DefaultMaxClusterMemory is the default maximum number of gigabytes of memory in cluster.
DefaultMaxClusterMemory = 5000 * 64 * 20
)

View File

@ -26,6 +26,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
"k8s.io/autoscaler/cluster-autoscaler/clusterstate"
"k8s.io/autoscaler/cluster-autoscaler/config"
"k8s.io/autoscaler/cluster-autoscaler/metrics"
"k8s.io/autoscaler/cluster-autoscaler/simulator"
"k8s.io/autoscaler/cluster-autoscaler/utils/deletetaint"
@ -212,7 +213,8 @@ func (sd *ScaleDown) UpdateUnneededNodes(
emptyNodes := make(map[string]bool)
emptyNodesList := getEmptyNodes(currentlyUnneededNodes, pods, len(currentlyUnneededNodes), sd.context.CloudProvider)
emptyNodesList := getEmptyNodes(currentlyUnneededNodes, pods, len(currentlyUnneededNodes),
config.DefaultMaxClusterCores, config.DefaultMaxClusterMemory, sd.context.CloudProvider)
for _, node := range emptyNodesList {
emptyNodes[node.Name] = true
}
@ -357,14 +359,19 @@ func (sd *ScaleDown) chooseCandidates(nodes []*apiv1.Node) ([]*apiv1.Node, []*ap
// TryToScaleDown tries to scale down the cluster. It returns ScaleDownResult indicating if any node was
// removed and error if such occurred.
func (sd *ScaleDown) TryToScaleDown(nodes []*apiv1.Node, pods []*apiv1.Pod, pdbs []*policyv1.PodDisruptionBudget, currentTime time.Time) (ScaleDownResult, errors.AutoscalerError) {
func (sd *ScaleDown) TryToScaleDown(allNodes []*apiv1.Node, pods []*apiv1.Pod, pdbs []*policyv1.PodDisruptionBudget, currentTime time.Time) (ScaleDownResult, errors.AutoscalerError) {
nodeDeletionDuration := time.Duration(0)
findNodesToRemoveDuration := time.Duration(0)
defer updateScaleDownMetrics(time.Now(), &findNodesToRemoveDuration, &nodeDeletionDuration)
nodesWithoutMaster := filterOutMasters(allNodes, pods)
candidates := make([]*apiv1.Node, 0)
readinessMap := make(map[string]bool)
for _, node := range nodes {
coresTotal, memoryTotal := calculateCoresAndMemoryTotal(nodesWithoutMaster, currentTime)
coresLeft := coresTotal - sd.context.MinCoresTotal
memoryLeft := memoryTotal - sd.context.MinMemoryTotal
for _, node := range nodesWithoutMaster {
if val, found := sd.unneededNodes[node.Name]; found {
glog.V(2).Infof("%s was unneeded for %s", node.Name, currentTime.Sub(val).String())
@ -409,6 +416,19 @@ func (sd *ScaleDown) TryToScaleDown(nodes []*apiv1.Node, pods []*apiv1.Pod, pdbs
continue
}
nodeCPU, nodeMemory, err := getNodeCoresAndMemory(node)
if err != nil {
glog.Warningf("Error getting node resources: %v", err)
}
if nodeCPU > coresLeft {
glog.V(4).Infof("Skipping %s - not enough cores limit left", node.Name)
continue
}
if nodeMemory > memoryLeft {
glog.V(4).Infof("Skipping %s - not enough memory limit left", node.Name)
continue
}
candidates = append(candidates, node)
}
}
@ -420,7 +440,7 @@ func (sd *ScaleDown) TryToScaleDown(nodes []*apiv1.Node, pods []*apiv1.Pod, pdbs
// Trying to delete empty nodes in bulk. If there are no empty nodes then CA will
// try to delete not-so-empty nodes, possibly killing some pods and allowing them
// to recreate on other nodes.
emptyNodes := getEmptyNodes(candidates, pods, sd.context.MaxEmptyBulkDelete, sd.context.CloudProvider)
emptyNodes := getEmptyNodes(candidates, pods, sd.context.MaxEmptyBulkDelete, coresLeft, memoryLeft, sd.context.CloudProvider)
if len(emptyNodes) > 0 {
nodeDeletionStart := time.Now()
confirmation := make(chan errors.AutoscalerError, len(emptyNodes))
@ -435,7 +455,7 @@ func (sd *ScaleDown) TryToScaleDown(nodes []*apiv1.Node, pods []*apiv1.Pod, pdbs
findNodesToRemoveStart := time.Now()
// We look for only 1 node so new hints may be incomplete.
nodesToRemove, _, _, err := simulator.FindNodesToRemove(candidates, nodes, pods, sd.context.ClientSet,
nodesToRemove, _, _, err := simulator.FindNodesToRemove(candidates, nodesWithoutMaster, pods, sd.context.ClientSet,
sd.context.PredicateChecker, 1, false,
sd.podLocationHints, sd.usageTracker, time.Now(), pdbs)
findNodesToRemoveDuration = time.Now().Sub(findNodesToRemoveStart)
@ -496,10 +516,16 @@ func updateScaleDownMetrics(scaleDownStart time.Time, findNodesToRemoveDuration
// This functions finds empty nodes among passed candidates and returns a list of empty nodes
// that can be deleted at the same time.
func getEmptyNodes(candidates []*apiv1.Node, pods []*apiv1.Pod, maxEmptyBulkDelete int, cloudProvider cloudprovider.CloudProvider) []*apiv1.Node {
func getEmptyNodes(candidates []*apiv1.Node, pods []*apiv1.Pod, maxEmptyBulkDelete int,
coresLimit, memoryLimit int64, cloudProvider cloudprovider.CloudProvider) []*apiv1.Node {
emptyNodes := simulator.FindEmptyNodesToRemove(candidates, pods)
availabilityMap := make(map[string]int)
result := make([]*apiv1.Node, 0)
coresLeft := coresLimit
memoryLeft := memoryLimit
for _, node := range emptyNodes {
nodeGroup, err := cloudProvider.NodeGroupForNode(node)
if err != nil {
@ -524,6 +550,19 @@ func getEmptyNodes(candidates []*apiv1.Node, pods []*apiv1.Pod, maxEmptyBulkDele
availabilityMap[nodeGroup.Id()] = available
}
if available > 0 {
cores, memory, err := getNodeCoresAndMemory(node)
if err != nil {
glog.Errorf("Error: %v", err)
continue
}
if cores > coresLeft {
continue
}
if memory > memoryLeft {
continue
}
coresLeft = coresLeft - cores
memoryLeft = memoryLeft - memory
available -= 1
availabilityMap[nodeGroup.Id()] = available
result = append(result, node)
@ -778,3 +817,47 @@ func cleanUpNodeAutoprovisionedGroups(cloudProvider cloudprovider.CloudProvider)
}
return nil
}
func calculateCoresAndMemoryTotal(nodes []*apiv1.Node, timestamp time.Time) (int64, int64) {
var coresTotal, memoryTotal int64
for _, node := range nodes {
deleteTime, _ := deletetaint.GetToBeDeletedTime(node)
if deleteTime != nil && (timestamp.Sub(*deleteTime) < MaxCloudProviderNodeDeletionTime || timestamp.Sub(*deleteTime) < MaxKubernetesEmptyNodeDeletionTime) {
// Nodes being deleted do not count towards total cluster resources
continue
}
cores, memory, err := getNodeCoresAndMemory(node)
if err != nil {
glog.Errorf("Error getting node resources: %v", err)
continue
}
coresTotal = coresTotal + cores
memoryTotal = memoryTotal + memory
}
return coresTotal, memoryTotal
}
const (
apiServerLabelKey = "component"
apiServerLabelValue = "kube-apiserver"
)
func filterOutMasters(nodes []*apiv1.Node, pods []*apiv1.Pod) []*apiv1.Node {
masters := make(map[string]bool)
for _, pod := range pods {
if pod.Namespace == metav1.NamespaceSystem && pod.Labels[apiServerLabelKey] == apiServerLabelValue {
masters[pod.Spec.NodeName] = true
}
}
// if masters aren't on the list of nodes, capacity will be increased on overflowing append
others := make([]*apiv1.Node, 0, len(nodes)-len(masters))
for _, node := range nodes {
if !masters[node.Name] {
others = append(others, node)
}
}
return others
}

View File

@ -527,7 +527,7 @@ func TestScaleDown(t *testing.T) {
}
func waitForDeleteToFinish(t *testing.T, sd *ScaleDown) {
for start := time.Now(); time.Now().Sub(start) < 20*time.Second; time.Sleep(100 * time.Millisecond) {
for start := time.Now(); time.Since(start) < 20*time.Second; time.Sleep(100 * time.Millisecond) {
if !sd.nodeDeleteStatus.IsDeleteInProgress() {
return
}
@ -535,6 +535,13 @@ func waitForDeleteToFinish(t *testing.T, sd *ScaleDown) {
t.Fatalf("Node delete not finished")
}
// this IGNORES duplicates
func assertEqualSet(t *testing.T, a []string, b []string) {
assertSubset(t, a, b)
assertSubset(t, b, a)
}
// this IGNORES duplicates
func assertSubset(t *testing.T, a []string, b []string) {
for _, x := range a {
found := false
@ -550,86 +557,84 @@ func assertSubset(t *testing.T, a []string, b []string) {
}
}
var defaultScaleDownOptions = AutoscalingOptions{
ScaleDownUtilizationThreshold: 0.5,
ScaleDownUnneededTime: time.Minute,
MaxGracefulTerminationSec: 60,
MaxEmptyBulkDelete: 10,
MinCoresTotal: 0,
MinMemoryTotal: 0,
}
func TestScaleDownEmptyMultipleNodeGroups(t *testing.T) {
updatedNodes := make(chan string, 10)
deletedNodes := make(chan string, 10)
fakeClient := &fake.Clientset{}
n1 := BuildTestNode("n1", 1000, 1000)
SetNodeReadyState(n1, true, time.Time{})
n2 := BuildTestNode("n2", 1000, 1000)
SetNodeReadyState(n2, true, time.Time{})
fakeClient.Fake.AddReactor("list", "pods", func(action core.Action) (bool, runtime.Object, error) {
return true, &apiv1.PodList{Items: []apiv1.Pod{}}, nil
})
fakeClient.Fake.AddReactor("get", "pods", func(action core.Action) (bool, runtime.Object, error) {
return true, nil, errors.NewNotFound(apiv1.Resource("pod"), "whatever")
})
fakeClient.Fake.AddReactor("get", "nodes", func(action core.Action) (bool, runtime.Object, error) {
getAction := action.(core.GetAction)
switch getAction.GetName() {
case n1.Name:
return true, n1, nil
case n2.Name:
return true, n2, nil
}
return true, nil, fmt.Errorf("Wrong node: %v", getAction.GetName())
})
fakeClient.Fake.AddReactor("update", "nodes", func(action core.Action) (bool, runtime.Object, error) {
update := action.(core.UpdateAction)
obj := update.GetObject().(*apiv1.Node)
updatedNodes <- obj.Name
return true, obj, nil
})
provider := testprovider.NewTestCloudProvider(nil, func(nodeGroup string, node string) error {
deletedNodes <- node
return nil
})
provider.AddNodeGroup("ng1", 0, 10, 2)
provider.AddNodeGroup("ng2", 0, 10, 2)
provider.AddNode("ng1", n1)
provider.AddNode("ng2", n2)
assert.NotNil(t, provider)
fakeRecorder := kube_util.CreateEventRecorder(fakeClient)
fakeLogRecorder, _ := utils.NewStatusMapRecorder(fakeClient, "kube-system", fakeRecorder, false)
context := &AutoscalingContext{
AutoscalingOptions: AutoscalingOptions{
ScaleDownUtilizationThreshold: 0.5,
ScaleDownUnneededTime: time.Minute,
MaxGracefulTerminationSec: 60,
MaxEmptyBulkDelete: 10,
config := &scaleTestConfig{
nodes: []nodeConfig{
{"n1", 1000, 1000, true, "ng1"},
{"n2", 1000, 1000, true, "ng2"},
},
PredicateChecker: simulator.NewTestPredicateChecker(),
CloudProvider: provider,
ClientSet: fakeClient,
Recorder: fakeRecorder,
ClusterStateRegistry: clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, fakeLogRecorder),
LogRecorder: fakeLogRecorder,
options: defaultScaleDownOptions,
expectedScaleDowns: []string{"n1", "n2"},
}
scaleDown := NewScaleDown(context)
scaleDown.UpdateUnneededNodes([]*apiv1.Node{n1, n2},
[]*apiv1.Node{n1, n2}, []*apiv1.Pod{}, time.Now().Add(-5*time.Minute), nil)
result, err := scaleDown.TryToScaleDown([]*apiv1.Node{n1, n2}, []*apiv1.Pod{}, nil, time.Now())
waitForDeleteToFinish(t, scaleDown)
assert.NoError(t, err)
assert.Equal(t, ScaleDownNodeDeleted, result)
d1 := getStringFromChan(deletedNodes)
d2 := getStringFromChan(deletedNodes)
assertSubset(t, []string{d1, d2}, []string{n1.Name, n2.Name})
simpleScaleDownEmpty(t, config)
}
func TestScaleDownEmptySingleNodeGroup(t *testing.T) {
config := &scaleTestConfig{
nodes: []nodeConfig{
{"n1", 1000, 1000, true, "ng1"},
{"n2", 1000, 1000, true, "ng1"},
},
options: defaultScaleDownOptions,
expectedScaleDowns: []string{"n1", "n2"},
}
simpleScaleDownEmpty(t, config)
}
func TestScaleDownEmptyMinCoresLimitHit(t *testing.T) {
options := defaultScaleDownOptions
options.MinCoresTotal = 2
config := &scaleTestConfig{
nodes: []nodeConfig{
{"n1", 2000, 1000, true, "ng1"},
{"n2", 1000, 1000, true, "ng1"},
},
options: options,
expectedScaleDowns: []string{"n2"},
}
simpleScaleDownEmpty(t, config)
}
func TestScaleDownEmptyMinMemoryLimitHit(t *testing.T) {
options := defaultScaleDownOptions
options.MinMemoryTotal = 1
config := &scaleTestConfig{
nodes: []nodeConfig{
{"n1", 2000, 1000 * MB, true, "ng1"},
{"n2", 1000, 1000 * MB, true, "ng1"},
{"n3", 1000, 1000 * MB, true, "ng1"},
},
options: options,
expectedScaleDowns: []string{"n1", "n2"},
}
simpleScaleDownEmpty(t, config)
}
func simpleScaleDownEmpty(t *testing.T, config *scaleTestConfig) {
updatedNodes := make(chan string, 10)
deletedNodes := make(chan string, 10)
fakeClient := &fake.Clientset{}
n1 := BuildTestNode("n1", 1000, 1000)
SetNodeReadyState(n1, true, time.Time{})
n2 := BuildTestNode("n2", 1000, 1000)
SetNodeReadyState(n2, true, time.Time{})
nodes := make([]*apiv1.Node, len(config.nodes))
nodesMap := make(map[string]*apiv1.Node)
groups := make(map[string][]*apiv1.Node)
for i, n := range config.nodes {
node := BuildTestNode(n.name, n.cpu, n.memory)
SetNodeReadyState(node, n.ready, time.Time{})
nodesMap[n.name] = node
nodes[i] = node
groups[n.group] = append(groups[n.group], node)
}
fakeClient.Fake.AddReactor("list", "pods", func(action core.Action) (bool, runtime.Object, error) {
return true, &apiv1.PodList{Items: []apiv1.Pod{}}, nil
})
@ -638,13 +643,11 @@ func TestScaleDownEmptySingleNodeGroup(t *testing.T) {
})
fakeClient.Fake.AddReactor("get", "nodes", func(action core.Action) (bool, runtime.Object, error) {
getAction := action.(core.GetAction)
switch getAction.GetName() {
case n1.Name:
return true, n1, nil
case n2.Name:
return true, n2, nil
if node, found := nodesMap[getAction.GetName()]; found {
return true, node, nil
}
return true, nil, fmt.Errorf("Wrong node: %v", getAction.GetName())
})
fakeClient.Fake.AddReactor("update", "nodes", func(action core.Action) (bool, runtime.Object, error) {
update := action.(core.UpdateAction)
@ -657,20 +660,20 @@ func TestScaleDownEmptySingleNodeGroup(t *testing.T) {
deletedNodes <- node
return nil
})
provider.AddNodeGroup("ng1", 0, 10, 2)
provider.AddNode("ng1", n1)
provider.AddNode("ng1", n2)
for name, nodesInGroup := range groups {
provider.AddNodeGroup(name, 0, 10, len(nodesInGroup))
for _, n := range nodesInGroup {
provider.AddNode(name, n)
}
}
assert.NotNil(t, provider)
fakeRecorder := kube_util.CreateEventRecorder(fakeClient)
fakeLogRecorder, _ := utils.NewStatusMapRecorder(fakeClient, "kube-system", fakeRecorder, false)
context := &AutoscalingContext{
AutoscalingOptions: AutoscalingOptions{
ScaleDownUtilizationThreshold: 0.5,
ScaleDownUnneededTime: time.Minute,
MaxGracefulTerminationSec: 60,
MaxEmptyBulkDelete: 10,
},
AutoscalingOptions: config.options,
PredicateChecker: simulator.NewTestPredicateChecker(),
CloudProvider: provider,
ClientSet: fakeClient,
@ -679,16 +682,34 @@ func TestScaleDownEmptySingleNodeGroup(t *testing.T) {
LogRecorder: fakeLogRecorder,
}
scaleDown := NewScaleDown(context)
scaleDown.UpdateUnneededNodes([]*apiv1.Node{n1, n2},
[]*apiv1.Node{n1, n2}, []*apiv1.Pod{}, time.Now().Add(-5*time.Minute), nil)
result, err := scaleDown.TryToScaleDown([]*apiv1.Node{n1, n2}, []*apiv1.Pod{}, nil, time.Now())
scaleDown.UpdateUnneededNodes(nodes,
nodes, []*apiv1.Pod{}, time.Now().Add(-5*time.Minute), nil)
result, err := scaleDown.TryToScaleDown(nodes, []*apiv1.Pod{}, nil, time.Now())
waitForDeleteToFinish(t, scaleDown)
// This helps to verify that TryToScaleDown doesn't attempt to remove anything
// after delete in progress status is gone.
close(deletedNodes)
assert.NoError(t, err)
assert.Equal(t, ScaleDownNodeDeleted, result)
d1 := getStringFromChan(deletedNodes)
d2 := getStringFromChan(deletedNodes)
assertSubset(t, []string{d1, d2}, []string{n1.Name, n2.Name})
// Check the channel (and make sure there isn't more than there should be).
deleted := make([]string, 0, len(config.expectedScaleDowns)+10)
empty := false
for i := 0; i < len(config.expectedScaleDowns)+10 && !empty; i++ {
select {
case d := <-deletedNodes:
if d == "" { // a closed channel yields empty value
empty = true
} else {
deleted = append(deleted, d)
}
default:
empty = true
}
}
assertEqualSet(t, config.expectedScaleDowns, deleted)
}
func TestNoScaleDownUnready(t *testing.T) {
@ -923,3 +944,77 @@ func TestCleanUpNodeAutoprovisionedGroups(t *testing.T) {
assert.NoError(t, cleanUpNodeAutoprovisionedGroups(provider))
}
func TestCalculateCoresAndMemoryTotal(t *testing.T) {
nodeConfigs := []nodeConfig{
{"n1", 2000, 7500 * MB, true, "ng1"},
{"n2", 2000, 7500 * MB, true, "ng1"},
{"n3", 2000, 7500 * MB, true, "ng1"},
{"n4", 12000, 8000 * MB, true, "ng1"},
{"n5", 16000, 7500 * MB, true, "ng1"},
{"n6", 8000, 6000 * MB, true, "ng1"},
{"n7", 6000, 16000 * MB, true, "ng1"},
}
nodes := make([]*apiv1.Node, len(nodeConfigs))
for i, n := range nodeConfigs {
node := BuildTestNode(n.name, n.cpu, n.memory)
SetNodeReadyState(node, n.ready, time.Now())
nodes[i] = node
}
nodes[6].Spec.Taints = []apiv1.Taint{
{
Key: deletetaint.ToBeDeletedTaint,
Value: fmt.Sprint(time.Now().Unix()),
Effect: apiv1.TaintEffectNoSchedule,
},
}
coresTotal, memoryTotal := calculateCoresAndMemoryTotal(nodes, time.Now())
assert.Equal(t, int64(42), coresTotal)
assert.Equal(t, int64(44000), memoryTotal)
}
func TestFilterOutMasters(t *testing.T) {
nodeConfigs := []nodeConfig{
{"n1", 2000, 4000, false, "ng1"},
{"n2", 2000, 4000, true, "ng2"},
{"n3", 2000, 8000, true, ""}, // real master
{"n4", 1000, 2000, true, "ng3"},
{"n5", 1000, 2000, true, "ng3"},
{"n6", 2000, 8000, true, ""}, // same machine type, no node group, no api server
{"n7", 2000, 8000, true, ""}, // real master
}
nodes := make([]*apiv1.Node, len(nodeConfigs))
for i, n := range nodeConfigs {
node := BuildTestNode(n.name, n.cpu, n.memory)
SetNodeReadyState(node, n.ready, time.Now())
nodes[i] = node
}
BuildTestPodWithExtra := func(name, namespace, node string, labels map[string]string) *apiv1.Pod {
pod := BuildTestPod(name, 100, 200)
pod.Spec.NodeName = node
pod.Namespace = namespace
pod.Labels = labels
return pod
}
pods := []*apiv1.Pod{
BuildTestPodWithExtra("kube-apiserver-kubernetes-master", "kube-system", "n2", map[string]string{}), // without label
BuildTestPodWithExtra("kube-apiserver-kubernetes-master", "fake-kube-system", "n6", map[string]string{"component": "kube-apiserver"}), // wrong namespace
BuildTestPodWithExtra("kube-apiserver-kubernetes-master", "kube-system", "n3", map[string]string{"component": "kube-apiserver"}), // real api server
BuildTestPodWithExtra("hidden-name", "kube-system", "n7", map[string]string{"component": "kube-apiserver"}), // also a real api server
BuildTestPodWithExtra("kube-apiserver-kubernetes-master", "kube-system", "n1", map[string]string{"component": "kube-apiserver-dev"}), // wrong label
BuildTestPodWithExtra("custom-deployment", "custom", "n5", map[string]string{"component": "custom-component", "custom-key": "custom-value"}), // unrelated pod
}
withoutMasters := filterOutMasters(nodes, pods)
withoutMastersNames := make([]string, len(withoutMasters))
for i, n := range withoutMasters {
withoutMastersNames[i] = n.Name
}
assertEqualSet(t, []string{"n1", "n2", "n4", "n5", "n6"}, withoutMastersNames)
}

View File

@ -18,8 +18,6 @@ package core
import (
"bytes"
"fmt"
"math"
"time"
apiv1 "k8s.io/api/core/v1"
@ -64,7 +62,7 @@ func ScaleUp(context *AutoscalingContext, unschedulablePods []*apiv1.Pod, nodes
nodeGroups := context.CloudProvider.NodeGroups()
// calculate current cores & gigabytes of memory
coresTotal, memoryTotal := calculateCPUAndMemory(nodeGroups, nodeInfos)
coresTotal, memoryTotal := calculateClusterCoresMemoryTotal(nodeGroups, nodeInfos)
upcomingNodes := make([]*schedulercache.NodeInfo, 0)
for nodeGroup, numberOfNodes := range context.ClusterStateRegistry.GetUpcomingNodes() {
@ -113,7 +111,7 @@ func ScaleUp(context *AutoscalingContext, unschedulablePods []*apiv1.Pod, nodes
continue
}
nodeCPU, nodeMemory, err := getNodeCPUAndMemory(nodeInfo)
nodeCPU, nodeMemory, err := getNodeInfoCoresAndMemory(nodeInfo)
if err != nil {
glog.Errorf("Failed to get node resources: %v", err)
}
@ -223,7 +221,7 @@ func ScaleUp(context *AutoscalingContext, unschedulablePods []*apiv1.Pod, nodes
}
// apply upper limits for CPU and memory
newNodes, err = applyCPUAndMemoryLimit(newNodes, coresTotal, memoryTotal, context.MaxCoresTotal, context.MaxMemoryTotal, nodeInfo)
newNodes, err = applyMaxClusterCoresMemoryLimits(newNodes, coresTotal, memoryTotal, context.MaxCoresTotal, context.MaxMemoryTotal, nodeInfo)
if err != nil {
return false, err
}
@ -371,7 +369,7 @@ func addAutoprovisionedCandidates(context *AutoscalingContext, nodeGroups []clou
return nodeGroups, nodeInfos
}
func calculateCPUAndMemory(nodeGroups []cloudprovider.NodeGroup, nodeInfos map[string]*schedulercache.NodeInfo) (int64, int64) {
func calculateClusterCoresMemoryTotal(nodeGroups []cloudprovider.NodeGroup, nodeInfos map[string]*schedulercache.NodeInfo) (int64, int64) {
var coresTotal int64
var memoryTotal int64
for _, nodeGroup := range nodeGroups {
@ -386,7 +384,7 @@ func calculateCPUAndMemory(nodeGroups []cloudprovider.NodeGroup, nodeInfos map[s
continue
}
if currentSize > 0 {
nodeCPU, nodeMemory, err := getNodeCPUAndMemory(nodeInfo)
nodeCPU, nodeMemory, err := getNodeInfoCoresAndMemory(nodeInfo)
if err != nil {
glog.Errorf("Failed to get node resources: %v", err)
continue
@ -399,8 +397,8 @@ func calculateCPUAndMemory(nodeGroups []cloudprovider.NodeGroup, nodeInfos map[s
return coresTotal, memoryTotal
}
func applyCPUAndMemoryLimit(newNodes int, coresTotal, memoryTotal, maxCoresTotal, maxMemoryTotal int64, nodeInfo *schedulercache.NodeInfo) (int, errors.AutoscalerError) {
newNodeCPU, newNodeMemory, err := getNodeCPUAndMemory(nodeInfo)
func applyMaxClusterCoresMemoryLimits(newNodes int, coresTotal, memoryTotal, maxCoresTotal, maxMemoryTotal int64, nodeInfo *schedulercache.NodeInfo) (int, errors.AutoscalerError) {
newNodeCPU, newNodeMemory, err := getNodeInfoCoresAndMemory(nodeInfo)
if err != nil {
// This is not very elegant, but it allows us to proceed even if we're
// unable to compute cpu/memory limits (not breaking current functionality)
@ -432,34 +430,6 @@ func applyCPUAndMemoryLimit(newNodes int, coresTotal, memoryTotal, maxCoresTotal
return newNodes, nil
}
const (
// Megabyte is 2^20 bytes.
Megabyte float64 = 1024 * 1024
)
func getNodeCPUAndMemory(nodeInfo *schedulercache.NodeInfo) (int64, int64, error) {
nodeCPU, err := getNodeResource(nodeInfo, apiv1.ResourceCPU)
if err != nil {
return 0, 0, err
}
nodeMemory, err := getNodeResource(nodeInfo, apiv1.ResourceMemory)
if err != nil {
return 0, 0, err
}
if nodeCPU <= 0 || nodeMemory <= 0 {
return 0, 0, fmt.Errorf("Invalid node CPU/memory values - cpu %v, memory %v", nodeCPU, nodeMemory)
}
nodeMemoryMb := math.Ceil(float64(nodeMemory) / Megabyte)
return nodeCPU, int64(nodeMemoryMb), nil
}
func getNodeResource(nodeInfo *schedulercache.NodeInfo, resource apiv1.ResourceName) (int64, error) {
nodeCapacity, found := nodeInfo.Node().Status.Capacity[resource]
if !found {
return 0, fmt.Errorf("Failed to get %v for node %v", resource, nodeInfo.Node().Name)
}
return nodeCapacity.Value(), nil
func getNodeInfoCoresAndMemory(nodeInfo *schedulercache.NodeInfo) (int64, int64, error) {
return getNodeCoresAndMemory(nodeInfo.Node())
}

View File

@ -27,6 +27,7 @@ import (
testprovider "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/test"
"k8s.io/autoscaler/cluster-autoscaler/clusterstate"
"k8s.io/autoscaler/cluster-autoscaler/clusterstate/utils"
"k8s.io/autoscaler/cluster-autoscaler/config"
"k8s.io/autoscaler/cluster-autoscaler/estimator"
"k8s.io/autoscaler/cluster-autoscaler/expander/random"
"k8s.io/autoscaler/cluster-autoscaler/simulator"
@ -59,23 +60,24 @@ type podConfig struct {
node string
}
type scaleUpConfig struct {
type scaleTestConfig struct {
nodes []nodeConfig
pods []podConfig
extraPods []podConfig
expectedScaleUp string
expectedScaleUpGroup string
expectedScaleDowns []string
options AutoscalingOptions
}
var defaultOptions = AutoscalingOptions{
EstimatorName: estimator.BinpackingEstimatorName,
MaxCoresTotal: 5000 * 64,
MaxMemoryTotal: 5000 * 64 * 20,
MaxCoresTotal: config.DefaultMaxClusterCores,
MaxMemoryTotal: config.DefaultMaxClusterMemory,
}
func TestScaleUpOK(t *testing.T) {
config := &scaleUpConfig{
config := &scaleTestConfig{
nodes: []nodeConfig{
{"n1", 100, 100, true, "ng1"},
{"n2", 1000, 1000, true, "ng2"},
@ -98,7 +100,7 @@ func TestScaleUpOK(t *testing.T) {
func TestScaleUpMaxCoresLimitHit(t *testing.T) {
options := defaultOptions
options.MaxCoresTotal = 9
config := &scaleUpConfig{
config := &scaleTestConfig{
nodes: []nodeConfig{
{"n1", 2000, 100, true, "ng1"},
{"n2", 4000, 1000, true, "ng2"},
@ -124,7 +126,7 @@ const MB = 1024 * 1024
func TestScaleUpMaxMemoryLimitHit(t *testing.T) {
options := defaultOptions
options.MaxMemoryTotal = 1300 // set in mb
config := &scaleUpConfig{
config := &scaleTestConfig{
nodes: []nodeConfig{
{"n1", 2000, 100 * MB, true, "ng1"},
{"n2", 4000, 1000 * MB, true, "ng2"},
@ -146,7 +148,7 @@ func TestScaleUpMaxMemoryLimitHit(t *testing.T) {
simpleScaleUpTest(t, config)
}
func simpleScaleUpTest(t *testing.T, config *scaleUpConfig) {
func simpleScaleUpTest(t *testing.T, config *scaleTestConfig) {
expandedGroups := make(chan string, 10)
fakeClient := &fake.Clientset{}
@ -281,8 +283,8 @@ func TestScaleUpNodeComingNoScale(t *testing.T) {
context := &AutoscalingContext{
AutoscalingOptions: AutoscalingOptions{
EstimatorName: estimator.BinpackingEstimatorName,
MaxCoresTotal: 5000 * 64,
MaxMemoryTotal: 5000 * 64 * 20,
MaxCoresTotal: config.DefaultMaxClusterCores,
MaxMemoryTotal: config.DefaultMaxClusterMemory,
},
PredicateChecker: simulator.NewTestPredicateChecker(),
CloudProvider: provider,
@ -404,8 +406,8 @@ func TestScaleUpUnhealthy(t *testing.T) {
context := &AutoscalingContext{
AutoscalingOptions: AutoscalingOptions{
EstimatorName: estimator.BinpackingEstimatorName,
MaxCoresTotal: 5000 * 64,
MaxMemoryTotal: 5000 * 64 * 20,
MaxCoresTotal: config.DefaultMaxClusterCores,
MaxMemoryTotal: config.DefaultMaxClusterMemory,
},
PredicateChecker: simulator.NewTestPredicateChecker(),
CloudProvider: provider,
@ -455,8 +457,8 @@ func TestScaleUpNoHelp(t *testing.T) {
context := &AutoscalingContext{
AutoscalingOptions: AutoscalingOptions{
EstimatorName: estimator.BinpackingEstimatorName,
MaxCoresTotal: 5000 * 64,
MaxMemoryTotal: 5000 * 64 * 20,
MaxCoresTotal: config.DefaultMaxClusterCores,
MaxMemoryTotal: config.DefaultMaxClusterMemory,
},
PredicateChecker: simulator.NewTestPredicateChecker(),
CloudProvider: provider,
@ -536,8 +538,8 @@ func TestScaleUpBalanceGroups(t *testing.T) {
AutoscalingOptions: AutoscalingOptions{
EstimatorName: estimator.BinpackingEstimatorName,
BalanceSimilarNodeGroups: true,
MaxCoresTotal: 5000 * 64,
MaxMemoryTotal: 5000 * 64 * 20,
MaxCoresTotal: config.DefaultMaxClusterCores,
MaxMemoryTotal: config.DefaultMaxClusterMemory,
},
PredicateChecker: simulator.NewTestPredicateChecker(),
CloudProvider: provider,

View File

@ -18,6 +18,7 @@ package core
import (
"fmt"
"math"
"math/rand"
"reflect"
"time"
@ -415,3 +416,36 @@ func ConfigurePredicateCheckerForLoop(unschedulablePods []*apiv1.Pod, schedulabl
glog.V(1).Info("No pod using affinity / antiaffinity found in cluster, disabling affinity predicate for this loop")
}
}
// Getting node cores/memory
const (
// Megabyte is 2^20 bytes.
Megabyte float64 = 1024 * 1024
)
func getNodeCoresAndMemory(node *apiv1.Node) (int64, int64, error) {
cores, err := getNodeResource(node, apiv1.ResourceCPU)
if err != nil {
return 0, 0, err
}
memory, err := getNodeResource(node, apiv1.ResourceMemory)
if err != nil {
return 0, 0, err
}
if cores <= 0 || memory <= 0 {
return 0, 0, fmt.Errorf("Invalid node CPU/memory values - cpu %v, memory %v", cores, memory)
}
memoryMb := math.Ceil(float64(memory) / Megabyte)
return cores, int64(memoryMb), nil
}
func getNodeResource(node *apiv1.Node, resource apiv1.ResourceName) (int64, error) {
nodeCapacity, found := node.Status.Capacity[resource]
if !found {
return 0, fmt.Errorf("Failed to get %v for node %v", resource, node.Name)
}
return nodeCapacity.Value(), nil
}

View File

@ -18,7 +18,6 @@ package core
import (
"fmt"
"k8s.io/autoscaler/cluster-autoscaler/utils/deletetaint"
"testing"
"time"
@ -26,6 +25,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/clusterstate"
"k8s.io/autoscaler/cluster-autoscaler/clusterstate/utils"
"k8s.io/autoscaler/cluster-autoscaler/simulator"
"k8s.io/autoscaler/cluster-autoscaler/utils/deletetaint"
. "k8s.io/autoscaler/cluster-autoscaler/utils/test"
apiv1 "k8s.io/api/core/v1"
@ -422,3 +422,40 @@ func TestConfigurePredicateCheckerForLoop(t *testing.T) {
ConfigurePredicateCheckerForLoop([]*apiv1.Pod{p2}, []*apiv1.Pod{p3}, predicateChecker)
assert.False(t, predicateChecker.IsAffinityPredicateEnabled())
}
func TestGetNodeResource(t *testing.T) {
node := BuildTestNode("n1", 1000, 2*MB)
cores, err := getNodeResource(node, apiv1.ResourceCPU)
assert.NoError(t, err)
assert.Equal(t, int64(1), cores)
memory, err := getNodeResource(node, apiv1.ResourceMemory)
assert.NoError(t, err)
assert.Equal(t, int64(2*MB), memory)
_, err = getNodeResource(node, "custom resource")
assert.Error(t, err)
node.Status.Capacity = apiv1.ResourceList{}
_, err = getNodeResource(node, apiv1.ResourceCPU)
assert.Error(t, err)
_, err = getNodeResource(node, apiv1.ResourceMemory)
assert.Error(t, err)
}
func TestGetNodeCoresAndMemory(t *testing.T) {
node := BuildTestNode("n1", 2000, 2048*MB)
cores, memory, err := getNodeCoresAndMemory(node)
assert.NoError(t, err)
assert.Equal(t, int64(2), cores)
assert.Equal(t, int64(2048), memory)
node.Status.Capacity = apiv1.ResourceList{}
_, _, err = getNodeCoresAndMemory(node)
assert.Error(t, err)
}

View File

@ -103,8 +103,8 @@ var (
"max(#nodes * scale-down-candidates-pool-ratio, scale-down-candidates-pool-min-count).")
scanInterval = flag.Duration("scan-interval", 10*time.Second, "How often cluster is reevaluated for scale up or down")
maxNodesTotal = flag.Int("max-nodes-total", 0, "Maximum number of nodes in all node groups. Cluster autoscaler will not grow the cluster beyond this number.")
coresTotal = flag.String("cores-total", "0:320000", "Minimum and maximum number of cores in cluster, in the format <min>:<max>. Cluster autoscaler will not scale the cluster beyond these numbers.")
memoryTotal = flag.String("memory-total", "0:6400000", "Minimum and maximum number of gigabytes of memory in cluster, in the format <min>:<max>. Cluster autoscaler will not scale the cluster beyond these numbers.")
coresTotal = flag.String("cores-total", minMaxFlagString(0, config.DefaultMaxClusterCores), "Minimum and maximum number of cores in cluster, in the format <min>:<max>. Cluster autoscaler will not scale the cluster beyond these numbers.")
memoryTotal = flag.String("memory-total", minMaxFlagString(0, config.DefaultMaxClusterMemory), "Minimum and maximum number of gigabytes of memory in cluster, in the format <min>:<max>. Cluster autoscaler will not scale the cluster beyond these numbers.")
cloudProviderFlag = flag.String("cloud-provider", "gce", "Cloud provider type. Allowed values: gce, aws, kubemark")
maxEmptyBulkDeleteFlag = flag.Int("max-empty-bulk-delete", 10, "Maximum number of empty nodes that can be deleted at the same time.")
maxGracefulTerminationFlag = flag.Int("max-graceful-termination-sec", 10*60, "Maximum number of seconds CA waits for pod termination when trying to scale down a node.")
@ -419,3 +419,7 @@ func validateMinMaxFlag(min, max int64) error {
}
return nil
}
func minMaxFlagString(min, max int64) string {
return fmt.Sprintf("%v:%v", min, max)
}