diff --git a/.github/workflows/certification.yml b/.github/workflows/certification.yml index 5afaae6cf..8f5475be7 100644 --- a/.github/workflows/certification.yml +++ b/.github/workflows/certification.yml @@ -308,6 +308,11 @@ jobs: PUBSUB_AWS_SNSSQS_TOPIC_MVT="sqssnscerttest-tp-mvt-${{env.UNIQUE_ID}}" echo "PUBSUB_AWS_SNSSQS_TOPIC_MVT=$PUBSUB_AWS_SNSSQS_TOPIC_MVT" >> $GITHUB_ENV + PUBSUB_AWS_SNSSQS_QUEUE_DLIN="sqssnscerttest-dlq-in-${{env.UNIQUE_ID}}" + echo "PUBSUB_AWS_SNSSQS_QUEUE_DLIN=$PUBSUB_AWS_SNSSQS_QUEUE_DLIN" >> $GITHUB_ENV + PUBSUB_AWS_SNSSQS_TOPIC_DL="sqssnscerttest-dlt-${{env.UNIQUE_ID}}" + echo "PUBSUB_AWS_SNSSQS_TOPIC_DL=$PUBSUB_AWS_SNSSQS_TOPIC_DL" >> $GITHUB_ENV + AWS_REGION="us-east-1" echo "AWS_REGION=$AWS_REGION" >> $GITHUB_ENV diff --git a/tests/certification/pubsub/aws/snssqs/README.md b/tests/certification/pubsub/aws/snssqs/README.md index fb49da0e0..6abcdbc9f 100644 --- a/tests/certification/pubsub/aws/snssqs/README.md +++ b/tests/certification/pubsub/aws/snssqs/README.md @@ -56,6 +56,7 @@ The purpose of this module is to provide tests that certify the AWS SNS/SQS Pubs - Subscriber 1 reeives message, notifies subscriber 2 and sumlates being busy for time shorter than messageVisibilityTimeout seconds - Subscriber 2 receives go ahead and subscribes to 1 topic - Subscriber 2 must not receive message +- Verify data with an optional parameters `sqsDeadLettersQueueName`, `messageRetryLimit`, and `messageReceiveLimit` takes affect (SNSSQSMessageDeadLetter) ### Running the tests diff --git a/tests/certification/pubsub/aws/snssqs/components/deadletter/localsecrets.yaml b/tests/certification/pubsub/aws/snssqs/components/deadletter/localsecrets.yaml new file mode 100644 index 000000000..82ad3b39b --- /dev/null +++ b/tests/certification/pubsub/aws/snssqs/components/deadletter/localsecrets.yaml @@ -0,0 +1,9 @@ +apiVersion: dapr.io/v1alpha1 +kind: Component +metadata: + name: envvar-secret-store + namespace: default +spec: + type: secretstores.local.env + version: v1 + metadata: \ No newline at end of file diff --git a/tests/certification/pubsub/aws/snssqs/components/deadletter/pubsub.yaml b/tests/certification/pubsub/aws/snssqs/components/deadletter/pubsub.yaml new file mode 100644 index 000000000..693d9dd9d --- /dev/null +++ b/tests/certification/pubsub/aws/snssqs/components/deadletter/pubsub.yaml @@ -0,0 +1,36 @@ +apiVersion: dapr.io/v1alpha1 +kind: Component +metadata: + name: snssqs-cert-tests +spec: + type: pubsub.snssqs + version: v1 + metadata: + - name: accessKey + secretKeyRef: + name: AWS_ACCESS_KEY_ID + key: AWS_ACCESS_KEY_ID + - name: secretKey + secretKeyRef: + name: AWS_SECRET_ACCESS_KEY + key: AWS_SECRET_ACCESS_KEY + - name: region + secretKeyRef: + name: AWS_REGION + key: AWS_REGION + - name: consumerID + secretKeyRef: + name: PUBSUB_AWS_SNSSQS_QUEUE_DLIN + key: PUBSUB_AWS_SNSSQS_QUEUE_DLIN + - name: sqsDeadLettersQueueName + secretKeyRef: + name: PUBSUB_AWS_SNSSQS_QUEUE_DLOUT + key: PUBSUB_AWS_SNSSQS_QUEUE_DLOUT + - name: messageReceiveLimit + value: 2 + - name: messageRetryLimit + value: 2 + + +auth: + secretstore: envvar-secret-store \ No newline at end of file diff --git a/tests/certification/pubsub/aws/snssqs/snssqs_helper.go b/tests/certification/pubsub/aws/snssqs/snssqs_helper.go index b8e4404b6..c03437172 100644 --- a/tests/certification/pubsub/aws/snssqs/snssqs_helper.go +++ b/tests/certification/pubsub/aws/snssqs/snssqs_helper.go @@ -42,20 +42,53 @@ func deleteQueues(queues []string) error { } func deleteQueue(svc *sqs.SQS, queue string) error { - queueUrlOutput, err := svc.GetQueueUrl(&sqs.GetQueueUrlInput{ - QueueName: &queue, - }) + fmt.Printf("deleteQueue: %q\n", queue) + queueUrl, err := getQueueURL(svc, &queue) if err != nil { return fmt.Errorf("error getting the queue URL: %q err:%v", queue, err) } _, err = svc.DeleteQueue(&sqs.DeleteQueueInput{ - QueueUrl: queueUrlOutput.QueueUrl, + QueueUrl: &queueUrl, }) return err } +func getQueueURL(svc *sqs.SQS, queue *string) (string, error) { + urlResult, err := svc.GetQueueUrl(&sqs.GetQueueUrlInput{ + QueueName: queue, + }) + + if err != nil { + return "", err + } + + return *urlResult.QueueUrl, nil +} + +func getMessages(svc *sqs.SQS, queueURL string) (*sqs.ReceiveMessageOutput, error) { + msgResult, err := svc.ReceiveMessage(&sqs.ReceiveMessageInput{ + MessageAttributeNames: []*string{ + aws.String(sqs.QueueAttributeNameAll), + }, + // use this property to decide when a message should be discarded. + AttributeNames: []*string{ + aws.String(sqs.QueueAttributeNameAll), + }, + MaxNumberOfMessages: aws.Int64(10), + QueueUrl: aws.String(queueURL), + VisibilityTimeout: aws.Int64(5), + WaitTimeSeconds: aws.Int64(20), + }) + + if err != nil { + return nil, err + } + + return msgResult, nil +} + func deleteTopics(topics []string, region string) error { sess := session.Must( session.NewSessionWithOptions( diff --git a/tests/certification/pubsub/aws/snssqs/snssqs_test.go b/tests/certification/pubsub/aws/snssqs/snssqs_test.go index 0478c1caa..717de2f77 100644 --- a/tests/certification/pubsub/aws/snssqs/snssqs_test.go +++ b/tests/certification/pubsub/aws/snssqs/snssqs_test.go @@ -68,6 +68,8 @@ var ( region = "us-east-1" // replaced with env var AWS_REGION existingTopic = "existingTopic" // replaced with env var PUBSUB_AWS_SNSSQS_TOPIC_3 messageVisibilityTimeoutTopic = "messageVisibilityTimeoutTopic" // replaced with env var PUBSUB_AWS_SNSSQS_TOPIC_MVT + deadLetterTopic = "deadLetterTopic" // replaced with env var PUBSUB_AWS_SNSSQS_TOPIC_DL + dealLetterQueueName = "dealLetterQueueName" // replaced with env var PUBSUB_AWS_SNSSQS_QUEUE_DLOUT ) // The following Queue names must match @@ -80,6 +82,8 @@ var ( var queues = []string{ "snscerttest1", "snssqscerttest2", + "snssqscerttestdeadletterin", + "snssqscerttestdeadletterout", } var topics = []string{ @@ -103,6 +107,16 @@ func init() { if qn != "" { queues[1] = qn } + qn = os.Getenv("PUBSUB_AWS_SNSSQS_QUEUE_DLIN") + if qn != "" { + queues[2] = qn + } + qn = os.Getenv("PUBSUB_AWS_SNSSQS_QUEUE_DLOUT") + if qn != "" { + dealLetterQueueName = qn + queues[3] = qn + } + qn = os.Getenv("PUBSUB_AWS_SNSSQS_TOPIC_3") if qn != "" { existingTopic = qn @@ -111,6 +125,10 @@ func init() { if qn != "" { messageVisibilityTimeoutTopic = qn } + qn = os.Getenv("PUBSUB_AWS_SNSSQS_TOPIC_DL") + if qn != "" { + deadLetterTopic = qn + } } func TestAWSSNSSQSCertificationTests(t *testing.T) { @@ -151,6 +169,10 @@ func TestAWSSNSSQSCertificationTests(t *testing.T) { t.Run("SNSSQSMessageVisibilityTimeout", func(t *testing.T) { SNSSQSMessageVisibilityTimeout(t) }) + + t.Run("SNSSQSMessageDeadLetter", func(t *testing.T) { + SNSSQSMessageDeadLetter(t) + }) } // Verify with single publisher / single subscriber @@ -1197,6 +1219,175 @@ func SNSSQSMessageVisibilityTimeout(t *testing.T) { } +// Verify data with an optional parameters `sqsDeadLettersQueueName` and `messageReceiveLimit` takes affect +func SNSSQSMessageDeadLetter(t *testing.T) { + consumerGroup1 := watcher.NewUnordered() + deadLetterConsumerGroup := watcher.NewUnordered() + failedMessagesNum := 1 + + // subscriber of the given topic + subscriberApplication := func(appID string, topicName string, messagesWatcher *watcher.Watcher) app.SetupFn { + return func(ctx flow.Context, s common.Service) error { + return multierr.Combine( + s.AddTopicEventHandler(&common.Subscription{ + PubsubName: pubsubName, + Topic: topicName, + Route: "/orders", + }, func(_ context.Context, e *common.TopicEvent) (retry bool, err error) { + ctx.Logf("subscriberApplication - Message Received appID: %s,pubsub: %s, topic: %s, id: %s, data: %s - causing failure...", appID, e.PubsubName, e.Topic, e.ID, e.Data) + return true, fmt.Errorf("failure on purpose") + }), + ) + } + } + + // subscriber of the given deadletter queue + deadLetterReceiverApplication := func(appID string, deadLetterQueueName string, messagesWatcher *watcher.Watcher) app.SetupFn { + return func(ctx flow.Context, s common.Service) error { + return multierr.Combine( + func() error { + ctx.Logf("%s is waiting for messages...\n", appID) + // Create a SQS session + sqsSess := sqsService() + + // Get URL of queue + queueUrl, err := getQueueURL(sqsSess, &deadLetterQueueName) + if err != nil { + return fmt.Errorf("%s error getting the queue URL: %q err:%v", appID, deadLetterQueueName, err) + } + + // Wait for DeadLetter Queue message to arrive + for { + msgResult, err := getMessages(sqsSess, queueUrl) + if err != nil { + return fmt.Errorf("%s error getting messages from dead letter queue URL: %q err:%v", appID, queueUrl, err) + } + + if len(msgResult.Messages) == 0 { + ctx.Logf("%s no messages sleeping and trying again...\n", appID) + time.Sleep(5 * time.Second) + continue + } + + for _, msg := range msgResult.Messages { + ctx.Logf("deadLetterReceiverApplication - Message Received appID: %s,deadLetterQueueName: %s, data: %s", appID, deadLetterQueueName, *msg.Body) + // Track/Observe deadletter message. + messagesWatcher.Observe(*msg.Body) + break + } + } + + ctx.Logf("%s done!\n", appID) + return nil + }(), + ) + } + } + + publishMessages := func(metadata map[string]string, sidecarName string, topicName string, messageWatchers ...*watcher.Watcher) flow.Runnable { + return func(ctx flow.Context) error { + // prepare the messages + messages := make([]string, failedMessagesNum) + for i := range messages { + messages[i] = fmt.Sprintf("partitionKey: %s, message for topic: %s, index: %03d, uniqueId: %s", metadata[messageKey], topicName, i, uuid.New().String()) + } + + // add the messages as expectations to the watchers + for _, messageWatcher := range messageWatchers { + messageWatcher.ExpectStrings(messages...) + } + + // get the sidecar (dapr) client + client := sidecar.GetClient(ctx, sidecarName) + + // publish messages + ctx.Logf("SNSSQSMessageDeadLetter - Publishing messages. sidecarName: %s, topicName: %s", sidecarName, topicName) + + var publishOptions dapr.PublishEventOption + + if metadata != nil { + publishOptions = dapr.PublishEventWithMetadata(metadata) + } + + for _, message := range messages { + ctx.Logf("Publishing: %q", message) + var err error + + if publishOptions != nil { + err = client.PublishEvent(ctx, pubsubName, topicName, message, publishOptions) + } else { + err = client.PublishEvent(ctx, pubsubName, topicName, message) + } + require.NoError(ctx, err, "SNSSQSMessageDeadLetter - error publishing message") + } + return nil + } + } + + assertMessages := func(timeout time.Duration, messageWatchers ...*watcher.Watcher) flow.Runnable { + return func(ctx flow.Context) error { + // assert for messages + for _, m := range messageWatchers { + if !m.Assert(ctx, 2*timeout) { + ctx.Errorf("SNSSQSMessageDeadLetter - message assersion failed for watcher: %#v\n", m) + } + } + + return nil + } + } + + connectToSideCar := func(sidecarName string) flow.Runnable { + return func(ctx flow.Context) error { + ctx.Logf("####### connect to sidecar (dapr) client sidecarName: %s and exit", sidecarName) + // get the sidecar (dapr) client + sidecar.GetClient(ctx, sidecarName) + return nil + } + } + + prefix := "SNSSQSMessageDeadLetter-" + subApp := prefix + "subscriberApp" + deadletterApp := prefix + "deadLetterReceiverApp" + deadLetterSideCar := prefix + sidecarName1 + subAppSideCar := prefix + sidecarName2 + + flow.New(t, "SNSSQSMessageDeadLetter Verify with single publisher / single subscriber and DeadLetter"). + + // Run deadLetterReceiverApplication - should receive messages from dead letter queue + Step(app.Run(deadletterApp, fmt.Sprintf(":%d", appPort+portOffset+2), + deadLetterReceiverApplication(deadletterApp, dealLetterQueueName, deadLetterConsumerGroup))). + Step(sidecar.Run(deadletterApp, + embedded.WithComponentsPath("./components/message_visibility_timeout"), + embedded.WithAppProtocol(runtime.HTTPProtocol, appPort+portOffset+2), + embedded.WithDaprGRPCPort(runtime.DefaultDaprAPIGRPCPort+portOffset+2), + embedded.WithDaprHTTPPort(runtime.DefaultDaprHTTPPort+portOffset+2), + embedded.WithProfilePort(runtime.DefaultProfilePort+portOffset+2), + componentRuntimeOptions(), + )). + Step("No messages will be sent here", + connectToSideCar(deadLetterSideCar)). + Step("wait", flow.Sleep(10*time.Second)). + + // Run subscriberApplication - will fail to process messages + Step(app.Run(subApp, fmt.Sprintf(":%d", appPort+portOffset+4), + subscriberApplication(subApp, deadLetterTopic, consumerGroup1))). + // Run the Dapr sidecar with ConsumerID "PUBSUB_AWS_SNSSQS_QUEUE_DLIN" + Step(sidecar.Run(subAppSideCar, + embedded.WithComponentsPath("./components/deadletter"), + embedded.WithAppProtocol(runtime.HTTPProtocol, appPort+portOffset+4), + embedded.WithDaprGRPCPort(runtime.DefaultDaprAPIGRPCPort+portOffset+4), + embedded.WithDaprHTTPPort(runtime.DefaultDaprHTTPPort+portOffset+4), + componentRuntimeOptions(), + )). + Step("publish messages to deadLetterTopic ==> "+deadLetterTopic, publishMessages(nil, subAppSideCar, deadLetterTopic, deadLetterConsumerGroup)). + Step("verify if app1 has 0 recevied messages published to active topic", assertMessages(10*time.Second, consumerGroup1)). + Step("verify if app2 has deadletterMessageNum recevied messages send to dead letter queue", assertMessages(10*time.Second, deadLetterConsumerGroup)). + Step("reset", flow.Reset(consumerGroup1, deadLetterConsumerGroup)). + Step("wait", flow.Sleep(10*time.Second)). + Run() +} + func componentRuntimeOptions() []runtime.Option { log := logger.NewLogger("dapr.components")