diff --git a/cloudmock/aws/mocksqs/api.go b/cloudmock/aws/mocksqs/api.go index 9315dfb516..ae09f0e678 100644 --- a/cloudmock/aws/mocksqs/api.go +++ b/cloudmock/aws/mocksqs/api.go @@ -17,17 +17,17 @@ limitations under the License. package mocksqs import ( + "context" "fmt" "sync" - "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/aws/request" - "github.com/aws/aws-sdk-go/service/sqs" - "github.com/aws/aws-sdk-go/service/sqs/sqsiface" + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/sqs" + "k8s.io/kops/util/pkg/awsinterfaces" ) type MockSQS struct { - sqsiface.SQSAPI + awsinterfaces.SQSAPI mutex sync.Mutex Queues map[string]mockQueue @@ -39,9 +39,9 @@ type mockQueue struct { tags map[string]*string } -var _ sqsiface.SQSAPI = &MockSQS{} +var _ awsinterfaces.SQSAPI = &MockSQS{} -func (m *MockSQS) CreateQueue(input *sqs.CreateQueueInput) (*sqs.CreateQueueOutput, error) { +func (m *MockSQS) CreateQueue(ctx context.Context, input *sqs.CreateQueueInput, optFns ...func(*sqs.Options)) (*sqs.CreateQueueOutput, error) { m.mutex.Lock() defer m.mutex.Unlock() @@ -53,11 +53,11 @@ func (m *MockSQS) CreateQueue(input *sqs.CreateQueueInput) (*sqs.CreateQueueOutp } queue := mockQueue{ url: &url, - attributes: input.Attributes, - tags: input.Tags, + attributes: aws.StringMap(input.Attributes), + tags: aws.StringMap(input.Tags), } - arn := fmt.Sprintf("arn:aws-test:sqs:us-test-1:000000000000:queue/%v", aws.StringValue(input.QueueName)) + arn := fmt.Sprintf("arn:aws-test:sqs:us-test-1:000000000000:queue/%v", aws.ToString(input.QueueName)) queue.attributes["QueueArn"] = &arn m.Queues[name] = queue @@ -68,19 +68,19 @@ func (m *MockSQS) CreateQueue(input *sqs.CreateQueueInput) (*sqs.CreateQueueOutp return response, nil } -func (m *MockSQS) ListQueues(input *sqs.ListQueuesInput) (*sqs.ListQueuesOutput, error) { +func (m *MockSQS) ListQueues(ctx context.Context, input *sqs.ListQueuesInput, optFns ...func(*sqs.Options)) (*sqs.ListQueuesOutput, error) { m.mutex.Lock() defer m.mutex.Unlock() response := &sqs.ListQueuesOutput{} if queue, ok := m.Queues[*input.QueueNamePrefix]; ok { - response.QueueUrls = []*string{queue.url} + response.QueueUrls = []string{aws.ToString(queue.url)} } return response, nil } -func (m *MockSQS) GetQueueAttributes(input *sqs.GetQueueAttributesInput) (*sqs.GetQueueAttributesOutput, error) { +func (m *MockSQS) GetQueueAttributes(ctx context.Context, input *sqs.GetQueueAttributesInput, optFns ...func(*sqs.Options)) (*sqs.GetQueueAttributesOutput, error) { m.mutex.Lock() defer m.mutex.Unlock() @@ -88,14 +88,14 @@ func (m *MockSQS) GetQueueAttributes(input *sqs.GetQueueAttributesInput) (*sqs.G for _, v := range m.Queues { if *v.url == *input.QueueUrl { - response.Attributes = v.attributes + response.Attributes = aws.ToStringMap(v.attributes) return response, nil } } return response, nil } -func (m *MockSQS) ListQueueTags(input *sqs.ListQueueTagsInput) (*sqs.ListQueueTagsOutput, error) { +func (m *MockSQS) ListQueueTags(ctx context.Context, input *sqs.ListQueueTagsInput, optFns ...func(*sqs.Options)) (*sqs.ListQueueTagsOutput, error) { m.mutex.Lock() defer m.mutex.Unlock() @@ -103,13 +103,13 @@ func (m *MockSQS) ListQueueTags(input *sqs.ListQueueTagsInput) (*sqs.ListQueueTa for _, v := range m.Queues { if *v.url == *input.QueueUrl { - response.Tags = v.tags + response.Tags = aws.ToStringMap(v.tags) return response, nil } } return response, nil } -func (m *MockSQS) DeleteQueueWithContext(aws.Context, *sqs.DeleteQueueInput, ...request.Option) (*sqs.DeleteQueueOutput, error) { +func (m *MockSQS) DeleteQueue(ctx context.Context, input *sqs.DeleteQueueInput, optFns ...func(*sqs.Options)) (*sqs.DeleteQueueOutput, error) { panic("Not implemented") } diff --git a/pkg/resources/aws/sqs.go b/pkg/resources/aws/sqs.go index 3601360303..1c1006d3dc 100644 --- a/pkg/resources/aws/sqs.go +++ b/pkg/resources/aws/sqs.go @@ -21,7 +21,7 @@ import ( "fmt" "strings" - "github.com/aws/aws-sdk-go/service/sqs" + "github.com/aws/aws-sdk-go-v2/service/sqs" "k8s.io/klog/v2" "k8s.io/kops/pkg/resources" "k8s.io/kops/upup/pkg/fi" @@ -49,7 +49,7 @@ func DeleteSQSQueue(cloud fi.Cloud, r *resources.Resource) error { request := &sqs.DeleteQueueInput{ QueueUrl: &url, } - if _, err := c.SQS().DeleteQueueWithContext(ctx, request); err != nil { + if _, err := c.SQS().DeleteQueue(ctx, request); err != nil { if awsup.AWSErrorCode(err) == "AWS.SimpleQueueService.NonExistentQueue" { // Concurrently deleted return nil @@ -68,7 +68,7 @@ func ListSQSQueues(cloud fi.Cloud, vpcID, clusterName string) ([]*resources.Reso request := &sqs.ListQueuesInput{ QueueNamePrefix: &queuePrefix, } - response, err := c.SQS().ListQueues(request) + response, err := c.SQS().ListQueues(context.TODO(), request) if err != nil { return nil, fmt.Errorf("error listing SQS queues: %v", err) } @@ -80,8 +80,8 @@ func ListSQSQueues(cloud fi.Cloud, vpcID, clusterName string) ([]*resources.Reso for _, queueUrl := range response.QueueUrls { resourceTracker := &resources.Resource{ - Name: *queueUrl, - ID: *queueUrl, + Name: queueUrl, + ID: queueUrl, Type: "sqs", Deleter: DeleteSQSQueue, Dumper: DumpSQSQueue, diff --git a/upup/pkg/fi/cloudup/awstasks/sqs.go b/upup/pkg/fi/cloudup/awstasks/sqs.go index 34f70dba5b..cda9d41f67 100644 --- a/upup/pkg/fi/cloudup/awstasks/sqs.go +++ b/upup/pkg/fi/cloudup/awstasks/sqs.go @@ -17,6 +17,7 @@ limitations under the License. package awstasks import ( + "context" "encoding/json" "fmt" "reflect" @@ -26,8 +27,9 @@ import ( "k8s.io/klog/v2" "k8s.io/kops/upup/pkg/fi/cloudup/terraformWriter" - "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/service/sqs" + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/sqs" + sqstypes "github.com/aws/aws-sdk-go-v2/service/sqs/types" "k8s.io/kops/upup/pkg/fi" "k8s.io/kops/upup/pkg/fi/cloudup/awsup" "k8s.io/kops/upup/pkg/fi/cloudup/terraform" @@ -53,14 +55,15 @@ func (q *SQS) CompareWithID() *string { } func (q *SQS) Find(c *fi.CloudupContext) (*SQS, error) { + ctx := c.Context() cloud := c.T.Cloud.(awsup.AWSCloud) if q.Name == nil { return nil, nil } - response, err := cloud.SQS().ListQueues(&sqs.ListQueuesInput{ - MaxResults: aws.Int64(2), + response, err := cloud.SQS().ListQueues(ctx, &sqs.ListQueuesInput{ + MaxResults: aws.Int32(2), QueueNamePrefix: q.Name, }) if err != nil { @@ -74,22 +77,22 @@ func (q *SQS) Find(c *fi.CloudupContext) (*SQS, error) { } url := response.QueueUrls[0] - attributes, err := cloud.SQS().GetQueueAttributes(&sqs.GetQueueAttributesInput{ - AttributeNames: []*string{s("MessageRetentionPeriod"), s("Policy"), s("QueueArn")}, - QueueUrl: url, + attributes, err := cloud.SQS().GetQueueAttributes(ctx, &sqs.GetQueueAttributesInput{ + AttributeNames: []sqstypes.QueueAttributeName{sqstypes.QueueAttributeNameMessageRetentionPeriod, sqstypes.QueueAttributeNamePolicy, sqstypes.QueueAttributeNameQueueArn}, + QueueUrl: aws.String(url), }) if err != nil { return nil, fmt.Errorf("error getting SQS queue attributes: %v", err) } - actualPolicy := *attributes.Attributes["Policy"] - actualARN := *attributes.Attributes["QueueArn"] - period, err := strconv.Atoi(*attributes.Attributes["MessageRetentionPeriod"]) + actualPolicy := attributes.Attributes["Policy"] + actualARN := attributes.Attributes["QueueArn"] + period, err := strconv.Atoi(attributes.Attributes["MessageRetentionPeriod"]) if err != nil { return nil, fmt.Errorf("error coverting MessageRetentionPeriod to int: %v", err) } - tags, err := cloud.SQS().ListQueueTags(&sqs.ListQueueTagsInput{ - QueueUrl: url, + tags, err := cloud.SQS().ListQueueTags(ctx, &sqs.ListQueueTagsInput{ + QueueUrl: aws.String(url), }) if err != nil { return nil, fmt.Errorf("error listing SQS queue tags: %v", err) @@ -99,17 +102,17 @@ func (q *SQS) Find(c *fi.CloudupContext) (*SQS, error) { if q.Policy != nil { expectedPolicy, err := fi.ResourceAsString(q.Policy) if err != nil { - return nil, fmt.Errorf("error reading expected Policy for SQS %q: %v", aws.StringValue(q.Name), err) + return nil, fmt.Errorf("error reading expected Policy for SQS %q: %v", aws.ToString(q.Name), err) } expectedJson := make(map[string]interface{}) err = json.Unmarshal([]byte(expectedPolicy), &expectedJson) if err != nil { - return nil, fmt.Errorf("error parsing expected Policy for SQS %q: %v", aws.StringValue(q.Name), err) + return nil, fmt.Errorf("error parsing expected Policy for SQS %q: %v", aws.ToString(q.Name), err) } actualJson := make(map[string]interface{}) err = json.Unmarshal([]byte(actualPolicy), &actualJson) if err != nil { - return nil, fmt.Errorf("error parsing actual Policy for SQS %q: %v", aws.StringValue(q.Name), err) + return nil, fmt.Errorf("error parsing actual Policy for SQS %q: %v", aws.ToString(q.Name), err) } if reflect.DeepEqual(actualJson, expectedJson) { @@ -123,7 +126,7 @@ func (q *SQS) Find(c *fi.CloudupContext) (*SQS, error) { actual := &SQS{ ARN: s(actualARN), Name: q.Name, - URL: url, + URL: aws.String(url), Lifecycle: q.Lifecycle, Policy: fi.NewStringResource(actualPolicy), MessageRetentionPeriod: period, @@ -155,6 +158,7 @@ func (q *SQS) CheckChanges(a, e, changes *SQS) error { } func (q *SQS) RenderAWS(t *awsup.AWSAPITarget, a, e, changes *SQS) error { + ctx := context.TODO() policy, err := fi.ResourceAsString(e.Policy) if err != nil { return fmt.Errorf("error rendering RolePolicyDocument: %v", err) @@ -162,27 +166,27 @@ func (q *SQS) RenderAWS(t *awsup.AWSAPITarget, a, e, changes *SQS) error { if a == nil { request := &sqs.CreateQueueInput{ - Attributes: map[string]*string{ - "MessageRetentionPeriod": s(strconv.Itoa(q.MessageRetentionPeriod)), - "Policy": s(policy), + Attributes: map[string]string{ + "MessageRetentionPeriod": strconv.Itoa(q.MessageRetentionPeriod), + "Policy": policy, }, QueueName: q.Name, - Tags: convertTagsToPointers(q.Tags), + Tags: q.Tags, } - response, err := t.Cloud.SQS().CreateQueue(request) + response, err := t.Cloud.SQS().CreateQueue(ctx, request) if err != nil { return fmt.Errorf("error creating SQS queue: %v", err) } - attributes, err := t.Cloud.SQS().GetQueueAttributes(&sqs.GetQueueAttributesInput{ - AttributeNames: []*string{s("QueueArn")}, + attributes, err := t.Cloud.SQS().GetQueueAttributes(ctx, &sqs.GetQueueAttributesInput{ + AttributeNames: []sqstypes.QueueAttributeName{sqstypes.QueueAttributeNameQueueArn}, QueueUrl: response.QueueUrl, }) if err != nil { return fmt.Errorf("error getting SQS queue attributes: %v", err) } - e.ARN = attributes.Attributes["QueueArn"] + e.ARN = aws.String(attributes.Attributes["QueueArn"]) } return nil @@ -215,28 +219,15 @@ func (e *SQS) TerraformLink() *terraformWriter.Literal { return terraformWriter.LiteralProperty("aws_sqs_queue", *e.Name, "arn") } -// change tags to format required by CreateQueue -func convertTagsToPointers(tags map[string]string) map[string]*string { - newTags := map[string]*string{} - for k, v := range tags { - vv := v - newTags[k] = &vv - } - - return newTags -} - // intersectSQSTags does the same thing as intersectTags, but takes different input because SQS tags are listed differently -func intersectSQSTags(tags map[string]*string, desired map[string]string) map[string]string { +func intersectSQSTags(tags map[string]string, desired map[string]string) map[string]string { if tags == nil { return nil } actual := make(map[string]string) for k, v := range tags { - vv := aws.StringValue(v) - if _, found := desired[k]; found { - actual[k] = vv + actual[k] = v } } if len(actual) == 0 && desired == nil { diff --git a/upup/pkg/fi/cloudup/awsup/aws_cloud.go b/upup/pkg/fi/cloudup/awsup/aws_cloud.go index b53e6de177..13be9f429f 100644 --- a/upup/pkg/fi/cloudup/awsup/aws_cloud.go +++ b/upup/pkg/fi/cloudup/awsup/aws_cloud.go @@ -28,8 +28,7 @@ import ( awsv2 "github.com/aws/aws-sdk-go-v2/aws" awsconfig "github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/service/eventbridge" - "github.com/aws/aws-sdk-go/service/sqs" - "github.com/aws/aws-sdk-go/service/sqs/sqsiface" + "github.com/aws/aws-sdk-go-v2/service/sqs" "golang.org/x/sync/errgroup" "github.com/aws/aws-sdk-go-v2/aws/arn" @@ -136,7 +135,7 @@ type AWSCloud interface { Autoscaling() autoscalingiface.AutoScalingAPI Route53() route53iface.Route53API Spotinst() spotinst.Cloud - SQS() sqsiface.SQSAPI + SQS() awsinterfaces.SQSAPI EventBridge() awsinterfaces.EventBridgeAPI SSM() ssmiface.SSMAPI @@ -206,7 +205,7 @@ type awsCloudImplementation struct { route53 *route53.Route53 spotinst spotinst.Cloud sts *sts.STS - sqs *sqs.SQS + sqs *sqs.Client eventbridge *eventbridge.Client ssm *ssm.SSM @@ -408,17 +407,7 @@ func NewAWSCloud(region string, tags map[string]string) (AWSCloud, error) { } } - sess, err = session.NewSessionWithOptions(session.Options{ - Config: *config, - SharedConfigState: session.SharedConfigEnable, - }) - if err != nil { - return c, err - } - c.sqs = sqs.New(sess, config) - c.sqs.Handlers.Send.PushFront(requestLogger) - c.addHandlers(region, &c.sqs.Handlers) - + c.sqs = sqs.NewFromConfig(cfgV2) c.eventbridge = eventbridge.NewFromConfig(cfgV2) sess, err = session.NewSessionWithOptions(session.Options{ @@ -2233,7 +2222,7 @@ func (c *awsCloudImplementation) Spotinst() spotinst.Cloud { return c.spotinst } -func (c *awsCloudImplementation) SQS() sqsiface.SQSAPI { +func (c *awsCloudImplementation) SQS() awsinterfaces.SQSAPI { return c.sqs } diff --git a/upup/pkg/fi/cloudup/awsup/mock_aws_cloud.go b/upup/pkg/fi/cloudup/awsup/mock_aws_cloud.go index 0477a7479e..e1ebcf6dd8 100644 --- a/upup/pkg/fi/cloudup/awsup/mock_aws_cloud.go +++ b/upup/pkg/fi/cloudup/awsup/mock_aws_cloud.go @@ -32,7 +32,6 @@ import ( "github.com/aws/aws-sdk-go/service/elbv2/elbv2iface" "github.com/aws/aws-sdk-go/service/iam/iamiface" "github.com/aws/aws-sdk-go/service/route53/route53iface" - "github.com/aws/aws-sdk-go/service/sqs/sqsiface" "github.com/aws/aws-sdk-go/service/ssm/ssmiface" v1 "k8s.io/api/core/v1" "k8s.io/klog/v2" @@ -86,7 +85,7 @@ type MockCloud struct { MockELB elbiface.ELBAPI MockELBV2 elbv2iface.ELBV2API MockSpotinst spotinst.Cloud - MockSQS sqsiface.SQSAPI + MockSQS awsinterfaces.SQSAPI MockEventBridge awsinterfaces.EventBridgeAPI MockSSM ssmiface.SSMAPI } @@ -288,7 +287,7 @@ func (c *MockAWSCloud) Spotinst() spotinst.Cloud { return c.MockSpotinst } -func (c *MockAWSCloud) SQS() sqsiface.SQSAPI { +func (c *MockAWSCloud) SQS() awsinterfaces.SQSAPI { if c.MockSQS == nil { klog.Fatalf("MockSQS not set") } diff --git a/util/pkg/awsinterfaces/sqs.go b/util/pkg/awsinterfaces/sqs.go new file mode 100644 index 0000000000..f2e6d18603 --- /dev/null +++ b/util/pkg/awsinterfaces/sqs.go @@ -0,0 +1,31 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package awsinterfaces + +import ( + "context" + + "github.com/aws/aws-sdk-go-v2/service/sqs" +) + +type SQSAPI interface { + ListQueues(ctx context.Context, params *sqs.ListQueuesInput, optFns ...func(*sqs.Options)) (*sqs.ListQueuesOutput, error) + GetQueueAttributes(ctx context.Context, params *sqs.GetQueueAttributesInput, optFns ...func(*sqs.Options)) (*sqs.GetQueueAttributesOutput, error) + ListQueueTags(ctx context.Context, params *sqs.ListQueueTagsInput, optFns ...func(*sqs.Options)) (*sqs.ListQueueTagsOutput, error) + CreateQueue(ctx context.Context, params *sqs.CreateQueueInput, optFns ...func(*sqs.Options)) (*sqs.CreateQueueOutput, error) + DeleteQueue(ctx context.Context, params *sqs.DeleteQueueInput, optFns ...func(*sqs.Options)) (*sqs.DeleteQueueOutput, error) +}