components-contrib/tests/certification/pubsub/aws/snssqs/snssqs_test.go

1723 lines
64 KiB
Go

/*
Copyright 2022 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package snssqs_test
import (
"context"
"fmt"
"strconv"
"sync/atomic"
"os"
"testing"
"time"
"github.com/google/uuid"
"github.com/stretchr/testify/require"
"go.uber.org/multierr"
// Pub-Sub.
pubsub_snssqs "github.com/dapr/components-contrib/pubsub/aws/snssqs"
secretstore_env "github.com/dapr/components-contrib/secretstores/local/env"
pubsub_loader "github.com/dapr/dapr/pkg/components/pubsub"
secretstores_loader "github.com/dapr/dapr/pkg/components/secretstores"
"github.com/dapr/dapr/pkg/config/protocol"
"github.com/dapr/kit/logger"
"github.com/dapr/dapr/pkg/runtime"
dapr "github.com/dapr/go-sdk/client"
"github.com/dapr/go-sdk/service/common"
// Certification testing runnables
"github.com/dapr/components-contrib/tests/certification/embedded"
"github.com/dapr/components-contrib/tests/certification/flow"
"github.com/dapr/components-contrib/tests/certification/flow/app"
"github.com/dapr/components-contrib/tests/certification/flow/sidecar"
"github.com/dapr/components-contrib/tests/certification/flow/simulate"
"github.com/dapr/components-contrib/tests/certification/flow/watcher"
)
const (
sidecarName1 = "dapr-1"
sidecarName2 = "dapr-2"
appID1 = "app-1"
appID2 = "app-2"
numMessages = 10
appPort = 8000
portOffset = 2
messageKey = "partitionKey"
pubsubName = "snssqs-cert-tests"
topicActiveName = "certification-pubsub-topic-active"
topicPassiveName = "certification-pubsub-topic-passive"
topicToBeCreated = "certification-topic-per-test-run"
topicDefaultName = "certification-topic-default"
partition0 = "partition-0"
partition1 = "partition-1"
)
var (
region = "us-east-1" // replaced with env var AWS_REGION
existingTopic = "existingTopic" // replaced with env var PUBSUB_AWS_SNSSQS_TOPIC_3
messageVisibilityTimeoutTopic = "messageVisibilityTimeoutTopic" // replaced with env var PUBSUB_AWS_SNSSQS_TOPIC_MVT
fifoTopic = "fifoTopic" // replaced with env var PUBSUB_AWS_SNSSQS_TOPIC_FIFO
deadLetterTopicIn = "deadLetterTopicIn" // replaced with env var PUBSUB_AWS_SNSSQS_TOPIC_DLIN
disableDeleteOnRetryLimitTopicIn = "disableDeleteOnRetryLimitTopicIn" // replaced with env var PUBSUB_AWS_SNSSQS_TOPIC_NODRT
deadLetterQueueName = "deadLetterQueueName" // replaced with env var PUBSUB_AWS_SNSSQS_QUEUE_DLOUT
disableDeleteOnRetryLimitQueueName = "disableDeleteOnRetryLimitQueueName" // replaced with env var PUBSUB_AWS_SNSSQS_QUEUE_NODRT
)
// The following Queue names must match
// the values of the "consumerID" metadata properties
// found inside each of the components/*/pubsub.yaml files
// The names will be passed in with Env Vars like:
//
// PUBSUB_AWS_SNSSQS_QUEUE_*
// PUBSUB_AWS_SNSSQS_TOPIC_*
var queues = []string{}
var topics = []string{
topicActiveName,
topicPassiveName,
topicToBeCreated,
topicDefaultName,
}
func init() {
qn := os.Getenv("AWS_REGION")
if qn != "" {
region = qn
}
qn = os.Getenv("PUBSUB_AWS_SNSSQS_QUEUE_1")
if qn != "" {
queues = append(queues, qn)
}
qn = os.Getenv("PUBSUB_AWS_SNSSQS_QUEUE_2")
if qn != "" {
queues = append(queues, qn)
}
qn = os.Getenv("PUBSUB_AWS_SNSSQS_QUEUE_FIFO")
if qn != "" {
queues = append(queues, qn)
}
qn = os.Getenv("PUBSUB_AWS_SNSSQS_QUEUE_DLIN")
if qn != "" {
queues = append(queues, qn)
}
qn = os.Getenv("PUBSUB_AWS_SNSSQS_QUEUE_DLOUT")
if qn != "" {
deadLetterQueueName = qn
queues = append(queues, qn)
}
qn = os.Getenv("PUBSUB_AWS_SNSSQS_QUEUE_NODRT")
if qn != "" {
disableDeleteOnRetryLimitQueueName = qn
queues = append(queues, qn)
}
qn = os.Getenv("PUBSUB_AWS_SNSSQS_TOPIC_3")
if qn != "" {
existingTopic = qn
}
qn = os.Getenv("PUBSUB_AWS_SNSSQS_TOPIC_MVT")
if qn != "" {
messageVisibilityTimeoutTopic = qn
topics = append(topics, messageVisibilityTimeoutTopic)
}
qn = os.Getenv("PUBSUB_AWS_SNSSQS_TOPIC_FIFO")
if qn != "" {
fifoTopic = qn
topics = append(topics, fifoTopic)
}
qn = os.Getenv("PUBSUB_AWS_SNSSQS_TOPIC_DLIN")
if qn != "" {
deadLetterTopicIn = qn
topics = append(topics, deadLetterTopicIn)
}
qn = os.Getenv("PUBSUB_AWS_SNSSQS_TOPIC_NODRT")
if qn != "" {
disableDeleteOnRetryLimitTopicIn = qn
topics = append(topics, disableDeleteOnRetryLimitTopicIn)
}
}
func TestAWSSNSSQSCertificationTests(t *testing.T) {
defer teardown(t)
t.Run("SNSSQSBasic", func(t *testing.T) {
SNSSQSBasic(t)
})
t.Run("SNSSQSMultipleSubsSameConsumerIDs", func(t *testing.T) {
SNSSQSMultipleSubsSameConsumerIDs(t)
})
t.Run("SNSSQSMultipleSubsDifferentConsumerIDs", func(t *testing.T) {
SNSSQSMultipleSubsDifferentConsumerIDs(t)
})
t.Run("SNSSQSMultiplePubSubsDifferentConsumerIDs", func(t *testing.T) {
SNSSQSMultiplePubSubsDifferentConsumerIDs(t)
})
t.Run("SNSSQSExistingQueueAndTopic", func(t *testing.T) {
SNSSQSExistingQueueAndTopic(t)
})
t.Run("SNSSQSExistingQueueNonexistingTopic", func(t *testing.T) {
SNSSQSExistingQueueNonexistingTopic(t)
})
t.Run("SNSSQSNonexistingTopic", func(t *testing.T) {
SNSSQSNonexistingTopic(t)
})
t.Run("SNSSQSEntityManagement", func(t *testing.T) {
SNSSQSEntityManagement(t)
})
t.Run("SNSSQSMessageVisibilityTimeout", func(t *testing.T) {
SNSSQSMessageVisibilityTimeout(t)
})
t.Run("SNSSQSFIFOMessages", func(t *testing.T) {
SNSSQSFIFOMessages(t)
})
t.Run("SNSSQSMessageDeadLetter", func(t *testing.T) {
SNSSQSMessageDeadLetter(t)
})
t.Run("SNSSQSMessageDisableDeleteOnRetryLimit", func(t *testing.T) {
SNSSQSMessageDisableDeleteOnRetryLimit(t)
})
}
// Verify with single publisher / single subscriber
func SNSSQSBasic(t *testing.T) {
consumerGroup1 := watcher.NewUnordered()
consumerGroup2 := watcher.NewUnordered()
// subscriber of the given topic
subscriberApplication := func(appID string, topicName string, messagesWatcher *watcher.Watcher) app.SetupFn {
return func(ctx flow.Context, s common.Service) error {
// Simulate periodic errors.
sim := simulate.PeriodicError(ctx, 100)
// Setup the /orders event handler.
return multierr.Combine(
s.AddTopicEventHandler(&common.Subscription{
PubsubName: pubsubName,
Topic: topicName,
Route: "/orders",
}, func(_ context.Context, e *common.TopicEvent) (retry bool, err error) {
if err := sim(); err != nil {
return true, err
}
// Track/Observe the data of the event.
messagesWatcher.Observe(e.Data)
ctx.Logf("Message Received appID: %s,pubsub: %s, topic: %s, id: %s, data: %s", appID, e.PubsubName, e.Topic, e.ID, e.Data)
return false, nil
}),
)
}
}
publishMessages := func(metadata map[string]string, sidecarName string, topicName string, messageWatchers ...*watcher.Watcher) flow.Runnable {
return func(ctx flow.Context) error {
// prepare the messages
messages := make([]string, numMessages)
for i := range messages {
messages[i] = fmt.Sprintf("partitionKey: %s, message for topic: %s, index: %03d, uniqueId: %s", metadata[messageKey], topicName, i, uuid.New().String())
}
// add the messages as expectations to the watchers
for _, messageWatcher := range messageWatchers {
messageWatcher.ExpectStrings(messages...)
}
// get the sidecar (dapr) client
client := sidecar.GetClient(ctx, sidecarName)
// publish messages
ctx.Logf("Publishing messages. sidecarName: %s, topicName: %s", sidecarName, topicName)
var publishOptions dapr.PublishEventOption
if metadata != nil {
publishOptions = dapr.PublishEventWithMetadata(metadata)
}
for _, message := range messages {
ctx.Logf("Publishing: %q", message)
var err error
if publishOptions != nil {
err = client.PublishEvent(ctx, pubsubName, topicName, message, publishOptions)
} else {
err = client.PublishEvent(ctx, pubsubName, topicName, message)
}
require.NoError(ctx, err, "SNSSQSBasic - error publishing message")
}
return nil
}
}
assertMessages := func(timeout time.Duration, messageWatchers ...*watcher.Watcher) flow.Runnable {
return func(ctx flow.Context) error {
// assert for messages
for _, m := range messageWatchers {
if !m.Assert(ctx, 25*timeout) {
ctx.Errorf("SNSSQSBasic - message assertion failed for watcher: %#v\n", m)
}
}
return nil
}
}
flow.New(t, "SNSSQS Verify with single publisher / single subscriber").
// Run subscriberApplication app1
Step(app.Run(appID1, fmt.Sprintf(":%d", appPort),
subscriberApplication(appID1, topicActiveName, consumerGroup1))).
// Run the Dapr sidecar with ConsumerID "PUBSUB_AWS_SNSSQS_QUEUE_1"
Step(sidecar.Run(sidecarName1,
append(componentRuntimeOptions(),
embedded.WithComponentsPath("./components/consumer_one"),
embedded.WithAppProtocol(protocol.HTTPProtocol, strconv.Itoa(appPort)),
embedded.WithDaprGRPCPort(strconv.Itoa(runtime.DefaultDaprAPIGRPCPort)),
embedded.WithDaprHTTPPort(strconv.Itoa(runtime.DefaultDaprHTTPPort)),
)...,
)).
// Run subscriberApplication app2
Step(app.Run(appID2, fmt.Sprintf(":%d", appPort+portOffset),
subscriberApplication(appID2, topicActiveName, consumerGroup2))).
// Run the Dapr sidecar with ConsumerID "PUBSUB_AWS_SNSSQS_QUEUE_2"
Step(sidecar.Run(sidecarName2,
append(componentRuntimeOptions(),
embedded.WithComponentsPath("./components/consumer_two"),
embedded.WithAppProtocol(protocol.HTTPProtocol, strconv.Itoa(appPort+portOffset)),
embedded.WithDaprGRPCPort(strconv.Itoa(runtime.DefaultDaprAPIGRPCPort+portOffset)),
embedded.WithDaprHTTPPort(strconv.Itoa(runtime.DefaultDaprHTTPPort+portOffset)),
embedded.WithProfilePort(strconv.Itoa(runtime.DefaultProfilePort+portOffset)),
)...,
)).
Step("publish messages to active topic ==> "+topicActiveName, publishMessages(nil, sidecarName1, topicActiveName, consumerGroup1, consumerGroup2)).
Step("publish messages to passive topic ==> "+topicPassiveName, publishMessages(nil, sidecarName1, topicPassiveName)).
Step("verify if app1 has recevied messages published to active topic", assertMessages(10*time.Second, consumerGroup1)).
Step("verify if app2 has recevied messages published to passive topic", assertMessages(10*time.Second, consumerGroup2)).
Step("reset", flow.Reset(consumerGroup1, consumerGroup2)).
Run()
}
// Verify with single publisher / multiple subscribers with same consumerID
func SNSSQSMultipleSubsSameConsumerIDs(t *testing.T) {
consumerGroup1 := watcher.NewUnordered()
consumerGroup2 := watcher.NewUnordered()
// Set the partition key on all messages so they are written to the same partition. This allows for checking of ordered messages.
metadata := map[string]string{
messageKey: partition0,
}
metadata1 := map[string]string{
messageKey: partition1,
}
// subscriber of the given topic
subscriberApplication := func(appID string, topicName string, messagesWatcher *watcher.Watcher) app.SetupFn {
return func(ctx flow.Context, s common.Service) error {
// Simulate periodic errors.
sim := simulate.PeriodicError(ctx, 100)
// Setup the /orders event handler.
return multierr.Combine(
s.AddTopicEventHandler(&common.Subscription{
PubsubName: pubsubName,
Topic: topicName,
Route: "/orders",
}, func(_ context.Context, e *common.TopicEvent) (retry bool, err error) {
if err := sim(); err != nil {
return true, err
}
// Track/Observe the data of the event.
messagesWatcher.Observe(e.Data)
ctx.Logf("Message Received appID: %s,pubsub: %s, topic: %s, id: %s, data: %s", appID, e.PubsubName, e.Topic, e.ID, e.Data)
return false, nil
}),
)
}
}
publishMessages := func(metadata map[string]string, sidecarName string, topicName string, messageWatchers ...*watcher.Watcher) flow.Runnable {
return func(ctx flow.Context) error {
// prepare the messages
messages := make([]string, numMessages)
for i := range messages {
messages[i] = fmt.Sprintf("partitionKey: %s, message for topic: %s, index: %03d, uniqueId: %s", metadata[messageKey], topicName, i, uuid.New().String())
}
// add the messages as expectations to the watchers
for _, messageWatcher := range messageWatchers {
messageWatcher.ExpectStrings(messages...)
}
// get the sidecar (dapr) client
client := sidecar.GetClient(ctx, sidecarName)
// publish messages
ctx.Logf("Publishing messages. sidecarName: %s, topicName: %s", sidecarName, topicName)
var publishOptions dapr.PublishEventOption
if metadata != nil {
publishOptions = dapr.PublishEventWithMetadata(metadata)
}
for _, message := range messages {
ctx.Logf("Publishing: %q", message)
var err error
if publishOptions != nil {
err = client.PublishEvent(ctx, pubsubName, topicName, message, publishOptions)
} else {
err = client.PublishEvent(ctx, pubsubName, topicName, message)
}
require.NoError(ctx, err, "SNSSQSMultipleSubsSameConsumerIDs - error publishing message")
}
return nil
}
}
assertMessages := func(timeout time.Duration, messageWatchers ...*watcher.Watcher) flow.Runnable {
return func(ctx flow.Context) error {
// assert for messages
for _, m := range messageWatchers {
if !m.Assert(ctx, 25*timeout) {
ctx.Errorf("SNSSQSMultipleSubsSameConsumerIDs - message assertion failed for watcher: %#v\n", m)
}
}
return nil
}
}
flow.New(t, "SNSSQS certification - single publisher and multiple subscribers with same consumer IDs").
// Run subscriberApplication app1
Step(app.Run(appID1, fmt.Sprintf(":%d", appPort),
subscriberApplication(appID1, topicActiveName, consumerGroup1))).
// Run the Dapr sidecar with ConsumerID "PUBSUB_AWS_SNSSQS_QUEUE_1"
Step(sidecar.Run(sidecarName1,
append(componentRuntimeOptions(),
embedded.WithComponentsPath("./components/consumer_one"),
embedded.WithAppProtocol(protocol.HTTPProtocol, strconv.Itoa(appPort)),
embedded.WithDaprGRPCPort(strconv.Itoa(runtime.DefaultDaprAPIGRPCPort)),
embedded.WithDaprHTTPPort(strconv.Itoa(runtime.DefaultDaprHTTPPort)),
)...,
)).
// Run subscriberApplication app2
Step(app.Run(appID2, fmt.Sprintf(":%d", appPort+portOffset),
subscriberApplication(appID2, topicActiveName, consumerGroup2))).
// Run the Dapr sidecar with ConsumerID "PUBSUB_AWS_SNSSQS_QUEUE_2"
Step(sidecar.Run(sidecarName2,
append(componentRuntimeOptions(),
embedded.WithComponentsPath("./components/consumer_two"),
embedded.WithAppProtocol(protocol.HTTPProtocol, strconv.Itoa(appPort+portOffset)),
embedded.WithDaprGRPCPort(strconv.Itoa(runtime.DefaultDaprAPIGRPCPort+portOffset)),
embedded.WithDaprHTTPPort(strconv.Itoa(runtime.DefaultDaprHTTPPort+portOffset)),
embedded.WithProfilePort(strconv.Itoa(runtime.DefaultProfilePort+portOffset)),
)...,
)).
Step("publish messages to ==> "+topicActiveName, publishMessages(metadata, sidecarName1, topicActiveName, consumerGroup2)).
Step("publish messages to ==> "+topicActiveName, publishMessages(metadata1, sidecarName2, topicActiveName, consumerGroup2)).
Step("verify if app1, app2 together have recevied messages published to topic1", assertMessages(10*time.Second, consumerGroup2)).
Step("reset", flow.Reset(consumerGroup1, consumerGroup2)).
Run()
}
// Verify with single publisher / multiple subscribers with different consumerIDs
func SNSSQSMultipleSubsDifferentConsumerIDs(t *testing.T) {
consumerGroup1 := watcher.NewUnordered()
consumerGroup2 := watcher.NewUnordered()
// Set the partition key on all messages so they are written to the same partition. This allows for checking of ordered messages.
metadata := map[string]string{
messageKey: partition0,
}
// subscriber of the given topic
subscriberApplication := func(appID string, topicName string, messagesWatcher *watcher.Watcher) app.SetupFn {
return func(ctx flow.Context, s common.Service) error {
// Simulate periodic errors.
sim := simulate.PeriodicError(ctx, 100)
// Setup the /orders event handler.
return multierr.Combine(
s.AddTopicEventHandler(&common.Subscription{
PubsubName: pubsubName,
Topic: topicName,
Route: "/orders",
}, func(_ context.Context, e *common.TopicEvent) (retry bool, err error) {
if err := sim(); err != nil {
return true, err
}
// Track/Observe the data of the event.
messagesWatcher.Observe(e.Data)
ctx.Logf("Message Received appID: %s,pubsub: %s, topic: %s, id: %s, data: %s", appID, e.PubsubName, e.Topic, e.ID, e.Data)
return false, nil
}),
)
}
}
publishMessages := func(metadata map[string]string, sidecarName string, topicName string, messageWatchers ...*watcher.Watcher) flow.Runnable {
return func(ctx flow.Context) error {
// prepare the messages
messages := make([]string, numMessages)
for i := range messages {
messages[i] = fmt.Sprintf("partitionKey: %s, message for topic: %s, index: %03d, uniqueId: %s", metadata[messageKey], topicName, i, uuid.New().String())
}
// add the messages as expectations to the watchers
for _, messageWatcher := range messageWatchers {
messageWatcher.ExpectStrings(messages...)
}
// get the sidecar (dapr) client
client := sidecar.GetClient(ctx, sidecarName)
// publish messages
ctx.Logf("Publishing messages. sidecarName: %s, topicName: %s", sidecarName, topicName)
var publishOptions dapr.PublishEventOption
if metadata != nil {
publishOptions = dapr.PublishEventWithMetadata(metadata)
}
for _, message := range messages {
ctx.Logf("Publishing: %q", message)
var err error
if publishOptions != nil {
err = client.PublishEvent(ctx, pubsubName, topicName, message, publishOptions)
} else {
err = client.PublishEvent(ctx, pubsubName, topicName, message)
}
require.NoError(ctx, err, "SNSSQSMultipleSubsDifferentConsumerIDs - error publishing message")
}
return nil
}
}
assertMessages := func(timeout time.Duration, messageWatchers ...*watcher.Watcher) flow.Runnable {
return func(ctx flow.Context) error {
// assert for messages
for _, m := range messageWatchers {
if !m.Assert(ctx, 25*timeout) {
ctx.Errorf("SNSSQSMultipleSubsDifferentConsumerIDs - message assertion failed for watcher: %#v\n", m)
}
}
return nil
}
}
flow.New(t, "SNSSQS certification - single publisher and multiple subscribers with different consumer IDs").
// Run subscriberApplication app1
Step(app.Run(appID1, fmt.Sprintf(":%d", appPort),
subscriberApplication(appID1, topicActiveName, consumerGroup1))).
// Run the Dapr sidecar with ConsumerID "PUBSUB_AWS_SNSSQS_QUEUE_1"
Step(sidecar.Run(sidecarName1,
append(componentRuntimeOptions(),
embedded.WithComponentsPath("./components/consumer_one"),
embedded.WithAppProtocol(protocol.HTTPProtocol, strconv.Itoa(appPort)),
embedded.WithDaprGRPCPort(strconv.Itoa(runtime.DefaultDaprAPIGRPCPort)),
embedded.WithDaprHTTPPort(strconv.Itoa(runtime.DefaultDaprHTTPPort)),
)...,
)).
// Run subscriberApplication app2
Step(app.Run(appID2, fmt.Sprintf(":%d", appPort+portOffset),
subscriberApplication(appID2, topicActiveName, consumerGroup2))).
// RRun the Dapr sidecar with ConsumerID "PUBSUB_AWS_SNSSQS_QUEUE_2"
Step(sidecar.Run(sidecarName2,
append(componentRuntimeOptions(),
embedded.WithComponentsPath("./components/consumer_two"),
embedded.WithAppProtocol(protocol.HTTPProtocol, strconv.Itoa(appPort+portOffset)),
embedded.WithDaprGRPCPort(strconv.Itoa(runtime.DefaultDaprAPIGRPCPort+portOffset)),
embedded.WithDaprHTTPPort(strconv.Itoa(runtime.DefaultDaprHTTPPort+portOffset)),
embedded.WithProfilePort(strconv.Itoa(runtime.DefaultProfilePort+portOffset)),
)...,
)).
Step("publish messages to ==>"+topicActiveName, publishMessages(metadata, sidecarName1, topicActiveName, consumerGroup1)).
Step("verify if app1, app2 together have recevied messages published to topic1", assertMessages(10*time.Second, consumerGroup1)).
Step("reset", flow.Reset(consumerGroup1, consumerGroup2)).
Run()
}
// Verify with multiple publishers / multiple subscribers with different consumerIDs
func SNSSQSMultiplePubSubsDifferentConsumerIDs(t *testing.T) {
consumerGroup1 := watcher.NewUnordered()
consumerGroup2 := watcher.NewUnordered()
// Set the partition key on all messages so they are written to the same partition. This allows for checking of ordered messages.
metadata := map[string]string{
messageKey: partition0,
}
metadata1 := map[string]string{
messageKey: partition1,
}
// subscriber of the given topic
subscriberApplication := func(appID string, topicName string, messagesWatcher *watcher.Watcher) app.SetupFn {
return func(ctx flow.Context, s common.Service) error {
// Simulate periodic errors.
sim := simulate.PeriodicError(ctx, 100)
// Setup the /orders event handler.
return multierr.Combine(
s.AddTopicEventHandler(&common.Subscription{
PubsubName: pubsubName,
Topic: topicName,
Route: "/orders",
}, func(_ context.Context, e *common.TopicEvent) (retry bool, err error) {
if err := sim(); err != nil {
return true, err
}
// Track/Observe the data of the event.
messagesWatcher.Observe(e.Data)
ctx.Logf("Message Received appID: %s,pubsub: %s, topic: %s, id: %s, data: %s", appID, e.PubsubName, e.Topic, e.ID, e.Data)
return false, nil
}),
)
}
}
publishMessages := func(metadata map[string]string, sidecarName string, topicName string, messageWatchers ...*watcher.Watcher) flow.Runnable {
return func(ctx flow.Context) error {
// prepare the messages
messages := make([]string, numMessages)
for i := range messages {
messages[i] = fmt.Sprintf("partitionKey: %s, message for topic: %s, index: %03d, uniqueId: %s", metadata[messageKey], topicName, i, uuid.New().String())
}
// add the messages as expectations to the watchers
for _, messageWatcher := range messageWatchers {
messageWatcher.ExpectStrings(messages...)
}
// get the sidecar (dapr) client
client := sidecar.GetClient(ctx, sidecarName)
// publish messages
ctx.Logf("Publishing messages. sidecarName: %s, topicName: %s", sidecarName, topicName)
var publishOptions dapr.PublishEventOption
if metadata != nil {
publishOptions = dapr.PublishEventWithMetadata(metadata)
}
for _, message := range messages {
ctx.Logf("Publishing: %q", message)
var err error
if publishOptions != nil {
err = client.PublishEvent(ctx, pubsubName, topicName, message, publishOptions)
} else {
err = client.PublishEvent(ctx, pubsubName, topicName, message)
}
require.NoError(ctx, err, "SNSSQSMultiplePubSubsDifferentConsumerIDs - error publishing message")
}
return nil
}
}
assertMessages := func(timeout time.Duration, messageWatchers ...*watcher.Watcher) flow.Runnable {
return func(ctx flow.Context) error {
// assert for messages
for _, m := range messageWatchers {
if !m.Assert(ctx, 25*timeout) {
ctx.Errorf("SNSSQSMultiplePubSubsDifferentConsumerIDs - message assertion failed for watcher: %#v\n", m)
}
}
return nil
}
}
flow.New(t, "SNSSQS certification - multiple publishers and multiple subscribers with different consumer IDs").
// Run subscriberApplication app1
Step(app.Run(appID1, fmt.Sprintf(":%d", appPort),
subscriberApplication(appID1, topicActiveName, consumerGroup1))).
// Run the Dapr sidecar with ConsumerID "PUBSUB_AWS_SNSSQS_QUEUE_1"
Step(sidecar.Run(sidecarName1,
append(componentRuntimeOptions(),
embedded.WithComponentsPath("./components/consumer_one"),
embedded.WithAppProtocol(protocol.HTTPProtocol, strconv.Itoa(appPort)),
embedded.WithDaprGRPCPort(strconv.Itoa(runtime.DefaultDaprAPIGRPCPort)),
embedded.WithDaprHTTPPort(strconv.Itoa(runtime.DefaultDaprHTTPPort)),
)...,
)).
// Run subscriberApplication app2
Step(app.Run(appID2, fmt.Sprintf(":%d", appPort+portOffset),
subscriberApplication(appID2, topicActiveName, consumerGroup2))).
// Run the Dapr sidecar with ConsumerID "PUBSUB_AWS_SNSSQS_QUEUE_2"
Step(sidecar.Run(sidecarName2,
append(componentRuntimeOptions(),
embedded.WithComponentsPath("./components/consumer_two"),
embedded.WithAppProtocol(protocol.HTTPProtocol, strconv.Itoa(appPort+portOffset)),
embedded.WithDaprGRPCPort(strconv.Itoa(runtime.DefaultDaprAPIGRPCPort+portOffset)),
embedded.WithDaprHTTPPort(strconv.Itoa(runtime.DefaultDaprHTTPPort+portOffset)),
embedded.WithProfilePort(strconv.Itoa(runtime.DefaultProfilePort+portOffset)),
)...,
)).
Step("publish messages to ==> "+topicActiveName, publishMessages(metadata, sidecarName1, topicActiveName, consumerGroup1)).
Step("publish messages to ==> "+topicActiveName, publishMessages(metadata1, sidecarName2, topicActiveName, consumerGroup2)).
Step("verify if app1, app2 together have recevied messages published to topic1", assertMessages(10*time.Second, consumerGroup1)).
Step("verify if app1, app2 together have recevied messages published to topic1", assertMessages(10*time.Second, consumerGroup2)).
Step("reset", flow.Reset(consumerGroup1, consumerGroup2)).
Run()
}
// Verify data with a topic that does not exist
func SNSSQSNonexistingTopic(t *testing.T) {
consumerGroup1 := watcher.NewUnordered()
// Set the partition key on all messages so they are written to the same partition. This allows for checking of ordered messages.
metadata := map[string]string{
messageKey: partition0,
}
// subscriber of the given topic
subscriberApplication := func(appID string, topicName string, messagesWatcher *watcher.Watcher) app.SetupFn {
return func(ctx flow.Context, s common.Service) error {
// Simulate periodic errors.
sim := simulate.PeriodicError(ctx, 100)
// Setup the /orders event handler.
return multierr.Combine(
s.AddTopicEventHandler(&common.Subscription{
PubsubName: pubsubName,
Topic: topicName,
Route: "/orders",
}, func(_ context.Context, e *common.TopicEvent) (retry bool, err error) {
if err := sim(); err != nil {
return true, err
}
// Track/Observe the data of the event.
messagesWatcher.Observe(e.Data)
ctx.Logf("Message Received appID: %s,pubsub: %s, topic: %s, id: %s, data: %s", appID, e.PubsubName, e.Topic, e.ID, e.Data)
return false, nil
}),
)
}
}
publishMessages := func(metadata map[string]string, sidecarName string, topicName string, messageWatchers ...*watcher.Watcher) flow.Runnable {
return func(ctx flow.Context) error {
// prepare the messages
messages := make([]string, numMessages)
for i := range messages {
messages[i] = fmt.Sprintf("partitionKey: %s, message for topic: %s, index: %03d, uniqueId: %s", metadata[messageKey], topicName, i, uuid.New().String())
}
// add the messages as expectations to the watchers
for _, messageWatcher := range messageWatchers {
messageWatcher.ExpectStrings(messages...)
}
// get the sidecar (dapr) client
client := sidecar.GetClient(ctx, sidecarName)
// publish messages
ctx.Logf("Publishing messages. sidecarName: %s, topicName: %s", sidecarName, topicName)
var publishOptions dapr.PublishEventOption
if metadata != nil {
publishOptions = dapr.PublishEventWithMetadata(metadata)
}
for _, message := range messages {
ctx.Logf("Publishing: %q", message)
var err error
if publishOptions != nil {
err = client.PublishEvent(ctx, pubsubName, topicName, message, publishOptions)
} else {
err = client.PublishEvent(ctx, pubsubName, topicName, message)
}
require.NoError(ctx, err, "SNSSQSNonexistingTopic - error publishing message")
}
return nil
}
}
assertMessages := func(timeout time.Duration, messageWatchers ...*watcher.Watcher) flow.Runnable {
return func(ctx flow.Context) error {
// assert for messages
for _, m := range messageWatchers {
if !m.Assert(ctx, 25*timeout) {
ctx.Errorf("SNSSQSNonexistingTopic - message assertion failed for watcher: %#v\n", m)
}
}
return nil
}
}
flow.New(t, "SNSSQS certification - non-existing topic").
// Run subscriberApplication app1
Step(app.Run(appID1, fmt.Sprintf(":%d", appPort+portOffset*3),
subscriberApplication(appID1, topicToBeCreated, consumerGroup1))).
// Run the Dapr sidecar with ConsumerID "PUBSUB_AWS_SNSSQS_QUEUE_1"
Step(sidecar.Run(sidecarName1,
append(componentRuntimeOptions(),
embedded.WithComponentsPath("./components/consumer_one"),
embedded.WithAppProtocol(protocol.HTTPProtocol, strconv.Itoa(appPort+portOffset*3)),
embedded.WithDaprGRPCPort(strconv.Itoa(runtime.DefaultDaprAPIGRPCPort+portOffset*3)),
embedded.WithDaprHTTPPort(strconv.Itoa(runtime.DefaultDaprHTTPPort+portOffset*3)),
embedded.WithProfilePort(strconv.Itoa(runtime.DefaultProfilePort+portOffset*3)),
)...,
)).
Step(fmt.Sprintf("publish messages to topicToBeCreated: %s", topicToBeCreated), publishMessages(metadata, sidecarName1, topicToBeCreated, consumerGroup1)).
Step("wait", flow.Sleep(30*time.Second)).
Step("verify if app1 has recevied messages published to newly created topic", assertMessages(10*time.Second, consumerGroup1)).
Run()
}
// Verify data with an existing Queue and existing Topic
func SNSSQSExistingQueueAndTopic(t *testing.T) {
consumerGroup1 := watcher.NewUnordered()
// Set the partition key on all messages so they are written to the same partition. This allows for checking of ordered messages.
metadata := map[string]string{
messageKey: partition0,
}
// subscriber of the given topic
subscriberApplication := func(appID string, topicName string, messagesWatcher *watcher.Watcher) app.SetupFn {
return func(ctx flow.Context, s common.Service) error {
// Simulate periodic errors.
sim := simulate.PeriodicError(ctx, 100)
// Setup the /orders event handler.
return multierr.Combine(
s.AddTopicEventHandler(&common.Subscription{
PubsubName: pubsubName,
Topic: topicName,
Route: "/orders",
}, func(_ context.Context, e *common.TopicEvent) (retry bool, err error) {
if err := sim(); err != nil {
return true, err
}
// Track/Observe the data of the event.
messagesWatcher.Observe(e.Data)
ctx.Logf("Message Received appID: %s,pubsub: %s, topic: %s, id: %s, data: %s", appID, e.PubsubName, e.Topic, e.ID, e.Data)
return false, nil
}),
)
}
}
publishMessages := func(metadata map[string]string, sidecarName string, topicName string, messageWatchers ...*watcher.Watcher) flow.Runnable {
return func(ctx flow.Context) error {
// prepare the messages
messages := make([]string, numMessages)
for i := range messages {
messages[i] = fmt.Sprintf("partitionKey: %s, message for topic: %s, index: %03d, uniqueId: %s", metadata[messageKey], topicName, i, uuid.New().String())
}
// add the messages as expectations to the watchers
for _, messageWatcher := range messageWatchers {
messageWatcher.ExpectStrings(messages...)
}
// get the sidecar (dapr) client
client := sidecar.GetClient(ctx, sidecarName)
// publish messages
ctx.Logf("Publishing messages. sidecarName: %s, topicName: %s", sidecarName, topicName)
var publishOptions dapr.PublishEventOption
if metadata != nil {
publishOptions = dapr.PublishEventWithMetadata(metadata)
}
for _, message := range messages {
ctx.Logf("Publishing: %q", message)
var err error
if publishOptions != nil {
err = client.PublishEvent(ctx, pubsubName, topicName, message, publishOptions)
} else {
err = client.PublishEvent(ctx, pubsubName, topicName, message)
}
require.NoError(ctx, err, "SNSSQSExistingQueueAndTopic - error publishing message")
}
return nil
}
}
assertMessages := func(timeout time.Duration, messageWatchers ...*watcher.Watcher) flow.Runnable {
return func(ctx flow.Context) error {
// assert for messages
for _, m := range messageWatchers {
if !m.Assert(ctx, 25*timeout) {
ctx.Errorf("SNSSQSExistingQueueAndTopic - message assertion failed for watcher: %#v\n", m)
}
}
return nil
}
}
flow.New(t, "SNSSQS certification - Existing Queue Existing Topic").
// Run subscriberApplication app1
Step(app.Run(appID1, fmt.Sprintf(":%d", appPort+portOffset*3),
subscriberApplication(appID1, existingTopic, consumerGroup1))).
// Run the Dapr sidecar with ConsumerID "PUBSUB_AWS_SNSSQS_QUEUE_3"
Step(sidecar.Run(sidecarName1,
append(componentRuntimeOptions(),
embedded.WithComponentsPath("./components/existing_queue"),
embedded.WithAppProtocol(protocol.HTTPProtocol, strconv.Itoa(appPort+portOffset*3)),
embedded.WithDaprGRPCPort(strconv.Itoa(runtime.DefaultDaprAPIGRPCPort+portOffset*3)),
embedded.WithDaprHTTPPort(strconv.Itoa(runtime.DefaultDaprHTTPPort+portOffset*3)),
embedded.WithProfilePort(strconv.Itoa(runtime.DefaultProfilePort+portOffset*3)),
)...,
)).
Step(fmt.Sprintf("publish messages to existingTopic: %s", existingTopic), publishMessages(metadata, sidecarName1, existingTopic, consumerGroup1)).
Step("wait", flow.Sleep(30*time.Second)).
Step("verify if app1 has recevied messages published to newly created topic", assertMessages(10*time.Second, consumerGroup1)).
Run()
}
// Verify data with an existing Queue with a topic that does not exist
func SNSSQSExistingQueueNonexistingTopic(t *testing.T) {
consumerGroup1 := watcher.NewUnordered()
// Set the partition key on all messages so they are written to the same partition. This allows for checking of ordered messages.
metadata := map[string]string{
messageKey: partition0,
}
// subscriber of the given topic
subscriberApplication := func(appID string, topicName string, messagesWatcher *watcher.Watcher) app.SetupFn {
return func(ctx flow.Context, s common.Service) error {
// Simulate periodic errors.
sim := simulate.PeriodicError(ctx, 100)
// Setup the /orders event handler.
return multierr.Combine(
s.AddTopicEventHandler(&common.Subscription{
PubsubName: pubsubName,
Topic: topicName,
Route: "/orders",
}, func(_ context.Context, e *common.TopicEvent) (retry bool, err error) {
if err := sim(); err != nil {
return true, err
}
// Track/Observe the data of the event.
messagesWatcher.Observe(e.Data)
ctx.Logf("Message Received appID: %s,pubsub: %s, topic: %s, id: %s, data: %s", appID, e.PubsubName, e.Topic, e.ID, e.Data)
return false, nil
}),
)
}
}
publishMessages := func(metadata map[string]string, sidecarName string, topicName string, messageWatchers ...*watcher.Watcher) flow.Runnable {
return func(ctx flow.Context) error {
// prepare the messages
messages := make([]string, numMessages)
for i := range messages {
messages[i] = fmt.Sprintf("partitionKey: %s, message for topic: %s, index: %03d, uniqueId: %s", metadata[messageKey], topicName, i, uuid.New().String())
}
// add the messages as expectations to the watchers
for _, messageWatcher := range messageWatchers {
messageWatcher.ExpectStrings(messages...)
}
// get the sidecar (dapr) client
client := sidecar.GetClient(ctx, sidecarName)
// publish messages
ctx.Logf("Publishing messages. sidecarName: %s, topicName: %s", sidecarName, topicName)
var publishOptions dapr.PublishEventOption
if metadata != nil {
publishOptions = dapr.PublishEventWithMetadata(metadata)
}
for _, message := range messages {
ctx.Logf("Publishing: %q", message)
var err error
if publishOptions != nil {
err = client.PublishEvent(ctx, pubsubName, topicName, message, publishOptions)
} else {
err = client.PublishEvent(ctx, pubsubName, topicName, message)
}
require.NoError(ctx, err, "SNSSQSExistingQueueNonexistingTopic - error publishing message")
}
return nil
}
}
assertMessages := func(timeout time.Duration, messageWatchers ...*watcher.Watcher) flow.Runnable {
return func(ctx flow.Context) error {
// assert for messages
for _, m := range messageWatchers {
if !m.Assert(ctx, 25*timeout) {
ctx.Errorf("SNSSQSExistingQueueNonexistingTopic - message assertion failed for watcher: %#v\n", m)
}
}
return nil
}
}
flow.New(t, "SNSSQS certification - Existing Queue None Existing Topic").
// Run subscriberApplication app1
Step(app.Run(appID1, fmt.Sprintf(":%d", appPort+portOffset*3),
subscriberApplication(appID1, topicToBeCreated, consumerGroup1))).
// Run the Dapr sidecar with ConsumerID "PUBSUB_AWS_SNSSQS_QUEUE_3"
Step(sidecar.Run(sidecarName1,
append(componentRuntimeOptions(),
embedded.WithComponentsPath("./components/existing_queue"),
embedded.WithAppProtocol(protocol.HTTPProtocol, strconv.Itoa(appPort+portOffset*3)),
embedded.WithDaprGRPCPort(strconv.Itoa(runtime.DefaultDaprAPIGRPCPort+portOffset*3)),
embedded.WithDaprHTTPPort(strconv.Itoa(runtime.DefaultDaprHTTPPort+portOffset*3)),
embedded.WithProfilePort(strconv.Itoa(runtime.DefaultProfilePort+portOffset*3)),
)...,
)).
Step(fmt.Sprintf("publish messages to topicToBeCreated: %s", topicToBeCreated), publishMessages(metadata, sidecarName1, topicToBeCreated, consumerGroup1)).
Step("wait", flow.Sleep(30*time.Second)).
Step("verify if app1 has recevied messages published to newly created topic", assertMessages(10*time.Second, consumerGroup1)).
Run()
}
// Verify with an optional parameter `disableEntityManagement` set to true
func SNSSQSEntityManagement(t *testing.T) {
// TODO: Modify it to looks for component init error in the sidecar itself.
consumerGroup1 := watcher.NewUnordered()
// Set the partition key on all messages so they are written to the same partition. This allows for checking of ordered messages.
metadata := map[string]string{
messageKey: partition0,
}
subscriberApplication := func(appID string, topicName string, messagesWatcher *watcher.Watcher) app.SetupFn {
return func(ctx flow.Context, s common.Service) error {
// Setup the /orders event handler.
return multierr.Combine(
s.AddTopicEventHandler(&common.Subscription{
PubsubName: pubsubName,
Topic: topicName,
Route: "/orders",
}, func(_ context.Context, e *common.TopicEvent) (retry bool, err error) {
// Track/Observe the data of the event.
messagesWatcher.Observe(e.Data)
ctx.Logf("Message Received appID: %s,pubsub: %s, topic: %s, id: %s, data: %s", appID, e.PubsubName, e.Topic, e.ID, e.Data)
return false, nil
}),
)
}
}
publishMessages := func(metadata map[string]string, sidecarName string, topicName string, messageWatchers ...*watcher.Watcher) flow.Runnable {
return func(ctx flow.Context) error {
// prepare the messages
messages := make([]string, numMessages)
for i := range messages {
messages[i] = fmt.Sprintf("partitionKey: %s, message for topic: %s, index: %03d, uniqueId: %s", metadata[messageKey], topicName, i, uuid.New().String())
}
// add the messages as expectations to the watchers
for _, messageWatcher := range messageWatchers {
messageWatcher.ExpectStrings(messages...)
}
// get the sidecar (dapr) client
client := sidecar.GetClient(ctx, sidecarName)
// publish messages
ctx.Logf("Publishing messages. sidecarName: %s, topicName: %s", sidecarName, topicName)
var publishOptions dapr.PublishEventOption
if metadata != nil {
publishOptions = dapr.PublishEventWithMetadata(metadata)
}
for _, message := range messages {
ctx.Logf("Publishing: %q", message)
var err error
if publishOptions != nil {
err = client.PublishEvent(ctx, pubsubName, topicName, message, publishOptions)
} else {
err = client.PublishEvent(ctx, pubsubName, topicName, message)
}
// Error is expected as the topic does not exist
require.Error(ctx, err, "SNSSQSEntityManagement - error publishing message")
}
return nil
}
}
flow.New(t, "SNSSQS certification - entity management disabled").
// Run subscriberApplication app1
Step(app.Run(appID1, fmt.Sprintf(":%d", appPort+portOffset),
subscriberApplication(appID1, topicActiveName, consumerGroup1))).
// Run the Dapr sidecar
Step(sidecar.Run(sidecarName1,
append(componentRuntimeOptions(),
embedded.WithComponentsPath("./components/entity_mgmt"),
embedded.WithAppProtocol(protocol.HTTPProtocol, strconv.Itoa(appPort+portOffset)),
embedded.WithDaprGRPCPort(strconv.Itoa(runtime.DefaultDaprAPIGRPCPort+portOffset)),
embedded.WithDaprHTTPPort(strconv.Itoa(runtime.DefaultDaprHTTPPort+portOffset)),
embedded.WithProfilePort(strconv.Itoa(runtime.DefaultProfilePort+portOffset)),
)...,
)).
Step(fmt.Sprintf("publish messages to topicDefault: %s", topicDefaultName), publishMessages(metadata, sidecarName1, topicDefaultName, consumerGroup1)).
Run()
}
// Verify data with an optional parameter `messageVisibilityTimeout` takes affect
func SNSSQSMessageVisibilityTimeout(t *testing.T) {
consumerGroup1 := watcher.NewUnordered()
metadata := map[string]string{}
latch := make(chan struct{})
busyTime := 10 * time.Second
messagesToSend := 1
waitForLatch := func(appID string, ctx flow.Context, l chan struct{}) error {
ctx.Logf("waitForLatch %s is waiting...\n", appID)
<-l
ctx.Logf("waitForLatch %s ready to continue!\n", appID)
return nil
}
subscriberMVTimeoutApp := func(appID string, topicName string, messagesWatcher *watcher.Watcher, l chan struct{}) app.SetupFn {
return func(ctx flow.Context, s common.Service) error {
ctx.Logf("SNSSQSMessageVisibilityTimeout.subscriberApplicationMVTimeout App: %q topicName: %q\n", appID, topicName)
return multierr.Combine(
s.AddTopicEventHandler(&common.Subscription{
PubsubName: pubsubName,
Topic: topicName,
Route: "/orders",
}, func(_ context.Context, e *common.TopicEvent) (retry bool, err error) {
ctx.Logf("SNSSQSMessageVisibilityTimeout.subscriberApplicationMVTimeout App: %q got message: %s busy for %v\n", appID, e.Data, busyTime)
time.Sleep(busyTime)
ctx.Logf("SNSSQSMessageVisibilityTimeout.subscriberApplicationMVTimeout App: %q - notifying next Subscriber to continue...\n", appID)
l <- struct{}{}
ctx.Logf("SNSSQSMessageVisibilityTimeout.subscriberApplicationMVTimeoutApp: %q - sent busy for %v\n", appID, busyTime)
time.Sleep(busyTime)
ctx.Logf("SNSSQSMessageVisibilityTimeout.subscriberApplicationMVTimeoutApp: %q - done!\n", appID)
return false, nil
}),
)
}
}
notExpectingMessagesSubscriberApp := func(appID string, topicName string, messagesWatcher *watcher.Watcher, l chan struct{}) app.SetupFn {
return func(ctx flow.Context, s common.Service) error {
ctx.Logf("SNSSQSMessageVisibilityTimeout.notExpectingMessagesSubscriberApp App: %q topicName: %q waiting for notification to start receiving messages\n", appID, topicName)
return multierr.Combine(
waitForLatch(appID, ctx, l),
s.AddTopicEventHandler(&common.Subscription{
PubsubName: pubsubName,
Topic: topicName,
Route: "/orders",
}, func(_ context.Context, e *common.TopicEvent) (retry bool, err error) {
ctx.Logf("SNSSQSMessageVisibilityTimeout.notExpectingMessagesSubscriberApp App: %q got unexpected message: %s\n", appID, e.Data)
messagesWatcher.FailIfNotExpected(t, e.Data)
return false, nil
}),
)
}
}
testTtlPublishMessages := func(metadata map[string]string, sidecarName string, topicName string, messageWatchers ...*watcher.Watcher) flow.Runnable {
return func(ctx flow.Context) error {
// prepare the messages
messages := make([]string, messagesToSend)
for i := range messages {
messages[i] = fmt.Sprintf("partitionKey: %s, message for topic: %s, index: %03d, uniqueId: %s", metadata[messageKey], topicName, i, uuid.New().String())
}
ctx.Logf("####### get the sidecar (dapr) client sidecarName: %s, topicName: %s", sidecarName, topicName)
// get the sidecar (dapr) client
client := sidecar.GetClient(ctx, sidecarName)
// publish messages
ctx.Logf("####### Publishing messages. sidecarName: %s, topicName: %s", sidecarName, topicName)
var publishOptions dapr.PublishEventOption
if metadata != nil {
publishOptions = dapr.PublishEventWithMetadata(metadata)
}
for _, message := range messages {
ctx.Logf("Publishing: %q", message)
var err error
if publishOptions != nil {
err = client.PublishEvent(ctx, pubsubName, topicName, message, publishOptions)
} else {
err = client.PublishEvent(ctx, pubsubName, topicName, message)
}
require.NoError(ctx, err, "SNSSQSMessageVisibilityTimeout - error publishing message")
}
return nil
}
}
connectToSideCar := func(sidecarName string) flow.Runnable {
return func(ctx flow.Context) error {
ctx.Logf("####### connect to sidecar (dapr) client sidecarName: %s and exit", sidecarName)
// get the sidecar (dapr) client
sidecar.GetClient(ctx, sidecarName)
return nil
}
}
flow.New(t, "SNSSQS certification - messageVisibilityTimeout attribute receive").
// App1 should receive the messages, wait some time (busy), notify App2, wait some time (busy),
// and finish processing message.
Step(app.Run(appID1, fmt.Sprintf(":%d", appPort+portOffset),
subscriberMVTimeoutApp(appID1, messageVisibilityTimeoutTopic, consumerGroup1, latch))).
Step(sidecar.Run(sidecarName1,
append(componentRuntimeOptions(),
embedded.WithComponentsPath("./components/message_visibility_timeout"),
embedded.WithAppProtocol(protocol.HTTPProtocol, strconv.Itoa(appPort+portOffset)),
embedded.WithDaprGRPCPort(strconv.Itoa(runtime.DefaultDaprAPIGRPCPort+portOffset)),
embedded.WithDaprHTTPPort(strconv.Itoa(runtime.DefaultDaprHTTPPort+portOffset)),
embedded.WithProfilePort(strconv.Itoa(runtime.DefaultProfilePort+portOffset)),
)...,
)).
Step(fmt.Sprintf("publish messages to messageVisibilityTimeoutTopic: %s", messageVisibilityTimeoutTopic),
testTtlPublishMessages(metadata, sidecarName1, messageVisibilityTimeoutTopic, consumerGroup1)).
// App2 waits for App1 notification to subscribe to message
// After subscribing, if App2 receives any messages, the messageVisibilityTimeoutTopic is either too short,
// or code is broken somehow
Step(app.Run(appID2, fmt.Sprintf(":%d", appPort+portOffset+2),
notExpectingMessagesSubscriberApp(appID2, messageVisibilityTimeoutTopic, consumerGroup1, latch))).
Step(sidecar.Run(sidecarName2,
append(componentRuntimeOptions(),
embedded.WithComponentsPath("./components/message_visibility_timeout"),
embedded.WithAppProtocol(protocol.HTTPProtocol, strconv.Itoa(appPort+portOffset+2)),
embedded.WithDaprGRPCPort(strconv.Itoa(runtime.DefaultDaprAPIGRPCPort+portOffset+2)),
embedded.WithDaprHTTPPort(strconv.Itoa(runtime.DefaultDaprHTTPPort+portOffset+2)),
embedded.WithProfilePort(strconv.Itoa(runtime.DefaultProfilePort+portOffset+2)),
)...,
)).
Step("No messages will be sent here",
connectToSideCar(sidecarName2)).
Step("wait", flow.Sleep(10*time.Second)).
Run()
}
// Verify data with an optional parameters `fifo` and `fifoMessageGroupID` takes affect (SNSSQSFIFOMessages)
func SNSSQSFIFOMessages(t *testing.T) {
consumerGroup1 := watcher.NewOrdered()
// prepare the messages
maxFifoMessages := 20
fifoMessages := make([]string, maxFifoMessages)
for i := 0; i < maxFifoMessages; i++ {
fifoMessages[i] = fmt.Sprintf("m%d", i+1)
}
consumerGroup1.ExpectStrings(fifoMessages...)
// There are multiple publishers so the following
// generator will supply messages to each one in order
msgCh := make(chan string)
go func(mc chan string) {
for _, m := range fifoMessages {
mc <- m
}
close(mc)
}(msgCh)
doNothingApp := func(appID string, topicName string, messagesWatcher *watcher.Watcher) app.SetupFn {
return func(ctx flow.Context, s common.Service) error {
return nil
}
}
subscriberApplication := func(appID string, topicName string, messagesWatcher *watcher.Watcher) app.SetupFn {
return func(ctx flow.Context, s common.Service) error {
return multierr.Combine(
s.AddTopicEventHandler(&common.Subscription{
PubsubName: pubsubName,
Topic: topicName,
Route: "/orders",
}, func(_ context.Context, e *common.TopicEvent) (retry bool, err error) {
ctx.Logf("SNSSQSFIFOMessages.subscriberApplication: Message Received appID: %s,pubsub: %s, topic: %s, id: %s, data: %#v", appID, e.PubsubName, e.Topic, e.ID, e.Data)
messagesWatcher.Observe(e.Data)
return false, nil
}),
)
}
}
publishMessages := func(metadata map[string]string, sidecarName string, topicName string, mw *watcher.Watcher) flow.Runnable {
return func(ctx flow.Context) error {
// get the sidecar (dapr) client
client := sidecar.GetClient(ctx, sidecarName)
// publish messages
ctx.Logf("SNSSQSFIFOMessages Publishing messages. sidecarName: %s, topicName: %s", sidecarName, topicName)
var publishOptions dapr.PublishEventOption
if metadata != nil {
publishOptions = dapr.PublishEventWithMetadata(metadata)
}
for message := range msgCh {
ctx.Logf("SNSSQSFIFOMessages Publishing: sidecarName: %s, topicName: %s - %q", sidecarName, topicName, message)
var err error
if publishOptions != nil {
err = client.PublishEvent(ctx, pubsubName, topicName, message, publishOptions)
} else {
err = client.PublishEvent(ctx, pubsubName, topicName, message)
}
require.NoError(ctx, err, "SNSSQSFIFOMessages - error publishing message")
}
return nil
}
}
assertMessages := func(timeout time.Duration, messageWatchers ...*watcher.Watcher) flow.Runnable {
return func(ctx flow.Context) error {
// assert for messages
for _, m := range messageWatchers {
if !m.Assert(ctx, 10*timeout) {
ctx.Errorf("SNSSQSFIFOMessages - message assertion failed for watcher: %#v\n", m)
}
}
return nil
}
}
pub1 := "publisher1"
sc1 := pub1 + "_sidecar"
pub2 := "publisher2"
sc2 := pub2 + "_sidecar"
sub := "subscriber"
subsc := sub + "_sidecar"
flow.New(t, "SNSSQSFIFOMessages Verify FIFO with multiple publishers and single subscriber receiving messages in order").
// Subscriber
Step(app.Run(sub, fmt.Sprintf(":%d", appPort),
subscriberApplication(sub, fifoTopic, consumerGroup1))).
Step(sidecar.Run(subsc,
append(componentRuntimeOptions(),
embedded.WithComponentsPath("./components/fifo"),
embedded.WithAppProtocol(protocol.HTTPProtocol, strconv.Itoa(appPort)),
embedded.WithDaprGRPCPort(strconv.Itoa(runtime.DefaultDaprAPIGRPCPort)),
embedded.WithDaprHTTPPort(strconv.Itoa(runtime.DefaultDaprHTTPPort)),
)...,
)).
Step("wait", flow.Sleep(5*time.Second)).
// Publisher 1
Step(app.Run(pub1, fmt.Sprintf(":%d", appPort+portOffset+2),
doNothingApp(pub1, fifoTopic, consumerGroup1))).
Step(sidecar.Run(sc1,
append(componentRuntimeOptions(),
embedded.WithComponentsPath("./components/fifo"),
embedded.WithAppProtocol(protocol.HTTPProtocol, strconv.Itoa(appPort+portOffset+2)),
embedded.WithDaprGRPCPort(strconv.Itoa(runtime.DefaultDaprAPIGRPCPort+portOffset+2)),
embedded.WithDaprHTTPPort(strconv.Itoa(runtime.DefaultDaprHTTPPort+portOffset+2)),
embedded.WithProfilePort(strconv.Itoa(runtime.DefaultProfilePort+portOffset+2)),
)...,
)).
Step("publish messages to topic ==> "+fifoTopic, publishMessages(nil, sc1, fifoTopic, consumerGroup1)).
// Publisher 2
Step(app.Run(pub2, fmt.Sprintf(":%d", appPort+portOffset+4),
doNothingApp(pub2, fifoTopic, consumerGroup1))).
Step(sidecar.Run(sc2,
append(componentRuntimeOptions(),
embedded.WithComponentsPath("./components/fifo"),
embedded.WithAppProtocol(protocol.HTTPProtocol, strconv.Itoa(appPort+portOffset+4)),
embedded.WithDaprGRPCPort(strconv.Itoa(runtime.DefaultDaprAPIGRPCPort+portOffset+4)),
embedded.WithDaprHTTPPort(strconv.Itoa(runtime.DefaultDaprHTTPPort+portOffset+4)),
embedded.WithProfilePort(strconv.Itoa(runtime.DefaultProfilePort+portOffset+4)),
)...,
)).
Step("publish messages to topic ==> "+fifoTopic, publishMessages(nil, sc2, fifoTopic, consumerGroup1)).
Step("wait", flow.Sleep(10*time.Second)).
Step("verify if recevied ordered messages published to active topic", assertMessages(1*time.Second, consumerGroup1)).
Step("reset", flow.Reset(consumerGroup1)).
Run()
}
// Verify data with an optional parameters `sqsDeadLettersQueueName`, `messageRetryLimit`, and `messageReceiveLimit` takes affect
func SNSSQSMessageDeadLetter(t *testing.T) {
consumerGroup1 := watcher.NewUnordered()
deadLetterConsumerGroup := watcher.NewUnordered()
failedMessagesNum := 1
subscriberApplication := func(appID string, topicName string, messagesWatcher *watcher.Watcher) app.SetupFn {
return func(ctx flow.Context, s common.Service) error {
return multierr.Combine(
s.AddTopicEventHandler(&common.Subscription{
PubsubName: pubsubName,
Topic: topicName,
Route: "/orders",
}, func(_ context.Context, e *common.TopicEvent) (retry bool, err error) {
ctx.Logf("subscriberApplication - Message Received appID: %s,pubsub: %s, topic: %s, id: %s, data: %s - causing failure on purpose...", appID, e.PubsubName, e.Topic, e.ID, e.Data)
return true, fmt.Errorf("failure on purpose")
}),
)
}
}
var task flow.AsyncTask
deadLetterReceiverApplication := func(deadLetterQueueName string, msgTimeout time.Duration, messagesWatcher *watcher.Watcher) flow.Runnable {
return func(ctx flow.Context) error {
t := time.NewTicker(500 * time.Millisecond)
defer t.Stop()
counter := 1
qm := NewQueueManager()
for {
select {
case <-task.Done():
ctx.Log("deadLetterReceiverApplication - task done called!")
return nil
case <-time.After(msgTimeout * time.Second):
ctx.Logf("deadLetterReceiverApplication - timeout waiting for messages from (%q)", deadLetterQueueName)
return fmt.Errorf("deadLetterReceiverApplication - timeout waiting for messages from (%q)", deadLetterQueueName)
case <-t.C:
numMsgs, err := qm.GetMessages(deadLetterQueueName, true, func(m *DataMessage) error {
ctx.Logf("deadLetterReceiverApplication - received message counter(%d) (%v)\n", counter, m.Data)
messagesWatcher.Observe(m.Data)
return nil
})
if err != nil {
ctx.Logf("deadLetterReceiverApplication - failed to get messages from (%q) counter(%d) %v - trying again\n", deadLetterQueueName, counter, err)
continue
}
if numMsgs == 0 {
// No messages yet, try again
ctx.Logf("deadLetterReceiverApplication - no messages yet from (%q) counter(%d) - trying again\n", deadLetterQueueName, counter)
continue
}
if counter >= failedMessagesNum {
ctx.Logf("deadLetterReceiverApplication - received all expected (%d) failed message!\n", failedMessagesNum)
return nil
}
counter += numMsgs
}
}
}
}
publishMessages := func(metadata map[string]string, sidecarName string, topicName string, messageWatchers ...*watcher.Watcher) flow.Runnable {
return func(ctx flow.Context) error {
// prepare the messages
messages := make([]string, failedMessagesNum)
for i := range messages {
messages[i] = fmt.Sprintf("partitionKey: %s, message for topic: %s, index: %03d, uniqueId: %s", metadata[messageKey], topicName, i, uuid.New().String())
}
// add the messages as expectations to the watchers
for _, messageWatcher := range messageWatchers {
messageWatcher.ExpectStrings(messages...)
}
// get the sidecar (dapr) client
client := sidecar.GetClient(ctx, sidecarName)
// publish messages
ctx.Logf("SNSSQSMessageDeadLetter - Publishing messages. sidecarName: %s, topicName: %s", sidecarName, topicName)
var publishOptions dapr.PublishEventOption
if metadata != nil {
publishOptions = dapr.PublishEventWithMetadata(metadata)
}
for _, message := range messages {
ctx.Logf("Publishing: %q", message)
var err error
if publishOptions != nil {
err = client.PublishEvent(ctx, pubsubName, topicName, message, publishOptions)
} else {
err = client.PublishEvent(ctx, pubsubName, topicName, message)
}
require.NoError(ctx, err, "SNSSQSMessageDeadLetter - error publishing message")
}
return nil
}
}
assertMessages := func(timeout time.Duration, messageWatchers ...*watcher.Watcher) flow.Runnable {
return func(ctx flow.Context) error {
// assert for messages
for _, m := range messageWatchers {
if !m.Assert(ctx, 3*timeout) {
ctx.Errorf("SNSSQSMessageDeadLetter - message assertion failed for watcher: %#v\n", m)
}
}
return nil
}
}
prefix := "SNSSQSMessageDeadLetter-"
deadletterApp := prefix + "deadLetterReceiverApp"
subApp := prefix + "subscriberApp"
subAppSideCar := prefix + sidecarName2
msgTimeout := time.Duration(60) //seconds
flow.New(t, "SNSSQSMessageDeadLetter Verify with single publisher / single subscriber and DeadLetter").
// Run deadLetterReceiverApplication - should receive messages from dead letter queue
// "PUBSUB_AWS_SNSSQS_QUEUE_DLOUT"
StepAsync(deadletterApp, &task,
deadLetterReceiverApplication(deadLetterQueueName, msgTimeout, deadLetterConsumerGroup)).
// Run subscriberApplication - will fail to process messages
Step(app.Run(subApp, fmt.Sprintf(":%d", appPort+portOffset+4),
subscriberApplication(subApp, deadLetterTopicIn, consumerGroup1))).
// Run the Dapr sidecar with ConsumerID "PUBSUB_AWS_SNSSQS_TOPIC_DLIN"
Step(sidecar.Run(subAppSideCar,
append(componentRuntimeOptions(),
embedded.WithComponentsPath("./components/deadletter"),
embedded.WithAppProtocol(protocol.HTTPProtocol, strconv.Itoa(appPort+portOffset+4)),
embedded.WithDaprGRPCPort(strconv.Itoa(runtime.DefaultDaprAPIGRPCPort+portOffset+4)),
embedded.WithDaprHTTPPort(strconv.Itoa(runtime.DefaultDaprHTTPPort+portOffset+4)),
embedded.WithProfilePort(strconv.Itoa(runtime.DefaultProfilePort+portOffset+4)),
)...,
)).
Step("publish messages to deadLetterTopicIn ==> "+deadLetterTopicIn, publishMessages(nil, subAppSideCar, deadLetterTopicIn, deadLetterConsumerGroup)).
Step("wait", flow.Sleep(30*time.Second)).
Step("verify if app1 has 0 recevied messages published to active topic", assertMessages(10*time.Second, consumerGroup1)).
Step("verify if app2 has deadletterMessageNum recevied messages send to dead letter queue", assertMessages(10*time.Second, deadLetterConsumerGroup)).
Step("reset", flow.Reset(consumerGroup1, deadLetterConsumerGroup)).
Run()
}
// Verify data with an optional parameters `disableDeleteOnRetryLimit` takes affect
func SNSSQSMessageDisableDeleteOnRetryLimit(t *testing.T) {
consumerGroup1 := watcher.NewUnordered()
failedMessagesNum := 1
readyToConsume := atomic.Bool{}
var task flow.AsyncTask
setReadyToConsume := func(consumeTimeout time.Duration, _ *watcher.Watcher) flow.Runnable {
return func(ctx flow.Context) error {
tm := time.After(consumeTimeout * time.Second)
for {
select {
case <-task.Done():
ctx.Log("setReadyToConsume - task done called!")
return nil
case <-tm:
ctx.Logf("setReadyToConsume setting readyToConsume")
readyToConsume.Store(true)
}
}
return nil
}
}
subscriberApplication := func(appID string, topicName string, messagesWatcher *watcher.Watcher) app.SetupFn {
return func(ctx flow.Context, s common.Service) error {
return multierr.Combine(
s.AddTopicEventHandler(&common.Subscription{
PubsubName: pubsubName,
Topic: topicName,
Route: "/orders",
}, func(_ context.Context, e *common.TopicEvent) (retry bool, err error) {
if !readyToConsume.Load() {
ctx.Logf("subscriberApplication - Message Received appID: %s,pubsub: %s, topic: %s, id: %s, data: %s - causing failure on purpose...", appID, e.PubsubName, e.Topic, e.ID, e.Data)
time.Sleep(5 * time.Second)
return true, fmt.Errorf("failure on purpose")
} else {
messagesWatcher.Observe(e.Data)
ctx.Logf("subscriberApplication - Message Received appID: %s,pubsub: %s, topic: %s, id: %s, data: %s", appID, e.PubsubName, e.Topic, e.ID, e.Data)
return false, nil
}
}),
)
}
}
publishMessages := func(metadata map[string]string, sidecarName string, topicName string, messageWatchers ...*watcher.Watcher) flow.Runnable {
return func(ctx flow.Context) error {
// prepare the messages
messages := make([]string, failedMessagesNum)
for i := range messages {
messages[i] = fmt.Sprintf("partitionKey: %s, message for topic: %s, index: %03d, uniqueId: %s", metadata[messageKey], topicName, i, uuid.New().String())
}
// add the messages as expectations to the watchers
for _, messageWatcher := range messageWatchers {
messageWatcher.ExpectStrings(messages...)
}
// get the sidecar (dapr) client
client := sidecar.GetClient(ctx, sidecarName)
// publish messages
ctx.Logf("SNSSQSMessageDisableDeleteOnRetryLimit - Publishing messages. sidecarName: %s, topicName: %s", sidecarName, topicName)
var publishOptions dapr.PublishEventOption
if metadata != nil {
publishOptions = dapr.PublishEventWithMetadata(metadata)
}
for _, message := range messages {
ctx.Logf("Publishing: %q", message)
var err error
if publishOptions != nil {
err = client.PublishEvent(ctx, pubsubName, topicName, message, publishOptions)
} else {
err = client.PublishEvent(ctx, pubsubName, topicName, message)
}
require.NoError(ctx, err, "SNSSQSMessageDisableDeleteOnRetryLimit - error publishing message")
}
return nil
}
}
assertMessages := func(timeout time.Duration, messageWatchers ...*watcher.Watcher) flow.Runnable {
return func(ctx flow.Context) error {
// assert for messages
for _, m := range messageWatchers {
if !m.Assert(ctx, 3*timeout) {
ctx.Errorf("SNSSQSMessageDisableDeleteOnRetryLimit - message assertion failed for watcher: %#v\n", m)
}
}
return nil
}
}
prefix := "SNSSQSMessageDisableDeleteOnRetryLimit-"
setReadyToConsumeApp := prefix + "setReadyToConsumeApp"
subApp := prefix + "subscriberApp"
subAppSideCar := prefix + sidecarName2
readyToConsumeTimeout := time.Duration(20) //seconds
flow.New(t, "SNSSQSMessageDisableDeleteOnRetryLimit Verify data with an optional parameters `disableDeleteOnRetryLimit` takes affect").
StepAsync(setReadyToConsumeApp, &task,
setReadyToConsume(readyToConsumeTimeout, nil)).
// Run subscriberApplication - will fail to process messages
Step(app.Run(subApp, fmt.Sprintf(":%d", appPort+portOffset+4),
subscriberApplication(subApp, disableDeleteOnRetryLimitTopicIn, consumerGroup1))).
// Run the Dapr sidecar with "PUBSUB_AWS_SNSSQS_TOPIC_NODRT"
Step(sidecar.Run(subAppSideCar,
append(componentRuntimeOptions(),
embedded.WithComponentsPath("./components/disableDeleteOnRetryLimit"),
embedded.WithAppProtocol(protocol.HTTPProtocol, strconv.Itoa(appPort+portOffset+4)),
embedded.WithDaprGRPCPort(strconv.Itoa(runtime.DefaultDaprAPIGRPCPort+portOffset+4)),
embedded.WithDaprHTTPPort(strconv.Itoa(runtime.DefaultDaprHTTPPort+portOffset+4)),
embedded.WithProfilePort(strconv.Itoa(runtime.DefaultProfilePort+portOffset+4)),
)...,
)).
Step("publish messages to disableDeleteOnRetryLimitTopicIn ==> "+disableDeleteOnRetryLimitTopicIn, publishMessages(nil, subAppSideCar, disableDeleteOnRetryLimitTopicIn, consumerGroup1)).
Step("wait", flow.Sleep(30*time.Second)).
Step("verify if app1 has 0 recevied messages published to active topic", assertMessages(10*time.Second, consumerGroup1)).
Step("reset", flow.Reset(consumerGroup1)).
Run()
}
func componentRuntimeOptions() []embedded.Option {
log := logger.NewLogger("dapr.components")
pubsubRegistry := pubsub_loader.NewRegistry()
pubsubRegistry.Logger = log
pubsubRegistry.RegisterComponent(pubsub_snssqs.NewSnsSqs, "snssqs")
secretstoreRegistry := secretstores_loader.NewRegistry()
secretstoreRegistry.Logger = log
secretstoreRegistry.RegisterComponent(secretstore_env.NewEnvSecretStore, "local.env")
return []embedded.Option{
embedded.WithPubSubs(pubsubRegistry),
embedded.WithSecretStores(secretstoreRegistry),
}
}
func teardown(t *testing.T) {
t.Logf("AWS SNS/SQS CertificationTests teardown...")
//Dapr runtime automatically creates the following queues, topics
//so here they get deleted.
if err := deleteQueues(queues); err != nil {
t.Log(err)
}
if err := deleteTopics(topics, region); err != nil {
t.Log(err)
}
t.Logf("AWS SNS/SQS CertificationTests teardown...done!")
}