Limit refresh rate of GCE MIG instances.

This commit is contained in:
Aleksandra Gacek 2023-04-05 17:23:26 +02:00
parent ad86bdd641
commit 656f1919a8
8 changed files with 245 additions and 108 deletions

View File

@ -120,7 +120,9 @@ func main() {
NodeGroupAutoDiscovery: *nodeGroupAutoDiscoveryFlag, NodeGroupAutoDiscovery: *nodeGroupAutoDiscoveryFlag,
NodeGroups: *nodeGroupsFlag, NodeGroups: *nodeGroupsFlag,
ClusterName: *clusterName, ClusterName: *clusterName,
ConcurrentGceRefreshes: 1, GCEOptions: config.GCEOptions{
ConcurrentRefreshes: 1,
},
UserAgent: "user-agent", UserAgent: "user-agent",
} }
cloudProvider := cloudBuilder.NewCloudProvider(autoscalingOptions) cloudProvider := cloudBuilder.NewCloudProvider(autoscalingOptions)

View File

@ -369,12 +369,12 @@ func BuildGCE(opts config.AutoscalingOptions, do cloudprovider.NodeGroupDiscover
defer config.Close() defer config.Close()
} }
manager, err := CreateGceManager(config, do, opts.Regional, opts.ConcurrentGceRefreshes, opts.UserAgent) manager, err := CreateGceManager(config, do, opts.Regional, opts.GCEOptions.ConcurrentRefreshes, opts.UserAgent, opts.GCEOptions.MigInstancesMinRefreshWaitTime)
if err != nil { if err != nil {
klog.Fatalf("Failed to create GCE Manager: %v", err) klog.Fatalf("Failed to create GCE Manager: %v", err)
} }
pricingModel := NewGcePriceModel(NewGcePriceInfo(), opts.GceExpanderEphemeralStorageSupport) pricingModel := NewGcePriceModel(NewGcePriceInfo(), opts.GCEOptions.ExpanderEphemeralStorageSupport)
provider, err := BuildGceCloudProvider(manager, rl, pricingModel) provider, err := BuildGceCloudProvider(manager, rl, pricingModel)
if err != nil { if err != nil {
klog.Fatalf("Failed to create GCE cloud provider: %v", err) klog.Fatalf("Failed to create GCE cloud provider: %v", err)

View File

@ -123,7 +123,7 @@ type gceManagerImpl struct {
} }
// CreateGceManager constructs GceManager object. // CreateGceManager constructs GceManager object.
func CreateGceManager(configReader io.Reader, discoveryOpts cloudprovider.NodeGroupDiscoveryOptions, regional bool, concurrentGceRefreshes int, userAgent string) (GceManager, error) { func CreateGceManager(configReader io.Reader, discoveryOpts cloudprovider.NodeGroupDiscoveryOptions, regional bool, concurrentGceRefreshes int, userAgent string, migInstancesMinRefreshWaitTime time.Duration) (GceManager, error) {
// Create Google Compute Engine token. // Create Google Compute Engine token.
var err error var err error
tokenSource := google.ComputeTokenSource("") tokenSource := google.ComputeTokenSource("")
@ -183,7 +183,7 @@ func CreateGceManager(configReader io.Reader, discoveryOpts cloudprovider.NodeGr
cache: cache, cache: cache,
GceService: gceService, GceService: gceService,
migLister: migLister, migLister: migLister,
migInfoProvider: NewCachingMigInfoProvider(cache, migLister, gceService, projectId, concurrentGceRefreshes), migInfoProvider: NewCachingMigInfoProvider(cache, migLister, gceService, projectId, concurrentGceRefreshes, migInstancesMinRefreshWaitTime),
location: location, location: location,
regional: regional, regional: regional,
projectId: projectId, projectId: projectId,

View File

@ -351,7 +351,7 @@ func newTestGceManager(t *testing.T, testServerURL string, regional bool) *gceMa
manager := &gceManagerImpl{ manager := &gceManagerImpl{
cache: cache, cache: cache,
migLister: migLister, migLister: migLister,
migInfoProvider: NewCachingMigInfoProvider(cache, migLister, gceService, projectId, 1), migInfoProvider: NewCachingMigInfoProvider(cache, migLister, gceService, projectId, 1, 0*time.Second),
GceService: gceService, GceService: gceService,
projectId: projectId, projectId: projectId,
regional: regional, regional: regional,

View File

@ -23,6 +23,7 @@ import (
"path" "path"
"strings" "strings"
"sync" "sync"
"time"
gce "google.golang.org/api/compute/v1" gce "google.golang.org/api/compute/v1"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider" "k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
@ -52,6 +53,10 @@ type MigInfoProvider interface {
GetMigMachineType(migRef GceRef) (MachineType, error) GetMigMachineType(migRef GceRef) (MachineType, error)
} }
type timeProvider interface {
Now() time.Time
}
type cachingMigInfoProvider struct { type cachingMigInfoProvider struct {
migInfoMutex sync.Mutex migInfoMutex sync.Mutex
cache *GceCache cache *GceCache
@ -60,16 +65,28 @@ type cachingMigInfoProvider struct {
projectId string projectId string
concurrentGceRefreshes int concurrentGceRefreshes int
migInstanceMutex sync.Mutex migInstanceMutex sync.Mutex
migInstancesMinRefreshWaitTime time.Duration
migInstancesLastRefreshedInfo map[string]time.Time
timeProvider timeProvider
}
type realTime struct{}
func (r *realTime) Now() time.Time {
return time.Now()
} }
// NewCachingMigInfoProvider creates an instance of caching MigInfoProvider // NewCachingMigInfoProvider creates an instance of caching MigInfoProvider
func NewCachingMigInfoProvider(cache *GceCache, migLister MigLister, gceClient AutoscalingGceClient, projectId string, concurrentGceRefreshes int) MigInfoProvider { func NewCachingMigInfoProvider(cache *GceCache, migLister MigLister, gceClient AutoscalingGceClient, projectId string, concurrentGceRefreshes int, migInstancesMinRefreshWaitTime time.Duration) MigInfoProvider {
return &cachingMigInfoProvider{ return &cachingMigInfoProvider{
cache: cache, cache: cache,
migLister: migLister, migLister: migLister,
gceClient: gceClient, gceClient: gceClient,
projectId: projectId, projectId: projectId,
concurrentGceRefreshes: concurrentGceRefreshes, concurrentGceRefreshes: concurrentGceRefreshes,
migInstancesMinRefreshWaitTime: migInstancesMinRefreshWaitTime,
migInstancesLastRefreshedInfo: make(map[string]time.Time),
timeProvider: &realTime{},
} }
} }
@ -158,12 +175,21 @@ func (c *cachingMigInfoProvider) findMigWithMatchingBasename(instanceRef GceRef)
} }
func (c *cachingMigInfoProvider) fillMigInstances(migRef GceRef) error { func (c *cachingMigInfoProvider) fillMigInstances(migRef GceRef) error {
if val, ok := c.migInstancesLastRefreshedInfo[migRef.String()]; ok {
// do not regenerate MIG instances cache if last refresh happened recently.
if c.timeProvider.Now().Sub(val) < c.migInstancesMinRefreshWaitTime {
klog.V(4).Infof("Not regenerating MIG instances cache for %s, as it was refreshed in last MinRefreshWaitTime (%s).", migRef.String(), c.migInstancesMinRefreshWaitTime)
return nil
}
}
klog.V(4).Infof("Regenerating MIG instances cache for %s", migRef.String()) klog.V(4).Infof("Regenerating MIG instances cache for %s", migRef.String())
instances, err := c.gceClient.FetchMigInstances(migRef) instances, err := c.gceClient.FetchMigInstances(migRef)
if err != nil { if err != nil {
c.migLister.HandleMigIssue(migRef, err) c.migLister.HandleMigIssue(migRef, err)
return err return err
} }
// only save information for successful calls, given the errors above may be transient.
c.migInstancesLastRefreshedInfo[migRef.String()] = c.timeProvider.Now()
return c.cache.SetMigInstances(migRef, instances) return c.cache.SetMigInstances(migRef, instances)
} }

View File

@ -21,6 +21,7 @@ import (
"fmt" "fmt"
"regexp" "regexp"
"testing" "testing"
"time"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
gce "google.golang.org/api/compute/v1" gce "google.golang.org/api/compute/v1"
@ -235,7 +236,7 @@ func TestMigInfoProviderGetMigForInstance(t *testing.T) {
fetchMigs: fetchMigsConst(nil), fetchMigs: fetchMigsConst(nil),
} }
migLister := NewMigLister(tc.cache) migLister := NewMigLister(tc.cache)
provider := NewCachingMigInfoProvider(tc.cache, migLister, client, mig.GceRef().Project, 1) provider := NewCachingMigInfoProvider(tc.cache, migLister, client, mig.GceRef().Project, 1, 0*time.Second)
mig, err := provider.GetMigForInstance(instanceRef) mig, err := provider.GetMigForInstance(instanceRef)
@ -307,7 +308,7 @@ func TestGetMigInstances(t *testing.T) {
fetchMigInstances: tc.fetchMigInstances, fetchMigInstances: tc.fetchMigInstances,
} }
migLister := NewMigLister(tc.cache) migLister := NewMigLister(tc.cache)
provider := NewCachingMigInfoProvider(tc.cache, migLister, client, mig.GceRef().Project, 1) provider := NewCachingMigInfoProvider(tc.cache, migLister, client, mig.GceRef().Project, 1, 0*time.Second)
instances, err := provider.GetMigInstances(mig.GceRef()) instances, err := provider.GetMigInstances(mig.GceRef())
cachedInstances, cached := tc.cache.GetMigInstances(mig.GceRef()) cachedInstances, cached := tc.cache.GetMigInstances(mig.GceRef())
@ -471,7 +472,7 @@ func TestRegenerateMigInstancesCache(t *testing.T) {
fetchMigInstances: tc.fetchMigInstances, fetchMigInstances: tc.fetchMigInstances,
} }
migLister := NewMigLister(tc.cache) migLister := NewMigLister(tc.cache)
provider := NewCachingMigInfoProvider(tc.cache, migLister, client, mig.GceRef().Project, 1) provider := NewCachingMigInfoProvider(tc.cache, migLister, client, mig.GceRef().Project, 1, 0*time.Second)
err := provider.RegenerateMigInstancesCache() err := provider.RegenerateMigInstancesCache()
assert.Equal(t, tc.expectedErr, err) assert.Equal(t, tc.expectedErr, err)
@ -550,7 +551,7 @@ func TestGetMigTargetSize(t *testing.T) {
fetchMigTargetSize: tc.fetchMigTargetSize, fetchMigTargetSize: tc.fetchMigTargetSize,
} }
migLister := NewMigLister(tc.cache) migLister := NewMigLister(tc.cache)
provider := NewCachingMigInfoProvider(tc.cache, migLister, client, mig.GceRef().Project, 1) provider := NewCachingMigInfoProvider(tc.cache, migLister, client, mig.GceRef().Project, 1, 0*time.Second)
targetSize, err := provider.GetMigTargetSize(mig.GceRef()) targetSize, err := provider.GetMigTargetSize(mig.GceRef())
cachedTargetSize, found := tc.cache.GetMigTargetSize(mig.GceRef()) cachedTargetSize, found := tc.cache.GetMigTargetSize(mig.GceRef())
@ -632,7 +633,7 @@ func TestGetMigBasename(t *testing.T) {
fetchMigBasename: tc.fetchMigBasename, fetchMigBasename: tc.fetchMigBasename,
} }
migLister := NewMigLister(tc.cache) migLister := NewMigLister(tc.cache)
provider := NewCachingMigInfoProvider(tc.cache, migLister, client, mig.GceRef().Project, 1) provider := NewCachingMigInfoProvider(tc.cache, migLister, client, mig.GceRef().Project, 1, 0*time.Second)
basename, err := provider.GetMigBasename(mig.GceRef()) basename, err := provider.GetMigBasename(mig.GceRef())
cachedBasename, found := tc.cache.GetMigBasename(mig.GceRef()) cachedBasename, found := tc.cache.GetMigBasename(mig.GceRef())
@ -714,7 +715,7 @@ func TestGetMigInstanceTemplateName(t *testing.T) {
fetchMigTemplateName: tc.fetchMigTemplateName, fetchMigTemplateName: tc.fetchMigTemplateName,
} }
migLister := NewMigLister(tc.cache) migLister := NewMigLister(tc.cache)
provider := NewCachingMigInfoProvider(tc.cache, migLister, client, mig.GceRef().Project, 1) provider := NewCachingMigInfoProvider(tc.cache, migLister, client, mig.GceRef().Project, 1, 0*time.Second)
templateName, err := provider.GetMigInstanceTemplateName(mig.GceRef()) templateName, err := provider.GetMigInstanceTemplateName(mig.GceRef())
cachedTemplateName, found := tc.cache.GetMigInstanceTemplateName(mig.GceRef()) cachedTemplateName, found := tc.cache.GetMigInstanceTemplateName(mig.GceRef())
@ -820,7 +821,7 @@ func TestGetMigInstanceTemplate(t *testing.T) {
fetchMigTemplate: tc.fetchMigTemplate, fetchMigTemplate: tc.fetchMigTemplate,
} }
migLister := NewMigLister(tc.cache) migLister := NewMigLister(tc.cache)
provider := NewCachingMigInfoProvider(tc.cache, migLister, client, mig.GceRef().Project, 1) provider := NewCachingMigInfoProvider(tc.cache, migLister, client, mig.GceRef().Project, 1, 0*time.Second)
template, err := provider.GetMigInstanceTemplate(mig.GceRef()) template, err := provider.GetMigInstanceTemplate(mig.GceRef())
cachedTemplate, found := tc.cache.GetMigInstanceTemplate(mig.GceRef()) cachedTemplate, found := tc.cache.GetMigInstanceTemplate(mig.GceRef())
@ -915,7 +916,7 @@ func TestGetMigMachineType(t *testing.T) {
fetchMachineType: tc.fetchMachineType, fetchMachineType: tc.fetchMachineType,
} }
migLister := NewMigLister(cache) migLister := NewMigLister(cache)
provider := NewCachingMigInfoProvider(cache, migLister, client, mig.GceRef().Project, 1) provider := NewCachingMigInfoProvider(cache, migLister, client, mig.GceRef().Project, 1, 0*time.Second)
machine, err := provider.GetMigMachineType(mig.GceRef()) machine, err := provider.GetMigMachineType(mig.GceRef())
if tc.expectError { if tc.expectError {
assert.Error(t, err) assert.Error(t, err)
@ -928,6 +929,91 @@ func TestGetMigMachineType(t *testing.T) {
} }
} }
func TestMultipleGetMigInstanceCallsLimited(t *testing.T) {
mig := &gceMig{
gceRef: GceRef{
Project: "project",
Zone: "zone",
Name: "base-instance-name",
},
}
instance := cloudprovider.Instance{
Id: "gce://project/zone/base-instance-name-abcd",
}
instanceRef, err := GceRefFromProviderId(instance.Id)
assert.Nil(t, err)
instance2 := cloudprovider.Instance{
Id: "gce://project/zone/base-instance-name-abcd2",
}
instanceRef2, err := GceRefFromProviderId(instance2.Id)
assert.Nil(t, err)
now := time.Now()
for name, tc := range map[string]struct {
refreshRateDuration time.Duration
firstCallTime time.Time
secondCallTime time.Time
expectedCallsToFetchMigInstances int
}{
"0s refresh rate duration, refetch expected": {
refreshRateDuration: 0 * time.Second,
firstCallTime: now,
secondCallTime: now,
expectedCallsToFetchMigInstances: 2,
},
"5s refresh rate duration, 0.01s between calls, no refetch expected": {
refreshRateDuration: 5 * time.Second,
firstCallTime: now,
secondCallTime: now.Add(10 * time.Millisecond),
expectedCallsToFetchMigInstances: 1,
},
"0.01s refresh rate duration, 0.01s between calls, refetch expected": {
refreshRateDuration: 10 * time.Millisecond,
firstCallTime: now,
secondCallTime: now.Add(11 * time.Millisecond),
expectedCallsToFetchMigInstances: 2,
},
} {
t.Run(name, func(t *testing.T) {
cache := emptyCache()
cache.migs = map[GceRef]Mig{
mig.gceRef: mig,
}
cache.migBaseNameCache = map[GceRef]string{mig.GceRef(): "base-instance-name"}
callCounter := make(map[GceRef]int)
client := &mockAutoscalingGceClient{
fetchMigInstances: fetchMigInstancesWithCounter(nil, callCounter),
}
migLister := NewMigLister(cache)
ft := &fakeTime{}
provider := &cachingMigInfoProvider{
cache: cache,
migLister: migLister,
gceClient: client,
projectId: projectId,
concurrentGceRefreshes: 1,
migInstancesMinRefreshWaitTime: tc.refreshRateDuration,
migInstancesLastRefreshedInfo: make(map[string]time.Time),
timeProvider: ft,
}
ft.now = tc.firstCallTime
_, err = provider.GetMigForInstance(instanceRef)
assert.NoError(t, err)
ft.now = tc.secondCallTime
_, err = provider.GetMigForInstance(instanceRef2)
assert.NoError(t, err)
assert.Equal(t, tc.expectedCallsToFetchMigInstances, callCounter[mig.GceRef()])
})
}
}
type fakeTime struct {
now time.Time
}
func (f *fakeTime) Now() time.Time {
return f.now
}
func emptyCache() *GceCache { func emptyCache() *GceCache {
return &GceCache{ return &GceCache{
migs: map[GceRef]Mig{mig.GceRef(): mig}, migs: map[GceRef]Mig{mig.GceRef(): mig},
@ -936,6 +1022,7 @@ func emptyCache() *GceCache {
migBaseNameCache: make(map[GceRef]string), migBaseNameCache: make(map[GceRef]string),
instanceTemplateNameCache: make(map[GceRef]string), instanceTemplateNameCache: make(map[GceRef]string),
instanceTemplatesCache: make(map[GceRef]*gce.InstanceTemplate), instanceTemplatesCache: make(map[GceRef]*gce.InstanceTemplate),
instancesFromUnknownMig: make(map[GceRef]bool),
} }
} }
@ -959,6 +1046,13 @@ func fetchMigInstancesConst(instances []cloudprovider.Instance) func(GceRef) ([]
} }
} }
func fetchMigInstancesWithCounter(instances []cloudprovider.Instance, migCounter map[GceRef]int) func(GceRef) ([]cloudprovider.Instance, error) {
return func(ref GceRef) ([]cloudprovider.Instance, error) {
migCounter[ref] = migCounter[ref] + 1
return instances, nil
}
}
func fetchMigInstancesMapping(instancesMapping map[GceRef][]cloudprovider.Instance) func(GceRef) ([]cloudprovider.Instance, error) { func fetchMigInstancesMapping(instancesMapping map[GceRef][]cloudprovider.Instance) func(GceRef) ([]cloudprovider.Instance, error) {
return func(migRef GceRef) ([]cloudprovider.Instance, error) { return func(migRef GceRef) ([]cloudprovider.Instance, error) {
return instancesMapping[migRef], nil return instancesMapping[migRef], nil

View File

@ -46,6 +46,16 @@ type NodeGroupAutoscalingOptions struct {
ScaleDownUnreadyTime time.Duration ScaleDownUnreadyTime time.Duration
} }
// GCEOptions contain autoscaling options specific to GCE cloud provider.
type GCEOptions struct {
// ConcurrentRefreshes is the maximum number of concurrently refreshed instance groups or instance templates.
ConcurrentRefreshes int
// MigInstancesMinRefreshWaitTime is the minimum time which needs to pass before GCE MIG instances from a given MIG can be refreshed.
MigInstancesMinRefreshWaitTime time.Duration
// ExpanderEphemeralStorageSupport is whether scale-up takes ephemeral storage resources into account.
ExpanderEphemeralStorageSupport bool
}
const ( const (
// DefaultMaxAllocatableDifferenceRatio describes how Node.Status.Allocatable can differ between groups in the same NodeGroupSet // DefaultMaxAllocatableDifferenceRatio describes how Node.Status.Allocatable can differ between groups in the same NodeGroupSet
DefaultMaxAllocatableDifferenceRatio = 0.05 DefaultMaxAllocatableDifferenceRatio = 0.05
@ -194,8 +204,8 @@ type AutoscalingOptions struct {
BalancingLabels []string BalancingLabels []string
// AWSUseStaticInstanceList tells if AWS cloud provider use static instance type list or dynamically fetch from remote APIs. // AWSUseStaticInstanceList tells if AWS cloud provider use static instance type list or dynamically fetch from remote APIs.
AWSUseStaticInstanceList bool AWSUseStaticInstanceList bool
// ConcurrentGceRefreshes is the maximum number of concurrently refreshed instance groups or instance templates. // GCEOptions contain autoscaling options specific to GCE cloud provider.
ConcurrentGceRefreshes int GCEOptions GCEOptions
// Path to kube configuration if available // Path to kube configuration if available
KubeConfigPath string KubeConfigPath string
// Burst setting for kubernetes client // Burst setting for kubernetes client
@ -223,8 +233,6 @@ type AutoscalingOptions struct {
MaxScaleDownParallelism int MaxScaleDownParallelism int
// MaxDrainParallelism is the maximum number of nodes needing drain, that can be drained and deleted in parallel. // MaxDrainParallelism is the maximum number of nodes needing drain, that can be drained and deleted in parallel.
MaxDrainParallelism int MaxDrainParallelism int
// GceExpanderEphemeralStorageSupport is whether scale-up takes ephemeral storage resources into account.
GceExpanderEphemeralStorageSupport bool
// RecordDuplicatedEvents controls whether events should be duplicated within a 5 minute window. // RecordDuplicatedEvents controls whether events should be duplicated within a 5 minute window.
RecordDuplicatedEvents bool RecordDuplicatedEvents bool
// MaxNodesPerScaleUp controls how many nodes can be added in a single scale-up. // MaxNodesPerScaleUp controls how many nodes can be added in a single scale-up.

View File

@ -191,7 +191,12 @@ var (
balancingIgnoreLabelsFlag = multiStringFlag("balancing-ignore-label", "Specifies a label to ignore in addition to the basic and cloud-provider set of labels when comparing if two node groups are similar") balancingIgnoreLabelsFlag = multiStringFlag("balancing-ignore-label", "Specifies a label to ignore in addition to the basic and cloud-provider set of labels when comparing if two node groups are similar")
balancingLabelsFlag = multiStringFlag("balancing-label", "Specifies a label to use for comparing if two node groups are similar, rather than the built in heuristics. Setting this flag disables all other comparison logic, and cannot be combined with --balancing-ignore-label.") balancingLabelsFlag = multiStringFlag("balancing-label", "Specifies a label to use for comparing if two node groups are similar, rather than the built in heuristics. Setting this flag disables all other comparison logic, and cannot be combined with --balancing-ignore-label.")
awsUseStaticInstanceList = flag.Bool("aws-use-static-instance-list", false, "Should CA fetch instance types in runtime or use a static list. AWS only") awsUseStaticInstanceList = flag.Bool("aws-use-static-instance-list", false, "Should CA fetch instance types in runtime or use a static list. AWS only")
// GCE specific flags
concurrentGceRefreshes = flag.Int("gce-concurrent-refreshes", 1, "Maximum number of concurrent refreshes per cloud object type.") concurrentGceRefreshes = flag.Int("gce-concurrent-refreshes", 1, "Maximum number of concurrent refreshes per cloud object type.")
gceMigInstancesMinRefreshWaitTime = flag.Duration("gce-mig-instances-min-refresh-wait-time", 5*time.Second, "The minimum time which needs to pass before GCE MIG instances from a given MIG can be refreshed.")
gceExpanderEphemeralStorageSupport = flag.Bool("gce-expander-ephemeral-storage-support", false, "Whether scale-up takes ephemeral storage resources into account for GCE cloud provider")
enableProfiling = flag.Bool("profiling", false, "Is debug/pprof endpoint enabled") enableProfiling = flag.Bool("profiling", false, "Is debug/pprof endpoint enabled")
clusterAPICloudConfigAuthoritative = flag.Bool("clusterapi-cloud-config-authoritative", false, "Treat the cloud-config flag authoritatively (do not fallback to using kubeconfig flag). ClusterAPI only") clusterAPICloudConfigAuthoritative = flag.Bool("clusterapi-cloud-config-authoritative", false, "Treat the cloud-config flag authoritatively (do not fallback to using kubeconfig flag). ClusterAPI only")
cordonNodeBeforeTerminate = flag.Bool("cordon-node-before-terminating", false, "Should CA cordon nodes before terminating during downscale process") cordonNodeBeforeTerminate = flag.Bool("cordon-node-before-terminating", false, "Should CA cordon nodes before terminating during downscale process")
@ -210,7 +215,6 @@ var (
"nodeGroupBackoffResetTimeout is the time after last failed scale-up when the backoff duration is reset.") "nodeGroupBackoffResetTimeout is the time after last failed scale-up when the backoff duration is reset.")
maxScaleDownParallelismFlag = flag.Int("max-scale-down-parallelism", 10, "Maximum number of nodes (both empty and needing drain) that can be deleted in parallel.") maxScaleDownParallelismFlag = flag.Int("max-scale-down-parallelism", 10, "Maximum number of nodes (both empty and needing drain) that can be deleted in parallel.")
maxDrainParallelismFlag = flag.Int("max-drain-parallelism", 1, "Maximum number of nodes needing drain, that can be drained and deleted in parallel.") maxDrainParallelismFlag = flag.Int("max-drain-parallelism", 1, "Maximum number of nodes needing drain, that can be drained and deleted in parallel.")
gceExpanderEphemeralStorageSupport = flag.Bool("gce-expander-ephemeral-storage-support", false, "Whether scale-up takes ephemeral storage resources into account for GCE cloud provider")
recordDuplicatedEvents = flag.Bool("record-duplicated-events", false, "enable duplication of similar events within a 5 minute window.") recordDuplicatedEvents = flag.Bool("record-duplicated-events", false, "enable duplication of similar events within a 5 minute window.")
maxNodesPerScaleUp = flag.Int("max-nodes-per-scaleup", 1000, "Max nodes added in a single scale-up. This is intended strictly for optimizing CA algorithm latency and not a tool to rate-limit scale-up throughput.") maxNodesPerScaleUp = flag.Int("max-nodes-per-scaleup", 1000, "Max nodes added in a single scale-up. This is intended strictly for optimizing CA algorithm latency and not a tool to rate-limit scale-up throughput.")
maxNodeGroupBinpackingDuration = flag.Duration("max-nodegroup-binpacking-duration", 10*time.Second, "Maximum time that will be spent in binpacking simulation for each NodeGroup.") maxNodeGroupBinpackingDuration = flag.Duration("max-nodegroup-binpacking-duration", 10*time.Second, "Maximum time that will be spent in binpacking simulation for each NodeGroup.")
@ -307,7 +311,11 @@ func createAutoscalingOptions() config.AutoscalingOptions {
KubeClientQPS: *kubeClientQPS, KubeClientQPS: *kubeClientQPS,
NodeDeletionDelayTimeout: *nodeDeletionDelayTimeout, NodeDeletionDelayTimeout: *nodeDeletionDelayTimeout,
AWSUseStaticInstanceList: *awsUseStaticInstanceList, AWSUseStaticInstanceList: *awsUseStaticInstanceList,
ConcurrentGceRefreshes: *concurrentGceRefreshes, GCEOptions: config.GCEOptions{
ConcurrentRefreshes: *concurrentGceRefreshes,
MigInstancesMinRefreshWaitTime: *gceMigInstancesMinRefreshWaitTime,
ExpanderEphemeralStorageSupport: *gceExpanderEphemeralStorageSupport,
},
ClusterAPICloudConfigAuthoritative: *clusterAPICloudConfigAuthoritative, ClusterAPICloudConfigAuthoritative: *clusterAPICloudConfigAuthoritative,
CordonNodeBeforeTerminate: *cordonNodeBeforeTerminate, CordonNodeBeforeTerminate: *cordonNodeBeforeTerminate,
DaemonSetEvictionForEmptyNodes: *daemonSetEvictionForEmptyNodes, DaemonSetEvictionForEmptyNodes: *daemonSetEvictionForEmptyNodes,
@ -318,7 +326,6 @@ func createAutoscalingOptions() config.AutoscalingOptions {
NodeGroupBackoffResetTimeout: *nodeGroupBackoffResetTimeout, NodeGroupBackoffResetTimeout: *nodeGroupBackoffResetTimeout,
MaxScaleDownParallelism: *maxScaleDownParallelismFlag, MaxScaleDownParallelism: *maxScaleDownParallelismFlag,
MaxDrainParallelism: *maxDrainParallelismFlag, MaxDrainParallelism: *maxDrainParallelismFlag,
GceExpanderEphemeralStorageSupport: *gceExpanderEphemeralStorageSupport,
RecordDuplicatedEvents: *recordDuplicatedEvents, RecordDuplicatedEvents: *recordDuplicatedEvents,
MaxNodesPerScaleUp: *maxNodesPerScaleUp, MaxNodesPerScaleUp: *maxNodesPerScaleUp,
MaxNodeGroupBinpackingDuration: *maxNodeGroupBinpackingDuration, MaxNodeGroupBinpackingDuration: *maxNodeGroupBinpackingDuration,