From 1b30e75e6083ce7fa9c323e6dfae54768302db2e Mon Sep 17 00:00:00 2001 From: Phil Kedy Date: Thu, 6 May 2021 12:43:57 -0400 Subject: [PATCH] Configurable retry (#854) * no message * Switching over usages of retry to the new package and making use of the DecodeConfig function * Added decoding configuration using settings with a specific prefix * Linter fixes * Fixing linter error * time.Sleep is the enemy * Fix typo in comment * Moving config to a pointer parameter so that the component can pass in the config with default values that make sense for that component. * Renamed config struct * Fix comment Co-authored-by: Artur Souza --- bindings/mqtt/mqtt.go | 9 +- internal/config/decode.go | 8 +- .../config/normalize.go | 13 +- .../config/normalize_test.go | 9 +- internal/config/prefix.go | 53 ++++ internal/config/prefix_test.go | 58 +++++ internal/retry/retry.go | 158 ++++++++++++ internal/retry/retry_test.go | 228 ++++++++++++++++++ nameresolution/consul/configuration.go | 12 +- nameresolution/kubernetes/kubernetes.go | 3 +- pubsub/azure/servicebus/servicebus.go | 3 +- pubsub/hazelcast/hazelcast.go | 3 +- pubsub/kafka/kafka.go | 34 +-- pubsub/mqtt/mqtt.go | 3 +- pubsub/natsstreaming/natsstreaming.go | 32 ++- pubsub/pulsar/pulsar.go | 27 ++- pubsub/retry.go | 31 --- 17 files changed, 591 insertions(+), 93 deletions(-) rename nameresolution/configuration.go => internal/config/normalize.go (71%) rename nameresolution/configuration_test.go => internal/config/normalize_test.go (92%) create mode 100644 internal/config/prefix.go create mode 100644 internal/config/prefix_test.go create mode 100644 internal/retry/retry.go create mode 100644 internal/retry/retry_test.go delete mode 100644 pubsub/retry.go diff --git a/bindings/mqtt/mqtt.go b/bindings/mqtt/mqtt.go index ec9d17b6d..495dd3680 100644 --- a/bindings/mqtt/mqtt.go +++ b/bindings/mqtt/mqtt.go @@ -19,10 +19,11 @@ import ( "time" "github.com/cenkalti/backoff/v4" - "github.com/dapr/components-contrib/bindings" - "github.com/dapr/components-contrib/pubsub" - "github.com/dapr/kit/logger" mqtt "github.com/eclipse/paho.mqtt.golang" + + "github.com/dapr/components-contrib/bindings" + "github.com/dapr/components-contrib/internal/retry" + "github.com/dapr/kit/logger" ) const ( @@ -222,7 +223,7 @@ func (m *MQTT) Read(handler func(*bindings.ReadResponse) ([]byte, error)) error if m.metadata.backOffMaxRetries >= 0 { b = backoff.WithMaxRetries(m.backOff, uint64(m.metadata.backOffMaxRetries)) } - if err := pubsub.RetryNotifyRecover(func() error { + if err := retry.NotifyRecover(func() error { m.logger.Debugf("Processing MQTT message %s/%d", mqttMsg.Topic(), mqttMsg.MessageID()) if _, err := handler(&msg); err != nil { return err diff --git a/internal/config/decode.go b/internal/config/decode.go index 2b628d677..ab3972806 100644 --- a/internal/config/decode.go +++ b/internal/config/decode.go @@ -33,7 +33,7 @@ type StringDecoder interface { // Most of the heavy lifting is handled by the mapstructure library. A custom decoder is used to handle // decoding string values to the supported primitives. func Decode(input interface{}, output interface{}) error { - decoder, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{ + decoder, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{ // nolint:exhaustivestruct Result: output, DecodeHook: decodeString, }) @@ -44,6 +44,7 @@ func Decode(input interface{}, output interface{}) error { return decoder.Decode(input) } +// nolint:cyclop func decodeString( f reflect.Type, t reflect.Type, @@ -59,7 +60,10 @@ func decodeString( return data, nil } - dataString := data.(string) + dataString, ok := data.(string) + if !ok { + return nil, errors.Errorf("expected string: got %s", reflect.TypeOf(data)) + } var result interface{} var decoder StringDecoder diff --git a/nameresolution/configuration.go b/internal/config/normalize.go similarity index 71% rename from nameresolution/configuration.go rename to internal/config/normalize.go index 2680fe1d7..e72830350 100644 --- a/nameresolution/configuration.go +++ b/internal/config/normalize.go @@ -3,22 +3,23 @@ // Licensed under the MIT License. // ------------------------------------------------------------ -package nameresolution +package config import ( "fmt" ) -// ConvertConfig converts map[interface{}]interface{} to map[string]interface{} to normalize +// Normalize converts map[interface{}]interface{} to map[string]interface{} to normalize // for JSON and usage in component initialization. -func ConvertConfig(i interface{}) (interface{}, error) { +// nolint:cyclop +func Normalize(i interface{}) (interface{}, error) { var err error switch x := i.(type) { case map[interface{}]interface{}: m2 := map[string]interface{}{} for k, v := range x { if strKey, ok := k.(string); ok { - if m2[strKey], err = ConvertConfig(v); err != nil { + if m2[strKey], err = Normalize(v); err != nil { return nil, err } } else { @@ -30,7 +31,7 @@ func ConvertConfig(i interface{}) (interface{}, error) { case map[string]interface{}: m2 := map[string]interface{}{} for k, v := range x { - if m2[k], err = ConvertConfig(v); err != nil { + if m2[k], err = Normalize(v); err != nil { return nil, err } } @@ -38,7 +39,7 @@ func ConvertConfig(i interface{}) (interface{}, error) { return m2, nil case []interface{}: for i, v := range x { - if x[i], err = ConvertConfig(v); err != nil { + if x[i], err = Normalize(v); err != nil { return nil, err } } diff --git a/nameresolution/configuration_test.go b/internal/config/normalize_test.go similarity index 92% rename from nameresolution/configuration_test.go rename to internal/config/normalize_test.go index aa2935df5..64e2124dc 100644 --- a/nameresolution/configuration_test.go +++ b/internal/config/normalize_test.go @@ -3,17 +3,18 @@ // Licensed under the MIT License. // ------------------------------------------------------------ -package nameresolution_test +package config_test import ( "testing" - "github.com/dapr/components-contrib/nameresolution" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + + "github.com/dapr/components-contrib/internal/config" ) -func TestConfiguration(t *testing.T) { +func TestNormalize(t *testing.T) { tests := map[string]struct { input interface{} expected interface{} @@ -84,7 +85,7 @@ func TestConfiguration(t *testing.T) { for name, tc := range tests { t.Run(name, func(t *testing.T) { - actual, err := nameresolution.ConvertConfig(tc.input) + actual, err := config.Normalize(tc.input) if tc.err != "" { require.Error(t, err) assert.EqualError(t, err, tc.err) diff --git a/internal/config/prefix.go b/internal/config/prefix.go new file mode 100644 index 000000000..f220ef340 --- /dev/null +++ b/internal/config/prefix.go @@ -0,0 +1,53 @@ +package config + +import ( + "strings" + "unicode" +) + +func PrefixedBy(input interface{}, prefix string) (interface{}, error) { + normalized, err := Normalize(input) + if err != nil { + // The only error that can come from normalize is if + // input is a map[interface{}]interface{} and contains + // a key that is not a string. + return input, err + } + input = normalized + + if inputMap, ok := input.(map[string]interface{}); ok { + converted := make(map[string]interface{}, len(inputMap)) + for k, v := range inputMap { + if strings.HasPrefix(k, prefix) { + key := uncapitalize(strings.TrimPrefix(k, prefix)) + converted[key] = v + } + } + + return converted, nil + } else if inputMap, ok := input.(map[string]string); ok { + converted := make(map[string]string, len(inputMap)) + for k, v := range inputMap { + if strings.HasPrefix(k, prefix) { + key := uncapitalize(strings.TrimPrefix(k, prefix)) + converted[key] = v + } + } + + return converted, nil + } + + return input, nil +} + +// uncapitalize initial capital letters in `str`. +func uncapitalize(str string) string { + if len(str) == 0 { + return str + } + + vv := []rune(str) // Introduced later + vv[0] = unicode.ToLower(vv[0]) + + return string(vv) +} diff --git a/internal/config/prefix_test.go b/internal/config/prefix_test.go new file mode 100644 index 000000000..2d22ea5bb --- /dev/null +++ b/internal/config/prefix_test.go @@ -0,0 +1,58 @@ +package config_test + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/dapr/components-contrib/internal/config" +) + +func TestPrefixedBy(t *testing.T) { + tests := map[string]struct { + prefix string + input interface{} + expected interface{} + err string + }{ + "map of string to string": { + prefix: "test", + input: map[string]string{ + "": "", + "ignore": "don't include me", + "testOne": "include me", + "testTwo": "and me", + }, + expected: map[string]string{ + "one": "include me", + "two": "and me", + }, + }, + "map of string to interface{}": { + prefix: "test", + input: map[string]interface{}{ + "": "", + "ignore": "don't include me", + "testOne": "include me", + "testTwo": "and me", + }, + expected: map[string]interface{}{ + "one": "include me", + "two": "and me", + }, + }, + } + + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + actual, err := config.PrefixedBy(tc.input, tc.prefix) + if tc.err != "" { + if assert.Error(t, err) { + assert.Equal(t, tc.err, err.Error()) + } + } else { + assert.Equal(t, tc.expected, actual, "unexpected output") + } + }) + } +} diff --git a/internal/retry/retry.go b/internal/retry/retry.go new file mode 100644 index 000000000..6e841a97a --- /dev/null +++ b/internal/retry/retry.go @@ -0,0 +1,158 @@ +package retry + +import ( + "context" + "strings" + "time" + + "github.com/cenkalti/backoff/v4" + "github.com/pkg/errors" + + "github.com/dapr/components-contrib/internal/config" +) + +// PolicyType denotes if the back off delay should be constant or exponential. +type PolicyType int + +const ( + // PolicyConstant is a backoff policy that always returns the same backoff delay. + PolicyConstant PolicyType = iota + // PolicyExponential is a backoff implementation that increases the backoff period + // for each retry attempt using a randomization function that grows exponentially. + PolicyExponential +) + +// Config encapsulates the back off policy configuration. +type Config struct { + Policy PolicyType `mapstructure:"policy"` + + // Constant back off + Duration time.Duration `mapstructure:"duration"` + + // Exponential back off + InitialInterval time.Duration `mapstructure:"initialInterval"` + RandomizationFactor float32 `mapstructure:"randomizationFactor"` + Multiplier float32 `mapstructure:"multiplier"` + MaxInterval time.Duration `mapstructure:"maxInterval"` + MaxElapsedTime time.Duration `mapstructure:"maxElapsedTime"` + + // Additional options + MaxRetries int64 `mapstructure:"maxRetries"` +} + +// DefaultConfig represents the default configuration for a +// `Config`. +func DefaultConfig() Config { + return Config{ + Policy: PolicyConstant, + Duration: 5 * time.Second, + InitialInterval: backoff.DefaultInitialInterval, + RandomizationFactor: backoff.DefaultRandomizationFactor, + Multiplier: backoff.DefaultMultiplier, + MaxInterval: backoff.DefaultMaxInterval, + MaxElapsedTime: backoff.DefaultMaxElapsedTime, + MaxRetries: -1, + } +} + +// DecodeConfig decodes a Go struct into a `Config`. +func DecodeConfig(c *Config, input interface{}) error { + // Use the deefault config if `c` is empty/zero value. + var emptyConfig Config + if *c == emptyConfig { + *c = DefaultConfig() + } + + return config.Decode(input, c) +} + +// DecodeConfigWithPrefix decodes a Go struct into a `Config`. +func DecodeConfigWithPrefix(c *Config, input interface{}, prefix string) error { + input, err := config.PrefixedBy(input, prefix) + if err != nil { + return err + } + + return DecodeConfig(c, input) +} + +// NewBackOff returns a BackOff instance for use with `NotifyRecover` +// or `backoff.RetryNotify` directly. The instance will not stop due to +// context cancellation. To support cancellation (recommended), use +// `NewBackOffWithContext`. +// +// Since the underlying backoff implementations are not always thread safe, +// `NewBackOff` or `NewBackOffWithContext` should be called each time +// `RetryNotifyRecover` or `backoff.RetryNotify` is used. +func (c *Config) NewBackOff() backoff.BackOff { + var b backoff.BackOff + switch c.Policy { + case PolicyConstant: + b = backoff.NewConstantBackOff(c.Duration) + case PolicyExponential: + eb := backoff.NewExponentialBackOff() + eb.InitialInterval = c.InitialInterval + eb.RandomizationFactor = float64(c.RandomizationFactor) + eb.Multiplier = float64(c.Multiplier) + eb.MaxInterval = c.MaxInterval + eb.MaxElapsedTime = c.MaxElapsedTime + b = eb + } + + if c.MaxRetries >= 0 { + b = backoff.WithMaxRetries(b, uint64(c.MaxRetries)) + } + + return b +} + +// NewBackOffWithContext returns a BackOff instance for use with `RetryNotifyRecover` +// or `backoff.RetryNotify` directly. The provided context is used to cancel retries +// if it is canceled. +// +// Since the underlying backoff implementations are not always thread safe, +// `NewBackOff` or `NewBackOffWithContext` should be called each time +// `RetryNotifyRecover` or `backoff.RetryNotify` is used. +func (c *Config) NewBackOffWithContext(ctx context.Context) backoff.BackOff { + b := c.NewBackOff() + + return backoff.WithContext(b, ctx) +} + +// NotifyRecover is a wrapper around backoff.RetryNotify that adds another callback for when an operation +// previously failed but has since recovered. The main purpose of this wrapper is to call `notify` only when +// the operations fails the first time and `recovered` when it finally succeeds. This can be helpful in limiting +// log messages to only the events that operators need to be alerted on. +func NotifyRecover(operation backoff.Operation, b backoff.BackOff, notify backoff.Notify, recovered func()) error { + var notified bool + + return backoff.RetryNotify(func() error { + err := operation() + + if err == nil && notified { + notified = false + recovered() + } + + return err + }, b, func(err error, d time.Duration) { + if !notified { + notify(err, d) + notified = true + } + }) +} + +// DecodeString handles converting a string value to `p`. +func (p *PolicyType) DecodeString(value string) error { + switch strings.ToLower(value) { + case "constant": + *p = PolicyConstant + case "exponential": + *p = PolicyExponential + default: + return errors.Errorf("unexpected back off policy type: %s", value) + } + + return nil +} diff --git a/internal/retry/retry_test.go b/internal/retry/retry_test.go new file mode 100644 index 000000000..a165f4b61 --- /dev/null +++ b/internal/retry/retry_test.go @@ -0,0 +1,228 @@ +package retry_test + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "github.com/dapr/components-contrib/internal/retry" +) + +var errRetry = errors.New("Testing") + +func TestDecode(t *testing.T) { + tests := map[string]struct { + config interface{} + overrides func(config *retry.Config) + err string + }{ + "invalid policy type": { + config: map[string]interface{}{ + "backOffPolicy": "invalid", + }, + overrides: nil, + err: "1 error(s) decoding:\n\n* error decoding 'policy': invalid PolicyType \"invalid\": unexpected back off policy type: invalid", + }, + "default": { + config: map[string]interface{}{}, + overrides: nil, + err: "", + }, + "constant default": { + config: map[string]interface{}{ + "backOffPolicy": "constant", + }, + overrides: nil, + err: "", + }, + "constant with duraction": { + config: map[string]interface{}{ + "backOffPolicy": "constant", + "backOffDuration": "10s", + }, + overrides: func(config *retry.Config) { + config.Duration = 10 * time.Second + }, + err: "", + }, + "exponential default": { + config: map[string]interface{}{ + "backOffPolicy": "exponential", + }, + overrides: func(config *retry.Config) { + config.Policy = retry.PolicyExponential + }, + err: "", + }, + "exponential with string settings": { + config: map[string]interface{}{ + "backOffPolicy": "exponential", + "backOffInitialInterval": "1000", // 1s + "backOffRandomizationFactor": "1.0", + "backOffMultiplier": "2.0", + "backOffMaxInterval": "120000", // 2m + "backOffMaxElapsedTime": "1800000", // 30m + }, + overrides: func(config *retry.Config) { + config.Policy = retry.PolicyExponential + config.InitialInterval = 1 * time.Second + config.RandomizationFactor = 1.0 + config.Multiplier = 2.0 + config.MaxInterval = 2 * time.Minute + config.MaxElapsedTime = 30 * time.Minute + }, + err: "", + }, + "exponential with typed settings": { + config: map[string]interface{}{ + "backOffPolicy": "exponential", + "backOffInitialInterval": "1000ms", // 1s + "backOffRandomizationFactor": 1.0, + "backOffMultiplier": 2.0, + "backOffMaxInterval": "120s", // 2m + "backOffMaxElapsedTime": "30m", // 30m + }, + overrides: func(config *retry.Config) { + config.Policy = retry.PolicyExponential + config.InitialInterval = 1 * time.Second + config.RandomizationFactor = 1.0 + config.Multiplier = 2.0 + config.MaxInterval = 2 * time.Minute + config.MaxElapsedTime = 30 * time.Minute + }, + err: "", + }, + "map[string]string settings": { + config: map[string]string{ + "backOffPolicy": "exponential", + "backOffInitialInterval": "1000ms", // 1s + "backOffRandomizationFactor": "1.0", + "backOffMultiplier": "2.0", + "backOffMaxInterval": "120s", // 2m + "backOffMaxElapsedTime": "30m", // 30m + }, + overrides: func(config *retry.Config) { + config.Policy = retry.PolicyExponential + config.InitialInterval = 1 * time.Second + config.RandomizationFactor = 1.0 + config.Multiplier = 2.0 + config.MaxInterval = 2 * time.Minute + config.MaxElapsedTime = 30 * time.Minute + }, + err: "", + }, + } + + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + var actual retry.Config + err := retry.DecodeConfigWithPrefix(&actual, tc.config, "backOff") + if tc.err != "" { + if assert.Error(t, err) { + assert.Equal(t, tc.err, err.Error()) + } + } else { + config := retry.DefaultConfig() + if tc.overrides != nil { + tc.overrides(&config) + } + assert.Equal(t, config, actual, "unexpected decoded configuration") + } + }) + } +} + +func TestRetryNotifyRecoverMaxRetries(t *testing.T) { + config := retry.DefaultConfig() + config.MaxRetries = 3 + config.Duration = 1 + + var operationCalls, notifyCalls, recoveryCalls int + + b := config.NewBackOff() + err := retry.NotifyRecover(func() error { + operationCalls++ + + return errRetry + }, b, func(err error, d time.Duration) { + notifyCalls++ + }, func() { + recoveryCalls++ + }) + + assert.Error(t, err) + assert.Equal(t, errRetry, err) + assert.Equal(t, 4, operationCalls) + assert.Equal(t, 1, notifyCalls) + assert.Equal(t, 0, recoveryCalls) +} + +func TestRetryNotifyRecoverRecovery(t *testing.T) { + config := retry.DefaultConfig() + config.MaxRetries = 3 + config.Duration = 1 + + var operationCalls, notifyCalls, recoveryCalls int + + b := config.NewBackOff() + err := retry.NotifyRecover(func() error { + operationCalls++ + + if operationCalls >= 2 { + return nil + } + + return errRetry + }, b, func(err error, d time.Duration) { + notifyCalls++ + }, func() { + recoveryCalls++ + }) + + assert.NoError(t, err) + assert.Equal(t, 2, operationCalls) + assert.Equal(t, 1, notifyCalls) + assert.Equal(t, 1, recoveryCalls) +} + +func TestRetryNotifyRecoverCancel(t *testing.T) { + config := retry.DefaultConfig() + config.Policy = retry.PolicyExponential + config.InitialInterval = 10 * time.Millisecond + + var notifyCalls, recoveryCalls int + + ctx, cancel := context.WithCancel(context.Background()) + b := config.NewBackOffWithContext(ctx) + errC := make(chan error, 1) + + go func() { + errC <- retry.NotifyRecover(func() error { + return errRetry + }, b, func(err error, d time.Duration) { + notifyCalls++ + }, func() { + recoveryCalls++ + }) + }() + + time.Sleep(1 * time.Second) + cancel() + + err := <-errC + assert.Error(t, err) + assert.True(t, errors.Is(err, context.Canceled)) + assert.Equal(t, 1, notifyCalls) + assert.Equal(t, 0, recoveryCalls) +} + +func TestCheckEmptyConfig(t *testing.T) { + var config retry.Config + err := retry.DecodeConfig(&config, map[string]interface{}{}) + assert.NoError(t, err) + defaultConfig := retry.DefaultConfig() + assert.Equal(t, config, defaultConfig) +} diff --git a/nameresolution/consul/configuration.go b/nameresolution/consul/configuration.go index 5bcca2940..c64932e61 100644 --- a/nameresolution/consul/configuration.go +++ b/nameresolution/consul/configuration.go @@ -8,7 +8,7 @@ import ( consul "github.com/hashicorp/consul/api" - "github.com/dapr/components-contrib/nameresolution" + "github.com/dapr/components-contrib/internal/config" ) // The intermediateConfig is based off of the consul api types. User configurations are @@ -37,9 +37,8 @@ type configSpec struct { } func parseConfig(rawConfig interface{}) (configSpec, error) { - result := configSpec{} - config := intermediateConfig{} - rawConfig, err := nameresolution.ConvertConfig(rawConfig) + var result configSpec + rawConfig, err := config.Normalize(rawConfig) if err != nil { return result, err } @@ -52,11 +51,12 @@ func parseConfig(rawConfig interface{}) (configSpec, error) { decoder := json.NewDecoder(bytes.NewReader(data)) decoder.DisallowUnknownFields() - if err := decoder.Decode(&config); err != nil { + var configuration intermediateConfig + if err := decoder.Decode(&configuration); err != nil { return result, fmt.Errorf("error deserializing to configSpec: %w", err) } - result = mapConfig(config) + result = mapConfig(configuration) return result, nil } diff --git a/nameresolution/kubernetes/kubernetes.go b/nameresolution/kubernetes/kubernetes.go index 75fb73be0..e7334ae0b 100644 --- a/nameresolution/kubernetes/kubernetes.go +++ b/nameresolution/kubernetes/kubernetes.go @@ -8,6 +8,7 @@ package kubernetes import ( "fmt" + "github.com/dapr/components-contrib/internal/config" "github.com/dapr/components-contrib/nameresolution" "github.com/dapr/kit/logger" ) @@ -32,7 +33,7 @@ func NewResolver(logger logger.Logger) nameresolution.Resolver { // Init initializes Kubernetes name resolver. func (k *resolver) Init(metadata nameresolution.Metadata) error { - configInterface, err := nameresolution.ConvertConfig(metadata.Configuration) + configInterface, err := config.Normalize(metadata.Configuration) if err != nil { return err } diff --git a/pubsub/azure/servicebus/servicebus.go b/pubsub/azure/servicebus/servicebus.go index 9e101c7dd..c6f6660d5 100644 --- a/pubsub/azure/servicebus/servicebus.go +++ b/pubsub/azure/servicebus/servicebus.go @@ -17,6 +17,7 @@ import ( "github.com/cenkalti/backoff/v4" azservicebus "github.com/Azure/azure-service-bus-go" + "github.com/dapr/components-contrib/internal/retry" contrib_metadata "github.com/dapr/components-contrib/metadata" "github.com/dapr/components-contrib/pubsub" "github.com/dapr/kit/logger" @@ -310,7 +311,7 @@ func (a *azureServiceBus) doPublish(sender *azservicebus.Topic, msg *azservicebu bo := backoff.WithMaxRetries(ebo, uint64(a.metadata.PublishMaxRetries)) bo = backoff.WithContext(bo, a.ctx) - return pubsub.RetryNotifyRecover(func() error { + return retry.NotifyRecover(func() error { ctx, cancel := context.WithTimeout(a.ctx, time.Second*time.Duration(a.metadata.TimeoutInSec)) defer cancel() diff --git a/pubsub/hazelcast/hazelcast.go b/pubsub/hazelcast/hazelcast.go index 09f7b00db..42ab26598 100644 --- a/pubsub/hazelcast/hazelcast.go +++ b/pubsub/hazelcast/hazelcast.go @@ -12,6 +12,7 @@ import ( "github.com/hazelcast/hazelcast-go-client" hazelcastCore "github.com/hazelcast/hazelcast-go-client/core" + "github.com/dapr/components-contrib/internal/retry" "github.com/dapr/components-contrib/pubsub" "github.com/dapr/kit/logger" ) @@ -151,7 +152,7 @@ func (l *hazelcastMessageListener) handleMessageObject(message []byte) error { b = backoff.WithMaxRetries(b, uint64(l.p.metadata.backOffMaxRetries)) } - return pubsub.RetryNotifyRecover(func() error { + return retry.NotifyRecover(func() error { l.p.logger.Debug("Processing Hazelcast message") return l.pubsubHandler(l.p.ctx, &pubsubMsg) diff --git a/pubsub/kafka/kafka.go b/pubsub/kafka/kafka.go index d0e1307a7..f11a79e09 100644 --- a/pubsub/kafka/kafka.go +++ b/pubsub/kafka/kafka.go @@ -17,7 +17,8 @@ import ( "time" "github.com/Shopify/sarama" - "github.com/cenkalti/backoff/v4" + + "github.com/dapr/components-contrib/internal/retry" "github.com/dapr/components-contrib/pubsub" "github.com/dapr/kit/logger" ) @@ -39,8 +40,9 @@ type Kafka struct { topics map[string]bool cancel context.CancelFunc consumer consumer - backOff backoff.BackOff config *sarama.Config + + backOffConfig retry.Config } type kafkaMetadata struct { @@ -53,8 +55,7 @@ type kafkaMetadata struct { } type consumer struct { - logger logger.Logger - backOff backoff.BackOff + k *Kafka ready chan bool callback pubsub.Handler once sync.Once @@ -65,24 +66,24 @@ func (consumer *consumer) ConsumeClaim(session sarama.ConsumerGroupSession, clai return fmt.Errorf("nil consumer callback") } - bo := backoff.WithContext(consumer.backOff, session.Context()) + b := consumer.k.backOffConfig.NewBackOffWithContext(session.Context()) for message := range claim.Messages() { msg := pubsub.NewMessage{ Topic: message.Topic, Data: message.Value, } - if err := pubsub.RetryNotifyRecover(func() error { - consumer.logger.Debugf("Processing Kafka message: %s/%d/%d [key=%s]", message.Topic, message.Partition, message.Offset, asBase64String(message.Key)) + if err := retry.NotifyRecover(func() error { + consumer.k.logger.Debugf("Processing Kafka message: %s/%d/%d [key=%s]", message.Topic, message.Partition, message.Offset, asBase64String(message.Key)) err := consumer.callback(session.Context(), &msg) if err == nil { session.MarkMessage(message, "") } return err - }, bo, func(err error, d time.Duration) { - consumer.logger.Errorf("Error processing Kafka message: %s/%d/%d [key=%s]. Retrying...", message.Topic, message.Partition, message.Offset, asBase64String(message.Key)) + }, b, func(err error, d time.Duration) { + consumer.k.logger.Errorf("Error processing Kafka message: %s/%d/%d [key=%s]. Retrying...", message.Topic, message.Partition, message.Offset, asBase64String(message.Key)) }, func() { - consumer.logger.Infof("Successfully processed Kafka message after it previously failed: %s/%d/%d [key=%s]", message.Topic, message.Partition, message.Offset, asBase64String(message.Key)) + consumer.k.logger.Infof("Successfully processed Kafka message after it previously failed: %s/%d/%d [key=%s]", message.Topic, message.Partition, message.Offset, asBase64String(message.Key)) }); err != nil { return err } @@ -140,8 +141,14 @@ func (k *Kafka) Init(metadata pubsub.Metadata) error { k.topics = make(map[string]bool) - // TODO: Make the backoff configurable for constant or exponential - k.backOff = backoff.NewConstantBackOff(5 * time.Second) + // Default retry configuration is used if no + // backOff properties are set. + if err := retry.DecodeConfigWithPrefix( + &k.backOffConfig, + metadata.Properties, + "backOff"); err != nil { + return err + } k.logger.Debug("Kafka message bus initialization complete") @@ -227,8 +234,7 @@ func (k *Kafka) Subscribe(req pubsub.SubscribeRequest, handler pubsub.Handler) e ready := make(chan bool) k.consumer = consumer{ - logger: k.logger, - backOff: k.backOff, + k: k, ready: ready, callback: handler, } diff --git a/pubsub/mqtt/mqtt.go b/pubsub/mqtt/mqtt.go index e9e1e5cc0..863dd4a44 100644 --- a/pubsub/mqtt/mqtt.go +++ b/pubsub/mqtt/mqtt.go @@ -18,6 +18,7 @@ import ( "github.com/cenkalti/backoff/v4" mqtt "github.com/eclipse/paho.mqtt.golang" + "github.com/dapr/components-contrib/internal/retry" "github.com/dapr/components-contrib/pubsub" "github.com/dapr/kit/logger" ) @@ -216,7 +217,7 @@ func (m *mqttPubSub) Subscribe(req pubsub.SubscribeRequest, handler pubsub.Handl if m.metadata.backOffMaxRetries >= 0 { b = backoff.WithMaxRetries(m.backOff, uint64(m.metadata.backOffMaxRetries)) } - if err := pubsub.RetryNotifyRecover(func() error { + if err := retry.NotifyRecover(func() error { m.logger.Debugf("Processing MQTT message %s/%d", mqttMsg.Topic(), mqttMsg.MessageID()) if err := handler(m.ctx, &msg); err != nil { return err diff --git a/pubsub/natsstreaming/natsstreaming.go b/pubsub/natsstreaming/natsstreaming.go index 4f8a4c703..2f91badbe 100644 --- a/pubsub/natsstreaming/natsstreaming.go +++ b/pubsub/natsstreaming/natsstreaming.go @@ -16,11 +16,11 @@ import ( "strconv" "time" - "github.com/cenkalti/backoff/v4" nats "github.com/nats-io/nats.go" stan "github.com/nats-io/stan.go" "github.com/nats-io/stan.go/pb" + "github.com/dapr/components-contrib/internal/retry" "github.com/dapr/components-contrib/pubsub" "github.com/dapr/kit/logger" ) @@ -65,9 +65,9 @@ type natsStreamingPubSub struct { logger logger.Logger - ctx context.Context - cancel context.CancelFunc - backOff backoff.BackOff + ctx context.Context + cancel context.CancelFunc + backOffConfig retry.Config } // NewNATSStreamingPubSub returns a new NATS Streaming pub-sub implementation @@ -193,13 +193,16 @@ func (n *natsStreamingPubSub) Init(metadata pubsub.Metadata) error { } n.logger.Debugf("connected to natsstreaming at %s", m.natsURL) - ctx, cancel := context.WithCancel(context.Background()) - n.ctx = ctx - n.cancel = cancel + n.ctx, n.cancel = context.WithCancel(context.Background()) - // TODO: Make the backoff configurable for constant or exponential - b := backoff.NewConstantBackOff(5 * time.Second) - n.backOff = backoff.WithContext(b, n.ctx) + // Default retry configuration is used if no + // backOff properties are set. + if err := retry.DecodeConfigWithPrefix( + &n.backOffConfig, + metadata.Properties, + "backOff"); err != nil { + return err + } n.natStreamingConn = natStreamingConn @@ -226,7 +229,9 @@ func (n *natsStreamingPubSub) Subscribe(req pubsub.SubscribeRequest, handler pub Topic: req.Topic, Data: natsMsg.Data, } - pubsub.RetryNotifyRecover(func() error { + b := n.backOffConfig.NewBackOffWithContext(n.ctx) + + rerr := retry.NotifyRecover(func() error { n.logger.Debugf("Processing NATS Streaming message %s/%d", natsMsg.Subject, natsMsg.Sequence) herr := handler(n.ctx, &msg) if herr == nil { @@ -235,11 +240,14 @@ func (n *natsStreamingPubSub) Subscribe(req pubsub.SubscribeRequest, handler pub } return herr - }, n.backOff, func(err error, d time.Duration) { + }, b, func(err error, d time.Duration) { n.logger.Errorf("Error processing NATS Streaming message: %s/%d. Retrying...", natsMsg.Subject, natsMsg.Sequence) }, func() { n.logger.Infof("Successfully processed NATS Streaming message after it previously failed: %s/%d", natsMsg.Subject, natsMsg.Sequence) }) + if rerr != nil && !errors.Is(rerr, context.Canceled) { + n.logger.Errorf("Error processing message and retries are exhausted: %s/%d.", natsMsg.Subject, natsMsg.Sequence) + } } if n.metadata.subscriptionType == subscriptionTypeTopic { diff --git a/pubsub/pulsar/pulsar.go b/pubsub/pulsar/pulsar.go index 60827e4bd..6d16c31fc 100644 --- a/pubsub/pulsar/pulsar.go +++ b/pubsub/pulsar/pulsar.go @@ -8,9 +8,9 @@ import ( "time" "github.com/apache/pulsar-client-go/pulsar" - "github.com/cenkalti/backoff/v4" lru "github.com/hashicorp/golang-lru" + "github.com/dapr/components-contrib/internal/retry" "github.com/dapr/components-contrib/pubsub" "github.com/dapr/kit/logger" ) @@ -27,10 +27,10 @@ type Pulsar struct { producer pulsar.Producer metadata pulsarMetadata - ctx context.Context - cancel context.CancelFunc - backOff backoff.BackOff - cache *lru.Cache + ctx context.Context + cancel context.CancelFunc + backOffConfig retry.Config + cache *lru.Cache } func NewPulsar(l logger.Logger) pubsub.PubSub { @@ -92,9 +92,14 @@ func (p *Pulsar) Init(metadata pubsub.Metadata) error { p.client = client p.metadata = *m - // TODO: Make the backoff configurable for constant or exponential - b := backoff.NewConstantBackOff(5 * time.Second) - p.backOff = backoff.WithContext(b, p.ctx) + // Default retry configuration is used if no + // backOff properties are set. + if err := retry.DecodeConfigWithPrefix( + &p.backOffConfig, + metadata.Properties, + "backOff"); err != nil { + return err + } return nil } @@ -174,7 +179,9 @@ func (p *Pulsar) handleMessage(msg pulsar.ConsumerMessage, handler pubsub.Handle Metadata: msg.Properties(), } - return pubsub.RetryNotifyRecover(func() error { + b := p.backOffConfig.NewBackOffWithContext(p.ctx) + + return retry.NotifyRecover(func() error { p.logger.Debugf("Processing Pulsar message %s/%#v", msg.Topic(), msg.ID()) err := handler(p.ctx, &pubsubMsg) if err == nil { @@ -182,7 +189,7 @@ func (p *Pulsar) handleMessage(msg pulsar.ConsumerMessage, handler pubsub.Handle } return err - }, p.backOff, func(err error, d time.Duration) { + }, b, func(err error, d time.Duration) { p.logger.Errorf("Error processing Pulsar message: %s/%#v [key=%s]. Retrying...", msg.Topic(), msg.ID(), msg.Key()) }, func() { p.logger.Infof("Successfully processed Pulsar message after it previously failed: %s/%#v [key=%s]", msg.Topic(), msg.ID(), msg.Key()) diff --git a/pubsub/retry.go b/pubsub/retry.go deleted file mode 100644 index 8d2c62a90..000000000 --- a/pubsub/retry.go +++ /dev/null @@ -1,31 +0,0 @@ -package pubsub - -import ( - "time" - - "github.com/cenkalti/backoff/v4" -) - -// RetryNotifyRecover is a wrapper around backoff.RetryNotify that adds another callback for when an operation -// previously failed but has since recovered. The main purpose of this wrapper is to call `notify` only when -// the operations fails the first time and `recovered` when it finally succeeds. This can be helpful in limiting -// log messages to only the events that operators need to be alerted on. -func RetryNotifyRecover(operation backoff.Operation, b backoff.BackOff, notify backoff.Notify, recovered func()) error { - var notified bool - - return backoff.RetryNotify(func() error { - err := operation() - - if err == nil && notified { - notified = false - recovered() - } - - return err - }, b, func(err error, d time.Duration) { - if !notified { - notify(err, d) - notified = true - } - }) -}