Azure Service Bus publishing retries when server is "too busy" (#821)

* Adding retry/backoff to publishing to Azure Service Bus when the server is too busy and throttles the connection

* Fixing linter issues
This commit is contained in:
Phil Kedy 2021-04-14 14:56:55 -04:00 committed by GitHub
parent 63f4764757
commit 58b43fbfdb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 122 additions and 60 deletions

4
go.mod
View File

@ -13,6 +13,7 @@ require (
github.com/Azure/azure-service-bus-go v0.10.10
github.com/Azure/azure-storage-blob-go v0.8.0
github.com/Azure/azure-storage-queue-go v0.0.0-20191125232315-636801874cdd
github.com/Azure/go-amqp v0.13.1
github.com/Azure/go-autorest/autorest v0.11.12
github.com/Azure/go-autorest/autorest/adal v0.9.5
github.com/Azure/go-autorest/autorest/azure/auth v0.4.2
@ -63,8 +64,9 @@ require (
github.com/robfig/cron/v3 v3.0.1
github.com/samuel/go-zookeeper v0.0.0-20190923202752-2cc03de413da
github.com/sendgrid/sendgrid-go v3.5.0+incompatible
github.com/sirupsen/logrus v1.8.1 // indirect
github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271
github.com/stretchr/testify v1.6.1
github.com/stretchr/testify v1.7.0
github.com/valyala/fasthttp v1.19.0
github.com/vmware/vmware-go-kcl v0.0.0-20191104173950-b6c74c3fe74e
go.mongodb.org/mongo-driver v1.1.2

7
go.sum
View File

@ -753,7 +753,6 @@ github.com/klauspost/cpuid v1.2.1/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgo
github.com/konsorten/go-windows-terminal-sequences v0.0.0-20180402223658-b729f2633dfe/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/konsorten/go-windows-terminal-sequences v1.0.3 h1:CE8S1cTafDpPvMhIxNJKvHsGVBgn1xWYf1NbHQhywc8=
github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
@ -1032,8 +1031,9 @@ github.com/sirupsen/logrus v1.1.1/go.mod h1:zrgwTnHtNr00buQ1vSptGe8m1f/BbgsPukg8
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
github.com/sirupsen/logrus v1.6.0 h1:UBcNElsrwanuuMsnGSlYmtmgbb23qDR5dG+6X6Oo89I=
github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88=
github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE=
github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0=
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d h1:zE9ykElWQ6/NYmHa3jpm/yHnI4xSofP+UP6SpjHcSeM=
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
github.com/smartystreets/goconvey v1.6.4 h1:fv0U8FUIMPNf1L9lnHLvLhgicrIVChEkdzIKYqbNC9s=
@ -1075,8 +1075,9 @@ github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXf
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw=
github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
github.com/tidwall/pretty v1.0.1 h1:WE4RBSZ1x6McVVC8S/Md+Qse8YUv6HRObAx6ke00NY8=

View File

@ -8,20 +8,22 @@ package servicebus
// Reference for settings:
// https://github.com/Azure/azure-service-bus-go/blob/54b2faa53e5216616e59725281be692acc120c34/subscription_manager.go#L101
type metadata struct {
ConnectionString string `json:"connectionString"`
ConsumerID string `json:"consumerID"`
TimeoutInSec int `json:"timeoutInSec"`
HandlerTimeoutInSec int `json:"handlerTimeoutInSec"`
LockRenewalInSec int `json:"lockRenewalInSec"`
MaxActiveMessages int `json:"maxActiveMessages"`
MaxActiveMessagesRecoveryInSec int `json:"maxActiveMessagesRecoveryInSec"`
MaxReconnectionAttempts int `json:"maxReconnectionAttempts"`
ConnectionRecoveryInSec int `json:"connectionRecoveryInSec"`
DisableEntityManagement bool `json:"disableEntityManagement"`
MaxDeliveryCount *int `json:"maxDeliveryCount"`
LockDurationInSec *int `json:"lockDurationInSec"`
DefaultMessageTimeToLiveInSec *int `json:"defaultMessageTimeToLiveInSec"`
AutoDeleteOnIdleInSec *int `json:"autoDeleteOnIdleInSec"`
MaxConcurrentHandlers *int `json:"maxConcurrentHandlers"`
PrefetchCount *int `json:"prefetchCount"`
ConnectionString string `json:"connectionString"`
ConsumerID string `json:"consumerID"`
TimeoutInSec int `json:"timeoutInSec"`
HandlerTimeoutInSec int `json:"handlerTimeoutInSec"`
LockRenewalInSec int `json:"lockRenewalInSec"`
MaxActiveMessages int `json:"maxActiveMessages"`
MaxActiveMessagesRecoveryInSec int `json:"maxActiveMessagesRecoveryInSec"`
MaxReconnectionAttempts int `json:"maxReconnectionAttempts"`
ConnectionRecoveryInSec int `json:"connectionRecoveryInSec"`
DisableEntityManagement bool `json:"disableEntityManagement"`
MaxDeliveryCount *int `json:"maxDeliveryCount"`
LockDurationInSec *int `json:"lockDurationInSec"`
DefaultMessageTimeToLiveInSec *int `json:"defaultMessageTimeToLiveInSec"`
AutoDeleteOnIdleInSec *int `json:"autoDeleteOnIdleInSec"`
MaxConcurrentHandlers *int `json:"maxConcurrentHandlers"`
PrefetchCount *int `json:"prefetchCount"`
PublishMaxRetries int `json:"publishMaxRetries"`
PublishInitialRetryIntervalInMs int `json:"publishInitialRetryInternalInMs"`
}

View File

@ -7,11 +7,15 @@ package servicebus
import (
"context"
"errors"
"fmt"
"strconv"
"sync"
"time"
"github.com/Azure/go-amqp"
"github.com/cenkalti/backoff/v4"
azservicebus "github.com/Azure/azure-service-bus-go"
contrib_metadata "github.com/dapr/components-contrib/metadata"
"github.com/dapr/components-contrib/pubsub"
@ -20,23 +24,25 @@ import (
const (
// Keys
connectionString = "connectionString"
consumerID = "consumerID"
maxDeliveryCount = "maxDeliveryCount"
timeoutInSec = "timeoutInSec"
lockDurationInSec = "lockDurationInSec"
lockRenewalInSec = "lockRenewalInSec"
defaultMessageTimeToLiveInSec = "defaultMessageTimeToLiveInSec"
autoDeleteOnIdleInSec = "autoDeleteOnIdleInSec"
disableEntityManagement = "disableEntityManagement"
maxConcurrentHandlers = "maxConcurrentHandlers"
handlerTimeoutInSec = "handlerTimeoutInSec"
prefetchCount = "prefetchCount"
maxActiveMessages = "maxActiveMessages"
maxActiveMessagesRecoveryInSec = "maxActiveMessagesRecoveryInSec"
maxReconnectionAttempts = "maxReconnectionAttempts"
connectionRecoveryInSec = "connectionRecoveryInSec"
errorMessagePrefix = "azure service bus error:"
connectionString = "connectionString"
consumerID = "consumerID"
maxDeliveryCount = "maxDeliveryCount"
timeoutInSec = "timeoutInSec"
lockDurationInSec = "lockDurationInSec"
lockRenewalInSec = "lockRenewalInSec"
defaultMessageTimeToLiveInSec = "defaultMessageTimeToLiveInSec"
autoDeleteOnIdleInSec = "autoDeleteOnIdleInSec"
disableEntityManagement = "disableEntityManagement"
maxConcurrentHandlers = "maxConcurrentHandlers"
handlerTimeoutInSec = "handlerTimeoutInSec"
prefetchCount = "prefetchCount"
maxActiveMessages = "maxActiveMessages"
maxActiveMessagesRecoveryInSec = "maxActiveMessagesRecoveryInSec"
maxReconnectionAttempts = "maxReconnectionAttempts"
connectionRecoveryInSec = "connectionRecoveryInSec"
publishMaxRetries = "publishMaxRetries"
publishInitialRetryInternalInMs = "publishInitialRetryInternalInMs"
errorMessagePrefix = "azure service bus error:"
// Defaults
defaultTimeoutInSec = 60
@ -44,11 +50,13 @@ const (
defaultLockRenewalInSec = 20
// ASB Messages can be up to 256Kb. 10000 messages at this size would roughly use 2.56Gb.
// We should change this if performance testing suggests a more sensible default.
defaultMaxActiveMessages = 10000
defaultMaxActiveMessagesRecoveryInSec = 2
defaultDisableEntityManagement = false
defaultMaxReconnectionAttempts = 30
defaultConnectionRecoveryInSec = 2
defaultMaxActiveMessages = 10000
defaultMaxActiveMessagesRecoveryInSec = 2
defaultDisableEntityManagement = false
defaultMaxReconnectionAttempts = 30
defaultConnectionRecoveryInSec = 2
defaultPublishMaxRetries = 5
defaultPublishInitialRetryInternalInMs = 500
)
type handle = struct{}
@ -62,6 +70,9 @@ type azureServiceBus struct {
features []pubsub.Feature
topics map[string]*azservicebus.Topic
topicsLock *sync.RWMutex
ctx context.Context
cancel context.CancelFunc
}
// NewAzureServiceBus returns a new Azure ServiceBus pub-sub implementation
@ -215,6 +226,26 @@ func parseAzureServiceBusMetadata(meta pubsub.Metadata) (metadata, error) {
m.PrefetchCount = &valAsInt
}
m.PublishMaxRetries = defaultPublishMaxRetries
if val, ok := meta.Properties[publishMaxRetries]; ok && val != "" {
var err error
valAsInt, err := strconv.Atoi(val)
if err != nil {
return m, fmt.Errorf("%s invalid publishMaxRetries %s, %s", errorMessagePrefix, val, err)
}
m.PublishMaxRetries = valAsInt
}
m.PublishInitialRetryIntervalInMs = defaultPublishInitialRetryInternalInMs
if val, ok := meta.Properties[publishInitialRetryInternalInMs]; ok && val != "" {
var err error
valAsInt, err := strconv.Atoi(val)
if err != nil {
return m, fmt.Errorf("%s invalid publishInitialRetryIntervalInMs %s, %s", errorMessagePrefix, val, err)
}
m.PublishInitialRetryIntervalInMs = valAsInt
}
return m, nil
}
@ -232,17 +263,12 @@ func (a *azureServiceBus) Init(metadata pubsub.Metadata) error {
a.topicManager = a.namespace.NewTopicManager()
a.ctx, a.cancel = context.WithCancel(context.Background())
return nil
}
func (a *azureServiceBus) Publish(req *pubsub.PublishRequest) error {
if !a.metadata.DisableEntityManagement {
err := a.ensureTopic(req.Topic)
if err != nil {
return err
}
}
var sender *azservicebus.Topic
var err error
@ -253,6 +279,12 @@ func (a *azureServiceBus) Publish(req *pubsub.PublishRequest) error {
a.topicsLock.RUnlock()
if sender == nil {
// Ensure the topic exists the first time it is referenced.
if !a.metadata.DisableEntityManagement {
if err = a.ensureTopic(req.Topic); err != nil {
return err
}
}
a.topicsLock.Lock()
sender, err = a.namespace.NewTopic(req.Topic)
a.topics[req.Topic] = sender
@ -263,21 +295,44 @@ func (a *azureServiceBus) Publish(req *pubsub.PublishRequest) error {
}
}
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(a.metadata.TimeoutInSec))
defer cancel()
msg := azservicebus.NewMessage(req.Data)
ttl, hasTTL, _ := contrib_metadata.TryGetTTL(req.Metadata)
if hasTTL {
msg.TTL = &ttl
}
err = sender.Send(ctx, msg)
if err != nil {
return err
}
return a.doPublish(sender, msg)
}
return nil
func (a *azureServiceBus) doPublish(sender *azservicebus.Topic, msg *azservicebus.Message) error {
ebo := backoff.NewExponentialBackOff()
ebo.InitialInterval = time.Duration(a.metadata.PublishInitialRetryIntervalInMs) * time.Millisecond
bo := backoff.WithMaxRetries(ebo, uint64(a.metadata.PublishMaxRetries))
bo = backoff.WithContext(bo, a.ctx)
return pubsub.RetryNotifyRecover(func() error {
ctx, cancel := context.WithTimeout(a.ctx, time.Second*time.Duration(a.metadata.TimeoutInSec))
defer cancel()
err := sender.Send(ctx, msg)
if err == nil {
return nil
}
var ampqError *amqp.Error
if errors.As(err, &ampqError) && ampqError.Condition == "com.microsoft:server-busy" {
return ampqError // Retries
}
var connClosedError azservicebus.ErrConnectionClosed
if errors.As(err, &connClosedError) {
return connClosedError // Retries
}
return backoff.Permanent(err) // Does not retry
}, bo, func(err error, _ time.Duration) {
a.logger.Debugf("Could not publish service bus message. Retrying...: %v", err)
}, func() {
a.logger.Debug("Successfully published service bus message after it previously failed")
})
}
func (a *azureServiceBus) Subscribe(req pubsub.SubscribeRequest, handler pubsub.Handler) error {
@ -302,7 +357,7 @@ func (a *azureServiceBus) Subscribe(req pubsub.SubscribeRequest, handler pubsub.
// Periodically refill the reconnect attempts channel to avoid
// exhausting all the refill attempts due to intermittent issues
// ocurring over a longer period of time.
reconnCtx, reconnCancel := context.WithCancel(context.TODO())
reconnCtx, reconnCancel := context.WithCancel(a.ctx)
defer reconnCancel()
go func() {
for {
@ -493,13 +548,15 @@ func (a *azureServiceBus) createSubscriptionManagementOptions() ([]azservicebus.
func (a *azureServiceBus) Close() error {
for _, s := range a.subscriptions {
s.close(context.TODO())
s.close(a.ctx)
}
for _, t := range a.topics {
t.Close(context.TODO())
t.Close(a.ctx)
}
a.cancel()
return nil
}