autoscaler/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go

562 lines
17 KiB
Go

/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package aws
import (
"fmt"
"reflect"
"strings"
"sync"
"time"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider/aws/aws-sdk-go/aws"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider/aws/aws-sdk-go/service/autoscaling"
"k8s.io/autoscaler/cluster-autoscaler/config/dynamic"
klog "k8s.io/klog/v2"
)
const (
scaleToZeroSupported = true
placeholderInstanceNamePrefix = "i-placeholder"
placeholderUnfulfillableStatus = "placeholder-cannot-be-fulfilled"
)
type asgCache struct {
registeredAsgs map[AwsRef]*asg
asgToInstances map[AwsRef][]AwsInstanceRef
instanceToAsg map[AwsInstanceRef]*asg
instanceStatus map[AwsInstanceRef]*string
asgInstanceTypeCache *instanceTypeExpirationStore
mutex sync.Mutex
awsService *awsWrapper
interrupt chan struct{}
asgAutoDiscoverySpecs []asgAutoDiscoveryConfig
explicitlyConfigured map[AwsRef]bool
autoscalingOptions map[AwsRef]map[string]string
}
type launchTemplate struct {
name string
version string
}
type mixedInstancesPolicy struct {
launchTemplate *launchTemplate
instanceTypesOverrides []string
instanceRequirementsOverrides *autoscaling.InstanceRequirements
}
type asg struct {
AwsRef
minSize int
maxSize int
curSize int
lastUpdateTime time.Time
AvailabilityZones []string
LaunchConfigurationName string
LaunchTemplate *launchTemplate
MixedInstancesPolicy *mixedInstancesPolicy
Tags []*autoscaling.TagDescription
}
func newASGCache(awsService *awsWrapper, explicitSpecs []string, autoDiscoverySpecs []asgAutoDiscoveryConfig) (*asgCache, error) {
registry := &asgCache{
registeredAsgs: make(map[AwsRef]*asg, 0),
awsService: awsService,
asgToInstances: make(map[AwsRef][]AwsInstanceRef),
instanceToAsg: make(map[AwsInstanceRef]*asg),
instanceStatus: make(map[AwsInstanceRef]*string),
asgInstanceTypeCache: newAsgInstanceTypeCache(awsService),
interrupt: make(chan struct{}),
asgAutoDiscoverySpecs: autoDiscoverySpecs,
explicitlyConfigured: make(map[AwsRef]bool),
autoscalingOptions: make(map[AwsRef]map[string]string),
}
if err := registry.parseExplicitAsgs(explicitSpecs); err != nil {
return nil, err
}
return registry, nil
}
// Use a function variable for ease of testing
var getInstanceTypeForAsg = func(m *asgCache, group *asg) (string, error) {
if obj, found, _ := m.asgInstanceTypeCache.GetByKey(group.AwsRef.Name); found {
return obj.(instanceTypeCachedObject).instanceType, nil
} else if result, err := m.awsService.getInstanceTypesForAsgs([]*asg{group}); err == nil {
return result[group.AwsRef.Name], nil
}
return "", fmt.Errorf("could not find instance type for %s", group.AwsRef.Name)
}
// Fetch explicitly configured ASGs. These ASGs should never be unregistered
// during refreshes, even if they no longer exist in AWS.
func (m *asgCache) parseExplicitAsgs(specs []string) error {
for _, spec := range specs {
asg, err := m.buildAsgFromSpec(spec)
if err != nil {
return fmt.Errorf("failed to parse node group spec: %v", err)
}
m.explicitlyConfigured[asg.AwsRef] = true
m.register(asg)
}
return nil
}
// Register ASG. Returns the registered ASG.
func (m *asgCache) register(asg *asg) *asg {
if existing, asgExists := m.registeredAsgs[asg.AwsRef]; asgExists {
if reflect.DeepEqual(existing, asg) {
return existing
}
klog.V(4).Infof("Updating ASG %s", asg.AwsRef.Name)
// Explicit registered groups should always use the manually provided min/max
// values and the not the ones returned by the API
if !m.explicitlyConfigured[asg.AwsRef] {
existing.minSize = asg.minSize
existing.maxSize = asg.maxSize
}
existing.curSize = asg.curSize
// Those information are mainly required to create templates when scaling
// from zero
existing.AvailabilityZones = asg.AvailabilityZones
existing.LaunchConfigurationName = asg.LaunchConfigurationName
existing.LaunchTemplate = asg.LaunchTemplate
existing.MixedInstancesPolicy = asg.MixedInstancesPolicy
existing.Tags = asg.Tags
return existing
}
klog.V(1).Infof("Registering ASG %s", asg.AwsRef.Name)
m.registeredAsgs[asg.AwsRef] = asg
return asg
}
// Unregister ASG. Returns the unregistered ASG.
func (m *asgCache) unregister(a *asg) *asg {
if _, asgExists := m.registeredAsgs[a.AwsRef]; asgExists {
klog.V(1).Infof("Unregistered ASG %s", a.AwsRef.Name)
delete(m.registeredAsgs, a.AwsRef)
}
return a
}
func (m *asgCache) buildAsgFromSpec(spec string) (*asg, error) {
s, err := dynamic.SpecFromString(spec, scaleToZeroSupported)
if err != nil {
return nil, fmt.Errorf("failed to parse node group spec: %v", err)
}
asg := &asg{
AwsRef: AwsRef{Name: s.Name},
minSize: s.MinSize,
maxSize: s.MaxSize,
}
return asg, nil
}
// Get returns the currently registered ASGs
func (m *asgCache) Get() map[AwsRef]*asg {
m.mutex.Lock()
defer m.mutex.Unlock()
return m.registeredAsgs
}
// GetAutoscalingOptions return autoscaling options strings obtained from ASG tags.
func (m *asgCache) GetAutoscalingOptions(ref AwsRef) map[string]string {
m.mutex.Lock()
defer m.mutex.Unlock()
return m.autoscalingOptions[ref]
}
// FindForInstance returns AsgConfig of the given Instance
func (m *asgCache) FindForInstance(instance AwsInstanceRef) *asg {
m.mutex.Lock()
defer m.mutex.Unlock()
return m.findForInstance(instance)
}
func (m *asgCache) findForInstance(instance AwsInstanceRef) *asg {
if asg, found := m.instanceToAsg[instance]; found {
return asg
}
return nil
}
// InstancesByAsg returns the nodes of an ASG
func (m *asgCache) InstancesByAsg(ref AwsRef) ([]AwsInstanceRef, error) {
m.mutex.Lock()
defer m.mutex.Unlock()
if instances, found := m.asgToInstances[ref]; found {
return instances, nil
}
return nil, fmt.Errorf("error while looking for instances of ASG: %s", ref)
}
func (m *asgCache) InstanceStatus(ref AwsInstanceRef) (*string, error) {
m.mutex.Lock()
defer m.mutex.Unlock()
if status, found := m.instanceStatus[ref]; found {
return status, nil
}
return nil, fmt.Errorf("could not find instance %v", ref)
}
func (m *asgCache) SetAsgSize(asg *asg, size int) error {
m.mutex.Lock()
defer m.mutex.Unlock()
return m.setAsgSizeNoLock(asg, size)
}
func (m *asgCache) setAsgSizeNoLock(asg *asg, size int) error {
params := &autoscaling.SetDesiredCapacityInput{
AutoScalingGroupName: aws.String(asg.Name),
DesiredCapacity: aws.Int64(int64(size)),
HonorCooldown: aws.Bool(false),
}
klog.V(0).Infof("Setting asg %s size to %d", asg.Name, size)
start := time.Now()
_, err := m.awsService.SetDesiredCapacity(params)
observeAWSRequest("SetDesiredCapacity", err, start)
if err != nil {
return err
}
// Proactively set the ASG size so autoscaler makes better decisions
asg.lastUpdateTime = start
asg.curSize = size
return nil
}
func (m *asgCache) decreaseAsgSizeByOneNoLock(asg *asg) error {
return m.setAsgSizeNoLock(asg, asg.curSize-1)
}
// DeleteInstances deletes the given instances. All instances must be controlled by the same ASG.
func (m *asgCache) DeleteInstances(instances []*AwsInstanceRef) error {
m.mutex.Lock()
defer m.mutex.Unlock()
if len(instances) == 0 {
return nil
}
commonAsg := m.findForInstance(*instances[0])
if commonAsg == nil {
return fmt.Errorf("can't delete instance %s, which is not part of an ASG", instances[0].Name)
}
for _, instance := range instances {
asg := m.findForInstance(*instance)
if asg != commonAsg {
instanceIds := make([]string, len(instances))
for i, instance := range instances {
instanceIds[i] = instance.Name
}
return fmt.Errorf("can't delete instances %s as they belong to at least two different ASGs (%s and %s)", strings.Join(instanceIds, ","), commonAsg.Name, asg.Name)
}
}
for _, instance := range instances {
// check if the instance is a placeholder - a requested instance that was never created by the node group
// if it is, just decrease the size of the node group, as there's no specific instance we can remove
if m.isPlaceholderInstance(instance) {
klog.V(4).Infof("instance %s is detected as a placeholder, decreasing ASG requested size instead "+
"of deleting instance", instance.Name)
m.decreaseAsgSizeByOneNoLock(commonAsg)
} else {
params := &autoscaling.TerminateInstanceInAutoScalingGroupInput{
InstanceId: aws.String(instance.Name),
ShouldDecrementDesiredCapacity: aws.Bool(true),
}
start := time.Now()
resp, err := m.awsService.TerminateInstanceInAutoScalingGroup(params)
observeAWSRequest("TerminateInstanceInAutoScalingGroup", err, start)
if err != nil {
return err
}
klog.V(4).Infof(*resp.Activity.Description)
// Proactively decrement the size so autoscaler makes better decisions
commonAsg.curSize--
}
}
return nil
}
// isPlaceholderInstance checks if the given instance is only a placeholder
func (m *asgCache) isPlaceholderInstance(instance *AwsInstanceRef) bool {
return strings.HasPrefix(instance.Name, placeholderInstanceNamePrefix)
}
// Fetch automatically discovered ASGs. These ASGs should be unregistered if
// they no longer exist in AWS.
func (m *asgCache) buildAsgTags() map[string]string {
groupTags := map[string]string{}
for _, spec := range m.asgAutoDiscoverySpecs {
for k, v := range spec.Tags {
groupTags[k] = v
}
}
return groupTags
}
func (m *asgCache) buildAsgNames() []string {
refreshNames := make([]string, len(m.explicitlyConfigured))
i := 0
for k := range m.explicitlyConfigured {
refreshNames[i] = k.Name
i++
}
return refreshNames
}
// regenerate the cached view of explicitly configured and auto-discovered ASGs
func (m *asgCache) regenerate() error {
m.mutex.Lock()
defer m.mutex.Unlock()
newInstanceToAsgCache := make(map[AwsInstanceRef]*asg)
newAsgToInstancesCache := make(map[AwsRef][]AwsInstanceRef)
newInstanceStatusMap := make(map[AwsInstanceRef]*string)
// Fetch details of all ASGs
refreshNames := m.buildAsgNames()
klog.V(4).Infof("Regenerating instance to ASG map for ASG names: %v", refreshNames)
namedGroups, err := m.awsService.getAutoscalingGroupsByNames(refreshNames)
if err != nil {
return err
}
refreshTags := m.buildAsgTags()
klog.V(4).Infof("Regenerating instance to ASG map for ASG tags: %v", refreshTags)
taggedGroups, err := m.awsService.getAutoscalingGroupsByTags(refreshTags)
if err != nil {
return err
}
groups := append(namedGroups, taggedGroups...)
// If currently any ASG has more Desired than running Instances, introduce placeholders
// for the instances to come up. This is required to track Desired instances that
// will never come up, like with Spot Request that can't be fulfilled
groups = m.createPlaceholdersForDesiredNonStartedInstances(groups)
// Register or update ASGs
exists := make(map[AwsRef]bool)
for _, group := range groups {
asg, err := m.buildAsgFromAWS(group)
if err != nil {
return err
}
exists[asg.AwsRef] = true
asg = m.register(asg)
newAsgToInstancesCache[asg.AwsRef] = make([]AwsInstanceRef, len(group.Instances))
for i, instance := range group.Instances {
ref := m.buildInstanceRefFromAWS(instance)
newInstanceToAsgCache[ref] = asg
newAsgToInstancesCache[asg.AwsRef][i] = ref
newInstanceStatusMap[ref] = instance.HealthStatus
}
}
// Unregister no longer existing auto-discovered ASGs
for _, asg := range m.registeredAsgs {
if !exists[asg.AwsRef] && !m.explicitlyConfigured[asg.AwsRef] {
m.unregister(asg)
}
}
err = m.asgInstanceTypeCache.populate(m.registeredAsgs)
if err != nil {
klog.Warningf("Failed to fully populate ASG->instanceType mapping: %v", err)
}
// Rebuild autoscaling options cache
newAutoscalingOptions := make(map[AwsRef]map[string]string)
for _, asg := range m.registeredAsgs {
options := extractAutoscalingOptionsFromTags(asg.Tags)
if !reflect.DeepEqual(m.autoscalingOptions[asg.AwsRef], options) {
klog.V(4).Infof("Extracted autoscaling options from %q ASG tags: %v", asg.Name, options)
}
newAutoscalingOptions[asg.AwsRef] = options
}
m.asgToInstances = newAsgToInstancesCache
m.instanceToAsg = newInstanceToAsgCache
m.autoscalingOptions = newAutoscalingOptions
m.instanceStatus = newInstanceStatusMap
return nil
}
func (m *asgCache) createPlaceholdersForDesiredNonStartedInstances(groups []*autoscaling.Group) []*autoscaling.Group {
for _, g := range groups {
desired := *g.DesiredCapacity
realInstances := int64(len(g.Instances))
if desired <= realInstances {
continue
}
klog.V(4).Infof("Instance group %s has only %d instances created while requested count is %d. "+
"Creating placeholder instances.", *g.AutoScalingGroupName, realInstances, desired)
healthStatus := ""
isAvailable, err := m.isNodeGroupAvailable(g)
if err != nil {
klog.V(4).Infof("Could not check instance availability, creating placeholder node anyways: %v", err)
} else if !isAvailable {
klog.Warningf("Instance group %s cannot provision any more nodes!", *g.AutoScalingGroupName)
healthStatus = placeholderUnfulfillableStatus
}
for i := realInstances; i < desired; i++ {
id := fmt.Sprintf("%s-%s-%d", placeholderInstanceNamePrefix, *g.AutoScalingGroupName, i)
g.Instances = append(g.Instances, &autoscaling.Instance{
InstanceId: &id,
AvailabilityZone: g.AvailabilityZones[0],
HealthStatus: &healthStatus,
})
}
}
return groups
}
func (m *asgCache) isNodeGroupAvailable(group *autoscaling.Group) (bool, error) {
input := &autoscaling.DescribeScalingActivitiesInput{
AutoScalingGroupName: group.AutoScalingGroupName,
}
start := time.Now()
response, err := m.awsService.DescribeScalingActivities(input)
observeAWSRequest("DescribeScalingActivities", err, start)
if err != nil {
return true, err // If we can't describe the scaling activities we assume the node group is available
}
for _, activity := range response.Activities {
asgRef := AwsRef{Name: *group.AutoScalingGroupName}
if a, ok := m.registeredAsgs[asgRef]; ok {
lut := a.lastUpdateTime
if activity.StartTime.Before(lut) {
break
} else if *activity.StatusCode == "Failed" {
klog.Warningf("ASG %s scaling failed with %s", asgRef.Name, *activity)
return false, nil
}
} else {
klog.V(4).Infof("asg %v is not registered yet, skipping DescribeScalingActivities check", asgRef.Name)
}
}
return true, nil
}
func (m *asgCache) buildAsgFromAWS(g *autoscaling.Group) (*asg, error) {
spec := dynamic.NodeGroupSpec{
Name: aws.StringValue(g.AutoScalingGroupName),
MinSize: int(aws.Int64Value(g.MinSize)),
MaxSize: int(aws.Int64Value(g.MaxSize)),
SupportScaleToZero: scaleToZeroSupported,
}
if verr := spec.Validate(); verr != nil {
return nil, fmt.Errorf("failed to create node group spec: %v", verr)
}
asg := &asg{
AwsRef: AwsRef{Name: spec.Name},
minSize: spec.MinSize,
maxSize: spec.MaxSize,
curSize: int(aws.Int64Value(g.DesiredCapacity)),
AvailabilityZones: aws.StringValueSlice(g.AvailabilityZones),
LaunchConfigurationName: aws.StringValue(g.LaunchConfigurationName),
Tags: g.Tags,
}
if g.LaunchTemplate != nil {
asg.LaunchTemplate = buildLaunchTemplateFromSpec(g.LaunchTemplate)
}
if g.MixedInstancesPolicy != nil {
getInstanceTypes := func(overrides []*autoscaling.LaunchTemplateOverrides) []string {
res := []string{}
for _, override := range overrides {
if override.InstanceType != nil {
res = append(res, *override.InstanceType)
}
}
return res
}
getInstanceTypeRequirements := func(overrides []*autoscaling.LaunchTemplateOverrides) *autoscaling.InstanceRequirements {
if len(overrides) == 1 && overrides[0].InstanceRequirements != nil {
return overrides[0].InstanceRequirements
}
return nil
}
asg.MixedInstancesPolicy = &mixedInstancesPolicy{
launchTemplate: buildLaunchTemplateFromSpec(g.MixedInstancesPolicy.LaunchTemplate.LaunchTemplateSpecification),
instanceTypesOverrides: getInstanceTypes(g.MixedInstancesPolicy.LaunchTemplate.Overrides),
instanceRequirementsOverrides: getInstanceTypeRequirements(g.MixedInstancesPolicy.LaunchTemplate.Overrides),
}
if len(asg.MixedInstancesPolicy.instanceTypesOverrides) != 0 && asg.MixedInstancesPolicy.instanceRequirementsOverrides != nil {
return nil, fmt.Errorf("invalid setup of both instance type and instance requirements overrides configured")
}
}
return asg, nil
}
func (m *asgCache) buildInstanceRefFromAWS(instance *autoscaling.Instance) AwsInstanceRef {
providerID := fmt.Sprintf("aws:///%s/%s", aws.StringValue(instance.AvailabilityZone), aws.StringValue(instance.InstanceId))
return AwsInstanceRef{
ProviderID: providerID,
Name: aws.StringValue(instance.InstanceId),
}
}
// Cleanup closes the channel to signal the go routine to stop that is handling the cache
func (m *asgCache) Cleanup() {
close(m.interrupt)
}