From 43c511dc25b799dea6c2612d81ccbb6d8c008613 Mon Sep 17 00:00:00 2001
From: Sam Lockart
Date: Wed, 11 Jan 2023 16:44:10 +1100
Subject: [PATCH 1/2] Track lifecycle and skip terminating already terminating instances

Avoid deadlocking by locking mutex twice

Instantiate test instance with lifecycle state

Make method private
---
 .../cloudprovider/aws/auto_scaling_groups.go  | 30 ++++++++++++++++++-
 .../aws/aws_cloud_provider_test.go            |  1 +
 2 files changed, 30 insertions(+), 1 deletion(-)

diff --git a/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go b/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go
index 242d29f69a..5ffe9f1424 100644
--- a/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go
+++ b/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go
@@ -40,6 +40,7 @@ type asgCache struct {
 	asgToInstances       map[AwsRef][]AwsInstanceRef
 	instanceToAsg        map[AwsInstanceRef]*asg
 	instanceStatus       map[AwsInstanceRef]*string
+	instanceLifecycle    map[AwsInstanceRef]*string
 	asgInstanceTypeCache *instanceTypeExpirationStore
 	mutex                sync.Mutex
 	awsService           *awsWrapper
@@ -83,6 +84,7 @@ func newASGCache(awsService *awsWrapper, explicitSpecs []string, autoDiscoverySp
 		asgToInstances:        make(map[AwsRef][]AwsInstanceRef),
 		instanceToAsg:         make(map[AwsInstanceRef]*asg),
 		instanceStatus:        make(map[AwsInstanceRef]*string),
+		instanceLifecycle:     make(map[AwsInstanceRef]*string),
 		asgInstanceTypeCache:  newAsgInstanceTypeCache(awsService),
 		interrupt:             make(chan struct{}),
 		asgAutoDiscoverySpecs: autoDiscoverySpecs,
@@ -232,6 +234,14 @@ func (m *asgCache) InstanceStatus(ref AwsInstanceRef) (*string, error) {
 	return nil, fmt.Errorf("could not find instance %v", ref)
 }
 
+func (m *asgCache) findInstanceLifecycle(ref AwsInstanceRef) (*string, error) {
+	if lifecycle, found := m.instanceLifecycle[ref]; found {
+		return lifecycle, nil
+	}
+
+	return nil, fmt.Errorf("could not find instance %v", ref)
+}
+
 func (m *asgCache) SetAsgSize(asg *asg, size int) error {
 	m.mutex.Lock()
 	defer m.mutex.Unlock()
@@ -285,7 +295,6 @@ func (m *asgCache) DeleteInstances(instances []*AwsInstanceRef) error {
 			for i, instance := range instances {
 				instanceIds[i] = instance.Name
 			}
-
 			return fmt.Errorf("can't delete instances %s as they belong to at least two different ASGs (%s and %s)", strings.Join(instanceIds, ","), commonAsg.Name, asg.Name)
 		}
 	}
@@ -298,6 +307,22 @@ func (m *asgCache) DeleteInstances(instances []*AwsInstanceRef) error {
 				"of deleting instance", instance.Name)
 			m.decreaseAsgSizeByOneNoLock(commonAsg)
 		} else {
+			// check if the instance is already terminating - if it is, don't bother terminating again
+			// as doing so causes unnecessary API calls and can cause the curSize cached value to decrement
+			// unnecessarily. A nil lifecycle state is treated as "not terminating".
+			lifecycle, err := m.findInstanceLifecycle(*instance)
+			if err != nil {
+				return err
+			}
+
+			if lifecycle != nil &&
+				(*lifecycle == autoscaling.LifecycleStateTerminating ||
+					*lifecycle == autoscaling.LifecycleStateTerminatingWait ||
+					*lifecycle == autoscaling.LifecycleStateTerminatingProceed) {
+				klog.V(2).Infof("instance %s is already terminating, will skip instead", instance.Name)
+				continue
+			}
+
 			params := &autoscaling.TerminateInstanceInAutoScalingGroupInput{
 				InstanceId:                     aws.String(instance.Name),
 				ShouldDecrementDesiredCapacity: aws.Bool(true),
@@ -354,6 +379,7 @@ func (m *asgCache) regenerate() error {
 	newInstanceToAsgCache := make(map[AwsInstanceRef]*asg)
 	newAsgToInstancesCache := make(map[AwsRef][]AwsInstanceRef)
 	newInstanceStatusMap := make(map[AwsInstanceRef]*string)
+	newInstanceLifecycleMap := make(map[AwsInstanceRef]*string)
 
 	// Fetch details of all ASGs
 	refreshNames := m.buildAsgNames()
@@ -395,6 +421,7 @@ func (m *asgCache) regenerate() error {
 			newInstanceToAsgCache[ref] = asg
 			newAsgToInstancesCache[asg.AwsRef][i] = ref
 			newInstanceStatusMap[ref] = instance.HealthStatus
+			newInstanceLifecycleMap[ref] = instance.LifecycleState
 		}
 	}
 
@@ -424,6 +451,7 @@ func (m *asgCache) regenerate() error {
 	m.instanceToAsg = newInstanceToAsgCache
 	m.autoscalingOptions = newAutoscalingOptions
 	m.instanceStatus = newInstanceStatusMap
+	m.instanceLifecycle = newInstanceLifecycleMap
 	return nil
 }
 
diff --git a/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider_test.go b/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider_test.go
index 78a4f46cb8..854c023f8f 100644
--- a/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider_test.go
+++ b/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider_test.go
@@ -75,6 +75,7 @@ func testNamedDescribeAutoScalingGroupsOutput(groupName string, desiredCap int64
 		instances = append(instances, &autoscaling.Instance{
 			InstanceId:       aws.String(id),
 			AvailabilityZone: aws.String("us-east-1a"),
+			LifecycleState:   aws.String(autoscaling.LifecycleStateInService),
 		})
 	}
 	return &autoscaling.DescribeAutoScalingGroupsOutput{

From 21e1f5ba30dd3a2446e4fbb19745e98e880e2c59 Mon Sep 17 00:00:00 2001
From: Sam Lockart
Date: Thu, 12 Jan 2023 13:51:24 +1100
Subject: [PATCH 2/2] Add test to assert terminating instances aren't terminated again

---
 .../aws/aws_cloud_provider_test.go            | 57 +++++++++++++++++++
 1 file changed, 57 insertions(+)

diff --git a/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider_test.go b/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider_test.go
index 854c023f8f..a0324fcbb1 100644
--- a/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider_test.go
+++ b/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider_test.go
@@ -92,6 +92,15 @@ func testNamedDescribeAutoScalingGroupsOutput(groupName string, desiredCap int64
 	}
 }
 
+func testSetASGInstanceLifecycle(asg *autoscaling.DescribeAutoScalingGroupsOutput, lifecycleState string) *autoscaling.DescribeAutoScalingGroupsOutput {
+	for _, asg := range asg.AutoScalingGroups {
+		for _, instance := range asg.Instances {
+			instance.LifecycleState = aws.String(lifecycleState)
+		}
+	}
+	return asg
+}
+
 func testProvider(t *testing.T, m *AwsManager) *awsCloudProvider {
 	resourceLimiter := cloudprovider.NewResourceLimiter(
 		map[string]int64{cloudprovider.ResourceNameCores: 1, cloudprovider.ResourceNameMemory: 10000000},
@@ -441,6 +450,54 @@ func TestDeleteNodes(t *testing.T) {
 	assert.Equal(t, 1, newSize)
 }
 
+func TestDeleteNodesTerminatingInstances(t *testing.T) {
+	a := &autoScalingMock{}
+	provider := testProvider(t, newTestAwsManagerWithAsgs(t, a, nil, []string{"1:5:test-asg"}))
+	asgs := provider.NodeGroups()
+
+	a.On("TerminateInstanceInAutoScalingGroup", &autoscaling.TerminateInstanceInAutoScalingGroupInput{
+		InstanceId:                     aws.String("test-instance-id"),
+		ShouldDecrementDesiredCapacity: aws.Bool(true),
+	}).Return(&autoscaling.TerminateInstanceInAutoScalingGroupOutput{
+		Activity: &autoscaling.Activity{Description: aws.String("Deleted instance")},
+	})
+
+	// Look up the current number of instances...
+	var expectedInstancesCount int64 = 2
+	a.On("DescribeAutoScalingGroupsPages",
+		&autoscaling.DescribeAutoScalingGroupsInput{
+			AutoScalingGroupNames: aws.StringSlice([]string{"test-asg"}),
+			MaxRecords:            aws.Int64(maxRecordsReturnedByAPI),
+		},
+		mock.AnythingOfType("func(*autoscaling.DescribeAutoScalingGroupsOutput, bool) bool"),
+	).Run(func(args mock.Arguments) {
+		fn := args.Get(1).(func(*autoscaling.DescribeAutoScalingGroupsOutput, bool) bool)
+		fn(testSetASGInstanceLifecycle(testNamedDescribeAutoScalingGroupsOutput("test-asg", expectedInstancesCount, "test-instance-id", "second-test-instance-id"), autoscaling.LifecycleStateTerminatingWait), false)
+		// a later describe call would report a desired capacity of 1; the assertions below expect no such call
+		expectedInstancesCount = 1
+	}).Return(nil)
+
+	provider.Refresh()
+
+	initialSize, err := asgs[0].TargetSize()
+	assert.NoError(t, err)
+	assert.Equal(t, 2, initialSize)
+
+	node := &apiv1.Node{
+		Spec: apiv1.NodeSpec{
+			ProviderID: "aws:///us-east-1a/test-instance-id",
+		},
+	}
+	err = asgs[0].DeleteNodes([]*apiv1.Node{node})
+	assert.NoError(t, err)
+	a.AssertNumberOfCalls(t, "TerminateInstanceInAutoScalingGroup", 0) // instances which are terminating don't need to be terminated again
+	a.AssertNumberOfCalls(t, "DescribeAutoScalingGroupsPages", 1)
+
+	newSize, err := asgs[0].TargetSize()
+	assert.NoError(t, err)
+	assert.Equal(t, 2, newSize)
+}
+
 func TestDeleteNodesWithPlaceholder(t *testing.T) {
 	a := &autoScalingMock{}
 	provider := testProvider(t, newTestAwsManagerWithAsgs(t, a, nil, []string{"1:5:test-asg"}))