adds aws snssqs disableDeleteOnRetryLimit cert test

Signed-off-by: Roberto J Rojas <robertojrojas@gmail.com>
This commit is contained in:
Roberto J Rojas 2023-01-26 18:32:47 -05:00
parent b6dcbe8fd4
commit 33cb71cf51
4 changed files with 199 additions and 6 deletions

View File

@ -67,6 +67,12 @@ The purpose of this module is to provide tests that certify the AWS SNS/SQS Pubs
- Subscriber 1 subscribes to 1 topic and fails causing messages to go to deadletter queue
- Subscriber 2 polls messages from the deadletter queue
- Message are expected to only be successfully consumed by Subscriber 2 from deadletter queue
- Verify data with an optional parameters `disableDeleteOnRetryLimit` and `messageRetryLimit` take affect
- Run dapr application with 1 publisher, 1 subscriber, and 1 topic
- Publishers publishes to 1 topic
- Subcriber 1 fails to consume the message more times than `messageRetryLimit`
- Message sits in the Queue and it is not removed due to `disableDeleteOnRetryLimit`
- After some time, consumer is ready to consume message succesfully
### Running the tests
This must be run in the GitHub Actions Workflow configured for test infrastructure setup.

View File

@ -0,0 +1,9 @@
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: envvar-secret-store
namespace: default
spec:
type: secretstores.local.env
version: v1
metadata:

View File

@ -0,0 +1,31 @@
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: snssqs-cert-tests
spec:
type: pubsub.snssqs
version: v1
metadata:
- name: accessKey
secretKeyRef:
name: AWS_ACCESS_KEY_ID
key: AWS_ACCESS_KEY_ID
- name: secretKey
secretKeyRef:
name: AWS_SECRET_ACCESS_KEY
key: AWS_SECRET_ACCESS_KEY
- name: region
secretKeyRef:
name: AWS_REGION
key: AWS_REGION
- name: consumerID
secretKeyRef:
name: PUBSUB_AWS_SNSSQS_QUEUE_NODRT
key: PUBSUB_AWS_SNSSQS_QUEUE_NODRT
- name: messageRetryLimit
value: 2
- name: disableDeleteOnRetryLimit
value: "true"
auth:
secretstore: envvar-secret-store

View File

@ -16,6 +16,7 @@ package snssqs_test
import (
"context"
"fmt"
"sync/atomic"
"os"
"testing"
@ -66,12 +67,14 @@ const (
)
var (
region = "us-east-1" // replaced with env var AWS_REGION
existingTopic = "existingTopic" // replaced with env var PUBSUB_AWS_SNSSQS_TOPIC_3
messageVisibilityTimeoutTopic = "messageVisibilityTimeoutTopic" // replaced with env var PUBSUB_AWS_SNSSQS_TOPIC_MVT
fifoTopic = "fifoTopic" // replaced with env var PUBSUB_AWS_SNSSQS_TOPIC_FIFO
deadLetterTopicIn = "deadLetterTopicIn" // replaced with env var PUBSUB_AWS_SNSSQS_TOPIC_DLIN
deadLetterQueueName = "deadLetterQueueName" // replaced with env var PUBSUB_AWS_SNSSQS_QUEUE_DLOUT
region = "us-east-1" // replaced with env var AWS_REGION
existingTopic = "existingTopic" // replaced with env var PUBSUB_AWS_SNSSQS_TOPIC_3
messageVisibilityTimeoutTopic = "messageVisibilityTimeoutTopic" // replaced with env var PUBSUB_AWS_SNSSQS_TOPIC_MVT
fifoTopic = "fifoTopic" // replaced with env var PUBSUB_AWS_SNSSQS_TOPIC_FIFO
deadLetterTopicIn = "deadLetterTopicIn" // replaced with env var PUBSUB_AWS_SNSSQS_TOPIC_DLIN
disableDeleteOnRetryLimitTopicIn = "disableDeleteOnRetryLimitTopicIn" // replaced with env var PUBSUB_AWS_SNSSQS_TOPIC_NODRT
deadLetterQueueName = "deadLetterQueueName" // replaced with env var PUBSUB_AWS_SNSSQS_QUEUE_DLOUT
disableDeleteOnRetryLimitQueueName = "disableDeleteOnRetryLimitQueueName" // replaced with env var PUBSUB_AWS_SNSSQS_QUEUE_NODRT
)
// The following Queue names must match
@ -117,6 +120,11 @@ func init() {
deadLetterQueueName = qn
queues = append(queues, qn)
}
qn = os.Getenv("PUBSUB_AWS_SNSSQS_QUEUE_NODRT")
if qn != "" {
disableDeleteOnRetryLimitQueueName = qn
queues = append(queues, qn)
}
qn = os.Getenv("PUBSUB_AWS_SNSSQS_TOPIC_3")
if qn != "" {
@ -137,6 +145,11 @@ func init() {
deadLetterTopicIn = qn
topics = append(topics, deadLetterTopicIn)
}
qn = os.Getenv("PUBSUB_AWS_SNSSQS_TOPIC_NODRT")
if qn != "" {
disableDeleteOnRetryLimitTopicIn = qn
topics = append(topics, disableDeleteOnRetryLimitTopicIn)
}
}
func TestAWSSNSSQSCertificationTests(t *testing.T) {
@ -185,6 +198,10 @@ func TestAWSSNSSQSCertificationTests(t *testing.T) {
t.Run("SNSSQSMessageDeadLetter", func(t *testing.T) {
SNSSQSMessageDeadLetter(t)
})
t.Run("SNSSQSMessageDisableDeleteOnRetryLimit", func(t *testing.T) {
SNSSQSMessageDisableDeleteOnRetryLimit(t)
})
}
// Verify with single publisher / single subscriber
@ -1521,6 +1538,136 @@ func SNSSQSMessageDeadLetter(t *testing.T) {
Run()
}
// Verify data with an optional parameters `disableDeleteOnRetryLimit` takes affect
func SNSSQSMessageDisableDeleteOnRetryLimit(t *testing.T) {
consumerGroup1 := watcher.NewUnordered()
failedMessagesNum := 1
readyToConsume := atomic.Bool{}
var task flow.AsyncTask
setReadyToConsume := func(consumeTimeout time.Duration, _ *watcher.Watcher) flow.Runnable {
return func(ctx flow.Context) error {
tm := time.After(consumeTimeout * time.Second)
for {
select {
case <-task.Done():
ctx.Log("setReadyToConsume - task done called!")
return nil
case <-tm:
ctx.Logf("setReadyToConsume setting readyToConsume")
readyToConsume.Store(true)
}
}
return nil
}
}
subscriberApplication := func(appID string, topicName string, messagesWatcher *watcher.Watcher) app.SetupFn {
return func(ctx flow.Context, s common.Service) error {
return multierr.Combine(
s.AddTopicEventHandler(&common.Subscription{
PubsubName: pubsubName,
Topic: topicName,
Route: "/orders",
}, func(_ context.Context, e *common.TopicEvent) (retry bool, err error) {
if !readyToConsume.Load() {
ctx.Logf("subscriberApplication - Message Received appID: %s,pubsub: %s, topic: %s, id: %s, data: %s - causing failure on purpose...", appID, e.PubsubName, e.Topic, e.ID, e.Data)
time.Sleep(5 * time.Second)
return true, fmt.Errorf("failure on purpose")
} else {
messagesWatcher.Observe(e.Data)
ctx.Logf("subscriberApplication - Message Received appID: %s,pubsub: %s, topic: %s, id: %s, data: %s", appID, e.PubsubName, e.Topic, e.ID, e.Data)
return false, nil
}
}),
)
}
}
publishMessages := func(metadata map[string]string, sidecarName string, topicName string, messageWatchers ...*watcher.Watcher) flow.Runnable {
return func(ctx flow.Context) error {
// prepare the messages
messages := make([]string, failedMessagesNum)
for i := range messages {
messages[i] = fmt.Sprintf("partitionKey: %s, message for topic: %s, index: %03d, uniqueId: %s", metadata[messageKey], topicName, i, uuid.New().String())
}
// add the messages as expectations to the watchers
for _, messageWatcher := range messageWatchers {
messageWatcher.ExpectStrings(messages...)
}
// get the sidecar (dapr) client
client := sidecar.GetClient(ctx, sidecarName)
// publish messages
ctx.Logf("SNSSQSMessageDisableDeleteOnRetryLimit - Publishing messages. sidecarName: %s, topicName: %s", sidecarName, topicName)
var publishOptions dapr.PublishEventOption
if metadata != nil {
publishOptions = dapr.PublishEventWithMetadata(metadata)
}
for _, message := range messages {
ctx.Logf("Publishing: %q", message)
var err error
if publishOptions != nil {
err = client.PublishEvent(ctx, pubsubName, topicName, message, publishOptions)
} else {
err = client.PublishEvent(ctx, pubsubName, topicName, message)
}
require.NoError(ctx, err, "SNSSQSMessageDisableDeleteOnRetryLimit - error publishing message")
}
return nil
}
}
assertMessages := func(timeout time.Duration, messageWatchers ...*watcher.Watcher) flow.Runnable {
return func(ctx flow.Context) error {
// assert for messages
for _, m := range messageWatchers {
if !m.Assert(ctx, 3*timeout) {
ctx.Errorf("SNSSQSMessageDisableDeleteOnRetryLimit - message assertion failed for watcher: %#v\n", m)
}
}
return nil
}
}
prefix := "SNSSQSMessageDisableDeleteOnRetryLimit-"
setReadyToConsumeApp := prefix + "setReadyToConsumeApp"
subApp := prefix + "subscriberApp"
subAppSideCar := prefix + sidecarName2
readyToConsumeTimeout := time.Duration(20) //seconds
flow.New(t, "SNSSQSMessageDisableDeleteOnRetryLimit Verify data with an optional parameters `disableDeleteOnRetryLimit` takes affect").
StepAsync(setReadyToConsumeApp, &task,
setReadyToConsume(readyToConsumeTimeout, nil)).
// Run subscriberApplication - will fail to process messages
Step(app.Run(subApp, fmt.Sprintf(":%d", appPort+portOffset+4),
subscriberApplication(subApp, disableDeleteOnRetryLimitTopicIn, consumerGroup1))).
// Run the Dapr sidecar with "PUBSUB_AWS_SNSSQS_TOPIC_NODRT"
Step(sidecar.Run(subAppSideCar,
embedded.WithComponentsPath("./components/disableDeleteOnRetryLimit"),
embedded.WithAppProtocol(runtime.HTTPProtocol, appPort+portOffset+4),
embedded.WithDaprGRPCPort(runtime.DefaultDaprAPIGRPCPort+portOffset+4),
embedded.WithDaprHTTPPort(runtime.DefaultDaprHTTPPort+portOffset+4),
embedded.WithProfilePort(runtime.DefaultProfilePort+portOffset+4),
componentRuntimeOptions(),
)).
Step("publish messages to disableDeleteOnRetryLimitTopicIn ==> "+disableDeleteOnRetryLimitTopicIn, publishMessages(nil, subAppSideCar, disableDeleteOnRetryLimitTopicIn, consumerGroup1)).
Step("wait", flow.Sleep(30*time.Second)).
Step("verify if app1 has 0 recevied messages published to active topic", assertMessages(10*time.Second, consumerGroup1)).
Step("reset", flow.Reset(consumerGroup1)).
Run()
}
func componentRuntimeOptions() []runtime.Option {
log := logger.NewLogger("dapr.components")