adds aws snssqs dead letter cert test

Signed-off-by: Roberto J Rojas <robertojrojas@gmail.com>
This commit is contained in:
Roberto J Rojas 2023-01-24 21:56:35 -05:00
parent 709c8b9bda
commit e8e0fd7f6c
3 changed files with 223 additions and 19 deletions

View File

@ -312,8 +312,11 @@ jobs:
echo "PUBSUB_AWS_SNSSQS_QUEUE_DLIN=$PUBSUB_AWS_SNSSQS_QUEUE_DLIN" >> $GITHUB_ENV
PUBSUB_AWS_SNSSQS_QUEUE_DLOUT="sqssnscerttest-dlq-out-${{env.UNIQUE_ID}}"
echo "PUBSUB_AWS_SNSSQS_QUEUE_DLOUT=$PUBSUB_AWS_SNSSQS_QUEUE_DLOUT" >> $GITHUB_ENV
PUBSUB_AWS_SNSSQS_TOPIC_DL="sqssnscerttest-dlt-${{env.UNIQUE_ID}}"
echo "PUBSUB_AWS_SNSSQS_TOPIC_DL=$PUBSUB_AWS_SNSSQS_TOPIC_DL" >> $GITHUB_ENV
PUBSUB_AWS_SNSSQS_TOPIC_DLIN="sqssnscerttest-dlt-in-${{env.UNIQUE_ID}}"
echo "PUBSUB_AWS_SNSSQS_TOPIC_DLIN=$PUBSUB_AWS_SNSSQS_TOPIC_DLIN" >> $GITHUB_ENV
PUBSUB_AWS_SNSSQS_TOPIC_DLOUT="sqssnscerttest-dlt-out-${{env.UNIQUE_ID}}"
echo "PUBSUB_AWS_SNSSQS_TOPIC_DLOUT=$PUBSUB_AWS_SNSSQS_TOPIC_DLOUT" >> $GITHUB_ENV
PUBSUB_AWS_SNSSQS_QUEUE_FIFO="sqssnscerttest-q-fifo-${{env.UNIQUE_ID}}.fifo"
echo "PUBSUB_AWS_SNSSQS_QUEUE_FIFO=$PUBSUB_AWS_SNSSQS_QUEUE_FIFO" >> $GITHUB_ENV

View File

@ -43,7 +43,7 @@ func deleteQueues(queues []string) error {
func deleteQueue(svc *sqs.SQS, queue string) error {
fmt.Printf("deleteQueue: %q\n", queue)
queueUrl, err := getQueueURL(svc, &queue)
queueUrl, err := getQueueURL(svc, queue)
if err != nil {
return fmt.Errorf("error getting the queue URL: %q err:%v", queue, err)
}
@ -55,9 +55,9 @@ func deleteQueue(svc *sqs.SQS, queue string) error {
return err
}
func getQueueURL(svc *sqs.SQS, queue *string) (string, error) {
func getQueueURL(svc *sqs.SQS, queue string) (string, error) {
urlResult, err := svc.GetQueueUrl(&sqs.GetQueueUrlInput{
QueueName: queue,
QueueName: aws.String(queue),
})
if err != nil {
@ -68,20 +68,18 @@ func getQueueURL(svc *sqs.SQS, queue *string) (string, error) {
}
func getMessages(svc *sqs.SQS, queueURL string) (*sqs.ReceiveMessageOutput, error) {
msgResult, err := svc.ReceiveMessage(&sqs.ReceiveMessageInput{
MessageAttributeNames: []*string{
aws.String(sqs.QueueAttributeNameAll),
},
input := sqs.ReceiveMessageInput{
// use this property to decide when a message should be discarded.
AttributeNames: []*string{
aws.String(sqs.QueueAttributeNameAll),
aws.String(sqs.MessageSystemAttributeNameApproximateReceiveCount),
},
MaxNumberOfMessages: aws.Int64(10),
QueueUrl: aws.String(queueURL),
VisibilityTimeout: aws.Int64(5),
WaitTimeSeconds: aws.Int64(20),
})
}
msgResult, err := svc.ReceiveMessage(&input)
if err != nil {
return nil, err
}
@ -89,6 +87,18 @@ func getMessages(svc *sqs.SQS, queueURL string) (*sqs.ReceiveMessageOutput, erro
return msgResult, nil
}
func deleteMessage(svc *sqs.SQS, queueURL, messageHandle string) error {
_, err := svc.DeleteMessage(&sqs.DeleteMessageInput{
QueueUrl: aws.String(queueURL),
ReceiptHandle: aws.String(messageHandle),
})
if err != nil {
return err
}
return nil
}
func deleteTopics(topics []string, region string) error {
sess := session.Must(
session.NewSessionWithOptions(
@ -125,6 +135,7 @@ func deleteTopics(topics []string, region string) error {
}
func deleteTopic(svc snsiface.SNSAPI, topic string) error {
fmt.Printf("deleteTopic: %q\n", topic)
_, err := svc.DeleteTopic(&sns.DeleteTopicInput{
TopicArn: aws.String(topic),
})

View File

@ -17,6 +17,7 @@ import (
"context"
"fmt"
"encoding/json"
"os"
"testing"
"time"
@ -70,6 +71,9 @@ var (
existingTopic = "existingTopic" // replaced with env var PUBSUB_AWS_SNSSQS_TOPIC_3
messageVisibilityTimeoutTopic = "messageVisibilityTimeoutTopic" // replaced with env var PUBSUB_AWS_SNSSQS_TOPIC_MVT
fifoTopic = "fifoTopic" // replaced with env var PUBSUB_AWS_SNSSQS_TOPIC_FIFO
deadLetterTopicIn = "deadLetterTopicIn" // replaced with env var PUBSUB_AWS_SNSSQS_TOPIC_DLIN
deadLetterTopicOut = "deadLetterTopicOut" // replaced with env var PUBSUB_AWS_SNSSQS_TOPIC_DLOUT
deadLetterQueueName = "deadLetterQueueName" // replaced with env var PUBSUB_AWS_SNSSQS_QUEUE_DLOUT
)
// The following Queue names must match
@ -79,11 +83,7 @@ var (
//
// PUBSUB_AWS_SNSSQS_QUEUE_*
// PUBSUB_AWS_SNSSQS_TOPIC_*
var queues = []string{
"snscerttest1",
"snssqscerttest2",
"snssqscerttestfifo",
}
var queues = []string{}
var topics = []string{
topicActiveName,
@ -100,15 +100,24 @@ func init() {
qn = os.Getenv("PUBSUB_AWS_SNSSQS_QUEUE_1")
if qn != "" {
queues[0] = qn
queues = append(queues, qn)
}
qn = os.Getenv("PUBSUB_AWS_SNSSQS_QUEUE_2")
if qn != "" {
queues[1] = qn
queues = append(queues, qn)
}
qn = os.Getenv("PUBSUB_AWS_SNSSQS_QUEUE_FIFO")
if qn != "" {
queues[2] = qn
queues = append(queues, qn)
}
qn = os.Getenv("PUBSUB_AWS_SNSSQS_QUEUE_DLIN")
if qn != "" {
queues = append(queues, qn)
}
qn = os.Getenv("PUBSUB_AWS_SNSSQS_QUEUE_DLOUT")
if qn != "" {
deadLetterQueueName = qn
queues = append(queues, qn)
}
qn = os.Getenv("PUBSUB_AWS_SNSSQS_TOPIC_3")
@ -125,6 +134,16 @@ func init() {
fifoTopic = qn
topics = append(topics, fifoTopic)
}
qn = os.Getenv("PUBSUB_AWS_SNSSQS_TOPIC_DLIN")
if qn != "" {
deadLetterTopicIn = qn
topics = append(topics, deadLetterTopicIn)
}
qn = os.Getenv("PUBSUB_AWS_SNSSQS_TOPIC_DLOUT")
if qn != "" {
deadLetterTopicOut = qn
topics = append(topics, deadLetterTopicOut)
}
}
@ -170,6 +189,10 @@ func TestAWSSNSSQSCertificationTests(t *testing.T) {
t.Run("SNSSQSFIFOMessages", func(t *testing.T) {
SNSSQSFIFOMessages(t)
})
t.Run("SNSSQSMessageDeadLetter", func(t *testing.T) {
SNSSQSMessageDeadLetter(t)
})
}
// Verify with single publisher / single subscriber
@ -1355,6 +1378,173 @@ func SNSSQSFIFOMessages(t *testing.T) {
}
// Verify data with an optional parameters `sqsDeadLettersQueueName`, `messageRetryLimit`, and `messageReceiveLimit` takes affect
func SNSSQSMessageDeadLetter(t *testing.T) {
consumerGroup1 := watcher.NewUnordered()
deadLetterConsumerGroup := watcher.NewUnordered()
failedMessagesNum := 1
subscriberApplication := func(appID string, topicName string, messagesWatcher *watcher.Watcher) app.SetupFn {
return func(ctx flow.Context, s common.Service) error {
return multierr.Combine(
s.AddTopicEventHandler(&common.Subscription{
PubsubName: pubsubName,
Topic: topicName,
Route: "/orders",
}, func(_ context.Context, e *common.TopicEvent) (retry bool, err error) {
ctx.Logf("subscriberApplication - Message Received appID: %s,pubsub: %s, topic: %s, id: %s, data: %s - causing failure on purpose...", appID, e.PubsubName, e.Topic, e.ID, e.Data)
return true, fmt.Errorf("failure on purpose")
}),
)
}
}
var task flow.AsyncTask
deadLetterReceiverApplication := func(topicOutName string, messagesWatcher *watcher.Watcher) flow.Runnable {
return func(ctx flow.Context) error {
t := time.NewTicker(500 * time.Millisecond)
defer t.Stop()
svc := sqsService()
counter := 1
for {
select {
case <-task.Done():
ctx.Log("deadLetterReceiverApplication - task done!")
return nil
case <-t.C:
queueURL, err := getQueueURL(svc, topicOutName)
if err != nil {
ctx.Logf("deadLetterReceiverApplication - failed get queue URL %q %v\n", topicOutName, err)
continue
}
ctx.Logf("deadLetterReceiverApplication - gettting messages (%q) counter(%d)\n", queueURL, counter)
msgResult, err := getMessages(svc, queueURL)
if err != nil {
ctx.Logf("deadLetterReceiverApplication - failed getting messages %v\n", err)
continue
}
for _, msg := range msgResult.Messages {
err = deleteMessage(svc, queueURL, *msg.ReceiptHandle)
if err != nil {
ctx.Logf("deadLetterReceiverApplication - failed deleting message %q %v\n", *msg.ReceiptHandle, err)
return err
}
var snsMessagePayload struct {
Message string
TopicArn string
}
err := json.Unmarshal([]byte(*(msg.Body)), &snsMessagePayload)
if err != nil {
return fmt.Errorf("deadLetterReceiverApplication - error unmarshalling message Body: %v", err)
}
var messageWrapper struct {
Data string `json:"data"`
}
err = json.Unmarshal([]byte(snsMessagePayload.Message), &messageWrapper)
if err != nil {
return fmt.Errorf("deadLetterReceiverApplication - error unmarshalling message data: %v", err)
}
messagesWatcher.Observe(messageWrapper.Data)
if counter >= failedMessagesNum {
ctx.Logf("deadLetterReceiverApplication - received all expected (%d) failed message!\n", failedMessagesNum)
return nil
}
counter += 1
}
}
}
}
}
publishMessages := func(metadata map[string]string, sidecarName string, topicName string, topicOutName string, messageWatchers ...*watcher.Watcher) flow.Runnable {
return func(ctx flow.Context) error {
// prepare the messages
messages := make([]string, failedMessagesNum)
for i := range messages {
messages[i] = fmt.Sprintf("partitionKey: %s, message for topic: %s, index: %03d, uniqueId: %s", metadata[messageKey], topicOutName, i, uuid.New().String())
}
// add the messages as expectations to the watchers
for _, messageWatcher := range messageWatchers {
messageWatcher.ExpectStrings(messages...)
}
// get the sidecar (dapr) client
client := sidecar.GetClient(ctx, sidecarName)
// publish messages
ctx.Logf("SNSSQSMessageDeadLetter - Publishing messages. sidecarName: %s, topicName: %s", sidecarName, topicName)
var publishOptions dapr.PublishEventOption
if metadata != nil {
publishOptions = dapr.PublishEventWithMetadata(metadata)
}
for _, message := range messages {
ctx.Logf("Publishing: %q", message)
var err error
if publishOptions != nil {
err = client.PublishEvent(ctx, pubsubName, topicName, message, publishOptions)
} else {
err = client.PublishEvent(ctx, pubsubName, topicName, message)
}
require.NoError(ctx, err, "SNSSQSMessageDeadLetter - error publishing message")
}
return nil
}
}
assertMessages := func(timeout time.Duration, messageWatchers ...*watcher.Watcher) flow.Runnable {
return func(ctx flow.Context) error {
// assert for messages
for _, m := range messageWatchers {
if !m.Assert(ctx, 3*timeout) {
ctx.Errorf("SNSSQSMessageDeadLetter - message assersion failed for watcher: %#v\n", m)
}
}
return nil
}
}
prefix := "SNSSQSMessageDeadLetter-"
deadletterApp := prefix + "deadLetterReceiverApp"
subApp := prefix + "subscriberApp"
subAppSideCar := prefix + sidecarName2
flow.New(t, "SNSSQSMessageDeadLetter Verify with single publisher / single subscriber and DeadLetter").
// Run deadLetterReceiverApplication - should receive messages from dead letter queue
// "PUBSUB_AWS_SNSSQS_QUEUE_DLOUT"
StepAsync(deadletterApp, &task,
deadLetterReceiverApplication(deadLetterQueueName, deadLetterConsumerGroup)).
// Run subscriberApplication - will fail to process messages
Step(app.Run(subApp, fmt.Sprintf(":%d", appPort+portOffset+4),
subscriberApplication(subApp, deadLetterTopicIn, consumerGroup1))).
// Run the Dapr sidecar with ConsumerID "PUBSUB_AWS_SNSSQS_QUEUE_DLIN"
Step(sidecar.Run(subAppSideCar,
embedded.WithComponentsPath("./components/deadletter"),
embedded.WithAppProtocol(runtime.HTTPProtocol, appPort+portOffset+4),
embedded.WithDaprGRPCPort(runtime.DefaultDaprAPIGRPCPort+portOffset+4),
embedded.WithDaprHTTPPort(runtime.DefaultDaprHTTPPort+portOffset+4),
embedded.WithProfilePort(runtime.DefaultProfilePort+portOffset+4),
componentRuntimeOptions(),
)).
Step("publish messages to deadLetterTopicIn ==> "+deadLetterTopicIn, publishMessages(nil, subAppSideCar, deadLetterTopicIn, deadLetterTopicOut, deadLetterConsumerGroup)).
Step("wait", flow.Sleep(60*time.Second)).
Step("verify if app1 has 0 recevied messages published to active topic", assertMessages(10*time.Second, consumerGroup1)).
Step("verify if app2 has deadletterMessageNum recevied messages send to dead letter queue", assertMessages(10*time.Second, deadLetterConsumerGroup)).
Step("reset", flow.Reset(consumerGroup1, deadLetterConsumerGroup)).
Run()
}
func componentRuntimeOptions() []runtime.Option {
log := logger.NewLogger("dapr.components")