pub/sub rabbitmq: add publishing retries (#1382)
Signed-off-by: Dmitry Shmulevich <dmitry.shmulevich@gmail.com> Co-authored-by: Looong Dai <long.dai@intel.com>
This commit is contained in:
parent
29848e1ff6
commit
7a631ef7aa
|
@ -38,6 +38,8 @@ const (
|
|||
metadataMaxLenBytes = "maxLenBytes"
|
||||
|
||||
defaultReconnectWaitSeconds = 3
|
||||
publishMaxRetries = 3
|
||||
publishRetryWaitSeconds = 2
|
||||
metadataPrefetchCount = "prefetchCount"
|
||||
|
||||
argQueueMode = "x-queue-mode"
|
||||
|
@ -205,18 +207,26 @@ func (r *rabbitMQ) publishSync(req *pubsub.PublishRequest) (rabbitMQChannelBroke
|
|||
func (r *rabbitMQ) Publish(req *pubsub.PublishRequest) error {
|
||||
r.logger.Debugf("%s publishing message to %s", logMessagePrefix, req.Topic)
|
||||
|
||||
channel, connectionCount, err := r.publishSync(req)
|
||||
if err != nil {
|
||||
attempt := 0
|
||||
for {
|
||||
attempt++
|
||||
channel, connectionCount, err := r.publishSync(req)
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
if attempt >= publishMaxRetries {
|
||||
r.logger.Errorf("%s publishing failed: %v", logMessagePrefix, err)
|
||||
return err
|
||||
}
|
||||
if mustReconnect(channel, err) {
|
||||
r.logger.Warnf("%s publisher is reconnecting in %s ...", logMessagePrefix, r.metadata.reconnectWait.String())
|
||||
time.Sleep(r.metadata.reconnectWait)
|
||||
r.reconnect(connectionCount)
|
||||
} else {
|
||||
r.logger.Warnf("%s publishing attempt (%d/%d) failed: %v", logMessagePrefix, attempt, publishMaxRetries, err)
|
||||
time.Sleep(publishRetryWaitSeconds * time.Second)
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *rabbitMQ) Subscribe(req pubsub.SubscribeRequest, handler pubsub.Handler) error {
|
||||
|
|
|
@ -184,8 +184,8 @@ func TestPublishReconnect(t *testing.T) {
|
|||
assert.Equal(t, 1, messageCount)
|
||||
assert.Equal(t, "hello world", lastMessage)
|
||||
// Check that reconnection happened
|
||||
assert.Equal(t, 2, broker.connectCount)
|
||||
assert.Equal(t, 2, broker.closeCount) // two counts - one for connection, one for channel
|
||||
assert.Equal(t, 3, broker.connectCount) // three counts - one initial connection plus 2 reconnect attempts
|
||||
assert.Equal(t, 4, broker.closeCount) // four counts - one for connection, one for channel , times 2 reconnect attempts
|
||||
|
||||
err = pubsubRabbitMQ.Publish(&pubsub.PublishRequest{Topic: topic, Data: []byte("foo bar")})
|
||||
assert.Nil(t, err)
|
||||
|
|
|
@ -146,16 +146,8 @@ func TestRabbitMQ(t *testing.T) {
|
|||
for _, msg := range msgs {
|
||||
// randomize publishers
|
||||
indx := rand.Intn(len(sidecars))
|
||||
|
||||
log.Debugf("Sending: '%s' on topic '%s'", msg, topic)
|
||||
var err error
|
||||
for try := 0; try < 3; try++ {
|
||||
if err = sidecars[indx].client.PublishEvent(ctx, sidecars[indx].pubsub, topic, msg); err == nil {
|
||||
break
|
||||
}
|
||||
log.Errorf("failed attempt to publish '%s' to topic '%s'", msg, topic)
|
||||
time.Sleep(5 * time.Second)
|
||||
}
|
||||
err := sidecars[indx].client.PublishEvent(ctx, sidecars[indx].pubsub, topic, msg)
|
||||
require.NoError(ctx, err, "error publishing message")
|
||||
}
|
||||
}(topic)
|
||||
|
|
Loading…
Reference in New Issue