Merge branch 'master' into eventhubs-track2
This commit is contained in:
commit
1219568cc1
|
@ -308,6 +308,13 @@ jobs:
|
||||||
PUBSUB_AWS_SNSSQS_TOPIC_MVT="sqssnscerttest-tp-mvt-${{env.UNIQUE_ID}}"
|
PUBSUB_AWS_SNSSQS_TOPIC_MVT="sqssnscerttest-tp-mvt-${{env.UNIQUE_ID}}"
|
||||||
echo "PUBSUB_AWS_SNSSQS_TOPIC_MVT=$PUBSUB_AWS_SNSSQS_TOPIC_MVT" >> $GITHUB_ENV
|
echo "PUBSUB_AWS_SNSSQS_TOPIC_MVT=$PUBSUB_AWS_SNSSQS_TOPIC_MVT" >> $GITHUB_ENV
|
||||||
|
|
||||||
|
PUBSUB_AWS_SNSSQS_QUEUE_FIFO="sqssnscerttest-q-fifo-${{env.UNIQUE_ID}}.fifo"
|
||||||
|
echo "PUBSUB_AWS_SNSSQS_QUEUE_FIFO=$PUBSUB_AWS_SNSSQS_QUEUE_FIFO" >> $GITHUB_ENV
|
||||||
|
PUBSUB_AWS_SNSSQS_TOPIC_FIFO="sqssnscerttest-t-fifo-${{env.UNIQUE_ID}}.fifo"
|
||||||
|
echo "PUBSUB_AWS_SNSSQS_TOPIC_FIFO=$PUBSUB_AWS_SNSSQS_TOPIC_FIFO" >> $GITHUB_ENV
|
||||||
|
PUBSUB_AWS_SNSSQS_FIFO_GROUP_ID="sqssnscerttest-q-fifo-${{env.UNIQUE_ID}}"
|
||||||
|
echo "PUBSUB_AWS_SNSSQS_FIFO_GROUP_ID=$PUBSUB_AWS_SNSSQS_FIFO_GROUP_ID" >> $GITHUB_ENV
|
||||||
|
|
||||||
AWS_REGION="us-east-1"
|
AWS_REGION="us-east-1"
|
||||||
echo "AWS_REGION=$AWS_REGION" >> $GITHUB_ENV
|
echo "AWS_REGION=$AWS_REGION" >> $GITHUB_ENV
|
||||||
|
|
||||||
|
|
|
@ -56,6 +56,11 @@ The purpose of this module is to provide tests that certify the AWS SNS/SQS Pubs
|
||||||
- Subscriber 1 reeives message, notifies subscriber 2 and sumlates being busy for time shorter than messageVisibilityTimeout seconds
|
- Subscriber 1 reeives message, notifies subscriber 2 and sumlates being busy for time shorter than messageVisibilityTimeout seconds
|
||||||
- Subscriber 2 receives go ahead and subscribes to 1 topic
|
- Subscriber 2 receives go ahead and subscribes to 1 topic
|
||||||
- Subscriber 2 must not receive message
|
- Subscriber 2 must not receive message
|
||||||
|
- Verify data with an optional parameters `fifo` and `fifoMessageGroupID` takes affect (SNSSQSFIFOMessages)
|
||||||
|
- Run dapr application with 2 publisher and 1 subscriber
|
||||||
|
- Publishers publishe to 1 topic
|
||||||
|
- Subscriber 1 subscribes to 1 topic
|
||||||
|
- Message are expected to arrive in order
|
||||||
|
|
||||||
### Running the tests
|
### Running the tests
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,9 @@
|
||||||
|
apiVersion: dapr.io/v1alpha1
|
||||||
|
kind: Component
|
||||||
|
metadata:
|
||||||
|
name: envvar-secret-store
|
||||||
|
namespace: default
|
||||||
|
spec:
|
||||||
|
type: secretstores.local.env
|
||||||
|
version: v1
|
||||||
|
metadata:
|
|
@ -0,0 +1,37 @@
|
||||||
|
apiVersion: dapr.io/v1alpha1
|
||||||
|
kind: Component
|
||||||
|
metadata:
|
||||||
|
name: snssqs-cert-tests
|
||||||
|
spec:
|
||||||
|
type: pubsub.snssqs
|
||||||
|
version: v1
|
||||||
|
metadata:
|
||||||
|
- name: accessKey
|
||||||
|
secretKeyRef:
|
||||||
|
name: AWS_ACCESS_KEY_ID
|
||||||
|
key: AWS_ACCESS_KEY_ID
|
||||||
|
- name: secretKey
|
||||||
|
secretKeyRef:
|
||||||
|
name: AWS_SECRET_ACCESS_KEY
|
||||||
|
key: AWS_SECRET_ACCESS_KEY
|
||||||
|
- name: region
|
||||||
|
secretKeyRef:
|
||||||
|
name: AWS_REGION
|
||||||
|
key: AWS_REGION
|
||||||
|
- name: consumerID
|
||||||
|
secretKeyRef:
|
||||||
|
name: PUBSUB_AWS_SNSSQS_QUEUE_FIFO
|
||||||
|
key: PUBSUB_AWS_SNSSQS_QUEUE_FIFO
|
||||||
|
- name: fifoMessageGroupID
|
||||||
|
secretKeyRef:
|
||||||
|
name: PUBSUB_AWS_SNSSQS_FIFO_GROUP_ID
|
||||||
|
key: PUBSUB_AWS_SNSSQS_FIFO_GROUP_ID
|
||||||
|
- name: fifo
|
||||||
|
value: "true"
|
||||||
|
- name: messageMaxNumber
|
||||||
|
value: "10"
|
||||||
|
- name: concurrencyMode
|
||||||
|
value: "single"
|
||||||
|
|
||||||
|
auth:
|
||||||
|
secretstore: envvar-secret-store
|
|
@ -35,7 +35,7 @@ func deleteQueues(queues []string) error {
|
||||||
svc := sqsService()
|
svc := sqsService()
|
||||||
for _, queue := range queues {
|
for _, queue := range queues {
|
||||||
if err := deleteQueue(svc, queue); err != nil {
|
if err := deleteQueue(svc, queue); err != nil {
|
||||||
return fmt.Errorf("error deleting the queue URL: %q err:%v", queue, err)
|
fmt.Printf("error deleting the queue URL: %q err:%v", queue, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
|
|
@ -16,6 +16,7 @@ package snssqs_test
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"os"
|
"os"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
@ -68,6 +69,7 @@ var (
|
||||||
region = "us-east-1" // replaced with env var AWS_REGION
|
region = "us-east-1" // replaced with env var AWS_REGION
|
||||||
existingTopic = "existingTopic" // replaced with env var PUBSUB_AWS_SNSSQS_TOPIC_3
|
existingTopic = "existingTopic" // replaced with env var PUBSUB_AWS_SNSSQS_TOPIC_3
|
||||||
messageVisibilityTimeoutTopic = "messageVisibilityTimeoutTopic" // replaced with env var PUBSUB_AWS_SNSSQS_TOPIC_MVT
|
messageVisibilityTimeoutTopic = "messageVisibilityTimeoutTopic" // replaced with env var PUBSUB_AWS_SNSSQS_TOPIC_MVT
|
||||||
|
fifoTopic = "fifoTopic" // replaced with env var PUBSUB_AWS_SNSSQS_TOPIC_FIFO
|
||||||
)
|
)
|
||||||
|
|
||||||
// The following Queue names must match
|
// The following Queue names must match
|
||||||
|
@ -80,6 +82,7 @@ var (
|
||||||
var queues = []string{
|
var queues = []string{
|
||||||
"snscerttest1",
|
"snscerttest1",
|
||||||
"snssqscerttest2",
|
"snssqscerttest2",
|
||||||
|
"snssqscerttestfifo",
|
||||||
}
|
}
|
||||||
|
|
||||||
var topics = []string{
|
var topics = []string{
|
||||||
|
@ -103,6 +106,11 @@ func init() {
|
||||||
if qn != "" {
|
if qn != "" {
|
||||||
queues[1] = qn
|
queues[1] = qn
|
||||||
}
|
}
|
||||||
|
qn = os.Getenv("PUBSUB_AWS_SNSSQS_QUEUE_FIFO")
|
||||||
|
if qn != "" {
|
||||||
|
queues[2] = qn
|
||||||
|
}
|
||||||
|
|
||||||
qn = os.Getenv("PUBSUB_AWS_SNSSQS_TOPIC_3")
|
qn = os.Getenv("PUBSUB_AWS_SNSSQS_TOPIC_3")
|
||||||
if qn != "" {
|
if qn != "" {
|
||||||
existingTopic = qn
|
existingTopic = qn
|
||||||
|
@ -110,6 +118,12 @@ func init() {
|
||||||
qn = os.Getenv("PUBSUB_AWS_SNSSQS_TOPIC_MVT")
|
qn = os.Getenv("PUBSUB_AWS_SNSSQS_TOPIC_MVT")
|
||||||
if qn != "" {
|
if qn != "" {
|
||||||
messageVisibilityTimeoutTopic = qn
|
messageVisibilityTimeoutTopic = qn
|
||||||
|
topics = append(topics, messageVisibilityTimeoutTopic)
|
||||||
|
}
|
||||||
|
qn = os.Getenv("PUBSUB_AWS_SNSSQS_TOPIC_FIFO")
|
||||||
|
if qn != "" {
|
||||||
|
fifoTopic = qn
|
||||||
|
topics = append(topics, fifoTopic)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -151,6 +165,10 @@ func TestAWSSNSSQSCertificationTests(t *testing.T) {
|
||||||
t.Run("SNSSQSMessageVisibilityTimeout", func(t *testing.T) {
|
t.Run("SNSSQSMessageVisibilityTimeout", func(t *testing.T) {
|
||||||
SNSSQSMessageVisibilityTimeout(t)
|
SNSSQSMessageVisibilityTimeout(t)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
t.Run("SNSSQSFIFOMessages", func(t *testing.T) {
|
||||||
|
SNSSQSFIFOMessages(t)
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// Verify with single publisher / single subscriber
|
// Verify with single publisher / single subscriber
|
||||||
|
@ -1197,6 +1215,145 @@ func SNSSQSMessageVisibilityTimeout(t *testing.T) {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Verify data with an optional parameters `fifo` and `fifoMessageGroupID` takes affect (SNSSQSFIFOMessages)
|
||||||
|
func SNSSQSFIFOMessages(t *testing.T) {
|
||||||
|
consumerGroup1 := watcher.NewOrdered()
|
||||||
|
|
||||||
|
// prepare the messages
|
||||||
|
maxFifoMessages := 20
|
||||||
|
fifoMessages := make([]string, maxFifoMessages)
|
||||||
|
for i := 0; i < maxFifoMessages; i++ {
|
||||||
|
fifoMessages[i] = fmt.Sprintf("m%d", i+1)
|
||||||
|
}
|
||||||
|
consumerGroup1.ExpectStrings(fifoMessages...)
|
||||||
|
|
||||||
|
// There are multiple publishers so the following
|
||||||
|
// generator will supply messages to each one in order
|
||||||
|
msgCh := make(chan string)
|
||||||
|
go func(mc chan string) {
|
||||||
|
for _, m := range fifoMessages {
|
||||||
|
mc <- m
|
||||||
|
}
|
||||||
|
close(mc)
|
||||||
|
}(msgCh)
|
||||||
|
|
||||||
|
doNothingApp := func(appID string, topicName string, messagesWatcher *watcher.Watcher) app.SetupFn {
|
||||||
|
return func(ctx flow.Context, s common.Service) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
subscriberApplication := func(appID string, topicName string, messagesWatcher *watcher.Watcher) app.SetupFn {
|
||||||
|
return func(ctx flow.Context, s common.Service) error {
|
||||||
|
return multierr.Combine(
|
||||||
|
s.AddTopicEventHandler(&common.Subscription{
|
||||||
|
PubsubName: pubsubName,
|
||||||
|
Topic: topicName,
|
||||||
|
Route: "/orders",
|
||||||
|
}, func(_ context.Context, e *common.TopicEvent) (retry bool, err error) {
|
||||||
|
ctx.Logf("SNSSQSFIFOMessages.subscriberApplication: Message Received appID: %s,pubsub: %s, topic: %s, id: %s, data: %#v", appID, e.PubsubName, e.Topic, e.ID, e.Data)
|
||||||
|
messagesWatcher.Observe(e.Data)
|
||||||
|
return false, nil
|
||||||
|
}),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
publishMessages := func(metadata map[string]string, sidecarName string, topicName string, mw *watcher.Watcher) flow.Runnable {
|
||||||
|
return func(ctx flow.Context) error {
|
||||||
|
|
||||||
|
// get the sidecar (dapr) client
|
||||||
|
client := sidecar.GetClient(ctx, sidecarName)
|
||||||
|
|
||||||
|
// publish messages
|
||||||
|
ctx.Logf("SNSSQSFIFOMessages Publishing messages. sidecarName: %s, topicName: %s", sidecarName, topicName)
|
||||||
|
|
||||||
|
var publishOptions dapr.PublishEventOption
|
||||||
|
|
||||||
|
if metadata != nil {
|
||||||
|
publishOptions = dapr.PublishEventWithMetadata(metadata)
|
||||||
|
}
|
||||||
|
|
||||||
|
for message := range msgCh {
|
||||||
|
ctx.Logf("SNSSQSFIFOMessages Publishing: sidecarName: %s, topicName: %s - %q", sidecarName, topicName, message)
|
||||||
|
var err error
|
||||||
|
|
||||||
|
if publishOptions != nil {
|
||||||
|
err = client.PublishEvent(ctx, pubsubName, topicName, message, publishOptions)
|
||||||
|
} else {
|
||||||
|
err = client.PublishEvent(ctx, pubsubName, topicName, message)
|
||||||
|
}
|
||||||
|
require.NoError(ctx, err, "SNSSQSFIFOMessages - error publishing message")
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
assertMessages := func(timeout time.Duration, messageWatchers ...*watcher.Watcher) flow.Runnable {
|
||||||
|
return func(ctx flow.Context) error {
|
||||||
|
// assert for messages
|
||||||
|
for _, m := range messageWatchers {
|
||||||
|
if !m.Assert(ctx, 10*timeout) {
|
||||||
|
ctx.Errorf("SNSSQSFIFOMessages - message assersion failed for watcher: %#v\n", m)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub1 := "publisher1"
|
||||||
|
sc1 := pub1 + "_sidecar"
|
||||||
|
pub2 := "publisher2"
|
||||||
|
sc2 := pub2 + "_sidecar"
|
||||||
|
sub := "subscriber"
|
||||||
|
subsc := sub + "_sidecar"
|
||||||
|
|
||||||
|
flow.New(t, "SNSSQSFIFOMessages Verify FIFO with multiple publishers and single subscriber receiving messages in order").
|
||||||
|
|
||||||
|
// Subscriber
|
||||||
|
Step(app.Run(sub, fmt.Sprintf(":%d", appPort),
|
||||||
|
subscriberApplication(sub, fifoTopic, consumerGroup1))).
|
||||||
|
Step(sidecar.Run(subsc,
|
||||||
|
embedded.WithComponentsPath("./components/fifo"),
|
||||||
|
embedded.WithAppProtocol(runtime.HTTPProtocol, appPort),
|
||||||
|
embedded.WithDaprGRPCPort(runtime.DefaultDaprAPIGRPCPort),
|
||||||
|
embedded.WithDaprHTTPPort(runtime.DefaultDaprHTTPPort),
|
||||||
|
componentRuntimeOptions(),
|
||||||
|
)).
|
||||||
|
Step("wait", flow.Sleep(5*time.Second)).
|
||||||
|
|
||||||
|
// Publisher 1
|
||||||
|
Step(app.Run(pub1, fmt.Sprintf(":%d", appPort+portOffset+2),
|
||||||
|
doNothingApp(pub1, fifoTopic, consumerGroup1))).
|
||||||
|
Step(sidecar.Run(sc1,
|
||||||
|
embedded.WithComponentsPath("./components/fifo"),
|
||||||
|
embedded.WithAppProtocol(runtime.HTTPProtocol, appPort+portOffset+2),
|
||||||
|
embedded.WithDaprGRPCPort(runtime.DefaultDaprAPIGRPCPort+portOffset+2),
|
||||||
|
embedded.WithDaprHTTPPort(runtime.DefaultDaprHTTPPort+portOffset+2),
|
||||||
|
embedded.WithProfilePort(runtime.DefaultProfilePort+portOffset+2),
|
||||||
|
componentRuntimeOptions(),
|
||||||
|
)).
|
||||||
|
Step("publish messages to topic ==> "+fifoTopic, publishMessages(nil, sc1, fifoTopic, consumerGroup1)).
|
||||||
|
|
||||||
|
// Publisher 2
|
||||||
|
Step(app.Run(pub2, fmt.Sprintf(":%d", appPort+portOffset+4),
|
||||||
|
doNothingApp(pub2, fifoTopic, consumerGroup1))).
|
||||||
|
Step(sidecar.Run(sc2,
|
||||||
|
embedded.WithComponentsPath("./components/fifo"),
|
||||||
|
embedded.WithAppProtocol(runtime.HTTPProtocol, appPort+portOffset+4),
|
||||||
|
embedded.WithDaprGRPCPort(runtime.DefaultDaprAPIGRPCPort+portOffset+4),
|
||||||
|
embedded.WithDaprHTTPPort(runtime.DefaultDaprHTTPPort+portOffset+4),
|
||||||
|
embedded.WithProfilePort(runtime.DefaultProfilePort+portOffset+4),
|
||||||
|
componentRuntimeOptions(),
|
||||||
|
)).
|
||||||
|
Step("publish messages to topic ==> "+fifoTopic, publishMessages(nil, sc2, fifoTopic, consumerGroup1)).
|
||||||
|
Step("wait", flow.Sleep(10*time.Second)).
|
||||||
|
Step("verify if recevied ordered messages published to active topic", assertMessages(1*time.Second, consumerGroup1)).
|
||||||
|
Step("reset", flow.Reset(consumerGroup1)).
|
||||||
|
Run()
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
func componentRuntimeOptions() []runtime.Option {
|
func componentRuntimeOptions() []runtime.Option {
|
||||||
log := logger.NewLogger("dapr.components")
|
log := logger.NewLogger("dapr.components")
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue