From c27a6580a81aafee4c86a81a9a333d0c637e5ee8 Mon Sep 17 00:00:00 2001 From: Joni Collinge Date: Fri, 2 Dec 2022 15:40:48 +0000 Subject: [PATCH] WIP: ASB sessions Signed-off-by: Joni Collinge --- .../servicebusqueues/servicebusqueues.go | 12 +- .../component/azure/servicebus/metadata.go | 3 + .../azure/servicebus/subscription.go | 211 +++++++++++++----- .../azure/servicebus/subscription_test.go | 2 + pubsub/azure/servicebus/queues/servicebus.go | 20 +- pubsub/azure/servicebus/topics/servicebus.go | 146 +++++++++--- 6 files changed, 297 insertions(+), 97 deletions(-) diff --git a/bindings/azure/servicebusqueues/servicebusqueues.go b/bindings/azure/servicebusqueues/servicebusqueues.go index 7922438c3..098e5cfca 100644 --- a/bindings/azure/servicebusqueues/servicebusqueues.go +++ b/bindings/azure/servicebusqueues/servicebusqueues.go @@ -118,12 +118,15 @@ func (a *AzureServiceBusQueues) Read(subscribeCtx context.Context, handler bindi a.metadata.MaxRetriableErrorsPerSec, a.metadata.MaxConcurrentHandlers, "queue "+a.metadata.QueueName, + a.metadata.LockRenewalInSec, + false, a.logger, ) // Blocks until a successful connection (or until context is canceled) - err := sub.Connect(func() (*servicebus.Receiver, error) { - return a.client.GetClient().NewReceiverForQueue(a.metadata.QueueName, nil) + err := sub.Connect(func() (impl.Receiver, error) { + receiver, err := a.client.GetClient().NewReceiverForQueue(a.metadata.QueueName, nil) + return &impl.MessageReceiver{Receiver: receiver}, err }) if err != nil { // Realistically, the only time we should get to this point is if the context was canceled, but let's log any other error we may get. @@ -137,12 +140,13 @@ func (a *AzureServiceBusQueues) Read(subscribeCtx context.Context, handler bindi // If that occurs, we will log the error and attempt to re-establish the subscription connection until we exhaust the number of reconnect attempts. err = sub.ReceiveAndBlock( a.getHandlerFunc(handler), - a.metadata.LockRenewalInSec, - false, // Bulk is not supported here. func() { // Reset the backoff when the subscription is successful and we have received the first message bo.Reset() }, + impl.ReceiveOptions{ + BulkEnabled: false, // Bulk is not supported here. + }, ) if err != nil && !errors.Is(err, context.Canceled) { a.logger.Error(err) diff --git a/internal/component/azure/servicebus/metadata.go b/internal/component/azure/servicebus/metadata.go index b61df2661..4b94c3bb4 100644 --- a/internal/component/azure/servicebus/metadata.go +++ b/internal/component/azure/servicebus/metadata.go @@ -49,6 +49,9 @@ type Metadata struct { PublishMaxRetries int `json:"publishMaxRetries"` PublishInitialRetryIntervalInMs int `json:"publishInitialRetryInternalInMs"` NamespaceName string `json:"namespaceName"` // Only for Azure AD + SessionsEnabled bool `json:"sessionsEnabled"` + MaxConcurrentSesions *int `json:"maxConcurrentSessions"` + SessionIdleTimeoutInSec *int `json:"sessionIdleTimeoutInSec"` /** For bindings only **/ QueueName string `json:"queueName"` // Only queues diff --git a/internal/component/azure/servicebus/subscription.go b/internal/component/azure/servicebus/subscription.go index 40804dc76..d6dfe2bb2 100644 --- a/internal/component/azure/servicebus/subscription.go +++ b/internal/component/azure/servicebus/subscription.go @@ -37,13 +37,56 @@ type HandlerResponseItem struct { // HandlerFunc is the type for handlers that receive messages type HandlerFunc func(ctx context.Context, msgs []*azservicebus.ReceivedMessage) ([]HandlerResponseItem, error) +type Receiver interface { + ID() string + ReceiveMessages(ctx context.Context, maxMessages int, options *azservicebus.ReceiveMessagesOptions) ([]*azservicebus.ReceivedMessage, error) + CompleteMessage(ctx context.Context, m *azservicebus.ReceivedMessage, opts *azservicebus.CompleteMessageOptions) error + AbandonMessage(ctx context.Context, m *azservicebus.ReceivedMessage, opts *azservicebus.AbandonMessageOptions) error + RenewLocks(ctx context.Context, m *azservicebus.ReceivedMessage) error + Close(ctx context.Context) error +} + +var ( + _ Receiver = (*SessionReceiver)(nil) + _ Receiver = (*MessageReceiver)(nil) +) + +type SessionReceiver struct { + *azservicebus.SessionReceiver +} + +func (s *SessionReceiver) ID() string { + return s.SessionID() +} + +func (s *SessionReceiver) RenewLocks(ctx context.Context, msg *azservicebus.ReceivedMessage) error { + return s.RenewSessionLock(ctx, nil) +} + +type MessageReceiver struct { + *azservicebus.Receiver +} + +func (m *MessageReceiver) ID() string { + return "" +} + +func (m *MessageReceiver) RenewLocks(ctx context.Context, msg *azservicebus.ReceivedMessage) error { + return m.RenewMessageLock(ctx, msg, nil) +} + +type MessageRenewFunc func(ctx context.Context, message *azservicebus.ReceivedMessage) error + // Subscription is an object that manages a subscription to an Azure Service Bus receiver, for a topic or queue. type Subscription struct { entity string mu sync.RWMutex activeMessages map[int64]*azservicebus.ReceivedMessage activeOperationsChan chan struct{} - receiver *azservicebus.Receiver + receiver *MessageReceiver + sessionMu sync.RWMutex + sessionReceivers map[string]*SessionReceiver + sessionsEnabled bool // read-only once set timeout time.Duration maxBulkSubCount int retriableErrLimit ratelimit.Limiter @@ -63,6 +106,8 @@ func NewSubscription( maxRetriableEPS int, maxConcurrentHandlers int, entity string, + lockRenewalInSec int, + sessionsEnabled bool, logger logger.Logger, ) *Subscription { ctx, cancel := context.WithCancel(parentCtx) @@ -83,13 +128,15 @@ func NewSubscription( } s := &Subscription{ - entity: entity, - activeMessages: make(map[int64]*azservicebus.ReceivedMessage), - timeout: time.Duration(timeoutInSec) * time.Second, - maxBulkSubCount: *maxBulkSubCount, - logger: logger, - ctx: ctx, - cancel: cancel, + entity: entity, + activeMessages: make(map[int64]*azservicebus.ReceivedMessage), + timeout: time.Duration(timeoutInSec) * time.Second, + maxBulkSubCount: *maxBulkSubCount, + sessionReceivers: make(map[string]*SessionReceiver), + sessionsEnabled: sessionsEnabled, + logger: logger, + ctx: ctx, + cancel: cancel, // This is a pessimistic estimate of the number of total operations that can be active at any given time. // In case of a non-bulk subscription, one operation is one message. activeOperationsChan: make(chan struct{}, maxActiveMessages/(*maxBulkSubCount)), @@ -106,44 +153,6 @@ func NewSubscription( s.handleChan = make(chan struct{}, maxConcurrentHandlers) } - return s -} - -// Connect to a Service Bus topic or queue, blocking until it succeeds; it can retry forever (until the context is canceled). -func (s *Subscription) Connect(newReceiverFunc func() (*azservicebus.Receiver, error)) error { - // Connections need to retry forever with a maximum backoff of 5 minutes and exponential scaling. - config := retry.DefaultConfig() - config.Policy = retry.PolicyExponential - config.MaxInterval = 5 * time.Minute - config.MaxElapsedTime = 0 - backoff := config.NewBackOffWithContext(s.ctx) - - err := retry.NotifyRecover( - func() error { - clientAttempt, innerErr := newReceiverFunc() - if innerErr != nil { - return innerErr - } - s.receiver = clientAttempt - return nil - }, - backoff, - func(err error, d time.Duration) { - s.logger.Warnf("Failed to connect to Azure Service Bus %s; will retry in %s. Error: %s", s.entity, d, err.Error()) - }, - func() { - s.logger.Infof("Successfully reconnected to Azure Service Bus %s", s.entity) - }, - ) - - return err -} - -// ReceiveAndBlock is a blocking call to receive messages on an Azure Service Bus subscription from a topic or queue. -func (s *Subscription) ReceiveAndBlock(handler HandlerFunc, lockRenewalInSec int, bulkEnabled bool, onFirstSuccess func()) error { - ctx, cancel := context.WithCancel(s.ctx) - defer cancel() - // Lock renewal loop. go func() { shouldRenewLocks := lockRenewalInSec > 0 @@ -164,6 +173,67 @@ func (s *Subscription) ReceiveAndBlock(handler HandlerFunc, lockRenewalInSec int } }() + return s +} + +// Connect to a Service Bus topic or queue, blocking until it succeeds; it can retry forever (until the context is canceled). +func (s *Subscription) Connect(newReceiverFunc func() (Receiver, error)) error { + // Connections need to retry forever with a maximum backoff of 5 minutes and exponential scaling. + config := retry.DefaultConfig() + config.Policy = retry.PolicyExponential + config.MaxInterval = 5 * time.Minute + config.MaxElapsedTime = 0 + backoff := config.NewBackOffWithContext(s.ctx) + + err := retry.NotifyRecover( + func() error { + clientAttempt, innerErr := newReceiverFunc() + if innerErr != nil { + return innerErr + } + if s.sessionsEnabled { + sessionReceiver, ok := clientAttempt.(*SessionReceiver) + if !ok { + return fmt.Errorf("expected a session receiver, got %T", clientAttempt) + } + s.sessionMu.Lock() + _, exists := s.sessionReceivers[sessionReceiver.ID()] + if exists { + s.sessionMu.Unlock() + return fmt.Errorf("session receiver %s already exists", sessionReceiver.ID()) + } + s.sessionReceivers[sessionReceiver.ID()] = sessionReceiver + s.sessionMu.Unlock() + } else { + msgReciever, ok := clientAttempt.(*MessageReceiver) + if !ok { + return fmt.Errorf("expected a message receiver, got %T", clientAttempt) + } + s.receiver = msgReciever + } + return nil + }, + backoff, + func(err error, d time.Duration) { + s.logger.Warnf("Failed to connect to Azure Service Bus %s; will retry in %s. Error: %s", s.entity, d, err.Error()) + }, + func() { + s.logger.Infof("Successfully reconnected to Azure Service Bus %s", s.entity) + }, + ) + + return err +} + +type ReceiveOptions struct { + BulkEnabled bool +} + +// ReceiveAndBlock is a blocking call to receive messages on an Azure Service Bus subscription from a topic or queue. +func (s *Subscription) ReceiveAndBlock(handler HandlerFunc, onFirstSuccess func(), opts ReceiveOptions) error { + ctx, cancel := context.WithCancel(s.ctx) + defer cancel() + // Receiver loop for { select { @@ -177,8 +247,11 @@ func (s *Subscription) ReceiveAndBlock(handler HandlerFunc, lockRenewalInSec int return ctx.Err() } + recvCtx, recvCancel := context.WithTimeout(ctx, s.timeout) + defer recvCancel() + // This method blocks until we get a message or the context is canceled - msgs, err := s.receiver.ReceiveMessages(s.ctx, s.maxBulkSubCount, nil) + msgs, err := s.receiver.ReceiveMessages(recvCtx, s.maxBulkSubCount, nil) if err != nil { if err != context.Canceled { s.logger.Errorf("Error reading from %s. %s", s.entity, err.Error()) @@ -278,7 +351,7 @@ func (s *Subscription) ReceiveAndBlock(handler HandlerFunc, lockRenewalInSec int } } - if bulkEnabled { + if opts.BulkEnabled { s.handleAsync(s.ctx, msgs, bulkRunHandlerFunc) } else { s.handleAsync(s.ctx, msgs, runHandlerFn) @@ -290,6 +363,14 @@ func (s *Subscription) ReceiveAndBlock(handler HandlerFunc, lockRenewalInSec int func (s *Subscription) Close(closeCtx context.Context) { s.logger.Debugf("Closing subscription to %s", s.entity) + s.sessionMu.Lock() + for _, session := range s.sessionReceivers { + if err := session.Close(closeCtx); err != nil { + s.logger.Warnf("Error closing session receiver: %s", err.Error()) + } + } + s.sessionMu.Unlock() + // Ensure subscription entity is closed. if err := s.receiver.Close(closeCtx); err != nil { s.logger.Warnf("Error closing subscription for %s: %+v", s.entity, err) @@ -355,6 +436,33 @@ func (s *Subscription) handleAsync(ctx context.Context, msgs []*azservicebus.Rec } func (s *Subscription) tryRenewLocks() { + if s.sessionsEnabled { + s.tryRenewSessionLocks() + } else { + s.tryRenewMessageLocks() + } +} + +func (s *Subscription) tryRenewSessionLocks() { + s.sessionMu.Lock() + defer s.sessionMu.Unlock() + + if len(s.sessionReceivers) == 0 { + return + } + + var ctx, cancel = context.WithTimeout(s.ctx, s.timeout) + defer cancel() + for _, session := range s.sessionReceivers { + session.RenewLocks(ctx, &azservicebus.ReceivedMessage{}) // mesage not used. + } +} + +func (s *Subscription) tryRenewMessageLocks() { + if s.receiver == nil { + return + } + // Snapshot the messages to try to renew locks for. msgs := make([]*azservicebus.ReceivedMessage, 0) s.mu.RLock() @@ -374,7 +482,10 @@ func (s *Subscription) tryRenewLocks() { var cancel context.CancelFunc for _, msg := range msgs { ctx, cancel = context.WithTimeout(context.Background(), s.timeout) - err = s.receiver.RenewMessageLock(ctx, msg, nil) + + // Renew the lock for the message. + err = s.receiver.RenewLocks(ctx, msg) + if err != nil { s.logger.Debugf("Couldn't renew all active message lock(s) for %s, ", s.entity, err) } @@ -398,7 +509,7 @@ func (s *Subscription) AbandonMessage(ctx context.Context, m *azservicebus.Recei s.logger.Debugf("Taking a retriable error token") before := time.Now() _ = s.retriableErrLimit.Take() - s.logger.Debugf("Resumed after pausing for %v", time.Now().Sub(before)) + s.logger.Debugf("Resumed after pausing for %v", time.Since(before)) } // CompleteMessage marks a message as complete. diff --git a/internal/component/azure/servicebus/subscription_test.go b/internal/component/azure/servicebus/subscription_test.go index 1e3d5d05f..57d3c83d9 100644 --- a/internal/component/azure/servicebus/subscription_test.go +++ b/internal/component/azure/servicebus/subscription_test.go @@ -63,6 +63,8 @@ func TestNewSubscription(t *testing.T) { 10, 100, "test", + 30, + false, logger.NewLogger("test"), ) if sub.maxBulkSubCount != tc.maxBulkSubCountExpected { diff --git a/pubsub/azure/servicebus/queues/servicebus.go b/pubsub/azure/servicebus/queues/servicebus.go index 7348f04c1..b7715075a 100644 --- a/pubsub/azure/servicebus/queues/servicebus.go +++ b/pubsub/azure/servicebus/queues/servicebus.go @@ -186,15 +186,18 @@ func (a *azureServiceBus) Subscribe(subscribeCtx context.Context, req pubsub.Sub a.metadata.MaxRetriableErrorsPerSec, a.metadata.MaxConcurrentHandlers, "queue "+req.Topic, + a.metadata.LockRenewalInSec, + false, a.logger, ) receiveAndBlockFn := func(onFirstSuccess func()) error { return sub.ReceiveAndBlock( impl.GetPubSubHandlerFunc(req.Topic, handler, a.logger, time.Duration(a.metadata.HandlerTimeoutInSec)*time.Second), - a.metadata.LockRenewalInSec, - false, // Bulk is not supported in regular Subscribe. onFirstSuccess, + impl.ReceiveOptions{ + BulkEnabled: false, // Bulk is not supported in regular Subscribe. + }, ) } @@ -211,15 +214,18 @@ func (a *azureServiceBus) BulkSubscribe(subscribeCtx context.Context, req pubsub a.metadata.MaxRetriableErrorsPerSec, a.metadata.MaxConcurrentHandlers, "queue "+req.Topic, + a.metadata.LockRenewalInSec, + false, a.logger, ) receiveAndBlockFn := func(onFirstSuccess func()) error { return sub.ReceiveAndBlock( impl.GetBulkPubSubHandlerFunc(req.Topic, handler, a.logger, time.Duration(a.metadata.HandlerTimeoutInSec)*time.Second), - a.metadata.LockRenewalInSec, - true, // Bulk is supported in BulkSubscribe. onFirstSuccess, + impl.ReceiveOptions{ + BulkEnabled: true, // Bulk is supported in BulkSubscribe. + }, ) } @@ -252,9 +258,11 @@ func (a *azureServiceBus) doSubscribe(subscribeCtx context.Context, // Reconnect loop. for { // Blocks until a successful connection (or until context is canceled) - err := sub.Connect(func() (*servicebus.Receiver, error) { - return a.client.GetClient().NewReceiverForQueue(req.Topic, nil) + err := sub.Connect(func() (impl.Receiver, error) { + receiver, err := a.client.GetClient().NewReceiverForQueue(req.Topic, nil) + return &impl.MessageReceiver{Receiver: receiver}, err }) + if err != nil { // Realistically, the only time we should get to this point is if the context was canceled, but let's log any other error we may get. if errors.Is(err, context.Canceled) { diff --git a/pubsub/azure/servicebus/topics/servicebus.go b/pubsub/azure/servicebus/topics/servicebus.go index e7f64e36b..a035654a2 100644 --- a/pubsub/azure/servicebus/topics/servicebus.go +++ b/pubsub/azure/servicebus/topics/servicebus.go @@ -176,6 +176,7 @@ func (a *azureServiceBus) BulkPublish(ctx context.Context, req *pubsub.BulkPubli } func (a *azureServiceBus) Subscribe(subscribeCtx context.Context, req pubsub.SubscribeRequest, handler pubsub.Handler) error { + sessionsEnabled := true sub := impl.NewSubscription( subscribeCtx, a.metadata.MaxActiveMessages, @@ -184,22 +185,26 @@ func (a *azureServiceBus) Subscribe(subscribeCtx context.Context, req pubsub.Sub a.metadata.MaxRetriableErrorsPerSec, a.metadata.MaxConcurrentHandlers, "topic "+req.Topic, + a.metadata.LockRenewalInSec, + sessionsEnabled, a.logger, ) receiveAndBlockFn := func(onFirstSuccess func()) error { return sub.ReceiveAndBlock( impl.GetPubSubHandlerFunc(req.Topic, handler, a.logger, time.Duration(a.metadata.HandlerTimeoutInSec)*time.Second), - a.metadata.LockRenewalInSec, - false, // Bulk is not supported in regular Subscribe. onFirstSuccess, + impl.ReceiveOptions{ + BulkEnabled: false, // Bulk is not supported in regular Subscribe. + }, ) } - return a.doSubscribe(subscribeCtx, req, sub, receiveAndBlockFn) + return a.doSubscribe(subscribeCtx, req, sub, receiveAndBlockFn, sessionsEnabled) } func (a *azureServiceBus) BulkSubscribe(subscribeCtx context.Context, req pubsub.SubscribeRequest, handler pubsub.BulkHandler) error { + sessionsEnabled := true maxBulkSubCount := utils.GetElemOrDefaultFromMap(req.Metadata, contribMetadata.MaxBulkSubCountKey, defaultMaxBulkSubCount) sub := impl.NewSubscription( subscribeCtx, @@ -209,26 +214,28 @@ func (a *azureServiceBus) BulkSubscribe(subscribeCtx context.Context, req pubsub a.metadata.MaxRetriableErrorsPerSec, a.metadata.MaxConcurrentHandlers, "topic "+req.Topic, + a.metadata.LockRenewalInSec, + sessionsEnabled, a.logger, ) receiveAndBlockFn := func(onFirstSuccess func()) error { return sub.ReceiveAndBlock( impl.GetBulkPubSubHandlerFunc(req.Topic, handler, a.logger, time.Duration(a.metadata.HandlerTimeoutInSec)*time.Second), - a.metadata.LockRenewalInSec, - true, // Bulk is supported in BulkSubscribe. onFirstSuccess, + impl.ReceiveOptions{ + BulkEnabled: true, // Bulk is supported in BulkSubscribe. + }, ) } - return a.doSubscribe(subscribeCtx, req, sub, receiveAndBlockFn) + return a.doSubscribe(subscribeCtx, req, sub, receiveAndBlockFn, sessionsEnabled) } // doSubscribe is a helper function that handles the common logic for both Subscribe and BulkSubscribe. // The receiveAndBlockFn is a function should invoke a blocking call to receive messages from the topic. func (a *azureServiceBus) doSubscribe(subscribeCtx context.Context, - req pubsub.SubscribeRequest, sub *impl.Subscription, receiveAndBlockFn func(func()) error, -) error { + req pubsub.SubscribeRequest, sub *impl.Subscription, receiveAndBlockFn func(func()) error, sessionsEnabled bool) error { // Does nothing if DisableEntityManagement is true err := a.client.EnsureSubscription(subscribeCtx, a.metadata.ConsumerID, req.Topic) if err != nil { @@ -249,35 +256,10 @@ func (a *azureServiceBus) doSubscribe(subscribeCtx context.Context, go func() { // Reconnect loop. for { - // Blocks until a successful connection (or until context is canceled) - err := sub.Connect(func() (*servicebus.Receiver, error) { - return a.client.GetClient().NewReceiverForSubscription(req.Topic, a.metadata.ConsumerID, nil) - }) - if err != nil { - // Realistically, the only time we should get to this point is if the context was canceled, but let's log any other error we may get. - if errors.Is(err, context.Canceled) { - a.logger.Errorf("Could not instantiate subscription %s for topic %s", a.metadata.ConsumerID, req.Topic) - } - return - } - - // receiveAndBlockFn will only return with an error that it cannot handle internally. The subscription connection is closed when this method returns. - // If that occurs, we will log the error and attempt to re-establish the subscription connection until we exhaust the number of reconnect attempts. - err = receiveAndBlockFn(onFirstSuccess) - if err != nil && !errors.Is(err, context.Canceled) { - a.logger.Error(err) - } - - // Gracefully close the connection (in case it's not closed already) - // Use a background context here (with timeout) because ctx may be closed already - closeCtx, closeCancel := context.WithTimeout(context.Background(), time.Second*time.Duration(a.metadata.TimeoutInSec)) - sub.Close(closeCtx) - closeCancel() - - // If context was canceled, do not attempt to reconnect - if subscribeCtx.Err() != nil { - a.logger.Debug("Context canceled; will not reconnect") - return + if sessionsEnabled { + a.ConnectAndReceiveWithSessions(subscribeCtx, req, sub, receiveAndBlockFn, onFirstSuccess) + } else { + a.ConnectAndReceive(subscribeCtx, req, sub, receiveAndBlockFn, onFirstSuccess) } wait := bo.NextBackOff() @@ -298,3 +280,93 @@ func (a *azureServiceBus) Close() (err error) { func (a *azureServiceBus) Features() []pubsub.Feature { return a.features } + +func (a *azureServiceBus) ConnectAndReceive(subscribeCtx context.Context, + req pubsub.SubscribeRequest, sub *impl.Subscription, receiveAndBlockFn func(func()) error, onFirstSuccess func()) { + // Blocks until a successful connection (or until context is canceled) + err := sub.Connect(func() (impl.Receiver, error) { + receiver, err := a.client.GetClient().NewReceiverForSubscription(req.Topic, a.metadata.ConsumerID, nil) + return &impl.MessageReceiver{Receiver: receiver}, err + }) + if err != nil { + // Realistically, the only time we should get to this point is if the context was canceled, but let's log any other error we may get. + if errors.Is(err, context.Canceled) { + a.logger.Errorf("Could not instantiate subscription %s for topic %s", a.metadata.ConsumerID, req.Topic) + } + return + } + + // receiveAndBlockFn will only return with an error that it cannot handle internally. The subscription connection is closed when this method returns. + // If that occurs, we will log the error and attempt to re-establish the subscription connection until we exhaust the number of reconnect attempts. + err = receiveAndBlockFn(onFirstSuccess) + if err != nil && !errors.Is(err, context.Canceled) { + a.logger.Error(err) + } + + // Gracefully close the connection (in case it's not closed already) + // Use a background context here (with timeout) because ctx may be closed already + closeCtx, closeCancel := context.WithTimeout(context.Background(), time.Second*time.Duration(a.metadata.TimeoutInSec)) + sub.Close(closeCtx) + closeCancel() + + // If context was canceled, do not attempt to reconnect + if subscribeCtx.Err() != nil { + a.logger.Debug("Context canceled; will not reconnect") + return + } +} + +func (a *azureServiceBus) ConnectAndReceiveWithSessions(subscribeCtx context.Context, + req pubsub.SubscribeRequest, sub *impl.Subscription, receiveAndBlockFn func(func()) error, onFirstSuccess func()) { + + maxSessions := 8 + sessionsChan := make(chan struct{}, maxSessions) + sessionsCtx, sessionCancel := context.WithTimeout(subscribeCtx, time.Second*time.Duration(a.metadata.TimeoutInSec)) + defer sessionCancel() + + for { + select { + case <-sessionsCtx.Done(): + return + case <-sessionsChan: + go func() { + defer func() { + sessionsChan <- struct{}{} + }() + // Blocks until a successful connection (or until context is canceled) + err := sub.Connect(func() (impl.Receiver, error) { + receiver, err := a.client.GetClient().AcceptNextSessionForSubscription(sessionsCtx, req.Topic, a.metadata.ConsumerID, nil) + return &impl.SessionReceiver{SessionReceiver: receiver}, err + }) + if err != nil { + // Realistically, the only time we should get to this point is if the context was canceled, but let's log any other error we may get. + if errors.Is(err, context.Canceled) { + a.logger.Errorf("Could not instantiate session subscription %s for topic %s", a.metadata.ConsumerID, req.Topic) + } + return + } + + // receiveAndBlockFn will only return with an error that it cannot handle internally. The subscription connection is closed when this method returns. + // If that occurs, we will log the error and attempt to re-establish the subscription connection until we exhaust the number of reconnect attempts. + err = receiveAndBlockFn(onFirstSuccess) + if err != nil && !errors.Is(err, context.Canceled) { + a.logger.Error(err) + } + + // Gracefully close the connection (in case it's not closed already) + // Use a background context here (with timeout) because ctx may be closed already + closeCtx, closeCancel := context.WithTimeout(context.Background(), time.Second*time.Duration(a.metadata.TimeoutInSec)) + sub.Close(closeCtx) + closeCancel() + + // If context was canceled, do not attempt to reconnect + if subscribeCtx.Err() != nil { + a.logger.Debug("Context canceled; will not reconnect") + return + } + }() + } + + } + +}