Merge pull request #2181 from ItalyPaleAle/asb-publish-conn-fix

Fix for handling "context deadline exceeded" on ASB publishing
This commit is contained in:
Bernd Verst 2022-10-12 12:13:55 -07:00 committed by GitHub
commit 5b0679ff8b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 137 additions and 63 deletions

View File

@ -16,13 +16,13 @@ package servicebusqueues
import (
"context"
"errors"
"fmt"
"sync"
"time"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
servicebus "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
sbadmin "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/admin"
"github.com/Azure/go-amqp"
backoff "github.com/cenkalti/backoff/v4"
"github.com/dapr/components-contrib/bindings"
@ -141,22 +141,10 @@ func (a *AzureServiceBusQueues) Operations() []bindings.OperationKind {
return []bindings.OperationKind{bindings.CreateOperation}
}
func (a *AzureServiceBusQueues) Invoke(ctx context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
var err error
a.senderLock.RLock()
sender := a.sender
a.senderLock.RUnlock()
if sender == nil {
a.senderLock.Lock()
sender, err = a.client.NewSender(a.metadata.QueueName, nil)
if err != nil {
a.senderLock.Unlock()
return nil, err
}
a.sender = sender
a.senderLock.Unlock()
func (a *AzureServiceBusQueues) Invoke(invokeCtx context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
sender, err := a.getSender()
if err != nil {
return nil, fmt.Errorf("failed to create a sender for the Service Bus queue: %w", err)
}
msg := &servicebus.Message{
@ -187,10 +175,19 @@ func (a *AzureServiceBusQueues) Invoke(ctx context.Context, req *bindings.Invoke
msg.TimeToLive = &ttl
}
ctx, cancel := context.WithTimeout(ctx, a.timeout)
// Send the message
ctx, cancel := context.WithTimeout(invokeCtx, a.timeout)
defer cancel()
err = sender.SendMessage(ctx, msg, nil)
if err != nil {
if impl.IsNetworkError(err) {
// Force reconnection on next call
a.deleteSender()
}
return nil, err
}
return nil, sender.SendMessage(ctx, msg, nil)
return nil, nil
}
func (a *AzureServiceBusQueues) Read(subscribeCtx context.Context, handler bindings.Handler) error {
@ -219,7 +216,7 @@ func (a *AzureServiceBusQueues) Read(subscribeCtx context.Context, handler bindi
})
if err != nil {
// Realistically, the only time we should get to this point is if the context was canceled, but let's log any other error we may get.
if err != context.Canceled {
if errors.Is(err, context.Canceled) {
a.logger.Warnf("Error reading from Azure Service Bus Queue binding: %s", err.Error())
}
return
@ -235,15 +232,8 @@ func (a *AzureServiceBusQueues) Read(subscribeCtx context.Context, handler bindi
bo.Reset()
},
)
if err != nil {
var detachError *amqp.DetachError
var amqpError *amqp.Error
if errors.Is(err, detachError) ||
(errors.As(err, &amqpError) && amqpError.Condition == amqp.ErrorDetachForced) {
a.logger.Debug(err)
} else {
a.logger.Error(err)
}
if err != nil && !errors.Is(err, context.Canceled) {
a.logger.Error(err)
}
// Gracefully close the connection (in case it's not closed already)
@ -267,6 +257,46 @@ func (a *AzureServiceBusQueues) Read(subscribeCtx context.Context, handler bindi
return nil
}
// getSender returns the Sender object, creating a new connection if needed
func (a *AzureServiceBusQueues) getSender() (*servicebus.Sender, error) {
// Check if the sender already exists
a.senderLock.RLock()
if a.sender != nil {
a.senderLock.RUnlock()
return a.sender, nil
}
a.senderLock.RUnlock()
// Acquire a write lock then try checking a.sender again in case another goroutine modified that in the meanwhile
a.senderLock.Lock()
defer a.senderLock.Unlock()
if a.sender != nil {
return a.sender, nil
}
// Create a new sender
sender, err := a.client.NewSender(a.metadata.QueueName, nil)
if err != nil {
return nil, err
}
a.sender = sender
return sender, nil
}
// deleteSender deletes the sender, closing the connection
func (a *AzureServiceBusQueues) deleteSender() {
a.senderLock.Lock()
if a.sender != nil {
closeCtx, closeCancel := context.WithTimeout(context.Background(), time.Second)
_ = a.sender.Close(closeCtx)
closeCancel()
a.sender = nil
}
a.senderLock.Unlock()
}
func (a *AzureServiceBusQueues) getHandlerFunc(handler bindings.Handler) impl.HandlerFunc {
return func(ctx context.Context, msg *servicebus.ReceivedMessage) error {
metadata := make(map[string]string)

View File

@ -0,0 +1,30 @@
package servicebus
import (
"context"
"errors"
azservicebus "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
"github.com/Azure/go-amqp"
)
// IsNetworkError returns true if the error returned by Service Bus is a network-level one, which would require reconnecting.
func IsNetworkError(err error) bool {
if err == nil {
return false
}
var expError *azservicebus.Error
if errors.As(err, &expError) {
if expError.Code == "connlost" {
return true
}
}
// Context deadline exceeded errors often happen when the connection is just "hanging"
if errors.Is(err, amqp.ErrConnClosed) || errors.Is(err, context.DeadlineExceeded) {
return true
}
return false
}

View File

@ -303,11 +303,6 @@ func (a *azureServiceBus) Init(metadata pubsub.Metadata) (err error) {
}
func (a *azureServiceBus) Publish(req *pubsub.PublishRequest) error {
sender, err := a.senderForTopic(a.publishCtx, req.Topic)
if err != nil {
return err
}
// a.logger.Debugf("Creating message with body: %s", string(req.Data))
msg, err := NewASBMessageFromPubsubRequest(req)
if err != nil {
@ -325,40 +320,43 @@ func (a *azureServiceBus) Publish(req *pubsub.PublishRequest) error {
}
return retry.NotifyRecover(
func() (err error) {
// Get the sender
var sender *servicebus.Sender
sender, err = a.senderForTopic(a.publishCtx, req.Topic)
if err != nil {
return err
}
// Try sending the message
ctx, cancel := context.WithTimeout(a.publishCtx, time.Second*time.Duration(a.metadata.TimeoutInSec))
defer cancel()
err = sender.SendMessage(ctx, msg, nil)
if err != nil {
if impl.IsNetworkError(err) {
// Retry after reconnecting
a.deleteSenderForTopic(req.Topic)
return err
}
var amqpError *amqp.Error
var expError *servicebus.Error
if errors.As(err, &amqpError) {
if _, ok := retriableSendingErrors[amqpError.Condition]; ok {
return amqpError // Retries.
// Retry (no need to reconnect)
return amqpError
}
}
if errors.Is(err, amqp.ErrConnClosed) {
return err // Retries.
}
if errors.As(err, &expError) {
if expError.Code == "connlost" {
a.logger.Warn(expError.Error())
return expError // Retries.
}
}
return backoff.Permanent(err) // Does not retry.
// Do not retry on other errors
return backoff.Permanent(err)
}
return nil
},
bo,
func(err error, _ time.Duration) {
a.logger.Debugf("Could not publish service bus message (%s). Retrying...: %v", msgID, err)
a.logger.Warnf("Could not publish service bus message (%s). Retrying...: %v", msgID, err)
},
func() {
a.logger.Debugf("Successfully published service bus message (%s) after it previously failed", msgID)
a.logger.Infof("Successfully published service bus message (%s) after it previously failed", msgID)
},
)
}
@ -397,7 +395,7 @@ func (a *azureServiceBus) Subscribe(subscribeCtx context.Context, req pubsub.Sub
})
if err != nil {
// Realistically, the only time we should get to this point is if the context was canceled, but let's log any other error we may get.
if err != context.Canceled {
if errors.Is(err, context.Canceled) {
a.logger.Errorf("%s could not instantiate subscription %s for topic %s", errorMessagePrefix, subID, req.Topic)
}
return
@ -413,15 +411,8 @@ func (a *azureServiceBus) Subscribe(subscribeCtx context.Context, req pubsub.Sub
bo.Reset()
},
)
if err != nil {
var detachError *amqp.DetachError
var amqpError *amqp.Error
if errors.Is(err, detachError) ||
(errors.As(err, &amqpError) && amqpError.Condition == amqp.ErrorDetachForced) {
a.logger.Debug(err)
} else {
a.logger.Error(err)
}
if err != nil && !errors.Is(err, context.Canceled) {
a.logger.Error(err)
}
// Gracefully close the connection (in case it's not closed already)
@ -468,6 +459,15 @@ func (a *azureServiceBus) senderForTopic(ctx context.Context, topic string) (*se
return sender, nil
}
a.topicsLock.Lock()
defer a.topicsLock.Unlock()
// Check again after acquiring a write lock in case another goroutine created the sender
sender, ok = a.topics[topic]
if ok && sender != nil {
return sender, nil
}
// Ensure the topic exists the first time it is referenced.
var err error
if !a.metadata.DisableEntityManagement {
@ -475,8 +475,8 @@ func (a *azureServiceBus) senderForTopic(ctx context.Context, topic string) (*se
return nil, err
}
}
a.topicsLock.Lock()
defer a.topicsLock.Unlock()
// Create the sender
sender, err = a.client.NewSender(topic, nil)
if err != nil {
return nil, err
@ -486,6 +486,20 @@ func (a *azureServiceBus) senderForTopic(ctx context.Context, topic string) (*se
return sender, nil
}
// deleteSenderForTopic deletes a sender for a topic, closing the connection
func (a *azureServiceBus) deleteSenderForTopic(topic string) {
a.topicsLock.Lock()
defer a.topicsLock.Unlock()
sender, ok := a.topics[topic]
if ok && sender != nil {
closeCtx, closeCancel := context.WithTimeout(context.Background(), time.Second)
_ = sender.Close(closeCtx)
closeCancel()
}
delete(a.topics, topic)
}
func (a *azureServiceBus) ensureTopic(ctx context.Context, topic string) error {
shouldCreate, err := a.shouldCreateTopic(ctx, topic)
if err != nil {