Introduce LocalSSDSizeProvider interface for GCE

This commit is contained in:
Mahmoud Atwa 2024-02-15 14:32:03 +00:00
parent 5286b3f770
commit e7ff1cd90f
10 changed files with 127 additions and 62 deletions

View File

@ -29,6 +29,7 @@ import (
cloudBuilder "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/builder"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider/externalgrpc/examples/external-grpc-cloud-provider-service/wrapper"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider/externalgrpc/protos"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider/gce/localssdsize"
"k8s.io/autoscaler/cluster-autoscaler/config"
kube_flag "k8s.io/component-base/cli/flag"
klog "k8s.io/klog/v2"
@ -121,7 +122,8 @@ func main() {
NodeGroups: *nodeGroupsFlag,
ClusterName: *clusterName,
GCEOptions: config.GCEOptions{
ConcurrentRefreshes: 1,
ConcurrentRefreshes: 1,
LocalSSDDiskSizeProvider: localssdsize.NewSimpleLocalSSDProvider(),
},
UserAgent: "user-agent",
}

View File

@ -378,12 +378,12 @@ func BuildGCE(opts config.AutoscalingOptions, do cloudprovider.NodeGroupDiscover
defer config.Close()
}
manager, err := CreateGceManager(config, do, opts.Regional, opts.GCEOptions.ConcurrentRefreshes, opts.UserAgent, opts.GCEOptions.DomainUrl, opts.GCEOptions.MigInstancesMinRefreshWaitTime)
manager, err := CreateGceManager(config, do, opts.GCEOptions.LocalSSDDiskSizeProvider, opts.Regional, opts.GCEOptions.ConcurrentRefreshes, opts.UserAgent, opts.GCEOptions.DomainUrl, opts.GCEOptions.MigInstancesMinRefreshWaitTime)
if err != nil {
klog.Fatalf("Failed to create GCE Manager: %v", err)
}
pricingModel := NewGcePriceModel(NewGcePriceInfo())
pricingModel := NewGcePriceModel(NewGcePriceInfo(), opts.GCEOptions.LocalSSDDiskSizeProvider)
provider, err := BuildGceCloudProvider(manager, rl, pricingModel)
if err != nil {
klog.Fatalf("Failed to create GCE cloud provider: %v", err)

View File

@ -31,6 +31,7 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider/gce/localssdsize"
"k8s.io/autoscaler/cluster-autoscaler/config"
"k8s.io/autoscaler/cluster-autoscaler/config/dynamic"
"k8s.io/client-go/util/workqueue"
@ -112,19 +113,22 @@ type gceManagerImpl struct {
migInfoProvider MigInfoProvider
migLister MigLister
location string
projectId string
domainUrl string
templates *GceTemplateBuilder
interrupt chan struct{}
regional bool
explicitlyConfigured map[GceRef]bool
migAutoDiscoverySpecs []migAutoDiscoveryConfig
reserved *GceReserved
location string
projectId string
domainUrl string
templates *GceTemplateBuilder
interrupt chan struct{}
regional bool
explicitlyConfigured map[GceRef]bool
migAutoDiscoverySpecs []migAutoDiscoveryConfig
reserved *GceReserved
localSSDDiskSizeProvider localssdsize.LocalSSDSizeProvider
}
// CreateGceManager constructs GceManager object.
func CreateGceManager(configReader io.Reader, discoveryOpts cloudprovider.NodeGroupDiscoveryOptions, regional bool, concurrentGceRefreshes int, userAgent, domainUrl string, migInstancesMinRefreshWaitTime time.Duration) (GceManager, error) {
func CreateGceManager(configReader io.Reader, discoveryOpts cloudprovider.NodeGroupDiscoveryOptions,
localSSDDiskSizeProvider localssdsize.LocalSSDSizeProvider,
regional bool, concurrentGceRefreshes int, userAgent, domainUrl string, migInstancesMinRefreshWaitTime time.Duration) (GceManager, error) {
// Create Google Compute Engine token.
var err error
tokenSource := google.ComputeTokenSource("")
@ -181,19 +185,20 @@ func CreateGceManager(configReader io.Reader, discoveryOpts cloudprovider.NodeGr
cache := NewGceCache()
migLister := NewMigLister(cache)
manager := &gceManagerImpl{
cache: cache,
GceService: gceService,
migLister: migLister,
migInfoProvider: NewCachingMigInfoProvider(cache, migLister, gceService, projectId, concurrentGceRefreshes, migInstancesMinRefreshWaitTime),
location: location,
regional: regional,
projectId: projectId,
templates: &GceTemplateBuilder{},
interrupt: make(chan struct{}),
explicitlyConfigured: make(map[GceRef]bool),
concurrentGceRefreshes: concurrentGceRefreshes,
reserved: &GceReserved{},
domainUrl: domainUrl,
cache: cache,
GceService: gceService,
migLister: migLister,
migInfoProvider: NewCachingMigInfoProvider(cache, migLister, gceService, projectId, concurrentGceRefreshes, migInstancesMinRefreshWaitTime),
location: location,
regional: regional,
projectId: projectId,
templates: &GceTemplateBuilder{},
interrupt: make(chan struct{}),
explicitlyConfigured: make(map[GceRef]bool),
concurrentGceRefreshes: concurrentGceRefreshes,
reserved: &GceReserved{},
domainUrl: domainUrl,
localSSDDiskSizeProvider: localSSDDiskSizeProvider,
}
if err := manager.fetchExplicitMigs(discoveryOpts.NodeGroupSpecs); err != nil {
@ -599,7 +604,7 @@ func (m *gceManagerImpl) GetMigTemplateNode(mig Mig) (*apiv1.Node, error) {
if err != nil {
return nil, err
}
return m.templates.BuildNodeFromTemplate(mig, migOsInfo, template, machineType.CPU, machineType.Memory, nil, m.reserved)
return m.templates.BuildNodeFromTemplate(mig, migOsInfo, template, machineType.CPU, machineType.Memory, nil, m.reserved, m.localSSDDiskSizeProvider)
}
// parseMIGAutoDiscoverySpecs returns any provided NodeGroupAutoDiscoverySpecs

View File

@ -22,6 +22,7 @@ import (
"time"
apiv1 "k8s.io/api/core/v1"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider/gce/localssdsize"
"k8s.io/autoscaler/cluster-autoscaler/utils/gpu"
"k8s.io/autoscaler/cluster-autoscaler/utils/units"
@ -30,13 +31,15 @@ import (
// GcePriceModel implements PriceModel interface for GCE.
type GcePriceModel struct {
PriceInfo PriceInfo
PriceInfo PriceInfo
localSSDSizeProvider localssdsize.LocalSSDSizeProvider
}
// NewGcePriceModel gets a new instance of GcePriceModel
func NewGcePriceModel(info PriceInfo) *GcePriceModel {
func NewGcePriceModel(info PriceInfo, localSSDSizeProvider localssdsize.LocalSSDSizeProvider) *GcePriceModel {
return &GcePriceModel{
PriceInfo: info,
PriceInfo: info,
localSSDSizeProvider: localSSDSizeProvider,
}
}
@ -55,27 +58,26 @@ const DefaultBootDiskSizeGB = 100
func (model *GcePriceModel) NodePrice(node *apiv1.Node, startTime time.Time, endTime time.Time) (float64, error) {
price := 0.0
basePriceFound := false
// Base instance price
machineType := ""
if node.Labels != nil {
if machineType, found := getInstanceTypeFromLabels(node.Labels); found {
priceMapToUse := model.PriceInfo.InstancePrices()
if hasPreemptiblePricing(node) {
priceMapToUse = model.PriceInfo.PreemptibleInstancePrices()
}
if basePricePerHour, found := priceMapToUse[machineType]; found {
price = basePricePerHour * getHours(startTime, endTime)
basePriceFound = true
} else {
klog.Warningf("Pricing information not found for instance type %v; will fallback to default pricing", machineType)
}
if _machineType, found := getInstanceTypeFromLabels(node.Labels); found {
machineType = _machineType
}
}
// Base instance price
priceMapToUse := model.PriceInfo.InstancePrices()
if hasPreemptiblePricing(node) {
priceMapToUse = model.PriceInfo.PreemptibleInstancePrices()
}
if basePricePerHour, found := priceMapToUse[machineType]; found {
price = basePricePerHour * getHours(startTime, endTime)
basePriceFound = true
} else {
klog.Warningf("Pricing information not found for instance type %v; will fallback to default pricing", machineType)
}
if !basePriceFound {
if machineType, found := getInstanceTypeFromLabels(node.Labels); found {
price = model.getBasePrice(node.Status.Capacity, machineType, startTime, endTime)
price = price * model.getPreemptibleDiscount(node)
}
price = model.getBasePrice(node.Status.Capacity, machineType, startTime, endTime)
price = price * model.getPreemptibleDiscount(node)
}
// Ephemeral Storage
@ -86,7 +88,7 @@ func (model *GcePriceModel) NodePrice(node *apiv1.Node, startTime time.Time, end
if hasPreemptiblePricing(node) {
localSsdPrice = model.PriceInfo.SpotLocalSsdPricePerHour()
}
price += localSsdCount * float64(LocalSSDDiskSizeInGiB) * localSsdPrice * getHours(startTime, endTime)
price += localSsdCount * float64(model.localSSDSizeProvider.SSDSizeInGiB(machineType)) * localSsdPrice * getHours(startTime, endTime)
}
// Boot disk price

View File

@ -24,6 +24,7 @@ import (
apiv1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider/gce/localssdsize"
"k8s.io/autoscaler/cluster-autoscaler/utils/gpu"
. "k8s.io/autoscaler/cluster-autoscaler/utils/test"
"k8s.io/autoscaler/cluster-autoscaler/utils/units"
@ -63,7 +64,10 @@ func testNode(t *testing.T, nodeName string, instanceType string, millicpu int64
func testNodeEphemeralStorage(t *testing.T, nodeName string, isEphemeralStorageLocalSsd bool, localSsdCount int, bootDiskType string, bootDiskSize int, isSpot bool) *apiv1.Node {
node := testNode(t, nodeName, "", 8000, 30*units.GiB, "", 0, false, isSpot)
if isEphemeralStorageLocalSsd {
AddEphemeralStorageToNode(node, int64(localSsdCount)*LocalSSDDiskSizeInGiB)
simpleLocalSSDProvider := localssdsize.NewSimpleLocalSSDProvider()
machineType := ""
ssdSize := simpleLocalSSDProvider.SSDSizeInGiB(machineType)
AddEphemeralStorageToNode(node, int64(localSsdCount)*int64(ssdSize))
} else {
AddEphemeralStorageToNode(node, int64(bootDiskSize))
}
@ -218,7 +222,7 @@ func TestGetNodePrice(t *testing.T) {
for tn, tc := range cases {
t.Run(tn, func(t *testing.T) {
model := NewGcePriceModel(NewGcePriceInfo())
model := NewGcePriceModel(NewGcePriceInfo(), localssdsize.NewSimpleLocalSSDProvider())
now := time.Now()
price1, err := model.NodePrice(tc.cheaperNode, now, now.Add(time.Hour))
@ -237,7 +241,7 @@ func TestGetPodPrice(t *testing.T) {
pod2 := BuildTestPodWithEphemeralStorage("a2", 2*100, 2*500*units.MiB, 2*100*units.GiB)
pod3 := BuildTestPodWithEphemeralStorage("a2", 2*100, 2*500*units.MiB, 100*units.GiB)
model := NewGcePriceModel(NewGcePriceInfo())
model := NewGcePriceModel(NewGcePriceInfo(), localssdsize.NewSimpleLocalSSDProvider())
now := time.Now()
price1, err := model.PodPrice(pod1, now, now.Add(time.Hour))

View File

@ -0,0 +1,46 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package localssdsize
// LocalSSDSizeProvider contains methods to calculate local ssd disk size for GCE based on some input parameters (e.g. machine type name)
type LocalSSDSizeProvider interface {
// Computes local ssd disk size in GiB based on machine type name
SSDSizeInGiB(string) uint64
}
// LocalSSDDiskSizeInGiB is the size of each local SSD in GiB
// (cf. https://cloud.google.com/compute/docs/disks/local-ssd)
const LocalSSDDiskSizeInGiB = uint64(375)
// SimpleLocalSSDProvider implements LocalSSDSizeProvider
// It always returns a constant size
type SimpleLocalSSDProvider struct {
ssdDiskSize uint64
}
// NewSimpleLocalSSDProvider creates an instance of SimpleLocalSSDProvider with `LocalSSDDiskSizeInGiB` as the disk size and returns a pointer to it
func NewSimpleLocalSSDProvider() LocalSSDSizeProvider {
return &SimpleLocalSSDProvider{
ssdDiskSize: LocalSSDDiskSizeInGiB,
}
}
// SSDSizeInGiB Returns a constant disk size in GiB
// First parameter is not used and added to conform to the interface `LocalSSDSizeProvider`
func (lsp *SimpleLocalSSDProvider) SSDSizeInGiB(_ string) uint64 {
return lsp.ssdDiskSize
}

View File

@ -33,6 +33,7 @@ import (
"sigs.k8s.io/yaml"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider/gce/localssdsize"
"k8s.io/autoscaler/cluster-autoscaler/utils/gpu"
"k8s.io/autoscaler/cluster-autoscaler/utils/units"
)
@ -40,10 +41,6 @@ import (
// GceTemplateBuilder builds templates for GCE nodes.
type GceTemplateBuilder struct{}
// LocalSSDDiskSizeInGiB is the size of each local SSD in GiB
// (cf. https://cloud.google.com/compute/docs/disks/local-ssd)
const LocalSSDDiskSizeInGiB = 375
// These annotations are used internally only to store information in node temlate and use it later in CA, the actuall nodes won't have these annotations.
const (
// LocalSsdCountAnnotation is the annotation for number of attached local SSDs to the node.
@ -186,7 +183,7 @@ func (t *GceTemplateBuilder) MigOsInfo(migId string, template *gce.InstanceTempl
}
// BuildNodeFromTemplate builds node from provided GCE template.
func (t *GceTemplateBuilder) BuildNodeFromTemplate(mig Mig, migOsInfo MigOsInfo, template *gce.InstanceTemplate, cpu int64, mem int64, pods *int64, reserved OsReservedCalculator) (*apiv1.Node, error) {
func (t *GceTemplateBuilder) BuildNodeFromTemplate(mig Mig, migOsInfo MigOsInfo, template *gce.InstanceTemplate, cpu int64, mem int64, pods *int64, reserved OsReservedCalculator, localSSDSizeProvider localssdsize.LocalSSDSizeProvider) (*apiv1.Node, error) {
if template.Properties == nil {
return nil, fmt.Errorf("instance template %s has no properties", template.Name)
@ -222,7 +219,8 @@ func (t *GceTemplateBuilder) BuildNodeFromTemplate(mig Mig, migOsInfo MigOsInfo,
}
ephemeralStorageLocalSsdCount := ephemeralStorageLocalSSDCount(kubeEnvValue)
if err == nil && ephemeralStorageLocalSsdCount > 0 {
ephemeralStorage, err = getEphemeralStorageOnLocalSsd(localSsdCount, ephemeralStorageLocalSsdCount)
localSSDDiskSize := localSSDSizeProvider.SSDSizeInGiB(template.Properties.MachineType)
ephemeralStorage, err = getEphemeralStorageOnLocalSsd(localSsdCount, ephemeralStorageLocalSsdCount, int64(localSSDDiskSize))
}
if err != nil {
return nil, fmt.Errorf("could not fetch ephemeral storage from instance template: %v", err)
@ -324,11 +322,11 @@ func getLocalSsdCount(instanceProperties *gce.InstanceProperties) (int64, error)
return count, nil
}
func getEphemeralStorageOnLocalSsd(localSsdCount, ephemeralStorageLocalSsdCount int64) (int64, error) {
func getEphemeralStorageOnLocalSsd(localSsdCount, ephemeralStorageLocalSsdCount, localSSDDiskSizeInGiB int64) (int64, error) {
if localSsdCount < ephemeralStorageLocalSsdCount {
return 0, fmt.Errorf("actual local SSD count is lower than ephemeral_storage_local_ssd_count")
}
return ephemeralStorageLocalSsdCount * LocalSSDDiskSizeInGiB * units.GiB, nil
return ephemeralStorageLocalSsdCount * localSSDDiskSizeInGiB * units.GiB, nil
}
// isBootDiskEphemeralStorageWithInstanceTemplateDisabled will allow bypassing Disk Size of Boot Disk from being

View File

@ -23,6 +23,7 @@ import (
"strings"
"testing"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider/gce/localssdsize"
"k8s.io/autoscaler/cluster-autoscaler/config"
gpuUtils "k8s.io/autoscaler/cluster-autoscaler/utils/gpu"
"k8s.io/autoscaler/cluster-autoscaler/utils/units"
@ -267,7 +268,8 @@ func TestBuildNodeFromTemplateSetsResources(t *testing.T) {
assert.Error(t, err)
return
}
node, err := tb.BuildNodeFromTemplate(mig, migOsInfo, template, tc.physicalCpu, tc.physicalMemory, tc.pods, &GceReserved{})
localSSDDiskSize := localssdsize.NewSimpleLocalSSDProvider()
node, err := tb.BuildNodeFromTemplate(mig, migOsInfo, template, tc.physicalCpu, tc.physicalMemory, tc.pods, &GceReserved{}, localSSDDiskSize)
if tc.expectedNodeTemplateErr {
assert.Error(t, err)
} else {
@ -293,7 +295,7 @@ func TestBuildNodeFromTemplateSetsResources(t *testing.T) {
// specifying physicalEphemeralStorageGiB in the testCase struct
physicalEphemeralStorageGiB := tc.bootDiskSizeGiB
if tc.ephemeralStorageLocalSSDCount > 0 {
physicalEphemeralStorageGiB = tc.ephemeralStorageLocalSSDCount * LocalSSDDiskSizeInGiB
physicalEphemeralStorageGiB = tc.ephemeralStorageLocalSSDCount * int64(localSSDDiskSize.SSDSizeInGiB(template.Properties.MachineType))
} else if tc.isEphemeralStorageBlocked {
physicalEphemeralStorageGiB = 0
}
@ -1414,7 +1416,8 @@ func TestBuildNodeFromTemplateArch(t *testing.T) {
if gotErr != nil {
t.Fatalf("MigOsInfo unexpected error: %v", gotErr)
}
gotNode, gotErr := tb.BuildNodeFromTemplate(mig, migOsInfo, template, 16, 128, nil, &GceReserved{})
localSSDDiskSize := localssdsize.NewSimpleLocalSSDProvider()
gotNode, gotErr := tb.BuildNodeFromTemplate(mig, migOsInfo, template, 16, 128, nil, &GceReserved{}, localSSDDiskSize)
if gotErr != nil {
t.Fatalf("BuildNodeFromTemplate unexpected error: %v", gotErr)
}

View File

@ -19,6 +19,7 @@ package config
import (
"time"
gce_localssdsize "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/gce/localssdsize"
kubelet_config "k8s.io/kubernetes/pkg/kubelet/apis/config"
scheduler_config "k8s.io/kubernetes/pkg/scheduler/apis/config"
)
@ -63,6 +64,8 @@ type GCEOptions struct {
MigInstancesMinRefreshWaitTime time.Duration
// DomainUrl is the GCE url used to make calls to GCE API.
DomainUrl string
// LocalSSDDiskSizeProvider provides local ssd disk size based on machine type
LocalSSDDiskSizeProvider gce_localssdsize.LocalSSDSizeProvider
}
const (

View File

@ -44,6 +44,7 @@ import (
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
cloudBuilder "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/builder"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider/gce/localssdsize"
"k8s.io/autoscaler/cluster-autoscaler/config"
"k8s.io/autoscaler/cluster-autoscaler/core"
"k8s.io/autoscaler/cluster-autoscaler/core/podlistprocessor"
@ -399,6 +400,7 @@ func createAutoscalingOptions() config.AutoscalingOptions {
GCEOptions: config.GCEOptions{
ConcurrentRefreshes: *concurrentGceRefreshes,
MigInstancesMinRefreshWaitTime: *gceMigInstancesMinRefreshWaitTime,
LocalSSDDiskSizeProvider: localssdsize.NewSimpleLocalSSDProvider(),
},
ClusterAPICloudConfigAuthoritative: *clusterAPICloudConfigAuthoritative,
CordonNodeBeforeTerminate: *cordonNodeBeforeTerminate,