From 7a631ef7aa960407b06093e8ab391261a194265c Mon Sep 17 00:00:00 2001 From: Dmitry Shmulevich Date: Fri, 17 Dec 2021 15:59:38 -0800 Subject: [PATCH] pub/sub rabbitmq: add publishing retries (#1382) Signed-off-by: Dmitry Shmulevich Co-authored-by: Looong Dai --- pubsub/rabbitmq/rabbitmq.go | 22 ++++++++++++++----- pubsub/rabbitmq/rabbitmq_test.go | 4 ++-- .../pubsub/rabbitmq/rabbitmq_test.go | 10 +-------- 3 files changed, 19 insertions(+), 17 deletions(-) diff --git a/pubsub/rabbitmq/rabbitmq.go b/pubsub/rabbitmq/rabbitmq.go index 6811cddce..fda8cd90f 100644 --- a/pubsub/rabbitmq/rabbitmq.go +++ b/pubsub/rabbitmq/rabbitmq.go @@ -38,6 +38,8 @@ const ( metadataMaxLenBytes = "maxLenBytes" defaultReconnectWaitSeconds = 3 + publishMaxRetries = 3 + publishRetryWaitSeconds = 2 metadataPrefetchCount = "prefetchCount" argQueueMode = "x-queue-mode" @@ -205,18 +207,26 @@ func (r *rabbitMQ) publishSync(req *pubsub.PublishRequest) (rabbitMQChannelBroke func (r *rabbitMQ) Publish(req *pubsub.PublishRequest) error { r.logger.Debugf("%s publishing message to %s", logMessagePrefix, req.Topic) - channel, connectionCount, err := r.publishSync(req) - if err != nil { + attempt := 0 + for { + attempt++ + channel, connectionCount, err := r.publishSync(req) + if err == nil { + return nil + } + if attempt >= publishMaxRetries { + r.logger.Errorf("%s publishing failed: %v", logMessagePrefix, err) + return err + } if mustReconnect(channel, err) { r.logger.Warnf("%s publisher is reconnecting in %s ...", logMessagePrefix, r.metadata.reconnectWait.String()) time.Sleep(r.metadata.reconnectWait) r.reconnect(connectionCount) + } else { + r.logger.Warnf("%s publishing attempt (%d/%d) failed: %v", logMessagePrefix, attempt, publishMaxRetries, err) + time.Sleep(publishRetryWaitSeconds * time.Second) } - - return err } - - return nil } func (r *rabbitMQ) Subscribe(req pubsub.SubscribeRequest, handler pubsub.Handler) error { diff --git a/pubsub/rabbitmq/rabbitmq_test.go b/pubsub/rabbitmq/rabbitmq_test.go index e670b46b3..cce536c82 100644 --- a/pubsub/rabbitmq/rabbitmq_test.go +++ b/pubsub/rabbitmq/rabbitmq_test.go @@ -184,8 +184,8 @@ func TestPublishReconnect(t *testing.T) { assert.Equal(t, 1, messageCount) assert.Equal(t, "hello world", lastMessage) // Check that reconnection happened - assert.Equal(t, 2, broker.connectCount) - assert.Equal(t, 2, broker.closeCount) // two counts - one for connection, one for channel + assert.Equal(t, 3, broker.connectCount) // three counts - one initial connection plus 2 reconnect attempts + assert.Equal(t, 4, broker.closeCount) // four counts - one for connection, one for channel , times 2 reconnect attempts err = pubsubRabbitMQ.Publish(&pubsub.PublishRequest{Topic: topic, Data: []byte("foo bar")}) assert.Nil(t, err) diff --git a/tests/certification/pubsub/rabbitmq/rabbitmq_test.go b/tests/certification/pubsub/rabbitmq/rabbitmq_test.go index daf3773e5..0e6b3bd98 100644 --- a/tests/certification/pubsub/rabbitmq/rabbitmq_test.go +++ b/tests/certification/pubsub/rabbitmq/rabbitmq_test.go @@ -146,16 +146,8 @@ func TestRabbitMQ(t *testing.T) { for _, msg := range msgs { // randomize publishers indx := rand.Intn(len(sidecars)) - log.Debugf("Sending: '%s' on topic '%s'", msg, topic) - var err error - for try := 0; try < 3; try++ { - if err = sidecars[indx].client.PublishEvent(ctx, sidecars[indx].pubsub, topic, msg); err == nil { - break - } - log.Errorf("failed attempt to publish '%s' to topic '%s'", msg, topic) - time.Sleep(5 * time.Second) - } + err := sidecars[indx].client.PublishEvent(ctx, sidecars[indx].pubsub, topic, msg) require.NoError(ctx, err, "error publishing message") } }(topic)