add troubleshooting info (#1278)

add retries for pubsub subscribe
fix race conditions
Dmitry Shmulevich 2021-11-15 16:33:21 -08:00 committed by GitHub
parent 3e3b64c0a2
commit ad433a27e1
7 changed files with 297 additions and 338 deletions
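The subscribe retry added here is a constant backoff capped at a fixed number of attempts, wrapped in the retry.NotifyRecover helper from dapr/kit so that failed attempts and eventual recovery are logged. A minimal standalone sketch of that pattern follows; the subscribeOnce closure and its failure behavior are illustrative stand-ins for ensureSubscription, not the component's API.

package main

import (
	"errors"
	"fmt"
	"time"

	"github.com/cenkalti/backoff/v4"
	"github.com/dapr/kit/retry"
)

func main() {
	reconnectWait := 3 * time.Second // mirrors defaultReconnectWaitSeconds = 3
	attempts := 0

	// subscribeOnce stands in for ensureSubscription: fail twice, then succeed.
	subscribeOnce := func() error {
		attempts++
		if attempts < 3 {
			return errors.New("channel not initialized")
		}
		return nil
	}

	// Constant backoff, at most 4 retries, with notify/recover callbacks,
	// the same shape as the retry.NotifyRecover call added to Subscribe.
	err := retry.NotifyRecover(
		subscribeOnce,
		backoff.WithMaxRetries(backoff.NewConstantBackOff(reconnectWait), 4),
		func(err error, d time.Duration) {
			fmt.Printf("subscribe failed (%v), retrying in %s\n", err, d)
		},
		func() {
			fmt.Println("subscribed after initial error(s)")
		},
	)
	if err != nil {
		fmt.Println("giving up:", err)
	}
}

With the default reconnectWait of 3 seconds and up to 4 retries, Subscribe allows the broker roughly 12 seconds to become reachable before returning an error.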


@ -9,6 +9,7 @@ import (
"sync"
"time"
"github.com/cenkalti/backoff/v4"
"github.com/streadway/amqp"
"github.com/dapr/components-contrib/pubsub"
@ -20,8 +21,8 @@ const (
fanoutExchangeKind = "fanout"
logMessagePrefix = "rabbitmq pub/sub:"
errorMessagePrefix = "rabbitmq pub/sub error:"
errorChannelNotInitialized = "channel not initialized"
errorChannelConnection = "channel/connection is not open"
errorUnexpectedCommand = "unexpected command received"
defaultDeadLetterExchangeFormat = "dlx-%s"
defaultDeadLetterQueueFormat = "dlq-%s"
@ -37,7 +38,7 @@ const (
metadataMaxLen = "maxLen"
metadataMaxLenBytes = "maxLenBytes"
defaultReconnectWaitSeconds = 10
defaultReconnectWaitSeconds = 3
metadataPrefetchCount = "prefetchCount"
argQueueMode = "x-queue-mode"
@ -134,13 +135,6 @@ func (r *rabbitMQ) Init(metadata pubsub.Metadata) error {
return nil
}
func (r *rabbitMQ) getChannel() (rabbitMQChannelBroker, int) {
r.channelMutex.RLock()
defer r.channelMutex.RUnlock()
return r.channel, r.connectionCount
}
func (r *rabbitMQ) reconnect(connectionCount int) error {
r.channelMutex.Lock()
defer r.channelMutex.Unlock()
@ -148,14 +142,18 @@ func (r *rabbitMQ) reconnect(connectionCount int) error {
return r.doReconnect(connectionCount)
}
// this function call should be wrapped by channelMutex.
func (r *rabbitMQ) doReconnect(connectionCount int) error {
if r.stopped {
// Do not reconnect on stopped service.
return errors.New("cannot connect after component is stopped")
}
r.logger.Infof("%s connectionCount: current=%d reference=%d", logMessagePrefix, r.connectionCount, connectionCount)
if connectionCount != r.connectionCount {
// Reconnection request is old.
r.logger.Infof("%s stale reconnect attempt", logMessagePrefix)
return nil
}
@ -173,47 +171,46 @@ func (r *rabbitMQ) doReconnect(connectionCount int) error {
r.connectionCount++
r.logger.Infof("%s connected", logMessagePrefix)
r.logger.Infof("%s connected with connectionCount=%d", logMessagePrefix, r.connectionCount)
return nil
}
func (r *rabbitMQ) getChannelOrReconnect() (rabbitMQChannelBroker, int, error) {
func (r *rabbitMQ) publishSync(req *pubsub.PublishRequest) (rabbitMQChannelBroker, int, error) {
r.channelMutex.Lock()
defer r.channelMutex.Unlock()
if r.channel != nil {
return r.channel, r.connectionCount, nil
if r.channel == nil {
return r.channel, r.connectionCount, errors.New(errorChannelNotInitialized)
}
r.logger.Warnf("%s reconnecting ...", logMessagePrefix)
err := r.doReconnect(r.connectionCount)
if err := r.ensureExchangeDeclared(r.channel, req.Topic); err != nil {
r.logger.Errorf("%s publishing to %s failed in ensureExchangeDeclared: %v", logMessagePrefix, req.Topic, err)
return r.channel, r.connectionCount, err
}
func (r *rabbitMQ) Publish(req *pubsub.PublishRequest) error {
channel, connectionCount, err := r.getChannelOrReconnect()
if err != nil {
return err
return r.channel, r.connectionCount, err
}
err = r.ensureExchangeDeclared(channel, req.Topic)
if err != nil {
return err
}
r.logger.Debugf("%s publishing message to topic '%s'", logMessagePrefix, req.Topic)
err = channel.Publish(req.Topic, "", false, false, amqp.Publishing{
if err := r.channel.Publish(req.Topic, "", false, false, amqp.Publishing{
ContentType: "text/plain",
Body: req.Data,
DeliveryMode: r.metadata.deliveryMode,
})
}); err != nil {
r.logger.Errorf("%s publishing to %s failed in channel.Publish: %v", logMessagePrefix, req.Topic, err)
return r.channel, r.connectionCount, err
}
return r.channel, r.connectionCount, nil
}
func (r *rabbitMQ) Publish(req *pubsub.PublishRequest) error {
r.logger.Debugf("%s publishing message to %s", logMessagePrefix, req.Topic)
channel, connectionCount, err := r.publishSync(req)
if err != nil {
if mustReconnect(channel, err) {
r.logger.Warnf("%s pubsub publisher for %s is reconnecting ...", logMessagePrefix, req.Topic)
r.logger.Warnf("%s publisher is reconnecting in %s ...", logMessagePrefix, r.metadata.reconnectWait.String())
time.Sleep(r.metadata.reconnectWait)
r.reconnect(connectionCount)
}
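Both the publisher and the subscriber loop capture the connectionCount they last observed and pass it back into reconnect; doReconnect bumps the counter on every successful reconnect and ignores requests carrying a stale count, so several callers that failed on the same dead connection trigger only one reconnect. A minimal sketch of that generation-counter guard, with illustrative names rather than the component's actual types:

package main

import (
	"fmt"
	"sync"
)

// conn stands in for the AMQP connection/channel pair.
type conn struct{ generation int }

type client struct {
	mu              sync.Mutex
	connectionCount int
	current         *conn
}

// reconnect re-dials only if the caller's observed generation is still the
// latest one; otherwise another caller has already reconnected.
func (c *client) reconnect(observed int) {
	c.mu.Lock()
	defer c.mu.Unlock()

	if observed != c.connectionCount {
		fmt.Println("stale reconnect attempt, skipping")
		return
	}
	c.connectionCount++
	c.current = &conn{generation: c.connectionCount}
	fmt.Println("reconnected, generation", c.connectionCount)
}

func main() {
	c := &client{current: &conn{}}

	// Two callers saw the same broken connection (generation 0);
	// only the first reconnect takes effect.
	c.reconnect(0)
	c.reconnect(0)
}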
@ -229,9 +226,21 @@ func (r *rabbitMQ) Subscribe(req pubsub.SubscribeRequest, handler pubsub.Handler
}
queueName := fmt.Sprintf("%s-%s", r.metadata.consumerID, req.Topic)
r.logger.Infof("%s subscribe to topic/queue '%s/%s'", logMessagePrefix, req.Topic, queueName)
// By the time Subscribe exits, the subscription should be active.
if _, _, _, err := r.ensureSubscription(req, queueName); err != nil {
// By the time Subscribe exits, the subscription should be active.
err := retry.NotifyRecover(func() error {
if _, _, _, err := r.ensureSubscription(req, queueName); err != nil {
r.logger.Warnf("failed attempt to subscribe to %s: %v", queueName, err)
return err
}
return nil
}, backoff.WithMaxRetries(backoff.NewConstantBackOff(r.metadata.reconnectWait), 4), func(err error, d time.Duration) {
r.logger.Infof("failed to subscribe to %s. Retrying...", queueName)
}, func() {
r.logger.Infof("successfully subscribed to %s after initial error(s)", queueName)
})
if err != nil {
return err
}
@ -240,13 +249,16 @@ func (r *rabbitMQ) Subscribe(req pubsub.SubscribeRequest, handler pubsub.Handler
return nil
}
// this function call should be wrapped by channelMutex.
func (r *rabbitMQ) prepareSubscription(channel rabbitMQChannelBroker, req pubsub.SubscribeRequest, queueName string) (*amqp.Queue, error) {
err := r.ensureExchangeDeclared(channel, req.Topic)
if err != nil {
r.logger.Errorf("%s prepareSubscription for topic/queue '%s/%s' failed in ensureExchangeDeclared: %v", logMessagePrefix, req.Topic, queueName, err)
return nil, err
}
r.logger.Debugf("%s declaring queue '%s'", logMessagePrefix, queueName)
r.logger.Infof("%s declaring queue '%s'", logMessagePrefix, queueName)
var args amqp.Table
if r.metadata.enableDeadLetter {
// declare dead letter exchange
@ -254,6 +266,8 @@ func (r *rabbitMQ) prepareSubscription(channel rabbitMQChannelBroker, req pubsub
dlqName := fmt.Sprintf(defaultDeadLetterQueueFormat, queueName)
err = r.ensureExchangeDeclared(channel, dlxName)
if err != nil {
r.logger.Errorf("%s prepareSubscription for topic/queue '%s/%s' failed in ensureExchangeDeclared: %v", logMessagePrefix, req.Topic, dlqName, err)
return nil, err
}
var q amqp.Queue
@ -262,54 +276,66 @@ func (r *rabbitMQ) prepareSubscription(channel rabbitMQChannelBroker, req pubsub
dlqArgs[argQueueMode] = queueModeLazy
q, err = channel.QueueDeclare(dlqName, true, r.metadata.deleteWhenUnused, false, false, dlqArgs)
if err != nil {
r.logger.Errorf("%s prepareSubscription for topic/queue '%s/%s' failed in channel.QueueDeclare: %v", logMessagePrefix, req.Topic, dlqName, err)
return nil, err
}
err = channel.QueueBind(q.Name, "", dlxName, false, nil)
if err != nil {
r.logger.Errorf("%s prepareSubscription for topic/queue '%s/%s' failed in channel.QueueBind: %v", logMessagePrefix, req.Topic, dlqName, err)
return nil, err
}
r.logger.Debugf("declared dead letter exchange for queue '%s' bind dead letter queue '%s' to dead letter exchange '%s'", queueName, dlqName, dlxName)
r.logger.Infof("%s declared dead letter exchange for queue '%s' bind dead letter queue '%s' to dead letter exchange '%s'", logMessagePrefix, queueName, dlqName, dlxName)
args = amqp.Table{argDeadLetterExchange: dlxName}
}
args = r.metadata.formatQueueDeclareArgs(args)
q, err := channel.QueueDeclare(queueName, r.metadata.durable, r.metadata.deleteWhenUnused, false, false, args)
if err != nil {
r.logger.Errorf("%s prepareSubscription for topic/queue '%s/%s' failed in channel.QueueDeclare: %v", logMessagePrefix, req.Topic, queueName, err)
return nil, err
}
if r.metadata.prefetchCount > 0 {
r.logger.Debugf("setting prefetch count to %s", strconv.Itoa(int(r.metadata.prefetchCount)))
r.logger.Infof("%s setting prefetch count to %s", logMessagePrefix, strconv.Itoa(int(r.metadata.prefetchCount)))
err = channel.Qos(int(r.metadata.prefetchCount), 0, false)
if err != nil {
r.logger.Errorf("%s prepareSubscription for topic/queue '%s/%s' failed in channel.Qos: %v", logMessagePrefix, req.Topic, queueName, err)
return nil, err
}
}
r.logger.Debugf("%s binding queue '%s' to exchange '%s'", logMessagePrefix, q.Name, req.Topic)
r.logger.Infof("%s binding queue '%s' to exchange '%s'", logMessagePrefix, q.Name, req.Topic)
err = channel.QueueBind(q.Name, "", req.Topic, false, nil)
if err != nil {
r.logger.Errorf("%s prepareSubscription for topic/queue '%s/%s' failed in channel.QueueBind: %v", logMessagePrefix, req.Topic, queueName, err)
return nil, err
}
return &q, nil
}
func (r *rabbitMQ) ensureSubscription(req pubsub.SubscribeRequest,
queueName string) (rabbitMQChannelBroker, int, *amqp.Queue, error) {
channel, connectionCount := r.getChannel()
if channel == nil {
return nil, 0, nil, errors.New("channel not initialized")
func (r *rabbitMQ) ensureSubscription(req pubsub.SubscribeRequest, queueName string) (rabbitMQChannelBroker, int, *amqp.Queue, error) {
r.channelMutex.RLock()
defer r.channelMutex.RUnlock()
if r.channel == nil {
return nil, 0, nil, errors.New(errorChannelNotInitialized)
}
q, err := r.prepareSubscription(channel, req, queueName)
q, err := r.prepareSubscription(r.channel, req, queueName)
return channel, connectionCount, q, err
return r.channel, r.connectionCount, q, err
}
func (r *rabbitMQ) subscribeForever(req pubsub.SubscribeRequest, queueName string, handler pubsub.Handler) {
for {
var (
err error
errFuncName string
connectionCount int
channel rabbitMQChannelBroker
q *amqp.Queue
@ -318,6 +344,7 @@ func (r *rabbitMQ) subscribeForever(req pubsub.SubscribeRequest, queueName strin
for {
channel, connectionCount, q, err = r.ensureSubscription(req, queueName)
if err != nil {
errFuncName = "ensureSubscription"
break
}
@ -331,24 +358,31 @@ func (r *rabbitMQ) subscribeForever(req pubsub.SubscribeRequest, queueName strin
nil,
)
if err != nil {
errFuncName = "channel.Consume"
break
}
err = r.listenMessages(channel, msgs, req.Topic, handler)
if err != nil {
errFuncName = "listenMessages"
break
}
}
if r.isStopped() {
r.logger.Infof("%s subscriber for %s is stopped", logMessagePrefix, queueName)
return
}
r.logger.Errorf("%s error in subscription for %s: %v", logMessagePrefix, queueName, err)
// print the error if the subscriber is running.
if err != nil {
r.logger.Errorf("%s error in subscriber for %s in %s: %v", logMessagePrefix, queueName, errFuncName, err)
}
if mustReconnect(channel, err) {
r.logger.Warnf("%s subscriber is reconnecting in %s ...", logMessagePrefix, r.metadata.reconnectWait.String())
time.Sleep(r.metadata.reconnectWait)
r.logger.Warnf("%s pubsub subscription for %s is reconnecting ...", logMessagePrefix, queueName)
r.reconnect(connectionCount)
}
}
@ -409,11 +443,14 @@ func (r *rabbitMQ) handleMessage(channel rabbitMQChannelBroker, d amqp.Delivery,
return err
}
// this function call should be wrapped by channelMutex.
func (r *rabbitMQ) ensureExchangeDeclared(channel rabbitMQChannelBroker, exchange string) error {
if !r.containsExchange(exchange) {
r.logger.Debugf("%s declaring exchange '%s' of kind '%s'", logMessagePrefix, exchange, fanoutExchangeKind)
err := channel.ExchangeDeclare(exchange, fanoutExchangeKind, true, false, false, false, nil)
if err != nil {
r.logger.Errorf("%s ensureExchangeDeclared: channel.ExchangeDeclare failed: %v", logMessagePrefix, err)
return err
}
@ -423,37 +460,38 @@ func (r *rabbitMQ) ensureExchangeDeclared(channel rabbitMQChannelBroker, exchang
return nil
}
// this function call should be wrapped by channelMutex.
func (r *rabbitMQ) containsExchange(exchange string) bool {
r.channelMutex.RLock()
defer r.channelMutex.RUnlock()
_, exists := r.declaredExchanges[exchange]
return exists
}
// this function call should be wrapped by channelMutex.
func (r *rabbitMQ) putExchange(exchange string) {
r.channelMutex.Lock()
defer r.channelMutex.Unlock()
r.declaredExchanges[exchange] = true
}
// this function call should be wrapped by channelMutex.
func (r *rabbitMQ) reset() (err error) {
if len(r.declaredExchanges) > 0 {
r.declaredExchanges = make(map[string]bool)
}
if r.channel != nil {
err = r.channel.Close()
if err = r.channel.Close(); err != nil {
r.logger.Errorf("%s reset: channel.Close() failed: %v", logMessagePrefix, err)
}
r.channel = nil
}
if r.connection != nil {
err2 := r.connection.Close()
r.connection = nil
if err == nil {
err = err2
if err2 := r.connection.Close(); err2 != nil {
r.logger.Errorf("%s reset: connection.Close() failed: %v", logMessagePrefix, err2)
if err == nil {
err = err2
}
}
r.connection = nil
}
return
@ -490,5 +528,5 @@ func mustReconnect(channel rabbitMQChannelBroker, err error) bool {
return false
}
return strings.Contains(err.Error(), errorChannelConnection) || strings.Contains(err.Error(), errorUnexpectedCommand)
return strings.Contains(err.Error(), errorChannelConnection)
}
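The race fixes also consolidate locking: callers such as publishSync and ensureSubscription take channelMutex themselves, while the inner helpers (doReconnect, ensureExchangeDeclared, containsExchange, putExchange, reset) are annotated as requiring the lock to already be held. A minimal sketch of that discipline with illustrative names; write paths take the full lock because declaring an exchange mutates the declaredExchanges map, while read-only lookups can use the read lock.

package main

import (
	"errors"
	"sync"
)

type channelStub struct{} // stands in for rabbitMQChannelBroker

type broker struct {
	channelMutex      sync.RWMutex
	channel           *channelStub
	connectionCount   int
	declaredExchanges map[string]bool
}

// ensureExchangeDeclaredLocked assumes channelMutex is held by the caller
// ("this function call should be wrapped by channelMutex").
func (b *broker) ensureExchangeDeclaredLocked(exchange string) error {
	if b.channel == nil {
		return errors.New("channel not initialized")
	}
	if !b.declaredExchanges[exchange] {
		// channel.ExchangeDeclare(...) would go here.
		b.declaredExchanges[exchange] = true
	}
	return nil
}

// publish takes the write lock: it may mutate declaredExchanges.
func (b *broker) publish(topic string) error {
	b.channelMutex.Lock()
	defer b.channelMutex.Unlock()
	return b.ensureExchangeDeclaredLocked(topic)
}

// snapshot takes the read lock: it only reads channel state.
func (b *broker) snapshot() (*channelStub, int) {
	b.channelMutex.RLock()
	defer b.channelMutex.RUnlock()
	return b.channel, b.connectionCount
}

func main() {
	b := &broker{channel: &channelStub{}, declaredExchanges: map[string]bool{}}
	_ = b.publish("red")
	_, _ = b.snapshot()
}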


@ -4,6 +4,7 @@ import (
"context"
"errors"
"testing"
"time"
"github.com/streadway/amqp"
"github.com/stretchr/testify/assert"
@ -14,7 +15,7 @@ import (
func newBroker() *rabbitMQInMemoryBroker {
return &rabbitMQInMemoryBroker{
buffer: make(chan amqp.Delivery),
buffer: make(chan amqp.Delivery, 2),
}
}
@ -288,9 +289,12 @@ func TestSubscribeReconnect(t *testing.T) {
assert.Equal(t, 2, messageCount)
assert.Equal(t, "foo bar", lastMessage)
// allow last reconnect completion
time.Sleep(time.Second)
// Check that reconnection happened
assert.Equal(t, 2, broker.connectCount)
assert.Equal(t, 2, broker.closeCount) // two counts - one for connection, one for channel
assert.Equal(t, 3, broker.connectCount) // initial connect + 2 reconnects
assert.Equal(t, 4, broker.closeCount) // two counts for each connection closure - one for connection, one for channel
}
func createAMQPMessage(body []byte) amqp.Delivery {


@ -13,7 +13,7 @@ spec:
- name: durable
value: true
- name: deletedWhenUnused
value: true
value: false
- name: requeueInFailure
value: true
- name: backOffMaxRetries


@ -13,7 +13,7 @@ spec:
- name: durable
value: true
- name: deletedWhenUnused
value: true
value: false
- name: requeueInFailure
value: true
- name: backOffMaxRetries


@ -3,6 +3,7 @@ module github.com/dapr/components-contrib/tests/certification/pubsub/rabbitmq
go 1.17
require (
github.com/cenkalti/backoff/v4 v4.1.1
github.com/dapr/components-contrib v1.5.0
github.com/dapr/components-contrib/tests/certification v1.5.0
github.com/dapr/dapr v1.5.0
@ -26,7 +27,6 @@ require (
github.com/armon/go-metrics v0.0.0-20190430140413-ec5e00d3c878 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cenkalti/backoff v2.2.1+incompatible // indirect
github.com/cenkalti/backoff/v4 v4.1.1 // indirect
github.com/cespare/xxhash/v2 v2.1.1 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/fasthttp/router v1.3.8 // indirect
@ -73,6 +73,7 @@ require (
github.com/sirupsen/logrus v1.8.1 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/stoewer/go-strcase v1.2.0 // indirect
github.com/tylertreat/comcast v1.0.1 // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect
github.com/valyala/fasthttp v1.28.0 // indirect
go.opencensus.io v0.22.5 // indirect


@ -1048,6 +1048,7 @@ github.com/toolkits/concurrent v0.0.0-20150624120057-a4371d70e3e3/go.mod h1:QDlp
github.com/trusch/grpc-proxy v0.0.0-20190529073533-02b64529f274 h1:ChAMVBRng5Dsv0rnfOxUj7vg2M9D0rafbRY1N2EEAZ8=
github.com/trusch/grpc-proxy v0.0.0-20190529073533-02b64529f274/go.mod h1:dzrPb02OTNDVimdCCBR1WAPu9a69n3VnfDyCX/GT/gE=
github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM=
github.com/tylertreat/comcast v1.0.1 h1:+B8add2s9PrhX4lx5gGqOKUTebGD7lzdfwKZHYoF98Y=
github.com/tylertreat/comcast v1.0.1/go.mod h1:8mA9mMCnmAGjTnrWNKQ7PXsBy6FfguO+U9pSxifaka8=
github.com/ugorji/go v1.1.4/go.mod h1:uQMGLiO92mf5W77hV/PUCpI3pbzQx3CRekS0kk+RGrc=
github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw=


@ -8,10 +8,12 @@ package rabbitmq_test
import (
"context"
"fmt"
"math/rand"
"sync"
"testing"
"time"
"github.com/cenkalti/backoff/v4"
"github.com/streadway/amqp"
"github.com/stretchr/testify/require"
"go.uber.org/multierr"
@ -23,11 +25,13 @@ import (
"github.com/dapr/dapr/pkg/runtime"
"github.com/dapr/go-sdk/service/common"
"github.com/dapr/kit/logger"
kit_retry "github.com/dapr/kit/retry"
"github.com/dapr/components-contrib/tests/certification/embedded"
"github.com/dapr/components-contrib/tests/certification/flow"
"github.com/dapr/components-contrib/tests/certification/flow/app"
"github.com/dapr/components-contrib/tests/certification/flow/dockercompose"
"github.com/dapr/components-contrib/tests/certification/flow/network"
"github.com/dapr/components-contrib/tests/certification/flow/retry"
"github.com/dapr/components-contrib/tests/certification/flow/sidecar"
"github.com/dapr/components-contrib/tests/certification/flow/simulate"
@ -38,9 +42,9 @@ const (
sidecarName1 = "dapr-1"
sidecarName2 = "dapr-2"
sidecarName3 = "dapr-3"
applicationID1 = "app-1"
applicationID2 = "app-2"
applicationID3 = "app-3"
appID1 = "app-1"
appID2 = "app-2"
appID3 = "app-3"
clusterName = "rabbitmqcertification"
dockerComposeYAML = "docker-compose.yml"
numMessages = 1000
@ -57,7 +61,7 @@ const (
topicGreen = "green"
)
type consumer struct {
type Consumer struct {
pubsub string
messages map[string]*watcher.Watcher
}
@ -80,70 +84,182 @@ func amqpReady(url string) flow.Runnable {
}
}
func TestSingleTopicSingleConsumer(t *testing.T) {
func TestRabbitMQ(t *testing.T) {
rand.Seed(time.Now().UTC().UnixNano())
log := logger.NewLogger("dapr.components")
// In RabbitMQ, messages might not come in order.
messages := watcher.NewUnordered()
//log.SetOutputLevel(logger.DebugLevel)
pubTopics := []string{topicRed, topicBlue, topicGreen}
subTopics := []string{topicRed, topicBlue}
alpha := &Consumer{pubsub: pubsubAlpha, messages: make(map[string]*watcher.Watcher)}
beta := &Consumer{pubsub: pubsubBeta, messages: make(map[string]*watcher.Watcher)}
consumers := []*Consumer{alpha, beta}
for _, consumer := range consumers {
for _, topic := range pubTopics {
// In RabbitMQ, messages might not come in order.
consumer.messages[topic] = watcher.NewUnordered()
}
}
// subscribed is used to synchronize between publisher and subscriber
subscribed := make(chan struct{}, 1)
// Test logic that sends messages to a topic and
// verifies the application has received them.
// Test logic that sends messages to topics and
// verifies the two consumers with different IDs have received them.
test := func(ctx flow.Context) error {
client := sidecar.GetClient(ctx, sidecarName1)
// Declare what is expected BEFORE performing any steps
// that will satisfy the test.
msgs := make([]string, numMessages)
for i := range msgs {
msgs[i] = fmt.Sprintf("Hello, Messages %03d", i)
}
messages.ExpectStrings(msgs...)
// Wait until we know Dapr has subscribed
// so we know published messages will be persisted/consumed.
<-subscribed
// Send events that the application above will observe.
ctx.Log("Sending messages!")
for _, msg := range msgs {
ctx.Logf("Sending: %q", msg)
err := client.PublishEvent(
ctx, pubsubAlpha, topicRed, msg)
require.NoError(ctx, err, "error publishing message")
for _, consumer := range consumers {
for _, topic := range subTopics {
consumer.messages[topic].ExpectStrings(msgs...)
}
}
<-subscribed
// sidecars pairs each sidecar client with its pubsub component name
sidecars := []struct {
client *sidecar.Client
pubsub string
}{
{sidecar.GetClient(ctx, sidecarName1), pubsubAlpha},
{sidecar.GetClient(ctx, sidecarName2), pubsubBeta},
{sidecar.GetClient(ctx, sidecarName3), pubsubBeta},
}
var wg sync.WaitGroup
wg.Add(len(pubTopics))
for _, topic := range pubTopics {
go func(topic string) {
defer wg.Done()
// Send events that the application above will observe.
log.Infof("Sending messages on topic '%s'", topic)
for _, msg := range msgs {
// randomize publishers
indx := rand.Intn(len(sidecars))
log.Debugf("Sending: '%s' on topic '%s'", msg, topic)
var err error
for try := 0; try < 3; try++ {
if err = sidecars[indx].client.PublishEvent(ctx, sidecars[indx].pubsub, topic, msg); err == nil {
break
}
log.Errorf("failed attempt to publish '%s' to topic '%s'", msg, topic)
time.Sleep(5 * time.Second)
}
require.NoError(ctx, err, "error publishing message")
}
}(topic)
}
wg.Wait()
// Do the messages we observed match what we expect?
messages.Assert(ctx, time.Minute)
wg.Add(len(consumers) * len(pubTopics))
for _, topic := range pubTopics {
for _, consumer := range consumers {
go func(topic string) {
defer wg.Done()
consumer.messages[topic].Assert(ctx, 3*time.Minute)
}(topic)
}
}
wg.Wait()
return nil
}
// Application logic that tracks messages from a topic.
application := func(ctx flow.Context, s common.Service) (err error) {
// Simulate periodic errors.
sim := simulate.PeriodicError(ctx, errFrequency)
application := func(consumer *Consumer, routeIndex int) app.SetupFn {
return func(ctx flow.Context, s common.Service) (err error) {
// Simulate periodic errors.
sim := simulate.PeriodicError(ctx, errFrequency)
// Setup topic event handler.
err = multierr.Combine(
s.AddTopicEventHandler(&common.Subscription{
PubsubName: pubsubAlpha,
Topic: topicRed,
Route: "/" + topicRed,
}, func(_ context.Context, e *common.TopicEvent) (retry bool, err error) {
if err := sim(); err != nil {
return true, err
for _, topic := range subTopics {
// Set up the topic event handler.
err = multierr.Combine(
err,
s.AddTopicEventHandler(&common.Subscription{
PubsubName: consumer.pubsub,
Topic: topic,
Route: fmt.Sprintf("/%s-%d", topic, routeIndex),
}, func(_ context.Context, e *common.TopicEvent) (retry bool, err error) {
if err := sim(); err != nil {
return true, err
}
// Track/Observe the data of the event.
consumer.messages[e.Topic].Observe(e.Data)
log.Debugf("Event - consumer: %s, pubsub: %s, topic: %s, id: %s, data: %s", consumer.pubsub, e.PubsubName, e.Topic, e.ID, e.Data)
return false, nil
}),
)
}
return err
}
}
// sendMessagesInBackground and assertMessages are
// Runnables for testing publishing and consuming
// messages reliably when infrastructure and network
// interruptions occur.
var task flow.AsyncTask
sendMessagesInBackground := func(consumer *Consumer) flow.Runnable {
return func(ctx flow.Context) error {
client := sidecar.GetClient(ctx, sidecarName1)
for _, topic := range subTopics {
consumer.messages[topic].Reset()
}
t := time.NewTicker(100 * time.Millisecond)
defer t.Stop()
counter := 1
for {
select {
case <-task.Done():
return nil
case <-t.C:
for _, topic := range subTopics {
msg := fmt.Sprintf("Background message - %03d", counter)
consumer.messages[topic].Prepare(msg) // Track for observation
// Publish with retries.
bo := backoff.WithContext(backoff.NewConstantBackOff(time.Second), task)
if err := kit_retry.NotifyRecover(func() error {
return client.PublishEvent(
// Using ctx instead of task here is deliberate.
// We don't want cancelation to prevent adding
// the message, only to interrupt between tries.
ctx, consumer.pubsub, topic, msg)
}, bo, func(err error, t time.Duration) {
ctx.Logf("Error publishing message, retrying in %s", t)
}, func() {}); err == nil {
consumer.messages[topic].Add(msg) // Success
counter++
}
}
}
}
}
}
assertMessages := func(consumer *Consumer) flow.Runnable {
return func(ctx flow.Context) error {
// Signal sendMessagesInBackground to stop and wait for it to complete.
task.CancelAndWait()
for _, topic := range subTopics {
consumer.messages[topic].Assert(ctx, 5*time.Minute)
}
// Track/Observe the data of the event.
messages.Observe(e.Data)
ctx.Logf("Event - pubsub: %s, topic: %s, id: %s, data: %s",
e.PubsubName, e.Topic, e.ID, e.Data)
return false, nil
}))
return err
return nil
}
}
flow.New(t, "rabbitmq certification").
@ -151,268 +267,67 @@ func TestSingleTopicSingleConsumer(t *testing.T) {
Step(dockercompose.Run(clusterName, dockerComposeYAML)).
Step("wait for rabbitmq readiness",
retry.Do(time.Second, 30, amqpReady(rabbitMQURL))).
// Run the application logic above.
Step(app.Run(applicationID1, fmt.Sprintf(":%d", appPort), application)).
// Run the application1 logic above.
Step(app.Run(appID1, fmt.Sprintf(":%d", appPort),
application(alpha, 1))).
// Run the Dapr sidecar with the RabbitMQ component.
Step(sidecar.Run(sidecarName1,
embedded.WithComponentsPath("./components/alpha"),
embedded.WithAppProtocol(runtime.HTTPProtocol, appPort),
embedded.WithDaprGRPCPort(runtime.DefaultDaprAPIGRPCPort),
embedded.WithDaprHTTPPort(runtime.DefaultDaprHTTPPort),
embedded.WithProfilePort(runtime.DefaultProfilePort),
runtime.WithPubSubs(
pubsub_loader.New("rabbitmq", func() pubsub.PubSub {
return pubsub_rabbitmq.NewRabbitMQ(log)
}),
))).
Step("signal subscribed", flow.MustDo(func() {
close(subscribed)
})).
Step("send and wait", test).
Run()
}
func TestMultiTopicSingleConsumer(t *testing.T) {
log := logger.NewLogger("dapr.components")
topics := []string{topicRed, topicBlue, topicGreen}
messages := make(map[string]*watcher.Watcher)
for _, topic := range topics {
// In RabbitMQ, messages might not come in order.
messages[topic] = watcher.NewUnordered()
}
// subscribed is used to synchronize between publisher and subscriber
subscribed := make(chan struct{}, 1)
// Test logic that sends messages to a topic and
// verifies the application has received them.
test := func(ctx flow.Context) error {
client := sidecar.GetClient(ctx, sidecarName2)
// Declare what is expected BEFORE performing any steps
// that will satisfy the test.
msgs := make([]string, numMessages)
for i := range msgs {
msgs[i] = fmt.Sprintf("Hello, Messages %03d", i)
}
<-subscribed
// expecting no messages in topicGreen
messages[topicGreen].ExpectStrings()
var wg sync.WaitGroup
wg.Add(2)
for _, topic := range []string{topicRed, topicBlue} {
go func(topic string) {
defer wg.Done()
messages[topic].ExpectStrings(msgs...)
// Send events that the application above will observe.
ctx.Log("Sending messages!")
for _, msg := range msgs {
ctx.Logf("Sending: %q to topic %q", msg, topic)
err := client.PublishEvent(ctx, pubsubAlpha, topic, msg)
require.NoError(ctx, err, "error publishing message")
}
// Do the messages we observed match what we expect?
messages[topic].Assert(ctx, time.Minute)
}(topic)
}
wg.Wait()
messages[topicGreen].Assert(ctx, time.Second)
return nil
}
// Application logic that tracks messages from a topic.
application := func(ctx flow.Context, s common.Service) (err error) {
for _, topic := range topics {
// Simulate periodic errors.
sim := simulate.PeriodicError(ctx, errFrequency)
// Setup topic event handler.
err = multierr.Combine(
s.AddTopicEventHandler(&common.Subscription{
PubsubName: pubsubAlpha,
Topic: topic,
Route: "/" + topic,
}, func(_ context.Context, e *common.TopicEvent) (retry bool, err error) {
if err := sim(); err != nil {
return true, err
}
// Track/Observe the data of the event.
messages[e.Topic].Observe(e.Data)
ctx.Logf("Event - pubsub: %s, topic: %s, id: %s, data: %s",
e.PubsubName, e.Topic, e.ID, e.Data)
return false, nil
}))
}
return err
}
flow.New(t, "rabbitmq certification").
// Run RabbitMQ using Docker Compose.
Step(dockercompose.Run(clusterName, dockerComposeYAML)).
Step("wait for rabbitmq readiness",
retry.Do(time.Second, 30, amqpReady(rabbitMQURL))).
// Run the application logic above.
Step(app.Run(applicationID2, fmt.Sprintf(":%d", appPort+2), application)).
// Run the application2 logic above.
Step(app.Run(appID2, fmt.Sprintf(":%d", appPort+2),
application(beta, 2))).
// Run the Dapr sidecar with the RabbitMQ component.
Step(sidecar.Run(sidecarName2,
embedded.WithComponentsPath("./components/beta"),
embedded.WithAppProtocol(runtime.HTTPProtocol, appPort+2),
embedded.WithDaprGRPCPort(runtime.DefaultDaprAPIGRPCPort+2),
embedded.WithDaprHTTPPort(runtime.DefaultDaprHTTPPort+2),
embedded.WithProfilePort(runtime.DefaultProfilePort+2),
runtime.WithPubSubs(
pubsub_loader.New("rabbitmq", func() pubsub.PubSub {
return pubsub_rabbitmq.NewRabbitMQ(log)
}),
))).
Step("signal subscribed", flow.MustDo(func() {
close(subscribed)
})).
Step("send and wait", test).
Run()
}
func TestMultiTopicMuliConsumer(t *testing.T) {
log := logger.NewLogger("dapr.components")
topics := []string{topicRed, topicBlue, topicGreen}
alpha := &consumer{pubsub: pubsubAlpha, messages: make(map[string]*watcher.Watcher)}
beta := &consumer{pubsub: pubsubBeta, messages: make(map[string]*watcher.Watcher)}
for _, topic := range topics {
// In RabbitMQ, messages might not come in order.
alpha.messages[topic] = watcher.NewUnordered()
beta.messages[topic] = watcher.NewUnordered()
}
// subscribed is used to synchronize between publisher and subscriber
subscribed := make(chan struct{}, 1)
// Test logic that sends messages to a topic and
// verifies the application has received them.
test := func(ctx flow.Context) error {
client := sidecar.GetClient(ctx, sidecarName3)
// Declare what is expected BEFORE performing any steps
// that will satisfy the test.
msgs := make([]string, numMessages)
for i := range msgs {
msgs[i] = fmt.Sprintf("Hello, Messages %03d", i)
}
<-subscribed
// expecting no messages in topicGrey
alpha.messages[topicGreen].ExpectStrings()
beta.messages[topicGreen].ExpectStrings()
var wg sync.WaitGroup
wg.Add(2)
for _, topic := range []string{topicRed, topicBlue} {
go func(topic string) {
defer wg.Done()
alpha.messages[topic].ExpectStrings(msgs...)
beta.messages[topic].ExpectStrings(msgs...)
// Send events that the application above will observe.
ctx.Log("Sending messages!")
for i, msg := range msgs {
// alternate publisher
pubsub := pubsubAlpha
if i%3 == 0 {
pubsub = pubsubBeta
}
ctx.Logf("Sending: %q to topic %q", msg, topic)
err := client.PublishEvent(ctx, pubsub, topic, msg)
require.NoError(ctx, err, "error publishing message")
}
// Do the messages we observed match what we expect?
alpha.messages[topic].Assert(ctx, time.Minute)
beta.messages[topic].Assert(ctx, time.Second)
}(topic)
}
wg.Wait()
alpha.messages[topicGreen].Assert(ctx, time.Second)
beta.messages[topicGreen].Assert(ctx, time.Second)
return nil
}
// Application logic that tracks messages from a topic.
application := func(ctx flow.Context, s common.Service) (err error) {
for _, topic := range topics {
// Simulate periodic errors.
sim := simulate.PeriodicError(ctx, errFrequency)
// Setup topic event handler.
err = multierr.Combine(
s.AddTopicEventHandler(
&common.Subscription{
PubsubName: alpha.pubsub,
Topic: topic,
Route: fmt.Sprintf("/%s-alpha", topic),
},
eventHandler(ctx, alpha, topic, sim),
),
s.AddTopicEventHandler(
&common.Subscription{
PubsubName: beta.pubsub,
Topic: topic,
Route: fmt.Sprintf("/%s-beta1", topic),
},
eventHandler(ctx, beta, topic, sim),
),
s.AddTopicEventHandler(
&common.Subscription{
PubsubName: beta.pubsub,
Topic: topic,
Route: fmt.Sprintf("/%s-beta2", topic),
},
eventHandler(ctx, beta, topic, sim),
),
)
}
return err
}
flow.New(t, "rabbitmq certification").
// Run RabbitMQ using Docker Compose.
Step(dockercompose.Run(clusterName, dockerComposeYAML)).
Step("wait for rabbitmq readiness",
retry.Do(time.Second, 30, amqpReady(rabbitMQURL))).
// Run the application logic above.
Step(app.Run(applicationID3, fmt.Sprintf(":%d", appPort+4), application)).
// Run the application3 logic above.
Step(app.Run(appID3, fmt.Sprintf(":%d", appPort+4),
application(beta, 3))).
// Run the Dapr sidecar with the RabbitMQ component.
Step(sidecar.Run(sidecarName3,
embedded.WithComponentsPath("./components/beta"),
embedded.WithAppProtocol(runtime.HTTPProtocol, appPort+4),
embedded.WithDaprGRPCPort(runtime.DefaultDaprAPIGRPCPort+4),
embedded.WithDaprHTTPPort(runtime.DefaultDaprHTTPPort+4),
embedded.WithProfilePort(runtime.DefaultProfilePort+4),
runtime.WithPubSubs(
pubsub_loader.New("rabbitmq", func() pubsub.PubSub {
return pubsub_rabbitmq.NewRabbitMQ(log)
}),
))).
Step("wait", flow.Sleep(5*time.Second)).
Step("signal subscribed", flow.MustDo(func() {
close(subscribed)
})).
Step("send and wait", test).
// Simulate a network interruption.
// This tests the component's ability to handle reconnections
// when Dapr is disconnected abnormally.
StepAsync("steady flow of messages to publish", &task, sendMessagesInBackground(alpha)).
Step("wait", flow.Sleep(5*time.Second)).
//
// Errors will occur here.
Step("interrupt network", network.InterruptNetwork(30*time.Second, nil, nil, "5672")).
//
// Component should recover at this point.
Step("wait", flow.Sleep(30*time.Second)).
Step("assert messages", assertMessages(alpha)).
Run()
}
func eventHandler(ctx flow.Context, cons *consumer, topic string, sim func() error) func(context.Context, *common.TopicEvent) (bool, error) {
return func(_ context.Context, e *common.TopicEvent) (retry bool, err error) {
if err := sim(); err != nil {
return true, err
}
// Track/Observe the data of the event.
cons.messages[topic].Observe(e.Data)
ctx.Logf("Event - consumer: %s, pubsub: %s, topic: %s, id: %s, data: %s",
cons.pubsub, e.PubsubName, e.Topic, e.ID, e.Data)
return false, nil
}
}