Merge pull request #5411 from alam0rt/fix-issue-where-terminating-instances-scale-down-asg-to-min

Track lifecycle and skip terminating already terminating instances
This commit is contained in:
Kubernetes Prow Robot 2023-05-18 06:22:34 -07:00 committed by GitHub
commit 47af9cc434
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 87 additions and 1 deletions

View File

@ -40,6 +40,7 @@ type asgCache struct {
asgToInstances map[AwsRef][]AwsInstanceRef
instanceToAsg map[AwsInstanceRef]*asg
instanceStatus map[AwsInstanceRef]*string
instanceLifecycle map[AwsInstanceRef]*string
asgInstanceTypeCache *instanceTypeExpirationStore
mutex sync.Mutex
awsService *awsWrapper
@ -83,6 +84,7 @@ func newASGCache(awsService *awsWrapper, explicitSpecs []string, autoDiscoverySp
asgToInstances: make(map[AwsRef][]AwsInstanceRef),
instanceToAsg: make(map[AwsInstanceRef]*asg),
instanceStatus: make(map[AwsInstanceRef]*string),
instanceLifecycle: make(map[AwsInstanceRef]*string),
asgInstanceTypeCache: newAsgInstanceTypeCache(awsService),
interrupt: make(chan struct{}),
asgAutoDiscoverySpecs: autoDiscoverySpecs,
@ -239,6 +241,14 @@ func (m *asgCache) InstanceStatus(ref AwsInstanceRef) (*string, error) {
return nil, fmt.Errorf("could not find instance %v", ref)
}
func (m *asgCache) findInstanceLifecycle(ref AwsInstanceRef) (*string, error) {
if lifecycle, found := m.instanceLifecycle[ref]; found {
return lifecycle, nil
}
return nil, fmt.Errorf("could not find instance %v", ref)
}
func (m *asgCache) SetAsgSize(asg *asg, size int) error {
m.mutex.Lock()
defer m.mutex.Unlock()
@ -292,7 +302,6 @@ func (m *asgCache) DeleteInstances(instances []*AwsInstanceRef) error {
for i, instance := range instances {
instanceIds[i] = instance.Name
}
return fmt.Errorf("can't delete instances %s as they belong to at least two different ASGs (%s and %s)", strings.Join(instanceIds, ","), commonAsg.Name, asg.Name)
}
}
@ -305,6 +314,22 @@ func (m *asgCache) DeleteInstances(instances []*AwsInstanceRef) error {
"of deleting instance", instance.Name)
m.decreaseAsgSizeByOneNoLock(commonAsg)
} else {
// check if the instance is already terminating - if it is, don't bother terminating again
// as doing so causes unnecessary API calls and can cause the curSize cached value to decrement
// unnecessarily.
lifecycle, err := m.findInstanceLifecycle(*instance)
if err != nil {
return err
}
if lifecycle != nil &&
*lifecycle == autoscaling.LifecycleStateTerminating ||
*lifecycle == autoscaling.LifecycleStateTerminatingWait ||
*lifecycle == autoscaling.LifecycleStateTerminatingProceed {
klog.V(2).Infof("instance %s is already terminating, will skip instead", instance.Name)
continue
}
params := &autoscaling.TerminateInstanceInAutoScalingGroupInput{
InstanceId: aws.String(instance.Name),
ShouldDecrementDesiredCapacity: aws.Bool(true),
@ -361,6 +386,7 @@ func (m *asgCache) regenerate() error {
newInstanceToAsgCache := make(map[AwsInstanceRef]*asg)
newAsgToInstancesCache := make(map[AwsRef][]AwsInstanceRef)
newInstanceStatusMap := make(map[AwsInstanceRef]*string)
newInstanceLifecycleMap := make(map[AwsInstanceRef]*string)
// Fetch details of all ASGs
refreshNames := m.buildAsgNames()
@ -402,6 +428,7 @@ func (m *asgCache) regenerate() error {
newInstanceToAsgCache[ref] = asg
newAsgToInstancesCache[asg.AwsRef][i] = ref
newInstanceStatusMap[ref] = instance.HealthStatus
newInstanceLifecycleMap[ref] = instance.LifecycleState
}
}
@ -431,6 +458,7 @@ func (m *asgCache) regenerate() error {
m.instanceToAsg = newInstanceToAsgCache
m.autoscalingOptions = newAutoscalingOptions
m.instanceStatus = newInstanceStatusMap
m.instanceLifecycle = newInstanceLifecycleMap
return nil
}

View File

@ -75,6 +75,7 @@ func testNamedDescribeAutoScalingGroupsOutput(groupName string, desiredCap int64
instances = append(instances, &autoscaling.Instance{
InstanceId: aws.String(id),
AvailabilityZone: aws.String("us-east-1a"),
LifecycleState: aws.String(autoscaling.LifecycleStateInService),
})
}
return &autoscaling.DescribeAutoScalingGroupsOutput{
@ -91,6 +92,15 @@ func testNamedDescribeAutoScalingGroupsOutput(groupName string, desiredCap int64
}
}
func testSetASGInstanceLifecycle(asg *autoscaling.DescribeAutoScalingGroupsOutput, lifecycleState string) *autoscaling.DescribeAutoScalingGroupsOutput {
for _, asg := range asg.AutoScalingGroups {
for _, instance := range asg.Instances {
instance.LifecycleState = aws.String(lifecycleState)
}
}
return asg
}
func testProvider(t *testing.T, m *AwsManager) *awsCloudProvider {
resourceLimiter := cloudprovider.NewResourceLimiter(
map[string]int64{cloudprovider.ResourceNameCores: 1, cloudprovider.ResourceNameMemory: 10000000},
@ -440,6 +450,54 @@ func TestDeleteNodes(t *testing.T) {
assert.Equal(t, 1, newSize)
}
func TestDeleteNodesTerminatingInstances(t *testing.T) {
a := &autoScalingMock{}
provider := testProvider(t, newTestAwsManagerWithAsgs(t, a, nil, []string{"1:5:test-asg"}))
asgs := provider.NodeGroups()
a.On("TerminateInstanceInAutoScalingGroup", &autoscaling.TerminateInstanceInAutoScalingGroupInput{
InstanceId: aws.String("test-instance-id"),
ShouldDecrementDesiredCapacity: aws.Bool(true),
}).Return(&autoscaling.TerminateInstanceInAutoScalingGroupOutput{
Activity: &autoscaling.Activity{Description: aws.String("Deleted instance")},
})
// Look up the current number of instances...
var expectedInstancesCount int64 = 2
a.On("DescribeAutoScalingGroupsPages",
&autoscaling.DescribeAutoScalingGroupsInput{
AutoScalingGroupNames: aws.StringSlice([]string{"test-asg"}),
MaxRecords: aws.Int64(maxRecordsReturnedByAPI),
},
mock.AnythingOfType("func(*autoscaling.DescribeAutoScalingGroupsOutput, bool) bool"),
).Run(func(args mock.Arguments) {
fn := args.Get(1).(func(*autoscaling.DescribeAutoScalingGroupsOutput, bool) bool)
fn(testSetASGInstanceLifecycle(testNamedDescribeAutoScalingGroupsOutput("test-asg", expectedInstancesCount, "test-instance-id", "second-test-instance-id"), autoscaling.LifecycleStateTerminatingWait), false)
// we expect the instance count to be 1 after the call to DeleteNodes
expectedInstancesCount = 1
}).Return(nil)
provider.Refresh()
initialSize, err := asgs[0].TargetSize()
assert.NoError(t, err)
assert.Equal(t, 2, initialSize)
node := &apiv1.Node{
Spec: apiv1.NodeSpec{
ProviderID: "aws:///us-east-1a/test-instance-id",
},
}
err = asgs[0].DeleteNodes([]*apiv1.Node{node})
assert.NoError(t, err)
a.AssertNumberOfCalls(t, "TerminateInstanceInAutoScalingGroup", 0) // instances which are terminating don't need to be terminated again
a.AssertNumberOfCalls(t, "DescribeAutoScalingGroupsPages", 1)
newSize, err := asgs[0].TargetSize()
assert.NoError(t, err)
assert.Equal(t, 2, newSize)
}
func TestDeleteNodesWithPlaceholder(t *testing.T) {
a := &autoScalingMock{}
provider := testProvider(t, newTestAwsManagerWithAsgs(t, a, nil, []string{"1:5:test-asg"}))