mirror of https://github.com/kubernetes/kops.git
Merge pull request #12571 from rifelpet/sqs-arn
Use the SQS Queue's ARN reference
This commit is contained in:
commit
228c82cb6e
|
@ -6,6 +6,7 @@ go_library(
|
|||
importpath = "k8s.io/kops/cloudmock/aws/mocksqs",
|
||||
visibility = ["//visibility:public"],
|
||||
deps = [
|
||||
"//vendor/github.com/aws/aws-sdk-go/aws:go_default_library",
|
||||
"//vendor/github.com/aws/aws-sdk-go/service/sqs:go_default_library",
|
||||
"//vendor/github.com/aws/aws-sdk-go/service/sqs/sqsiface:go_default_library",
|
||||
],
|
||||
|
|
|
@ -17,8 +17,10 @@ limitations under the License.
|
|||
package mocksqs
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"github.com/aws/aws-sdk-go/aws"
|
||||
"github.com/aws/aws-sdk-go/service/sqs"
|
||||
"github.com/aws/aws-sdk-go/service/sqs/sqsiface"
|
||||
)
|
||||
|
@ -54,6 +56,9 @@ func (m *MockSQS) CreateQueue(input *sqs.CreateQueueInput) (*sqs.CreateQueueOutp
|
|||
tags: input.Tags,
|
||||
}
|
||||
|
||||
arn := fmt.Sprintf("arn:aws:sqs:us-test-1:000000000000:queue/%v", aws.StringValue(input.QueueName))
|
||||
queue.attributes["QueueArn"] = &arn
|
||||
|
||||
m.Queues[name] = queue
|
||||
|
||||
response := &sqs.CreateQueueOutput{
|
||||
|
|
|
@ -87,12 +87,7 @@ func (b *NodeTerminationHandlerBuilder) Build(c *fi.ModelBuilderContext) error {
|
|||
}
|
||||
}
|
||||
|
||||
err := b.buildSQSQueue(c)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = b.buildEventBridgeRules(c)
|
||||
err := b.build(c)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -118,13 +113,13 @@ func (b *NodeTerminationHandlerBuilder) configureASG(c *fi.ModelBuilderContext,
|
|||
return nil
|
||||
}
|
||||
|
||||
func (b *NodeTerminationHandlerBuilder) buildSQSQueue(c *fi.ModelBuilderContext) error {
|
||||
func (b *NodeTerminationHandlerBuilder) build(c *fi.ModelBuilderContext) error {
|
||||
queueName := model.QueueNamePrefix(b.ClusterName()) + "-nth"
|
||||
policy := strings.ReplaceAll(NTHTemplate, "{{ AWS_REGION }}", b.Region)
|
||||
policy = strings.ReplaceAll(policy, "{{ ACCOUNT_ID }}", b.AWSAccountID)
|
||||
policy = strings.ReplaceAll(policy, "{{ SQS_QUEUE_NAME }}", queueName)
|
||||
|
||||
task := &awstasks.SQS{
|
||||
queue := &awstasks.SQS{
|
||||
Name: aws.String(queueName),
|
||||
Lifecycle: b.Lifecycle,
|
||||
Policy: fi.NewStringResource(policy),
|
||||
|
@ -132,18 +127,9 @@ func (b *NodeTerminationHandlerBuilder) buildSQSQueue(c *fi.ModelBuilderContext)
|
|||
Tags: b.CloudTags(queueName, false),
|
||||
}
|
||||
|
||||
c.AddTask(task)
|
||||
c.AddTask(queue)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b *NodeTerminationHandlerBuilder) buildEventBridgeRules(c *fi.ModelBuilderContext) error {
|
||||
clusterName := b.ClusterName()
|
||||
queueName := model.QueueNamePrefix(clusterName) + "-nth"
|
||||
partition := b.AWSPartition
|
||||
region := b.Region
|
||||
accountID := b.AWSAccountID
|
||||
targetArn := "arn:" + partition + ":sqs:" + region + ":" + accountID + ":" + queueName
|
||||
|
||||
clusterNamePrefix := awsup.GetClusterName40(clusterName)
|
||||
for _, event := range events {
|
||||
|
@ -157,7 +143,7 @@ func (b *NodeTerminationHandlerBuilder) buildEventBridgeRules(c *fi.ModelBuilder
|
|||
Tags: b.CloudTags(*ruleName, false),
|
||||
|
||||
EventPattern: &pattern,
|
||||
TargetArn: &targetArn,
|
||||
SQSQueue: queue,
|
||||
}
|
||||
|
||||
c.AddTask(ruleTask)
|
||||
|
@ -167,8 +153,8 @@ func (b *NodeTerminationHandlerBuilder) buildEventBridgeRules(c *fi.ModelBuilder
|
|||
Name: aws.String(*ruleName + "-Target"),
|
||||
Lifecycle: b.Lifecycle,
|
||||
|
||||
Rule: ruleTask,
|
||||
TargetArn: &targetArn,
|
||||
Rule: ruleTask,
|
||||
SQSQueue: queue,
|
||||
}
|
||||
|
||||
c.AddTask(targetTask)
|
||||
|
|
|
@ -984,7 +984,9 @@
|
|||
"Targets": [
|
||||
{
|
||||
"Id": "1",
|
||||
"Arn": "arn:aws-test:sqs:us-test-1:123456789012:nthsqsresources-longclustername-example-com-nth"
|
||||
"Arn": {
|
||||
"Ref": "AWSSQSQueuenthsqsresourceslongclusternameexamplecomnth"
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
|
@ -1004,7 +1006,9 @@
|
|||
"Targets": [
|
||||
{
|
||||
"Id": "1",
|
||||
"Arn": "arn:aws-test:sqs:us-test-1:123456789012:nthsqsresources-longclustername-example-com-nth"
|
||||
"Arn": {
|
||||
"Ref": "AWSSQSQueuenthsqsresourceslongclusternameexamplecomnth"
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
|
@ -1024,7 +1028,9 @@
|
|||
"Targets": [
|
||||
{
|
||||
"Id": "1",
|
||||
"Arn": "arn:aws-test:sqs:us-test-1:123456789012:nthsqsresources-longclustername-example-com-nth"
|
||||
"Arn": {
|
||||
"Ref": "AWSSQSQueuenthsqsresourceslongclusternameexamplecomnth"
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
|
@ -1044,7 +1050,9 @@
|
|||
"Targets": [
|
||||
{
|
||||
"Id": "1",
|
||||
"Arn": "arn:aws-test:sqs:us-test-1:123456789012:nthsqsresources-longclustername-example-com-nth"
|
||||
"Arn": {
|
||||
"Ref": "AWSSQSQueuenthsqsresourceslongclusternameexamplecomnth"
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
|
|
|
@ -265,22 +265,22 @@ resource "aws_cloudwatch_event_rule" "nthsqsresources-longclustername-e-fkbaoh-S
|
|||
}
|
||||
|
||||
resource "aws_cloudwatch_event_target" "nthsqsresources-longclustername-e-fkbaoh-ASGLifecycle-Target" {
|
||||
arn = "arn:aws-test:sqs:us-test-1:123456789012:nthsqsresources-longclustername-example-com-nth"
|
||||
arn = aws_sqs_queue.nthsqsresources-longclustername-example-com-nth.arn
|
||||
rule = aws_cloudwatch_event_rule.nthsqsresources-longclustername-e-fkbaoh-ASGLifecycle.id
|
||||
}
|
||||
|
||||
resource "aws_cloudwatch_event_target" "nthsqsresources-longclustername-e-fkbaoh-InstanceStateChange-Target" {
|
||||
arn = "arn:aws-test:sqs:us-test-1:123456789012:nthsqsresources-longclustername-example-com-nth"
|
||||
arn = aws_sqs_queue.nthsqsresources-longclustername-example-com-nth.arn
|
||||
rule = aws_cloudwatch_event_rule.nthsqsresources-longclustername-e-fkbaoh-InstanceStateChange.id
|
||||
}
|
||||
|
||||
resource "aws_cloudwatch_event_target" "nthsqsresources-longclustername-e-fkbaoh-RebalanceRecommendation-Target" {
|
||||
arn = "arn:aws-test:sqs:us-test-1:123456789012:nthsqsresources-longclustername-example-com-nth"
|
||||
arn = aws_sqs_queue.nthsqsresources-longclustername-example-com-nth.arn
|
||||
rule = aws_cloudwatch_event_rule.nthsqsresources-longclustername-e-fkbaoh-RebalanceRecommendation.id
|
||||
}
|
||||
|
||||
resource "aws_cloudwatch_event_target" "nthsqsresources-longclustername-e-fkbaoh-SpotInterruption-Target" {
|
||||
arn = "arn:aws-test:sqs:us-test-1:123456789012:nthsqsresources-longclustername-example-com-nth"
|
||||
arn = aws_sqs_queue.nthsqsresources-longclustername-example-com-nth.arn
|
||||
rule = aws_cloudwatch_event_rule.nthsqsresources-longclustername-e-fkbaoh-SpotInterruption.id
|
||||
}
|
||||
|
||||
|
|
|
@ -37,7 +37,7 @@ type EventBridgeRule struct {
|
|||
Lifecycle fi.Lifecycle
|
||||
|
||||
EventPattern *string
|
||||
TargetArn *string // required for cloudformation rendering
|
||||
SQSQueue *SQS
|
||||
|
||||
Tags map[string]string
|
||||
}
|
||||
|
@ -81,7 +81,7 @@ func (eb *EventBridgeRule) Find(c *fi.Context) (*EventBridgeRule, error) {
|
|||
Name: eb.Name,
|
||||
Lifecycle: eb.Lifecycle,
|
||||
EventPattern: rule.EventPattern,
|
||||
TargetArn: eb.TargetArn,
|
||||
SQSQueue: eb.SQSQueue,
|
||||
Tags: mapEventBridgeTagsToMap(tagResponse.Tags),
|
||||
}
|
||||
return actual, nil
|
||||
|
@ -153,7 +153,7 @@ func (eb *EventBridgeRule) TerraformLink() *terraformWriter.Literal {
|
|||
|
||||
type cloudformationTarget struct {
|
||||
Id *string
|
||||
Arn *string
|
||||
Arn *cloudformation.Literal
|
||||
}
|
||||
|
||||
type cloudformationEventBridgeRule struct {
|
||||
|
@ -176,7 +176,7 @@ func (_ *EventBridgeRule) RenderCloudformation(t *cloudformation.CloudformationT
|
|||
|
||||
target := &cloudformationTarget{
|
||||
Id: s("1"),
|
||||
Arn: e.TargetArn,
|
||||
Arn: e.SQSQueue.CloudformationLink(),
|
||||
}
|
||||
|
||||
cf := &cloudformationEventBridgeRule{
|
||||
|
|
|
@ -36,8 +36,8 @@ type EventBridgeTarget struct {
|
|||
Name *string
|
||||
Lifecycle fi.Lifecycle
|
||||
|
||||
Rule *EventBridgeRule
|
||||
TargetArn *string
|
||||
Rule *EventBridgeRule
|
||||
SQSQueue *SQS
|
||||
}
|
||||
|
||||
var _ fi.CompareWithID = &EventBridgeTarget{}
|
||||
|
@ -49,7 +49,7 @@ func (eb *EventBridgeTarget) CompareWithID() *string {
|
|||
func (eb *EventBridgeTarget) Find(c *fi.Context) (*EventBridgeTarget, error) {
|
||||
cloud := c.Cloud.(awsup.AWSCloud)
|
||||
|
||||
if eb.Rule == nil || eb.TargetArn == nil {
|
||||
if eb.Rule == nil || eb.SQSQueue == nil {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
|
@ -74,13 +74,13 @@ func (eb *EventBridgeTarget) Find(c *fi.Context) (*EventBridgeTarget, error) {
|
|||
return nil, nil
|
||||
}
|
||||
for _, target := range response.Targets {
|
||||
if *target.Arn == *eb.TargetArn {
|
||||
if fi.StringValue(target.Arn) == fi.StringValue(eb.SQSQueue.ARN) {
|
||||
actual := &EventBridgeTarget{
|
||||
ID: target.Id,
|
||||
Name: eb.Name,
|
||||
Lifecycle: eb.Lifecycle,
|
||||
Rule: eb.Rule,
|
||||
TargetArn: eb.TargetArn,
|
||||
SQSQueue: eb.SQSQueue,
|
||||
}
|
||||
return actual, nil
|
||||
}
|
||||
|
@ -98,8 +98,8 @@ func (_ *EventBridgeTarget) CheckChanges(a, e, changes *EventBridgeTarget) error
|
|||
if e.Rule == nil {
|
||||
return field.Required(field.NewPath("Rule"), "")
|
||||
}
|
||||
if e.TargetArn == nil {
|
||||
return field.Required(field.NewPath("TargetArn"), "")
|
||||
if e.SQSQueue == nil {
|
||||
return field.Required(field.NewPath("SQSQueue"), "")
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -109,7 +109,7 @@ func (_ *EventBridgeTarget) CheckChanges(a, e, changes *EventBridgeTarget) error
|
|||
func (eb *EventBridgeTarget) RenderAWS(t *awsup.AWSAPITarget, a, e, changes *EventBridgeTarget) error {
|
||||
if a == nil {
|
||||
target := &eventbridge.Target{
|
||||
Arn: eb.TargetArn,
|
||||
Arn: eb.SQSQueue.ARN,
|
||||
Id: aws.String("1"),
|
||||
}
|
||||
|
||||
|
@ -129,13 +129,13 @@ func (eb *EventBridgeTarget) RenderAWS(t *awsup.AWSAPITarget, a, e, changes *Eve
|
|||
|
||||
type terraformEventBridgeTarget struct {
|
||||
RuleName *terraformWriter.Literal `json:"rule" cty:"rule"`
|
||||
TargetArn *string `json:"arn" cty:"arn"`
|
||||
TargetArn *terraformWriter.Literal `json:"arn" cty:"arn"`
|
||||
}
|
||||
|
||||
func (_ *EventBridgeTarget) RenderTerraform(t *terraform.TerraformTarget, a, e, changes *EventBridgeTarget) error {
|
||||
tf := &terraformEventBridgeTarget{
|
||||
RuleName: e.Rule.TerraformLink(),
|
||||
TargetArn: e.TargetArn,
|
||||
TargetArn: e.SQSQueue.TerraformLink(),
|
||||
}
|
||||
|
||||
return t.RenderResource("aws_cloudwatch_event_target", *e.Name, tf)
|
||||
|
|
|
@ -39,6 +39,7 @@ type SQS struct {
|
|||
Name *string
|
||||
Lifecycle fi.Lifecycle
|
||||
|
||||
ARN *string
|
||||
URL *string
|
||||
MessageRetentionPeriod int
|
||||
Policy fi.Resource
|
||||
|
@ -49,7 +50,7 @@ type SQS struct {
|
|||
var _ fi.CompareWithID = &SQS{}
|
||||
|
||||
func (q *SQS) CompareWithID() *string {
|
||||
return q.URL
|
||||
return q.ARN
|
||||
}
|
||||
|
||||
func (q *SQS) Find(c *fi.Context) (*SQS, error) {
|
||||
|
@ -75,13 +76,14 @@ func (q *SQS) Find(c *fi.Context) (*SQS, error) {
|
|||
url := response.QueueUrls[0]
|
||||
|
||||
attributes, err := cloud.SQS().GetQueueAttributes(&sqs.GetQueueAttributesInput{
|
||||
AttributeNames: []*string{s("MessageRetentionPeriod"), s("Policy")},
|
||||
AttributeNames: []*string{s("MessageRetentionPeriod"), s("Policy"), s("QueueArn")},
|
||||
QueueUrl: url,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error getting SQS queue attributes: %v", err)
|
||||
}
|
||||
actualPolicy := *attributes.Attributes["Policy"]
|
||||
actualARN := *attributes.Attributes["QueueArn"]
|
||||
period, err := strconv.Atoi(*attributes.Attributes["MessageRetentionPeriod"])
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error coverting MessageRetentionPeriod to int: %v", err)
|
||||
|
@ -118,6 +120,7 @@ func (q *SQS) Find(c *fi.Context) (*SQS, error) {
|
|||
}
|
||||
|
||||
actual := &SQS{
|
||||
ARN: s(actualARN),
|
||||
Name: q.Name,
|
||||
URL: url,
|
||||
Lifecycle: q.Lifecycle,
|
||||
|
@ -127,7 +130,7 @@ func (q *SQS) Find(c *fi.Context) (*SQS, error) {
|
|||
}
|
||||
|
||||
//Avoid flapping
|
||||
q.Name = actual.Name
|
||||
q.ARN = actual.ARN
|
||||
|
||||
return actual, nil
|
||||
}
|
||||
|
@ -170,7 +173,15 @@ func (q *SQS) RenderAWS(t *awsup.AWSAPITarget, a, e, changes *SQS) error {
|
|||
return fmt.Errorf("error creating SQS queue: %v", err)
|
||||
}
|
||||
|
||||
q.URL = response.QueueUrl
|
||||
attributes, err := t.Cloud.SQS().GetQueueAttributes(&sqs.GetQueueAttributesInput{
|
||||
AttributeNames: []*string{s("QueueArn")},
|
||||
QueueUrl: response.QueueUrl,
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("error getting SQS queue attributes: %v", err)
|
||||
}
|
||||
|
||||
e.ARN = attributes.Attributes["QueueArn"]
|
||||
}
|
||||
|
||||
return nil
|
||||
|
@ -199,6 +210,10 @@ func (_ *SQS) RenderTerraform(t *terraform.TerraformTarget, a, e, changes *SQS)
|
|||
return t.RenderResource("aws_sqs_queue", *e.Name, tf)
|
||||
}
|
||||
|
||||
func (e *SQS) TerraformLink() *terraformWriter.Literal {
|
||||
return terraformWriter.LiteralProperty("aws_sqs_queue", *e.Name, "arn")
|
||||
}
|
||||
|
||||
type cloudformationSQSQueue struct {
|
||||
QueueName *string `json:"QueueName"`
|
||||
MessageRetentionPeriod int `json:"MessageRetentionPeriod"`
|
||||
|
@ -243,6 +258,10 @@ func (_ *SQS) RenderCloudformation(t *cloudformation.CloudformationTarget, a, e,
|
|||
return t.RenderResource("AWS::SQS::QueuePolicy", *e.Name+"Policy", cfQueuePolicy)
|
||||
}
|
||||
|
||||
func (e *SQS) CloudformationLink() *cloudformation.Literal {
|
||||
return cloudformation.Ref("AWS::SQS::Queue", *e.Name)
|
||||
}
|
||||
|
||||
// change tags to format required by CreateQueue
|
||||
func convertTagsToPointers(tags map[string]string) map[string]*string {
|
||||
newTags := map[string]*string{}
|
||||
|
|
Loading…
Reference in New Issue