diff --git a/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go b/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go index 36eb4d9a1a..a2a83fc52b 100644 --- a/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go +++ b/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go @@ -40,6 +40,7 @@ type asgCache struct { asgToInstances map[AwsRef][]AwsInstanceRef instanceToAsg map[AwsInstanceRef]*asg instanceStatus map[AwsInstanceRef]*string + instanceLifecycle map[AwsInstanceRef]*string asgInstanceTypeCache *instanceTypeExpirationStore mutex sync.Mutex awsService *awsWrapper @@ -83,6 +84,7 @@ func newASGCache(awsService *awsWrapper, explicitSpecs []string, autoDiscoverySp asgToInstances: make(map[AwsRef][]AwsInstanceRef), instanceToAsg: make(map[AwsInstanceRef]*asg), instanceStatus: make(map[AwsInstanceRef]*string), + instanceLifecycle: make(map[AwsInstanceRef]*string), asgInstanceTypeCache: newAsgInstanceTypeCache(awsService), interrupt: make(chan struct{}), asgAutoDiscoverySpecs: autoDiscoverySpecs, @@ -239,6 +241,14 @@ func (m *asgCache) InstanceStatus(ref AwsInstanceRef) (*string, error) { return nil, fmt.Errorf("could not find instance %v", ref) } +func (m *asgCache) findInstanceLifecycle(ref AwsInstanceRef) (*string, error) { + if lifecycle, found := m.instanceLifecycle[ref]; found { + return lifecycle, nil + } + + return nil, fmt.Errorf("could not find instance %v", ref) +} + func (m *asgCache) SetAsgSize(asg *asg, size int) error { m.mutex.Lock() defer m.mutex.Unlock() @@ -292,7 +302,6 @@ func (m *asgCache) DeleteInstances(instances []*AwsInstanceRef) error { for i, instance := range instances { instanceIds[i] = instance.Name } - return fmt.Errorf("can't delete instances %s as they belong to at least two different ASGs (%s and %s)", strings.Join(instanceIds, ","), commonAsg.Name, asg.Name) } } @@ -305,6 +314,22 @@ func (m *asgCache) DeleteInstances(instances []*AwsInstanceRef) error { "of deleting instance", instance.Name) m.decreaseAsgSizeByOneNoLock(commonAsg) } else { + // check if the instance is already terminating - if it is, don't bother terminating again + // as doing so causes unnecessary API calls and can cause the curSize cached value to decrement + // unnecessarily. + lifecycle, err := m.findInstanceLifecycle(*instance) + if err != nil { + return err + } + + if lifecycle != nil && + *lifecycle == autoscaling.LifecycleStateTerminating || + *lifecycle == autoscaling.LifecycleStateTerminatingWait || + *lifecycle == autoscaling.LifecycleStateTerminatingProceed { + klog.V(2).Infof("instance %s is already terminating, will skip instead", instance.Name) + continue + } + params := &autoscaling.TerminateInstanceInAutoScalingGroupInput{ InstanceId: aws.String(instance.Name), ShouldDecrementDesiredCapacity: aws.Bool(true), @@ -361,6 +386,7 @@ func (m *asgCache) regenerate() error { newInstanceToAsgCache := make(map[AwsInstanceRef]*asg) newAsgToInstancesCache := make(map[AwsRef][]AwsInstanceRef) newInstanceStatusMap := make(map[AwsInstanceRef]*string) + newInstanceLifecycleMap := make(map[AwsInstanceRef]*string) // Fetch details of all ASGs refreshNames := m.buildAsgNames() @@ -402,6 +428,7 @@ func (m *asgCache) regenerate() error { newInstanceToAsgCache[ref] = asg newAsgToInstancesCache[asg.AwsRef][i] = ref newInstanceStatusMap[ref] = instance.HealthStatus + newInstanceLifecycleMap[ref] = instance.LifecycleState } } @@ -431,6 +458,7 @@ func (m *asgCache) regenerate() error { m.instanceToAsg = newInstanceToAsgCache m.autoscalingOptions = newAutoscalingOptions m.instanceStatus = newInstanceStatusMap + m.instanceLifecycle = newInstanceLifecycleMap return nil } diff --git a/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider_test.go b/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider_test.go index 78a4f46cb8..a0324fcbb1 100644 --- a/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider_test.go +++ b/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider_test.go @@ -75,6 +75,7 @@ func testNamedDescribeAutoScalingGroupsOutput(groupName string, desiredCap int64 instances = append(instances, &autoscaling.Instance{ InstanceId: aws.String(id), AvailabilityZone: aws.String("us-east-1a"), + LifecycleState: aws.String(autoscaling.LifecycleStateInService), }) } return &autoscaling.DescribeAutoScalingGroupsOutput{ @@ -91,6 +92,15 @@ func testNamedDescribeAutoScalingGroupsOutput(groupName string, desiredCap int64 } } +func testSetASGInstanceLifecycle(asg *autoscaling.DescribeAutoScalingGroupsOutput, lifecycleState string) *autoscaling.DescribeAutoScalingGroupsOutput { + for _, asg := range asg.AutoScalingGroups { + for _, instance := range asg.Instances { + instance.LifecycleState = aws.String(lifecycleState) + } + } + return asg +} + func testProvider(t *testing.T, m *AwsManager) *awsCloudProvider { resourceLimiter := cloudprovider.NewResourceLimiter( map[string]int64{cloudprovider.ResourceNameCores: 1, cloudprovider.ResourceNameMemory: 10000000}, @@ -440,6 +450,54 @@ func TestDeleteNodes(t *testing.T) { assert.Equal(t, 1, newSize) } +func TestDeleteNodesTerminatingInstances(t *testing.T) { + a := &autoScalingMock{} + provider := testProvider(t, newTestAwsManagerWithAsgs(t, a, nil, []string{"1:5:test-asg"})) + asgs := provider.NodeGroups() + + a.On("TerminateInstanceInAutoScalingGroup", &autoscaling.TerminateInstanceInAutoScalingGroupInput{ + InstanceId: aws.String("test-instance-id"), + ShouldDecrementDesiredCapacity: aws.Bool(true), + }).Return(&autoscaling.TerminateInstanceInAutoScalingGroupOutput{ + Activity: &autoscaling.Activity{Description: aws.String("Deleted instance")}, + }) + + // Look up the current number of instances... + var expectedInstancesCount int64 = 2 + a.On("DescribeAutoScalingGroupsPages", + &autoscaling.DescribeAutoScalingGroupsInput{ + AutoScalingGroupNames: aws.StringSlice([]string{"test-asg"}), + MaxRecords: aws.Int64(maxRecordsReturnedByAPI), + }, + mock.AnythingOfType("func(*autoscaling.DescribeAutoScalingGroupsOutput, bool) bool"), + ).Run(func(args mock.Arguments) { + fn := args.Get(1).(func(*autoscaling.DescribeAutoScalingGroupsOutput, bool) bool) + fn(testSetASGInstanceLifecycle(testNamedDescribeAutoScalingGroupsOutput("test-asg", expectedInstancesCount, "test-instance-id", "second-test-instance-id"), autoscaling.LifecycleStateTerminatingWait), false) + // we expect the instance count to be 1 after the call to DeleteNodes + expectedInstancesCount = 1 + }).Return(nil) + + provider.Refresh() + + initialSize, err := asgs[0].TargetSize() + assert.NoError(t, err) + assert.Equal(t, 2, initialSize) + + node := &apiv1.Node{ + Spec: apiv1.NodeSpec{ + ProviderID: "aws:///us-east-1a/test-instance-id", + }, + } + err = asgs[0].DeleteNodes([]*apiv1.Node{node}) + assert.NoError(t, err) + a.AssertNumberOfCalls(t, "TerminateInstanceInAutoScalingGroup", 0) // instances which are terminating don't need to be terminated again + a.AssertNumberOfCalls(t, "DescribeAutoScalingGroupsPages", 1) + + newSize, err := asgs[0].TargetSize() + assert.NoError(t, err) + assert.Equal(t, 2, newSize) +} + func TestDeleteNodesWithPlaceholder(t *testing.T) { a := &autoScalingMock{} provider := testProvider(t, newTestAwsManagerWithAsgs(t, a, nil, []string{"1:5:test-asg"}))