Migrate SQS to aws-sdk-go-v2

This commit is contained in:
Peter Rifel 2024-03-29 18:47:22 -05:00
parent 1c0c5a5730
commit e7a8b65c29
No known key found for this signature in database
6 changed files with 90 additions and 80 deletions

View File

@ -17,17 +17,17 @@ limitations under the License.
package mocksqs
import (
"context"
"fmt"
"sync"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/request"
"github.com/aws/aws-sdk-go/service/sqs"
"github.com/aws/aws-sdk-go/service/sqs/sqsiface"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/sqs"
"k8s.io/kops/util/pkg/awsinterfaces"
)
type MockSQS struct {
sqsiface.SQSAPI
awsinterfaces.SQSAPI
mutex sync.Mutex
Queues map[string]mockQueue
@ -39,9 +39,9 @@ type mockQueue struct {
tags map[string]*string
}
var _ sqsiface.SQSAPI = &MockSQS{}
var _ awsinterfaces.SQSAPI = &MockSQS{}
func (m *MockSQS) CreateQueue(input *sqs.CreateQueueInput) (*sqs.CreateQueueOutput, error) {
func (m *MockSQS) CreateQueue(ctx context.Context, input *sqs.CreateQueueInput, optFns ...func(*sqs.Options)) (*sqs.CreateQueueOutput, error) {
m.mutex.Lock()
defer m.mutex.Unlock()
@ -53,11 +53,11 @@ func (m *MockSQS) CreateQueue(input *sqs.CreateQueueInput) (*sqs.CreateQueueOutp
}
queue := mockQueue{
url: &url,
attributes: input.Attributes,
tags: input.Tags,
attributes: aws.StringMap(input.Attributes),
tags: aws.StringMap(input.Tags),
}
arn := fmt.Sprintf("arn:aws-test:sqs:us-test-1:000000000000:queue/%v", aws.StringValue(input.QueueName))
arn := fmt.Sprintf("arn:aws-test:sqs:us-test-1:000000000000:queue/%v", aws.ToString(input.QueueName))
queue.attributes["QueueArn"] = &arn
m.Queues[name] = queue
@ -68,19 +68,19 @@ func (m *MockSQS) CreateQueue(input *sqs.CreateQueueInput) (*sqs.CreateQueueOutp
return response, nil
}
func (m *MockSQS) ListQueues(input *sqs.ListQueuesInput) (*sqs.ListQueuesOutput, error) {
func (m *MockSQS) ListQueues(ctx context.Context, input *sqs.ListQueuesInput, optFns ...func(*sqs.Options)) (*sqs.ListQueuesOutput, error) {
m.mutex.Lock()
defer m.mutex.Unlock()
response := &sqs.ListQueuesOutput{}
if queue, ok := m.Queues[*input.QueueNamePrefix]; ok {
response.QueueUrls = []*string{queue.url}
response.QueueUrls = []string{aws.ToString(queue.url)}
}
return response, nil
}
func (m *MockSQS) GetQueueAttributes(input *sqs.GetQueueAttributesInput) (*sqs.GetQueueAttributesOutput, error) {
func (m *MockSQS) GetQueueAttributes(ctx context.Context, input *sqs.GetQueueAttributesInput, optFns ...func(*sqs.Options)) (*sqs.GetQueueAttributesOutput, error) {
m.mutex.Lock()
defer m.mutex.Unlock()
@ -88,14 +88,14 @@ func (m *MockSQS) GetQueueAttributes(input *sqs.GetQueueAttributesInput) (*sqs.G
for _, v := range m.Queues {
if *v.url == *input.QueueUrl {
response.Attributes = v.attributes
response.Attributes = aws.ToStringMap(v.attributes)
return response, nil
}
}
return response, nil
}
func (m *MockSQS) ListQueueTags(input *sqs.ListQueueTagsInput) (*sqs.ListQueueTagsOutput, error) {
func (m *MockSQS) ListQueueTags(ctx context.Context, input *sqs.ListQueueTagsInput, optFns ...func(*sqs.Options)) (*sqs.ListQueueTagsOutput, error) {
m.mutex.Lock()
defer m.mutex.Unlock()
@ -103,13 +103,13 @@ func (m *MockSQS) ListQueueTags(input *sqs.ListQueueTagsInput) (*sqs.ListQueueTa
for _, v := range m.Queues {
if *v.url == *input.QueueUrl {
response.Tags = v.tags
response.Tags = aws.ToStringMap(v.tags)
return response, nil
}
}
return response, nil
}
func (m *MockSQS) DeleteQueueWithContext(aws.Context, *sqs.DeleteQueueInput, ...request.Option) (*sqs.DeleteQueueOutput, error) {
func (m *MockSQS) DeleteQueue(ctx context.Context, input *sqs.DeleteQueueInput, optFns ...func(*sqs.Options)) (*sqs.DeleteQueueOutput, error) {
panic("Not implemented")
}

View File

@ -21,7 +21,7 @@ import (
"fmt"
"strings"
"github.com/aws/aws-sdk-go/service/sqs"
"github.com/aws/aws-sdk-go-v2/service/sqs"
"k8s.io/klog/v2"
"k8s.io/kops/pkg/resources"
"k8s.io/kops/upup/pkg/fi"
@ -49,7 +49,7 @@ func DeleteSQSQueue(cloud fi.Cloud, r *resources.Resource) error {
request := &sqs.DeleteQueueInput{
QueueUrl: &url,
}
if _, err := c.SQS().DeleteQueueWithContext(ctx, request); err != nil {
if _, err := c.SQS().DeleteQueue(ctx, request); err != nil {
if awsup.AWSErrorCode(err) == "AWS.SimpleQueueService.NonExistentQueue" {
// Concurrently deleted
return nil
@ -68,7 +68,7 @@ func ListSQSQueues(cloud fi.Cloud, vpcID, clusterName string) ([]*resources.Reso
request := &sqs.ListQueuesInput{
QueueNamePrefix: &queuePrefix,
}
response, err := c.SQS().ListQueues(request)
response, err := c.SQS().ListQueues(context.TODO(), request)
if err != nil {
return nil, fmt.Errorf("error listing SQS queues: %v", err)
}
@ -80,8 +80,8 @@ func ListSQSQueues(cloud fi.Cloud, vpcID, clusterName string) ([]*resources.Reso
for _, queueUrl := range response.QueueUrls {
resourceTracker := &resources.Resource{
Name: *queueUrl,
ID: *queueUrl,
Name: queueUrl,
ID: queueUrl,
Type: "sqs",
Deleter: DeleteSQSQueue,
Dumper: DumpSQSQueue,

View File

@ -17,6 +17,7 @@ limitations under the License.
package awstasks
import (
"context"
"encoding/json"
"fmt"
"reflect"
@ -26,8 +27,9 @@ import (
"k8s.io/klog/v2"
"k8s.io/kops/upup/pkg/fi/cloudup/terraformWriter"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/sqs"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/sqs"
sqstypes "github.com/aws/aws-sdk-go-v2/service/sqs/types"
"k8s.io/kops/upup/pkg/fi"
"k8s.io/kops/upup/pkg/fi/cloudup/awsup"
"k8s.io/kops/upup/pkg/fi/cloudup/terraform"
@ -53,14 +55,15 @@ func (q *SQS) CompareWithID() *string {
}
func (q *SQS) Find(c *fi.CloudupContext) (*SQS, error) {
ctx := c.Context()
cloud := c.T.Cloud.(awsup.AWSCloud)
if q.Name == nil {
return nil, nil
}
response, err := cloud.SQS().ListQueues(&sqs.ListQueuesInput{
MaxResults: aws.Int64(2),
response, err := cloud.SQS().ListQueues(ctx, &sqs.ListQueuesInput{
MaxResults: aws.Int32(2),
QueueNamePrefix: q.Name,
})
if err != nil {
@ -74,22 +77,22 @@ func (q *SQS) Find(c *fi.CloudupContext) (*SQS, error) {
}
url := response.QueueUrls[0]
attributes, err := cloud.SQS().GetQueueAttributes(&sqs.GetQueueAttributesInput{
AttributeNames: []*string{s("MessageRetentionPeriod"), s("Policy"), s("QueueArn")},
QueueUrl: url,
attributes, err := cloud.SQS().GetQueueAttributes(ctx, &sqs.GetQueueAttributesInput{
AttributeNames: []sqstypes.QueueAttributeName{sqstypes.QueueAttributeNameMessageRetentionPeriod, sqstypes.QueueAttributeNamePolicy, sqstypes.QueueAttributeNameQueueArn},
QueueUrl: aws.String(url),
})
if err != nil {
return nil, fmt.Errorf("error getting SQS queue attributes: %v", err)
}
actualPolicy := *attributes.Attributes["Policy"]
actualARN := *attributes.Attributes["QueueArn"]
period, err := strconv.Atoi(*attributes.Attributes["MessageRetentionPeriod"])
actualPolicy := attributes.Attributes["Policy"]
actualARN := attributes.Attributes["QueueArn"]
period, err := strconv.Atoi(attributes.Attributes["MessageRetentionPeriod"])
if err != nil {
return nil, fmt.Errorf("error coverting MessageRetentionPeriod to int: %v", err)
}
tags, err := cloud.SQS().ListQueueTags(&sqs.ListQueueTagsInput{
QueueUrl: url,
tags, err := cloud.SQS().ListQueueTags(ctx, &sqs.ListQueueTagsInput{
QueueUrl: aws.String(url),
})
if err != nil {
return nil, fmt.Errorf("error listing SQS queue tags: %v", err)
@ -99,17 +102,17 @@ func (q *SQS) Find(c *fi.CloudupContext) (*SQS, error) {
if q.Policy != nil {
expectedPolicy, err := fi.ResourceAsString(q.Policy)
if err != nil {
return nil, fmt.Errorf("error reading expected Policy for SQS %q: %v", aws.StringValue(q.Name), err)
return nil, fmt.Errorf("error reading expected Policy for SQS %q: %v", aws.ToString(q.Name), err)
}
expectedJson := make(map[string]interface{})
err = json.Unmarshal([]byte(expectedPolicy), &expectedJson)
if err != nil {
return nil, fmt.Errorf("error parsing expected Policy for SQS %q: %v", aws.StringValue(q.Name), err)
return nil, fmt.Errorf("error parsing expected Policy for SQS %q: %v", aws.ToString(q.Name), err)
}
actualJson := make(map[string]interface{})
err = json.Unmarshal([]byte(actualPolicy), &actualJson)
if err != nil {
return nil, fmt.Errorf("error parsing actual Policy for SQS %q: %v", aws.StringValue(q.Name), err)
return nil, fmt.Errorf("error parsing actual Policy for SQS %q: %v", aws.ToString(q.Name), err)
}
if reflect.DeepEqual(actualJson, expectedJson) {
@ -123,7 +126,7 @@ func (q *SQS) Find(c *fi.CloudupContext) (*SQS, error) {
actual := &SQS{
ARN: s(actualARN),
Name: q.Name,
URL: url,
URL: aws.String(url),
Lifecycle: q.Lifecycle,
Policy: fi.NewStringResource(actualPolicy),
MessageRetentionPeriod: period,
@ -155,6 +158,7 @@ func (q *SQS) CheckChanges(a, e, changes *SQS) error {
}
func (q *SQS) RenderAWS(t *awsup.AWSAPITarget, a, e, changes *SQS) error {
ctx := context.TODO()
policy, err := fi.ResourceAsString(e.Policy)
if err != nil {
return fmt.Errorf("error rendering RolePolicyDocument: %v", err)
@ -162,27 +166,27 @@ func (q *SQS) RenderAWS(t *awsup.AWSAPITarget, a, e, changes *SQS) error {
if a == nil {
request := &sqs.CreateQueueInput{
Attributes: map[string]*string{
"MessageRetentionPeriod": s(strconv.Itoa(q.MessageRetentionPeriod)),
"Policy": s(policy),
Attributes: map[string]string{
"MessageRetentionPeriod": strconv.Itoa(q.MessageRetentionPeriod),
"Policy": policy,
},
QueueName: q.Name,
Tags: convertTagsToPointers(q.Tags),
Tags: q.Tags,
}
response, err := t.Cloud.SQS().CreateQueue(request)
response, err := t.Cloud.SQS().CreateQueue(ctx, request)
if err != nil {
return fmt.Errorf("error creating SQS queue: %v", err)
}
attributes, err := t.Cloud.SQS().GetQueueAttributes(&sqs.GetQueueAttributesInput{
AttributeNames: []*string{s("QueueArn")},
attributes, err := t.Cloud.SQS().GetQueueAttributes(ctx, &sqs.GetQueueAttributesInput{
AttributeNames: []sqstypes.QueueAttributeName{sqstypes.QueueAttributeNameQueueArn},
QueueUrl: response.QueueUrl,
})
if err != nil {
return fmt.Errorf("error getting SQS queue attributes: %v", err)
}
e.ARN = attributes.Attributes["QueueArn"]
e.ARN = aws.String(attributes.Attributes["QueueArn"])
}
return nil
@ -215,28 +219,15 @@ func (e *SQS) TerraformLink() *terraformWriter.Literal {
return terraformWriter.LiteralProperty("aws_sqs_queue", *e.Name, "arn")
}
// change tags to format required by CreateQueue
func convertTagsToPointers(tags map[string]string) map[string]*string {
newTags := map[string]*string{}
for k, v := range tags {
vv := v
newTags[k] = &vv
}
return newTags
}
// intersectSQSTags does the same thing as intersectTags, but takes different input because SQS tags are listed differently
func intersectSQSTags(tags map[string]*string, desired map[string]string) map[string]string {
func intersectSQSTags(tags map[string]string, desired map[string]string) map[string]string {
if tags == nil {
return nil
}
actual := make(map[string]string)
for k, v := range tags {
vv := aws.StringValue(v)
if _, found := desired[k]; found {
actual[k] = vv
actual[k] = v
}
}
if len(actual) == 0 && desired == nil {

View File

@ -28,8 +28,7 @@ import (
awsv2 "github.com/aws/aws-sdk-go-v2/aws"
awsconfig "github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/eventbridge"
"github.com/aws/aws-sdk-go/service/sqs"
"github.com/aws/aws-sdk-go/service/sqs/sqsiface"
"github.com/aws/aws-sdk-go-v2/service/sqs"
"golang.org/x/sync/errgroup"
"github.com/aws/aws-sdk-go-v2/aws/arn"
@ -136,7 +135,7 @@ type AWSCloud interface {
Autoscaling() autoscalingiface.AutoScalingAPI
Route53() route53iface.Route53API
Spotinst() spotinst.Cloud
SQS() sqsiface.SQSAPI
SQS() awsinterfaces.SQSAPI
EventBridge() awsinterfaces.EventBridgeAPI
SSM() ssmiface.SSMAPI
@ -206,7 +205,7 @@ type awsCloudImplementation struct {
route53 *route53.Route53
spotinst spotinst.Cloud
sts *sts.STS
sqs *sqs.SQS
sqs *sqs.Client
eventbridge *eventbridge.Client
ssm *ssm.SSM
@ -408,17 +407,7 @@ func NewAWSCloud(region string, tags map[string]string) (AWSCloud, error) {
}
}
sess, err = session.NewSessionWithOptions(session.Options{
Config: *config,
SharedConfigState: session.SharedConfigEnable,
})
if err != nil {
return c, err
}
c.sqs = sqs.New(sess, config)
c.sqs.Handlers.Send.PushFront(requestLogger)
c.addHandlers(region, &c.sqs.Handlers)
c.sqs = sqs.NewFromConfig(cfgV2)
c.eventbridge = eventbridge.NewFromConfig(cfgV2)
sess, err = session.NewSessionWithOptions(session.Options{
@ -2233,7 +2222,7 @@ func (c *awsCloudImplementation) Spotinst() spotinst.Cloud {
return c.spotinst
}
func (c *awsCloudImplementation) SQS() sqsiface.SQSAPI {
func (c *awsCloudImplementation) SQS() awsinterfaces.SQSAPI {
return c.sqs
}

View File

@ -32,7 +32,6 @@ import (
"github.com/aws/aws-sdk-go/service/elbv2/elbv2iface"
"github.com/aws/aws-sdk-go/service/iam/iamiface"
"github.com/aws/aws-sdk-go/service/route53/route53iface"
"github.com/aws/aws-sdk-go/service/sqs/sqsiface"
"github.com/aws/aws-sdk-go/service/ssm/ssmiface"
v1 "k8s.io/api/core/v1"
"k8s.io/klog/v2"
@ -86,7 +85,7 @@ type MockCloud struct {
MockELB elbiface.ELBAPI
MockELBV2 elbv2iface.ELBV2API
MockSpotinst spotinst.Cloud
MockSQS sqsiface.SQSAPI
MockSQS awsinterfaces.SQSAPI
MockEventBridge awsinterfaces.EventBridgeAPI
MockSSM ssmiface.SSMAPI
}
@ -288,7 +287,7 @@ func (c *MockAWSCloud) Spotinst() spotinst.Cloud {
return c.MockSpotinst
}
func (c *MockAWSCloud) SQS() sqsiface.SQSAPI {
func (c *MockAWSCloud) SQS() awsinterfaces.SQSAPI {
if c.MockSQS == nil {
klog.Fatalf("MockSQS not set")
}

View File

@ -0,0 +1,31 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package awsinterfaces
import (
"context"
"github.com/aws/aws-sdk-go-v2/service/sqs"
)
type SQSAPI interface {
ListQueues(ctx context.Context, params *sqs.ListQueuesInput, optFns ...func(*sqs.Options)) (*sqs.ListQueuesOutput, error)
GetQueueAttributes(ctx context.Context, params *sqs.GetQueueAttributesInput, optFns ...func(*sqs.Options)) (*sqs.GetQueueAttributesOutput, error)
ListQueueTags(ctx context.Context, params *sqs.ListQueueTagsInput, optFns ...func(*sqs.Options)) (*sqs.ListQueueTagsOutput, error)
CreateQueue(ctx context.Context, params *sqs.CreateQueueInput, optFns ...func(*sqs.Options)) (*sqs.CreateQueueOutput, error)
DeleteQueue(ctx context.Context, params *sqs.DeleteQueueInput, optFns ...func(*sqs.Options)) (*sqs.DeleteQueueOutput, error)
}