Move GPULabel and GPUTypes to cloud provider

This commit is contained in:
Jiaxin Shan 2019-03-08 17:03:18 -08:00
parent d46dded1c2
commit 90666881d3
22 changed files with 405 additions and 150 deletions

3
.gitignore vendored
View File

@ -13,6 +13,9 @@
.idea/
*.iml
# VSCode project files
**/.vscode
# Emacs save files
*~
\#*\#

View File

@ -20,6 +20,8 @@ import (
"fmt"
"strings"
"os"
apiv1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
@ -27,12 +29,22 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/config/dynamic"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
"k8s.io/klog"
"os"
)
const (
// ProviderName is the cloud provider name for alicloud
ProviderName = "alicloud"
// GPULabel is the label added to nodes with GPU resource.
GPULabel = "aliyun.accelerator/nvidia_name"
)
var (
availableGPUTypes = map[string]struct{}{
"nvidia-tesla-k80": {},
"nvidia-tesla-p100": {},
"nvidia-tesla-v100": {},
}
)
type aliCloudProvider struct {
@ -90,6 +102,16 @@ func (ali *aliCloudProvider) Name() string {
return ProviderName
}
// GPULabel returns the label added to nodes with GPU resource.
func (ali *aliCloudProvider) GPULabel() string {
return GPULabel
}
// GetAvailableGPUTypes returns all available GPU types this cloud provider supports.
func (ali *aliCloudProvider) GetAvailableGPUTypes() map[string]struct{} {
return availableGPUTypes
}
func (ali *aliCloudProvider) NodeGroups() []cloudprovider.NodeGroup {
result := make([]cloudprovider.NodeGroup, 0, len(ali.asgs))
for _, asg := range ali.asgs {

View File

@ -35,6 +35,17 @@ import (
const (
// ProviderName is the cloud provider name for AWS
ProviderName = "aws"
// GPULabel is the label added to nodes with GPU resource.
GPULabel = "k8s.amazonaws.com/accelerator"
)
var (
availableGPUTypes = map[string]struct{}{
"nvidia-tesla-k80": {},
"nvidia-tesla-p100": {},
"nvidia-tesla-v100": {},
}
)
// awsCloudProvider implements CloudProvider interface.
@ -63,6 +74,16 @@ func (aws *awsCloudProvider) Name() string {
return ProviderName
}
// GPULabel returns the label added to nodes with GPU resource.
func (aws *awsCloudProvider) GPULabel() string {
return GPULabel
}
// GetAvailableGPUTypes returns all available GPU types this cloud provider supports.
func (aws *awsCloudProvider) GetAvailableGPUTypes() map[string]struct{} {
return availableGPUTypes
}
// NodeGroups returns all node groups configured for this cloud provider.
func (aws *awsCloudProvider) NodeGroups() []cloudprovider.NodeGroup {
asgs := aws.awsManager.getAsgs()

View File

@ -32,6 +32,17 @@ import (
const (
// ProviderName is the cloud provider name for Azure
ProviderName = "azure"
// GPULabel is the label added to nodes with GPU resource.
// NOTE(review): this reuses the GKE accelerator label on Azure nodes —
// confirm Azure nodes actually carry "cloud.google.com/gke-accelerator"
// rather than an Azure-specific accelerator label.
GPULabel = "cloud.google.com/gke-accelerator"
)
var (
availableGPUTypes = map[string]struct{}{
"nvidia-tesla-k80": {},
"nvidia-tesla-p100": {},
"nvidia-tesla-v100": {},
}
)
// AzureCloudProvider provides implementation of CloudProvider interface for Azure.
@ -61,6 +72,16 @@ func (azure *AzureCloudProvider) Name() string {
return "azure"
}
// GPULabel returns the label added to nodes with GPU resource.
func (azure *AzureCloudProvider) GPULabel() string {
return GPULabel
}
// GetAvailableGPUTypes returns all available GPU types this cloud provider supports.
func (azure *AzureCloudProvider) GetAvailableGPUTypes() map[string]struct{} {
return availableGPUTypes
}
// NodeGroups returns all node groups configured for this cloud provider.
func (azure *AzureCloudProvider) NodeGroups() []cloudprovider.NodeGroup {
asgs := azure.azureManager.getAsgs()

View File

@ -35,6 +35,17 @@ import (
const (
// ProviderName is the cloud provider name for baiducloud
ProviderName = "baiducloud"
// GPULabel is the label added to nodes with GPU resource.
GPULabel = "cloud.google.com/gke-accelerator"
)
var (
availableGPUTypes = map[string]struct{}{
"nvidia-tesla-k80": {},
"nvidia-tesla-p100": {},
"nvidia-tesla-v100": {},
}
)
// baiducloudCloudProvider implements CloudProvider interface.
@ -148,6 +159,16 @@ func (baiducloud *baiducloudCloudProvider) NodeGroups() []cloudprovider.NodeGrou
return result
}
// GPULabel returns the label added to nodes with GPU resource.
func (baiducloud *baiducloudCloudProvider) GPULabel() string {
return GPULabel
}
// GetAvailableGPUTypes returns all available GPU types this cloud provider supports.
func (baiducloud *baiducloudCloudProvider) GetAvailableGPUTypes() map[string]struct{} {
return availableGPUTypes
}
// NodeGroupForNode returns the node group for the given node, nil if the node
// should not be processed by cluster autoscaler, or non-nil error if such
// occurred. Must be implemented.

View File

@ -56,6 +56,12 @@ type CloudProvider interface {
// GetResourceLimiter returns struct containing limits (max, min) for resources (cores, memory etc.).
GetResourceLimiter() (*ResourceLimiter, error)
// GPULabel returns the label added to nodes with GPU resource.
GPULabel() string
// GetAvailableGPUTypes return all available GPU types cloud provider supports.
GetAvailableGPUTypes() map[string]struct{}
// Cleanup cleans up open resources before the cloud provider is destroyed, i.e. go routines etc.
Cleanup() error

View File

@ -34,6 +34,17 @@ import (
const (
// ProviderNameGCE is the name of GCE cloud provider.
ProviderNameGCE = "gce"
// GPULabel is the label added to nodes with GPU resource.
GPULabel = "cloud.google.com/gke-accelerator"
)
var (
availableGPUTypes = map[string]struct{}{
"nvidia-tesla-k80": {},
"nvidia-tesla-p100": {},
"nvidia-tesla-v100": {},
}
)
// GceCloudProvider implements CloudProvider interface.
@ -59,6 +70,16 @@ func (gce *GceCloudProvider) Name() string {
return ProviderNameGCE
}
// GPULabel returns the label added to nodes with GPU resource.
func (gce *GceCloudProvider) GPULabel() string {
return GPULabel
}
// GetAvailableGPUTypes returns all available GPU types this cloud provider supports.
func (gce *GceCloudProvider) GetAvailableGPUTypes() map[string]struct{} {
return availableGPUTypes
}
// NodeGroups returns all node groups configured for this cloud provider.
func (gce *GceCloudProvider) NodeGroups() []cloudprovider.NodeGroup {
migs := gce.gceManager.GetMigs()

View File

@ -43,6 +43,17 @@ import (
const (
// ProviderName is the cloud provider name for kubemark
ProviderName = "kubemark"
// GPULabel is the label added to nodes with GPU resource.
GPULabel = "cloud.google.com/gke-accelerator"
)
var (
availableGPUTypes = map[string]struct{}{
"nvidia-tesla-k80": {},
"nvidia-tesla-p100": {},
"nvidia-tesla-v100": {},
}
)
// KubemarkCloudProvider implements CloudProvider interface for kubemark
@ -83,6 +94,16 @@ func (kubemark *KubemarkCloudProvider) Name() string {
return ProviderName
}
// GPULabel returns the label added to nodes with GPU resource.
func (kubemark *KubemarkCloudProvider) GPULabel() string {
return GPULabel
}
// GetAvailableGPUTypes returns all available GPU types this cloud provider supports.
func (kubemark *KubemarkCloudProvider) GetAvailableGPUTypes() map[string]struct{} {
return availableGPUTypes
}
// NodeGroups returns all node groups configured for this cloud provider.
func (kubemark *KubemarkCloudProvider) NodeGroups() []cloudprovider.NodeGroup {
result := make([]cloudprovider.NodeGroup, 0, len(kubemark.nodeGroups))

View File

@ -32,6 +32,17 @@ import (
const (
// ProviderName is the cloud provider name for kubemark
ProviderName = "kubemark"
// GPULabel is the label added to nodes with GPU resource.
GPULabel = "cloud.google.com/gke-accelerator"
)
var (
availableGPUTypes = map[string]struct{}{
"nvidia-tesla-k80": {},
"nvidia-tesla-p100": {},
"nvidia-tesla-v100": {},
}
)
// KubemarkCloudProvider implements CloudProvider interface.
@ -46,6 +57,16 @@ func BuildKubemarkCloudProvider(kubemarkController interface{}, specs []string,
// Name returns name of the cloud provider.
func (kubemark *KubemarkCloudProvider) Name() string { return "" }
// GPULabel returns the label added to nodes with GPU resource.
func (kubemark *KubemarkCloudProvider) GPULabel() string {
return GPULabel
}
// GetAvailableGPUTypes returns all available GPU types this cloud provider supports.
func (kubemark *KubemarkCloudProvider) GetAvailableGPUTypes() map[string]struct{} {
return availableGPUTypes
}
// NodeGroups returns all node groups configured for this cloud provider.
func (kubemark *KubemarkCloudProvider) NodeGroups() []cloudprovider.NodeGroup {
return []cloudprovider.NodeGroup{}

View File

@ -33,6 +33,16 @@ import (
const (
// ProviderName is the cloud provider name for Magnum
ProviderName = "magnum"
// GPULabel is the label added to nodes with GPU resource.
GPULabel = "cloud.google.com/gke-accelerator"
)
var (
availableGPUTypes = map[string]struct{}{
"nvidia-tesla-k80": {},
"nvidia-tesla-p100": {},
"nvidia-tesla-v100": {},
}
)
// magnumCloudProvider implements CloudProvider interface from cluster-autoscaler/cloudprovider module.
@ -56,6 +66,16 @@ func (os *magnumCloudProvider) Name() string {
return ProviderName
}
// GPULabel returns the label added to nodes with GPU resource.
func (os *magnumCloudProvider) GPULabel() string {
return GPULabel
}
// GetAvailableGPUTypes returns all available GPU types this cloud provider supports.
func (os *magnumCloudProvider) GetAvailableGPUTypes() map[string]struct{} {
return availableGPUTypes
}
// NodeGroups returns all node groups managed by this cloud provider.
func (os *magnumCloudProvider) NodeGroups() []cloudprovider.NodeGroup {
groups := make([]cloudprovider.NodeGroup, len(os.nodeGroups))

View File

@ -41,6 +41,36 @@ func (_m *CloudProvider) Cleanup() error {
return r0
}
// GPULabel provides a mock function with given fields:
func (_m *CloudProvider) GPULabel() string {
	ret := _m.Called()
	// Prefer a registered generator function; otherwise fall back to the
	// stored return value (panics on type mismatch, like the generated code).
	if rf, ok := ret.Get(0).(func() string); ok {
		return rf()
	}
	return ret.Get(0).(string)
}
// GetAvailableGPUTypes provides a mock function with given fields:
func (_m *CloudProvider) GetAvailableGPUTypes() map[string]struct{} {
	ret := _m.Called()
	// Prefer a registered generator function when one was configured.
	if rf, ok := ret.Get(0).(func() map[string]struct{}); ok {
		return rf()
	}
	// A nil stored value yields a nil map rather than a failed assertion.
	var gpuTypes map[string]struct{}
	if ret.Get(0) != nil {
		gpuTypes = ret.Get(0).(map[string]struct{})
	}
	return gpuTypes
}
// GetAvailableMachineTypes provides a mock function with given fields:
func (_m *CloudProvider) GetAvailableMachineTypes() ([]string, error) {
ret := _m.Called()

View File

@ -51,6 +51,7 @@ type TestCloudProvider struct {
onNodeGroupDelete func(string) error
machineTypes []string
machineTemplates map[string]*schedulernodeinfo.NodeInfo
priceModel cloudprovider.PricingModel
resourceLimiter *cloudprovider.ResourceLimiter
}
@ -87,6 +88,20 @@ func (tcp *TestCloudProvider) Name() string {
return "TestCloudProvider"
}
// GPULabel returns the label added to nodes with GPU resource.
func (tcp *TestCloudProvider) GPULabel() string {
	// Fixed label used by tests to mark GPU nodes on the fake provider.
	const testGPULabel = "TestGPULabel/accelerator"
	return testGPULabel
}
// GetAvailableGPUTypes returns the fixed set of GPU types this test
// provider reports as available.
func (tcp *TestCloudProvider) GetAvailableGPUTypes() map[string]struct{} {
	types := make(map[string]struct{}, 3)
	for _, name := range []string{"nvidia-tesla-k80", "nvidia-tesla-p100", "nvidia-tesla-v100"} {
		types[name] = struct{}{}
	}
	return types
}
// NodeGroups returns all node groups configured for this cloud provider.
func (tcp *TestCloudProvider) NodeGroups() []cloudprovider.NodeGroup {
tcp.Lock()
@ -126,7 +141,16 @@ func (tcp *TestCloudProvider) NodeGroupForNode(node *apiv1.Node) (cloudprovider.
// Pricing returns the pricing model configured via SetPricingModel, or
// cloudprovider.ErrNotImplemented when no model has been set.
func (tcp *TestCloudProvider) Pricing() (cloudprovider.PricingModel, errors.AutoscalerError) {
	// The stray unconditional "return nil, ErrNotImplemented" that preceded
	// this check made the priceModel logic unreachable; it has been removed.
	if tcp.priceModel == nil {
		return nil, cloudprovider.ErrNotImplemented
	}
	return tcp.priceModel, nil
}
// SetPricingModel sets the given priceModel on the test cloud provider;
// subsequent Pricing() calls return it instead of ErrNotImplemented.
func (tcp *TestCloudProvider) SetPricingModel(priceModel cloudprovider.PricingModel) {
tcp.priceModel = priceModel
}
// GetAvailableMachineTypes get all machine types that can be requested from the cloud provider.

View File

@ -67,7 +67,8 @@ func NewAutoscaler(opts AutoscalerOptions) (Autoscaler, errors.AutoscalerError)
opts.AutoscalingOptions,
opts.PredicateChecker,
opts.AutoscalingKubeClients,
opts.Processors, opts.CloudProvider,
opts.Processors,
opts.CloudProvider,
opts.ExpanderStrategy,
opts.EstimatorBuilder,
opts.Backoff), nil

View File

@ -210,7 +210,7 @@ func calculateScaleDownGpusTotal(nodes []*apiv1.Node, cp cloudprovider.CloudProv
}
}
if !cacheHit {
gpuType, gpuCount, err = gpu.GetNodeTargetGpus(node, nodeGroup)
gpuType, gpuCount, err = gpu.GetNodeTargetGpus(cp.GPULabel(), node, nodeGroup)
if err != nil {
return nil, errors.ToAutoscalerError(errors.CloudProviderError, err).AddPrefix("can not get gpu count for node %v when calculating cluster gpu usage")
}
@ -244,7 +244,7 @@ func copyScaleDownResourcesLimits(source scaleDownResourcesLimits) scaleDownReso
return copy
}
func computeScaleDownResourcesDelta(node *apiv1.Node, nodeGroup cloudprovider.NodeGroup, resourcesWithLimits []string) (scaleDownResourcesDelta, errors.AutoscalerError) {
func computeScaleDownResourcesDelta(cp cloudprovider.CloudProvider, node *apiv1.Node, nodeGroup cloudprovider.NodeGroup, resourcesWithLimits []string) (scaleDownResourcesDelta, errors.AutoscalerError) {
resultScaleDownDelta := make(scaleDownResourcesDelta)
nodeCPU, nodeMemory := getNodeCoresAndMemory(node)
@ -252,7 +252,7 @@ func computeScaleDownResourcesDelta(node *apiv1.Node, nodeGroup cloudprovider.No
resultScaleDownDelta[cloudprovider.ResourceNameMemory] = nodeMemory
if cloudprovider.ContainsGpuResources(resourcesWithLimits) {
gpuType, gpuCount, err := gpu.GetNodeTargetGpus(node, nodeGroup)
gpuType, gpuCount, err := gpu.GetNodeTargetGpus(cp.GPULabel(), node, nodeGroup)
if err != nil {
return scaleDownResourcesDelta{}, errors.ToAutoscalerError(errors.CloudProviderError, err).AddPrefix("Failed to get node %v gpu: %v", node.Name)
}
@ -630,6 +630,8 @@ func (sd *ScaleDown) TryToScaleDown(allNodes []*apiv1.Node, pods []*apiv1.Pod, p
candidates := make([]*apiv1.Node, 0)
readinessMap := make(map[string]bool)
candidateNodeGroups := make(map[string]cloudprovider.NodeGroup)
gpuLabel := sd.context.CloudProvider.GPULabel()
availableGPUTypes := sd.context.CloudProvider.GetAvailableGPUTypes()
resourceLimiter, errCP := sd.context.CloudProvider.GetResourceLimiter()
if errCP != nil {
@ -686,7 +688,7 @@ func (sd *ScaleDown) TryToScaleDown(allNodes []*apiv1.Node, pods []*apiv1.Pod, p
continue
}
scaleDownResourcesDelta, err := computeScaleDownResourcesDelta(node, nodeGroup, resourcesWithLimits)
scaleDownResourcesDelta, err := computeScaleDownResourcesDelta(sd.context.CloudProvider, node, nodeGroup, resourcesWithLimits)
if err != nil {
klog.Errorf("Error getting node resources: %v", err)
continue
@ -715,7 +717,7 @@ func (sd *ScaleDown) TryToScaleDown(allNodes []*apiv1.Node, pods []*apiv1.Pod, p
if len(emptyNodes) > 0 {
nodeDeletionStart := time.Now()
confirmation := make(chan errors.AutoscalerError, len(emptyNodes))
sd.scheduleDeleteEmptyNodes(emptyNodes, sd.context.ClientSet, sd.context.Recorder, readinessMap, candidateNodeGroups, confirmation)
sd.scheduleDeleteEmptyNodes(emptyNodes, sd.context.CloudProvider, sd.context.ClientSet, sd.context.Recorder, readinessMap, candidateNodeGroups, confirmation)
err := sd.waitForEmptyNodesDeleted(emptyNodes, confirmation)
nodeDeletionDuration = time.Now().Sub(nodeDeletionStart)
if err == nil {
@ -776,9 +778,9 @@ func (sd *ScaleDown) TryToScaleDown(allNodes []*apiv1.Node, pods []*apiv1.Pod, p
}
nodeGroup := candidateNodeGroups[toRemove.Node.Name]
if readinessMap[toRemove.Node.Name] {
metrics.RegisterScaleDown(1, gpu.GetGpuTypeForMetrics(toRemove.Node, nodeGroup), metrics.Underutilized)
metrics.RegisterScaleDown(1, gpu.GetGpuTypeForMetrics(gpuLabel, availableGPUTypes, toRemove.Node, nodeGroup), metrics.Underutilized)
} else {
metrics.RegisterScaleDown(1, gpu.GetGpuTypeForMetrics(toRemove.Node, nodeGroup), metrics.Unready)
metrics.RegisterScaleDown(1, gpu.GetGpuTypeForMetrics(gpuLabel, availableGPUTypes, toRemove.Node, nodeGroup), metrics.Unready)
}
}()
@ -838,7 +840,7 @@ func getEmptyNodes(candidates []*apiv1.Node, pods []*apiv1.Pod, maxEmptyBulkDele
availabilityMap[nodeGroup.Id()] = available
}
if available > 0 {
resourcesDelta, err := computeScaleDownResourcesDelta(node, nodeGroup, resourcesNames)
resourcesDelta, err := computeScaleDownResourcesDelta(cloudProvider, node, nodeGroup, resourcesNames)
if err != nil {
klog.Errorf("Error: %v", err)
continue
@ -859,7 +861,7 @@ func getEmptyNodes(candidates []*apiv1.Node, pods []*apiv1.Pod, maxEmptyBulkDele
return result[:limit]
}
func (sd *ScaleDown) scheduleDeleteEmptyNodes(emptyNodes []*apiv1.Node, client kube_client.Interface,
func (sd *ScaleDown) scheduleDeleteEmptyNodes(emptyNodes []*apiv1.Node, cp cloudprovider.CloudProvider, client kube_client.Interface,
recorder kube_record.EventRecorder, readinessMap map[string]bool,
candidateNodeGroups map[string]cloudprovider.NodeGroup, confirmation chan errors.AutoscalerError) {
for _, node := range emptyNodes {
@ -890,9 +892,9 @@ func (sd *ScaleDown) scheduleDeleteEmptyNodes(emptyNodes []*apiv1.Node, client k
if deleteErr == nil {
nodeGroup := candidateNodeGroups[nodeToDelete.Name]
if readinessMap[nodeToDelete.Name] {
metrics.RegisterScaleDown(1, gpu.GetGpuTypeForMetrics(nodeToDelete, nodeGroup), metrics.Empty)
metrics.RegisterScaleDown(1, gpu.GetGpuTypeForMetrics(cp.GPULabel(), cp.GetAvailableGPUTypes(), nodeToDelete, nodeGroup), metrics.Empty)
} else {
metrics.RegisterScaleDown(1, gpu.GetGpuTypeForMetrics(nodeToDelete, nodeGroup), metrics.Unready)
metrics.RegisterScaleDown(1, gpu.GetGpuTypeForMetrics(cp.GPULabel(), cp.GetAvailableGPUTypes(), nodeToDelete, nodeGroup), metrics.Unready)
}
}
confirmation <- deleteErr

View File

@ -927,10 +927,17 @@ func simpleScaleDownEmpty(t *testing.T, config *scaleTestConfig) {
nodes := make([]*apiv1.Node, len(config.nodes))
nodesMap := make(map[string]*apiv1.Node)
groups := make(map[string][]*apiv1.Node)
provider := testprovider.NewTestCloudProvider(nil, func(nodeGroup string, node string) error {
deletedNodes <- node
return nil
})
for i, n := range config.nodes {
node := BuildTestNode(n.name, n.cpu, n.memory)
if n.gpu > 0 {
AddGpusToNode(node, n.gpu)
node.Labels[provider.GPULabel()] = gpu.DefaultGPUType
}
SetNodeReadyState(node, n.ready, time.Time{})
nodesMap[n.name] = node
@ -959,11 +966,6 @@ func simpleScaleDownEmpty(t *testing.T, config *scaleTestConfig) {
return true, obj, nil
})
provider := testprovider.NewTestCloudProvider(nil, func(nodeGroup string, node string) error {
deletedNodes <- node
return nil
})
for name, nodesInGroup := range groups {
provider.AddNodeGroup(name, 1, 10, len(nodesInGroup))
for _, n := range nodesInGroup {

View File

@ -48,6 +48,7 @@ type scaleUpResourcesDelta map[string]int64
const scaleUpLimitUnknown = math.MaxInt64
func computeScaleUpResourcesLeftLimits(
cp cloudprovider.CloudProvider,
nodeGroups []cloudprovider.NodeGroup,
nodeInfos map[string]*schedulernodeinfo.NodeInfo,
nodesFromNotAutoscaledGroups []*apiv1.Node,
@ -57,7 +58,7 @@ func computeScaleUpResourcesLeftLimits(
var totalGpus map[string]int64
var totalGpusErr error
if cloudprovider.ContainsGpuResources(resourceLimiter.GetResources()) {
totalGpus, totalGpusErr = calculateScaleUpGpusTotal(nodeGroups, nodeInfos, nodesFromNotAutoscaledGroups)
totalGpus, totalGpusErr = calculateScaleUpGpusTotal(cp.GPULabel(), nodeGroups, nodeInfos, nodesFromNotAutoscaledGroups)
}
resultScaleUpLimits := make(scaleUpResourcesLimits)
@ -134,6 +135,7 @@ func calculateScaleUpCoresMemoryTotal(
}
func calculateScaleUpGpusTotal(
GPULabel string,
nodeGroups []cloudprovider.NodeGroup,
nodeInfos map[string]*schedulernodeinfo.NodeInfo,
nodesFromNotAutoscaledGroups []*apiv1.Node) (map[string]int64, errors.AutoscalerError) {
@ -149,7 +151,7 @@ func calculateScaleUpGpusTotal(
return nil, errors.NewAutoscalerError(errors.CloudProviderError, "No node info for: %s", nodeGroup.Id())
}
if currentSize > 0 {
gpuType, gpuCount, err := gpu.GetNodeTargetGpus(nodeInfo.Node(), nodeGroup)
gpuType, gpuCount, err := gpu.GetNodeTargetGpus(GPULabel, nodeInfo.Node(), nodeGroup)
if err != nil {
return nil, errors.ToAutoscalerError(errors.CloudProviderError, err).AddPrefix("Failed to get target gpu for node group %v:", nodeGroup.Id())
}
@ -161,7 +163,7 @@ func calculateScaleUpGpusTotal(
}
for _, node := range nodesFromNotAutoscaledGroups {
gpuType, gpuCount, err := gpu.GetNodeTargetGpus(node, nil)
gpuType, gpuCount, err := gpu.GetNodeTargetGpus(GPULabel, node, nil)
if err != nil {
return nil, errors.ToAutoscalerError(errors.CloudProviderError, err).AddPrefix("Failed to get target gpu for node gpus count for node %v:", node.Name)
}
@ -178,7 +180,7 @@ func computeBelowMax(total int64, max int64) int64 {
return 0
}
func computeScaleUpResourcesDelta(nodeInfo *schedulernodeinfo.NodeInfo, nodeGroup cloudprovider.NodeGroup, resourceLimiter *cloudprovider.ResourceLimiter) (scaleUpResourcesDelta, errors.AutoscalerError) {
func computeScaleUpResourcesDelta(cp cloudprovider.CloudProvider, nodeInfo *schedulernodeinfo.NodeInfo, nodeGroup cloudprovider.NodeGroup, resourceLimiter *cloudprovider.ResourceLimiter) (scaleUpResourcesDelta, errors.AutoscalerError) {
resultScaleUpDelta := make(scaleUpResourcesDelta)
nodeCPU, nodeMemory := getNodeInfoCoresAndMemory(nodeInfo)
@ -186,7 +188,7 @@ func computeScaleUpResourcesDelta(nodeInfo *schedulernodeinfo.NodeInfo, nodeGrou
resultScaleUpDelta[cloudprovider.ResourceNameMemory] = nodeMemory
if cloudprovider.ContainsGpuResources(resourceLimiter.GetResources()) {
gpuType, gpuCount, err := gpu.GetNodeTargetGpus(nodeInfo.Node(), nodeGroup)
gpuType, gpuCount, err := gpu.GetNodeTargetGpus(cp.GPULabel(), nodeInfo.Node(), nodeGroup)
if err != nil {
return scaleUpResourcesDelta{}, errors.ToAutoscalerError(errors.CloudProviderError, err).AddPrefix("Failed to get target gpu for node group %v:", nodeGroup.Id())
}
@ -270,6 +272,8 @@ func ScaleUp(context *context.AutoscalingContext, processors *ca_processors.Auto
}
nodeGroups := context.CloudProvider.NodeGroups()
gpuLabel := context.CloudProvider.GPULabel()
availableGPUTypes := context.CloudProvider.GetAvailableGPUTypes()
resourceLimiter, errCP := context.CloudProvider.GetResourceLimiter()
if errCP != nil {
@ -278,7 +282,7 @@ func ScaleUp(context *context.AutoscalingContext, processors *ca_processors.Auto
errCP)
}
scaleUpResourcesLeft, errLimits := computeScaleUpResourcesLeftLimits(nodeGroups, nodeInfos, nodesFromNotAutoscaledGroups, resourceLimiter)
scaleUpResourcesLeft, errLimits := computeScaleUpResourcesLeftLimits(context.CloudProvider, nodeGroups, nodeInfos, nodesFromNotAutoscaledGroups, resourceLimiter)
if errLimits != nil {
return &status.ScaleUpStatus{Result: status.ScaleUpError}, errLimits.AddPrefix("Could not compute total resources: ")
}
@ -346,7 +350,7 @@ func ScaleUp(context *context.AutoscalingContext, processors *ca_processors.Auto
continue
}
scaleUpResourcesDelta, err := computeScaleUpResourcesDelta(nodeInfo, nodeGroup, resourceLimiter)
scaleUpResourcesDelta, err := computeScaleUpResourcesDelta(context.CloudProvider, nodeInfo, nodeGroup, resourceLimiter)
if err != nil {
klog.Errorf("Skipping node group %s; error getting node group resources: %v", nodeGroup.Id(), err)
skippedNodeGroups[nodeGroup.Id()] = notReadyReason
@ -487,7 +491,7 @@ func ScaleUp(context *context.AutoscalingContext, processors *ca_processors.Auto
}
// apply upper limits for CPU and memory
newNodes, err = applyScaleUpResourcesLimits(newNodes, scaleUpResourcesLeft, nodeInfo, bestOption.NodeGroup, resourceLimiter)
newNodes, err = applyScaleUpResourcesLimits(context.CloudProvider, newNodes, scaleUpResourcesLeft, nodeInfo, bestOption.NodeGroup, resourceLimiter)
if err != nil {
return &status.ScaleUpStatus{Result: status.ScaleUpError}, err
}
@ -527,7 +531,7 @@ func ScaleUp(context *context.AutoscalingContext, processors *ca_processors.Auto
}
klog.V(1).Infof("Final scale-up plan: %v", scaleUpInfos)
for _, info := range scaleUpInfos {
typedErr := executeScaleUp(context, clusterStateRegistry, info, gpu.GetGpuTypeForMetrics(nodeInfo.Node(), nil), now)
typedErr := executeScaleUp(context, clusterStateRegistry, info, gpu.GetGpuTypeForMetrics(gpuLabel, availableGPUTypes, nodeInfo.Node(), nil), now)
if typedErr != nil {
return &status.ScaleUpStatus{Result: status.ScaleUpError}, typedErr
}
@ -703,13 +707,14 @@ func executeScaleUp(context *context.AutoscalingContext, clusterStateRegistry *c
}
func applyScaleUpResourcesLimits(
cp cloudprovider.CloudProvider,
newNodes int,
scaleUpResourcesLeft scaleUpResourcesLimits,
nodeInfo *schedulernodeinfo.NodeInfo,
nodeGroup cloudprovider.NodeGroup,
resourceLimiter *cloudprovider.ResourceLimiter) (int, errors.AutoscalerError) {
delta, err := computeScaleUpResourcesDelta(nodeInfo, nodeGroup, resourceLimiter)
delta, err := computeScaleUpResourcesDelta(cp, nodeInfo, nodeGroup, resourceLimiter)
if err != nil {
return 0, err
}

View File

@ -138,7 +138,7 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError
klog.V(4).Info("Starting main loop")
stateUpdateStart := time.Now()
allNodes, readyNodes, typedErr := a.obtainNodeLists()
allNodes, readyNodes, typedErr := a.obtainNodeLists(a.CloudProvider)
if typedErr != nil {
return typedErr
}
@ -508,7 +508,7 @@ func (a *StaticAutoscaler) ExitCleanUp() {
utils.DeleteStatusConfigMap(a.AutoscalingContext.ClientSet, a.AutoscalingContext.ConfigNamespace)
}
func (a *StaticAutoscaler) obtainNodeLists() ([]*apiv1.Node, []*apiv1.Node, errors.AutoscalerError) {
func (a *StaticAutoscaler) obtainNodeLists(cp cloudprovider.CloudProvider) ([]*apiv1.Node, []*apiv1.Node, errors.AutoscalerError) {
allNodes, err := a.AllNodeLister().List()
if err != nil {
klog.Errorf("Failed to list all nodes: %v", err)
@ -525,7 +525,7 @@ func (a *StaticAutoscaler) obtainNodeLists() ([]*apiv1.Node, []*apiv1.Node, erro
// Treat those nodes as unready until GPU actually becomes available and let
// our normal handling for booting up nodes deal with this.
// TODO: Remove this call when we handle dynamically provisioned resources.
allNodes, readyNodes = gpu.FilterOutNodesWithUnreadyGpus(allNodes, readyNodes)
allNodes, readyNodes = gpu.FilterOutNodesWithUnreadyGpus(cp.GPULabel(), allNodes, readyNodes)
return allNodes, readyNodes, nil
}

View File

@ -41,11 +41,10 @@ func ExpanderStrategyFromString(expanderFlag string, cloudProvider cloudprovider
case expander.LeastWasteExpanderName:
return waste.NewStrategy(), nil
case expander.PriceBasedExpanderName:
pricing, err := cloudProvider.Pricing()
if err != nil {
if _, err := cloudProvider.Pricing(); err != nil {
return nil, err
}
return price.NewStrategy(pricing,
return price.NewStrategy(cloudProvider,
price.NewSimplePreferredNodeProvider(autoscalingKubeClients.AllNodeLister()),
price.SimpleNodeUnfitness), nil
case expander.PriorityBasedExpanderName:

View File

@ -40,7 +40,7 @@ import (
// **********
type priceBased struct {
pricingModel cloudprovider.PricingModel
cloudProvider cloudprovider.CloudProvider
preferredNodeProvider PreferredNodeProvider
nodeUnfitness NodeUnfitness
}
@ -75,12 +75,12 @@ var (
)
// NewStrategy returns an expansion strategy that picks nodes based on price and preferred node type.
func NewStrategy(pricingModel cloudprovider.PricingModel,
func NewStrategy(cloudProvider cloudprovider.CloudProvider,
preferredNodeProvider PreferredNodeProvider,
nodeUnfitness NodeUnfitness,
) expander.Strategy {
return &priceBased{
pricingModel: pricingModel,
cloudProvider: cloudProvider,
preferredNodeProvider: preferredNodeProvider,
nodeUnfitness: nodeUnfitness,
}
@ -98,7 +98,13 @@ func (p *priceBased) BestOption(expansionOptions []expander.Option, nodeInfos ma
klog.Errorf("Failed to get preferred node, switching to default: %v", err)
preferredNode = defaultPreferredNode
}
stabilizationPrice, err := p.pricingModel.PodPrice(priceStabilizationPod, now, then)
pricingModel, err := p.cloudProvider.Pricing()
if err != nil {
klog.Errorf("Failed to get pricing model from cloud provider: %v", err)
}
stabilizationPrice, err := pricingModel.PodPrice(priceStabilizationPod, now, then)
if err != nil {
klog.Errorf("Failed to get price for stabilization pod: %v", err)
// continuing without stabilization.
@ -111,7 +117,7 @@ nextoption:
klog.Warningf("No node info for %s", option.NodeGroup.Id())
continue
}
nodePrice, err := p.pricingModel.NodePrice(nodeInfo.Node(), now, then)
nodePrice, err := pricingModel.NodePrice(nodeInfo.Node(), now, then)
if err != nil {
klog.Warningf("Failed to calculate node price for %s: %v", option.NodeGroup.Id(), err)
continue
@ -119,7 +125,7 @@ nextoption:
totalNodePrice := nodePrice * float64(option.NodeCount)
totalPodPrice := 0.0
for _, pod := range option.Pods {
podPrice, err := p.pricingModel.PodPrice(pod, now, then)
podPrice, err := pricingModel.PodPrice(pod, now, then)
if err != nil {
klog.Warningf("Failed to calculate pod price for %s/%s: %v", pod.Namespace, pod.Name, err)
continue nextoption
@ -141,7 +147,7 @@ nextoption:
// Set constant, very high unfitness to make them unattractive for pods that doesn't need GPU and
// avoid optimizing them for CPU utilization.
if gpu.NodeHasGpu(nodeInfo.Node()) {
if gpu.NodeHasGpu(p.cloudProvider.GPULabel(), nodeInfo.Node()) {
klog.V(4).Infof("Price expander overriding unfitness for node group with GPU %s", option.NodeGroup.Id())
supressedUnfitness = gpuUnfitnessOverride
}

View File

@ -25,6 +25,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/utils/units"
apiv1 "k8s.io/api/core/v1"
cloudprovider "k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
testprovider "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/test"
. "k8s.io/autoscaler/cluster-autoscaler/utils/test"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
@ -85,6 +86,7 @@ func TestPriceExpander(t *testing.T) {
nodeInfosForGroups := map[string]*schedulernodeinfo.NodeInfo{
"ng1": ni1, "ng2": ni2,
}
var pricingModel cloudprovider.PricingModel
// All node groups accept the same set of pods.
options := []expander.Option{
@ -103,18 +105,20 @@ func TestPriceExpander(t *testing.T) {
}
// First node group is cheaper.
assert.Contains(t, NewStrategy(
&testPricingModel{
podPrice: map[string]float64{
"p1": 20.0,
"p2": 10.0,
"stabilize": 10,
},
nodePrice: map[string]float64{
"n1": 20.0,
"n2": 200.0,
},
pricingModel = &testPricingModel{
podPrice: map[string]float64{
"p1": 20.0,
"p2": 10.0,
"stabilize": 10,
},
nodePrice: map[string]float64{
"n1": 20.0,
"n2": 200.0,
},
}
provider.SetPricingModel(pricingModel)
assert.Contains(t, NewStrategy(
provider,
&testPreferredNodeProvider{
preferred: buildNode(2000, units.GiB),
},
@ -122,18 +126,20 @@ func TestPriceExpander(t *testing.T) {
).BestOption(options, nodeInfosForGroups).Debug, "ng1")
// First node group is cheaper, however, the second one is preferred.
assert.Contains(t, NewStrategy(
&testPricingModel{
podPrice: map[string]float64{
"p1": 20.0,
"p2": 10.0,
"stabilize": 10,
},
nodePrice: map[string]float64{
"n1": 50.0,
"n2": 200.0,
},
pricingModel = &testPricingModel{
podPrice: map[string]float64{
"p1": 20.0,
"p2": 10.0,
"stabilize": 10,
},
nodePrice: map[string]float64{
"n1": 50.0,
"n2": 200.0,
},
}
provider.SetPricingModel(pricingModel)
assert.Contains(t, NewStrategy(
provider,
&testPreferredNodeProvider{
preferred: buildNode(4000, units.GiB),
},
@ -157,18 +163,21 @@ func TestPriceExpander(t *testing.T) {
}
// First node group is cheaper, the second is preferred
// but there is lots of nodes to be created.
assert.Contains(t, NewStrategy(
&testPricingModel{
podPrice: map[string]float64{
"p1": 20.0,
"p2": 10.0,
"stabilize": 10,
},
nodePrice: map[string]float64{
"n1": 20.0,
"n2": 200.0,
},
pricingModel = &testPricingModel{
podPrice: map[string]float64{
"p1": 20.0,
"p2": 10.0,
"stabilize": 10,
},
nodePrice: map[string]float64{
"n1": 20.0,
"n2": 200.0,
},
}
provider.SetPricingModel(pricingModel)
assert.Contains(t, NewStrategy(
provider,
&testPreferredNodeProvider{
preferred: buildNode(4000, units.GiB),
},
@ -176,18 +185,20 @@ func TestPriceExpander(t *testing.T) {
).BestOption(options1b, nodeInfosForGroups).Debug, "ng1")
// Second node group is cheaper
assert.Contains(t, NewStrategy(
&testPricingModel{
podPrice: map[string]float64{
"p1": 20.0,
"p2": 10.0,
"stabilize": 10,
},
nodePrice: map[string]float64{
"n1": 200.0,
"n2": 100.0,
},
pricingModel = &testPricingModel{
podPrice: map[string]float64{
"p1": 20.0,
"p2": 10.0,
"stabilize": 10,
},
nodePrice: map[string]float64{
"n1": 200.0,
"n2": 100.0,
},
}
provider.SetPricingModel(pricingModel)
assert.Contains(t, NewStrategy(
provider,
&testPreferredNodeProvider{
preferred: buildNode(2000, units.GiB),
},
@ -209,21 +220,22 @@ func TestPriceExpander(t *testing.T) {
Debug: "ng2",
},
}
pricingModel = &testPricingModel{
podPrice: map[string]float64{
"p1": 20.0,
"p2": 10.0,
"stabilize": 10,
},
nodePrice: map[string]float64{
"n1": 200.0,
"n2": 200.0,
},
}
provider.SetPricingModel(pricingModel)
// Both node groups are equally expensive. However 2
// accept two pods.
assert.Contains(t, NewStrategy(
&testPricingModel{
podPrice: map[string]float64{
"p1": 20.0,
"p2": 10.0,
"stabilize": 10,
},
nodePrice: map[string]float64{
"n1": 200.0,
"n2": 200.0,
},
},
provider,
&testPreferredNodeProvider{
preferred: buildNode(2000, units.GiB),
},
@ -231,11 +243,13 @@ func TestPriceExpander(t *testing.T) {
).BestOption(options2, nodeInfosForGroups).Debug, "ng2")
// Errors are expected
pricingModel = &testPricingModel{
podPrice: map[string]float64{},
nodePrice: map[string]float64{},
}
provider.SetPricingModel(pricingModel)
assert.Nil(t, NewStrategy(
&testPricingModel{
podPrice: map[string]float64{},
nodePrice: map[string]float64{},
},
provider,
&testPreferredNodeProvider{
preferred: buildNode(2000, units.GiB),
},
@ -265,21 +279,22 @@ func TestPriceExpander(t *testing.T) {
Debug: "ng3",
},
}
// Choose existing group when non-existing has the same price.
assert.Contains(t, NewStrategy(
&testPricingModel{
podPrice: map[string]float64{
"p1": 20.0,
"p2": 10.0,
"stabilize": 10,
},
nodePrice: map[string]float64{
"n1": 200.0,
"n2": 200.0,
"n3": 200.0,
},
pricingModel = &testPricingModel{
podPrice: map[string]float64{
"p1": 20.0,
"p2": 10.0,
"stabilize": 10,
},
nodePrice: map[string]float64{
"n1": 200.0,
"n2": 200.0,
"n3": 200.0,
},
}
provider.SetPricingModel(pricingModel)
assert.Contains(t, NewStrategy(
provider,
&testPreferredNodeProvider{
preferred: buildNode(2000, units.GiB),
},
@ -287,19 +302,21 @@ func TestPriceExpander(t *testing.T) {
).BestOption(options3, nodeInfosForGroups).Debug, "ng2")
// Choose non-existing group when non-existing is cheaper.
assert.Contains(t, NewStrategy(
&testPricingModel{
podPrice: map[string]float64{
"p1": 20.0,
"p2": 10.0,
"stabilize": 10,
},
nodePrice: map[string]float64{
"n1": 200.0,
"n2": 200.0,
"n3": 90.0,
},
pricingModel = &testPricingModel{
podPrice: map[string]float64{
"p1": 20.0,
"p2": 10.0,
"stabilize": 10,
},
nodePrice: map[string]float64{
"n1": 200.0,
"n2": 200.0,
"n3": 90.0,
},
}
provider.SetPricingModel(pricingModel)
assert.Contains(t, NewStrategy(
provider,
&testPreferredNodeProvider{
preferred: buildNode(2000, units.GiB),
},

View File

@ -27,8 +27,6 @@ import (
const (
// ResourceNvidiaGPU is the name of the Nvidia GPU resource.
ResourceNvidiaGPU = "nvidia.com/gpu"
// GPULabel is the label added to nodes with GPU resource on GKE.
GPULabel = "cloud.google.com/gke-accelerator"
// DefaultGPUType is the type of GPU used in NAP if the user
// don't specify what type of GPU his pod wants.
DefaultGPUType = "nvidia-tesla-k80"
@ -49,21 +47,11 @@ const (
MetricsNoGPU = ""
)
var (
// knownGpuTypes lists all known GPU types, to be used in metrics; map for convenient access
// TODO(kgolab) obtain this from Cloud Provider
knownGpuTypes = map[string]struct{}{
"nvidia-tesla-k80": {},
"nvidia-tesla-p100": {},
"nvidia-tesla-v100": {},
}
)
// FilterOutNodesWithUnreadyGpus removes nodes that should have GPU, but don't have it in allocatable
// from ready nodes list and updates their status to unready on all nodes list.
// This is a hack/workaround for nodes with GPU coming up without installed drivers, resulting
// in GPU missing from their allocatable and capacity.
func FilterOutNodesWithUnreadyGpus(allNodes, readyNodes []*apiv1.Node) ([]*apiv1.Node, []*apiv1.Node) {
func FilterOutNodesWithUnreadyGpus(GPULabel string, allNodes, readyNodes []*apiv1.Node) ([]*apiv1.Node, []*apiv1.Node) {
newAllNodes := make([]*apiv1.Node, 0)
newReadyNodes := make([]*apiv1.Node, 0)
nodesWithUnreadyGpu := make(map[string]*apiv1.Node)
@ -95,7 +83,7 @@ func FilterOutNodesWithUnreadyGpus(allNodes, readyNodes []*apiv1.Node) ([]*apiv1
// GetGpuTypeForMetrics returns name of the GPU used on the node or empty string if there's no GPU
// if the GPU type is unknown, "generic" is returned
// NOTE: current implementation is GKE/GCE-specific
func GetGpuTypeForMetrics(node *apiv1.Node, nodeGroup cloudprovider.NodeGroup) string {
func GetGpuTypeForMetrics(GPULabel string, availableGPUTypes map[string]struct{}, node *apiv1.Node, nodeGroup cloudprovider.NodeGroup) string {
// we use the GKE label if there is one
gpuType, labelFound := node.Labels[GPULabel]
capacity, capacityFound := node.Status.Capacity[ResourceNvidiaGPU]
@ -112,7 +100,7 @@ func GetGpuTypeForMetrics(node *apiv1.Node, nodeGroup cloudprovider.NodeGroup) s
// GKE-specific label & capacity are present - consistent state
if capacityFound {
return validateGpuType(gpuType)
return validateGpuType(availableGPUTypes, gpuType)
}
// GKE-specific label present but no capacity (yet?) - check the node template
@ -135,8 +123,8 @@ func GetGpuTypeForMetrics(node *apiv1.Node, nodeGroup cloudprovider.NodeGroup) s
return MetricsUnexpectedLabelGPU
}
func validateGpuType(gpu string) string {
if _, found := knownGpuTypes[gpu]; found {
func validateGpuType(availableGPUTypes map[string]struct{}, gpu string) string {
if _, found := availableGPUTypes[gpu]; found {
return gpu
}
return MetricsUnknownGPU
@ -162,7 +150,7 @@ func getUnreadyNodeCopy(node *apiv1.Node) *apiv1.Node {
// NodeHasGpu returns true if a given node has GPU hardware.
// The result will be true if there is hardware capability. It doesn't matter
// if the drivers are installed and GPU is ready to use.
func NodeHasGpu(node *apiv1.Node) bool {
func NodeHasGpu(GPULabel string, node *apiv1.Node) bool {
_, hasGpuLabel := node.Labels[GPULabel]
gpuAllocatable, hasGpuAllocatable := node.Status.Allocatable[ResourceNvidiaGPU]
return hasGpuLabel || (hasGpuAllocatable && !gpuAllocatable.IsZero())
@ -183,7 +171,7 @@ func PodRequestsGpu(pod *apiv1.Pod) bool {
// GetNodeTargetGpus returns the number of gpus on a given node. This includes gpus which are not yet
// ready to use and visible in kubernetes.
func GetNodeTargetGpus(node *apiv1.Node, nodeGroup cloudprovider.NodeGroup) (gpuType string, gpuCount int64, error errors.AutoscalerError) {
func GetNodeTargetGpus(GPULabel string, node *apiv1.Node, nodeGroup cloudprovider.NodeGroup) (gpuType string, gpuCount int64, error errors.AutoscalerError) {
gpuLabel, found := node.Labels[GPULabel]
if !found {
return "", 0, nil

View File

@ -29,6 +29,10 @@ import (
"github.com/stretchr/testify/assert"
)
const (
GPULabel = "TestGPULabel/accelerator"
)
func TestFilterOutNodesWithUnreadyGpus(t *testing.T) {
start := time.Now()
later := start.Add(10 * time.Minute)
@ -129,7 +133,7 @@ func TestFilterOutNodesWithUnreadyGpus(t *testing.T) {
nodeNoGpuUnready,
}
newAllNodes, newReadyNodes := FilterOutNodesWithUnreadyGpus(initialAllNodes, initialReadyNodes)
newAllNodes, newReadyNodes := FilterOutNodesWithUnreadyGpus(GPULabel, initialAllNodes, initialReadyNodes)
foundInReady := make(map[string]bool)
for _, node := range newReadyNodes {
@ -167,7 +171,7 @@ func TestNodeHasGpu(t *testing.T) {
}
nodeGpuReady.Status.Allocatable[ResourceNvidiaGPU] = *resource.NewQuantity(1, resource.DecimalSI)
nodeGpuReady.Status.Capacity[ResourceNvidiaGPU] = *resource.NewQuantity(1, resource.DecimalSI)
assert.True(t, NodeHasGpu(nodeGpuReady))
assert.True(t, NodeHasGpu(GPULabel, nodeGpuReady))
nodeGpuUnready := &apiv1.Node{
ObjectMeta: metav1.ObjectMeta{
@ -179,7 +183,7 @@ func TestNodeHasGpu(t *testing.T) {
Allocatable: apiv1.ResourceList{},
},
}
assert.True(t, NodeHasGpu(nodeGpuUnready))
assert.True(t, NodeHasGpu(GPULabel, nodeGpuUnready))
nodeNoGpu := &apiv1.Node{
ObjectMeta: metav1.ObjectMeta{
@ -191,7 +195,7 @@ func TestNodeHasGpu(t *testing.T) {
Allocatable: apiv1.ResourceList{},
},
}
assert.False(t, NodeHasGpu(nodeNoGpu))
assert.False(t, NodeHasGpu(GPULabel, nodeNoGpu))
}
func TestPodRequestsGpu(t *testing.T) {