|
|
|
|
@ -64,11 +64,18 @@ const (
|
|
|
|
|
partition1 = "partition-1"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
var (
|
|
|
|
|
existingTopic = "existingTopic" // replaced with env var PUBSUB_AWS_SNSSQS_TOPIC_3
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
// The following Queue names must match
|
|
|
|
|
// the values of the "consumerID" metadata properties
|
|
|
|
|
// found inside each of the components/*/pubsub.yaml files
|
|
|
|
|
// The names will be passed in with Env Vars like:
|
|
|
|
|
//
|
|
|
|
|
// PUBSUB_AWS_SNSSQS_TOPIC_*
|
|
|
|
|
var queues = []string{
|
|
|
|
|
"snssqscerttest1",
|
|
|
|
|
"snscerttest1",
|
|
|
|
|
"snssqscerttest2",
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@ -81,6 +88,10 @@ func init() {
|
|
|
|
|
if qn != "" {
|
|
|
|
|
queues[1] = qn
|
|
|
|
|
}
|
|
|
|
|
qn = os.Getenv("PUBSUB_AWS_SNSSQS_TOPIC_3")
|
|
|
|
|
if qn != "" {
|
|
|
|
|
existingTopic = qn
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func TestAWSSNSSQSCertificationTests(t *testing.T) {
|
|
|
|
|
@ -102,6 +113,14 @@ func TestAWSSNSSQSCertificationTests(t *testing.T) {
|
|
|
|
|
SNSSQSMultiplePubSubsDifferentConsumerIDs(t)
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
t.Run("SNSSQSExistingQueueAndTopic", func(t *testing.T) {
|
|
|
|
|
SNSSQSExistingQueueAndTopic(t)
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
t.Run("TestSNSSQSExistingQueueNonexistingTopic", func(t *testing.T) {
|
|
|
|
|
TestSNSSQSExistingQueueNonexistingTopic(t)
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
t.Run("SNSSQSNonexistingTopic", func(t *testing.T) {
|
|
|
|
|
SNSSQSNonexistingTopic(t)
|
|
|
|
|
})
|
|
|
|
|
@ -205,7 +224,7 @@ func SNSSQSBasic(t *testing.T) {
|
|
|
|
|
Step(app.Run(appID1, fmt.Sprintf(":%d", appPort),
|
|
|
|
|
subscriberApplication(appID1, topicActiveName, consumerGroup1))).
|
|
|
|
|
|
|
|
|
|
// Run the Dapr sidecar with ConsumerID "snssqscerttest1"
|
|
|
|
|
// Run the Dapr sidecar with ConsumerID "PUBSUB_AWS_SNSSQS_QUEUE_1"
|
|
|
|
|
Step(sidecar.Run(sidecarName1,
|
|
|
|
|
embedded.WithComponentsPath("./components/consumer_one"),
|
|
|
|
|
embedded.WithAppProtocol(runtime.HTTPProtocol, appPort),
|
|
|
|
|
@ -218,7 +237,7 @@ func SNSSQSBasic(t *testing.T) {
|
|
|
|
|
Step(app.Run(appID2, fmt.Sprintf(":%d", appPort+portOffset),
|
|
|
|
|
subscriberApplication(appID2, topicActiveName, consumerGroup2))).
|
|
|
|
|
|
|
|
|
|
// Run the Dapr sidecar with ConsumerID "snssqscerttest2"
|
|
|
|
|
// Run the Dapr sidecar with ConsumerID "PUBSUB_AWS_SNSSQS_QUEUE_2"
|
|
|
|
|
Step(sidecar.Run(sidecarName2,
|
|
|
|
|
embedded.WithComponentsPath("./components/consumer_two"),
|
|
|
|
|
embedded.WithAppProtocol(runtime.HTTPProtocol, appPort+portOffset),
|
|
|
|
|
@ -333,7 +352,7 @@ func SNSSQSMultipleSubsSameConsumerIDs(t *testing.T) {
|
|
|
|
|
Step(app.Run(appID1, fmt.Sprintf(":%d", appPort),
|
|
|
|
|
subscriberApplication(appID1, topicActiveName, consumerGroup1))).
|
|
|
|
|
|
|
|
|
|
// Run the Dapr sidecar with ConsumerID "snssqscerttest1"
|
|
|
|
|
// Run the Dapr sidecar with ConsumerID "PUBSUB_AWS_SNSSQS_QUEUE_1"
|
|
|
|
|
Step(sidecar.Run(sidecarName1,
|
|
|
|
|
embedded.WithComponentsPath("./components/consumer_one"),
|
|
|
|
|
embedded.WithAppProtocol(runtime.HTTPProtocol, appPort),
|
|
|
|
|
@ -346,7 +365,7 @@ func SNSSQSMultipleSubsSameConsumerIDs(t *testing.T) {
|
|
|
|
|
Step(app.Run(appID2, fmt.Sprintf(":%d", appPort+portOffset),
|
|
|
|
|
subscriberApplication(appID2, topicActiveName, consumerGroup2))).
|
|
|
|
|
|
|
|
|
|
// Run the Dapr sidecar with ConsumerID "snssqscerttest2"
|
|
|
|
|
// Run the Dapr sidecar with ConsumerID "PUBSUB_AWS_SNSSQS_QUEUE_2"
|
|
|
|
|
Step(sidecar.Run(sidecarName2,
|
|
|
|
|
embedded.WithComponentsPath("./components/consumer_two"),
|
|
|
|
|
embedded.WithAppProtocol(runtime.HTTPProtocol, appPort+portOffset),
|
|
|
|
|
@ -456,7 +475,7 @@ func SNSSQSMultipleSubsDifferentConsumerIDs(t *testing.T) {
|
|
|
|
|
Step(app.Run(appID1, fmt.Sprintf(":%d", appPort),
|
|
|
|
|
subscriberApplication(appID1, topicActiveName, consumerGroup1))).
|
|
|
|
|
|
|
|
|
|
// Run the Dapr sidecar with ConsumerID "snssqscerttest1"
|
|
|
|
|
// Run the Dapr sidecar with ConsumerID "PUBSUB_AWS_SNSSQS_QUEUE_1"
|
|
|
|
|
Step(sidecar.Run(sidecarName1,
|
|
|
|
|
embedded.WithComponentsPath("./components/consumer_one"),
|
|
|
|
|
embedded.WithAppProtocol(runtime.HTTPProtocol, appPort),
|
|
|
|
|
@ -469,7 +488,7 @@ func SNSSQSMultipleSubsDifferentConsumerIDs(t *testing.T) {
|
|
|
|
|
Step(app.Run(appID2, fmt.Sprintf(":%d", appPort+portOffset),
|
|
|
|
|
subscriberApplication(appID2, topicActiveName, consumerGroup2))).
|
|
|
|
|
|
|
|
|
|
// RRun the Dapr sidecar with ConsumerID "snssqscerttest2"
|
|
|
|
|
// RRun the Dapr sidecar with ConsumerID "PUBSUB_AWS_SNSSQS_QUEUE_2"
|
|
|
|
|
Step(sidecar.Run(sidecarName2,
|
|
|
|
|
embedded.WithComponentsPath("./components/consumer_two"),
|
|
|
|
|
embedded.WithAppProtocol(runtime.HTTPProtocol, appPort+portOffset),
|
|
|
|
|
@ -582,7 +601,7 @@ func SNSSQSMultiplePubSubsDifferentConsumerIDs(t *testing.T) {
|
|
|
|
|
Step(app.Run(appID1, fmt.Sprintf(":%d", appPort),
|
|
|
|
|
subscriberApplication(appID1, topicActiveName, consumerGroup1))).
|
|
|
|
|
|
|
|
|
|
// Run the Dapr sidecar with ConsumerID "snssqscerttest1"
|
|
|
|
|
// Run the Dapr sidecar with ConsumerID "PUBSUB_AWS_SNSSQS_QUEUE_1"
|
|
|
|
|
Step(sidecar.Run(sidecarName1,
|
|
|
|
|
embedded.WithComponentsPath("./components/consumer_one"),
|
|
|
|
|
embedded.WithAppProtocol(runtime.HTTPProtocol, appPort),
|
|
|
|
|
@ -595,7 +614,7 @@ func SNSSQSMultiplePubSubsDifferentConsumerIDs(t *testing.T) {
|
|
|
|
|
Step(app.Run(appID2, fmt.Sprintf(":%d", appPort+portOffset),
|
|
|
|
|
subscriberApplication(appID2, topicActiveName, consumerGroup2))).
|
|
|
|
|
|
|
|
|
|
// Run the Dapr sidecar with ConsumerID "snssqscerttest2"
|
|
|
|
|
// Run the Dapr sidecar with ConsumerID "PUBSUB_AWS_SNSSQS_QUEUE_2"
|
|
|
|
|
Step(sidecar.Run(sidecarName2,
|
|
|
|
|
embedded.WithComponentsPath("./components/consumer_two"),
|
|
|
|
|
embedded.WithAppProtocol(runtime.HTTPProtocol, appPort+portOffset),
|
|
|
|
|
@ -705,7 +724,7 @@ func SNSSQSNonexistingTopic(t *testing.T) {
|
|
|
|
|
Step(app.Run(appID1, fmt.Sprintf(":%d", appPort+portOffset*3),
|
|
|
|
|
subscriberApplication(appID1, topicToBeCreated, consumerGroup1))).
|
|
|
|
|
|
|
|
|
|
// Run the Dapr sidecar with ConsumerID "snssqscerttest1"
|
|
|
|
|
// Run the Dapr sidecar with ConsumerID "PUBSUB_AWS_SNSSQS_QUEUE_1"
|
|
|
|
|
Step(sidecar.Run(sidecarName1,
|
|
|
|
|
embedded.WithComponentsPath("./components/consumer_one"),
|
|
|
|
|
embedded.WithAppProtocol(runtime.HTTPProtocol, appPort+portOffset*3),
|
|
|
|
|
@ -720,6 +739,222 @@ func SNSSQSNonexistingTopic(t *testing.T) {
|
|
|
|
|
Run()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Verify data with an existing Queue and existing Topic
|
|
|
|
|
func SNSSQSExistingQueueAndTopic(t *testing.T) {
|
|
|
|
|
consumerGroup1 := watcher.NewUnordered()
|
|
|
|
|
|
|
|
|
|
// Set the partition key on all messages so they are written to the same partition. This allows for checking of ordered messages.
|
|
|
|
|
metadata := map[string]string{
|
|
|
|
|
messageKey: partition0,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// subscriber of the given topic
|
|
|
|
|
subscriberApplication := func(appID string, topicName string, messagesWatcher *watcher.Watcher) app.SetupFn {
|
|
|
|
|
return func(ctx flow.Context, s common.Service) error {
|
|
|
|
|
// Simulate periodic errors.
|
|
|
|
|
sim := simulate.PeriodicError(ctx, 100)
|
|
|
|
|
// Setup the /orders event handler.
|
|
|
|
|
return multierr.Combine(
|
|
|
|
|
s.AddTopicEventHandler(&common.Subscription{
|
|
|
|
|
PubsubName: pubsubName,
|
|
|
|
|
Topic: topicName,
|
|
|
|
|
Route: "/orders",
|
|
|
|
|
}, func(_ context.Context, e *common.TopicEvent) (retry bool, err error) {
|
|
|
|
|
if err := sim(); err != nil {
|
|
|
|
|
return true, err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Track/Observe the data of the event.
|
|
|
|
|
messagesWatcher.Observe(e.Data)
|
|
|
|
|
ctx.Logf("Message Received appID: %s,pubsub: %s, topic: %s, id: %s, data: %s", appID, e.PubsubName, e.Topic, e.ID, e.Data)
|
|
|
|
|
return false, nil
|
|
|
|
|
}),
|
|
|
|
|
)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
publishMessages := func(metadata map[string]string, sidecarName string, topicName string, messageWatchers ...*watcher.Watcher) flow.Runnable {
|
|
|
|
|
return func(ctx flow.Context) error {
|
|
|
|
|
// prepare the messages
|
|
|
|
|
messages := make([]string, numMessages)
|
|
|
|
|
for i := range messages {
|
|
|
|
|
messages[i] = fmt.Sprintf("partitionKey: %s, message for topic: %s, index: %03d, uniqueId: %s", metadata[messageKey], topicName, i, uuid.New().String())
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// add the messages as expectations to the watchers
|
|
|
|
|
for _, messageWatcher := range messageWatchers {
|
|
|
|
|
messageWatcher.ExpectStrings(messages...)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// get the sidecar (dapr) client
|
|
|
|
|
client := sidecar.GetClient(ctx, sidecarName)
|
|
|
|
|
|
|
|
|
|
// publish messages
|
|
|
|
|
ctx.Logf("Publishing messages. sidecarName: %s, topicName: %s", sidecarName, topicName)
|
|
|
|
|
|
|
|
|
|
var publishOptions dapr.PublishEventOption
|
|
|
|
|
|
|
|
|
|
if metadata != nil {
|
|
|
|
|
publishOptions = dapr.PublishEventWithMetadata(metadata)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for _, message := range messages {
|
|
|
|
|
ctx.Logf("Publishing: %q", message)
|
|
|
|
|
var err error
|
|
|
|
|
|
|
|
|
|
if publishOptions != nil {
|
|
|
|
|
err = client.PublishEvent(ctx, pubsubName, topicName, message, publishOptions)
|
|
|
|
|
} else {
|
|
|
|
|
err = client.PublishEvent(ctx, pubsubName, topicName, message)
|
|
|
|
|
}
|
|
|
|
|
require.NoError(ctx, err, "SNSSQSExistingQueueAndTopic - error publishing message")
|
|
|
|
|
}
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
assertMessages := func(timeout time.Duration, messageWatchers ...*watcher.Watcher) flow.Runnable {
|
|
|
|
|
return func(ctx flow.Context) error {
|
|
|
|
|
// assert for messages
|
|
|
|
|
for _, m := range messageWatchers {
|
|
|
|
|
if !m.Assert(ctx, 25*timeout) {
|
|
|
|
|
ctx.Errorf("SNSSQSExistingQueueAndTopic - message assersion failed for watcher: %#v\n", m)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
flow.New(t, "SNSSQS certification - Existing Queue Existing Topic").
|
|
|
|
|
|
|
|
|
|
// Run subscriberApplication app1
|
|
|
|
|
Step(app.Run(appID1, fmt.Sprintf(":%d", appPort+portOffset*3),
|
|
|
|
|
subscriberApplication(appID1, existingTopic, consumerGroup1))).
|
|
|
|
|
|
|
|
|
|
// Run the Dapr sidecar with ConsumerID "PUBSUB_AWS_SNSSQS_QUEUE_3"
|
|
|
|
|
Step(sidecar.Run(sidecarName1,
|
|
|
|
|
embedded.WithComponentsPath("./components/existing_queue"),
|
|
|
|
|
embedded.WithAppProtocol(runtime.HTTPProtocol, appPort+portOffset*3),
|
|
|
|
|
embedded.WithDaprGRPCPort(runtime.DefaultDaprAPIGRPCPort+portOffset*3),
|
|
|
|
|
embedded.WithDaprHTTPPort(runtime.DefaultDaprHTTPPort+portOffset*3),
|
|
|
|
|
embedded.WithProfilePort(runtime.DefaultProfilePort+portOffset*3),
|
|
|
|
|
componentRuntimeOptions(),
|
|
|
|
|
)).
|
|
|
|
|
Step(fmt.Sprintf("publish messages to existingTopic: %s", existingTopic), publishMessages(metadata, sidecarName1, existingTopic, consumerGroup1)).
|
|
|
|
|
Step("wait", flow.Sleep(30*time.Second)).
|
|
|
|
|
Step("verify if app1 has recevied messages published to newly created topic", assertMessages(10*time.Second, consumerGroup1)).
|
|
|
|
|
Run()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Verify data with an existing Queue with a topic that does not exist
|
|
|
|
|
func TestSNSSQSExistingQueueNonexistingTopic(t *testing.T) {
|
|
|
|
|
consumerGroup1 := watcher.NewUnordered()
|
|
|
|
|
|
|
|
|
|
// Set the partition key on all messages so they are written to the same partition. This allows for checking of ordered messages.
|
|
|
|
|
metadata := map[string]string{
|
|
|
|
|
messageKey: partition0,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// subscriber of the given topic
|
|
|
|
|
subscriberApplication := func(appID string, topicName string, messagesWatcher *watcher.Watcher) app.SetupFn {
|
|
|
|
|
return func(ctx flow.Context, s common.Service) error {
|
|
|
|
|
// Simulate periodic errors.
|
|
|
|
|
sim := simulate.PeriodicError(ctx, 100)
|
|
|
|
|
// Setup the /orders event handler.
|
|
|
|
|
return multierr.Combine(
|
|
|
|
|
s.AddTopicEventHandler(&common.Subscription{
|
|
|
|
|
PubsubName: pubsubName,
|
|
|
|
|
Topic: topicName,
|
|
|
|
|
Route: "/orders",
|
|
|
|
|
}, func(_ context.Context, e *common.TopicEvent) (retry bool, err error) {
|
|
|
|
|
if err := sim(); err != nil {
|
|
|
|
|
return true, err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Track/Observe the data of the event.
|
|
|
|
|
messagesWatcher.Observe(e.Data)
|
|
|
|
|
ctx.Logf("Message Received appID: %s,pubsub: %s, topic: %s, id: %s, data: %s", appID, e.PubsubName, e.Topic, e.ID, e.Data)
|
|
|
|
|
return false, nil
|
|
|
|
|
}),
|
|
|
|
|
)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
publishMessages := func(metadata map[string]string, sidecarName string, topicName string, messageWatchers ...*watcher.Watcher) flow.Runnable {
|
|
|
|
|
return func(ctx flow.Context) error {
|
|
|
|
|
// prepare the messages
|
|
|
|
|
messages := make([]string, numMessages)
|
|
|
|
|
for i := range messages {
|
|
|
|
|
messages[i] = fmt.Sprintf("partitionKey: %s, message for topic: %s, index: %03d, uniqueId: %s", metadata[messageKey], topicName, i, uuid.New().String())
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// add the messages as expectations to the watchers
|
|
|
|
|
for _, messageWatcher := range messageWatchers {
|
|
|
|
|
messageWatcher.ExpectStrings(messages...)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// get the sidecar (dapr) client
|
|
|
|
|
client := sidecar.GetClient(ctx, sidecarName)
|
|
|
|
|
|
|
|
|
|
// publish messages
|
|
|
|
|
ctx.Logf("Publishing messages. sidecarName: %s, topicName: %s", sidecarName, topicName)
|
|
|
|
|
|
|
|
|
|
var publishOptions dapr.PublishEventOption
|
|
|
|
|
|
|
|
|
|
if metadata != nil {
|
|
|
|
|
publishOptions = dapr.PublishEventWithMetadata(metadata)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for _, message := range messages {
|
|
|
|
|
ctx.Logf("Publishing: %q", message)
|
|
|
|
|
var err error
|
|
|
|
|
|
|
|
|
|
if publishOptions != nil {
|
|
|
|
|
err = client.PublishEvent(ctx, pubsubName, topicName, message, publishOptions)
|
|
|
|
|
} else {
|
|
|
|
|
err = client.PublishEvent(ctx, pubsubName, topicName, message)
|
|
|
|
|
}
|
|
|
|
|
require.NoError(ctx, err, "TestSNSSQSExistingQueueNonexistingTopic - error publishing message")
|
|
|
|
|
}
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
assertMessages := func(timeout time.Duration, messageWatchers ...*watcher.Watcher) flow.Runnable {
|
|
|
|
|
return func(ctx flow.Context) error {
|
|
|
|
|
// assert for messages
|
|
|
|
|
for _, m := range messageWatchers {
|
|
|
|
|
if !m.Assert(ctx, 25*timeout) {
|
|
|
|
|
ctx.Errorf("TestSNSSQSExistingQueueNonexistingTopic - message assersion failed for watcher: %#v\n", m)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
flow.New(t, "SNSSQS certification - Existing Queue None Existing Topic").
|
|
|
|
|
|
|
|
|
|
// Run subscriberApplication app1
|
|
|
|
|
Step(app.Run(appID1, fmt.Sprintf(":%d", appPort+portOffset*3),
|
|
|
|
|
subscriberApplication(appID1, topicToBeCreated, consumerGroup1))).
|
|
|
|
|
|
|
|
|
|
// Run the Dapr sidecar with ConsumerID "PUBSUB_AWS_SNSSQS_QUEUE_3"
|
|
|
|
|
Step(sidecar.Run(sidecarName1,
|
|
|
|
|
embedded.WithComponentsPath("./components/existing_queue"),
|
|
|
|
|
embedded.WithAppProtocol(runtime.HTTPProtocol, appPort+portOffset*3),
|
|
|
|
|
embedded.WithDaprGRPCPort(runtime.DefaultDaprAPIGRPCPort+portOffset*3),
|
|
|
|
|
embedded.WithDaprHTTPPort(runtime.DefaultDaprHTTPPort+portOffset*3),
|
|
|
|
|
embedded.WithProfilePort(runtime.DefaultProfilePort+portOffset*3),
|
|
|
|
|
componentRuntimeOptions(),
|
|
|
|
|
)).
|
|
|
|
|
Step(fmt.Sprintf("publish messages to topicToBeCreated: %s", topicToBeCreated), publishMessages(metadata, sidecarName1, topicToBeCreated, consumerGroup1)).
|
|
|
|
|
Step("wait", flow.Sleep(30*time.Second)).
|
|
|
|
|
Step("verify if app1 has recevied messages published to newly created topic", assertMessages(10*time.Second, consumerGroup1)).
|
|
|
|
|
Run()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Verify with an optional parameter `disableEntityManagement` set to true
|
|
|
|
|
func SNSSQSEntityManagement(t *testing.T) {
|
|
|
|
|
// TODO: Modify it to looks for component init error in the sidecar itself.
|
|
|
|
|
@ -932,6 +1167,8 @@ func componentRuntimeOptions() []runtime.Option {
|
|
|
|
|
|
|
|
|
|
func teardown(t *testing.T) {
|
|
|
|
|
t.Logf("AWS SNS/SQS CertificationTests teardown...")
|
|
|
|
|
// Dapr runtime automatically creates the following queues
|
|
|
|
|
// so here they get deleted.
|
|
|
|
|
if err := deleteQueues(queues); err != nil {
|
|
|
|
|
t.Log(err)
|
|
|
|
|
}
|
|
|
|
|
|