Respect GPU limits in scale_up

This commit is contained in:
Łukasz Osipiuk 2018-06-13 19:58:28 +02:00 committed by Łukasz Osipiuk
parent 57ea19599e
commit b7323bc0d1
3 changed files with 89 additions and 22 deletions

View File

@ -170,6 +170,16 @@ func IsGpuResource(resourceName string) bool {
return resourceName != ResourceNameCores && resourceName != ResourceNameMemory
}
// ContainsGpuResources reports whether any name in the given resource list
// denotes a GPU resource type (i.e. is neither cores nor memory).
func ContainsGpuResources(resources []string) bool {
	for i := range resources {
		if IsGpuResource(resources[i]) {
			return true
		}
	}
	return false
}
// ResourceLimiter contains limits (max, min) for resources (cores, memory etc.).
type ResourceLimiter struct {
minLimits map[string]int64

View File

@ -115,8 +115,8 @@ func computeScaleDownResourcesLeftLimits(nodes []*apiv1.Node, resourceLimiter *c
var totalGpus map[string]int64
var totalGpusErr error
if containsGpuResources(resourceLimiter.GetResources()) {
totalGpus, totalGpusErr = calculateGpusTotal(nodes, cp, timestamp)
if cloudprovider.ContainsGpuResources(resourceLimiter.GetResources()) {
totalGpus, totalGpusErr = calculateScaleDownGpusTotal(nodes, cp, timestamp)
}
resultScaleDownLimits := make(scaleDownResourcesLimits)
@ -127,14 +127,14 @@ func computeScaleDownResourcesLeftLimits(nodes []*apiv1.Node, resourceLimiter *c
if min > 0 {
switch {
case resource == cloudprovider.ResourceNameCores:
resultScaleDownLimits[resource] = computeLeft(totalCores, min)
resultScaleDownLimits[resource] = computeAboveMin(totalCores, min)
case resource == cloudprovider.ResourceNameMemory:
resultScaleDownLimits[resource] = computeLeft(totalMem, min)
resultScaleDownLimits[resource] = computeAboveMin(totalMem, min)
case cloudprovider.IsGpuResource(resource):
if totalGpusErr != nil {
resultScaleDownLimits[resource] = scaleDownLimitUnknown
} else {
resultScaleDownLimits[resource] = computeLeft(totalGpus[resource], min)
resultScaleDownLimits[resource] = computeAboveMin(totalGpus[resource], min)
}
default:
glog.Errorf("Scale down limits defined for unsupported resource '%s'", resource)
@ -144,16 +144,7 @@ func computeScaleDownResourcesLeftLimits(nodes []*apiv1.Node, resourceLimiter *c
return resultScaleDownLimits
}
// containsGpuResources reports whether any of the given resource names is a
// GPU resource according to the cloud provider's classification.
func containsGpuResources(resources []string) bool {
	for i := range resources {
		if cloudprovider.IsGpuResource(resources[i]) {
			return true
		}
	}
	return false
}
func computeLeft(total int64, min int64) int64 {
func computeAboveMin(total int64, min int64) int64 {
if total > min {
return total - min
}
@ -177,7 +168,7 @@ func calculateScaleDownCoresMemoryTotal(nodes []*apiv1.Node, timestamp time.Time
return coresTotal, memoryTotal
}
func calculateGpusTotal(nodes []*apiv1.Node, cp cloudprovider.CloudProvider, timestamp time.Time) (map[string]int64, error) {
func calculateScaleDownGpusTotal(nodes []*apiv1.Node, cp cloudprovider.CloudProvider, timestamp time.Time) (map[string]int64, error) {
type gpuInfo struct {
name string
count int64
@ -255,7 +246,7 @@ func computeScaleDownResourcesDelta(node *apiv1.Node, nodeGroup cloudprovider.No
resultScaleDownDelta[cloudprovider.ResourceNameCores] = nodeCPU
resultScaleDownDelta[cloudprovider.ResourceNameMemory] = nodeMemory
if containsGpuResources(resourcesWithLimits) {
if cloudprovider.ContainsGpuResources(resourcesWithLimits) {
gpuType, gpuCount, err := gpu.GetNodeTargetGpus(node, nodeGroup)
if err != nil {
return scaleDownResourcesDelta{}, errors.ToAutoscalerError(errors.CloudProviderError, err).AddPrefix("Failed to get node %v gpu: %v", node.Name)

View File

@ -35,6 +35,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/processors/status"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
"k8s.io/autoscaler/cluster-autoscaler/utils/glogx"
"k8s.io/autoscaler/cluster-autoscaler/utils/gpu"
"k8s.io/autoscaler/cluster-autoscaler/utils/nodegroupset"
schedulercache "k8s.io/kubernetes/pkg/scheduler/cache"
@ -54,6 +55,12 @@ func computeScaleUpResourcesLeftLimits(
resourceLimiter *cloudprovider.ResourceLimiter) (scaleUpResourcesLimits, errors.AutoscalerError) {
totalCores, totalMem, errCoresMem := calculateScaleUpCoresMemoryTotal(nodeGroups, nodeInfos, nodesFromNotAutoscaledGroups)
var totalGpus map[string]int64
var totalGpusErr error
if cloudprovider.ContainsGpuResources(resourceLimiter.GetResources()) {
totalGpus, totalGpusErr = calculateScaleUpGpusTotal(nodeGroups, nodeInfos, nodesFromNotAutoscaledGroups)
}
resultScaleUpLimits := make(scaleUpResourcesLimits)
for _, resource := range resourceLimiter.GetResources() {
max := resourceLimiter.GetMax(resource)
@ -79,6 +86,13 @@ func computeScaleUpResourcesLeftLimits(
resultScaleUpLimits[resource] = computeBelowMax(totalMem, max)
}
case cloudprovider.IsGpuResource(resource):
if totalGpusErr != nil {
resultScaleUpLimits[resource] = scaleUpLimitUnknown
} else {
resultScaleUpLimits[resource] = computeBelowMax(totalGpus[resource], max)
}
default:
glog.Errorf("Scale up limits defined for unsupported resource '%s'", resource)
}
@ -120,6 +134,44 @@ func calculateScaleUpCoresMemoryTotal(
return coresTotal, memoryTotal, nil
}
// calculateScaleUpGpusTotal sums, per GPU type, the number of GPUs the cluster
// would contain at target size: for each autoscaled node group, the GPUs of a
// sample node times the group's target size, plus the GPUs of every node that
// does not belong to an autoscaled group.
//
// Returns an error if a node group's target size cannot be read, a node group
// has no node info, or a node's target GPUs cannot be determined.
func calculateScaleUpGpusTotal(
	nodeGroups []cloudprovider.NodeGroup,
	nodeInfos map[string]*schedulercache.NodeInfo,
	nodesFromNotAutoscaledGroups []*apiv1.Node) (map[string]int64, errors.AutoscalerError) {

	result := make(map[string]int64)
	for _, nodeGroup := range nodeGroups {
		currentSize, err := nodeGroup.TargetSize()
		if err != nil {
			return nil, errors.ToAutoscalerError(errors.CloudProviderError, err).AddPrefix("Failed to get node group size of %v:", nodeGroup.Id())
		}
		nodeInfo, found := nodeInfos[nodeGroup.Id()]
		if !found {
			// BUG FIX: the original wrapped `err` here, but `err` is provably nil
			// at this point (the TargetSize error was handled above), so the
			// returned AutoscalerError wrapped a nil error. Build it from scratch.
			return nil, errors.NewAutoscalerError(errors.CloudProviderError, "No node info for: %s", nodeGroup.Id())
		}
		if currentSize > 0 {
			gpuType, gpuCount, err := gpu.GetNodeTargetGpus(nodeInfo.Node(), nodeGroup)
			if err != nil {
				return nil, errors.ToAutoscalerError(errors.CloudProviderError, err).AddPrefix("Failed to get target gpu for node group %v:", nodeGroup.Id())
			}
			// Nodes without GPUs report an empty type; nothing to account for.
			if gpuType == "" {
				continue
			}
			result[gpuType] += gpuCount * int64(currentSize)
		}
	}
	for _, node := range nodesFromNotAutoscaledGroups {
		gpuType, gpuCount, err := gpu.GetNodeTargetGpus(node, nil)
		if err != nil {
			return nil, errors.ToAutoscalerError(errors.CloudProviderError, err).AddPrefix("Failed to get target gpu for node gpus count for node %v:", node.Name)
		}
		// Skip GPU-less nodes for consistency with the loop above (the original
		// recorded a meaningless zero entry under the "" key here).
		if gpuType == "" {
			continue
		}
		result[gpuType] += gpuCount
	}
	return result, nil
}
func computeBelowMax(total int64, max int64) int64 {
if total < max {
return max - total
@ -127,13 +179,21 @@ func computeBelowMax(total int64, max int64) int64 {
return 0
}
func computeScaleUpResourcesDelta(nodeInfo *schedulercache.NodeInfo) (scaleUpResourcesDelta, errors.AutoscalerError) {
// computeScaleUpResourcesDelta returns the per-resource amounts (cores, memory
// and — when the resource limiter defines GPU limits — GPUs) that one new node
// based on nodeInfo would add to the cluster totals.
func computeScaleUpResourcesDelta(nodeInfo *schedulercache.NodeInfo, nodeGroup cloudprovider.NodeGroup, resourceLimiter *cloudprovider.ResourceLimiter) (scaleUpResourcesDelta, errors.AutoscalerError) {
	delta := make(scaleUpResourcesDelta)

	cores, memory := getNodeInfoCoresAndMemory(nodeInfo)
	delta[cloudprovider.ResourceNameCores] = cores
	delta[cloudprovider.ResourceNameMemory] = memory

	if cloudprovider.ContainsGpuResources(resourceLimiter.GetResources()) {
		gpuType, gpuCount, err := gpu.GetNodeTargetGpus(nodeInfo.Node(), nodeGroup)
		if err != nil {
			return scaleUpResourcesDelta{}, errors.ToAutoscalerError(errors.CloudProviderError, err).AddPrefix("Failed to get target gpu for node group %v:", nodeGroup.Id())
		}
		// NOTE(review): for a GPU-less node gpuType is "" and a zero entry is
		// stored under the empty key; looks harmless for limit lookups — confirm.
		delta[gpuType] = gpuCount
	}

	return delta, nil
}
@ -265,7 +325,7 @@ func ScaleUp(context *context.AutoscalingContext, processors *ca_processors.Auto
continue
}
scaleUpResourcesDelta, err := computeScaleUpResourcesDelta(nodeInfo)
scaleUpResourcesDelta, err := computeScaleUpResourcesDelta(nodeInfo, nodeGroup, resourceLimiter)
if err != nil {
glog.Errorf("Skipping node group %s; error getting node group resources: %v", nodeGroup.Id(), err)
continue
@ -375,7 +435,7 @@ func ScaleUp(context *context.AutoscalingContext, processors *ca_processors.Auto
}
// apply upper limits for CPU and memory
newNodes, err = applyMaxClusterCoresMemoryLimits(newNodes, scaleUpResourcesLeft, nodeInfo)
newNodes, err = applyScaleUpResourcesLimits(newNodes, scaleUpResourcesLeft, nodeInfo, bestOption.NodeGroup, resourceLimiter)
if err != nil {
return nil, err
}
@ -504,8 +564,14 @@ func executeScaleUp(context *context.AutoscalingContext, clusterStateRegistry *c
return nil
}
func applyMaxClusterCoresMemoryLimits(newNodes int, scaleUpResourcesLeft scaleUpResourcesLimits, nodeInfo *schedulercache.NodeInfo) (int, errors.AutoscalerError) {
delta, err := computeScaleUpResourcesDelta(nodeInfo)
func applyScaleUpResourcesLimits(
newNodes int,
scaleUpResourcesLeft scaleUpResourcesLimits,
nodeInfo *schedulercache.NodeInfo,
nodeGroup cloudprovider.NodeGroup,
resourceLimiter *cloudprovider.ResourceLimiter) (int, errors.AutoscalerError) {
delta, err := computeScaleUpResourcesDelta(nodeInfo, nodeGroup, resourceLimiter)
if err != nil {
return 0, err
}