WIP: ASB sessions
Signed-off-by: Joni Collinge <jonathancollinge@live.com>
This commit is contained in:
parent
0e0cf8fe58
commit
c27a6580a8
|
@ -118,12 +118,15 @@ func (a *AzureServiceBusQueues) Read(subscribeCtx context.Context, handler bindi
|
|||
a.metadata.MaxRetriableErrorsPerSec,
|
||||
a.metadata.MaxConcurrentHandlers,
|
||||
"queue "+a.metadata.QueueName,
|
||||
a.metadata.LockRenewalInSec,
|
||||
false,
|
||||
a.logger,
|
||||
)
|
||||
|
||||
// Blocks until a successful connection (or until context is canceled)
|
||||
err := sub.Connect(func() (*servicebus.Receiver, error) {
|
||||
return a.client.GetClient().NewReceiverForQueue(a.metadata.QueueName, nil)
|
||||
err := sub.Connect(func() (impl.Receiver, error) {
|
||||
receiver, err := a.client.GetClient().NewReceiverForQueue(a.metadata.QueueName, nil)
|
||||
return &impl.MessageReceiver{Receiver: receiver}, err
|
||||
})
|
||||
if err != nil {
|
||||
// Realistically, the only time we should get to this point is if the context was canceled, but let's log any other error we may get.
|
||||
|
@ -137,12 +140,13 @@ func (a *AzureServiceBusQueues) Read(subscribeCtx context.Context, handler bindi
|
|||
// If that occurs, we will log the error and attempt to re-establish the subscription connection until we exhaust the number of reconnect attempts.
|
||||
err = sub.ReceiveAndBlock(
|
||||
a.getHandlerFunc(handler),
|
||||
a.metadata.LockRenewalInSec,
|
||||
false, // Bulk is not supported here.
|
||||
func() {
|
||||
// Reset the backoff when the subscription is successful and we have received the first message
|
||||
bo.Reset()
|
||||
},
|
||||
impl.ReceiveOptions{
|
||||
BulkEnabled: false, // Bulk is not supported here.
|
||||
},
|
||||
)
|
||||
if err != nil && !errors.Is(err, context.Canceled) {
|
||||
a.logger.Error(err)
|
||||
|
|
|
@ -49,6 +49,9 @@ type Metadata struct {
|
|||
PublishMaxRetries int `json:"publishMaxRetries"`
|
||||
PublishInitialRetryIntervalInMs int `json:"publishInitialRetryInternalInMs"`
|
||||
NamespaceName string `json:"namespaceName"` // Only for Azure AD
|
||||
SessionsEnabled bool `json:"sessionsEnabled"`
|
||||
MaxConcurrentSesions *int `json:"maxConcurrentSessions"`
|
||||
SessionIdleTimeoutInSec *int `json:"sessionIdleTimeoutInSec"`
|
||||
|
||||
/** For bindings only **/
|
||||
QueueName string `json:"queueName"` // Only queues
|
||||
|
|
|
@ -37,13 +37,56 @@ type HandlerResponseItem struct {
|
|||
// HandlerFunc is the type for handlers that receive messages
|
||||
type HandlerFunc func(ctx context.Context, msgs []*azservicebus.ReceivedMessage) ([]HandlerResponseItem, error)
|
||||
|
||||
type Receiver interface {
|
||||
ID() string
|
||||
ReceiveMessages(ctx context.Context, maxMessages int, options *azservicebus.ReceiveMessagesOptions) ([]*azservicebus.ReceivedMessage, error)
|
||||
CompleteMessage(ctx context.Context, m *azservicebus.ReceivedMessage, opts *azservicebus.CompleteMessageOptions) error
|
||||
AbandonMessage(ctx context.Context, m *azservicebus.ReceivedMessage, opts *azservicebus.AbandonMessageOptions) error
|
||||
RenewLocks(ctx context.Context, m *azservicebus.ReceivedMessage) error
|
||||
Close(ctx context.Context) error
|
||||
}
|
||||
|
||||
var (
|
||||
_ Receiver = (*SessionReceiver)(nil)
|
||||
_ Receiver = (*MessageReceiver)(nil)
|
||||
)
|
||||
|
||||
type SessionReceiver struct {
|
||||
*azservicebus.SessionReceiver
|
||||
}
|
||||
|
||||
func (s *SessionReceiver) ID() string {
|
||||
return s.SessionID()
|
||||
}
|
||||
|
||||
func (s *SessionReceiver) RenewLocks(ctx context.Context, msg *azservicebus.ReceivedMessage) error {
|
||||
return s.RenewSessionLock(ctx, nil)
|
||||
}
|
||||
|
||||
type MessageReceiver struct {
|
||||
*azservicebus.Receiver
|
||||
}
|
||||
|
||||
func (m *MessageReceiver) ID() string {
|
||||
return ""
|
||||
}
|
||||
|
||||
func (m *MessageReceiver) RenewLocks(ctx context.Context, msg *azservicebus.ReceivedMessage) error {
|
||||
return m.RenewMessageLock(ctx, msg, nil)
|
||||
}
|
||||
|
||||
type MessageRenewFunc func(ctx context.Context, message *azservicebus.ReceivedMessage) error
|
||||
|
||||
// Subscription is an object that manages a subscription to an Azure Service Bus receiver, for a topic or queue.
|
||||
type Subscription struct {
|
||||
entity string
|
||||
mu sync.RWMutex
|
||||
activeMessages map[int64]*azservicebus.ReceivedMessage
|
||||
activeOperationsChan chan struct{}
|
||||
receiver *azservicebus.Receiver
|
||||
receiver *MessageReceiver
|
||||
sessionMu sync.RWMutex
|
||||
sessionReceivers map[string]*SessionReceiver
|
||||
sessionsEnabled bool // read-only once set
|
||||
timeout time.Duration
|
||||
maxBulkSubCount int
|
||||
retriableErrLimit ratelimit.Limiter
|
||||
|
@ -63,6 +106,8 @@ func NewSubscription(
|
|||
maxRetriableEPS int,
|
||||
maxConcurrentHandlers int,
|
||||
entity string,
|
||||
lockRenewalInSec int,
|
||||
sessionsEnabled bool,
|
||||
logger logger.Logger,
|
||||
) *Subscription {
|
||||
ctx, cancel := context.WithCancel(parentCtx)
|
||||
|
@ -83,13 +128,15 @@ func NewSubscription(
|
|||
}
|
||||
|
||||
s := &Subscription{
|
||||
entity: entity,
|
||||
activeMessages: make(map[int64]*azservicebus.ReceivedMessage),
|
||||
timeout: time.Duration(timeoutInSec) * time.Second,
|
||||
maxBulkSubCount: *maxBulkSubCount,
|
||||
logger: logger,
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
entity: entity,
|
||||
activeMessages: make(map[int64]*azservicebus.ReceivedMessage),
|
||||
timeout: time.Duration(timeoutInSec) * time.Second,
|
||||
maxBulkSubCount: *maxBulkSubCount,
|
||||
sessionReceivers: make(map[string]*SessionReceiver),
|
||||
sessionsEnabled: sessionsEnabled,
|
||||
logger: logger,
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
// This is a pessimistic estimate of the number of total operations that can be active at any given time.
|
||||
// In case of a non-bulk subscription, one operation is one message.
|
||||
activeOperationsChan: make(chan struct{}, maxActiveMessages/(*maxBulkSubCount)),
|
||||
|
@ -106,44 +153,6 @@ func NewSubscription(
|
|||
s.handleChan = make(chan struct{}, maxConcurrentHandlers)
|
||||
}
|
||||
|
||||
return s
|
||||
}
|
||||
|
||||
// Connect to a Service Bus topic or queue, blocking until it succeeds; it can retry forever (until the context is canceled).
|
||||
func (s *Subscription) Connect(newReceiverFunc func() (*azservicebus.Receiver, error)) error {
|
||||
// Connections need to retry forever with a maximum backoff of 5 minutes and exponential scaling.
|
||||
config := retry.DefaultConfig()
|
||||
config.Policy = retry.PolicyExponential
|
||||
config.MaxInterval = 5 * time.Minute
|
||||
config.MaxElapsedTime = 0
|
||||
backoff := config.NewBackOffWithContext(s.ctx)
|
||||
|
||||
err := retry.NotifyRecover(
|
||||
func() error {
|
||||
clientAttempt, innerErr := newReceiverFunc()
|
||||
if innerErr != nil {
|
||||
return innerErr
|
||||
}
|
||||
s.receiver = clientAttempt
|
||||
return nil
|
||||
},
|
||||
backoff,
|
||||
func(err error, d time.Duration) {
|
||||
s.logger.Warnf("Failed to connect to Azure Service Bus %s; will retry in %s. Error: %s", s.entity, d, err.Error())
|
||||
},
|
||||
func() {
|
||||
s.logger.Infof("Successfully reconnected to Azure Service Bus %s", s.entity)
|
||||
},
|
||||
)
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// ReceiveAndBlock is a blocking call to receive messages on an Azure Service Bus subscription from a topic or queue.
|
||||
func (s *Subscription) ReceiveAndBlock(handler HandlerFunc, lockRenewalInSec int, bulkEnabled bool, onFirstSuccess func()) error {
|
||||
ctx, cancel := context.WithCancel(s.ctx)
|
||||
defer cancel()
|
||||
|
||||
// Lock renewal loop.
|
||||
go func() {
|
||||
shouldRenewLocks := lockRenewalInSec > 0
|
||||
|
@ -164,6 +173,67 @@ func (s *Subscription) ReceiveAndBlock(handler HandlerFunc, lockRenewalInSec int
|
|||
}
|
||||
}()
|
||||
|
||||
return s
|
||||
}
|
||||
|
||||
// Connect to a Service Bus topic or queue, blocking until it succeeds; it can retry forever (until the context is canceled).
|
||||
func (s *Subscription) Connect(newReceiverFunc func() (Receiver, error)) error {
|
||||
// Connections need to retry forever with a maximum backoff of 5 minutes and exponential scaling.
|
||||
config := retry.DefaultConfig()
|
||||
config.Policy = retry.PolicyExponential
|
||||
config.MaxInterval = 5 * time.Minute
|
||||
config.MaxElapsedTime = 0
|
||||
backoff := config.NewBackOffWithContext(s.ctx)
|
||||
|
||||
err := retry.NotifyRecover(
|
||||
func() error {
|
||||
clientAttempt, innerErr := newReceiverFunc()
|
||||
if innerErr != nil {
|
||||
return innerErr
|
||||
}
|
||||
if s.sessionsEnabled {
|
||||
sessionReceiver, ok := clientAttempt.(*SessionReceiver)
|
||||
if !ok {
|
||||
return fmt.Errorf("expected a session receiver, got %T", clientAttempt)
|
||||
}
|
||||
s.sessionMu.Lock()
|
||||
_, exists := s.sessionReceivers[sessionReceiver.ID()]
|
||||
if exists {
|
||||
s.sessionMu.Unlock()
|
||||
return fmt.Errorf("session receiver %s already exists", sessionReceiver.ID())
|
||||
}
|
||||
s.sessionReceivers[sessionReceiver.ID()] = sessionReceiver
|
||||
s.sessionMu.Unlock()
|
||||
} else {
|
||||
msgReciever, ok := clientAttempt.(*MessageReceiver)
|
||||
if !ok {
|
||||
return fmt.Errorf("expected a message receiver, got %T", clientAttempt)
|
||||
}
|
||||
s.receiver = msgReciever
|
||||
}
|
||||
return nil
|
||||
},
|
||||
backoff,
|
||||
func(err error, d time.Duration) {
|
||||
s.logger.Warnf("Failed to connect to Azure Service Bus %s; will retry in %s. Error: %s", s.entity, d, err.Error())
|
||||
},
|
||||
func() {
|
||||
s.logger.Infof("Successfully reconnected to Azure Service Bus %s", s.entity)
|
||||
},
|
||||
)
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
type ReceiveOptions struct {
|
||||
BulkEnabled bool
|
||||
}
|
||||
|
||||
// ReceiveAndBlock is a blocking call to receive messages on an Azure Service Bus subscription from a topic or queue.
|
||||
func (s *Subscription) ReceiveAndBlock(handler HandlerFunc, onFirstSuccess func(), opts ReceiveOptions) error {
|
||||
ctx, cancel := context.WithCancel(s.ctx)
|
||||
defer cancel()
|
||||
|
||||
// Receiver loop
|
||||
for {
|
||||
select {
|
||||
|
@ -177,8 +247,11 @@ func (s *Subscription) ReceiveAndBlock(handler HandlerFunc, lockRenewalInSec int
|
|||
return ctx.Err()
|
||||
}
|
||||
|
||||
recvCtx, recvCancel := context.WithTimeout(ctx, s.timeout)
|
||||
defer recvCancel()
|
||||
|
||||
// This method blocks until we get a message or the context is canceled
|
||||
msgs, err := s.receiver.ReceiveMessages(s.ctx, s.maxBulkSubCount, nil)
|
||||
msgs, err := s.receiver.ReceiveMessages(recvCtx, s.maxBulkSubCount, nil)
|
||||
if err != nil {
|
||||
if err != context.Canceled {
|
||||
s.logger.Errorf("Error reading from %s. %s", s.entity, err.Error())
|
||||
|
@ -278,7 +351,7 @@ func (s *Subscription) ReceiveAndBlock(handler HandlerFunc, lockRenewalInSec int
|
|||
}
|
||||
}
|
||||
|
||||
if bulkEnabled {
|
||||
if opts.BulkEnabled {
|
||||
s.handleAsync(s.ctx, msgs, bulkRunHandlerFunc)
|
||||
} else {
|
||||
s.handleAsync(s.ctx, msgs, runHandlerFn)
|
||||
|
@ -290,6 +363,14 @@ func (s *Subscription) ReceiveAndBlock(handler HandlerFunc, lockRenewalInSec int
|
|||
func (s *Subscription) Close(closeCtx context.Context) {
|
||||
s.logger.Debugf("Closing subscription to %s", s.entity)
|
||||
|
||||
s.sessionMu.Lock()
|
||||
for _, session := range s.sessionReceivers {
|
||||
if err := session.Close(closeCtx); err != nil {
|
||||
s.logger.Warnf("Error closing session receiver: %s", err.Error())
|
||||
}
|
||||
}
|
||||
s.sessionMu.Unlock()
|
||||
|
||||
// Ensure subscription entity is closed.
|
||||
if err := s.receiver.Close(closeCtx); err != nil {
|
||||
s.logger.Warnf("Error closing subscription for %s: %+v", s.entity, err)
|
||||
|
@ -355,6 +436,33 @@ func (s *Subscription) handleAsync(ctx context.Context, msgs []*azservicebus.Rec
|
|||
}
|
||||
|
||||
func (s *Subscription) tryRenewLocks() {
|
||||
if s.sessionsEnabled {
|
||||
s.tryRenewSessionLocks()
|
||||
} else {
|
||||
s.tryRenewMessageLocks()
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Subscription) tryRenewSessionLocks() {
|
||||
s.sessionMu.Lock()
|
||||
defer s.sessionMu.Unlock()
|
||||
|
||||
if len(s.sessionReceivers) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
var ctx, cancel = context.WithTimeout(s.ctx, s.timeout)
|
||||
defer cancel()
|
||||
for _, session := range s.sessionReceivers {
|
||||
session.RenewLocks(ctx, &azservicebus.ReceivedMessage{}) // mesage not used.
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Subscription) tryRenewMessageLocks() {
|
||||
if s.receiver == nil {
|
||||
return
|
||||
}
|
||||
|
||||
// Snapshot the messages to try to renew locks for.
|
||||
msgs := make([]*azservicebus.ReceivedMessage, 0)
|
||||
s.mu.RLock()
|
||||
|
@ -374,7 +482,10 @@ func (s *Subscription) tryRenewLocks() {
|
|||
var cancel context.CancelFunc
|
||||
for _, msg := range msgs {
|
||||
ctx, cancel = context.WithTimeout(context.Background(), s.timeout)
|
||||
err = s.receiver.RenewMessageLock(ctx, msg, nil)
|
||||
|
||||
// Renew the lock for the message.
|
||||
err = s.receiver.RenewLocks(ctx, msg)
|
||||
|
||||
if err != nil {
|
||||
s.logger.Debugf("Couldn't renew all active message lock(s) for %s, ", s.entity, err)
|
||||
}
|
||||
|
@ -398,7 +509,7 @@ func (s *Subscription) AbandonMessage(ctx context.Context, m *azservicebus.Recei
|
|||
s.logger.Debugf("Taking a retriable error token")
|
||||
before := time.Now()
|
||||
_ = s.retriableErrLimit.Take()
|
||||
s.logger.Debugf("Resumed after pausing for %v", time.Now().Sub(before))
|
||||
s.logger.Debugf("Resumed after pausing for %v", time.Since(before))
|
||||
}
|
||||
|
||||
// CompleteMessage marks a message as complete.
|
||||
|
|
|
@ -63,6 +63,8 @@ func TestNewSubscription(t *testing.T) {
|
|||
10,
|
||||
100,
|
||||
"test",
|
||||
30,
|
||||
false,
|
||||
logger.NewLogger("test"),
|
||||
)
|
||||
if sub.maxBulkSubCount != tc.maxBulkSubCountExpected {
|
||||
|
|
|
@ -186,15 +186,18 @@ func (a *azureServiceBus) Subscribe(subscribeCtx context.Context, req pubsub.Sub
|
|||
a.metadata.MaxRetriableErrorsPerSec,
|
||||
a.metadata.MaxConcurrentHandlers,
|
||||
"queue "+req.Topic,
|
||||
a.metadata.LockRenewalInSec,
|
||||
false,
|
||||
a.logger,
|
||||
)
|
||||
|
||||
receiveAndBlockFn := func(onFirstSuccess func()) error {
|
||||
return sub.ReceiveAndBlock(
|
||||
impl.GetPubSubHandlerFunc(req.Topic, handler, a.logger, time.Duration(a.metadata.HandlerTimeoutInSec)*time.Second),
|
||||
a.metadata.LockRenewalInSec,
|
||||
false, // Bulk is not supported in regular Subscribe.
|
||||
onFirstSuccess,
|
||||
impl.ReceiveOptions{
|
||||
BulkEnabled: false, // Bulk is not supported in regular Subscribe.
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
|
@ -211,15 +214,18 @@ func (a *azureServiceBus) BulkSubscribe(subscribeCtx context.Context, req pubsub
|
|||
a.metadata.MaxRetriableErrorsPerSec,
|
||||
a.metadata.MaxConcurrentHandlers,
|
||||
"queue "+req.Topic,
|
||||
a.metadata.LockRenewalInSec,
|
||||
false,
|
||||
a.logger,
|
||||
)
|
||||
|
||||
receiveAndBlockFn := func(onFirstSuccess func()) error {
|
||||
return sub.ReceiveAndBlock(
|
||||
impl.GetBulkPubSubHandlerFunc(req.Topic, handler, a.logger, time.Duration(a.metadata.HandlerTimeoutInSec)*time.Second),
|
||||
a.metadata.LockRenewalInSec,
|
||||
true, // Bulk is supported in BulkSubscribe.
|
||||
onFirstSuccess,
|
||||
impl.ReceiveOptions{
|
||||
BulkEnabled: true, // Bulk is supported in BulkSubscribe.
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
|
@ -252,9 +258,11 @@ func (a *azureServiceBus) doSubscribe(subscribeCtx context.Context,
|
|||
// Reconnect loop.
|
||||
for {
|
||||
// Blocks until a successful connection (or until context is canceled)
|
||||
err := sub.Connect(func() (*servicebus.Receiver, error) {
|
||||
return a.client.GetClient().NewReceiverForQueue(req.Topic, nil)
|
||||
err := sub.Connect(func() (impl.Receiver, error) {
|
||||
receiver, err := a.client.GetClient().NewReceiverForQueue(req.Topic, nil)
|
||||
return &impl.MessageReceiver{Receiver: receiver}, err
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
// Realistically, the only time we should get to this point is if the context was canceled, but let's log any other error we may get.
|
||||
if errors.Is(err, context.Canceled) {
|
||||
|
|
|
@ -176,6 +176,7 @@ func (a *azureServiceBus) BulkPublish(ctx context.Context, req *pubsub.BulkPubli
|
|||
}
|
||||
|
||||
func (a *azureServiceBus) Subscribe(subscribeCtx context.Context, req pubsub.SubscribeRequest, handler pubsub.Handler) error {
|
||||
sessionsEnabled := true
|
||||
sub := impl.NewSubscription(
|
||||
subscribeCtx,
|
||||
a.metadata.MaxActiveMessages,
|
||||
|
@ -184,22 +185,26 @@ func (a *azureServiceBus) Subscribe(subscribeCtx context.Context, req pubsub.Sub
|
|||
a.metadata.MaxRetriableErrorsPerSec,
|
||||
a.metadata.MaxConcurrentHandlers,
|
||||
"topic "+req.Topic,
|
||||
a.metadata.LockRenewalInSec,
|
||||
sessionsEnabled,
|
||||
a.logger,
|
||||
)
|
||||
|
||||
receiveAndBlockFn := func(onFirstSuccess func()) error {
|
||||
return sub.ReceiveAndBlock(
|
||||
impl.GetPubSubHandlerFunc(req.Topic, handler, a.logger, time.Duration(a.metadata.HandlerTimeoutInSec)*time.Second),
|
||||
a.metadata.LockRenewalInSec,
|
||||
false, // Bulk is not supported in regular Subscribe.
|
||||
onFirstSuccess,
|
||||
impl.ReceiveOptions{
|
||||
BulkEnabled: false, // Bulk is not supported in regular Subscribe.
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
return a.doSubscribe(subscribeCtx, req, sub, receiveAndBlockFn)
|
||||
return a.doSubscribe(subscribeCtx, req, sub, receiveAndBlockFn, sessionsEnabled)
|
||||
}
|
||||
|
||||
func (a *azureServiceBus) BulkSubscribe(subscribeCtx context.Context, req pubsub.SubscribeRequest, handler pubsub.BulkHandler) error {
|
||||
sessionsEnabled := true
|
||||
maxBulkSubCount := utils.GetElemOrDefaultFromMap(req.Metadata, contribMetadata.MaxBulkSubCountKey, defaultMaxBulkSubCount)
|
||||
sub := impl.NewSubscription(
|
||||
subscribeCtx,
|
||||
|
@ -209,26 +214,28 @@ func (a *azureServiceBus) BulkSubscribe(subscribeCtx context.Context, req pubsub
|
|||
a.metadata.MaxRetriableErrorsPerSec,
|
||||
a.metadata.MaxConcurrentHandlers,
|
||||
"topic "+req.Topic,
|
||||
a.metadata.LockRenewalInSec,
|
||||
sessionsEnabled,
|
||||
a.logger,
|
||||
)
|
||||
|
||||
receiveAndBlockFn := func(onFirstSuccess func()) error {
|
||||
return sub.ReceiveAndBlock(
|
||||
impl.GetBulkPubSubHandlerFunc(req.Topic, handler, a.logger, time.Duration(a.metadata.HandlerTimeoutInSec)*time.Second),
|
||||
a.metadata.LockRenewalInSec,
|
||||
true, // Bulk is supported in BulkSubscribe.
|
||||
onFirstSuccess,
|
||||
impl.ReceiveOptions{
|
||||
BulkEnabled: true, // Bulk is supported in BulkSubscribe.
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
return a.doSubscribe(subscribeCtx, req, sub, receiveAndBlockFn)
|
||||
return a.doSubscribe(subscribeCtx, req, sub, receiveAndBlockFn, sessionsEnabled)
|
||||
}
|
||||
|
||||
// doSubscribe is a helper function that handles the common logic for both Subscribe and BulkSubscribe.
|
||||
// The receiveAndBlockFn is a function should invoke a blocking call to receive messages from the topic.
|
||||
func (a *azureServiceBus) doSubscribe(subscribeCtx context.Context,
|
||||
req pubsub.SubscribeRequest, sub *impl.Subscription, receiveAndBlockFn func(func()) error,
|
||||
) error {
|
||||
req pubsub.SubscribeRequest, sub *impl.Subscription, receiveAndBlockFn func(func()) error, sessionsEnabled bool) error {
|
||||
// Does nothing if DisableEntityManagement is true
|
||||
err := a.client.EnsureSubscription(subscribeCtx, a.metadata.ConsumerID, req.Topic)
|
||||
if err != nil {
|
||||
|
@ -249,35 +256,10 @@ func (a *azureServiceBus) doSubscribe(subscribeCtx context.Context,
|
|||
go func() {
|
||||
// Reconnect loop.
|
||||
for {
|
||||
// Blocks until a successful connection (or until context is canceled)
|
||||
err := sub.Connect(func() (*servicebus.Receiver, error) {
|
||||
return a.client.GetClient().NewReceiverForSubscription(req.Topic, a.metadata.ConsumerID, nil)
|
||||
})
|
||||
if err != nil {
|
||||
// Realistically, the only time we should get to this point is if the context was canceled, but let's log any other error we may get.
|
||||
if errors.Is(err, context.Canceled) {
|
||||
a.logger.Errorf("Could not instantiate subscription %s for topic %s", a.metadata.ConsumerID, req.Topic)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// receiveAndBlockFn will only return with an error that it cannot handle internally. The subscription connection is closed when this method returns.
|
||||
// If that occurs, we will log the error and attempt to re-establish the subscription connection until we exhaust the number of reconnect attempts.
|
||||
err = receiveAndBlockFn(onFirstSuccess)
|
||||
if err != nil && !errors.Is(err, context.Canceled) {
|
||||
a.logger.Error(err)
|
||||
}
|
||||
|
||||
// Gracefully close the connection (in case it's not closed already)
|
||||
// Use a background context here (with timeout) because ctx may be closed already
|
||||
closeCtx, closeCancel := context.WithTimeout(context.Background(), time.Second*time.Duration(a.metadata.TimeoutInSec))
|
||||
sub.Close(closeCtx)
|
||||
closeCancel()
|
||||
|
||||
// If context was canceled, do not attempt to reconnect
|
||||
if subscribeCtx.Err() != nil {
|
||||
a.logger.Debug("Context canceled; will not reconnect")
|
||||
return
|
||||
if sessionsEnabled {
|
||||
a.ConnectAndReceiveWithSessions(subscribeCtx, req, sub, receiveAndBlockFn, onFirstSuccess)
|
||||
} else {
|
||||
a.ConnectAndReceive(subscribeCtx, req, sub, receiveAndBlockFn, onFirstSuccess)
|
||||
}
|
||||
|
||||
wait := bo.NextBackOff()
|
||||
|
@ -298,3 +280,93 @@ func (a *azureServiceBus) Close() (err error) {
|
|||
func (a *azureServiceBus) Features() []pubsub.Feature {
|
||||
return a.features
|
||||
}
|
||||
|
||||
func (a *azureServiceBus) ConnectAndReceive(subscribeCtx context.Context,
|
||||
req pubsub.SubscribeRequest, sub *impl.Subscription, receiveAndBlockFn func(func()) error, onFirstSuccess func()) {
|
||||
// Blocks until a successful connection (or until context is canceled)
|
||||
err := sub.Connect(func() (impl.Receiver, error) {
|
||||
receiver, err := a.client.GetClient().NewReceiverForSubscription(req.Topic, a.metadata.ConsumerID, nil)
|
||||
return &impl.MessageReceiver{Receiver: receiver}, err
|
||||
})
|
||||
if err != nil {
|
||||
// Realistically, the only time we should get to this point is if the context was canceled, but let's log any other error we may get.
|
||||
if errors.Is(err, context.Canceled) {
|
||||
a.logger.Errorf("Could not instantiate subscription %s for topic %s", a.metadata.ConsumerID, req.Topic)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// receiveAndBlockFn will only return with an error that it cannot handle internally. The subscription connection is closed when this method returns.
|
||||
// If that occurs, we will log the error and attempt to re-establish the subscription connection until we exhaust the number of reconnect attempts.
|
||||
err = receiveAndBlockFn(onFirstSuccess)
|
||||
if err != nil && !errors.Is(err, context.Canceled) {
|
||||
a.logger.Error(err)
|
||||
}
|
||||
|
||||
// Gracefully close the connection (in case it's not closed already)
|
||||
// Use a background context here (with timeout) because ctx may be closed already
|
||||
closeCtx, closeCancel := context.WithTimeout(context.Background(), time.Second*time.Duration(a.metadata.TimeoutInSec))
|
||||
sub.Close(closeCtx)
|
||||
closeCancel()
|
||||
|
||||
// If context was canceled, do not attempt to reconnect
|
||||
if subscribeCtx.Err() != nil {
|
||||
a.logger.Debug("Context canceled; will not reconnect")
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func (a *azureServiceBus) ConnectAndReceiveWithSessions(subscribeCtx context.Context,
|
||||
req pubsub.SubscribeRequest, sub *impl.Subscription, receiveAndBlockFn func(func()) error, onFirstSuccess func()) {
|
||||
|
||||
maxSessions := 8
|
||||
sessionsChan := make(chan struct{}, maxSessions)
|
||||
sessionsCtx, sessionCancel := context.WithTimeout(subscribeCtx, time.Second*time.Duration(a.metadata.TimeoutInSec))
|
||||
defer sessionCancel()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-sessionsCtx.Done():
|
||||
return
|
||||
case <-sessionsChan:
|
||||
go func() {
|
||||
defer func() {
|
||||
sessionsChan <- struct{}{}
|
||||
}()
|
||||
// Blocks until a successful connection (or until context is canceled)
|
||||
err := sub.Connect(func() (impl.Receiver, error) {
|
||||
receiver, err := a.client.GetClient().AcceptNextSessionForSubscription(sessionsCtx, req.Topic, a.metadata.ConsumerID, nil)
|
||||
return &impl.SessionReceiver{SessionReceiver: receiver}, err
|
||||
})
|
||||
if err != nil {
|
||||
// Realistically, the only time we should get to this point is if the context was canceled, but let's log any other error we may get.
|
||||
if errors.Is(err, context.Canceled) {
|
||||
a.logger.Errorf("Could not instantiate session subscription %s for topic %s", a.metadata.ConsumerID, req.Topic)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// receiveAndBlockFn will only return with an error that it cannot handle internally. The subscription connection is closed when this method returns.
|
||||
// If that occurs, we will log the error and attempt to re-establish the subscription connection until we exhaust the number of reconnect attempts.
|
||||
err = receiveAndBlockFn(onFirstSuccess)
|
||||
if err != nil && !errors.Is(err, context.Canceled) {
|
||||
a.logger.Error(err)
|
||||
}
|
||||
|
||||
// Gracefully close the connection (in case it's not closed already)
|
||||
// Use a background context here (with timeout) because ctx may be closed already
|
||||
closeCtx, closeCancel := context.WithTimeout(context.Background(), time.Second*time.Duration(a.metadata.TimeoutInSec))
|
||||
sub.Close(closeCtx)
|
||||
closeCancel()
|
||||
|
||||
// If context was canceled, do not attempt to reconnect
|
||||
if subscribeCtx.Err() != nil {
|
||||
a.logger.Debug("Context canceled; will not reconnect")
|
||||
return
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue