diff --git a/bindings/azure/servicebusqueues/servicebusqueues.go b/bindings/azure/servicebusqueues/servicebusqueues.go index 82861cb24..b40f55130 100644 --- a/bindings/azure/servicebusqueues/servicebusqueues.go +++ b/bindings/azure/servicebusqueues/servicebusqueues.go @@ -16,13 +16,13 @@ package servicebusqueues import ( "context" "errors" + "fmt" "sync" "time" "github.com/Azure/azure-sdk-for-go/sdk/azcore/to" servicebus "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus" sbadmin "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/admin" - "github.com/Azure/go-amqp" backoff "github.com/cenkalti/backoff/v4" "github.com/dapr/components-contrib/bindings" @@ -141,22 +141,10 @@ func (a *AzureServiceBusQueues) Operations() []bindings.OperationKind { return []bindings.OperationKind{bindings.CreateOperation} } -func (a *AzureServiceBusQueues) Invoke(ctx context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) { - var err error - - a.senderLock.RLock() - sender := a.sender - a.senderLock.RUnlock() - - if sender == nil { - a.senderLock.Lock() - sender, err = a.client.NewSender(a.metadata.QueueName, nil) - if err != nil { - a.senderLock.Unlock() - return nil, err - } - a.sender = sender - a.senderLock.Unlock() +func (a *AzureServiceBusQueues) Invoke(invokeCtx context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) { + sender, err := a.getSender() + if err != nil { + return nil, fmt.Errorf("failed to create a sender for the Service Bus queue: %w", err) } msg := &servicebus.Message{ @@ -187,10 +175,19 @@ func (a *AzureServiceBusQueues) Invoke(ctx context.Context, req *bindings.Invoke msg.TimeToLive = &ttl } - ctx, cancel := context.WithTimeout(ctx, a.timeout) + // Send the message + ctx, cancel := context.WithTimeout(invokeCtx, a.timeout) defer cancel() + err = sender.SendMessage(ctx, msg, nil) + if err != nil { + if impl.IsNetworkError(err) { + // Force reconnection on next call + a.deleteSender() + } + return nil, err + } - return nil, sender.SendMessage(ctx, msg, nil) + return nil, nil } func (a *AzureServiceBusQueues) Read(subscribeCtx context.Context, handler bindings.Handler) error { @@ -219,7 +216,7 @@ func (a *AzureServiceBusQueues) Read(subscribeCtx context.Context, handler bindi }) if err != nil { // Realistically, the only time we should get to this point is if the context was canceled, but let's log any other error we may get. - if err != context.Canceled { + if errors.Is(err, context.Canceled) { a.logger.Warnf("Error reading from Azure Service Bus Queue binding: %s", err.Error()) } return @@ -235,15 +232,8 @@ func (a *AzureServiceBusQueues) Read(subscribeCtx context.Context, handler bindi bo.Reset() }, ) - if err != nil { - var detachError *amqp.DetachError - var amqpError *amqp.Error - if errors.Is(err, detachError) || - (errors.As(err, &amqpError) && amqpError.Condition == amqp.ErrorDetachForced) { - a.logger.Debug(err) - } else { - a.logger.Error(err) - } + if err != nil && !errors.Is(err, context.Canceled) { + a.logger.Error(err) } // Gracefully close the connection (in case it's not closed already) @@ -267,6 +257,46 @@ func (a *AzureServiceBusQueues) Read(subscribeCtx context.Context, handler bindi return nil } +// getSender returns the Sender object, creating a new connection if needed +func (a *AzureServiceBusQueues) getSender() (*servicebus.Sender, error) { + // Check if the sender already exists + a.senderLock.RLock() + if a.sender != nil { + a.senderLock.RUnlock() + return a.sender, nil + } + a.senderLock.RUnlock() + + // Acquire a write lock then try checking a.sender again in case another goroutine modified that in the meanwhile + a.senderLock.Lock() + defer a.senderLock.Unlock() + + if a.sender != nil { + return a.sender, nil + } + + // Create a new sender + sender, err := a.client.NewSender(a.metadata.QueueName, nil) + if err != nil { + return nil, err + } + a.sender = sender + + return sender, nil +} + +// deleteSender deletes the sender, closing the connection +func (a *AzureServiceBusQueues) deleteSender() { + a.senderLock.Lock() + if a.sender != nil { + closeCtx, closeCancel := context.WithTimeout(context.Background(), time.Second) + _ = a.sender.Close(closeCtx) + closeCancel() + a.sender = nil + } + a.senderLock.Unlock() +} + func (a *AzureServiceBusQueues) getHandlerFunc(handler bindings.Handler) impl.HandlerFunc { return func(ctx context.Context, msg *servicebus.ReceivedMessage) error { metadata := make(map[string]string) diff --git a/internal/component/azure/servicebus/errors.go b/internal/component/azure/servicebus/errors.go new file mode 100644 index 000000000..8189f0c7b --- /dev/null +++ b/internal/component/azure/servicebus/errors.go @@ -0,0 +1,30 @@ +package servicebus + +import ( + "context" + "errors" + + azservicebus "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus" + "github.com/Azure/go-amqp" +) + +// IsNetworkError returns true if the error returned by Service Bus is a network-level one, which would require reconnecting. +func IsNetworkError(err error) bool { + if err == nil { + return false + } + + var expError *azservicebus.Error + if errors.As(err, &expError) { + if expError.Code == "connlost" { + return true + } + } + + // Context deadline exceeded errors often happen when the connection is just "hanging" + if errors.Is(err, amqp.ErrConnClosed) || errors.Is(err, context.DeadlineExceeded) { + return true + } + + return false +} diff --git a/pubsub/azure/servicebus/servicebus.go b/pubsub/azure/servicebus/servicebus.go index 1b5beaee0..ec94040a7 100644 --- a/pubsub/azure/servicebus/servicebus.go +++ b/pubsub/azure/servicebus/servicebus.go @@ -303,11 +303,6 @@ func (a *azureServiceBus) Init(metadata pubsub.Metadata) (err error) { } func (a *azureServiceBus) Publish(req *pubsub.PublishRequest) error { - sender, err := a.senderForTopic(a.publishCtx, req.Topic) - if err != nil { - return err - } - // a.logger.Debugf("Creating message with body: %s", string(req.Data)) msg, err := NewASBMessageFromPubsubRequest(req) if err != nil { @@ -325,40 +320,43 @@ func (a *azureServiceBus) Publish(req *pubsub.PublishRequest) error { } return retry.NotifyRecover( func() (err error) { + // Get the sender + var sender *servicebus.Sender + sender, err = a.senderForTopic(a.publishCtx, req.Topic) + if err != nil { + return err + } + + // Try sending the message ctx, cancel := context.WithTimeout(a.publishCtx, time.Second*time.Duration(a.metadata.TimeoutInSec)) defer cancel() - err = sender.SendMessage(ctx, msg, nil) if err != nil { + if impl.IsNetworkError(err) { + // Retry after reconnecting + a.deleteSenderForTopic(req.Topic) + return err + } + var amqpError *amqp.Error - var expError *servicebus.Error if errors.As(err, &amqpError) { if _, ok := retriableSendingErrors[amqpError.Condition]; ok { - return amqpError // Retries. + // Retry (no need to reconnect) + return amqpError } } - if errors.Is(err, amqp.ErrConnClosed) { - return err // Retries. - } - - if errors.As(err, &expError) { - if expError.Code == "connlost" { - a.logger.Warn(expError.Error()) - return expError // Retries. - } - } - - return backoff.Permanent(err) // Does not retry. + // Do not retry on other errors + return backoff.Permanent(err) } return nil }, bo, func(err error, _ time.Duration) { - a.logger.Debugf("Could not publish service bus message (%s). Retrying...: %v", msgID, err) + a.logger.Warnf("Could not publish service bus message (%s). Retrying...: %v", msgID, err) }, func() { - a.logger.Debugf("Successfully published service bus message (%s) after it previously failed", msgID) + a.logger.Infof("Successfully published service bus message (%s) after it previously failed", msgID) }, ) } @@ -397,7 +395,7 @@ func (a *azureServiceBus) Subscribe(subscribeCtx context.Context, req pubsub.Sub }) if err != nil { // Realistically, the only time we should get to this point is if the context was canceled, but let's log any other error we may get. - if err != context.Canceled { + if errors.Is(err, context.Canceled) { a.logger.Errorf("%s could not instantiate subscription %s for topic %s", errorMessagePrefix, subID, req.Topic) } return @@ -413,15 +411,8 @@ func (a *azureServiceBus) Subscribe(subscribeCtx context.Context, req pubsub.Sub bo.Reset() }, ) - if err != nil { - var detachError *amqp.DetachError - var amqpError *amqp.Error - if errors.Is(err, detachError) || - (errors.As(err, &amqpError) && amqpError.Condition == amqp.ErrorDetachForced) { - a.logger.Debug(err) - } else { - a.logger.Error(err) - } + if err != nil && !errors.Is(err, context.Canceled) { + a.logger.Error(err) } // Gracefully close the connection (in case it's not closed already) @@ -468,6 +459,15 @@ func (a *azureServiceBus) senderForTopic(ctx context.Context, topic string) (*se return sender, nil } + a.topicsLock.Lock() + defer a.topicsLock.Unlock() + + // Check again after acquiring a write lock in case another goroutine created the sender + sender, ok = a.topics[topic] + if ok && sender != nil { + return sender, nil + } + // Ensure the topic exists the first time it is referenced. var err error if !a.metadata.DisableEntityManagement { @@ -475,8 +475,8 @@ func (a *azureServiceBus) senderForTopic(ctx context.Context, topic string) (*se return nil, err } } - a.topicsLock.Lock() - defer a.topicsLock.Unlock() + + // Create the sender sender, err = a.client.NewSender(topic, nil) if err != nil { return nil, err @@ -486,6 +486,20 @@ func (a *azureServiceBus) senderForTopic(ctx context.Context, topic string) (*se return sender, nil } +// deleteSenderForTopic deletes a sender for a topic, closing the connection +func (a *azureServiceBus) deleteSenderForTopic(topic string) { + a.topicsLock.Lock() + defer a.topicsLock.Unlock() + + sender, ok := a.topics[topic] + if ok && sender != nil { + closeCtx, closeCancel := context.WithTimeout(context.Background(), time.Second) + _ = sender.Close(closeCtx) + closeCancel() + } + delete(a.topics, topic) +} + func (a *azureServiceBus) ensureTopic(ctx context.Context, topic string) error { shouldCreate, err := a.shouldCreateTopic(ctx, topic) if err != nil {