|
|
|
|
@ -16,6 +16,7 @@ package snssqs_test
|
|
|
|
|
import (
|
|
|
|
|
"context"
|
|
|
|
|
"fmt"
|
|
|
|
|
"sync/atomic"
|
|
|
|
|
|
|
|
|
|
"os"
|
|
|
|
|
"testing"
|
|
|
|
|
@ -66,12 +67,14 @@ const (
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
var (
|
|
|
|
|
region = "us-east-1" // replaced with env var AWS_REGION
|
|
|
|
|
existingTopic = "existingTopic" // replaced with env var PUBSUB_AWS_SNSSQS_TOPIC_3
|
|
|
|
|
messageVisibilityTimeoutTopic = "messageVisibilityTimeoutTopic" // replaced with env var PUBSUB_AWS_SNSSQS_TOPIC_MVT
|
|
|
|
|
fifoTopic = "fifoTopic" // replaced with env var PUBSUB_AWS_SNSSQS_TOPIC_FIFO
|
|
|
|
|
deadLetterTopicIn = "deadLetterTopicIn" // replaced with env var PUBSUB_AWS_SNSSQS_TOPIC_DLIN
|
|
|
|
|
deadLetterQueueName = "deadLetterQueueName" // replaced with env var PUBSUB_AWS_SNSSQS_QUEUE_DLOUT
|
|
|
|
|
region = "us-east-1" // replaced with env var AWS_REGION
|
|
|
|
|
existingTopic = "existingTopic" // replaced with env var PUBSUB_AWS_SNSSQS_TOPIC_3
|
|
|
|
|
messageVisibilityTimeoutTopic = "messageVisibilityTimeoutTopic" // replaced with env var PUBSUB_AWS_SNSSQS_TOPIC_MVT
|
|
|
|
|
fifoTopic = "fifoTopic" // replaced with env var PUBSUB_AWS_SNSSQS_TOPIC_FIFO
|
|
|
|
|
deadLetterTopicIn = "deadLetterTopicIn" // replaced with env var PUBSUB_AWS_SNSSQS_TOPIC_DLIN
|
|
|
|
|
disableDeleteOnRetryLimitTopicIn = "disableDeleteOnRetryLimitTopicIn" // replaced with env var PUBSUB_AWS_SNSSQS_TOPIC_NODRT
|
|
|
|
|
deadLetterQueueName = "deadLetterQueueName" // replaced with env var PUBSUB_AWS_SNSSQS_QUEUE_DLOUT
|
|
|
|
|
disableDeleteOnRetryLimitQueueName = "disableDeleteOnRetryLimitQueueName" // replaced with env var PUBSUB_AWS_SNSSQS_QUEUE_NODRT
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
// The following Queue names must match
|
|
|
|
|
@ -117,6 +120,11 @@ func init() {
|
|
|
|
|
deadLetterQueueName = qn
|
|
|
|
|
queues = append(queues, qn)
|
|
|
|
|
}
|
|
|
|
|
qn = os.Getenv("PUBSUB_AWS_SNSSQS_QUEUE_NODRT")
|
|
|
|
|
if qn != "" {
|
|
|
|
|
disableDeleteOnRetryLimitQueueName = qn
|
|
|
|
|
queues = append(queues, qn)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
qn = os.Getenv("PUBSUB_AWS_SNSSQS_TOPIC_3")
|
|
|
|
|
if qn != "" {
|
|
|
|
|
@ -137,6 +145,11 @@ func init() {
|
|
|
|
|
deadLetterTopicIn = qn
|
|
|
|
|
topics = append(topics, deadLetterTopicIn)
|
|
|
|
|
}
|
|
|
|
|
qn = os.Getenv("PUBSUB_AWS_SNSSQS_TOPIC_NODRT")
|
|
|
|
|
if qn != "" {
|
|
|
|
|
disableDeleteOnRetryLimitTopicIn = qn
|
|
|
|
|
topics = append(topics, disableDeleteOnRetryLimitTopicIn)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func TestAWSSNSSQSCertificationTests(t *testing.T) {
|
|
|
|
|
@ -185,6 +198,10 @@ func TestAWSSNSSQSCertificationTests(t *testing.T) {
|
|
|
|
|
t.Run("SNSSQSMessageDeadLetter", func(t *testing.T) {
|
|
|
|
|
SNSSQSMessageDeadLetter(t)
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
t.Run("SNSSQSMessageDisableDeleteOnRetryLimit", func(t *testing.T) {
|
|
|
|
|
SNSSQSMessageDisableDeleteOnRetryLimit(t)
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Verify with single publisher / single subscriber
|
|
|
|
|
@ -1521,6 +1538,136 @@ func SNSSQSMessageDeadLetter(t *testing.T) {
|
|
|
|
|
Run()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Verify data with an optional parameters `disableDeleteOnRetryLimit` takes affect
|
|
|
|
|
func SNSSQSMessageDisableDeleteOnRetryLimit(t *testing.T) {
|
|
|
|
|
consumerGroup1 := watcher.NewUnordered()
|
|
|
|
|
failedMessagesNum := 1
|
|
|
|
|
|
|
|
|
|
readyToConsume := atomic.Bool{}
|
|
|
|
|
var task flow.AsyncTask
|
|
|
|
|
setReadyToConsume := func(consumeTimeout time.Duration, _ *watcher.Watcher) flow.Runnable {
|
|
|
|
|
return func(ctx flow.Context) error {
|
|
|
|
|
tm := time.After(consumeTimeout * time.Second)
|
|
|
|
|
for {
|
|
|
|
|
select {
|
|
|
|
|
case <-task.Done():
|
|
|
|
|
ctx.Log("setReadyToConsume - task done called!")
|
|
|
|
|
return nil
|
|
|
|
|
|
|
|
|
|
case <-tm:
|
|
|
|
|
ctx.Logf("setReadyToConsume setting readyToConsume")
|
|
|
|
|
readyToConsume.Store(true)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
subscriberApplication := func(appID string, topicName string, messagesWatcher *watcher.Watcher) app.SetupFn {
|
|
|
|
|
return func(ctx flow.Context, s common.Service) error {
|
|
|
|
|
return multierr.Combine(
|
|
|
|
|
s.AddTopicEventHandler(&common.Subscription{
|
|
|
|
|
PubsubName: pubsubName,
|
|
|
|
|
Topic: topicName,
|
|
|
|
|
Route: "/orders",
|
|
|
|
|
}, func(_ context.Context, e *common.TopicEvent) (retry bool, err error) {
|
|
|
|
|
if !readyToConsume.Load() {
|
|
|
|
|
ctx.Logf("subscriberApplication - Message Received appID: %s,pubsub: %s, topic: %s, id: %s, data: %s - causing failure on purpose...", appID, e.PubsubName, e.Topic, e.ID, e.Data)
|
|
|
|
|
time.Sleep(5 * time.Second)
|
|
|
|
|
return true, fmt.Errorf("failure on purpose")
|
|
|
|
|
} else {
|
|
|
|
|
messagesWatcher.Observe(e.Data)
|
|
|
|
|
ctx.Logf("subscriberApplication - Message Received appID: %s,pubsub: %s, topic: %s, id: %s, data: %s", appID, e.PubsubName, e.Topic, e.ID, e.Data)
|
|
|
|
|
return false, nil
|
|
|
|
|
}
|
|
|
|
|
}),
|
|
|
|
|
)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
publishMessages := func(metadata map[string]string, sidecarName string, topicName string, messageWatchers ...*watcher.Watcher) flow.Runnable {
|
|
|
|
|
return func(ctx flow.Context) error {
|
|
|
|
|
// prepare the messages
|
|
|
|
|
messages := make([]string, failedMessagesNum)
|
|
|
|
|
for i := range messages {
|
|
|
|
|
messages[i] = fmt.Sprintf("partitionKey: %s, message for topic: %s, index: %03d, uniqueId: %s", metadata[messageKey], topicName, i, uuid.New().String())
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// add the messages as expectations to the watchers
|
|
|
|
|
for _, messageWatcher := range messageWatchers {
|
|
|
|
|
messageWatcher.ExpectStrings(messages...)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// get the sidecar (dapr) client
|
|
|
|
|
client := sidecar.GetClient(ctx, sidecarName)
|
|
|
|
|
|
|
|
|
|
// publish messages
|
|
|
|
|
ctx.Logf("SNSSQSMessageDisableDeleteOnRetryLimit - Publishing messages. sidecarName: %s, topicName: %s", sidecarName, topicName)
|
|
|
|
|
|
|
|
|
|
var publishOptions dapr.PublishEventOption
|
|
|
|
|
|
|
|
|
|
if metadata != nil {
|
|
|
|
|
publishOptions = dapr.PublishEventWithMetadata(metadata)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for _, message := range messages {
|
|
|
|
|
ctx.Logf("Publishing: %q", message)
|
|
|
|
|
var err error
|
|
|
|
|
|
|
|
|
|
if publishOptions != nil {
|
|
|
|
|
err = client.PublishEvent(ctx, pubsubName, topicName, message, publishOptions)
|
|
|
|
|
} else {
|
|
|
|
|
err = client.PublishEvent(ctx, pubsubName, topicName, message)
|
|
|
|
|
}
|
|
|
|
|
require.NoError(ctx, err, "SNSSQSMessageDisableDeleteOnRetryLimit - error publishing message")
|
|
|
|
|
}
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
assertMessages := func(timeout time.Duration, messageWatchers ...*watcher.Watcher) flow.Runnable {
|
|
|
|
|
return func(ctx flow.Context) error {
|
|
|
|
|
// assert for messages
|
|
|
|
|
for _, m := range messageWatchers {
|
|
|
|
|
if !m.Assert(ctx, 3*timeout) {
|
|
|
|
|
ctx.Errorf("SNSSQSMessageDisableDeleteOnRetryLimit - message assertion failed for watcher: %#v\n", m)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
prefix := "SNSSQSMessageDisableDeleteOnRetryLimit-"
|
|
|
|
|
setReadyToConsumeApp := prefix + "setReadyToConsumeApp"
|
|
|
|
|
subApp := prefix + "subscriberApp"
|
|
|
|
|
subAppSideCar := prefix + sidecarName2
|
|
|
|
|
readyToConsumeTimeout := time.Duration(20) //seconds
|
|
|
|
|
|
|
|
|
|
flow.New(t, "SNSSQSMessageDisableDeleteOnRetryLimit Verify data with an optional parameters `disableDeleteOnRetryLimit` takes affect").
|
|
|
|
|
StepAsync(setReadyToConsumeApp, &task,
|
|
|
|
|
setReadyToConsume(readyToConsumeTimeout, nil)).
|
|
|
|
|
|
|
|
|
|
// Run subscriberApplication - will fail to process messages
|
|
|
|
|
Step(app.Run(subApp, fmt.Sprintf(":%d", appPort+portOffset+4),
|
|
|
|
|
subscriberApplication(subApp, disableDeleteOnRetryLimitTopicIn, consumerGroup1))).
|
|
|
|
|
|
|
|
|
|
// Run the Dapr sidecar with "PUBSUB_AWS_SNSSQS_TOPIC_NODRT"
|
|
|
|
|
Step(sidecar.Run(subAppSideCar,
|
|
|
|
|
embedded.WithComponentsPath("./components/disableDeleteOnRetryLimit"),
|
|
|
|
|
embedded.WithAppProtocol(runtime.HTTPProtocol, appPort+portOffset+4),
|
|
|
|
|
embedded.WithDaprGRPCPort(runtime.DefaultDaprAPIGRPCPort+portOffset+4),
|
|
|
|
|
embedded.WithDaprHTTPPort(runtime.DefaultDaprHTTPPort+portOffset+4),
|
|
|
|
|
embedded.WithProfilePort(runtime.DefaultProfilePort+portOffset+4),
|
|
|
|
|
componentRuntimeOptions(),
|
|
|
|
|
)).
|
|
|
|
|
Step("publish messages to disableDeleteOnRetryLimitTopicIn ==> "+disableDeleteOnRetryLimitTopicIn, publishMessages(nil, subAppSideCar, disableDeleteOnRetryLimitTopicIn, consumerGroup1)).
|
|
|
|
|
Step("wait", flow.Sleep(30*time.Second)).
|
|
|
|
|
Step("verify if app1 has 0 recevied messages published to active topic", assertMessages(10*time.Second, consumerGroup1)).
|
|
|
|
|
Step("reset", flow.Reset(consumerGroup1)).
|
|
|
|
|
Run()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func componentRuntimeOptions() []runtime.Option {
|
|
|
|
|
log := logger.NewLogger("dapr.components")
|
|
|
|
|
|
|
|
|
|
|