Add backoff for rabbitmq PubSub component (#862)

* Add backoff for rabbitmq

* fix lint

* fix lint

* remove backOffEnable config

* Delete backoff enable metadata

* Add a default config setting max retry to 0

* fix lint

Co-authored-by: Phil Kedy <phil.kedy@gmail.com>
Co-authored-by: Artur Souza <artursouza.ms@outlook.com>
Co-authored-by: Dapr Bot <56698301+dapr-bot@users.noreply.github.com>
This commit is contained in:
Taction 2021-06-10 15:19:46 +08:00 committed by GitHub
parent a59dcc44eb
commit 14870cb7bc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 51 additions and 4 deletions

View File

@ -55,6 +55,15 @@ func DefaultConfig() Config {
}
}
// DefaultConfigWithNoRetry represents the default configuration with `MaxRetries` set to 0.
// This may be useful for those brokers which can handles retries on its own.
func DefaultConfigWithNoRetry() Config {
c := DefaultConfig()
c.MaxRetries = 0
return c
}
// DecodeConfig decodes a Go struct into a `Config`.
func DecodeConfig(c *Config, input interface{}) error {
// Use the deefault config if `c` is empty/zero value.

View File

@ -215,4 +215,22 @@ func TestCreateMetadata(t *testing.T) {
assert.Equal(t, fakeProperties[metadataConsumerIDKey], m.consumerID)
})
}
for _, tt := range booleanFlagTests {
t.Run(fmt.Sprintf("backOffEnable value=%s", tt.in), func(t *testing.T) {
fakeProperties := getFakeProperties()
fakeMetaData := pubsub.Metadata{
Properties: fakeProperties,
}
// act
m, err := createMetadata(fakeMetaData)
// assert
assert.NoError(t, err)
assert.Equal(t, fakeProperties[metadataHostKey], m.host)
assert.Equal(t, fakeProperties[metadataConsumerIDKey], m.consumerID)
})
}
}

View File

@ -9,6 +9,7 @@ import (
"sync"
"time"
"github.com/dapr/components-contrib/internal/retry"
"github.com/dapr/components-contrib/pubsub"
"github.com/dapr/kit/logger"
"github.com/streadway/amqp"
@ -44,7 +45,10 @@ type rabbitMQ struct {
connectionDial func(host string) (rabbitMQConnectionBroker, rabbitMQChannelBroker, error)
logger logger.Logger
logger logger.Logger
backOffConfig retry.Config
ctx context.Context
cancel context.CancelFunc
}
// interface used to allow unit testing
@ -95,6 +99,17 @@ func (r *rabbitMQ) Init(metadata pubsub.Metadata) error {
return err
}
// Default retry configuration is used if no backOff properties are set.
// backOff max retry config is set to 0, which means not to retry by default.
r.backOffConfig = retry.DefaultConfigWithNoRetry()
if err := retry.DecodeConfigWithPrefix(
&r.backOffConfig,
metadata.Properties,
"backOff"); err != nil {
return err
}
r.ctx, r.cancel = context.WithCancel(context.Background())
r.metadata = meta
r.reconnect(0)
// We do not return error on reconnect because it can cause problems if init() happens
@ -318,10 +333,14 @@ func (r *rabbitMQ) handleMessage(channel rabbitMQChannelBroker, d amqp.Delivery,
Topic: topic,
}
err := handler(context.Background(), pubsubMsg)
if err != nil {
b := r.backOffConfig.NewBackOffWithContext(r.ctx)
err := retry.NotifyRecover(func() error {
return handler(r.ctx, pubsubMsg)
}, b, func(err error, d time.Duration) {
r.logger.Errorf("%s error handling message from topic '%s', %s", logMessagePrefix, topic, err)
}
}, func() {
r.logger.Infof("%s successfully processed message after it previously failed from topic '%s'", logMessagePrefix, topic)
})
//nolint:nestif
// if message is not auto acked we need to ack/nack
@ -402,6 +421,7 @@ func (r *rabbitMQ) Close() error {
err := r.reset()
r.stopped = true
r.cancel()
return err
}