updates
Signed-off-by: Roberto J Rojas <robertojrojas@gmail.com>
This commit is contained in:
parent
c7730b007f
commit
2b5bd43567
|
@ -315,9 +315,7 @@ jobs:
|
|||
|
||||
PUBSUB_AWS_SNSSQS_TOPIC_DLIN="sqssnscerttest-dlt-in-${{env.UNIQUE_ID}}"
|
||||
echo "PUBSUB_AWS_SNSSQS_TOPIC_DLIN=$PUBSUB_AWS_SNSSQS_TOPIC_DLIN" >> $GITHUB_ENV
|
||||
PUBSUB_AWS_SNSSQS_TOPIC_DLOUT="sqssnscerttest-dlt-out-${{env.UNIQUE_ID}}"
|
||||
echo "PUBSUB_AWS_SNSSQS_TOPIC_DLOUT=$PUBSUB_AWS_SNSSQS_TOPIC_DLOUT" >> $GITHUB_ENV
|
||||
|
||||
|
||||
PUBSUB_AWS_SNSSQS_QUEUE_FIFO="sqssnscerttest-q-fifo-${{env.UNIQUE_ID}}.fifo"
|
||||
echo "PUBSUB_AWS_SNSSQS_QUEUE_FIFO=$PUBSUB_AWS_SNSSQS_QUEUE_FIFO" >> $GITHUB_ENV
|
||||
PUBSUB_AWS_SNSSQS_TOPIC_FIFO="sqssnscerttest-t-fifo-${{env.UNIQUE_ID}}.fifo"
|
||||
|
|
|
@ -62,11 +62,11 @@ The purpose of this module is to provide tests that certify the AWS SNS/SQS Pubs
|
|||
- Subscriber 1 subscribes to 1 topic
|
||||
- Message are expected to arrive in order
|
||||
- Verify data with an optional parameters `sqsDeadLettersQueueName`, `messageRetryLimit`, and `messageReceiveLimit` takes affect (SNSSQSMessageDeadLetter)
|
||||
- Run dapr application with 1 publisher, 2 subscriber, and 2 topics
|
||||
- Run dapr application with 1 publisher, 2 subscriber, and 1 topics
|
||||
- Publishers publishes to 1 topic
|
||||
- Subscriber 1 subscribes to 1 topic and fails causing messages to go to deadletter queue
|
||||
- Subscriber 2 subscribes to 2 topic connected to deadletter queue
|
||||
- Message are expected to arrive from only from Subscriber 2
|
||||
- Subscriber 2 polls messages from the deadletter queue
|
||||
- Message are expected to only be successfully consumed by Subscriber 2 from deadletter queue
|
||||
### Running the tests
|
||||
|
||||
This must be run in the GitHub Actions Workflow configured for test infrastructure setup.
|
||||
|
|
|
@ -72,7 +72,6 @@ var (
|
|||
messageVisibilityTimeoutTopic = "messageVisibilityTimeoutTopic" // replaced with env var PUBSUB_AWS_SNSSQS_TOPIC_MVT
|
||||
fifoTopic = "fifoTopic" // replaced with env var PUBSUB_AWS_SNSSQS_TOPIC_FIFO
|
||||
deadLetterTopicIn = "deadLetterTopicIn" // replaced with env var PUBSUB_AWS_SNSSQS_TOPIC_DLIN
|
||||
deadLetterTopicOut = "deadLetterTopicOut" // replaced with env var PUBSUB_AWS_SNSSQS_TOPIC_DLOUT
|
||||
deadLetterQueueName = "deadLetterQueueName" // replaced with env var PUBSUB_AWS_SNSSQS_QUEUE_DLOUT
|
||||
)
|
||||
|
||||
|
@ -139,12 +138,6 @@ func init() {
|
|||
deadLetterTopicIn = qn
|
||||
topics = append(topics, deadLetterTopicIn)
|
||||
}
|
||||
qn = os.Getenv("PUBSUB_AWS_SNSSQS_TOPIC_DLOUT")
|
||||
if qn != "" {
|
||||
deadLetterTopicOut = qn
|
||||
topics = append(topics, deadLetterTopicOut)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestAWSSNSSQSCertificationTests(t *testing.T) {
|
||||
|
@ -1400,7 +1393,7 @@ func SNSSQSMessageDeadLetter(t *testing.T) {
|
|||
}
|
||||
|
||||
var task flow.AsyncTask
|
||||
deadLetterReceiverApplication := func(topicOutName string, messagesWatcher *watcher.Watcher) flow.Runnable {
|
||||
deadLetterReceiverApplication := func(deadLetterQueueName string, messagesWatcher *watcher.Watcher) flow.Runnable {
|
||||
return func(ctx flow.Context) error {
|
||||
t := time.NewTicker(500 * time.Millisecond)
|
||||
defer t.Stop()
|
||||
|
@ -1410,12 +1403,12 @@ func SNSSQSMessageDeadLetter(t *testing.T) {
|
|||
for {
|
||||
select {
|
||||
case <-task.Done():
|
||||
ctx.Log("deadLetterReceiverApplication - task done!")
|
||||
ctx.Log("deadLetterReceiverApplication - task done called!")
|
||||
return nil
|
||||
case <-t.C:
|
||||
queueURL, err := getQueueURL(svc, topicOutName)
|
||||
queueURL, err := getQueueURL(svc, deadLetterQueueName)
|
||||
if err != nil {
|
||||
ctx.Logf("deadLetterReceiverApplication - failed get queue URL %q %v\n", topicOutName, err)
|
||||
ctx.Logf("deadLetterReceiverApplication - failed get queue URL %q %v\n", deadLetterQueueName, err)
|
||||
continue
|
||||
}
|
||||
ctx.Logf("deadLetterReceiverApplication - gettting messages (%q) counter(%d)\n", queueURL, counter)
|
||||
|
@ -1459,12 +1452,12 @@ func SNSSQSMessageDeadLetter(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
publishMessages := func(metadata map[string]string, sidecarName string, topicName string, topicOutName string, messageWatchers ...*watcher.Watcher) flow.Runnable {
|
||||
publishMessages := func(metadata map[string]string, sidecarName string, topicName string, messageWatchers ...*watcher.Watcher) flow.Runnable {
|
||||
return func(ctx flow.Context) error {
|
||||
// prepare the messages
|
||||
messages := make([]string, failedMessagesNum)
|
||||
for i := range messages {
|
||||
messages[i] = fmt.Sprintf("partitionKey: %s, message for topic: %s, index: %03d, uniqueId: %s", metadata[messageKey], topicOutName, i, uuid.New().String())
|
||||
messages[i] = fmt.Sprintf("partitionKey: %s, message for topic: %s, index: %03d, uniqueId: %s", metadata[messageKey], topicName, i, uuid.New().String())
|
||||
}
|
||||
|
||||
// add the messages as expectations to the watchers
|
||||
|
@ -1537,8 +1530,8 @@ func SNSSQSMessageDeadLetter(t *testing.T) {
|
|||
embedded.WithProfilePort(runtime.DefaultProfilePort+portOffset+4),
|
||||
componentRuntimeOptions(),
|
||||
)).
|
||||
Step("publish messages to deadLetterTopicIn ==> "+deadLetterTopicIn, publishMessages(nil, subAppSideCar, deadLetterTopicIn, deadLetterTopicOut, deadLetterConsumerGroup)).
|
||||
Step("wait", flow.Sleep(60*time.Second)).
|
||||
Step("publish messages to deadLetterTopicIn ==> "+deadLetterTopicIn, publishMessages(nil, subAppSideCar, deadLetterTopicIn, deadLetterConsumerGroup)).
|
||||
Step("wait", flow.Sleep(10*time.Second)).
|
||||
Step("verify if app1 has 0 recevied messages published to active topic", assertMessages(10*time.Second, consumerGroup1)).
|
||||
Step("verify if app2 has deadletterMessageNum recevied messages send to dead letter queue", assertMessages(10*time.Second, deadLetterConsumerGroup)).
|
||||
Step("reset", flow.Reset(consumerGroup1, deadLetterConsumerGroup)).
|
||||
|
|
Loading…
Reference in New Issue