diff --git a/go.mod b/go.mod
index f600d10fa..de05b3987 100644
--- a/go.mod
+++ b/go.mod
@@ -13,6 +13,7 @@ require (
 	github.com/Azure/azure-service-bus-go v0.10.10
 	github.com/Azure/azure-storage-blob-go v0.8.0
 	github.com/Azure/azure-storage-queue-go v0.0.0-20191125232315-636801874cdd
+	github.com/Azure/go-amqp v0.13.1
 	github.com/Azure/go-autorest/autorest v0.11.12
 	github.com/Azure/go-autorest/autorest/adal v0.9.5
 	github.com/Azure/go-autorest/autorest/azure/auth v0.4.2
@@ -63,8 +64,9 @@ require (
 	github.com/robfig/cron/v3 v3.0.1
 	github.com/samuel/go-zookeeper v0.0.0-20190923202752-2cc03de413da
 	github.com/sendgrid/sendgrid-go v3.5.0+incompatible
+	github.com/sirupsen/logrus v1.8.1 // indirect
 	github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271
-	github.com/stretchr/testify v1.6.1
+	github.com/stretchr/testify v1.7.0
 	github.com/valyala/fasthttp v1.19.0
 	github.com/vmware/vmware-go-kcl v0.0.0-20191104173950-b6c74c3fe74e
 	go.mongodb.org/mongo-driver v1.1.2
diff --git a/go.sum b/go.sum
index db01aed9f..b3eb905cd 100644
--- a/go.sum
+++ b/go.sum
@@ -753,7 +753,6 @@ github.com/klauspost/cpuid v1.2.1/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgo
 github.com/konsorten/go-windows-terminal-sequences v0.0.0-20180402223658-b729f2633dfe/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
 github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
 github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
-github.com/konsorten/go-windows-terminal-sequences v1.0.3 h1:CE8S1cTafDpPvMhIxNJKvHsGVBgn1xWYf1NbHQhywc8=
 github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
 github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
 github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
@@ -1032,8 +1031,9 @@ github.com/sirupsen/logrus v1.1.1/go.mod h1:zrgwTnHtNr00buQ1vSptGe8m1f/BbgsPukg8
 github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
 github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q=
 github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
-github.com/sirupsen/logrus v1.6.0 h1:UBcNElsrwanuuMsnGSlYmtmgbb23qDR5dG+6X6Oo89I=
 github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88=
+github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE=
+github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0=
 github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d h1:zE9ykElWQ6/NYmHa3jpm/yHnI4xSofP+UP6SpjHcSeM=
 github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
 github.com/smartystreets/goconvey v1.6.4 h1:fv0U8FUIMPNf1L9lnHLvLhgicrIVChEkdzIKYqbNC9s=
@@ -1075,8 +1075,9 @@ github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXf
 github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
 github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
 github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
-github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0=
 github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
+github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
+github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
 github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw=
 github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
 github.com/tidwall/pretty v1.0.1 h1:WE4RBSZ1x6McVVC8S/Md+Qse8YUv6HRObAx6ke00NY8=
diff --git a/pubsub/azure/servicebus/metadata.go b/pubsub/azure/servicebus/metadata.go
index ee69229f0..304df323f 100644
--- a/pubsub/azure/servicebus/metadata.go
+++ b/pubsub/azure/servicebus/metadata.go
@@ -8,20 +8,22 @@ package servicebus
 // Reference for settings:
 // https://github.com/Azure/azure-service-bus-go/blob/54b2faa53e5216616e59725281be692acc120c34/subscription_manager.go#L101
 type metadata struct {
-	ConnectionString               string `json:"connectionString"`
-	ConsumerID                     string `json:"consumerID"`
-	TimeoutInSec                   int    `json:"timeoutInSec"`
-	HandlerTimeoutInSec            int    `json:"handlerTimeoutInSec"`
-	LockRenewalInSec               int    `json:"lockRenewalInSec"`
-	MaxActiveMessages              int    `json:"maxActiveMessages"`
-	MaxActiveMessagesRecoveryInSec int    `json:"maxActiveMessagesRecoveryInSec"`
-	MaxReconnectionAttempts        int    `json:"maxReconnectionAttempts"`
-	ConnectionRecoveryInSec        int    `json:"connectionRecoveryInSec"`
-	DisableEntityManagement        bool   `json:"disableEntityManagement"`
-	MaxDeliveryCount               *int   `json:"maxDeliveryCount"`
-	LockDurationInSec              *int   `json:"lockDurationInSec"`
-	DefaultMessageTimeToLiveInSec  *int   `json:"defaultMessageTimeToLiveInSec"`
-	AutoDeleteOnIdleInSec          *int   `json:"autoDeleteOnIdleInSec"`
-	MaxConcurrentHandlers          *int   `json:"maxConcurrentHandlers"`
-	PrefetchCount                  *int   `json:"prefetchCount"`
+	ConnectionString                string `json:"connectionString"`
+	ConsumerID                      string `json:"consumerID"`
+	TimeoutInSec                    int    `json:"timeoutInSec"`
+	HandlerTimeoutInSec             int    `json:"handlerTimeoutInSec"`
+	LockRenewalInSec                int    `json:"lockRenewalInSec"`
+	MaxActiveMessages               int    `json:"maxActiveMessages"`
+	MaxActiveMessagesRecoveryInSec  int    `json:"maxActiveMessagesRecoveryInSec"`
+	MaxReconnectionAttempts         int    `json:"maxReconnectionAttempts"`
+	ConnectionRecoveryInSec         int    `json:"connectionRecoveryInSec"`
+	DisableEntityManagement         bool   `json:"disableEntityManagement"`
+	MaxDeliveryCount                *int   `json:"maxDeliveryCount"`
+	LockDurationInSec               *int   `json:"lockDurationInSec"`
+	DefaultMessageTimeToLiveInSec   *int   `json:"defaultMessageTimeToLiveInSec"`
+	AutoDeleteOnIdleInSec           *int   `json:"autoDeleteOnIdleInSec"`
+	MaxConcurrentHandlers           *int   `json:"maxConcurrentHandlers"`
+	PrefetchCount                   *int   `json:"prefetchCount"`
+	PublishMaxRetries               int    `json:"publishMaxRetries"`
+	PublishInitialRetryIntervalInMs int    `json:"publishInitialRetryInternalInMs"`
 }
diff --git a/pubsub/azure/servicebus/servicebus.go b/pubsub/azure/servicebus/servicebus.go
index 7893375da..42c8294cb 100644
--- a/pubsub/azure/servicebus/servicebus.go
+++ b/pubsub/azure/servicebus/servicebus.go
@@ -7,11 +7,15 @@ package servicebus
 
 import (
 	"context"
+	"errors"
 	"fmt"
 	"strconv"
 	"sync"
 	"time"
 
+	"github.com/Azure/go-amqp"
+	"github.com/cenkalti/backoff/v4"
+
 	azservicebus "github.com/Azure/azure-service-bus-go"
 	contrib_metadata "github.com/dapr/components-contrib/metadata"
 	"github.com/dapr/components-contrib/pubsub"
@@ -20,23 +24,25 @@ import (
 
 const (
 	// Keys
-	connectionString               = "connectionString"
-	consumerID                     = "consumerID"
-	maxDeliveryCount               = "maxDeliveryCount"
-	timeoutInSec                   = "timeoutInSec"
-	lockDurationInSec              = "lockDurationInSec"
-	lockRenewalInSec               = "lockRenewalInSec"
-	defaultMessageTimeToLiveInSec  = "defaultMessageTimeToLiveInSec"
-	autoDeleteOnIdleInSec          = "autoDeleteOnIdleInSec"
-	disableEntityManagement        = "disableEntityManagement"
-	maxConcurrentHandlers          = "maxConcurrentHandlers"
-	handlerTimeoutInSec            = "handlerTimeoutInSec"
-	prefetchCount                  = "prefetchCount"
-	maxActiveMessages              = "maxActiveMessages"
-	maxActiveMessagesRecoveryInSec = "maxActiveMessagesRecoveryInSec"
-	maxReconnectionAttempts        = "maxReconnectionAttempts"
-	connectionRecoveryInSec        = "connectionRecoveryInSec"
-	errorMessagePrefix             = "azure service bus error:"
+	connectionString                = "connectionString"
+	consumerID                      = "consumerID"
+	maxDeliveryCount                = "maxDeliveryCount"
+	timeoutInSec                    = "timeoutInSec"
+	lockDurationInSec               = "lockDurationInSec"
+	lockRenewalInSec                = "lockRenewalInSec"
+	defaultMessageTimeToLiveInSec   = "defaultMessageTimeToLiveInSec"
+	autoDeleteOnIdleInSec           = "autoDeleteOnIdleInSec"
+	disableEntityManagement         = "disableEntityManagement"
+	maxConcurrentHandlers           = "maxConcurrentHandlers"
+	handlerTimeoutInSec             = "handlerTimeoutInSec"
+	prefetchCount                   = "prefetchCount"
+	maxActiveMessages               = "maxActiveMessages"
+	maxActiveMessagesRecoveryInSec  = "maxActiveMessagesRecoveryInSec"
+	maxReconnectionAttempts         = "maxReconnectionAttempts"
+	connectionRecoveryInSec         = "connectionRecoveryInSec"
+	publishMaxRetries               = "publishMaxRetries"
+	publishInitialRetryInternalInMs = "publishInitialRetryInternalInMs"
+	errorMessagePrefix              = "azure service bus error:"
 
 	// Defaults
 	defaultTimeoutInSec        = 60
@@ -44,11 +50,13 @@ const (
 	defaultLockRenewalInSec    = 20
 	// ASB Messages can be up to 256Kb. 10000 messages at this size would roughly use 2.56Gb.
 	// We should change this if performance testing suggests a more sensible default.
-	defaultMaxActiveMessages              = 10000
-	defaultMaxActiveMessagesRecoveryInSec = 2
-	defaultDisableEntityManagement        = false
-	defaultMaxReconnectionAttempts        = 30
-	defaultConnectionRecoveryInSec        = 2
+	defaultMaxActiveMessages               = 10000
+	defaultMaxActiveMessagesRecoveryInSec  = 2
+	defaultDisableEntityManagement         = false
+	defaultMaxReconnectionAttempts         = 30
+	defaultConnectionRecoveryInSec         = 2
+	defaultPublishMaxRetries               = 5
+	defaultPublishInitialRetryInternalInMs = 500
 )
 
 type handle = struct{}
@@ -62,6 +70,9 @@ type azureServiceBus struct {
 	features      []pubsub.Feature
 	topics        map[string]*azservicebus.Topic
 	topicsLock    *sync.RWMutex
+
+	ctx    context.Context
+	cancel context.CancelFunc
 }
 
 // NewAzureServiceBus returns a new Azure ServiceBus pub-sub implementation
@@ -215,6 +226,26 @@ func parseAzureServiceBusMetadata(meta pubsub.Metadata) (metadata, error) {
 		m.PrefetchCount = &valAsInt
 	}
 
+	m.PublishMaxRetries = defaultPublishMaxRetries
+	if val, ok := meta.Properties[publishMaxRetries]; ok && val != "" {
+		var err error
+		valAsInt, err := strconv.Atoi(val)
+		if err != nil {
+			return m, fmt.Errorf("%s invalid publishMaxRetries %s, %s", errorMessagePrefix, val, err)
+		}
+		m.PublishMaxRetries = valAsInt
+	}
+
+	m.PublishInitialRetryIntervalInMs = defaultPublishInitialRetryInternalInMs
+	if val, ok := meta.Properties[publishInitialRetryInternalInMs]; ok && val != "" {
+		var err error
+		valAsInt, err := strconv.Atoi(val)
+		if err != nil {
+			return m, fmt.Errorf("%s invalid publishInitialRetryIntervalInMs %s, %s", errorMessagePrefix, val, err)
+		}
+		m.PublishInitialRetryIntervalInMs = valAsInt
+	}
+
 	return m, nil
 }
 
@@ -232,17 +263,12 @@ func (a *azureServiceBus) Init(metadata pubsub.Metadata) error {
 
 	a.topicManager = a.namespace.NewTopicManager()
 
+	a.ctx, a.cancel = context.WithCancel(context.Background())
+
 	return nil
 }
 
 func (a *azureServiceBus) Publish(req *pubsub.PublishRequest) error {
-	if !a.metadata.DisableEntityManagement {
-		err := a.ensureTopic(req.Topic)
-		if err != nil {
-			return err
-		}
-	}
-
 	var sender *azservicebus.Topic
 	var err error
 
@@ -253,6 +279,12 @@ func (a *azureServiceBus) Publish(req *pubsub.PublishRequest) error {
 	a.topicsLock.RUnlock()
 
 	if sender == nil {
+		// Ensure the topic exists the first time it is referenced.
+		if !a.metadata.DisableEntityManagement {
+			if err = a.ensureTopic(req.Topic); err != nil {
+				return err
+			}
+		}
 		a.topicsLock.Lock()
 		sender, err = a.namespace.NewTopic(req.Topic)
 		a.topics[req.Topic] = sender
@@ -263,21 +295,47 @@ func (a *azureServiceBus) Publish(req *pubsub.PublishRequest) error {
 		}
 	}
 
-	ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(a.metadata.TimeoutInSec))
-	defer cancel()
-
 	msg := azservicebus.NewMessage(req.Data)
 	ttl, hasTTL, _ := contrib_metadata.TryGetTTL(req.Metadata)
 	if hasTTL {
 		msg.TTL = &ttl
 	}
 
-	err = sender.Send(ctx, msg)
-	if err != nil {
-		return err
-	}
+	return a.doPublish(sender, msg)
+}
 
-	return nil
+// doPublish sends msg through sender, retrying with exponential backoff while
+// Service Bus reports "com.microsoft:server-busy" or the connection is closed.
+// Any other send error is wrapped with backoff.Permanent so it is not retried.
+func (a *azureServiceBus) doPublish(sender *azservicebus.Topic, msg *azservicebus.Message) error {
+	ebo := backoff.NewExponentialBackOff()
+	ebo.InitialInterval = time.Duration(a.metadata.PublishInitialRetryIntervalInMs) * time.Millisecond
+	bo := backoff.WithMaxRetries(ebo, uint64(a.metadata.PublishMaxRetries))
+	bo = backoff.WithContext(bo, a.ctx)
+
+	return pubsub.RetryNotifyRecover(func() error {
+		ctx, cancel := context.WithTimeout(a.ctx, time.Second*time.Duration(a.metadata.TimeoutInSec))
+		defer cancel()
+
+		err := sender.Send(ctx, msg)
+		if err == nil {
+			return nil
+		}
+		var amqpError *amqp.Error
+		if errors.As(err, &amqpError) && amqpError.Condition == "com.microsoft:server-busy" {
+			return amqpError // Retries
+		}
+		var connClosedError azservicebus.ErrConnectionClosed
+		if errors.As(err, &connClosedError) {
+			return connClosedError // Retries
+		}
+
+		return backoff.Permanent(err) // Does not retry
+	}, bo, func(err error, _ time.Duration) {
+		a.logger.Debugf("Could not publish service bus message. Retrying...: %v", err)
+	}, func() {
+		a.logger.Debug("Successfully published service bus message after it previously failed")
+	})
 }
 
 func (a *azureServiceBus) Subscribe(req pubsub.SubscribeRequest, handler pubsub.Handler) error {
@@ -302,7 +360,7 @@ func (a *azureServiceBus) Subscribe(req pubsub.SubscribeRequest, handler pubsub.
 		// Periodically refill the reconnect attempts channel to avoid
 		// exhausting all the refill attempts due to intermittent issues
 		// ocurring over a longer period of time.
-		reconnCtx, reconnCancel := context.WithCancel(context.TODO())
+		reconnCtx, reconnCancel := context.WithCancel(a.ctx)
 		defer reconnCancel()
 		go func() {
 			for {
@@ -493,13 +551,15 @@ func (a *azureServiceBus) createSubscriptionManagementOptions() ([]azservicebus.
 
 func (a *azureServiceBus) Close() error {
 	for _, s := range a.subscriptions {
-		s.close(context.TODO())
+		s.close(a.ctx)
 	}
 
 	for _, t := range a.topics {
-		t.Close(context.TODO())
+		t.Close(a.ctx)
 	}
 
+	a.cancel()
+
 	return nil
 }
 