From b9f33ca14c2271f1c5679ee48df098d60cf49055 Mon Sep 17 00:00:00 2001
From: "Alessandro (Ale) Segala" <43508+ItalyPaleAle@users.noreply.github.com>
Date: Tue, 14 Feb 2023 19:33:13 +0000
Subject: [PATCH] [release-1.10] Improve lock handling and renewal for ASB messages (fix #2532) (#2533)

* Improved lock renewal

Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>

* Better memory management

Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>

* Complete and abandon messages in parallel too

Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>

* 10->20

Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>

---------

Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>
---
 internal/component/azure/servicebus/errors.go |  18 ++-
 .../component/azure/servicebus/receiver.go    |  46 ------
 .../azure/servicebus/subscription.go          | 137 ++++++++++++++----
 3 files changed, 125 insertions(+), 76 deletions(-)

diff --git a/internal/component/azure/servicebus/errors.go b/internal/component/azure/servicebus/errors.go
index 9a3007488..17549bba9 100644
--- a/internal/component/azure/servicebus/errors.go
+++ b/internal/component/azure/servicebus/errors.go
@@ -30,7 +30,7 @@ func IsNetworkError(err error) bool {
 
 	var expError *azservicebus.Error
 	if errors.As(err, &expError) {
-		if expError.Code == "connlost" {
+		if expError.Code == azservicebus.CodeConnectionLost {
 			return true
 		}
 	}
@@ -56,3 +56,19 @@ func IsRetriableAMQPError(err error) bool {
 	}
 	return false
 }
+
+// IsLockLostError returns true if the error is "locklost".
+func IsLockLostError(err error) bool {
+	if err == nil {
+		return false
+	}
+
+	var expError *azservicebus.Error
+	if errors.As(err, &expError) {
+		if expError.Code == azservicebus.CodeLockLost {
+			return true
+		}
+	}
+
+	return false
+}
diff --git a/internal/component/azure/servicebus/receiver.go b/internal/component/azure/servicebus/receiver.go
index d911e5f2a..61485e21e 100644
--- a/internal/component/azure/servicebus/receiver.go
+++ b/internal/component/azure/servicebus/receiver.go
@@ -15,12 +15,9 @@ package servicebus
 
 import (
 	"context"
-	"fmt"
-	"sync"
 	"time"
 
 	azservicebus "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
-	"go.uber.org/multierr"
 )
 
 type Receiver interface {
@@ -61,46 +58,3 @@ func NewMessageReceiver(r *azservicebus.Receiver) *MessageReceiver {
 type MessageReceiver struct {
 	*azservicebus.Receiver
 }
-
-func (m *MessageReceiver) RenewMessageLocks(ctx context.Context, msgs []*azservicebus.ReceivedMessage, timeout time.Duration) error {
-	if m == nil {
-		return nil
-	}
-
-	var wg sync.WaitGroup
-
-	errChan := make(chan error, len(msgs))
-	for _, msg := range msgs {
-		wg.Add(1)
-
-		go func(rmsg *azservicebus.ReceivedMessage) {
-			defer wg.Done()
-
-			lockCtx, lockCancel := context.WithTimeout(ctx, timeout)
-			defer lockCancel()
-
-			// Renew the lock for the message.
-			err := m.RenewMessageLock(lockCtx, rmsg, nil)
-			if err != nil {
-				errChan <- fmt.Errorf("couldn't renew active message lock for message %s, %w", rmsg.MessageID, err)
-			}
-		}(msg)
-	}
-
-	wg.Wait()
-	close(errChan)
-
-	errs := []error{}
-	for err := range errChan {
-		if err == nil {
-			continue
-		}
-		errs = append(errs, err)
-	}
-
-	if len(errs) > 0 {
-		return multierr.Combine(errs...)
-	}
-
-	return nil
-}
diff --git a/internal/component/azure/servicebus/subscription.go b/internal/component/azure/servicebus/subscription.go
index b0951a753..730a83da2 100644
--- a/internal/component/azure/servicebus/subscription.go
+++ b/internal/component/azure/servicebus/subscription.go
@@ -21,6 +21,7 @@ import (
 	"time"
 
 	azservicebus "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
+	"go.uber.org/multierr"
 	"go.uber.org/ratelimit"
 
 	"github.com/dapr/kit/logger"
@@ -35,6 +36,9 @@ const (
 	DefaultSesssionIdleTimeoutInSec = 60
 
 	DefaultMaxConcurrentSessions = 8
+
+	// Maximum number of concurrent operations such as lock renewals or message completion/abandonment.
+	maxConcurrentOps = 20
 )
 
 // HandlerResponseItem represents a response from the handler for each message.
@@ -299,7 +303,9 @@ func (s *Subscription) renewLocksBlocking(ctx context.Context, receiver Receiver
 	}
 }
 
-func (s *Subscription) doRenewLocks(ctx context.Context, msgReceiver *MessageReceiver) {
+func (s *Subscription) doRenewLocks(ctx context.Context, receiver *MessageReceiver) {
+	s.logger.Debugf("Renewing message locks for %s", s.entity)
+
 	// Snapshot the messages to try to renew locks for.
 	s.mu.RLock()
 	msgs := make([]*azservicebus.ReceivedMessage, len(s.activeMessages))
@@ -315,12 +321,49 @@ func (s *Subscription) doRenewLocks(ctx context.Context, msgReceiver *MessageRec
 		return
 	}
 
-	err := msgReceiver.RenewMessageLocks(ctx, msgs, s.timeout)
+	// Renew the locks for each message, with a limit of maxConcurrentOps in parallel
+	limitCh := make(chan struct{}, maxConcurrentOps)
+	errChan := make(chan error)
+	for _, msg := range msgs {
+		// Limit parallel executions
+		limitCh <- struct{}{}
+		go func(rMsg *azservicebus.ReceivedMessage) {
+			defer func() {
+				<-limitCh
+			}()
+			// Check again that the message is still active, in case it was completed in the meantime
+			s.mu.RLock()
+			_, ok := s.activeMessages[*rMsg.SequenceNumber]
+			s.mu.RUnlock()
+			if !ok {
+				return
+			}
+
+			// Renew the lock for the message.
+			lockCtx, lockCancel := context.WithTimeout(ctx, s.timeout)
+			defer lockCancel()
+			rErr := receiver.RenewMessageLock(lockCtx, rMsg, nil)
+			switch {
+			case IsLockLostError(rErr):
+				errChan <- errors.New("couldn't renew active message lock for message " + rMsg.MessageID + ": lock has been lost (this often happens if the message has already been completed or abandoned)")
+			case rErr != nil:
+				errChan <- fmt.Errorf("couldn't renew active message lock for message %s: %w", rMsg.MessageID, rErr)
+			default:
+				errChan <- nil
+			}
+		}(msg)
+	}
+
+	var err error
+	for i := 0; i < len(msgs); i++ {
+		multierr.AppendInto(&err, <-errChan)
+	}
+
 	if err != nil {
 		s.logger.Warnf("Error renewing message locks for %s: %v", s.entity, err)
-	} else {
-		s.logger.Debugf("Renewed message locks for %s", s.entity)
+		return
 	}
+
+	s.logger.Debugf("Renewed message locks for %s", s.entity)
 }
 
 func (s *Subscription) doRenewLocksSession(ctx context.Context, sessionReceiver *SessionReceiver) {
@@ -347,8 +390,11 @@ func (s *Subscription) handleAsync(ctx context.Context, msgs []*azservicebus.Rec
 			s.logger.Debugf("Released message handle for %s on %s", msgs[0].MessageID, s.entity)
 		}
 
+		// Remove the messages from the map of active ones
+		s.removeActiveMessages(msgs)
+
 		// If we got a retriable error (app handler returned a retriable error, or a network error while connecting to the app, etc) consume a retriable error token
-		// We do it here, after the handler has been released but before removing the active message (which would allow us to retrieve more messages)
+		// We do it here, after the handler has been released but before releasing the active operation (which would allow us to retrieve more messages)
 		if consumeToken {
 			if s.logger.IsOutputLevelEnabled(logger.DebugLevel) {
 				s.logger.Debugf("Taking a retriable error token")
@@ -360,11 +406,6 @@ func (s *Subscription) handleAsync(ctx context.Context, msgs []*azservicebus.Rec
 			}
 		}
 
-		for _, msg := range msgs {
-			// Remove the message from the map of active ones
-			s.removeActiveMessage(msg.MessageID, *msg.SequenceNumber)
-		}
-
 		// Remove an entry from activeOperationsChan to allow processing more messages
 		<-s.activeOperationsChan
 	}()
@@ -403,28 +444,63 @@ func (s *Subscription) handleAsync(ctx context.Context, msgs []*azservicebus.Rec
 
 	// Handle the errors on bulk messages and mark messages accordingly.
 	// Note, the order of the responses match the order of the messages.
-	for i, resp := range resps {
-		// This context is used for the calls to service bus to finalize (i.e. complete/abandon) the message.
-		// If we fail to finalize the message, this message will eventually be reprocessed (at-least once delivery).
-		// This uses a background context in case ctx has been canceled already.
-		finalizeCtx, finalizeCancel := context.WithTimeout(context.Background(), s.timeout)
-		if resp.Error != nil {
-			// Log the error only, as we're running asynchronously.
-			s.logger.Errorf("App handler returned an error for message %s on %s: %s", msgs[i].MessageID, s.entity, resp.Error)
-			s.AbandonMessage(finalizeCtx, receiver, msgs[i])
-		} else {
-			s.CompleteMessage(finalizeCtx, receiver, msgs[i])
-		}
-		finalizeCancel()
+	// Perform the operations in background goroutines, with a limit of maxConcurrentOps concurrent operations
+	limitCh := make(chan struct{}, maxConcurrentOps)
+	wg := sync.WaitGroup{}
+	for i := range resps {
+		// Limit parallel executions
+		limitCh <- struct{}{}
+		wg.Add(1)
+
+		go func(i int) {
+			defer func() {
+				wg.Done()
+				<-limitCh
+			}()
+
+			// This context is used for the calls to service bus to finalize (i.e. complete/abandon) the message.
+			// If we fail to finalize the message, this message will eventually be reprocessed (at-least once delivery).
+			// This uses a background context in case ctx has been canceled already.
+			finalizeCtx, finalizeCancel := context.WithTimeout(context.Background(), s.timeout)
+			if resps[i].Error != nil {
+				// Log the error only, as we're running asynchronously.
+				s.logger.Errorf("App handler returned an error for message %s on %s: %s", msgs[i].MessageID, s.entity, resps[i].Error)
+				s.AbandonMessage(finalizeCtx, receiver, msgs[i])
+			} else {
+				s.CompleteMessage(finalizeCtx, receiver, msgs[i])
+			}
+			finalizeCancel()
+		}(i)
 	}
 	return
 }
 
 	// No error, so we can complete all messages.
-	for _, msg := range msgs {
+	if len(msgs) == 1 {
+		// Avoid spawning goroutines for 1 message
 		finalizeCtx, finalizeCancel := context.WithTimeout(context.Background(), s.timeout)
-		s.CompleteMessage(finalizeCtx, receiver, msg)
+		s.CompleteMessage(finalizeCtx, receiver, msgs[0])
 		finalizeCancel()
+	} else {
+		// Perform the operations in background goroutines, with a limit of maxConcurrentOps concurrent operations
+		limitCh := make(chan struct{}, maxConcurrentOps)
+		wg := sync.WaitGroup{}
+		for _, msg := range msgs {
+			// Limit parallel executions
+			limitCh <- struct{}{}
+			wg.Add(1)
+
+			go func(msg *azservicebus.ReceivedMessage) {
+				defer func() {
+					wg.Done()
+					<-limitCh
+				}()
+
+				finalizeCtx, finalizeCancel := context.WithTimeout(context.Background(), s.timeout)
+				s.CompleteMessage(finalizeCtx, receiver, msg)
+				finalizeCancel()
+			}(msg)
+		}
 	}
 }
 
@@ -471,9 +547,12 @@ func (s *Subscription) addActiveMessage(m *azservicebus.ReceivedMessage) error {
 	return nil
 }
 
-func (s *Subscription) removeActiveMessage(messageID string, messageKey int64) {
-	s.logger.Debugf("Removing message %s with sequence number %d from active messages on %s", messageID, messageKey, s.entity)
+func (s *Subscription) removeActiveMessages(msgs []*azservicebus.ReceivedMessage) {
 	s.mu.Lock()
-	delete(s.activeMessages, messageKey)
-	s.mu.Unlock()
+	defer s.mu.Unlock()
+
+	for _, msg := range msgs {
+		s.logger.Debugf("Removing message %s with sequence number %d from active messages on %s", msg.MessageID, *msg.SequenceNumber, s.entity)
+		delete(s.activeMessages, *msg.SequenceNumber)
+	}
 }
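For reference, the concurrency pattern this patch applies in doRenewLocks and handleAsync, a buffered channel used as a counting semaphore to cap in-flight goroutines at maxConcurrentOps, with one result collected per work item, can be sketched in isolation as below. This is a minimal, hypothetical example rather than code from the repository: renewLock and renewAll are stand-in names for receiver.RenewMessageLock and the renewal loop, and it aggregates errors with the standard library's errors.Join instead of the go.uber.org/multierr helpers the patch uses.

```go
// Minimal sketch of the bounded-concurrency pattern used in this patch:
// a buffered channel acts as a counting semaphore that caps in-flight
// goroutines, and a result channel collects one error (possibly nil) per item.
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

const maxConcurrentOps = 20 // same cap the patch uses

// renewLock is a stand-in for receiver.RenewMessageLock in the real code.
func renewLock(ctx context.Context, id string) error {
	select {
	case <-time.After(10 * time.Millisecond): // simulate the service call
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// renewAll renews every lock with at most maxConcurrentOps calls in flight.
func renewAll(ctx context.Context, ids []string, timeout time.Duration) error {
	limitCh := make(chan struct{}, maxConcurrentOps) // semaphore
	errChan := make(chan error)

	for _, id := range ids {
		limitCh <- struct{}{} // blocks while maxConcurrentOps goroutines are running
		go func(id string) {
			defer func() { <-limitCh }() // release the slot

			lockCtx, cancel := context.WithTimeout(ctx, timeout)
			defer cancel()
			if err := renewLock(lockCtx, id); err != nil {
				errChan <- fmt.Errorf("couldn't renew lock for %s: %w", id, err)
				return
			}
			errChan <- nil
		}(id)
	}

	// Exactly one value arrives per item, so draining errChan is also the "wait".
	var errs []error
	for range ids {
		if err := <-errChan; err != nil {
			errs = append(errs, err)
		}
	}
	return errors.Join(errs...) // nil when errs is empty
}

func main() {
	ids := []string{"msg-1", "msg-2", "msg-3"}
	if err := renewAll(context.Background(), ids, time.Second); err != nil {
		fmt.Println("renewal errors:", err)
	} else {
		fmt.Println("all locks renewed")
	}
}
```

Because exactly one value per item is sent on the result channel, draining it doubles as the synchronization point; that is why the renewal path in the patch needs no sync.WaitGroup, while the completion/abandonment paths, which send nothing back, track their goroutines with one.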