Support scaling up node groups to the configured min size if needed

Xintong Liu 2022-09-16 19:33:39 -07:00
parent 9e6d7146ef
commit 524886fca5
10 changed files with 933 additions and 373 deletions

View File

@ -735,6 +735,7 @@ The following startup parameters are supported for cluster autoscaler:
| `kubeconfig` | Path to kubeconfig file with authorization and API Server location information | ""
| `cloud-config` | The path to the cloud provider configuration file. Empty string for no configuration file | ""
| `namespace` | Namespace in which cluster-autoscaler run | "kube-system"
| `enforce-node-group-min-size` | Should CA scale up the node group to the configured min size if needed | false
| `scale-down-enabled` | Should CA scale down the cluster | true
| `scale-down-delay-after-add` | How long after scale up that scale down evaluation resumes | 10 minutes
| `scale-down-delay-after-delete` | How long after node deletion that scale down evaluation resumes, defaults to scan-interval | scan-interval
@ -948,7 +949,14 @@ Events:
```
### My cluster is below minimum / above maximum number of nodes, but CA did not fix that! Why?
Cluster Autoscaler will not scale the cluster beyond these limits, but some other external factors could make this happen. Here are some common scenarios.
* Existing nodes were deleted from K8s and the cloud provider, which could cause the cluster to fall below the minimum number of nodes.
* New nodes were added directly to the cloud provider, which could cause the cluster to exceed the maximum number of nodes.
* Cluster Autoscaler was turned on in the middle of the cluster lifecycle, and the initial number of nodes might already be beyond these limits.

By default, Cluster Autoscaler does not enforce the node group size. If your cluster is below the minimum number of nodes configured for CA, it will be scaled up *only* in the presence of unschedulable pods. On the other hand, if your cluster is above the maximum number of nodes configured for CA, it will be scaled down *only* if it has unneeded nodes.

Starting with CA 1.26.0, a new flag `--enforce-node-group-min-size` was introduced to enforce the node group minimum size. For node groups with fewer nodes than the configured minimum, CA will scale them up to the minimum number of nodes. To enable this feature, set the flag to `true` on the command line.
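For example, when the autoscaler binary is launched directly, enabling it only requires adding the flag to the startup command. The other arguments below are illustrative placeholders, not part of this change:

```
./cluster-autoscaler --cloud-provider=<your-provider> --nodes=1:10:<your-node-group> --enforce-node-group-min-size=true
```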
### What happens in scale-up when I have no more quota in the cloud provider?

View File

@ -96,6 +96,8 @@ type AutoscalingOptions struct {
CloudProviderName string CloudProviderName string
// NodeGroups is the list of node groups a.k.a autoscaling targets // NodeGroups is the list of node groups a.k.a autoscaling targets
NodeGroups []string NodeGroups []string
// EnforceNodeGroupMinSize is used to allow CA to scale up the node group to the configured min size if needed.
EnforceNodeGroupMinSize bool
// ScaleDownEnabled is used to allow CA to scale down the cluster // ScaleDownEnabled is used to allow CA to scale down the cluster
ScaleDownEnabled bool ScaleDownEnabled bool
// ScaleDownDelayAfterAdd sets the duration from the last scale up to the time when CA starts to check scale down options // ScaleDownDelayAfterAdd sets the duration from the last scale up to the time when CA starts to check scale down options
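The new field is surfaced to users as the startup parameter listed in the FAQ table above. The flag registration lives in a file that is not part of this view, so the snippet below is only a sketch of how such a boolean option is typically declared and copied into `AutoscalingOptions`; the helper name and wiring are assumptions, while the flag name and struct field come from this commit.

```go
package main

import (
	"flag"

	"k8s.io/autoscaler/cluster-autoscaler/config"
)

// Hypothetical wiring sketch; the real flag-registration hunk is not shown in this view.
var enforceNodeGroupMinSize = flag.Bool("enforce-node-group-min-size", false,
	"Should CA scale up the node group to the configured min size if needed")

func buildAutoscalingOptions() config.AutoscalingOptions {
	flag.Parse()
	return config.AutoscalingOptions{
		EnforceNodeGroupMinSize: *enforceNodeGroupMinSize,
	}
}
```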

View File

@ -18,16 +18,15 @@ package core
import ( import (
"fmt" "fmt"
"math"
"strings" "strings"
"time" "time"
"k8s.io/autoscaler/cluster-autoscaler/core/scaleup"
"k8s.io/autoscaler/cluster-autoscaler/core/utils" "k8s.io/autoscaler/cluster-autoscaler/core/utils"
"k8s.io/autoscaler/cluster-autoscaler/utils/taints" "k8s.io/autoscaler/cluster-autoscaler/utils/taints"
appsv1 "k8s.io/api/apps/v1" appsv1 "k8s.io/api/apps/v1"
apiv1 "k8s.io/api/core/v1" apiv1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider" "k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
"k8s.io/autoscaler/cluster-autoscaler/clusterstate" "k8s.io/autoscaler/cluster-autoscaler/clusterstate"
"k8s.io/autoscaler/cluster-autoscaler/context" "k8s.io/autoscaler/cluster-autoscaler/context"
@ -45,205 +44,6 @@ import (
klog "k8s.io/klog/v2" klog "k8s.io/klog/v2"
) )
type scaleUpResourcesLimits map[string]int64
type scaleUpResourcesDelta map[string]int64
// used as a value in scaleUpResourcesLimits if actual limit could not be obtained due to errors talking to cloud provider
const scaleUpLimitUnknown = math.MaxInt64
func computeScaleUpResourcesLeftLimits(
context *context.AutoscalingContext,
processors *ca_processors.AutoscalingProcessors,
nodeGroups []cloudprovider.NodeGroup,
nodeInfos map[string]*schedulerframework.NodeInfo,
nodesFromNotAutoscaledGroups []*apiv1.Node,
resourceLimiter *cloudprovider.ResourceLimiter) (scaleUpResourcesLimits, errors.AutoscalerError) {
totalCores, totalMem, errCoresMem := calculateScaleUpCoresMemoryTotal(nodeGroups, nodeInfos, nodesFromNotAutoscaledGroups)
var totalResources map[string]int64
var totalResourcesErr error
if cloudprovider.ContainsCustomResources(resourceLimiter.GetResources()) {
totalResources, totalResourcesErr = calculateScaleUpCustomResourcesTotal(context, processors, nodeGroups, nodeInfos, nodesFromNotAutoscaledGroups)
}
resultScaleUpLimits := make(scaleUpResourcesLimits)
for _, resource := range resourceLimiter.GetResources() {
max := resourceLimiter.GetMax(resource)
// we put only actual limits into final map. No entry means no limit.
if max > 0 {
if (resource == cloudprovider.ResourceNameCores || resource == cloudprovider.ResourceNameMemory) && errCoresMem != nil {
// core resource info missing - no reason to proceed with scale up
return scaleUpResourcesLimits{}, errCoresMem
}
switch {
case resource == cloudprovider.ResourceNameCores:
if errCoresMem != nil {
resultScaleUpLimits[resource] = scaleUpLimitUnknown
} else {
resultScaleUpLimits[resource] = computeBelowMax(totalCores, max)
}
case resource == cloudprovider.ResourceNameMemory:
if errCoresMem != nil {
resultScaleUpLimits[resource] = scaleUpLimitUnknown
} else {
resultScaleUpLimits[resource] = computeBelowMax(totalMem, max)
}
case cloudprovider.IsCustomResource(resource):
if totalResourcesErr != nil {
resultScaleUpLimits[resource] = scaleUpLimitUnknown
} else {
resultScaleUpLimits[resource] = computeBelowMax(totalResources[resource], max)
}
default:
klog.Errorf("Scale up limits defined for unsupported resource '%s'", resource)
}
}
}
return resultScaleUpLimits, nil
}
func calculateScaleUpCoresMemoryTotal(
nodeGroups []cloudprovider.NodeGroup,
nodeInfos map[string]*schedulerframework.NodeInfo,
nodesFromNotAutoscaledGroups []*apiv1.Node) (int64, int64, errors.AutoscalerError) {
var coresTotal int64
var memoryTotal int64
for _, nodeGroup := range nodeGroups {
currentSize, err := nodeGroup.TargetSize()
if err != nil {
return 0, 0, errors.ToAutoscalerError(errors.CloudProviderError, err).AddPrefix("Failed to get node group size of %v:", nodeGroup.Id())
}
nodeInfo, found := nodeInfos[nodeGroup.Id()]
if !found {
return 0, 0, errors.NewAutoscalerError(errors.CloudProviderError, "No node info for: %s", nodeGroup.Id())
}
if currentSize > 0 {
nodeCPU, nodeMemory := getNodeInfoCoresAndMemory(nodeInfo)
coresTotal = coresTotal + int64(currentSize)*nodeCPU
memoryTotal = memoryTotal + int64(currentSize)*nodeMemory
}
}
for _, node := range nodesFromNotAutoscaledGroups {
cores, memory := utils.GetNodeCoresAndMemory(node)
coresTotal += cores
memoryTotal += memory
}
return coresTotal, memoryTotal, nil
}
func calculateScaleUpCustomResourcesTotal(
context *context.AutoscalingContext,
processors *ca_processors.AutoscalingProcessors,
nodeGroups []cloudprovider.NodeGroup,
nodeInfos map[string]*schedulerframework.NodeInfo,
nodesFromNotAutoscaledGroups []*apiv1.Node) (map[string]int64, errors.AutoscalerError) {
result := make(map[string]int64)
for _, nodeGroup := range nodeGroups {
currentSize, err := nodeGroup.TargetSize()
if err != nil {
return nil, errors.ToAutoscalerError(errors.CloudProviderError, err).AddPrefix("Failed to get node group size of %v:", nodeGroup.Id())
}
nodeInfo, found := nodeInfos[nodeGroup.Id()]
if !found {
return nil, errors.NewAutoscalerError(errors.CloudProviderError, "No node info for: %s", nodeGroup.Id())
}
if currentSize > 0 {
resourceTargets, err := processors.CustomResourcesProcessor.GetNodeResourceTargets(context, nodeInfo.Node(), nodeGroup)
if err != nil {
return nil, errors.ToAutoscalerError(errors.CloudProviderError, err).AddPrefix("Failed to get target gpu for node group %v:", nodeGroup.Id())
}
for _, resourceTarget := range resourceTargets {
if resourceTarget.ResourceType == "" || resourceTarget.ResourceCount == 0 {
continue
}
result[resourceTarget.ResourceType] += resourceTarget.ResourceCount * int64(currentSize)
}
}
}
for _, node := range nodesFromNotAutoscaledGroups {
resourceTargets, err := processors.CustomResourcesProcessor.GetNodeResourceTargets(context, node, nil)
if err != nil {
return nil, errors.ToAutoscalerError(errors.CloudProviderError, err).AddPrefix("Failed to get target gpu for node gpus count for node %v:", node.Name)
}
for _, resourceTarget := range resourceTargets {
if resourceTarget.ResourceType == "" || resourceTarget.ResourceCount == 0 {
continue
}
result[resourceTarget.ResourceType] += resourceTarget.ResourceCount
}
}
return result, nil
}
func computeBelowMax(total int64, max int64) int64 {
if total < max {
return max - total
}
return 0
}
func computeScaleUpResourcesDelta(context *context.AutoscalingContext, processors *ca_processors.AutoscalingProcessors,
nodeInfo *schedulerframework.NodeInfo, nodeGroup cloudprovider.NodeGroup, resourceLimiter *cloudprovider.ResourceLimiter) (scaleUpResourcesDelta, errors.AutoscalerError) {
resultScaleUpDelta := make(scaleUpResourcesDelta)
nodeCPU, nodeMemory := getNodeInfoCoresAndMemory(nodeInfo)
resultScaleUpDelta[cloudprovider.ResourceNameCores] = nodeCPU
resultScaleUpDelta[cloudprovider.ResourceNameMemory] = nodeMemory
if cloudprovider.ContainsCustomResources(resourceLimiter.GetResources()) {
resourceTargets, err := processors.CustomResourcesProcessor.GetNodeResourceTargets(context, nodeInfo.Node(), nodeGroup)
if err != nil {
return scaleUpResourcesDelta{}, errors.ToAutoscalerError(errors.CloudProviderError, err).AddPrefix("Failed to get target custom resources for node group %v:", nodeGroup.Id())
}
for _, resourceTarget := range resourceTargets {
resultScaleUpDelta[resourceTarget.ResourceType] = resourceTarget.ResourceCount
}
}
return resultScaleUpDelta, nil
}
type scaleUpLimitsCheckResult struct {
exceeded bool
exceededResources []string
}
func scaleUpLimitsNotExceeded() scaleUpLimitsCheckResult {
return scaleUpLimitsCheckResult{false, []string{}}
}
func (limits *scaleUpResourcesLimits) checkScaleUpDeltaWithinLimits(delta scaleUpResourcesDelta) scaleUpLimitsCheckResult {
exceededResources := sets.NewString()
for resource, resourceDelta := range delta {
resourceLeft, found := (*limits)[resource]
if found {
if (resourceDelta > 0) && (resourceLeft == scaleUpLimitUnknown || resourceDelta > resourceLeft) {
exceededResources.Insert(resource)
}
}
}
if len(exceededResources) > 0 {
return scaleUpLimitsCheckResult{true, exceededResources.List()}
}
return scaleUpLimitsNotExceeded()
}
func getNodeInfoCoresAndMemory(nodeInfo *schedulerframework.NodeInfo) (int64, int64) {
return utils.GetNodeCoresAndMemory(nodeInfo.Node())
}
type skippedReasons struct { type skippedReasons struct {
message []string message []string
} }
@ -317,10 +117,61 @@ func computeExpansionOption(context *context.AutoscalingContext, podEquivalenceG
return option, nil return option, nil
} }
func isNodeGroupReadyToScaleUp(nodeGroup cloudprovider.NodeGroup, clusterStateRegistry *clusterstate.ClusterStateRegistry, now time.Time) (bool, *skippedReasons) {
// Autoprovisioned node groups without nodes are created later so skip check for them.
if nodeGroup.Exist() && !clusterStateRegistry.IsNodeGroupSafeToScaleUp(nodeGroup, now) {
// Hack that depends on internals of IsNodeGroupSafeToScaleUp.
if !clusterStateRegistry.IsNodeGroupHealthy(nodeGroup.Id()) {
klog.Warningf("Node group %s is not ready for scaleup - unhealthy", nodeGroup.Id())
return false, notReadyReason
}
klog.Warningf("Node group %s is not ready for scaleup - backoff", nodeGroup.Id())
return false, backoffReason
}
return true, nil
}
func isNodeGroupResourceExceeded(ctx *context.AutoscalingContext, resourceManager *scaleup.ResourceManager, resourcesLeft scaleup.ResourcesLimits, nodeGroup cloudprovider.NodeGroup, nodeInfo *schedulerframework.NodeInfo) (bool, *skippedReasons) {
resourcesDelta, err := resourceManager.DeltaForNode(ctx, nodeInfo, nodeGroup)
if err != nil {
klog.Errorf("Skipping node group %s; error getting node group resources: %v", nodeGroup.Id(), err)
return true, notReadyReason
}
checkResult := scaleup.CheckDeltaWithinLimits(resourcesLeft, resourcesDelta)
if checkResult.Exceeded {
klog.V(4).Infof("Skipping node group %s; maximal limit exceeded for %v", nodeGroup.Id(), checkResult.ExceededResources)
for _, resource := range checkResult.ExceededResources {
switch resource {
case cloudprovider.ResourceNameCores:
metrics.RegisterSkippedScaleUpCPU()
case cloudprovider.ResourceNameMemory:
metrics.RegisterSkippedScaleUpMemory()
default:
continue
}
}
return true, maxResourceLimitReached(checkResult.ExceededResources)
}
return false, nil
}
func getCappedNewNodeCount(context *context.AutoscalingContext, newNodeCount, currentNodeCount int) (int, errors.AutoscalerError) {
if context.MaxNodesTotal > 0 && newNodeCount+currentNodeCount > context.MaxNodesTotal {
klog.V(1).Infof("Capping size to max cluster total size (%d)", context.MaxNodesTotal)
newNodeCount = context.MaxNodesTotal - currentNodeCount
context.LogRecorder.Eventf(apiv1.EventTypeWarning, "MaxNodesTotalReached", "Max total nodes in cluster reached: %v", context.MaxNodesTotal)
if newNodeCount < 1 {
return newNodeCount, errors.NewAutoscalerError(errors.TransientError, "max node total count already reached")
}
}
return newNodeCount, nil
}
// ScaleUp tries to scale the cluster up. Return true if it found a way to increase the size, // ScaleUp tries to scale the cluster up. Return true if it found a way to increase the size,
// false if it didn't and error if an error occurred. Assumes that all nodes in the cluster are // false if it didn't and error if an error occurred. Assumes that all nodes in the cluster are
// ready and in sync with instance groups. // ready and in sync with instance groups.
func ScaleUp(context *context.AutoscalingContext, processors *ca_processors.AutoscalingProcessors, clusterStateRegistry *clusterstate.ClusterStateRegistry, unschedulablePods []*apiv1.Pod, func ScaleUp(context *context.AutoscalingContext, processors *ca_processors.AutoscalingProcessors, clusterStateRegistry *clusterstate.ClusterStateRegistry, resourceManager *scaleup.ResourceManager, unschedulablePods []*apiv1.Pod,
nodes []*apiv1.Node, daemonSets []*appsv1.DaemonSet, nodeInfos map[string]*schedulerframework.NodeInfo, ignoredTaints taints.TaintKeySet) (*status.ScaleUpStatus, errors.AutoscalerError) { nodes []*apiv1.Node, daemonSets []*appsv1.DaemonSet, nodeInfos map[string]*schedulerframework.NodeInfo, ignoredTaints taints.TaintKeySet) (*status.ScaleUpStatus, errors.AutoscalerError) {
// From now on we only care about unschedulable pods that were marked after the newest // From now on we only care about unschedulable pods that were marked after the newest
// node became available for the scheduler. // node became available for the scheduler.
@ -329,35 +180,12 @@ func ScaleUp(context *context.AutoscalingContext, processors *ca_processors.Auto
return &status.ScaleUpStatus{Result: status.ScaleUpNotNeeded}, nil return &status.ScaleUpStatus{Result: status.ScaleUpNotNeeded}, nil
} }
now := time.Now()
loggingQuota := klogx.PodsLoggingQuota() loggingQuota := klogx.PodsLoggingQuota()
for _, pod := range unschedulablePods { for _, pod := range unschedulablePods {
klogx.V(1).UpTo(loggingQuota).Infof("Pod %s/%s is unschedulable", pod.Namespace, pod.Name) klogx.V(1).UpTo(loggingQuota).Infof("Pod %s/%s is unschedulable", pod.Namespace, pod.Name)
} }
klogx.V(1).Over(loggingQuota).Infof("%v other pods are also unschedulable", -loggingQuota.Left()) klogx.V(1).Over(loggingQuota).Infof("%v other pods are also unschedulable", -loggingQuota.Left())
podEquivalenceGroups := buildPodEquivalenceGroups(unschedulablePods)
nodesFromNotAutoscaledGroups, err := utils.FilterOutNodesFromNotAutoscaledGroups(nodes, context.CloudProvider)
if err != nil {
return scaleUpError(&status.ScaleUpStatus{}, err.AddPrefix("failed to filter out nodes which are from not autoscaled groups: "))
}
nodeGroups := context.CloudProvider.NodeGroups()
gpuLabel := context.CloudProvider.GPULabel()
availableGPUTypes := context.CloudProvider.GetAvailableGPUTypes()
resourceLimiter, errCP := context.CloudProvider.GetResourceLimiter()
if errCP != nil {
return scaleUpError(&status.ScaleUpStatus{}, errors.ToAutoscalerError(
errors.CloudProviderError,
errCP))
}
scaleUpResourcesLeft, errLimits := computeScaleUpResourcesLeftLimits(context, processors, nodeGroups, nodeInfos, nodesFromNotAutoscaledGroups, resourceLimiter)
if errLimits != nil {
return scaleUpError(&status.ScaleUpStatus{}, errLimits.AddPrefix("Could not compute total resources: "))
}
upcomingNodes := make([]*schedulerframework.NodeInfo, 0) upcomingNodes := make([]*schedulerframework.NodeInfo, 0)
for nodeGroup, numberOfNodes := range clusterStateRegistry.GetUpcomingNodes() { for nodeGroup, numberOfNodes := range clusterStateRegistry.GetUpcomingNodes() {
@ -374,8 +202,7 @@ func ScaleUp(context *context.AutoscalingContext, processors *ca_processors.Auto
} }
klog.V(4).Infof("Upcoming %d nodes", len(upcomingNodes)) klog.V(4).Infof("Upcoming %d nodes", len(upcomingNodes))
expansionOptions := make(map[string]expander.Option, 0) nodeGroups := context.CloudProvider.NodeGroups()
if processors != nil && processors.NodeGroupListProcessor != nil { if processors != nil && processors.NodeGroupListProcessor != nil {
var errProc error var errProc error
nodeGroups, nodeInfos, errProc = processors.NodeGroupListProcessor.Process(context, nodeGroups, nodeInfos, unschedulablePods) nodeGroups, nodeInfos, errProc = processors.NodeGroupListProcessor.Process(context, nodeGroups, nodeInfos, unschedulablePods)
@ -384,19 +211,21 @@ func ScaleUp(context *context.AutoscalingContext, processors *ca_processors.Auto
} }
} }
podEquivalenceGroups := buildPodEquivalenceGroups(unschedulablePods) resourcesLeft, err := resourceManager.ResourcesLeft(context, nodeInfos, nodes)
if err != nil {
return scaleUpError(&status.ScaleUpStatus{}, err.AddPrefix("could not compute total resources: "))
}
now := time.Now()
gpuLabel := context.CloudProvider.GPULabel()
availableGPUTypes := context.CloudProvider.GetAvailableGPUTypes()
expansionOptions := make(map[string]expander.Option, 0)
skippedNodeGroups := map[string]status.Reasons{} skippedNodeGroups := map[string]status.Reasons{}
for _, nodeGroup := range nodeGroups { for _, nodeGroup := range nodeGroups {
// Autoprovisioned node groups without nodes are created later so skip check for them. if readyToScaleUp, skipReason := isNodeGroupReadyToScaleUp(nodeGroup, clusterStateRegistry, now); !readyToScaleUp {
if nodeGroup.Exist() && !clusterStateRegistry.IsNodeGroupSafeToScaleUp(nodeGroup, now) { if skipReason != nil {
// Hack that depends on internals of IsNodeGroupSafeToScaleUp. skippedNodeGroups[nodeGroup.Id()] = skipReason
if !clusterStateRegistry.IsNodeGroupHealthy(nodeGroup.Id()) {
klog.Warningf("Node group %s is not ready for scaleup - unhealthy", nodeGroup.Id())
skippedNodeGroups[nodeGroup.Id()] = notReadyReason
} else {
klog.Warningf("Node group %s is not ready for scaleup - backoff", nodeGroup.Id())
skippedNodeGroups[nodeGroup.Id()] = backoffReason
} }
continue continue
} }
@ -420,25 +249,9 @@ func ScaleUp(context *context.AutoscalingContext, processors *ca_processors.Auto
continue continue
} }
scaleUpResourcesDelta, err := computeScaleUpResourcesDelta(context, processors, nodeInfo, nodeGroup, resourceLimiter) if exceeded, skipReason := isNodeGroupResourceExceeded(context, resourceManager, resourcesLeft, nodeGroup, nodeInfo); exceeded {
if err != nil { if skipReason != nil {
klog.Errorf("Skipping node group %s; error getting node group resources: %v", nodeGroup.Id(), err) skippedNodeGroups[nodeGroup.Id()] = skipReason
skippedNodeGroups[nodeGroup.Id()] = notReadyReason
continue
}
checkResult := scaleUpResourcesLeft.checkScaleUpDeltaWithinLimits(scaleUpResourcesDelta)
if checkResult.exceeded {
klog.V(4).Infof("Skipping node group %s; maximal limit exceeded for %v", nodeGroup.Id(), checkResult.exceededResources)
skippedNodeGroups[nodeGroup.Id()] = maxResourceLimitReached(checkResult.exceededResources)
for _, resource := range checkResult.exceededResources {
switch resource {
case cloudprovider.ResourceNameCores:
metrics.RegisterSkippedScaleUpCPU()
case cloudprovider.ResourceNameMemory:
metrics.RegisterSkippedScaleUpMemory()
default:
continue
}
} }
continue continue
} }
@ -458,6 +271,7 @@ func ScaleUp(context *context.AutoscalingContext, processors *ca_processors.Auto
klog.V(4).Infof("No pod can fit to %s", nodeGroup.Id()) klog.V(4).Infof("No pod can fit to %s", nodeGroup.Id())
} }
} }
if len(expansionOptions) == 0 { if len(expansionOptions) == 0 {
klog.V(1).Info("No expansion options") klog.V(1).Info("No expansion options")
return &status.ScaleUpStatus{ return &status.ScaleUpStatus{
@ -481,16 +295,11 @@ func ScaleUp(context *context.AutoscalingContext, processors *ca_processors.Auto
klog.V(1).Infof("Estimated %d nodes needed in %s", bestOption.NodeCount, bestOption.NodeGroup.Id()) klog.V(1).Infof("Estimated %d nodes needed in %s", bestOption.NodeCount, bestOption.NodeGroup.Id())
newNodes := bestOption.NodeCount newNodes := bestOption.NodeCount
newNodeCount, err := getCappedNewNodeCount(context, newNodes, len(nodes)+len(upcomingNodes))
if context.MaxNodesTotal > 0 && len(nodes)+newNodes+len(upcomingNodes) > context.MaxNodesTotal { if err != nil {
klog.V(1).Infof("Capping size to max cluster total size (%d)", context.MaxNodesTotal) return scaleUpError(&status.ScaleUpStatus{PodsTriggeredScaleUp: bestOption.Pods}, err)
newNodes = context.MaxNodesTotal - len(nodes) - len(upcomingNodes)
context.LogRecorder.Eventf(apiv1.EventTypeWarning, "MaxNodesTotalReached", "Max total nodes in cluster reached: %v", context.MaxNodesTotal)
if newNodes < 1 {
return scaleUpError(&status.ScaleUpStatus{PodsTriggeredScaleUp: bestOption.Pods},
errors.NewAutoscalerError(errors.TransientError, "max node total count already reached"))
}
} }
newNodes = newNodeCount
createNodeGroupResults := make([]nodegroups.CreateNodeGroupResult, 0) createNodeGroupResults := make([]nodegroups.CreateNodeGroupResult, 0)
if !bestOption.NodeGroup.Exist() { if !bestOption.NodeGroup.Exist() {
@ -557,7 +366,7 @@ func ScaleUp(context *context.AutoscalingContext, processors *ca_processors.Auto
} }
// apply upper limits for CPU and memory // apply upper limits for CPU and memory
newNodes, err = applyScaleUpResourcesLimits(context, processors, newNodes, scaleUpResourcesLeft, nodeInfo, bestOption.NodeGroup, resourceLimiter) newNodes, err = resourceManager.ApplyResourcesLimits(context, newNodes, resourcesLeft, nodeInfo, bestOption.NodeGroup)
if err != nil { if err != nil {
return scaleUpError( return scaleUpError(
&status.ScaleUpStatus{CreateNodeGroupResults: createNodeGroupResults, PodsTriggeredScaleUp: bestOption.Pods}, &status.ScaleUpStatus{CreateNodeGroupResults: createNodeGroupResults, PodsTriggeredScaleUp: bestOption.Pods},
@ -570,8 +379,9 @@ func ScaleUp(context *context.AutoscalingContext, processors *ca_processors.Auto
if typedErr != nil { if typedErr != nil {
return scaleUpError( return scaleUpError(
&status.ScaleUpStatus{CreateNodeGroupResults: createNodeGroupResults, PodsTriggeredScaleUp: bestOption.Pods}, &status.ScaleUpStatus{CreateNodeGroupResults: createNodeGroupResults, PodsTriggeredScaleUp: bestOption.Pods},
typedErr.AddPrefix("Failed to find matching node groups: ")) typedErr.AddPrefix("failed to find matching node groups: "))
} }
similarNodeGroups = filterNodeGroupsByPods(similarNodeGroups, bestOption.Pods, expansionOptions) similarNodeGroups = filterNodeGroupsByPods(similarNodeGroups, bestOption.Pods, expansionOptions)
for _, ng := range similarNodeGroups { for _, ng := range similarNodeGroups {
if clusterStateRegistry.IsNodeGroupSafeToScaleUp(ng, now) { if clusterStateRegistry.IsNodeGroupSafeToScaleUp(ng, now) {
@ -583,6 +393,7 @@ func ScaleUp(context *context.AutoscalingContext, processors *ca_processors.Auto
klog.V(2).Infof("Ignoring node group %s when balancing: group is not ready for scaleup", ng.Id()) klog.V(2).Infof("Ignoring node group %s when balancing: group is not ready for scaleup", ng.Id())
} }
} }
if len(targetNodeGroups) > 1 { if len(targetNodeGroups) > 1 {
var names = []string{} var names = []string{}
for _, ng := range targetNodeGroups { for _, ng := range targetNodeGroups {
@ -591,6 +402,7 @@ func ScaleUp(context *context.AutoscalingContext, processors *ca_processors.Auto
klog.V(1).Infof("Splitting scale-up between %v similar node groups: {%v}", len(targetNodeGroups), strings.Join(names, ", ")) klog.V(1).Infof("Splitting scale-up between %v similar node groups: {%v}", len(targetNodeGroups), strings.Join(names, ", "))
} }
} }
scaleUpInfos, typedErr := processors.NodeGroupSetProcessor.BalanceScaleUpBetweenGroups( scaleUpInfos, typedErr := processors.NodeGroupSetProcessor.BalanceScaleUpBetweenGroups(
context, targetNodeGroups, newNodes) context, targetNodeGroups, newNodes)
if typedErr != nil { if typedErr != nil {
@ -598,6 +410,7 @@ func ScaleUp(context *context.AutoscalingContext, processors *ca_processors.Auto
&status.ScaleUpStatus{CreateNodeGroupResults: createNodeGroupResults, PodsTriggeredScaleUp: bestOption.Pods}, &status.ScaleUpStatus{CreateNodeGroupResults: createNodeGroupResults, PodsTriggeredScaleUp: bestOption.Pods},
typedErr) typedErr)
} }
klog.V(1).Infof("Final scale-up plan: %v", scaleUpInfos) klog.V(1).Infof("Final scale-up plan: %v", scaleUpInfos)
for _, info := range scaleUpInfos { for _, info := range scaleUpInfos {
typedErr := executeScaleUp(context, clusterStateRegistry, info, gpu.GetGpuTypeForMetrics(gpuLabel, availableGPUTypes, nodeInfo.Node(), nil), now) typedErr := executeScaleUp(context, clusterStateRegistry, info, gpu.GetGpuTypeForMetrics(gpuLabel, availableGPUTypes, nodeInfo.Node(), nil), now)
@ -632,6 +445,110 @@ func ScaleUp(context *context.AutoscalingContext, processors *ca_processors.Auto
}, nil }, nil
} }
// ScaleUpToNodeGroupMinSize tries to scale up node groups that have fewer nodes than the configured min size.
// The source of truth for the current node group size is the TargetSize queried directly from cloud providers.
// Returns the scale-up status (ScaleUpNotNeeded, ScaleUpSuccessful or FailedResizeNodeGroups) and an error if one occurred.
func ScaleUpToNodeGroupMinSize(context *context.AutoscalingContext, processors *ca_processors.AutoscalingProcessors, clusterStateRegistry *clusterstate.ClusterStateRegistry, resourceManager *scaleup.ResourceManager,
nodes []*apiv1.Node, nodeInfos map[string]*schedulerframework.NodeInfo) (*status.ScaleUpStatus, errors.AutoscalerError) {
now := time.Now()
nodeGroups := context.CloudProvider.NodeGroups()
gpuLabel := context.CloudProvider.GPULabel()
availableGPUTypes := context.CloudProvider.GetAvailableGPUTypes()
scaleUpInfos := make([]nodegroupset.ScaleUpInfo, 0)
resourcesLeft, err := resourceManager.ResourcesLeft(context, nodeInfos, nodes)
if err != nil {
return scaleUpError(&status.ScaleUpStatus{}, err.AddPrefix("could not compute total resources: "))
}
for _, ng := range nodeGroups {
if !ng.Exist() {
klog.Warningf("ScaleUpToNodeGroupMinSize: NodeGroup %s does not exist", ng.Id())
continue
}
targetSize, err := ng.TargetSize()
if err != nil {
klog.Warningf("ScaleUpToNodeGroupMinSize: failed to get target size of node group %s", ng.Id())
continue
}
klog.V(4).Infof("ScaleUpToNodeGroupMinSize: NodeGroup %s, TargetSize %d, MinSize %d, MaxSize %d", ng.Id(), targetSize, ng.MinSize(), ng.MaxSize())
if targetSize >= ng.MinSize() {
continue
}
if readyToScaleUp, skipReason := isNodeGroupReadyToScaleUp(ng, clusterStateRegistry, now); !readyToScaleUp {
klog.Warningf("ScaleUpToNodeGroupMinSize: node group is ready to scale up: %v", skipReason)
continue
}
nodeInfo, found := nodeInfos[ng.Id()]
if !found {
klog.Warningf("ScaleUpToNodeGroupMinSize: no node info for %s", ng.Id())
continue
}
exceeded, skipReason := isNodeGroupResourceExceeded(context, resourceManager, resourcesLeft, ng, nodeInfo)
if exceeded {
klog.Warning("ScaleUpToNodeGroupMinSize: node group resource excceded: %v", skipReason)
continue
}
newNodeCount := ng.MinSize() - targetSize
newNodeCount, err = resourceManager.ApplyResourcesLimits(context, newNodeCount, resourcesLeft, nodeInfo, ng)
if err != nil {
klog.Warning("ScaleUpToNodeGroupMinSize: failed to apply resource limits: %v", err)
continue
}
newNodeCount, err = getCappedNewNodeCount(context, newNodeCount, targetSize)
if err != nil {
klog.Warning("ScaleUpToNodeGroupMinSize: failed to get capped node count: %v", err)
continue
}
info := nodegroupset.ScaleUpInfo{
Group: ng,
CurrentSize: targetSize,
NewSize: targetSize + newNodeCount,
MaxSize: ng.MaxSize(),
}
scaleUpInfos = append(scaleUpInfos, info)
}
if len(scaleUpInfos) == 0 {
klog.V(1).Info("ScaleUpToNodeGroupMinSize: scale up not needed")
return &status.ScaleUpStatus{Result: status.ScaleUpNotNeeded}, nil
}
klog.V(1).Infof("ScaleUpToNodeGroupMinSize: final scale-up plan: %v", scaleUpInfos)
for _, info := range scaleUpInfos {
nodeInfo, ok := nodeInfos[info.Group.Id()]
if !ok {
klog.Warningf("ScaleUpToNodeGroupMinSize: failed to get node info for node group %s", info.Group.Id())
continue
}
gpuType := gpu.GetGpuTypeForMetrics(gpuLabel, availableGPUTypes, nodeInfo.Node(), nil)
if err := executeScaleUp(context, clusterStateRegistry, info, gpuType, now); err != nil {
return scaleUpError(
&status.ScaleUpStatus{
FailedResizeNodeGroups: []cloudprovider.NodeGroup{info.Group},
},
err,
)
}
}
clusterStateRegistry.Recalculate()
return &status.ScaleUpStatus{
Result: status.ScaleUpSuccessful,
ScaleUpInfos: scaleUpInfos,
ConsideredNodeGroups: nodeGroups,
}, nil
}
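The hunk that calls this new entry point from the main autoscaling loop is not included in this view. As a hedged sketch only, the gating on the new option could look like the helper below; the helper name and call site are assumptions, while the types and the `ScaleUpToNodeGroupMinSize` signature come from the code above.

```go
// Hypothetical helper showing how min-size enforcement is gated on the new option;
// the real static_autoscaler.go change is not part of this view.
func maybeEnforceNodeGroupMinSize(ctx *context.AutoscalingContext, processors *ca_processors.AutoscalingProcessors,
	clusterStateRegistry *clusterstate.ClusterStateRegistry, resourceManager *scaleup.ResourceManager,
	nodes []*apiv1.Node, nodeInfos map[string]*schedulerframework.NodeInfo) errors.AutoscalerError {
	if !ctx.EnforceNodeGroupMinSize {
		// Operator did not opt in via --enforce-node-group-min-size.
		return nil
	}
	scaleUpStatus, typedErr := ScaleUpToNodeGroupMinSize(ctx, processors, clusterStateRegistry, resourceManager, nodes, nodeInfos)
	if typedErr != nil {
		klog.Errorf("Failed to scale up to node group min size: %v", typedErr)
		return typedErr
	}
	klog.V(4).Infof("ScaleUpToNodeGroupMinSize result: %v", scaleUpStatus.Result)
	return nil
}
```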
func getRemainingPods(egs []*podEquivalenceGroup, skipped map[string]status.Reasons) []status.NoScaleUpInfo { func getRemainingPods(egs []*podEquivalenceGroup, skipped map[string]status.Reasons) []status.NoScaleUpInfo {
remaining := []status.NoScaleUpInfo{} remaining := []status.NoScaleUpInfo{}
for _, eg := range egs { for _, eg := range egs {
@ -717,48 +634,6 @@ func executeScaleUp(context *context.AutoscalingContext, clusterStateRegistry *c
return nil return nil
} }
func applyScaleUpResourcesLimits(
context *context.AutoscalingContext,
processors *ca_processors.AutoscalingProcessors,
newNodes int,
scaleUpResourcesLeft scaleUpResourcesLimits,
nodeInfo *schedulerframework.NodeInfo,
nodeGroup cloudprovider.NodeGroup,
resourceLimiter *cloudprovider.ResourceLimiter) (int, errors.AutoscalerError) {
delta, err := computeScaleUpResourcesDelta(context, processors, nodeInfo, nodeGroup, resourceLimiter)
if err != nil {
return 0, err
}
for resource, resourceDelta := range delta {
limit, limitFound := scaleUpResourcesLeft[resource]
if !limitFound {
continue
}
if limit == scaleUpLimitUnknown {
// should never happen - checked before
return 0, errors.NewAutoscalerError(
errors.InternalError,
fmt.Sprintf("limit unknown for resource %s", resource))
}
if int64(newNodes)*resourceDelta <= limit {
// no capping required
continue
}
newNodes = int(limit / resourceDelta)
klog.V(1).Infof("Capping scale-up size due to limit for resource %s", resource)
if newNodes < 1 {
// should never happen - checked before
return 0, errors.NewAutoscalerError(
errors.InternalError,
fmt.Sprintf("cannot create any node; max limit for resource %s reached", resource))
}
}
return newNodes, nil
}
func scaleUpError(s *status.ScaleUpStatus, err errors.AutoscalerError) (*status.ScaleUpStatus, errors.AutoscalerError) { func scaleUpError(s *status.ScaleUpStatus, err errors.AutoscalerError) (*status.ScaleUpStatus, errors.AutoscalerError) {
s.ScaleUpError = &err s.ScaleUpError = &err
s.Result = status.ScaleUpError s.Result = status.ScaleUpError

View File

@ -30,6 +30,7 @@ import (
testprovider "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/test" testprovider "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/test"
"k8s.io/autoscaler/cluster-autoscaler/clusterstate" "k8s.io/autoscaler/cluster-autoscaler/clusterstate"
"k8s.io/autoscaler/cluster-autoscaler/config" "k8s.io/autoscaler/cluster-autoscaler/config"
"k8s.io/autoscaler/cluster-autoscaler/core/scaleup"
. "k8s.io/autoscaler/cluster-autoscaler/core/test" . "k8s.io/autoscaler/cluster-autoscaler/core/test"
"k8s.io/autoscaler/cluster-autoscaler/core/utils" "k8s.io/autoscaler/cluster-autoscaler/core/utils"
"k8s.io/autoscaler/cluster-autoscaler/estimator" "k8s.io/autoscaler/cluster-autoscaler/estimator"
@ -543,8 +544,8 @@ func runSimpleScaleUpTest(t *testing.T, config *ScaleTestConfig) *ScaleTestResul
} }
processors := NewTestProcessors() processors := NewTestProcessors()
resourceManager := scaleup.NewResourceManager(processors.CustomResourcesProcessor)
scaleUpStatus, err := ScaleUp(&context, processors, clusterState, extraPods, nodes, []*appsv1.DaemonSet{}, nodeInfos, nil) scaleUpStatus, err := ScaleUp(&context, processors, clusterState, resourceManager, extraPods, nodes, []*appsv1.DaemonSet{}, nodeInfos, nil)
processors.ScaleUpStatusProcessor.Process(&context, scaleUpStatus) processors.ScaleUpStatusProcessor.Process(&context, scaleUpStatus)
assert.NoError(t, err) assert.NoError(t, err)
@ -699,7 +700,8 @@ func TestScaleUpUnhealthy(t *testing.T) {
p3 := BuildTestPod("p-new", 550, 0) p3 := BuildTestPod("p-new", 550, 0)
processors := NewTestProcessors() processors := NewTestProcessors()
scaleUpStatus, err := ScaleUp(&context, processors, clusterState, []*apiv1.Pod{p3}, nodes, []*appsv1.DaemonSet{}, nodeInfos, nil) resourceManager := scaleup.NewResourceManager(processors.CustomResourcesProcessor)
scaleUpStatus, err := ScaleUp(&context, processors, clusterState, resourceManager, []*apiv1.Pod{p3}, nodes, []*appsv1.DaemonSet{}, nodeInfos, nil)
assert.NoError(t, err) assert.NoError(t, err)
// Node group is unhealthy. // Node group is unhealthy.
@ -740,7 +742,8 @@ func TestScaleUpNoHelp(t *testing.T) {
p3 := BuildTestPod("p-new", 500, 0) p3 := BuildTestPod("p-new", 500, 0)
processors := NewTestProcessors() processors := NewTestProcessors()
scaleUpStatus, err := ScaleUp(&context, processors, clusterState, []*apiv1.Pod{p3}, nodes, []*appsv1.DaemonSet{}, nodeInfos, nil) resourceManager := scaleup.NewResourceManager(processors.CustomResourcesProcessor)
scaleUpStatus, err := ScaleUp(&context, processors, clusterState, resourceManager, []*apiv1.Pod{p3}, nodes, []*appsv1.DaemonSet{}, nodeInfos, nil)
processors.ScaleUpStatusProcessor.Process(&context, scaleUpStatus) processors.ScaleUpStatusProcessor.Process(&context, scaleUpStatus)
assert.NoError(t, err) assert.NoError(t, err)
@ -811,7 +814,8 @@ func TestScaleUpBalanceGroups(t *testing.T) {
} }
processors := NewTestProcessors() processors := NewTestProcessors()
scaleUpStatus, typedErr := ScaleUp(&context, processors, clusterState, pods, nodes, []*appsv1.DaemonSet{}, nodeInfos, nil) resourceManager := scaleup.NewResourceManager(processors.CustomResourcesProcessor)
scaleUpStatus, typedErr := ScaleUp(&context, processors, clusterState, resourceManager, pods, nodes, []*appsv1.DaemonSet{}, nodeInfos, nil)
assert.NoError(t, typedErr) assert.NoError(t, typedErr)
assert.True(t, scaleUpStatus.WasSuccessful()) assert.True(t, scaleUpStatus.WasSuccessful())
@ -871,7 +875,8 @@ func TestScaleUpAutoprovisionedNodeGroup(t *testing.T) {
nodes := []*apiv1.Node{} nodes := []*apiv1.Node{}
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil).Process(&context, nodes, []*appsv1.DaemonSet{}, nil, time.Now()) nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil).Process(&context, nodes, []*appsv1.DaemonSet{}, nil, time.Now())
scaleUpStatus, err := ScaleUp(&context, processors, clusterState, []*apiv1.Pod{p1}, nodes, []*appsv1.DaemonSet{}, nodeInfos, nil) resourceManager := scaleup.NewResourceManager(processors.CustomResourcesProcessor)
scaleUpStatus, err := ScaleUp(&context, processors, clusterState, resourceManager, []*apiv1.Pod{p1}, nodes, []*appsv1.DaemonSet{}, nodeInfos, nil)
assert.NoError(t, err) assert.NoError(t, err)
assert.True(t, scaleUpStatus.WasSuccessful()) assert.True(t, scaleUpStatus.WasSuccessful())
assert.Equal(t, "autoprovisioned-T1", utils.GetStringFromChan(createdGroups)) assert.Equal(t, "autoprovisioned-T1", utils.GetStringFromChan(createdGroups))
@ -924,7 +929,8 @@ func TestScaleUpBalanceAutoprovisionedNodeGroups(t *testing.T) {
nodes := []*apiv1.Node{} nodes := []*apiv1.Node{}
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil).Process(&context, nodes, []*appsv1.DaemonSet{}, nil, time.Now()) nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil).Process(&context, nodes, []*appsv1.DaemonSet{}, nil, time.Now())
scaleUpStatus, err := ScaleUp(&context, processors, clusterState, []*apiv1.Pod{p1, p2, p3}, nodes, []*appsv1.DaemonSet{}, nodeInfos, nil) resourceManager := scaleup.NewResourceManager(processors.CustomResourcesProcessor)
scaleUpStatus, err := ScaleUp(&context, processors, clusterState, resourceManager, []*apiv1.Pod{p1, p2, p3}, nodes, []*appsv1.DaemonSet{}, nodeInfos, nil)
assert.NoError(t, err) assert.NoError(t, err)
assert.True(t, scaleUpStatus.WasSuccessful()) assert.True(t, scaleUpStatus.WasSuccessful())
assert.Equal(t, "autoprovisioned-T1", utils.GetStringFromChan(createdGroups)) assert.Equal(t, "autoprovisioned-T1", utils.GetStringFromChan(createdGroups))
@ -937,51 +943,100 @@ func TestScaleUpBalanceAutoprovisionedNodeGroups(t *testing.T) {
assert.True(t, expandedGroupMap["autoprovisioned-T1-2-1"]) assert.True(t, expandedGroupMap["autoprovisioned-T1-2-1"])
} }
func TestCheckScaleUpDeltaWithinLimits(t *testing.T) { func TestScaleUpToMeetNodeGroupMinSize(t *testing.T) {
podLister := kube_util.NewTestPodLister([]*apiv1.Pod{})
listers := kube_util.NewListerRegistry(nil, nil, podLister, nil, nil, nil, nil, nil, nil, nil)
provider := testprovider.NewTestCloudProvider(func(nodeGroup string, increase int) error {
assert.Equal(t, "ng1", nodeGroup)
assert.Equal(t, 1, increase)
return nil
}, nil)
resourceLimiter := cloudprovider.NewResourceLimiter(
map[string]int64{cloudprovider.ResourceNameCores: 0, cloudprovider.ResourceNameMemory: 0},
map[string]int64{cloudprovider.ResourceNameCores: 48, cloudprovider.ResourceNameMemory: 1000},
)
provider.SetResourceLimiter(resourceLimiter)
// Test cases:
// ng1: current size 1, min size 3, cores limit 48, memory limit 1000 => scale up by 1 new node (reaching the min size would need 2, but the 2*16 cores already in use leave room for only one more 16-core node under the 48-core limit).
// ng2: current size 1, min size 1, cores limit 48, memory limit 1000 => no scale up.
n1 := BuildTestNode("n1", 16000, 32)
SetNodeReadyState(n1, true, time.Now())
n2 := BuildTestNode("n2", 16000, 32)
SetNodeReadyState(n2, true, time.Now())
provider.AddNodeGroup("ng1", 3, 10, 1)
provider.AddNode("ng1", n1)
provider.AddNodeGroup("ng2", 1, 10, 1)
provider.AddNode("ng2", n2)
options := config.AutoscalingOptions{
EstimatorName: estimator.BinpackingEstimatorName,
MaxCoresTotal: config.DefaultMaxClusterCores,
MaxMemoryTotal: config.DefaultMaxClusterMemory,
}
context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, listers, provider, nil, nil)
assert.NoError(t, err)
nodes := []*apiv1.Node{n1, n2}
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil).Process(&context, nodes, []*appsv1.DaemonSet{}, nil, time.Now())
processors := NewTestProcessors()
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, NewBackoff())
clusterState.UpdateNodes(nodes, nodeInfos, time.Now())
resourceManager := scaleup.NewResourceManager(processors.CustomResourcesProcessor)
scaleUpStatus, err := ScaleUpToNodeGroupMinSize(&context, processors, clusterState, resourceManager, nodes, nodeInfos)
assert.NoError(t, err)
assert.True(t, scaleUpStatus.WasSuccessful())
assert.Equal(t, 1, len(scaleUpStatus.ScaleUpInfos))
assert.Equal(t, 2, scaleUpStatus.ScaleUpInfos[0].NewSize)
assert.Equal(t, "ng1", scaleUpStatus.ScaleUpInfos[0].Group.Id())
}
func TestCheckDeltaWithinLimits(t *testing.T) {
type testcase struct { type testcase struct {
limits scaleUpResourcesLimits limits scaleup.ResourcesLimits
delta scaleUpResourcesDelta delta scaleup.ResourcesDelta
exceededResources []string exceededResources []string
} }
tests := []testcase{ tests := []testcase{
{ {
limits: scaleUpResourcesLimits{"a": 10}, limits: scaleup.ResourcesLimits{"a": 10},
delta: scaleUpResourcesDelta{"a": 10}, delta: scaleup.ResourcesDelta{"a": 10},
exceededResources: []string{}, exceededResources: []string{},
}, },
{ {
limits: scaleUpResourcesLimits{"a": 10}, limits: scaleup.ResourcesLimits{"a": 10},
delta: scaleUpResourcesDelta{"a": 11}, delta: scaleup.ResourcesDelta{"a": 11},
exceededResources: []string{"a"}, exceededResources: []string{"a"},
}, },
{ {
limits: scaleUpResourcesLimits{"a": 10}, limits: scaleup.ResourcesLimits{"a": 10},
delta: scaleUpResourcesDelta{"b": 10}, delta: scaleup.ResourcesDelta{"b": 10},
exceededResources: []string{}, exceededResources: []string{},
}, },
{ {
limits: scaleUpResourcesLimits{"a": scaleUpLimitUnknown}, limits: scaleup.ResourcesLimits{"a": scaleup.LimitUnknown},
delta: scaleUpResourcesDelta{"a": 0}, delta: scaleup.ResourcesDelta{"a": 0},
exceededResources: []string{}, exceededResources: []string{},
}, },
{ {
limits: scaleUpResourcesLimits{"a": scaleUpLimitUnknown}, limits: scaleup.ResourcesLimits{"a": scaleup.LimitUnknown},
delta: scaleUpResourcesDelta{"a": 1}, delta: scaleup.ResourcesDelta{"a": 1},
exceededResources: []string{"a"}, exceededResources: []string{"a"},
}, },
{ {
limits: scaleUpResourcesLimits{"a": 10, "b": 20, "c": 30}, limits: scaleup.ResourcesLimits{"a": 10, "b": 20, "c": 30},
delta: scaleUpResourcesDelta{"a": 11, "b": 20, "c": 31}, delta: scaleup.ResourcesDelta{"a": 11, "b": 20, "c": 31},
exceededResources: []string{"a", "c"}, exceededResources: []string{"a", "c"},
}, },
} }
for _, test := range tests { for _, test := range tests {
checkResult := test.limits.checkScaleUpDeltaWithinLimits(test.delta) checkResult := scaleup.CheckDeltaWithinLimits(test.limits, test.delta)
if len(test.exceededResources) == 0 { if len(test.exceededResources) == 0 {
assert.Equal(t, scaleUpLimitsNotExceeded(), checkResult) assert.Equal(t, scaleup.LimitsNotExceeded(), checkResult)
} else { } else {
assert.Equal(t, scaleUpLimitsCheckResult{true, test.exceededResources}, checkResult) assert.Equal(t, scaleup.LimitsCheckResult{Exceeded: true, ExceededResources: test.exceededResources}, checkResult)
} }
} }
} }

View File

@ -185,7 +185,7 @@ func (lf *LimitsFinder) DeltaForNode(context *context.AutoscalingContext, node *
if cloudprovider.ContainsCustomResources(resourcesWithLimits) { if cloudprovider.ContainsCustomResources(resourcesWithLimits) {
resourceTargets, err := lf.crp.GetNodeResourceTargets(context, node, nodeGroup) resourceTargets, err := lf.crp.GetNodeResourceTargets(context, node, nodeGroup)
if err != nil { if err != nil {
return Delta{}, errors.ToAutoscalerError(errors.CloudProviderError, err).AddPrefix("Failed to get node %v custom resources: %v", node.Name) return Delta{}, errors.ToAutoscalerError(errors.CloudProviderError, err).AddPrefix("failed to get node %v custom resources: %v", node.Name)
} }
for _, resourceTarget := range resourceTargets { for _, resourceTarget := range resourceTargets {
resultScaleDownDelta[resourceTarget.ResourceType] = resourceTarget.ResourceCount resultScaleDownDelta[resourceTarget.ResourceType] = resourceTarget.ResourceCount

View File

@ -0,0 +1,286 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package scaleup
import (
"fmt"
"math"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/core/utils"
"k8s.io/autoscaler/cluster-autoscaler/processors/customresources"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
"k8s.io/klog/v2"
schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
)
// LimitUnknown is used as a value in ResourcesLimits if actual limit could not be obtained due to errors talking to cloud provider.
const LimitUnknown = math.MaxInt64
// ResourceManager provides resource checks before scaling up the cluster.
type ResourceManager struct {
crp customresources.CustomResourcesProcessor
}
// LimitsCheckResult contains the limit check result and the exceeded resources if any.
type LimitsCheckResult struct {
Exceeded bool
ExceededResources []string
}
// ResourcesLimits is a map: the key is resource type and the value is resource limit.
type ResourcesLimits map[string]int64
// ResourcesDelta is a map: the key is resource type and the value is resource delta.
type ResourcesDelta map[string]int64
// NewResourceManager creates an instance of scale up resource manager with provided parameters.
func NewResourceManager(crp customresources.CustomResourcesProcessor) *ResourceManager {
return &ResourceManager{
crp: crp,
}
}
// DeltaForNode calculates the amount of resources that will be used from the cluster when creating a node.
func (m *ResourceManager) DeltaForNode(ctx *context.AutoscalingContext, nodeInfo *schedulerframework.NodeInfo, nodeGroup cloudprovider.NodeGroup) (ResourcesDelta, errors.AutoscalerError) {
resultScaleUpDelta := make(ResourcesDelta)
nodeCPU, nodeMemory := utils.GetNodeCoresAndMemory(nodeInfo.Node())
resultScaleUpDelta[cloudprovider.ResourceNameCores] = nodeCPU
resultScaleUpDelta[cloudprovider.ResourceNameMemory] = nodeMemory
resourceLimiter, err := ctx.CloudProvider.GetResourceLimiter()
if err != nil {
return nil, errors.ToAutoscalerError(errors.CloudProviderError, err)
}
if cloudprovider.ContainsCustomResources(resourceLimiter.GetResources()) {
resourceTargets, err := m.crp.GetNodeResourceTargets(ctx, nodeInfo.Node(), nodeGroup)
if err != nil {
return ResourcesDelta{}, errors.ToAutoscalerError(errors.CloudProviderError, err).AddPrefix("failed to get target custom resources for node group %v: ", nodeGroup.Id())
}
for _, resourceTarget := range resourceTargets {
resultScaleUpDelta[resourceTarget.ResourceType] = resourceTarget.ResourceCount
}
}
return resultScaleUpDelta, nil
}
// ResourcesLeft calculates the amount of resources left in the cluster.
func (m *ResourceManager) ResourcesLeft(ctx *context.AutoscalingContext, nodeInfos map[string]*schedulerframework.NodeInfo, nodes []*corev1.Node) (ResourcesLimits, errors.AutoscalerError) {
nodesFromNotAutoscaledGroups, err := utils.FilterOutNodesFromNotAutoscaledGroups(nodes, ctx.CloudProvider)
if err != nil {
return nil, err.AddPrefix("failed to filter out nodes which are from not autoscaled groups: ")
}
totalCores, totalMem, errCoresMem := m.coresMemoryTotal(ctx, nodeInfos, nodesFromNotAutoscaledGroups)
resourceLimiter, errgo := ctx.CloudProvider.GetResourceLimiter()
if errgo != nil {
return nil, errors.ToAutoscalerError(errors.CloudProviderError, errgo)
}
var totalResources map[string]int64
var totalResourcesErr error
if cloudprovider.ContainsCustomResources(resourceLimiter.GetResources()) {
totalResources, totalResourcesErr = m.customResourcesTotal(ctx, nodeInfos, nodesFromNotAutoscaledGroups)
}
resultScaleUpLimits := make(ResourcesLimits)
for _, resource := range resourceLimiter.GetResources() {
max := resourceLimiter.GetMax(resource)
// we put only actual limits into final map. No entry means no limit.
if max > 0 {
if (resource == cloudprovider.ResourceNameCores || resource == cloudprovider.ResourceNameMemory) && errCoresMem != nil {
// core resource info missing - no reason to proceed with scale up
return ResourcesLimits{}, errCoresMem
}
switch {
case resource == cloudprovider.ResourceNameCores:
if errCoresMem != nil {
resultScaleUpLimits[resource] = LimitUnknown
} else {
resultScaleUpLimits[resource] = computeBelowMax(totalCores, max)
}
case resource == cloudprovider.ResourceNameMemory:
if errCoresMem != nil {
resultScaleUpLimits[resource] = LimitUnknown
} else {
resultScaleUpLimits[resource] = computeBelowMax(totalMem, max)
}
case cloudprovider.IsCustomResource(resource):
if totalResourcesErr != nil {
resultScaleUpLimits[resource] = LimitUnknown
} else {
resultScaleUpLimits[resource] = computeBelowMax(totalResources[resource], max)
}
default:
klog.Errorf("Scale up limits defined for unsupported resource '%s'", resource)
}
}
}
return resultScaleUpLimits, nil
}
// ApplyResourcesLimits calculates the new node count by applying the remaining resource limits of the cluster.
func (m *ResourceManager) ApplyResourcesLimits(ctx *context.AutoscalingContext, newCount int, resourceLeft ResourcesLimits, nodeInfo *schedulerframework.NodeInfo, nodeGroup cloudprovider.NodeGroup) (int, errors.AutoscalerError) {
delta, err := m.DeltaForNode(ctx, nodeInfo, nodeGroup)
if err != nil {
return 0, err
}
for resource, resourceDelta := range delta {
limit, limitFound := resourceLeft[resource]
if !limitFound {
continue
}
if limit == LimitUnknown {
// should never happen - checked before
return 0, errors.NewAutoscalerError(
errors.InternalError,
fmt.Sprintf("limit unknown for resource %s", resource))
}
if int64(newCount)*resourceDelta <= limit {
// no capping required
continue
}
newCount = int(limit / resourceDelta)
klog.V(1).Infof("Capping scale-up size due to limit for resource %s", resource)
if newCount < 1 {
// should never happen - checked before
return 0, errors.NewAutoscalerError(
errors.InternalError,
fmt.Sprintf("cannot create any node; max limit for resource %s reached", resource))
}
}
return newCount, nil
}
// CheckDeltaWithinLimits compares the resource limit and resource delta, and returns the limit check result.
func CheckDeltaWithinLimits(left ResourcesLimits, delta ResourcesDelta) LimitsCheckResult {
exceededResources := sets.NewString()
for resource, resourceDelta := range delta {
resourceLeft, found := left[resource]
if found {
if (resourceDelta > 0) && (resourceLeft == LimitUnknown || resourceDelta > resourceLeft) {
exceededResources.Insert(resource)
}
}
}
if len(exceededResources) > 0 {
return LimitsCheckResult{true, exceededResources.List()}
}
return LimitsNotExceeded()
}
// LimitsNotExceeded returns a not exceeded limit check result.
func LimitsNotExceeded() LimitsCheckResult {
return LimitsCheckResult{false, []string{}}
}
func (m *ResourceManager) coresMemoryTotal(ctx *context.AutoscalingContext, nodeInfos map[string]*schedulerframework.NodeInfo, nodesFromNotAutoscaledGroups []*corev1.Node) (int64, int64, errors.AutoscalerError) {
var coresTotal int64
var memoryTotal int64
for _, nodeGroup := range ctx.CloudProvider.NodeGroups() {
currentSize, err := nodeGroup.TargetSize()
if err != nil {
return 0, 0, errors.ToAutoscalerError(errors.CloudProviderError, err).AddPrefix("failed to get node group size of %v: ", nodeGroup.Id())
}
nodeInfo, found := nodeInfos[nodeGroup.Id()]
if !found {
return 0, 0, errors.NewAutoscalerError(errors.CloudProviderError, "No node info for: %s", nodeGroup.Id())
}
if currentSize > 0 {
nodeCPU, nodeMemory := utils.GetNodeCoresAndMemory(nodeInfo.Node())
coresTotal = coresTotal + int64(currentSize)*nodeCPU
memoryTotal = memoryTotal + int64(currentSize)*nodeMemory
}
}
for _, node := range nodesFromNotAutoscaledGroups {
cores, memory := utils.GetNodeCoresAndMemory(node)
coresTotal += cores
memoryTotal += memory
}
return coresTotal, memoryTotal, nil
}
func (m *ResourceManager) customResourcesTotal(ctx *context.AutoscalingContext, nodeInfos map[string]*schedulerframework.NodeInfo, nodesFromNotAutoscaledGroups []*corev1.Node) (map[string]int64, errors.AutoscalerError) {
result := make(map[string]int64)
for _, nodeGroup := range ctx.CloudProvider.NodeGroups() {
currentSize, err := nodeGroup.TargetSize()
if err != nil {
return nil, errors.ToAutoscalerError(errors.CloudProviderError, err).AddPrefix("failed to get node group size of %v: ", nodeGroup.Id())
}
nodeInfo, found := nodeInfos[nodeGroup.Id()]
if !found {
return nil, errors.NewAutoscalerError(errors.CloudProviderError, "No node info for: %s", nodeGroup.Id())
}
if currentSize > 0 {
resourceTargets, err := m.crp.GetNodeResourceTargets(ctx, nodeInfo.Node(), nodeGroup)
if err != nil {
return nil, errors.ToAutoscalerError(errors.CloudProviderError, err).AddPrefix("failed to get target gpu for node group %v: ", nodeGroup.Id())
}
for _, resourceTarget := range resourceTargets {
if resourceTarget.ResourceType == "" || resourceTarget.ResourceCount == 0 {
continue
}
result[resourceTarget.ResourceType] += resourceTarget.ResourceCount * int64(currentSize)
}
}
}
for _, node := range nodesFromNotAutoscaledGroups {
resourceTargets, err := m.crp.GetNodeResourceTargets(ctx, node, nil)
if err != nil {
return nil, errors.ToAutoscalerError(errors.CloudProviderError, err).AddPrefix("failed to get target gpu for node gpus count for node %v: ", node.Name)
}
for _, resourceTarget := range resourceTargets {
if resourceTarget.ResourceType == "" || resourceTarget.ResourceCount == 0 {
continue
}
result[resourceTarget.ResourceType] += resourceTarget.ResourceCount
}
}
return result, nil
}
func computeBelowMax(total int64, max int64) int64 {
if total < max {
return max - total
}
return 0
}
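For orientation, the exported surface of this new package is consumed by the scale-up code shown earlier roughly in the order below. This condensed, hypothetical caller (placed in the same package; the input names are placeholders) only restates calls that already appear in scale_up.go above.

```go
// Hypothetical caller condensing the flow used by ScaleUp and ScaleUpToNodeGroupMinSize.
func capNodeCountToLimits(ctx *context.AutoscalingContext, crp customresources.CustomResourcesProcessor,
	nodes []*corev1.Node, nodeInfos map[string]*schedulerframework.NodeInfo,
	nodeGroup cloudprovider.NodeGroup, nodeInfo *schedulerframework.NodeInfo, wantedCount int) (int, errors.AutoscalerError) {
	rm := NewResourceManager(crp)

	// Headroom left under the cluster-wide resource limits.
	left, err := rm.ResourcesLeft(ctx, nodeInfos, nodes)
	if err != nil {
		return 0, err
	}

	// Resources that one more node from this group would consume.
	delta, err := rm.DeltaForNode(ctx, nodeInfo, nodeGroup)
	if err != nil {
		return 0, err
	}
	if result := CheckDeltaWithinLimits(left, delta); result.Exceeded {
		return 0, errors.NewAutoscalerError(errors.TransientError,
			"resource limit exceeded for %v", result.ExceededResources)
	}

	// Cap the requested node count to what the remaining limits allow.
	return rm.ApplyResourcesLimits(ctx, wantedCount, left, nodeInfo, nodeGroup)
}
```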

View File

@ -0,0 +1,290 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package scaleup
import (
"fmt"
"testing"
"time"
"github.com/stretchr/testify/assert"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
testprovider "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/test"
"k8s.io/autoscaler/cluster-autoscaler/config"
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/core/test"
"k8s.io/autoscaler/cluster-autoscaler/processors/nodeinfosprovider"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
utils_test "k8s.io/autoscaler/cluster-autoscaler/utils/test"
"k8s.io/client-go/kubernetes/fake"
)
type nodeGroupConfig struct {
Name string
Min int
Max int
Size int
CPU int64
Mem int64
}
type deltaForNodeTestCase struct {
nodeGroupConfig nodeGroupConfig
expectedOutput ResourcesDelta
}
func TestDeltaForNode(t *testing.T) {
testCases := []deltaForNodeTestCase{
{
nodeGroupConfig: nodeGroupConfig{Name: "ng1", Min: 3, Max: 10, Size: 5, CPU: 8, Mem: 16},
expectedOutput: ResourcesDelta{"cpu": 8, "memory": 16},
},
{
nodeGroupConfig: nodeGroupConfig{Name: "ng2", Min: 1, Max: 20, Size: 9, CPU: 4, Mem: 32},
expectedOutput: ResourcesDelta{"cpu": 4, "memory": 32},
},
}
for _, testCase := range testCases {
cp := testprovider.NewTestCloudProvider(nil, nil)
ctx := newContext(t, cp)
processors := test.NewTestProcessors()
ng := testCase.nodeGroupConfig
group, nodes := newNodeGroup(t, cp, ng.Name, ng.Min, ng.Max, ng.Size, ng.CPU, ng.Mem)
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil).Process(&ctx, nodes, []*appsv1.DaemonSet{}, nil, time.Now())
rm := NewResourceManager(processors.CustomResourcesProcessor)
delta, err := rm.DeltaForNode(&ctx, nodeInfos[ng.Name], group)
assert.NoError(t, err)
assert.Equal(t, testCase.expectedOutput, delta)
}
}
type resourceLeftTestCase struct {
nodeGroupConfig nodeGroupConfig
clusterCPULimit int64
clusterMemLimit int64
expectedOutput ResourcesLimits
}
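// TestResourcesLeft verifies that ResourcesLeft subtracts the resources used by existing nodes from the cluster-wide limits.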
func TestResourcesLeft(t *testing.T) {
testCases := []resourceLeftTestCase{
{
// cpu left: 1000 - 8 * 5 = 960; memory left: 1000 - 16 * 5 = 920
nodeGroupConfig: nodeGroupConfig{Name: "ng1", Min: 3, Max: 10, Size: 5, CPU: 8, Mem: 16},
clusterCPULimit: 1000,
clusterMemLimit: 1000,
expectedOutput: ResourcesLimits{"cpu": 960, "memory": 920},
},
{
// cpu left: 1000 - 4 * 100 = 600; memory left: 1000 - 8 * 100 = 200
nodeGroupConfig: nodeGroupConfig{Name: "ng2", Min: 3, Max: 100, Size: 100, CPU: 4, Mem: 8},
clusterCPULimit: 1000,
clusterMemLimit: 1000,
expectedOutput: ResourcesLimits{"cpu": 600, "memory": 200},
},
}
for _, testCase := range testCases {
cp := newCloudProvider(t, 1000, 1000)
ctx := newContext(t, cp)
processors := test.NewTestProcessors()
ng := testCase.nodeGroupConfig
_, nodes := newNodeGroup(t, cp, ng.Name, ng.Min, ng.Max, ng.Size, ng.CPU, ng.Mem)
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil).Process(&ctx, nodes, []*appsv1.DaemonSet{}, nil, time.Now())
rm := NewResourceManager(processors.CustomResourcesProcessor)
left, err := rm.ResourcesLeft(&ctx, nodeInfos, nodes)
assert.NoError(t, err)
assert.Equal(t, testCase.expectedOutput, left)
}
}
type applyResourcesLimitsTestCase struct {
nodeGroupConfig nodeGroupConfig
resourcesLeft ResourcesLimits
newNodeCount int
expectedOutput int
}
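// TestApplyResourcesLimits verifies that the requested node count is capped by the remaining cpu and memory limits.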
func TestApplyResourcesLimits(t *testing.T) {
testCases := []applyResourcesLimitsTestCase{
{
nodeGroupConfig: nodeGroupConfig{Name: "ng1", Min: 3, Max: 10, Size: 5, CPU: 8, Mem: 16},
resourcesLeft: ResourcesLimits{"cpu": 80, "memory": 160},
newNodeCount: 10,
expectedOutput: 10,
},
{
nodeGroupConfig: nodeGroupConfig{Name: "ng2", Min: 3, Max: 10, Size: 5, CPU: 8, Mem: 16},
resourcesLeft: ResourcesLimits{"cpu": 80, "memory": 100},
newNodeCount: 10,
expectedOutput: 6, // limited by memory: 100 / 16 = 6
},
{
nodeGroupConfig: nodeGroupConfig{Name: "ng3", Min: 3, Max: 10, Size: 5, CPU: 8, Mem: 16},
resourcesLeft: ResourcesLimits{"cpu": 39, "memory": 160},
newNodeCount: 10,
expectedOutput: 4, // limited by CPU: 39 / 8 = 4
},
{
nodeGroupConfig: nodeGroupConfig{Name: "ng4", Min: 3, Max: 10, Size: 5, CPU: 8, Mem: 16},
resourcesLeft: ResourcesLimits{"cpu": 40, "memory": 80},
newNodeCount: 10,
expectedOutput: 5, // limited by CPU and memory
},
}
for _, testCase := range testCases {
cp := testprovider.NewTestCloudProvider(nil, nil)
ctx := newContext(t, cp)
processors := test.NewTestProcessors()
ng := testCase.nodeGroupConfig
group, nodes := newNodeGroup(t, cp, ng.Name, ng.Min, ng.Max, ng.Size, ng.CPU, ng.Mem)
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil).Process(&ctx, nodes, []*appsv1.DaemonSet{}, nil, time.Now())
rm := NewResourceManager(processors.CustomResourcesProcessor)
newCount, err := rm.ApplyResourcesLimits(&ctx, testCase.newNodeCount, testCase.resourcesLeft, nodeInfos[testCase.nodeGroupConfig.Name], group)
assert.NoError(t, err)
assert.Equal(t, testCase.expectedOutput, newCount)
}
}
type checkDeltaWithinLimitsTestCase struct {
resourcesLeft ResourcesLimits
resourcesDelta ResourcesDelta
expectedOutput LimitsCheckResult
}
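// TestCheckDeltaWithinLimits verifies that CheckDeltaWithinLimits reports which resources a delta would push over the remaining limits.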
func TestCheckDeltaWithinLimits(t *testing.T) {
testCases := []checkDeltaWithinLimitsTestCase{
{
resourcesLeft: ResourcesLimits{"cpu": 10, "memory": 20},
resourcesDelta: ResourcesDelta{"cpu": 8, "memory": 16},
expectedOutput: LimitsCheckResult{Exceeded: false, ExceededResources: []string{}},
},
{
resourcesLeft: ResourcesLimits{"cpu": 10, "memory": 20},
resourcesDelta: ResourcesDelta{"cpu": 12, "memory": 16},
expectedOutput: LimitsCheckResult{Exceeded: true, ExceededResources: []string{"cpu"}},
},
{
resourcesLeft: ResourcesLimits{"cpu": 10, "memory": 20},
resourcesDelta: ResourcesDelta{"cpu": 8, "memory": 32},
expectedOutput: LimitsCheckResult{Exceeded: true, ExceededResources: []string{"memory"}},
},
{
resourcesLeft: ResourcesLimits{"cpu": 10, "memory": 20},
resourcesDelta: ResourcesDelta{"cpu": 16, "memory": 96},
expectedOutput: LimitsCheckResult{Exceeded: true, ExceededResources: []string{"cpu", "memory"}},
},
}
for _, testCase := range testCases {
result := CheckDeltaWithinLimits(testCase.resourcesLeft, testCase.resourcesDelta)
assert.Equal(t, testCase.expectedOutput, result)
}
}
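// TestResourceManagerWithGpuResource exercises the resource manager end to end with a custom GPU resource in addition to cpu and memory.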
func TestResourceManagerWithGpuResource(t *testing.T) {
provider := testprovider.NewTestCloudProvider(nil, nil)
resourceLimiter := cloudprovider.NewResourceLimiter(
map[string]int64{cloudprovider.ResourceNameCores: 0, cloudprovider.ResourceNameMemory: 0, "gpu": 0},
map[string]int64{cloudprovider.ResourceNameCores: 320, cloudprovider.ResourceNameMemory: 640, "gpu": 16},
)
provider.SetResourceLimiter(resourceLimiter)
context := newContext(t, provider)
processors := test.NewTestProcessors()
n1 := newNode(t, "n1", 8, 16)
utils_test.AddGpusToNode(n1, 4)
n1.Labels[provider.GPULabel()] = "gpu"
provider.AddNodeGroup("ng1", 3, 10, 1)
provider.AddNode("ng1", n1)
ng1, err := provider.NodeGroupForNode(n1)
assert.NoError(t, err)
nodes := []*corev1.Node{n1}
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil).Process(&context, nodes, []*appsv1.DaemonSet{}, nil, time.Now())
rm := NewResourceManager(processors.CustomResourcesProcessor)
delta, err := rm.DeltaForNode(&context, nodeInfos["ng1"], ng1)
assert.NoError(t, err)
assert.Equal(t, int64(8), delta[cloudprovider.ResourceNameCores])
assert.Equal(t, int64(16), delta[cloudprovider.ResourceNameMemory])
assert.Equal(t, int64(4), delta["gpu"])
left, err := rm.ResourcesLeft(&context, nodeInfos, nodes)
assert.NoError(t, err)
assert.Equal(t, ResourcesLimits{"cpu": 312, "memory": 624, "gpu": 12}, left) // cpu: 320-8*1=312; memory: 640-16*1=624; gpu: 16-4*1=12
result := CheckDeltaWithinLimits(left, delta)
assert.False(t, result.Exceeded)
assert.Zero(t, len(result.ExceededResources))
newNodeCount, err := rm.ApplyResourcesLimits(&context, 10, left, nodeInfos["ng1"], ng1)
assert.NoError(t, err)
assert.Equal(t, 3, newNodeCount) // gpu left / gpu per node: 12 / 4 = 3
}
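// newCloudProvider returns a test cloud provider with the given cluster-wide cpu and memory limits.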
func newCloudProvider(t *testing.T, cpu, mem int64) *testprovider.TestCloudProvider {
provider := testprovider.NewTestCloudProvider(nil, nil)
assert.NotNil(t, provider)
resourceLimiter := cloudprovider.NewResourceLimiter(
map[string]int64{cloudprovider.ResourceNameCores: 0, cloudprovider.ResourceNameMemory: 0},
map[string]int64{cloudprovider.ResourceNameCores: cpu, cloudprovider.ResourceNameMemory: mem},
)
provider.SetResourceLimiter(resourceLimiter)
return provider
}
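// newContext builds a minimal autoscaling context around the given cloud provider.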
func newContext(t *testing.T, provider cloudprovider.CloudProvider) context.AutoscalingContext {
podLister := kube_util.NewTestPodLister([]*corev1.Pod{})
listers := kube_util.NewListerRegistry(nil, nil, podLister, nil, nil, nil, nil, nil, nil, nil)
context, err := test.NewScaleTestAutoscalingContext(config.AutoscalingOptions{}, &fake.Clientset{}, listers, provider, nil, nil)
assert.NoError(t, err)
return context
}
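// newNode builds a test node with the given number of cpu cores (passed as millicores) and memory.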
func newNode(t *testing.T, name string, cpu, mem int64) *corev1.Node {
return utils_test.BuildTestNode(name, cpu*1000, mem)
}
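// newNodeGroup registers a node group with the test provider, populates it with `size` nodes and returns the group together with its nodes.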
func newNodeGroup(t *testing.T, provider *testprovider.TestCloudProvider, name string, min, max, size int, cpu, mem int64) (cloudprovider.NodeGroup, []*corev1.Node) {
provider.AddNodeGroup(name, min, max, size)
nodes := make([]*corev1.Node, 0)
for index := 0; index < size; index++ {
node := newNode(t, fmt.Sprint(name, index), cpu, mem)
provider.AddNode(name, node)
nodes = append(nodes, node)
}
groups := provider.NodeGroups()
for _, group := range groups {
if group.Id() == name {
return group, nodes
}
}
assert.FailNowf(t, "node group %s not found", name)
return nil, nil
}

View File

@ -36,6 +36,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/actuation" "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/actuation"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/deletiontracker" "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/deletiontracker"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/legacy" "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/legacy"
"k8s.io/autoscaler/cluster-autoscaler/core/scaleup"
core_utils "k8s.io/autoscaler/cluster-autoscaler/core/utils" core_utils "k8s.io/autoscaler/cluster-autoscaler/core/utils"
"k8s.io/autoscaler/cluster-autoscaler/estimator" "k8s.io/autoscaler/cluster-autoscaler/estimator"
"k8s.io/autoscaler/cluster-autoscaler/expander" "k8s.io/autoscaler/cluster-autoscaler/expander"
@ -81,6 +82,7 @@ type StaticAutoscaler struct {
lastScaleDownFailTime time.Time lastScaleDownFailTime time.Time
scaleDownPlanner scaledown.Planner scaleDownPlanner scaledown.Planner
scaleDownActuator scaledown.Actuator scaleDownActuator scaledown.Actuator
scaleUpResourceManager *scaleup.ResourceManager
processors *ca_processors.AutoscalingProcessors processors *ca_processors.AutoscalingProcessors
processorCallbacks *staticAutoscalerProcessorCallbacks processorCallbacks *staticAutoscalerProcessorCallbacks
initialized bool initialized bool
@ -177,6 +179,8 @@ func NewStaticAutoscaler(
scaleDownWrapper := legacy.NewScaleDownWrapper(scaleDown, actuator) scaleDownWrapper := legacy.NewScaleDownWrapper(scaleDown, actuator)
processorCallbacks.scaleDownPlanner = scaleDownWrapper processorCallbacks.scaleDownPlanner = scaleDownWrapper
scaleUpResourceManager := scaleup.NewResourceManager(processors.CustomResourcesProcessor)
// Set the initial scale times to be less than the start time so as to // Set the initial scale times to be less than the start time so as to
// not start in cooldown mode. // not start in cooldown mode.
initialScaleTime := time.Now().Add(-time.Hour) initialScaleTime := time.Now().Add(-time.Hour)
@ -187,6 +191,7 @@ func NewStaticAutoscaler(
lastScaleDownFailTime: initialScaleTime, lastScaleDownFailTime: initialScaleTime,
scaleDownPlanner: scaleDownWrapper, scaleDownPlanner: scaleDownWrapper,
scaleDownActuator: scaleDownWrapper, scaleDownActuator: scaleDownWrapper,
scaleUpResourceManager: scaleUpResourceManager,
processors: processors, processors: processors,
processorCallbacks: processorCallbacks, processorCallbacks: processorCallbacks,
clusterStateRegistry: clusterStateRegistry, clusterStateRegistry: clusterStateRegistry,
@ -307,7 +312,7 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError
nonExpendableScheduledPods := core_utils.FilterOutExpendablePods(originalScheduledPods, a.ExpendablePodsPriorityCutoff) nonExpendableScheduledPods := core_utils.FilterOutExpendablePods(originalScheduledPods, a.ExpendablePodsPriorityCutoff)
// Initialize cluster state to ClusterSnapshot // Initialize cluster state to ClusterSnapshot
if typedErr := a.initializeClusterSnapshot(allNodes, nonExpendableScheduledPods); typedErr != nil { if typedErr := a.initializeClusterSnapshot(allNodes, nonExpendableScheduledPods); typedErr != nil {
return typedErr.AddPrefix("Initialize ClusterSnapshot") return typedErr.AddPrefix("failed to initialize ClusterSnapshot: ")
} }
nodeInfosForGroups, autoscalerError := a.processors.TemplateNodeInfoProvider.Process(autoscalingContext, readyNodes, daemonsets, a.ignoredTaints, currentTime) nodeInfosForGroups, autoscalerError := a.processors.TemplateNodeInfoProvider.Process(autoscalingContext, readyNodes, daemonsets, a.ignoredTaints, currentTime)
@ -457,6 +462,33 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError
// finally, filter out pods that are too "young" to safely be considered for a scale-up (delay is configurable) // finally, filter out pods that are too "young" to safely be considered for a scale-up (delay is configurable)
unschedulablePodsToHelp = a.filterOutYoungPods(unschedulablePodsToHelp, currentTime) unschedulablePodsToHelp = a.filterOutYoungPods(unschedulablePodsToHelp, currentTime)
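// preScaleUp and postScaleUp are shared by the regular scale-up path and the node group min size enforcement below:
// preScaleUp records the scale-up start time for metrics; postScaleUp updates metrics, runs the scale-up status
// processor and reports whether RunOnce should return early.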
preScaleUp := func() time.Time {
scaleUpStart := time.Now()
metrics.UpdateLastTime(metrics.ScaleUp, scaleUpStart)
return scaleUpStart
}
postScaleUp := func(scaleUpStart time.Time) (bool, errors.AutoscalerError) {
metrics.UpdateDurationFromStart(metrics.ScaleUp, scaleUpStart)
if a.processors != nil && a.processors.ScaleUpStatusProcessor != nil {
a.processors.ScaleUpStatusProcessor.Process(autoscalingContext, scaleUpStatus)
scaleUpStatusProcessorAlreadyCalled = true
}
if typedErr != nil {
klog.Errorf("Failed to scale up: %v", typedErr)
return true, typedErr
}
if scaleUpStatus.Result == status.ScaleUpSuccessful {
a.lastScaleUpTime = currentTime
// No scale down in this iteration.
scaleDownStatus.Result = scaledownstatus.ScaleDownInCooldown
return true, nil
}
return false, nil
}
if len(unschedulablePodsToHelp) == 0 { if len(unschedulablePodsToHelp) == 0 {
scaleUpStatus.Result = status.ScaleUpNotNeeded scaleUpStatus.Result = status.ScaleUpNotNeeded
klog.V(1).Info("No unschedulable pods") klog.V(1).Info("No unschedulable pods")
@ -472,27 +504,10 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError
scaleUpStatus.Result = status.ScaleUpInCooldown scaleUpStatus.Result = status.ScaleUpInCooldown
klog.V(1).Info("Unschedulable pods are very new, waiting one iteration for more") klog.V(1).Info("Unschedulable pods are very new, waiting one iteration for more")
} else { } else {
scaleUpStart := time.Now() scaleUpStart := preScaleUp()
metrics.UpdateLastTime(metrics.ScaleUp, scaleUpStart) scaleUpStatus, typedErr = ScaleUp(autoscalingContext, a.processors, a.clusterStateRegistry, a.scaleUpResourceManager, unschedulablePodsToHelp, readyNodes, daemonsets, nodeInfosForGroups, a.ignoredTaints)
if exit, err := postScaleUp(scaleUpStart); exit {
scaleUpStatus, typedErr = ScaleUp(autoscalingContext, a.processors, a.clusterStateRegistry, unschedulablePodsToHelp, readyNodes, daemonsets, nodeInfosForGroups, a.ignoredTaints) return err
metrics.UpdateDurationFromStart(metrics.ScaleUp, scaleUpStart)
if a.processors != nil && a.processors.ScaleUpStatusProcessor != nil {
a.processors.ScaleUpStatusProcessor.Process(autoscalingContext, scaleUpStatus)
scaleUpStatusProcessorAlreadyCalled = true
}
if typedErr != nil {
klog.Errorf("Failed to scale up: %v", typedErr)
return typedErr
}
if scaleUpStatus.Result == status.ScaleUpSuccessful {
a.lastScaleUpTime = currentTime
// No scale down in this iteration.
scaleDownStatus.Result = scaledownstatus.ScaleDownInCooldown
return nil
} }
} }
@ -613,6 +628,15 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError
} }
} }
} }
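// Scale up any node groups that have fallen below their configured minimum size, if enforcement is enabled.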
if a.EnforceNodeGroupMinSize {
scaleUpStart := preScaleUp()
scaleUpStatus, typedErr = ScaleUpToNodeGroupMinSize(autoscalingContext, a.processors, a.clusterStateRegistry, a.scaleUpResourceManager, readyNodes, nodeInfosForGroups)
if exit, err := postScaleUp(scaleUpStart); exit {
return err
}
}
return nil return nil
} }

View File

@ -159,6 +159,7 @@ func TestStaticAutoscalerRunOnce(t *testing.T) {
n2 := BuildTestNode("n2", 1000, 1000) n2 := BuildTestNode("n2", 1000, 1000)
SetNodeReadyState(n2, true, time.Now()) SetNodeReadyState(n2, true, time.Now())
n3 := BuildTestNode("n3", 1000, 1000) n3 := BuildTestNode("n3", 1000, 1000)
n4 := BuildTestNode("n4", 1000, 1000)
p1 := BuildTestPod("p1", 600, 100) p1 := BuildTestPod("p1", 600, 100)
p1.Spec.NodeName = "n1" p1.Spec.NodeName = "n1"
@ -177,7 +178,7 @@ func TestStaticAutoscalerRunOnce(t *testing.T) {
return ret return ret
}, },
nil, nil, nil, nil,
nil, map[string]*schedulerframework.NodeInfo{"ng1": tni, "ng2": tni}) nil, map[string]*schedulerframework.NodeInfo{"ng1": tni, "ng2": tni, "ng3": tni})
provider.AddNodeGroup("ng1", 1, 10, 1) provider.AddNodeGroup("ng1", 1, 10, 1)
provider.AddNode("ng1", n1) provider.AddNode("ng1", n1)
ng1 := reflect.ValueOf(provider.GetNodeGroup("ng1")).Interface().(*testprovider.TestNodeGroup) ng1 := reflect.ValueOf(provider.GetNodeGroup("ng1")).Interface().(*testprovider.TestNodeGroup)
@ -191,11 +192,12 @@ func TestStaticAutoscalerRunOnce(t *testing.T) {
ScaleDownUnreadyTime: time.Minute, ScaleDownUnreadyTime: time.Minute,
ScaleDownUtilizationThreshold: 0.5, ScaleDownUtilizationThreshold: 0.5,
}, },
EstimatorName: estimator.BinpackingEstimatorName, EstimatorName: estimator.BinpackingEstimatorName,
ScaleDownEnabled: true, EnforceNodeGroupMinSize: true,
MaxNodesTotal: 1, ScaleDownEnabled: true,
MaxCoresTotal: 10, MaxNodesTotal: 1,
MaxMemoryTotal: 100000, MaxCoresTotal: 10,
MaxMemoryTotal: 100000,
} }
processorCallbacks := newStaticAutoscalerProcessorCallbacks() processorCallbacks := newStaticAutoscalerProcessorCallbacks()
@ -316,6 +318,22 @@ func TestStaticAutoscalerRunOnce(t *testing.T) {
assert.NoError(t, err) assert.NoError(t, err)
mock.AssertExpectationsForObjects(t, scheduledPodMock, unschedulablePodMock, mock.AssertExpectationsForObjects(t, scheduledPodMock, unschedulablePodMock,
podDisruptionBudgetListerMock, daemonSetListerMock, onScaleUpMock, onScaleDownMock) podDisruptionBudgetListerMock, daemonSetListerMock, onScaleUpMock, onScaleDownMock)
// Scale up to node group min size.
readyNodeLister.SetNodes([]*apiv1.Node{n4})
allNodeLister.SetNodes([]*apiv1.Node{n4})
scheduledPodMock.On("List").Return([]*apiv1.Pod{}, nil)
unschedulablePodMock.On("List").Return([]*apiv1.Pod{}, nil)
daemonSetListerMock.On("List", labels.Everything()).Return([]*appsv1.DaemonSet{}, nil)
podDisruptionBudgetListerMock.On("List").Return([]*policyv1.PodDisruptionBudget{}, nil)
onScaleUpMock.On("ScaleUp", "ng3", 2).Return(nil).Once() // 2 new nodes are supposed to be scaled up.
provider.AddNodeGroup("ng3", 3, 10, 1)
provider.AddNode("ng3", n4)
err = autoscaler.RunOnce(time.Now().Add(5 * time.Hour))
assert.NoError(t, err)
mock.AssertExpectationsForObjects(t, onScaleUpMock)
} }
func TestStaticAutoscalerRunOnceWithAutoprovisionedEnabled(t *testing.T) { func TestStaticAutoscalerRunOnceWithAutoprovisionedEnabled(t *testing.T) {

View File

@ -86,14 +86,15 @@ func multiStringFlag(name string, usage string) *MultiStringFlag {
} }
var ( var (
clusterName = flag.String("cluster-name", "", "Autoscaled cluster name, if available") clusterName = flag.String("cluster-name", "", "Autoscaled cluster name, if available")
address = flag.String("address", ":8085", "The address to expose prometheus metrics.") address = flag.String("address", ":8085", "The address to expose prometheus metrics.")
kubernetes = flag.String("kubernetes", "", "Kubernetes master location. Leave blank for default") kubernetes = flag.String("kubernetes", "", "Kubernetes master location. Leave blank for default")
kubeConfigFile = flag.String("kubeconfig", "", "Path to kubeconfig file with authorization and master location information.") kubeConfigFile = flag.String("kubeconfig", "", "Path to kubeconfig file with authorization and master location information.")
cloudConfig = flag.String("cloud-config", "", "The path to the cloud provider configuration file. Empty string for no configuration file.") cloudConfig = flag.String("cloud-config", "", "The path to the cloud provider configuration file. Empty string for no configuration file.")
namespace = flag.String("namespace", "kube-system", "Namespace in which cluster-autoscaler run.") namespace = flag.String("namespace", "kube-system", "Namespace in which cluster-autoscaler run.")
scaleDownEnabled = flag.Bool("scale-down-enabled", true, "Should CA scale down the cluster") enforceNodeGroupMinSize = flag.Bool("enforce-node-group-min-size", false, "Should CA scale up the node group to the configured min size if needed.")
scaleDownDelayAfterAdd = flag.Duration("scale-down-delay-after-add", 10*time.Minute, scaleDownEnabled = flag.Bool("scale-down-enabled", true, "Should CA scale down the cluster")
scaleDownDelayAfterAdd = flag.Duration("scale-down-delay-after-add", 10*time.Minute,
"How long after scale up that scale down evaluation resumes") "How long after scale up that scale down evaluation resumes")
scaleDownDelayAfterDelete = flag.Duration("scale-down-delay-after-delete", 0, scaleDownDelayAfterDelete = flag.Duration("scale-down-delay-after-delete", 0,
"How long after node deletion that scale down evaluation resumes, defaults to scanInterval") "How long after node deletion that scale down evaluation resumes, defaults to scanInterval")
@ -261,6 +262,7 @@ func createAutoscalingOptions() config.AutoscalingOptions {
MinMemoryTotal: minMemoryTotal, MinMemoryTotal: minMemoryTotal,
GpuTotal: parsedGpuTotal, GpuTotal: parsedGpuTotal,
NodeGroups: *nodeGroupsFlag, NodeGroups: *nodeGroupsFlag,
EnforceNodeGroupMinSize: *enforceNodeGroupMinSize,
ScaleDownDelayAfterAdd: *scaleDownDelayAfterAdd, ScaleDownDelayAfterAdd: *scaleDownDelayAfterAdd,
ScaleDownDelayAfterDelete: *scaleDownDelayAfterDelete, ScaleDownDelayAfterDelete: *scaleDownDelayAfterDelete,
ScaleDownDelayAfterFailure: *scaleDownDelayAfterFailure, ScaleDownDelayAfterFailure: *scaleDownDelayAfterFailure,