Merge pull request #2451 from robertojrojas/snssqs-cert-test-1339-message-visibility
AWS SNS/SQS PubSub Certification Tests - Message Visibility Timeout
This commit is contained in:
commit
d6670f9e88
|
@ -31,6 +31,8 @@ provider "aws" {
|
|||
}
|
||||
|
||||
// ###### RESOURCES ########
|
||||
|
||||
// ## Existing Topic ##
|
||||
resource "aws_sns_topic" "existingTopic" {
|
||||
name = "sqssnscerttest-t3-${var.UNIQUE_ID}"
|
||||
tags = {
|
||||
|
@ -78,11 +80,71 @@ resource "aws_sqs_queue_policy" "existingQueue_policy" {
|
|||
POLICY
|
||||
}
|
||||
|
||||
// ## Message Visibility Timeout Topic ##
|
||||
resource "aws_sns_topic" "messageVisibilityTimeoutTopic" {
|
||||
name = "sqssnscerttest-tp-mvt-${var.UNIQUE_ID}"
|
||||
tags = {
|
||||
dapr-topic-name = "sqssnscerttest-tp-mvt-${var.UNIQUE_ID}"
|
||||
}
|
||||
}
|
||||
|
||||
resource "aws_sqs_queue" "messageVisibilityTimeoutQueue" {
|
||||
name = "sqssnscerttest-q-mvt-${var.UNIQUE_ID}"
|
||||
tags = {
|
||||
dapr-queue-name = "sqssnscerttest-q-mvt-${var.UNIQUE_ID}"
|
||||
}
|
||||
}
|
||||
|
||||
resource "aws_sns_topic_subscription" "messageVisibilityTimeoutTopic_messageVisibilityTimeoutQueue" {
|
||||
topic_arn = aws_sns_topic.messageVisibilityTimeoutTopic.arn
|
||||
protocol = "sqs"
|
||||
endpoint = aws_sqs_queue.messageVisibilityTimeoutQueue.arn
|
||||
}
|
||||
|
||||
resource "aws_sqs_queue_policy" "messageVisibilityTimeoutQueue_policy" {
|
||||
queue_url = "${aws_sqs_queue.messageVisibilityTimeoutQueue.id}"
|
||||
|
||||
policy = <<POLICY
|
||||
{
|
||||
"Version": "2012-10-17",
|
||||
"Id": "sqspolicy",
|
||||
"Statement": [{
|
||||
"Sid": "Allow-SNS-SendMessage",
|
||||
"Effect": "Allow",
|
||||
"Principal": {
|
||||
"Service": "sns.amazonaws.com"
|
||||
},
|
||||
"Action": "sqs:SendMessage",
|
||||
"Resource": "${aws_sqs_queue.messageVisibilityTimeoutQueue.arn}",
|
||||
"Condition": {
|
||||
"ArnEquals": {
|
||||
"aws:SourceArn": [
|
||||
"${aws_sns_topic.messageVisibilityTimeoutTopic.arn}"
|
||||
]
|
||||
}
|
||||
}
|
||||
}]
|
||||
}
|
||||
POLICY
|
||||
}
|
||||
|
||||
// ###### OUTPUT VARIABLES ########
|
||||
output "existingQueue" {
|
||||
value = aws_sqs_queue.existingQueue.name
|
||||
}
|
||||
|
||||
output "existingTopic" {
|
||||
value = aws_sns_topic.existingTopic.name
|
||||
}
|
||||
output "existingTopic_existingQueue_subscription" {
|
||||
value = aws_sns_topic_subscription.existingTopic_existingQueue.id
|
||||
}
|
||||
|
||||
output "messageVisibilityTimeoutQueue" {
|
||||
value = aws_sqs_queue.messageVisibilityTimeoutQueue.name
|
||||
}
|
||||
output "messageVisibilityTimeoutTopic" {
|
||||
value = aws_sns_topic.messageVisibilityTimeoutTopic.name
|
||||
}
|
||||
output "messageVisibilityTimeoutTopic_messageVisibilityTimeoutQueue_subscription" {
|
||||
value = aws_sns_topic_subscription.messageVisibilityTimeoutTopic_messageVisibilityTimeoutQueue.id
|
||||
}
|
|
@ -293,12 +293,20 @@ jobs:
|
|||
run: |
|
||||
PUBSUB_AWS_SNSSQS_QUEUE_1="sqssnscerttest-q1-${{env.UNIQUE_ID}}"
|
||||
echo "PUBSUB_AWS_SNSSQS_QUEUE_1=$PUBSUB_AWS_SNSSQS_QUEUE_1" >> $GITHUB_ENV
|
||||
|
||||
PUBSUB_AWS_SNSSQS_QUEUE_2="sqssnscerttest-q2-${{env.UNIQUE_ID}}"
|
||||
echo "PUBSUB_AWS_SNSSQS_QUEUE_2=$PUBSUB_AWS_SNSSQS_QUEUE_2" >> $GITHUB_ENV
|
||||
|
||||
PUBSUB_AWS_SNSSQS_QUEUE_3="sqssnscerttest-q3-${{env.UNIQUE_ID}}"
|
||||
echo "PUBSUB_AWS_SNSSQS_QUEUE_3=$PUBSUB_AWS_SNSSQS_QUEUE_3" >> $GITHUB_ENV
|
||||
PUBSUB_AWS_SNSSQS_TOPIC_3="sqssnscerttest-t3-${{env.UNIQUE_ID}}"
|
||||
echo "PUBSUB_AWS_SNSSQS_TOPIC_3=$PUBSUB_AWS_SNSSQS_TOPIC_3" >> $GITHUB_ENV
|
||||
|
||||
PUBSUB_AWS_SNSSQS_QUEUE_MVT="sqssnscerttest-q-mvt-${{env.UNIQUE_ID}}"
|
||||
echo "PUBSUB_AWS_SNSSQS_QUEUE_MVT=$PUBSUB_AWS_SNSSQS_QUEUE_MVT" >> $GITHUB_ENV
|
||||
PUBSUB_AWS_SNSSQS_TOPIC_MVT="sqssnscerttest-tp-mvt-${{env.UNIQUE_ID}}"
|
||||
echo "PUBSUB_AWS_SNSSQS_TOPIC_MVT=$PUBSUB_AWS_SNSSQS_TOPIC_MVT" >> $GITHUB_ENV
|
||||
|
||||
AWS_REGION="us-east-1"
|
||||
echo "AWS_REGION=$AWS_REGION" >> $GITHUB_ENV
|
||||
|
||||
|
|
|
@ -3,56 +3,59 @@ The purpose of this module is to provide tests that certify the AWS SNS/SQS Pubs
|
|||
|
||||
## Test Plan
|
||||
### Certification Tests
|
||||
- Verify with single publisher / single subscriber (TestSNSSQSBasic)
|
||||
- Verify with single publisher / single subscriber (SNSSQSBasic)
|
||||
- Run dapr application with 1 publisher and 1 subscriber
|
||||
- Publisher publishes to 2 topics
|
||||
- Subscriber is subscribed to 1 topic
|
||||
- Simulate periodic errors and verify that the component retires on error
|
||||
- Verify that all expected messages were received
|
||||
- Verify that subscriber does not receive messages from the non-subscribed topic
|
||||
- Verify with single publisher / multiple subscribers with same consumerID (TestSNSSQSMultipleSubsSameConsumerIDs)
|
||||
- Verify with single publisher / multiple subscribers with same consumerID (SNSSQSMultipleSubsSameConsumerIDs)
|
||||
- Run dapr application with 1 publisher and 2 subscribers
|
||||
- Publisher publishes to 1 topic
|
||||
- Subscriber is subscribed to 1 topic
|
||||
- Simulate periodic errors and verify that the component retires on error
|
||||
- Verify that all expected messages were received
|
||||
- Verify with single publisher / multiple subscribers with different consumerIDs (TestSNSSQSMultipleSubsDifferentConsumerIDs)
|
||||
- Verify with single publisher / multiple subscribers with different consumerIDs (SNSSQSMultipleSubsDifferentConsumerIDs)
|
||||
- Run dapr application with 1 publisher and 2 subscribers
|
||||
- Publisher publishes to 1 topic
|
||||
- Subscriber is subscribed to 1 topic
|
||||
- Simulate periodic errors and verify that the component retires on error
|
||||
- Verify that all expected messages were received
|
||||
- Verify with multiple publishers / multiple subscribers with different consumerIDs (TestSNSSQSMultiplePubSubsDifferentConsumerIDs)
|
||||
- Verify with multiple publishers / multiple subscribers with different consumerIDs (SNSSQSMultiplePubSubsDifferentConsumerIDs)
|
||||
- Run dapr application with 2 publishers and 2 subscribers
|
||||
- Publisher publishes to 1 topic
|
||||
- Subscriber is subscribed to 1 topic
|
||||
- Simulate periodic errors and verify that the component retires on error
|
||||
- Verify that all expected messages were received
|
||||
- Verify data with an existing Queue and existing Topic (TestSNSSQSExistingQueue)
|
||||
- Verify data with an existing Queue and existing Topic (SNSSQSExistingQueue)
|
||||
- Run dapr application with 1 publisher and 1 subscriber
|
||||
- Verify the creation of service bus
|
||||
- Send messages to the service created
|
||||
- Verify that subscriber received all the messages
|
||||
- Verify data with an existing Queue with a topic that does not exist (TestSNSSQSExistingQueueNonexistingTopic)
|
||||
- Verify data with an existing Queue with a topic that does not exist (SNSSQSExistingQueueNonexistingTopic)
|
||||
- Run dapr application with 1 publisher and 1 subscriber
|
||||
- Verify the creation of service bus
|
||||
- Send messages to the service created
|
||||
- Verify that subscriber received all the messages
|
||||
- Verify data with a topic that does not exist (TestSNSSQSNonexistingTopic)
|
||||
- Verify data with a topic that does not exist (SNSSQSNonexistingTopic)
|
||||
- Run dapr application with 1 publisher and 1 subscriber
|
||||
- Verify the creation of service bus
|
||||
- Send messages to the service created
|
||||
- Verify that subscriber received all the messages
|
||||
- Verify with an optional parameter `disableEntityManagement` set to true (TestSNSSQSEntityManagement)
|
||||
- Verify with an optional parameter `disableEntityManagement` set to true (SNSSQSEntityManagement)
|
||||
- Run dapr application with 1 publisher
|
||||
- Publisher tries to publish to 1 topic that is not present
|
||||
- Verify that the topic and subscriptions do not get created
|
||||
- Verify that the error is returned saying that the topic not present when publishing
|
||||
- Verify data with an optional parameter `defaultMessageTimeToLiveInSec` set (TestSNSSQSDefaultTtl)
|
||||
- Run dapr application with 1 publisher and 1 subscriber
|
||||
- Subscriber is subscribed to 1 topic
|
||||
- Publisher publishes to 1 topic, wait double the TTL seconds
|
||||
- Verify the message is deleted/expired
|
||||
- Verify data with an optional parameter `messageVisibilityTimeout` takes affect (SNSSQSMessageVisibilityTimeout)
|
||||
- Run dapr application with 1 publisher and 2 subscriber
|
||||
- Subscriber 1 subscribes to 1 topic
|
||||
- Publisher publishes to 1 topic
|
||||
- Subscriber 2 waits for Subscriber 1 to be notify before subscribing to 1 topic
|
||||
- Subscriber 1 reeives message, notifies subscriber 2 and sumlates being busy for time shorter than messageVisibilityTimeout seconds
|
||||
- Subscriber 2 receives go ahead and subscribes to 1 topic
|
||||
- Subscriber 2 must not receive message
|
||||
|
||||
### Running the tests
|
||||
|
||||
|
|
|
@ -20,9 +20,9 @@ spec:
|
|||
key: AWS_REGION
|
||||
- name: consumerID
|
||||
secretKeyRef:
|
||||
name: PUBSUB_AWS_SNSSQS_QUEUE_1
|
||||
key: PUBSUB_AWS_SNSSQS_QUEUE_1
|
||||
- name: defaultMessageTimeToLiveInSec
|
||||
value: 10
|
||||
name: PUBSUB_AWS_SNSSQS_QUEUE_MVT
|
||||
key: PUBSUB_AWS_SNSSQS_QUEUE_MVT
|
||||
- name: messageVisibilityTimeout
|
||||
value: 60
|
||||
auth:
|
||||
secretstore: envvar-secret-store
|
|
@ -16,9 +16,19 @@ package snssqs_test
|
|||
import (
|
||||
"fmt"
|
||||
|
||||
// AWS SDK
|
||||
"github.com/aws/aws-sdk-go/aws"
|
||||
"github.com/aws/aws-sdk-go/aws/awserr"
|
||||
"github.com/aws/aws-sdk-go/aws/session"
|
||||
"github.com/aws/aws-sdk-go/service/sns"
|
||||
"github.com/aws/aws-sdk-go/service/sns/snsiface"
|
||||
"github.com/aws/aws-sdk-go/service/sqs"
|
||||
"github.com/aws/aws-sdk-go/service/sts"
|
||||
"github.com/aws/aws-sdk-go/service/sts/stsiface"
|
||||
)
|
||||
|
||||
var (
|
||||
partition string = "aws"
|
||||
serviceName string = "sns"
|
||||
)
|
||||
|
||||
func deleteQueues(queues []string) error {
|
||||
|
@ -31,16 +41,6 @@ func deleteQueues(queues []string) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func sqsService() *sqs.SQS {
|
||||
sess := session.Must(
|
||||
session.NewSessionWithOptions(
|
||||
session.Options{
|
||||
SharedConfigState: session.SharedConfigEnable,
|
||||
},
|
||||
))
|
||||
return sqs.New(sess)
|
||||
}
|
||||
|
||||
func deleteQueue(svc *sqs.SQS, queue string) error {
|
||||
queueUrlOutput, err := svc.GetQueueUrl(&sqs.GetQueueUrlInput{
|
||||
QueueName: &queue,
|
||||
|
@ -55,3 +55,88 @@ func deleteQueue(svc *sqs.SQS, queue string) error {
|
|||
|
||||
return err
|
||||
}
|
||||
|
||||
func deleteTopics(topics []string, region string) error {
|
||||
sess := session.Must(
|
||||
session.NewSessionWithOptions(
|
||||
session.Options{
|
||||
SharedConfigState: session.SharedConfigEnable,
|
||||
},
|
||||
))
|
||||
svc := sns.New(sess)
|
||||
id, err := getIdentity(sts.New(sess))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, topic := range topics {
|
||||
topicArn := buildARN(partition, serviceName, topic, region, id)
|
||||
fmt.Printf("Getting subscriptions for topicArn: %s\n", topicArn)
|
||||
if subout, err := svc.ListSubscriptionsByTopic(&sns.ListSubscriptionsByTopicInput{
|
||||
TopicArn: aws.String(topicArn),
|
||||
}); err == nil {
|
||||
for _, sub := range subout.Subscriptions {
|
||||
if err := unsubscribeFromTopic(svc, *sub.SubscriptionArn); err != nil {
|
||||
fmt.Printf("error unsubscribing arn: %q err:%v\n", *sub.SubscriptionArn, err)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
fmt.Printf("error getting subscription list topic: %q err:%v\n", topic, err)
|
||||
}
|
||||
|
||||
if err := deleteTopic(svc, topicArn); err != nil {
|
||||
fmt.Printf("error deleting the topic: %q err:%v\n", topic, err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func deleteTopic(svc snsiface.SNSAPI, topic string) error {
|
||||
_, err := svc.DeleteTopic(&sns.DeleteTopicInput{
|
||||
TopicArn: aws.String(topic),
|
||||
})
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func unsubscribeFromTopic(svc snsiface.SNSAPI, subscription string) error {
|
||||
_, err := svc.Unsubscribe(&sns.UnsubscribeInput{
|
||||
SubscriptionArn: aws.String(subscription),
|
||||
})
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func sqsService() *sqs.SQS {
|
||||
sess := session.Must(
|
||||
session.NewSessionWithOptions(
|
||||
session.Options{
|
||||
SharedConfigState: session.SharedConfigEnable,
|
||||
},
|
||||
))
|
||||
return sqs.New(sess)
|
||||
}
|
||||
|
||||
func getIdentity(svc stsiface.STSAPI) (*sts.GetCallerIdentityOutput, error) {
|
||||
input := &sts.GetCallerIdentityInput{}
|
||||
result, err := svc.GetCallerIdentity(input)
|
||||
if err != nil {
|
||||
if aerr, ok := err.(awserr.Error); ok {
|
||||
switch aerr.Code() {
|
||||
default:
|
||||
return nil, fmt.Errorf(aerr.Error())
|
||||
}
|
||||
} else {
|
||||
// Print the error, cast err to awserr.Error to get the Code and
|
||||
// Message from an error.
|
||||
return nil, fmt.Errorf(aerr.Error())
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func buildARN(partition, serviceName, entityName, region string, id *sts.GetCallerIdentityOutput) string {
|
||||
return fmt.Sprintf("arn:%s:%s:%s:%s:%s", partition, serviceName, region, *id.Account, entityName)
|
||||
}
|
||||
|
|
|
@ -65,7 +65,9 @@ const (
|
|||
)
|
||||
|
||||
var (
|
||||
existingTopic = "existingTopic" // replaced with env var PUBSUB_AWS_SNSSQS_TOPIC_3
|
||||
region = "us-east-1" // replaced with env var AWS_REGION
|
||||
existingTopic = "existingTopic" // replaced with env var PUBSUB_AWS_SNSSQS_TOPIC_3
|
||||
messageVisibilityTimeoutTopic = "messageVisibilityTimeoutTopic" // replaced with env var PUBSUB_AWS_SNSSQS_TOPIC_MVT
|
||||
)
|
||||
|
||||
// The following Queue names must match
|
||||
|
@ -73,14 +75,27 @@ var (
|
|||
// found inside each of the components/*/pubsub.yaml files
|
||||
// The names will be passed in with Env Vars like:
|
||||
//
|
||||
// PUBSUB_AWS_SNSSQS_QUEUE_*
|
||||
// PUBSUB_AWS_SNSSQS_TOPIC_*
|
||||
var queues = []string{
|
||||
"snscerttest1",
|
||||
"snssqscerttest2",
|
||||
}
|
||||
|
||||
var topics = []string{
|
||||
topicActiveName,
|
||||
topicPassiveName,
|
||||
topicToBeCreated,
|
||||
topicDefaultName,
|
||||
}
|
||||
|
||||
func init() {
|
||||
qn := os.Getenv("PUBSUB_AWS_SNSSQS_QUEUE_1")
|
||||
qn := os.Getenv("AWS_REGION")
|
||||
if qn != "" {
|
||||
region = qn
|
||||
}
|
||||
|
||||
qn = os.Getenv("PUBSUB_AWS_SNSSQS_QUEUE_1")
|
||||
if qn != "" {
|
||||
queues[0] = qn
|
||||
}
|
||||
|
@ -92,6 +107,10 @@ func init() {
|
|||
if qn != "" {
|
||||
existingTopic = qn
|
||||
}
|
||||
qn = os.Getenv("PUBSUB_AWS_SNSSQS_TOPIC_MVT")
|
||||
if qn != "" {
|
||||
messageVisibilityTimeoutTopic = qn
|
||||
}
|
||||
}
|
||||
|
||||
func TestAWSSNSSQSCertificationTests(t *testing.T) {
|
||||
|
@ -117,8 +136,8 @@ func TestAWSSNSSQSCertificationTests(t *testing.T) {
|
|||
SNSSQSExistingQueueAndTopic(t)
|
||||
})
|
||||
|
||||
t.Run("TestSNSSQSExistingQueueNonexistingTopic", func(t *testing.T) {
|
||||
TestSNSSQSExistingQueueNonexistingTopic(t)
|
||||
t.Run("SNSSQSExistingQueueNonexistingTopic", func(t *testing.T) {
|
||||
SNSSQSExistingQueueNonexistingTopic(t)
|
||||
})
|
||||
|
||||
t.Run("SNSSQSNonexistingTopic", func(t *testing.T) {
|
||||
|
@ -129,8 +148,8 @@ func TestAWSSNSSQSCertificationTests(t *testing.T) {
|
|||
SNSSQSEntityManagement(t)
|
||||
})
|
||||
|
||||
t.Run("SNSSQSDefaultTtl", func(t *testing.T) {
|
||||
SNSSQSDefaultTtl(t)
|
||||
t.Run("SNSSQSMessageVisibilityTimeout", func(t *testing.T) {
|
||||
SNSSQSMessageVisibilityTimeout(t)
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -848,7 +867,7 @@ func SNSSQSExistingQueueAndTopic(t *testing.T) {
|
|||
}
|
||||
|
||||
// Verify data with an existing Queue with a topic that does not exist
|
||||
func TestSNSSQSExistingQueueNonexistingTopic(t *testing.T) {
|
||||
func SNSSQSExistingQueueNonexistingTopic(t *testing.T) {
|
||||
consumerGroup1 := watcher.NewUnordered()
|
||||
|
||||
// Set the partition key on all messages so they are written to the same partition. This allows for checking of ordered messages.
|
||||
|
@ -915,7 +934,7 @@ func TestSNSSQSExistingQueueNonexistingTopic(t *testing.T) {
|
|||
} else {
|
||||
err = client.PublishEvent(ctx, pubsubName, topicName, message)
|
||||
}
|
||||
require.NoError(ctx, err, "TestSNSSQSExistingQueueNonexistingTopic - error publishing message")
|
||||
require.NoError(ctx, err, "SNSSQSExistingQueueNonexistingTopic - error publishing message")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -926,7 +945,7 @@ func TestSNSSQSExistingQueueNonexistingTopic(t *testing.T) {
|
|||
// assert for messages
|
||||
for _, m := range messageWatchers {
|
||||
if !m.Assert(ctx, 25*timeout) {
|
||||
ctx.Errorf("TestSNSSQSExistingQueueNonexistingTopic - message assersion failed for watcher: %#v\n", m)
|
||||
ctx.Errorf("SNSSQSExistingQueueNonexistingTopic - message assersion failed for watcher: %#v\n", m)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1043,26 +1062,53 @@ func SNSSQSEntityManagement(t *testing.T) {
|
|||
Run()
|
||||
}
|
||||
|
||||
// Verify data with an optional parameter `defaultMessageTimeToLiveInSec` set
|
||||
func SNSSQSDefaultTtl(t *testing.T) {
|
||||
// Verify data with an optional parameter `messageVisibilityTimeout` takes affect
|
||||
func SNSSQSMessageVisibilityTimeout(t *testing.T) {
|
||||
consumerGroup1 := watcher.NewUnordered()
|
||||
|
||||
// Set the partition key on all messages so they are written to the same partition. This allows for checking of ordered messages.
|
||||
metadata := map[string]string{
|
||||
messageKey: partition0,
|
||||
metadata := map[string]string{}
|
||||
latch := make(chan struct{})
|
||||
busyTime := 10 * time.Second
|
||||
messagesToSend := 1
|
||||
waitForLatch := func(appID string, ctx flow.Context, l chan struct{}) error {
|
||||
ctx.Logf("waitForLatch %s is waiting...\n", appID)
|
||||
<-l
|
||||
ctx.Logf("waitForLatch %s ready to continue!\n", appID)
|
||||
return nil
|
||||
}
|
||||
|
||||
// subscriber of the given topic
|
||||
subscriberApplication := func(appID string, topicName string, messagesWatcher *watcher.Watcher) app.SetupFn {
|
||||
subscriberMVTimeoutApp := func(appID string, topicName string, messagesWatcher *watcher.Watcher, l chan struct{}) app.SetupFn {
|
||||
return func(ctx flow.Context, s common.Service) error {
|
||||
// Setup the /orders event handler.
|
||||
ctx.Logf("SNSSQSMessageVisibilityTimeout.subscriberApplicationMVTimeout App: %q topicName: %q\n", appID, topicName)
|
||||
return multierr.Combine(
|
||||
s.AddTopicEventHandler(&common.Subscription{
|
||||
PubsubName: pubsubName,
|
||||
Topic: topicName,
|
||||
Route: "/orders",
|
||||
}, func(_ context.Context, e *common.TopicEvent) (retry bool, err error) {
|
||||
ctx.Logf("Got message: %s", e.Data)
|
||||
ctx.Logf("SNSSQSMessageVisibilityTimeout.subscriberApplicationMVTimeout App: %q got message: %s busy for %v\n", appID, e.Data, busyTime)
|
||||
time.Sleep(busyTime)
|
||||
ctx.Logf("SNSSQSMessageVisibilityTimeout.subscriberApplicationMVTimeout App: %q - notifying next Subscriber to continue...\n", appID)
|
||||
l <- struct{}{}
|
||||
ctx.Logf("SNSSQSMessageVisibilityTimeout.subscriberApplicationMVTimeoutApp: %q - sent busy for %v\n", appID, busyTime)
|
||||
time.Sleep(busyTime)
|
||||
ctx.Logf("SNSSQSMessageVisibilityTimeout.subscriberApplicationMVTimeoutApp: %q - done!\n", appID)
|
||||
return false, nil
|
||||
}),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
notExpectingMessagesSubscriberApp := func(appID string, topicName string, messagesWatcher *watcher.Watcher, l chan struct{}) app.SetupFn {
|
||||
return func(ctx flow.Context, s common.Service) error {
|
||||
ctx.Logf("SNSSQSMessageVisibilityTimeout.notExpectingMessagesSubscriberApp App: %q topicName: %q waiting for notification to start receiving messages\n", appID, topicName)
|
||||
return multierr.Combine(
|
||||
waitForLatch(appID, ctx, l),
|
||||
s.AddTopicEventHandler(&common.Subscription{
|
||||
PubsubName: pubsubName,
|
||||
Topic: topicName,
|
||||
Route: "/orders",
|
||||
}, func(_ context.Context, e *common.TopicEvent) (retry bool, err error) {
|
||||
ctx.Logf("SNSSQSMessageVisibilityTimeout.notExpectingMessagesSubscriberApp App: %q got unexpected message: %s\n", appID, e.Data)
|
||||
messagesWatcher.FailIfNotExpected(t, e.Data)
|
||||
return false, nil
|
||||
}),
|
||||
|
@ -1073,16 +1119,17 @@ func SNSSQSDefaultTtl(t *testing.T) {
|
|||
testTtlPublishMessages := func(metadata map[string]string, sidecarName string, topicName string, messageWatchers ...*watcher.Watcher) flow.Runnable {
|
||||
return func(ctx flow.Context) error {
|
||||
// prepare the messages
|
||||
messages := make([]string, numMessages)
|
||||
messages := make([]string, messagesToSend)
|
||||
for i := range messages {
|
||||
messages[i] = fmt.Sprintf("partitionKey: %s, message for topic: %s, index: %03d, uniqueId: %s", metadata[messageKey], topicName, i, uuid.New().String())
|
||||
}
|
||||
|
||||
ctx.Logf("####### get the sidecar (dapr) client sidecarName: %s, topicName: %s", sidecarName, topicName)
|
||||
// get the sidecar (dapr) client
|
||||
client := sidecar.GetClient(ctx, sidecarName)
|
||||
|
||||
// publish messages
|
||||
ctx.Logf("Publishing messages. sidecarName: %s, topicName: %s", sidecarName, topicName)
|
||||
ctx.Logf("####### Publishing messages. sidecarName: %s, topicName: %s", sidecarName, topicName)
|
||||
|
||||
var publishOptions dapr.PublishEventOption
|
||||
|
||||
|
@ -1099,53 +1146,55 @@ func SNSSQSDefaultTtl(t *testing.T) {
|
|||
} else {
|
||||
err = client.PublishEvent(ctx, pubsubName, topicName, message)
|
||||
}
|
||||
require.NoError(ctx, err, "SNSSQSDefaultTtl - error publishing message")
|
||||
require.NoError(ctx, err, "SNSSQSMessageVisibilityTimeout - error publishing message")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
assertMessages := func(timeout time.Duration, messageWatchers ...*watcher.Watcher) flow.Runnable {
|
||||
connectToSideCar := func(sidecarName string) flow.Runnable {
|
||||
return func(ctx flow.Context) error {
|
||||
// assert for messages
|
||||
for _, m := range messageWatchers {
|
||||
if !m.Assert(ctx, 25*timeout) {
|
||||
ctx.Errorf("SNSSQSDefaultTtl - message assersion failed for watcher: %#v\n", m)
|
||||
}
|
||||
}
|
||||
|
||||
ctx.Logf("####### connect to sidecar (dapr) client sidecarName: %s and exit", sidecarName)
|
||||
// get the sidecar (dapr) client
|
||||
sidecar.GetClient(ctx, sidecarName)
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
flow.New(t, "SNSSQS certification - default ttl attribute").
|
||||
|
||||
// Run subscriberApplication app1
|
||||
flow.New(t, "SNSSQS certification - messageVisibilityTimeout attribute receive").
|
||||
// App1 should receive the messages, wait some time (busy), notify App2, wait some time (busy),
|
||||
// and finish processing message.
|
||||
Step(app.Run(appID1, fmt.Sprintf(":%d", appPort+portOffset),
|
||||
subscriberApplication(appID1, topicActiveName, consumerGroup1))).
|
||||
|
||||
// Run the Dapr sidecar with the ttl
|
||||
Step(sidecar.Run("initalSidecar",
|
||||
embedded.WithComponentsPath("./components/default_ttl"),
|
||||
embedded.WithoutApp(),
|
||||
subscriberMVTimeoutApp(appID1, messageVisibilityTimeoutTopic, consumerGroup1, latch))).
|
||||
Step(sidecar.Run(sidecarName1,
|
||||
embedded.WithComponentsPath("./components/message_visibility_timeout"),
|
||||
embedded.WithAppProtocol(runtime.HTTPProtocol, appPort+portOffset),
|
||||
embedded.WithDaprGRPCPort(runtime.DefaultDaprAPIGRPCPort+portOffset),
|
||||
embedded.WithDaprHTTPPort(runtime.DefaultDaprHTTPPort+portOffset),
|
||||
embedded.WithProfilePort(runtime.DefaultProfilePort+portOffset),
|
||||
componentRuntimeOptions(),
|
||||
)).
|
||||
Step(fmt.Sprintf("publish messages to topicToBeCreated: %s", topicActiveName), testTtlPublishMessages(metadata, "initalSidecar", topicActiveName, consumerGroup1)).
|
||||
Step("stop initial sidecar", sidecar.Stop("initialSidecar")).
|
||||
Step("wait", flow.Sleep(20*time.Second)).
|
||||
Step(sidecar.Run(sidecarName1,
|
||||
embedded.WithComponentsPath("./components/default_ttl"),
|
||||
embedded.WithAppProtocol(runtime.HTTPProtocol, appPort+portOffset),
|
||||
embedded.WithDaprGRPCPort(runtime.DefaultDaprAPIGRPCPort+portOffset*2),
|
||||
embedded.WithDaprHTTPPort(runtime.DefaultDaprHTTPPort+portOffset*2),
|
||||
embedded.WithProfilePort(runtime.DefaultProfilePort+portOffset*2),
|
||||
Step(fmt.Sprintf("publish messages to messageVisibilityTimeoutTopic: %s", messageVisibilityTimeoutTopic),
|
||||
testTtlPublishMessages(metadata, sidecarName1, messageVisibilityTimeoutTopic, consumerGroup1)).
|
||||
|
||||
// App2 waits for App1 notification to subscribe to message
|
||||
// After subscribing, if App2 receives any messages, the messageVisibilityTimeoutTopic is either too short,
|
||||
// or code is broken somehow
|
||||
Step(app.Run(appID2, fmt.Sprintf(":%d", appPort+portOffset+2),
|
||||
notExpectingMessagesSubscriberApp(appID2, messageVisibilityTimeoutTopic, consumerGroup1, latch))).
|
||||
Step(sidecar.Run(sidecarName2,
|
||||
embedded.WithComponentsPath("./components/message_visibility_timeout"),
|
||||
embedded.WithAppProtocol(runtime.HTTPProtocol, appPort+portOffset+2),
|
||||
embedded.WithDaprGRPCPort(runtime.DefaultDaprAPIGRPCPort+portOffset+2),
|
||||
embedded.WithDaprHTTPPort(runtime.DefaultDaprHTTPPort+portOffset+2),
|
||||
embedded.WithProfilePort(runtime.DefaultProfilePort+portOffset+2),
|
||||
componentRuntimeOptions(),
|
||||
)).
|
||||
Step("verify if app6 has recevied messages published to newly created topic", assertMessages(10*time.Second, consumerGroup1)).
|
||||
Step("No messages will be sent here",
|
||||
connectToSideCar(sidecarName2)).
|
||||
Step("wait", flow.Sleep(10*time.Second)).
|
||||
Run()
|
||||
|
||||
}
|
||||
|
||||
func componentRuntimeOptions() []runtime.Option {
|
||||
|
@ -1167,10 +1216,15 @@ func componentRuntimeOptions() []runtime.Option {
|
|||
|
||||
func teardown(t *testing.T) {
|
||||
t.Logf("AWS SNS/SQS CertificationTests teardown...")
|
||||
// Dapr runtime automatically creates the following queues
|
||||
// so here they get deleted.
|
||||
//Dapr runtime automatically creates the following queues, topics
|
||||
//so here they get deleted.
|
||||
if err := deleteQueues(queues); err != nil {
|
||||
t.Log(err)
|
||||
}
|
||||
|
||||
if err := deleteTopics(topics, region); err != nil {
|
||||
t.Log(err)
|
||||
}
|
||||
|
||||
t.Logf("AWS SNS/SQS CertificationTests teardown...done!")
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue