mirror of https://github.com/dapr/kit.git
217 lines
6.7 KiB
Go
217 lines
6.7 KiB
Go
/*
|
|
Copyright 2021 The Dapr Authors
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
*/
|
|
|
|
package retry
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"strings"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/cenkalti/backoff/v4"
|
|
|
|
"github.com/dapr/kit/config"
|
|
)
|
|
|
|
// PolicyType denotes if the back off delay should be constant or exponential.
|
|
type PolicyType int
|
|
|
|
const (
|
|
// PolicyConstant is a backoff policy that always returns the same backoff delay.
|
|
PolicyConstant PolicyType = iota
|
|
// PolicyExponential is a backoff implementation that increases the backoff period
|
|
// for each retry attempt using a randomization function that grows exponentially.
|
|
PolicyExponential
|
|
)
|
|
|
|
// Config encapsulates the back off policy configuration.
|
|
type Config struct {
|
|
Policy PolicyType `mapstructure:"policy"`
|
|
|
|
// Constant back off
|
|
Duration time.Duration `mapstructure:"duration"`
|
|
|
|
// Exponential back off
|
|
InitialInterval time.Duration `mapstructure:"initialInterval"`
|
|
RandomizationFactor float32 `mapstructure:"randomizationFactor"`
|
|
Multiplier float32 `mapstructure:"multiplier"`
|
|
MaxInterval time.Duration `mapstructure:"maxInterval"`
|
|
MaxElapsedTime time.Duration `mapstructure:"maxElapsedTime"`
|
|
|
|
// Additional options
|
|
MaxRetries int64 `mapstructure:"maxRetries"`
|
|
}
|
|
|
|
// String implements fmt.Stringer and is used for debugging.
|
|
func (c Config) String() string {
|
|
return fmt.Sprintf(
|
|
"policy='%s' duration='%v' initialInterval='%v' randomizationFactor='%f' multiplier='%f' maxInterval='%v' maxElapsedTime='%v' maxRetries='%d'",
|
|
c.Policy, c.Duration, c.InitialInterval, c.RandomizationFactor, c.Multiplier, c.MaxInterval, c.MaxElapsedTime, c.MaxRetries,
|
|
)
|
|
}
|
|
|
|
// DefaultConfig represents the default configuration for a `Config`.
|
|
func DefaultConfig() Config {
|
|
return Config{
|
|
Policy: PolicyConstant,
|
|
Duration: 5 * time.Second,
|
|
InitialInterval: backoff.DefaultInitialInterval,
|
|
RandomizationFactor: backoff.DefaultRandomizationFactor,
|
|
Multiplier: backoff.DefaultMultiplier,
|
|
MaxInterval: backoff.DefaultMaxInterval,
|
|
MaxElapsedTime: backoff.DefaultMaxElapsedTime,
|
|
MaxRetries: -1,
|
|
}
|
|
}
|
|
|
|
// DefaultConfigWithNoRetry represents the default configuration with `MaxRetries` set to 0.
|
|
// This may be useful for those brokers which can handles retries on its own.
|
|
func DefaultConfigWithNoRetry() Config {
|
|
c := DefaultConfig()
|
|
c.MaxRetries = 0
|
|
|
|
return c
|
|
}
|
|
|
|
// DecodeConfig decodes a Go struct into a `Config`.
|
|
func DecodeConfig(c *Config, input interface{}) error {
|
|
// Use the deefault config if `c` is empty/zero value.
|
|
var emptyConfig Config
|
|
if *c == emptyConfig {
|
|
*c = DefaultConfig()
|
|
}
|
|
|
|
return config.Decode(input, c)
|
|
}
|
|
|
|
// DecodeConfigWithPrefix decodes a Go struct into a `Config`.
|
|
func DecodeConfigWithPrefix(c *Config, input interface{}, prefix string) error {
|
|
input, err := config.PrefixedBy(input, prefix)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return DecodeConfig(c, input)
|
|
}
|
|
|
|
// NewBackOff returns a BackOff instance for use with `NotifyRecover`
|
|
// or `backoff.RetryNotify` directly. The instance will not stop due to
|
|
// context cancellation. To support cancellation (recommended), use
|
|
// `NewBackOffWithContext`.
|
|
//
|
|
// Since the underlying backoff implementations are not always thread safe,
|
|
// `NewBackOff` or `NewBackOffWithContext` should be called each time
|
|
// `RetryNotifyRecover` or `backoff.RetryNotify` is used.
|
|
func (c *Config) NewBackOff() backoff.BackOff {
|
|
var b backoff.BackOff
|
|
switch c.Policy {
|
|
case PolicyConstant:
|
|
b = backoff.NewConstantBackOff(c.Duration)
|
|
case PolicyExponential:
|
|
eb := backoff.NewExponentialBackOff()
|
|
eb.InitialInterval = c.InitialInterval
|
|
eb.RandomizationFactor = float64(c.RandomizationFactor)
|
|
eb.Multiplier = float64(c.Multiplier)
|
|
eb.MaxInterval = c.MaxInterval
|
|
eb.MaxElapsedTime = c.MaxElapsedTime
|
|
b = eb
|
|
}
|
|
|
|
if c.MaxRetries >= 0 {
|
|
b = backoff.WithMaxRetries(b, uint64(c.MaxRetries))
|
|
}
|
|
|
|
return b
|
|
}
|
|
|
|
// NewBackOffWithContext returns a BackOff instance for use with `RetryNotifyRecover`
|
|
// or `backoff.RetryNotify` directly. The provided context is used to cancel retries
|
|
// if it is canceled.
|
|
//
|
|
// Since the underlying backoff implementations are not always thread safe,
|
|
// `NewBackOff` or `NewBackOffWithContext` should be called each time
|
|
// `RetryNotifyRecover` or `backoff.RetryNotify` is used.
|
|
func (c *Config) NewBackOffWithContext(ctx context.Context) backoff.BackOff {
|
|
b := c.NewBackOff()
|
|
|
|
return backoff.WithContext(b, ctx)
|
|
}
|
|
|
|
// NotifyRecover is a wrapper around backoff.RetryNotify that adds another callback for when an operation
|
|
// previously failed but has since recovered. The main purpose of this wrapper is to call `notify` only when
|
|
// the operations fails the first time and `recovered` when it finally succeeds. This can be helpful in limiting
|
|
// log messages to only the events that operators need to be alerted on.
|
|
func NotifyRecover(operation backoff.Operation, b backoff.BackOff, notify backoff.Notify, recovered func()) error {
|
|
notified := atomic.Bool{}
|
|
|
|
return backoff.RetryNotify(func() error {
|
|
err := operation()
|
|
|
|
if err == nil && notified.Load() {
|
|
recovered()
|
|
}
|
|
|
|
return err
|
|
}, b, func(err error, d time.Duration) {
|
|
if notified.CompareAndSwap(false, true) {
|
|
notify(err, d)
|
|
}
|
|
})
|
|
}
|
|
|
|
// NotifyRecoverWithData is a variant of NotifyRecover that also returns data in addition to an error.
|
|
func NotifyRecoverWithData[T any](operation backoff.OperationWithData[T], b backoff.BackOff, notify backoff.Notify, recovered func()) (T, error) {
|
|
notified := atomic.Bool{}
|
|
|
|
return backoff.RetryNotifyWithData(func() (T, error) {
|
|
res, err := operation()
|
|
|
|
if err == nil && notified.Load() {
|
|
recovered()
|
|
}
|
|
|
|
return res, err
|
|
}, b, func(err error, d time.Duration) {
|
|
if notified.CompareAndSwap(false, true) {
|
|
notify(err, d)
|
|
}
|
|
})
|
|
}
|
|
|
|
// DecodeString handles converting a string value to `p`.
|
|
func (p *PolicyType) DecodeString(value string) error {
|
|
switch strings.ToLower(value) {
|
|
case "constant":
|
|
*p = PolicyConstant
|
|
case "exponential":
|
|
*p = PolicyExponential
|
|
default:
|
|
return fmt.Errorf("unexpected back off policy type: %s", value)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// String implements fmt.Stringer and is used for debugging.
|
|
func (p PolicyType) String() string {
|
|
switch p {
|
|
case PolicyConstant:
|
|
return "constant"
|
|
case PolicyExponential:
|
|
return "exponential"
|
|
default:
|
|
return ""
|
|
}
|
|
}
|