starts SQS deadletter cert test

Signed-off-by: Roberto J Rojas <robertojrojas@gmail.com>
This commit is contained in:
Roberto J Rojas 2023-01-23 15:19:03 -05:00
parent bccb6a6994
commit b90fad40ba
6 changed files with 279 additions and 4 deletions

View File

@ -308,6 +308,11 @@ jobs:
PUBSUB_AWS_SNSSQS_TOPIC_MVT="sqssnscerttest-tp-mvt-${{env.UNIQUE_ID}}"
echo "PUBSUB_AWS_SNSSQS_TOPIC_MVT=$PUBSUB_AWS_SNSSQS_TOPIC_MVT" >> $GITHUB_ENV
PUBSUB_AWS_SNSSQS_QUEUE_DLIN="sqssnscerttest-dlq-in-${{env.UNIQUE_ID}}"
echo "PUBSUB_AWS_SNSSQS_QUEUE_DLIN=$PUBSUB_AWS_SNSSQS_QUEUE_DLIN" >> $GITHUB_ENV
PUBSUB_AWS_SNSSQS_TOPIC_DL="sqssnscerttest-dlt-${{env.UNIQUE_ID}}"
echo "PUBSUB_AWS_SNSSQS_TOPIC_DL=$PUBSUB_AWS_SNSSQS_TOPIC_DL" >> $GITHUB_ENV
AWS_REGION="us-east-1"
echo "AWS_REGION=$AWS_REGION" >> $GITHUB_ENV

View File

@ -56,6 +56,7 @@ The purpose of this module is to provide tests that certify the AWS SNS/SQS Pubs
- Subscriber 1 reeives message, notifies subscriber 2 and sumlates being busy for time shorter than messageVisibilityTimeout seconds
- Subscriber 2 receives go ahead and subscribes to 1 topic
- Subscriber 2 must not receive message
- Verify data with an optional parameters `sqsDeadLettersQueueName`, `messageRetryLimit`, and `messageReceiveLimit` takes affect (SNSSQSMessageDeadLetter)
### Running the tests

View File

@ -0,0 +1,9 @@
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: envvar-secret-store
namespace: default
spec:
type: secretstores.local.env
version: v1
metadata:

View File

@ -0,0 +1,36 @@
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: snssqs-cert-tests
spec:
type: pubsub.snssqs
version: v1
metadata:
- name: accessKey
secretKeyRef:
name: AWS_ACCESS_KEY_ID
key: AWS_ACCESS_KEY_ID
- name: secretKey
secretKeyRef:
name: AWS_SECRET_ACCESS_KEY
key: AWS_SECRET_ACCESS_KEY
- name: region
secretKeyRef:
name: AWS_REGION
key: AWS_REGION
- name: consumerID
secretKeyRef:
name: PUBSUB_AWS_SNSSQS_QUEUE_DLIN
key: PUBSUB_AWS_SNSSQS_QUEUE_DLIN
- name: sqsDeadLettersQueueName
secretKeyRef:
name: PUBSUB_AWS_SNSSQS_QUEUE_DLOUT
key: PUBSUB_AWS_SNSSQS_QUEUE_DLOUT
- name: messageReceiveLimit
value: 2
- name: messageRetryLimit
value: 2
auth:
secretstore: envvar-secret-store

View File

@ -42,20 +42,53 @@ func deleteQueues(queues []string) error {
}
func deleteQueue(svc *sqs.SQS, queue string) error {
queueUrlOutput, err := svc.GetQueueUrl(&sqs.GetQueueUrlInput{
QueueName: &queue,
})
fmt.Printf("deleteQueue: %q\n", queue)
queueUrl, err := getQueueURL(svc, &queue)
if err != nil {
return fmt.Errorf("error getting the queue URL: %q err:%v", queue, err)
}
_, err = svc.DeleteQueue(&sqs.DeleteQueueInput{
QueueUrl: queueUrlOutput.QueueUrl,
QueueUrl: &queueUrl,
})
return err
}
func getQueueURL(svc *sqs.SQS, queue *string) (string, error) {
urlResult, err := svc.GetQueueUrl(&sqs.GetQueueUrlInput{
QueueName: queue,
})
if err != nil {
return "", err
}
return *urlResult.QueueUrl, nil
}
func getMessages(svc *sqs.SQS, queueURL string) (*sqs.ReceiveMessageOutput, error) {
msgResult, err := svc.ReceiveMessage(&sqs.ReceiveMessageInput{
MessageAttributeNames: []*string{
aws.String(sqs.QueueAttributeNameAll),
},
// use this property to decide when a message should be discarded.
AttributeNames: []*string{
aws.String(sqs.QueueAttributeNameAll),
},
MaxNumberOfMessages: aws.Int64(10),
QueueUrl: aws.String(queueURL),
VisibilityTimeout: aws.Int64(5),
WaitTimeSeconds: aws.Int64(20),
})
if err != nil {
return nil, err
}
return msgResult, nil
}
func deleteTopics(topics []string, region string) error {
sess := session.Must(
session.NewSessionWithOptions(

View File

@ -68,6 +68,8 @@ var (
region = "us-east-1" // replaced with env var AWS_REGION
existingTopic = "existingTopic" // replaced with env var PUBSUB_AWS_SNSSQS_TOPIC_3
messageVisibilityTimeoutTopic = "messageVisibilityTimeoutTopic" // replaced with env var PUBSUB_AWS_SNSSQS_TOPIC_MVT
deadLetterTopic = "deadLetterTopic" // replaced with env var PUBSUB_AWS_SNSSQS_TOPIC_DL
dealLetterQueueName = "dealLetterQueueName" // replaced with env var PUBSUB_AWS_SNSSQS_QUEUE_DLOUT
)
// The following Queue names must match
@ -80,6 +82,8 @@ var (
var queues = []string{
"snscerttest1",
"snssqscerttest2",
"snssqscerttestdeadletterin",
"snssqscerttestdeadletterout",
}
var topics = []string{
@ -103,6 +107,16 @@ func init() {
if qn != "" {
queues[1] = qn
}
qn = os.Getenv("PUBSUB_AWS_SNSSQS_QUEUE_DLIN")
if qn != "" {
queues[2] = qn
}
qn = os.Getenv("PUBSUB_AWS_SNSSQS_QUEUE_DLOUT")
if qn != "" {
dealLetterQueueName = qn
queues[3] = qn
}
qn = os.Getenv("PUBSUB_AWS_SNSSQS_TOPIC_3")
if qn != "" {
existingTopic = qn
@ -111,6 +125,10 @@ func init() {
if qn != "" {
messageVisibilityTimeoutTopic = qn
}
qn = os.Getenv("PUBSUB_AWS_SNSSQS_TOPIC_DL")
if qn != "" {
deadLetterTopic = qn
}
}
func TestAWSSNSSQSCertificationTests(t *testing.T) {
@ -151,6 +169,10 @@ func TestAWSSNSSQSCertificationTests(t *testing.T) {
t.Run("SNSSQSMessageVisibilityTimeout", func(t *testing.T) {
SNSSQSMessageVisibilityTimeout(t)
})
t.Run("SNSSQSMessageDeadLetter", func(t *testing.T) {
SNSSQSMessageDeadLetter(t)
})
}
// Verify with single publisher / single subscriber
@ -1197,6 +1219,175 @@ func SNSSQSMessageVisibilityTimeout(t *testing.T) {
}
// Verify data with an optional parameters `sqsDeadLettersQueueName` and `messageReceiveLimit` takes affect
func SNSSQSMessageDeadLetter(t *testing.T) {
consumerGroup1 := watcher.NewUnordered()
deadLetterConsumerGroup := watcher.NewUnordered()
failedMessagesNum := 1
// subscriber of the given topic
subscriberApplication := func(appID string, topicName string, messagesWatcher *watcher.Watcher) app.SetupFn {
return func(ctx flow.Context, s common.Service) error {
return multierr.Combine(
s.AddTopicEventHandler(&common.Subscription{
PubsubName: pubsubName,
Topic: topicName,
Route: "/orders",
}, func(_ context.Context, e *common.TopicEvent) (retry bool, err error) {
ctx.Logf("subscriberApplication - Message Received appID: %s,pubsub: %s, topic: %s, id: %s, data: %s - causing failure...", appID, e.PubsubName, e.Topic, e.ID, e.Data)
return true, fmt.Errorf("failure on purpose")
}),
)
}
}
// subscriber of the given deadletter queue
deadLetterReceiverApplication := func(appID string, deadLetterQueueName string, messagesWatcher *watcher.Watcher) app.SetupFn {
return func(ctx flow.Context, s common.Service) error {
return multierr.Combine(
func() error {
ctx.Logf("%s is waiting for messages...\n", appID)
// Create a SQS session
sqsSess := sqsService()
// Get URL of queue
queueUrl, err := getQueueURL(sqsSess, &deadLetterQueueName)
if err != nil {
return fmt.Errorf("%s error getting the queue URL: %q err:%v", appID, deadLetterQueueName, err)
}
// Wait for DeadLetter Queue message to arrive
for {
msgResult, err := getMessages(sqsSess, queueUrl)
if err != nil {
return fmt.Errorf("%s error getting messages from dead letter queue URL: %q err:%v", appID, queueUrl, err)
}
if len(msgResult.Messages) == 0 {
ctx.Logf("%s no messages sleeping and trying again...\n", appID)
time.Sleep(5 * time.Second)
continue
}
for _, msg := range msgResult.Messages {
ctx.Logf("deadLetterReceiverApplication - Message Received appID: %s,deadLetterQueueName: %s, data: %s", appID, deadLetterQueueName, *msg.Body)
// Track/Observe deadletter message.
messagesWatcher.Observe(*msg.Body)
break
}
}
ctx.Logf("%s done!\n", appID)
return nil
}(),
)
}
}
publishMessages := func(metadata map[string]string, sidecarName string, topicName string, messageWatchers ...*watcher.Watcher) flow.Runnable {
return func(ctx flow.Context) error {
// prepare the messages
messages := make([]string, failedMessagesNum)
for i := range messages {
messages[i] = fmt.Sprintf("partitionKey: %s, message for topic: %s, index: %03d, uniqueId: %s", metadata[messageKey], topicName, i, uuid.New().String())
}
// add the messages as expectations to the watchers
for _, messageWatcher := range messageWatchers {
messageWatcher.ExpectStrings(messages...)
}
// get the sidecar (dapr) client
client := sidecar.GetClient(ctx, sidecarName)
// publish messages
ctx.Logf("SNSSQSMessageDeadLetter - Publishing messages. sidecarName: %s, topicName: %s", sidecarName, topicName)
var publishOptions dapr.PublishEventOption
if metadata != nil {
publishOptions = dapr.PublishEventWithMetadata(metadata)
}
for _, message := range messages {
ctx.Logf("Publishing: %q", message)
var err error
if publishOptions != nil {
err = client.PublishEvent(ctx, pubsubName, topicName, message, publishOptions)
} else {
err = client.PublishEvent(ctx, pubsubName, topicName, message)
}
require.NoError(ctx, err, "SNSSQSMessageDeadLetter - error publishing message")
}
return nil
}
}
assertMessages := func(timeout time.Duration, messageWatchers ...*watcher.Watcher) flow.Runnable {
return func(ctx flow.Context) error {
// assert for messages
for _, m := range messageWatchers {
if !m.Assert(ctx, 2*timeout) {
ctx.Errorf("SNSSQSMessageDeadLetter - message assersion failed for watcher: %#v\n", m)
}
}
return nil
}
}
connectToSideCar := func(sidecarName string) flow.Runnable {
return func(ctx flow.Context) error {
ctx.Logf("####### connect to sidecar (dapr) client sidecarName: %s and exit", sidecarName)
// get the sidecar (dapr) client
sidecar.GetClient(ctx, sidecarName)
return nil
}
}
prefix := "SNSSQSMessageDeadLetter-"
subApp := prefix + "subscriberApp"
deadletterApp := prefix + "deadLetterReceiverApp"
deadLetterSideCar := prefix + sidecarName1
subAppSideCar := prefix + sidecarName2
flow.New(t, "SNSSQSMessageDeadLetter Verify with single publisher / single subscriber and DeadLetter").
// Run deadLetterReceiverApplication - should receive messages from dead letter queue
Step(app.Run(deadletterApp, fmt.Sprintf(":%d", appPort+portOffset+2),
deadLetterReceiverApplication(deadletterApp, dealLetterQueueName, deadLetterConsumerGroup))).
Step(sidecar.Run(deadletterApp,
embedded.WithComponentsPath("./components/message_visibility_timeout"),
embedded.WithAppProtocol(runtime.HTTPProtocol, appPort+portOffset+2),
embedded.WithDaprGRPCPort(runtime.DefaultDaprAPIGRPCPort+portOffset+2),
embedded.WithDaprHTTPPort(runtime.DefaultDaprHTTPPort+portOffset+2),
embedded.WithProfilePort(runtime.DefaultProfilePort+portOffset+2),
componentRuntimeOptions(),
)).
Step("No messages will be sent here",
connectToSideCar(deadLetterSideCar)).
Step("wait", flow.Sleep(10*time.Second)).
// Run subscriberApplication - will fail to process messages
Step(app.Run(subApp, fmt.Sprintf(":%d", appPort+portOffset+4),
subscriberApplication(subApp, deadLetterTopic, consumerGroup1))).
// Run the Dapr sidecar with ConsumerID "PUBSUB_AWS_SNSSQS_QUEUE_DLIN"
Step(sidecar.Run(subAppSideCar,
embedded.WithComponentsPath("./components/deadletter"),
embedded.WithAppProtocol(runtime.HTTPProtocol, appPort+portOffset+4),
embedded.WithDaprGRPCPort(runtime.DefaultDaprAPIGRPCPort+portOffset+4),
embedded.WithDaprHTTPPort(runtime.DefaultDaprHTTPPort+portOffset+4),
componentRuntimeOptions(),
)).
Step("publish messages to deadLetterTopic ==> "+deadLetterTopic, publishMessages(nil, subAppSideCar, deadLetterTopic, deadLetterConsumerGroup)).
Step("verify if app1 has 0 recevied messages published to active topic", assertMessages(10*time.Second, consumerGroup1)).
Step("verify if app2 has deadletterMessageNum recevied messages send to dead letter queue", assertMessages(10*time.Second, deadLetterConsumerGroup)).
Step("reset", flow.Reset(consumerGroup1, deadLetterConsumerGroup)).
Step("wait", flow.Sleep(10*time.Second)).
Run()
}
func componentRuntimeOptions() []runtime.Option {
log := logger.NewLogger("dapr.components")