562 lines
17 KiB
Go
562 lines
17 KiB
Go
/*
Copyright 2016 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
|
|
|
|
package aws
|
|
|
|
import (
|
|
"fmt"
|
|
"reflect"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider/aws/aws-sdk-go/aws"
|
|
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider/aws/aws-sdk-go/service/autoscaling"
|
|
"k8s.io/autoscaler/cluster-autoscaler/config/dynamic"
|
|
klog "k8s.io/klog/v2"
|
|
)
|
|
|
|
const (
	// scaleToZeroSupported is handed to the node group spec parser and set as
	// SupportScaleToZero on specs built from AWS data.
	scaleToZeroSupported = true

	// placeholderInstanceNamePrefix starts the ID of every placeholder
	// instance created for capacity that was requested but not yet started.
	placeholderInstanceNamePrefix = "i-placeholder"

	// placeholderUnfulfillableStatus is the health status assigned to
	// placeholders of a group that cannot provision any more nodes.
	placeholderUnfulfillableStatus = "placeholder-cannot-be-fulfilled"
)
|
|
|
|
type asgCache struct {
|
|
registeredAsgs map[AwsRef]*asg
|
|
asgToInstances map[AwsRef][]AwsInstanceRef
|
|
instanceToAsg map[AwsInstanceRef]*asg
|
|
instanceStatus map[AwsInstanceRef]*string
|
|
asgInstanceTypeCache *instanceTypeExpirationStore
|
|
mutex sync.Mutex
|
|
awsService *awsWrapper
|
|
interrupt chan struct{}
|
|
|
|
asgAutoDiscoverySpecs []asgAutoDiscoveryConfig
|
|
explicitlyConfigured map[AwsRef]bool
|
|
autoscalingOptions map[AwsRef]map[string]string
|
|
}
|
|
|
|
// launchTemplate identifies a launch template by its name and version.
type launchTemplate struct {
	// name is the launch template name.
	name string
	// version is the launch template version string.
	version string
}
|
|
|
|
type mixedInstancesPolicy struct {
|
|
launchTemplate *launchTemplate
|
|
instanceTypesOverrides []string
|
|
instanceRequirementsOverrides *autoscaling.InstanceRequirements
|
|
}
|
|
|
|
type asg struct {
|
|
AwsRef
|
|
|
|
minSize int
|
|
maxSize int
|
|
curSize int
|
|
lastUpdateTime time.Time
|
|
|
|
AvailabilityZones []string
|
|
LaunchConfigurationName string
|
|
LaunchTemplate *launchTemplate
|
|
MixedInstancesPolicy *mixedInstancesPolicy
|
|
Tags []*autoscaling.TagDescription
|
|
}
|
|
|
|
func newASGCache(awsService *awsWrapper, explicitSpecs []string, autoDiscoverySpecs []asgAutoDiscoveryConfig) (*asgCache, error) {
|
|
registry := &asgCache{
|
|
registeredAsgs: make(map[AwsRef]*asg, 0),
|
|
awsService: awsService,
|
|
asgToInstances: make(map[AwsRef][]AwsInstanceRef),
|
|
instanceToAsg: make(map[AwsInstanceRef]*asg),
|
|
instanceStatus: make(map[AwsInstanceRef]*string),
|
|
asgInstanceTypeCache: newAsgInstanceTypeCache(awsService),
|
|
interrupt: make(chan struct{}),
|
|
asgAutoDiscoverySpecs: autoDiscoverySpecs,
|
|
explicitlyConfigured: make(map[AwsRef]bool),
|
|
autoscalingOptions: make(map[AwsRef]map[string]string),
|
|
}
|
|
|
|
if err := registry.parseExplicitAsgs(explicitSpecs); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return registry, nil
|
|
}
|
|
|
|
// Use a function variable for ease of testing
|
|
var getInstanceTypeForAsg = func(m *asgCache, group *asg) (string, error) {
|
|
if obj, found, _ := m.asgInstanceTypeCache.GetByKey(group.AwsRef.Name); found {
|
|
return obj.(instanceTypeCachedObject).instanceType, nil
|
|
} else if result, err := m.awsService.getInstanceTypesForAsgs([]*asg{group}); err == nil {
|
|
return result[group.AwsRef.Name], nil
|
|
}
|
|
|
|
return "", fmt.Errorf("could not find instance type for %s", group.AwsRef.Name)
|
|
}
|
|
|
|
// Fetch explicitly configured ASGs. These ASGs should never be unregistered
|
|
// during refreshes, even if they no longer exist in AWS.
|
|
func (m *asgCache) parseExplicitAsgs(specs []string) error {
|
|
for _, spec := range specs {
|
|
asg, err := m.buildAsgFromSpec(spec)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to parse node group spec: %v", err)
|
|
}
|
|
m.explicitlyConfigured[asg.AwsRef] = true
|
|
m.register(asg)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Register ASG. Returns the registered ASG.
|
|
func (m *asgCache) register(asg *asg) *asg {
|
|
if existing, asgExists := m.registeredAsgs[asg.AwsRef]; asgExists {
|
|
if reflect.DeepEqual(existing, asg) {
|
|
return existing
|
|
}
|
|
|
|
klog.V(4).Infof("Updating ASG %s", asg.AwsRef.Name)
|
|
|
|
// Explicit registered groups should always use the manually provided min/max
|
|
// values and the not the ones returned by the API
|
|
if !m.explicitlyConfigured[asg.AwsRef] {
|
|
existing.minSize = asg.minSize
|
|
existing.maxSize = asg.maxSize
|
|
}
|
|
|
|
existing.curSize = asg.curSize
|
|
|
|
// Those information are mainly required to create templates when scaling
|
|
// from zero
|
|
existing.AvailabilityZones = asg.AvailabilityZones
|
|
existing.LaunchConfigurationName = asg.LaunchConfigurationName
|
|
existing.LaunchTemplate = asg.LaunchTemplate
|
|
existing.MixedInstancesPolicy = asg.MixedInstancesPolicy
|
|
existing.Tags = asg.Tags
|
|
|
|
return existing
|
|
}
|
|
klog.V(1).Infof("Registering ASG %s", asg.AwsRef.Name)
|
|
m.registeredAsgs[asg.AwsRef] = asg
|
|
return asg
|
|
}
|
|
|
|
// Unregister ASG. Returns the unregistered ASG.
|
|
func (m *asgCache) unregister(a *asg) *asg {
|
|
if _, asgExists := m.registeredAsgs[a.AwsRef]; asgExists {
|
|
klog.V(1).Infof("Unregistered ASG %s", a.AwsRef.Name)
|
|
delete(m.registeredAsgs, a.AwsRef)
|
|
}
|
|
return a
|
|
}
|
|
|
|
func (m *asgCache) buildAsgFromSpec(spec string) (*asg, error) {
|
|
s, err := dynamic.SpecFromString(spec, scaleToZeroSupported)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to parse node group spec: %v", err)
|
|
}
|
|
asg := &asg{
|
|
AwsRef: AwsRef{Name: s.Name},
|
|
minSize: s.MinSize,
|
|
maxSize: s.MaxSize,
|
|
}
|
|
return asg, nil
|
|
}
|
|
|
|
// Get returns the currently registered ASGs
|
|
func (m *asgCache) Get() map[AwsRef]*asg {
|
|
m.mutex.Lock()
|
|
defer m.mutex.Unlock()
|
|
|
|
return m.registeredAsgs
|
|
}
|
|
|
|
// GetAutoscalingOptions return autoscaling options strings obtained from ASG tags.
|
|
func (m *asgCache) GetAutoscalingOptions(ref AwsRef) map[string]string {
|
|
m.mutex.Lock()
|
|
defer m.mutex.Unlock()
|
|
return m.autoscalingOptions[ref]
|
|
}
|
|
|
|
// FindForInstance returns AsgConfig of the given Instance
|
|
func (m *asgCache) FindForInstance(instance AwsInstanceRef) *asg {
|
|
m.mutex.Lock()
|
|
defer m.mutex.Unlock()
|
|
|
|
return m.findForInstance(instance)
|
|
}
|
|
|
|
func (m *asgCache) findForInstance(instance AwsInstanceRef) *asg {
|
|
if asg, found := m.instanceToAsg[instance]; found {
|
|
return asg
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// InstancesByAsg returns the nodes of an ASG
|
|
func (m *asgCache) InstancesByAsg(ref AwsRef) ([]AwsInstanceRef, error) {
|
|
m.mutex.Lock()
|
|
defer m.mutex.Unlock()
|
|
|
|
if instances, found := m.asgToInstances[ref]; found {
|
|
return instances, nil
|
|
}
|
|
|
|
return nil, fmt.Errorf("error while looking for instances of ASG: %s", ref)
|
|
}
|
|
|
|
func (m *asgCache) InstanceStatus(ref AwsInstanceRef) (*string, error) {
|
|
m.mutex.Lock()
|
|
defer m.mutex.Unlock()
|
|
|
|
if status, found := m.instanceStatus[ref]; found {
|
|
return status, nil
|
|
}
|
|
|
|
return nil, fmt.Errorf("could not find instance %v", ref)
|
|
}
|
|
|
|
func (m *asgCache) SetAsgSize(asg *asg, size int) error {
|
|
m.mutex.Lock()
|
|
defer m.mutex.Unlock()
|
|
|
|
return m.setAsgSizeNoLock(asg, size)
|
|
}
|
|
|
|
func (m *asgCache) setAsgSizeNoLock(asg *asg, size int) error {
|
|
params := &autoscaling.SetDesiredCapacityInput{
|
|
AutoScalingGroupName: aws.String(asg.Name),
|
|
DesiredCapacity: aws.Int64(int64(size)),
|
|
HonorCooldown: aws.Bool(false),
|
|
}
|
|
klog.V(0).Infof("Setting asg %s size to %d", asg.Name, size)
|
|
start := time.Now()
|
|
_, err := m.awsService.SetDesiredCapacity(params)
|
|
observeAWSRequest("SetDesiredCapacity", err, start)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Proactively set the ASG size so autoscaler makes better decisions
|
|
asg.lastUpdateTime = start
|
|
asg.curSize = size
|
|
|
|
return nil
|
|
}
|
|
|
|
func (m *asgCache) decreaseAsgSizeByOneNoLock(asg *asg) error {
|
|
return m.setAsgSizeNoLock(asg, asg.curSize-1)
|
|
}
|
|
|
|
// DeleteInstances deletes the given instances. All instances must be controlled by the same ASG.
|
|
func (m *asgCache) DeleteInstances(instances []*AwsInstanceRef) error {
|
|
m.mutex.Lock()
|
|
defer m.mutex.Unlock()
|
|
|
|
if len(instances) == 0 {
|
|
return nil
|
|
}
|
|
commonAsg := m.findForInstance(*instances[0])
|
|
if commonAsg == nil {
|
|
return fmt.Errorf("can't delete instance %s, which is not part of an ASG", instances[0].Name)
|
|
}
|
|
|
|
for _, instance := range instances {
|
|
asg := m.findForInstance(*instance)
|
|
|
|
if asg != commonAsg {
|
|
instanceIds := make([]string, len(instances))
|
|
for i, instance := range instances {
|
|
instanceIds[i] = instance.Name
|
|
}
|
|
|
|
return fmt.Errorf("can't delete instances %s as they belong to at least two different ASGs (%s and %s)", strings.Join(instanceIds, ","), commonAsg.Name, asg.Name)
|
|
}
|
|
}
|
|
|
|
for _, instance := range instances {
|
|
// check if the instance is a placeholder - a requested instance that was never created by the node group
|
|
// if it is, just decrease the size of the node group, as there's no specific instance we can remove
|
|
if m.isPlaceholderInstance(instance) {
|
|
klog.V(4).Infof("instance %s is detected as a placeholder, decreasing ASG requested size instead "+
|
|
"of deleting instance", instance.Name)
|
|
m.decreaseAsgSizeByOneNoLock(commonAsg)
|
|
} else {
|
|
params := &autoscaling.TerminateInstanceInAutoScalingGroupInput{
|
|
InstanceId: aws.String(instance.Name),
|
|
ShouldDecrementDesiredCapacity: aws.Bool(true),
|
|
}
|
|
start := time.Now()
|
|
resp, err := m.awsService.TerminateInstanceInAutoScalingGroup(params)
|
|
observeAWSRequest("TerminateInstanceInAutoScalingGroup", err, start)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
klog.V(4).Infof(*resp.Activity.Description)
|
|
|
|
// Proactively decrement the size so autoscaler makes better decisions
|
|
commonAsg.curSize--
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// isPlaceholderInstance checks if the given instance is only a placeholder
|
|
func (m *asgCache) isPlaceholderInstance(instance *AwsInstanceRef) bool {
|
|
return strings.HasPrefix(instance.Name, placeholderInstanceNamePrefix)
|
|
}
|
|
|
|
// Fetch automatically discovered ASGs. These ASGs should be unregistered if
|
|
// they no longer exist in AWS.
|
|
func (m *asgCache) buildAsgTags() map[string]string {
|
|
groupTags := map[string]string{}
|
|
for _, spec := range m.asgAutoDiscoverySpecs {
|
|
for k, v := range spec.Tags {
|
|
groupTags[k] = v
|
|
}
|
|
}
|
|
|
|
return groupTags
|
|
}
|
|
|
|
func (m *asgCache) buildAsgNames() []string {
|
|
refreshNames := make([]string, len(m.explicitlyConfigured))
|
|
i := 0
|
|
for k := range m.explicitlyConfigured {
|
|
refreshNames[i] = k.Name
|
|
i++
|
|
}
|
|
|
|
return refreshNames
|
|
}
|
|
|
|
// regenerate the cached view of explicitly configured and auto-discovered ASGs
|
|
func (m *asgCache) regenerate() error {
|
|
m.mutex.Lock()
|
|
defer m.mutex.Unlock()
|
|
|
|
newInstanceToAsgCache := make(map[AwsInstanceRef]*asg)
|
|
newAsgToInstancesCache := make(map[AwsRef][]AwsInstanceRef)
|
|
newInstanceStatusMap := make(map[AwsInstanceRef]*string)
|
|
|
|
// Fetch details of all ASGs
|
|
refreshNames := m.buildAsgNames()
|
|
klog.V(4).Infof("Regenerating instance to ASG map for ASG names: %v", refreshNames)
|
|
namedGroups, err := m.awsService.getAutoscalingGroupsByNames(refreshNames)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
refreshTags := m.buildAsgTags()
|
|
klog.V(4).Infof("Regenerating instance to ASG map for ASG tags: %v", refreshTags)
|
|
taggedGroups, err := m.awsService.getAutoscalingGroupsByTags(refreshTags)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
groups := append(namedGroups, taggedGroups...)
|
|
|
|
// If currently any ASG has more Desired than running Instances, introduce placeholders
|
|
// for the instances to come up. This is required to track Desired instances that
|
|
// will never come up, like with Spot Request that can't be fulfilled
|
|
groups = m.createPlaceholdersForDesiredNonStartedInstances(groups)
|
|
|
|
// Register or update ASGs
|
|
exists := make(map[AwsRef]bool)
|
|
for _, group := range groups {
|
|
asg, err := m.buildAsgFromAWS(group)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
exists[asg.AwsRef] = true
|
|
|
|
asg = m.register(asg)
|
|
|
|
newAsgToInstancesCache[asg.AwsRef] = make([]AwsInstanceRef, len(group.Instances))
|
|
|
|
for i, instance := range group.Instances {
|
|
ref := m.buildInstanceRefFromAWS(instance)
|
|
newInstanceToAsgCache[ref] = asg
|
|
newAsgToInstancesCache[asg.AwsRef][i] = ref
|
|
newInstanceStatusMap[ref] = instance.HealthStatus
|
|
}
|
|
}
|
|
|
|
// Unregister no longer existing auto-discovered ASGs
|
|
for _, asg := range m.registeredAsgs {
|
|
if !exists[asg.AwsRef] && !m.explicitlyConfigured[asg.AwsRef] {
|
|
m.unregister(asg)
|
|
}
|
|
}
|
|
|
|
err = m.asgInstanceTypeCache.populate(m.registeredAsgs)
|
|
if err != nil {
|
|
klog.Warningf("Failed to fully populate ASG->instanceType mapping: %v", err)
|
|
}
|
|
|
|
// Rebuild autoscaling options cache
|
|
newAutoscalingOptions := make(map[AwsRef]map[string]string)
|
|
for _, asg := range m.registeredAsgs {
|
|
options := extractAutoscalingOptionsFromTags(asg.Tags)
|
|
if !reflect.DeepEqual(m.autoscalingOptions[asg.AwsRef], options) {
|
|
klog.V(4).Infof("Extracted autoscaling options from %q ASG tags: %v", asg.Name, options)
|
|
}
|
|
newAutoscalingOptions[asg.AwsRef] = options
|
|
}
|
|
|
|
m.asgToInstances = newAsgToInstancesCache
|
|
m.instanceToAsg = newInstanceToAsgCache
|
|
m.autoscalingOptions = newAutoscalingOptions
|
|
m.instanceStatus = newInstanceStatusMap
|
|
return nil
|
|
}
|
|
|
|
func (m *asgCache) createPlaceholdersForDesiredNonStartedInstances(groups []*autoscaling.Group) []*autoscaling.Group {
|
|
for _, g := range groups {
|
|
desired := *g.DesiredCapacity
|
|
realInstances := int64(len(g.Instances))
|
|
if desired <= realInstances {
|
|
continue
|
|
}
|
|
|
|
klog.V(4).Infof("Instance group %s has only %d instances created while requested count is %d. "+
|
|
"Creating placeholder instances.", *g.AutoScalingGroupName, realInstances, desired)
|
|
|
|
healthStatus := ""
|
|
isAvailable, err := m.isNodeGroupAvailable(g)
|
|
if err != nil {
|
|
klog.V(4).Infof("Could not check instance availability, creating placeholder node anyways: %v", err)
|
|
} else if !isAvailable {
|
|
klog.Warningf("Instance group %s cannot provision any more nodes!", *g.AutoScalingGroupName)
|
|
healthStatus = placeholderUnfulfillableStatus
|
|
}
|
|
|
|
for i := realInstances; i < desired; i++ {
|
|
id := fmt.Sprintf("%s-%s-%d", placeholderInstanceNamePrefix, *g.AutoScalingGroupName, i)
|
|
g.Instances = append(g.Instances, &autoscaling.Instance{
|
|
InstanceId: &id,
|
|
AvailabilityZone: g.AvailabilityZones[0],
|
|
HealthStatus: &healthStatus,
|
|
})
|
|
}
|
|
}
|
|
return groups
|
|
}
|
|
|
|
func (m *asgCache) isNodeGroupAvailable(group *autoscaling.Group) (bool, error) {
|
|
input := &autoscaling.DescribeScalingActivitiesInput{
|
|
AutoScalingGroupName: group.AutoScalingGroupName,
|
|
}
|
|
|
|
start := time.Now()
|
|
response, err := m.awsService.DescribeScalingActivities(input)
|
|
observeAWSRequest("DescribeScalingActivities", err, start)
|
|
if err != nil {
|
|
return true, err // If we can't describe the scaling activities we assume the node group is available
|
|
}
|
|
|
|
for _, activity := range response.Activities {
|
|
asgRef := AwsRef{Name: *group.AutoScalingGroupName}
|
|
if a, ok := m.registeredAsgs[asgRef]; ok {
|
|
lut := a.lastUpdateTime
|
|
if activity.StartTime.Before(lut) {
|
|
break
|
|
} else if *activity.StatusCode == "Failed" {
|
|
klog.Warningf("ASG %s scaling failed with %s", asgRef.Name, *activity)
|
|
return false, nil
|
|
}
|
|
} else {
|
|
klog.V(4).Infof("asg %v is not registered yet, skipping DescribeScalingActivities check", asgRef.Name)
|
|
}
|
|
}
|
|
return true, nil
|
|
}
|
|
|
|
func (m *asgCache) buildAsgFromAWS(g *autoscaling.Group) (*asg, error) {
|
|
spec := dynamic.NodeGroupSpec{
|
|
Name: aws.StringValue(g.AutoScalingGroupName),
|
|
MinSize: int(aws.Int64Value(g.MinSize)),
|
|
MaxSize: int(aws.Int64Value(g.MaxSize)),
|
|
SupportScaleToZero: scaleToZeroSupported,
|
|
}
|
|
|
|
if verr := spec.Validate(); verr != nil {
|
|
return nil, fmt.Errorf("failed to create node group spec: %v", verr)
|
|
}
|
|
|
|
asg := &asg{
|
|
AwsRef: AwsRef{Name: spec.Name},
|
|
minSize: spec.MinSize,
|
|
maxSize: spec.MaxSize,
|
|
|
|
curSize: int(aws.Int64Value(g.DesiredCapacity)),
|
|
AvailabilityZones: aws.StringValueSlice(g.AvailabilityZones),
|
|
LaunchConfigurationName: aws.StringValue(g.LaunchConfigurationName),
|
|
Tags: g.Tags,
|
|
}
|
|
|
|
if g.LaunchTemplate != nil {
|
|
asg.LaunchTemplate = buildLaunchTemplateFromSpec(g.LaunchTemplate)
|
|
}
|
|
|
|
if g.MixedInstancesPolicy != nil {
|
|
getInstanceTypes := func(overrides []*autoscaling.LaunchTemplateOverrides) []string {
|
|
res := []string{}
|
|
for _, override := range overrides {
|
|
if override.InstanceType != nil {
|
|
res = append(res, *override.InstanceType)
|
|
}
|
|
}
|
|
return res
|
|
}
|
|
|
|
getInstanceTypeRequirements := func(overrides []*autoscaling.LaunchTemplateOverrides) *autoscaling.InstanceRequirements {
|
|
if len(overrides) == 1 && overrides[0].InstanceRequirements != nil {
|
|
return overrides[0].InstanceRequirements
|
|
}
|
|
return nil
|
|
}
|
|
|
|
asg.MixedInstancesPolicy = &mixedInstancesPolicy{
|
|
launchTemplate: buildLaunchTemplateFromSpec(g.MixedInstancesPolicy.LaunchTemplate.LaunchTemplateSpecification),
|
|
instanceTypesOverrides: getInstanceTypes(g.MixedInstancesPolicy.LaunchTemplate.Overrides),
|
|
instanceRequirementsOverrides: getInstanceTypeRequirements(g.MixedInstancesPolicy.LaunchTemplate.Overrides),
|
|
}
|
|
|
|
if len(asg.MixedInstancesPolicy.instanceTypesOverrides) != 0 && asg.MixedInstancesPolicy.instanceRequirementsOverrides != nil {
|
|
return nil, fmt.Errorf("invalid setup of both instance type and instance requirements overrides configured")
|
|
}
|
|
}
|
|
|
|
return asg, nil
|
|
}
|
|
|
|
func (m *asgCache) buildInstanceRefFromAWS(instance *autoscaling.Instance) AwsInstanceRef {
|
|
providerID := fmt.Sprintf("aws:///%s/%s", aws.StringValue(instance.AvailabilityZone), aws.StringValue(instance.InstanceId))
|
|
return AwsInstanceRef{
|
|
ProviderID: providerID,
|
|
Name: aws.StringValue(instance.InstanceId),
|
|
}
|
|
}
|
|
|
|
// Cleanup closes the channel to signal the go routine to stop that is handling the cache
|
|
func (m *asgCache) Cleanup() {
|
|
close(m.interrupt)
|
|
}
|