Restructure checking resource limits in scale_up.go

Preparatory work before introducing GPU limits
Łukasz Osipiuk 2018-06-12 11:49:07 +02:00 committed by Łukasz Osipiuk
parent a686c674ac
commit 9f75099d2c
4 changed files with 226 additions and 80 deletions
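Before the per-file diffs, here is a standalone sketch of the limit-checking pattern this commit introduces on the scale-up path (mirroring the existing scale-down code). It is a simplified illustration, not the autoscaler's actual file: the cloud-provider and scheduler plumbing is omitted, the method uses a value receiver and a sorted slice instead of k8s.io sets, and the "cpu"/"memory" keys are only example resource names.

```go
package main

import (
	"fmt"
	"math"
	"sort"
)

// Remaining headroom per resource name; no entry means the resource is unlimited.
type scaleUpResourcesLimits map[string]int64

// Cost of adding one node, per resource name.
type scaleUpResourcesDelta map[string]int64

// Sentinel stored when the limit could not be obtained from the cloud provider.
const scaleUpLimitUnknown = math.MaxInt64

type scaleUpLimitsCheckResult struct {
	exceeded          bool
	exceededResources []string
}

// checkScaleUpDeltaWithinLimits reports which limited resources the delta would exceed.
func (limits scaleUpResourcesLimits) checkScaleUpDeltaWithinLimits(delta scaleUpResourcesDelta) scaleUpLimitsCheckResult {
	exceeded := []string{}
	for resource, resourceDelta := range delta {
		resourceLeft, found := limits[resource]
		if !found {
			continue // no entry means no limit on this resource
		}
		if resourceDelta > 0 && (resourceLeft == scaleUpLimitUnknown || resourceDelta > resourceLeft) {
			exceeded = append(exceeded, resource)
		}
	}
	sort.Strings(exceeded)
	return scaleUpLimitsCheckResult{exceeded: len(exceeded) > 0, exceededResources: exceeded}
}

func main() {
	left := scaleUpResourcesLimits{"cpu": 8, "memory": 32 * 1024}
	delta := scaleUpResourcesDelta{"cpu": 16, "memory": 1024}
	fmt.Printf("%+v\n", left.checkScaleUpDeltaWithinLimits(delta))
	// prints: {exceeded:true exceededResources:[cpu]}
}
```

The design point visible throughout the diffs below is that limits live in a map keyed by resource name, so the planned GPU limits can later be added as extra map entries rather than new code paths.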

cluster-autoscaler/core/scale_down.go

@@ -108,10 +108,10 @@ type scaleDownResourcesLimits map[string]int64
type scaleDownResourcesDelta map[string]int64
// used as a value in scaleDownResourcesLimits if actual limit could not be obtained due to errors talking to cloud provider
const limitUnknown = math.MinInt64
const scaleDownLimitUnknown = math.MinInt64
func computeScaleDownResourcesLeftLimits(nodes []*apiv1.Node, resourceLimiter *cloudprovider.ResourceLimiter, cp cloudprovider.CloudProvider, timestamp time.Time) scaleDownResourcesLimits {
totalCores, totalMem := calculateCoresAndMemoryTotal(nodes, timestamp)
totalCores, totalMem := calculateScaleDownCoresMemoryTotal(nodes, timestamp)
var totalGpus map[string]int64
var totalGpusErr error
@@ -132,7 +132,7 @@ func computeScaleDownResourcesLeftLimits(nodes []*apiv1.Node, resourceLimiter *c
resultScaleDownLimits[resource] = computeLeft(totalMem, min)
case cloudprovider.IsGpuResource(resource):
if totalGpusErr != nil {
resultScaleDownLimits[resource] = limitUnknown
resultScaleDownLimits[resource] = scaleDownLimitUnknown
} else {
resultScaleDownLimits[resource] = computeLeft(totalGpus[resource], min)
}
@@ -161,7 +161,7 @@ func computeLeft(total int64, min int64) int64 {
}
func calculateCoresAndMemoryTotal(nodes []*apiv1.Node, timestamp time.Time) (int64, int64) {
func calculateScaleDownCoresMemoryTotal(nodes []*apiv1.Node, timestamp time.Time) (int64, int64) {
var coresTotal, memoryTotal int64
for _, node := range nodes {
if isNodeBeingDeleted(node, timestamp) {
@@ -236,7 +236,7 @@ func isNodeBeingDeleted(node *apiv1.Node, timestamp time.Time) bool {
return deleteTime != nil && (timestamp.Sub(*deleteTime) < MaxCloudProviderNodeDeletionTime || timestamp.Sub(*deleteTime) < MaxKubernetesEmptyNodeDeletionTime)
}
func noLimitsOnResources() scaleDownResourcesLimits {
func noScaleDownLimitsOnResources() scaleDownResourcesLimits {
return nil
}
@@ -265,34 +265,34 @@ func computeScaleDownResourcesDelta(node *apiv1.Node, nodeGroup cloudprovider.No
return resultScaleDownDelta, nil
}
type limitCheckResult struct {
type scaleDownLimitsCheckResult struct {
exceeded bool
exceededResources []string
}
func notExceeded() limitCheckResult {
return limitCheckResult{false, []string{}}
func scaleDownLimitsNotExceeded() scaleDownLimitsCheckResult {
return scaleDownLimitsCheckResult{false, []string{}}
}
func (limits *scaleDownResourcesLimits) checkDeltaWithinLimits(delta scaleDownResourcesDelta) limitCheckResult {
func (limits *scaleDownResourcesLimits) checkScaleDownDeltaWithinLimits(delta scaleDownResourcesDelta) scaleDownLimitsCheckResult {
exceededResources := sets.NewString()
for resource, resourceDelta := range delta {
resourceLeft, found := (*limits)[resource]
if found {
if (resourceDelta > 0) && (resourceLeft == limitUnknown || resourceDelta > resourceLeft) {
if (resourceDelta > 0) && (resourceLeft == scaleDownLimitUnknown || resourceDelta > resourceLeft) {
exceededResources.Insert(resource)
}
}
}
if len(exceededResources) > 0 {
return limitCheckResult{true, exceededResources.List()}
return scaleDownLimitsCheckResult{true, exceededResources.List()}
}
return notExceeded()
return scaleDownLimitsNotExceeded()
}
func (limits *scaleDownResourcesLimits) tryDecrementLimitsByDelta(delta scaleDownResourcesDelta) limitCheckResult {
result := limits.checkDeltaWithinLimits(delta)
func (limits *scaleDownResourcesLimits) tryDecrementLimitsByDelta(delta scaleDownResourcesDelta) scaleDownLimitsCheckResult {
result := limits.checkScaleDownDeltaWithinLimits(delta)
if result.exceeded {
return result
}
@@ -302,7 +302,7 @@ func (limits *scaleDownResourcesLimits) tryDecrementLimitsByDelta(delta scaleDow
(*limits)[resource] = resourceLeft - resourceDelta
}
}
return notExceeded()
return scaleDownLimitsNotExceeded()
}
// ScaleDown is responsible for maintaining the state needed to perform unneeded node removals.
@@ -638,7 +638,7 @@ func (sd *ScaleDown) TryToScaleDown(allNodes []*apiv1.Node, pods []*apiv1.Pod, p
continue
}
checkResult := scaleDownResourcesLeft.checkDeltaWithinLimits(scaleDownResourcesDelta)
checkResult := scaleDownResourcesLeft.checkScaleDownDeltaWithinLimits(scaleDownResourcesDelta)
if checkResult.exceeded {
glog.V(4).Infof("Skipping %s - minimal limit exceeded for %v", node.Name, checkResult.exceededResources)
continue
@@ -733,7 +733,7 @@ func updateScaleDownMetrics(scaleDownStart time.Time, findNodesToRemoveDuration
func getEmptyNodesNoResourceLimits(candidates []*apiv1.Node, pods []*apiv1.Pod, maxEmptyBulkDelete int,
cloudProvider cloudprovider.CloudProvider) []*apiv1.Node {
return getEmptyNodes(candidates, pods, maxEmptyBulkDelete, noLimitsOnResources(), cloudProvider)
return getEmptyNodes(candidates, pods, maxEmptyBulkDelete, noScaleDownLimitsOnResources(), cloudProvider)
}
// This function finds empty nodes among passed candidates and returns a list of empty nodes
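The scale-down side above and the scale-up side below now compute per-resource headroom symmetrically: computeLeft returns how much of a resource can still be removed before the configured minimum, while computeBelowMax (added in scale_up.go) returns how much can still be added before the maximum. A minimal sketch of the pair follows; the computeLeft body is not part of this hunk, so its floor-at-zero behaviour here is an assumption mirroring computeBelowMax.

```go
package main

import "fmt"

// computeLeft: scale-down headroom — how much of a resource can still be
// removed before the cluster-wide minimum is reached (assumed symmetric to
// computeBelowMax; the body is not shown in this diff).
func computeLeft(total, min int64) int64 {
	if total > min {
		return total - min
	}
	return 0
}

// computeBelowMax: scale-up headroom — how much of a resource can still be
// added before the cluster-wide maximum is reached (as defined in scale_up.go).
func computeBelowMax(total, max int64) int64 {
	if total < max {
		return max - total
	}
	return 0
}

func main() {
	fmt.Println(computeLeft(42, 10))     // 32 cores may still be removed
	fmt.Println(computeBelowMax(42, 64)) // 22 cores may still be added
}
```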

cluster-autoscaler/core/scale_down_test.go

@@ -1338,7 +1338,7 @@ func TestCalculateCoresAndMemoryTotal(t *testing.T) {
},
}
coresTotal, memoryTotal := calculateCoresAndMemoryTotal(nodes, time.Now())
coresTotal, memoryTotal := calculateScaleDownCoresMemoryTotal(nodes, time.Now())
assert.Equal(t, int64(42), coresTotal)
assert.Equal(t, int64(44000*MB), memoryTotal)
@@ -1387,7 +1387,7 @@ func TestFilterOutMasters(t *testing.T) {
assertEqualSet(t, []string{"n1", "n2", "n4", "n5", "n6"}, withoutMastersNames)
}
func TestCheckDeltaWithinLimits(t *testing.T) {
func TestCheckScaleDownDeltaWithinLimits(t *testing.T) {
type testcase struct {
limits scaleDownResourcesLimits
delta scaleDownResourcesDelta
@@ -1415,12 +1415,12 @@ func TestCheckDeltaWithinLimits(t *testing.T) {
exceededResources: []string{},
},
{
limits: scaleDownResourcesLimits{"a": limitUnknown},
limits: scaleDownResourcesLimits{"a": scaleDownLimitUnknown},
delta: scaleDownResourcesDelta{"a": 0},
exceededResources: []string{},
},
{
limits: scaleDownResourcesLimits{"a": limitUnknown},
limits: scaleDownResourcesLimits{"a": scaleDownLimitUnknown},
delta: scaleDownResourcesDelta{"a": 1},
exceededResources: []string{"a"},
},
@@ -1432,11 +1432,11 @@ func TestCheckDeltaWithinLimits(t *testing.T) {
}
for _, test := range tests {
checkResult := test.limits.checkDeltaWithinLimits(test.delta)
checkResult := test.limits.checkScaleDownDeltaWithinLimits(test.delta)
if len(test.exceededResources) == 0 {
assert.Equal(t, notExceeded(), checkResult)
assert.Equal(t, scaleDownLimitsNotExceeded(), checkResult)
} else {
assert.Equal(t, limitCheckResult{true, test.exceededResources}, checkResult)
assert.Equal(t, scaleDownLimitsCheckResult{true, test.exceededResources}, checkResult)
}
}
}

cluster-autoscaler/core/scale_up.go

@@ -18,10 +18,13 @@ package core
import (
"bytes"
"fmt"
"math"
"time"
apiv1 "k8s.io/api/core/v1"
extensionsv1 "k8s.io/api/extensions/v1beta1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
"k8s.io/autoscaler/cluster-autoscaler/clusterstate"
"k8s.io/autoscaler/cluster-autoscaler/context"
@@ -38,6 +41,119 @@ import (
"github.com/golang/glog"
)
type scaleUpResourcesLimits map[string]int64
type scaleUpResourcesDelta map[string]int64
// used as a value in scaleUpResourcesLimits if actual limit could not be obtained due to errors talking to cloud provider
const scaleUpLimitUnknown = math.MaxInt64
func computeScaleUpResourcesLeftLimits(nodeGroups []cloudprovider.NodeGroup, nodeInfos map[string]*schedulercache.NodeInfo, resourceLimiter *cloudprovider.ResourceLimiter) (scaleUpResourcesLimits, errors.AutoscalerError) {
totalCores, totalMem, errCoresMem := calculateScaleUpCoresMemoryTotal(nodeGroups, nodeInfos)
resultScaleUpLimits := make(scaleUpResourcesLimits)
for _, resource := range resourceLimiter.GetResources() {
max := resourceLimiter.GetMax(resource)
// we put only actual limits into final map. No entry means no limit.
if max > 0 {
if (resource == cloudprovider.ResourceNameCores || resource == cloudprovider.ResourceNameMemory) && errCoresMem != nil {
// core resource info missing - no reason to proceed with scale up
return scaleUpResourcesLimits{}, errCoresMem
}
switch {
case resource == cloudprovider.ResourceNameCores:
if errCoresMem != nil {
resultScaleUpLimits[resource] = scaleUpLimitUnknown
} else {
resultScaleUpLimits[resource] = computeBelowMax(totalCores, max)
}
case resource == cloudprovider.ResourceNameMemory:
if errCoresMem != nil {
resultScaleUpLimits[resource] = scaleUpLimitUnknown
} else {
resultScaleUpLimits[resource] = computeBelowMax(totalMem, max)
}
default:
glog.Errorf("Scale up limits defined for unsupported resource '%s'", resource)
}
}
}
return resultScaleUpLimits, nil
}
func calculateScaleUpCoresMemoryTotal(nodeGroups []cloudprovider.NodeGroup, nodeInfos map[string]*schedulercache.NodeInfo) (int64, int64, errors.AutoscalerError) {
var coresTotal int64
var memoryTotal int64
for _, nodeGroup := range nodeGroups {
currentSize, err := nodeGroup.TargetSize()
if err != nil {
return 0, 0, errors.ToAutoscalerError(errors.CloudProviderError, err).AddPrefix("Failed to get node group size of %v:", nodeGroup.Id())
}
nodeInfo, found := nodeInfos[nodeGroup.Id()]
if !found {
return 0, 0, errors.ToAutoscalerError(errors.CloudProviderError, err).AddPrefix("No node info for: %s", nodeGroup.Id())
}
if currentSize > 0 {
nodeCPU, nodeMemory := getNodeInfoCoresAndMemory(nodeInfo)
coresTotal = coresTotal + int64(currentSize)*nodeCPU
memoryTotal = memoryTotal + int64(currentSize)*nodeMemory
}
}
return coresTotal, memoryTotal, nil
}
func computeBelowMax(total int64, max int64) int64 {
if total < max {
return max - total
}
return 0
}
func computeScaleUpResourcesDelta(nodeInfo *schedulercache.NodeInfo) (scaleUpResourcesDelta, errors.AutoscalerError) {
resultScaleUpDelta := make(scaleUpResourcesDelta)
nodeCPU, nodeMemory := getNodeInfoCoresAndMemory(nodeInfo)
resultScaleUpDelta[cloudprovider.ResourceNameCores] = nodeCPU
resultScaleUpDelta[cloudprovider.ResourceNameMemory] = nodeMemory
return resultScaleUpDelta, nil
}
type scaleUpLimitsCheckResult struct {
exceeded bool
exceededResources []string
}
func scaleUpLimitsNotExceeded() scaleUpLimitsCheckResult {
return scaleUpLimitsCheckResult{false, []string{}}
}
func (limits *scaleUpResourcesLimits) checkScaleUpDeltaWithinLimits(delta scaleUpResourcesDelta) scaleUpLimitsCheckResult {
exceededResources := sets.NewString()
for resource, resourceDelta := range delta {
resourceLeft, found := (*limits)[resource]
if found {
if (resourceDelta > 0) && (resourceLeft == scaleUpLimitUnknown || resourceDelta > resourceLeft) {
exceededResources.Insert(resource)
}
}
}
if len(exceededResources) > 0 {
return scaleUpLimitsCheckResult{true, exceededResources.List()}
}
return scaleUpLimitsNotExceeded()
}
func getNodeInfoCoresAndMemory(nodeInfo *schedulercache.NodeInfo) (int64, int64) {
return getNodeCoresAndMemory(nodeInfo.Node())
}
// ScaleUp tries to scale the cluster up. Return true if it found a way to increase the size,
// false if it didn't and error if an error occurred. Assumes that all nodes in the cluster are
// ready and in sync with instance groups.
@@ -75,8 +191,11 @@ func ScaleUp(context *context.AutoscalingContext, processors *ca_processors.Auto
errors.CloudProviderError,
errCP)
}
// calculate current cores & gigabytes of memory
coresTotal, memoryTotal := calculateClusterCoresMemoryTotal(nodeGroups, nodeInfos)
scaleUpResourcesLeft, errLimits := computeScaleUpResourcesLeftLimits(nodeGroups, nodeInfos, resourceLimiter)
if errLimits != nil {
return nil, errLimits.AddPrefix("Could not compute total resources: ")
}
upcomingNodes := make([]*schedulercache.NodeInfo, 0)
for nodeGroup, numberOfNodes := range clusterStateRegistry.GetUpcomingNodes() {
@@ -128,15 +247,14 @@ func ScaleUp(context *context.AutoscalingContext, processors *ca_processors.Auto
continue
}
nodeCPU, nodeMemory := getNodeInfoCoresAndMemory(nodeInfo)
if nodeCPU > (resourceLimiter.GetMax(cloudprovider.ResourceNameCores) - coresTotal) {
// skip this node group
glog.V(4).Infof("Skipping node group %s - not enough cores limit left", nodeGroup.Id())
scaleUpResourcesDelta, err := computeScaleUpResourcesDelta(nodeInfo)
if err != nil {
glog.Errorf("Skipping node group %s; error getting node group resources: %v", nodeGroup.Id(), err)
continue
}
if nodeMemory > (resourceLimiter.GetMax(cloudprovider.ResourceNameMemory) - memoryTotal) {
// skip this node group
glog.V(4).Infof("Skipping node group %s - not enough memory limit left", nodeGroup.Id())
checkResult := scaleUpResourcesLeft.checkScaleUpDeltaWithinLimits(scaleUpResourcesDelta)
if checkResult.exceeded {
glog.V(4).Infof("Skipping node group %s; maximal limit exceeded for %v", nodeGroup.Id(), checkResult.exceededResources)
continue
}
@@ -239,7 +357,7 @@ func ScaleUp(context *context.AutoscalingContext, processors *ca_processors.Auto
}
// apply upper limits for CPU and memory
newNodes, err = applyMaxClusterCoresMemoryLimits(newNodes, coresTotal, memoryTotal, resourceLimiter.GetMax(cloudprovider.ResourceNameCores), resourceLimiter.GetMax(cloudprovider.ResourceNameMemory), nodeInfo)
newNodes, err = applyMaxClusterCoresMemoryLimits(newNodes, scaleUpResourcesLeft, nodeInfo)
if err != nil {
return nil, err
}
@@ -368,57 +486,36 @@ func executeScaleUp(context *context.AutoscalingContext, clusterStateRegistry *c
return nil
}
func calculateClusterCoresMemoryTotal(nodeGroups []cloudprovider.NodeGroup, nodeInfos map[string]*schedulercache.NodeInfo) (int64, int64) {
var coresTotal int64
var memoryTotal int64
for _, nodeGroup := range nodeGroups {
currentSize, err := nodeGroup.TargetSize()
if err != nil {
glog.Errorf("Failed to get node group size of %v: %v", nodeGroup.Id(), err)
continue
}
nodeInfo, found := nodeInfos[nodeGroup.Id()]
if !found {
glog.Errorf("No node info for: %s", nodeGroup.Id())
continue
}
if currentSize > 0 {
nodeCPU, nodeMemory := getNodeInfoCoresAndMemory(nodeInfo)
coresTotal = coresTotal + int64(currentSize)*nodeCPU
memoryTotal = memoryTotal + int64(currentSize)*nodeMemory
}
func applyMaxClusterCoresMemoryLimits(newNodes int, scaleUpResourcesLeft scaleUpResourcesLimits, nodeInfo *schedulercache.NodeInfo) (int, errors.AutoscalerError) {
delta, err := computeScaleUpResourcesDelta(nodeInfo)
if err != nil {
return 0, err
}
return coresTotal, memoryTotal
}
func applyMaxClusterCoresMemoryLimits(newNodes int, coresTotal, memoryTotal, maxCoresTotal, maxMemoryTotal int64, nodeInfo *schedulercache.NodeInfo) (int, errors.AutoscalerError) {
newNodeCPU, newNodeMemory := getNodeInfoCoresAndMemory(nodeInfo)
if coresTotal+newNodeCPU*int64(newNodes) > maxCoresTotal {
glog.V(1).Infof("Capping size to max cluster cores (%d)", maxCoresTotal)
newNodes = int((maxCoresTotal - coresTotal) / newNodeCPU)
if newNodes < 1 {
// This should never happen, as we already check that
// at least one node will fit when considering nodegroup
return 0, errors.NewAutoscalerError(
errors.TransientError,
"max cores already reached")
for resource, resourceDelta := range delta {
limit, limitFound := scaleUpResourcesLeft[resource]
if !limitFound {
continue
}
}
if memoryTotal+newNodeMemory*int64(newNodes) > maxMemoryTotal {
glog.V(1).Infof("Capping size to max cluster memory allowed (%d)", maxMemoryTotal)
newNodes = int((maxMemoryTotal - memoryTotal) / newNodeMemory)
if newNodes < 1 {
// This should never happen, as we already check that
// at least one node will fit when considering nodegroup
if limit == scaleUpLimitUnknown {
// should never happen - checked before
return 0, errors.NewAutoscalerError(
errors.TransientError,
"max memory already reached")
errors.InternalError,
fmt.Sprintf("limit unknown for resource %s", resource))
}
if int64(newNodes)*resourceDelta <= limit {
// no capping required
continue
}
newNodes = int(limit / resourceDelta)
glog.V(1).Infof("Capping scale-up size due to limit for resource %s", resource)
if newNodes < 1 {
// should never happen - checked before
return 0, errors.NewAutoscalerError(
errors.InternalError,
fmt.Sprintf("cannot create any node; max limit for resource %s reached", resource))
}
}
return newNodes, nil
}
func getNodeInfoCoresAndMemory(nodeInfo *schedulercache.NodeInfo) (int64, int64) {
return getNodeCoresAndMemory(nodeInfo.Node())
}
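The rewritten applyMaxClusterCoresMemoryLimits above replaces two hard-coded cores/memory caps with a single loop over the per-node resource delta. Below is a reduced standalone sketch of that capping loop, using plain maps instead of the scaleUpResources* types and simplified errors; the names capNewNodes, left, and perNodeDelta are illustrative only.

```go
package main

import "fmt"

// capNewNodes reduces the requested node count so that count*perNodeDelta
// stays within the remaining headroom of every limited resource.
func capNewNodes(newNodes int, left map[string]int64, perNodeDelta map[string]int64) (int, error) {
	for resource, delta := range perNodeDelta {
		limit, found := left[resource]
		if !found {
			continue // no entry means no limit on this resource
		}
		if int64(newNodes)*delta <= limit {
			continue // no capping required for this resource
		}
		newNodes = int(limit / delta)
		if newNodes < 1 {
			return 0, fmt.Errorf("cannot create any node; max limit for resource %s reached", resource)
		}
	}
	return newNodes, nil
}

func main() {
	left := map[string]int64{"cpu": 20, "memory": 512}
	perNode := map[string]int64{"cpu": 4, "memory": 64}
	n, err := capNewNodes(10, left, perNode)
	fmt.Println(n, err) // 5 <nil>: capped by cpu (20/4); memory alone would allow 8
}
```

Because the cap is computed per map entry, a future GPU limit needs no extra branch here: any resource present in both maps is handled by the same loop.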

cluster-autoscaler/core/scale_up_test.go

@@ -809,3 +809,52 @@ func TestScaleUpAutoprovisionedNodeGroup(t *testing.T) {
assert.Equal(t, "autoprovisioned-T1", getStringFromChan(createdGroups))
assert.Equal(t, "autoprovisioned-T1-1", getStringFromChan(expandedGroups))
}
func TestCheckScaleUpDeltaWithinLimits(t *testing.T) {
type testcase struct {
limits scaleUpResourcesLimits
delta scaleUpResourcesDelta
exceededResources []string
}
tests := []testcase{
{
limits: scaleUpResourcesLimits{"a": 10},
delta: scaleUpResourcesDelta{"a": 10},
exceededResources: []string{},
},
{
limits: scaleUpResourcesLimits{"a": 10},
delta: scaleUpResourcesDelta{"a": 11},
exceededResources: []string{"a"},
},
{
limits: scaleUpResourcesLimits{"a": 10},
delta: scaleUpResourcesDelta{"b": 10},
exceededResources: []string{},
},
{
limits: scaleUpResourcesLimits{"a": scaleUpLimitUnknown},
delta: scaleUpResourcesDelta{"a": 0},
exceededResources: []string{},
},
{
limits: scaleUpResourcesLimits{"a": scaleUpLimitUnknown},
delta: scaleUpResourcesDelta{"a": 1},
exceededResources: []string{"a"},
},
{
limits: scaleUpResourcesLimits{"a": 10, "b": 20, "c": 30},
delta: scaleUpResourcesDelta{"a": 11, "b": 20, "c": 31},
exceededResources: []string{"a", "c"},
},
}
for _, test := range tests {
checkResult := test.limits.checkScaleUpDeltaWithinLimits(test.delta)
if len(test.exceededResources) == 0 {
assert.Equal(t, scaleUpLimitsNotExceeded(), checkResult)
} else {
assert.Equal(t, scaleUpLimitsCheckResult{true, test.exceededResources}, checkResult)
}
}
}