[release-1.10] Improve lock handling and renewal for ASB messages (fix #2532) (#2533)

* Improved lock renewal

Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>

* Better memory management

Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>

* Complete and abandon messages in parallel too

Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>

* 10->20 (bump maxConcurrentOps from 10 to 20)

Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>

---------

Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>
Alessandro (Ale) Segala 2023-02-14 19:33:13 +00:00 committed by GitHub
parent 5bc7498736
commit b9f33ca14c
3 changed files with 125 additions and 76 deletions
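All three files below lean on the same concurrency pattern: a buffered channel used as a semaphore so that lock renewals and message completions/abandonments run in parallel, but never more than maxConcurrentOps (20) at a time. The following is a minimal, standalone sketch of that pattern only; the sleep is a stand-in for the real Service Bus call and the numbers are illustrative, not taken from the diff.

package main

import (
	"fmt"
	"sync"
	"time"
)

// Illustrative only: mirrors the maxConcurrentOps cap introduced in this commit.
const maxConcurrentOps = 20

func main() {
	limitCh := make(chan struct{}, maxConcurrentOps) // buffered channel used as a semaphore
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		limitCh <- struct{}{} // acquire a slot; blocks while maxConcurrentOps ops are in flight
		wg.Add(1)
		go func(i int) {
			defer func() {
				wg.Done()
				<-limitCh // release the slot
			}()
			time.Sleep(10 * time.Millisecond) // stand-in for a Service Bus call (renew/complete/abandon)
			fmt.Println("finalized message", i)
		}(i)
	}
	wg.Wait()
}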

View File

@@ -30,7 +30,7 @@ func IsNetworkError(err error) bool {
var expError *azservicebus.Error
if errors.As(err, &expError) {
if expError.Code == "connlost" {
if expError.Code == azservicebus.CodeConnectionLost {
return true
}
}
@@ -56,3 +56,19 @@ func IsRetriableAMQPError(err error) bool {
}
return false
}
// IsLockLostError returns true if the error is "locklost".
func IsLockLostError(err error) bool {
if err == nil {
return false
}
var expError *azservicebus.Error
if errors.As(err, &expError) {
if expError.Code == azservicebus.CodeLockLost {
return true
}
}
return false
}
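For context, a hedged sketch of how a caller in this package might use the new IsLockLostError helper; renewOnce is a hypothetical name and is not part of the diff. A lost lock is treated as "nothing left to do" rather than a failure, which is exactly how the Subscription code further below reacts to it.

package servicebus

import (
	"context"
	"log"

	azservicebus "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
)

// renewOnce is a hypothetical helper: it renews a single message lock and
// skips the error when the lock is already lost, since that usually means
// the message was already completed or abandoned.
func renewOnce(ctx context.Context, receiver *azservicebus.Receiver, msg *azservicebus.ReceivedMessage) error {
	err := receiver.RenewMessageLock(ctx, msg, nil)
	switch {
	case IsLockLostError(err):
		log.Printf("lock for message %s already lost; skipping renewal", msg.MessageID)
		return nil
	case err != nil:
		return err
	}
	return nil
}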

View File

@@ -15,12 +15,9 @@ package servicebus
import (
"context"
"fmt"
"sync"
"time"
azservicebus "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
"go.uber.org/multierr"
)
type Receiver interface {
@@ -61,46 +58,3 @@ func NewMessageReceiver(r *azservicebus.Receiver) *MessageReceiver {
type MessageReceiver struct {
*azservicebus.Receiver
}
func (m *MessageReceiver) RenewMessageLocks(ctx context.Context, msgs []*azservicebus.ReceivedMessage, timeout time.Duration) error {
if m == nil {
return nil
}
var wg sync.WaitGroup
errChan := make(chan error, len(msgs))
for _, msg := range msgs {
wg.Add(1)
go func(rmsg *azservicebus.ReceivedMessage) {
defer wg.Done()
lockCtx, lockCancel := context.WithTimeout(ctx, timeout)
defer lockCancel()
// Renew the lock for the message.
err := m.RenewMessageLock(lockCtx, rmsg, nil)
if err != nil {
errChan <- fmt.Errorf("couldn't renew active message lock for message %s, %w", rmsg.MessageID, err)
}
}(msg)
}
wg.Wait()
close(errChan)
errs := []error{}
for err := range errChan {
if err == nil {
continue
}
errs = append(errs, err)
}
if len(errs) > 0 {
return multierr.Combine(errs...)
}
return nil
}
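The removed helper above fanned out one goroutine per message and merged the failures with multierr.Combine; the replacement in the Subscription code (next file) keeps the fan-out but accumulates errors with multierr.AppendInto as it drains its error channel. A small standalone sketch of that aggregation, with made-up errors standing in for the per-message results:

package main

import (
	"errors"
	"fmt"

	"go.uber.org/multierr"
)

func main() {
	// Stand-ins for the per-message results read back from an error channel.
	results := []error{nil, errors.New("lock lost"), nil, errors.New("timeout")}

	var combined error
	for _, e := range results {
		multierr.AppendInto(&combined, e) // nil results are ignored
	}
	fmt.Println(combined) // prints: lock lost; timeout
}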

View File

@@ -21,6 +21,7 @@ import (
"time"
azservicebus "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
"go.uber.org/multierr"
"go.uber.org/ratelimit"
"github.com/dapr/kit/logger"
@@ -35,6 +36,9 @@ const (
DefaultSesssionIdleTimeoutInSec = 60
DefaultMaxConcurrentSessions = 8
// Maximum number of concurrent operations such as lock renewals or message completion/abandonment
maxConcurrentOps = 20
)
// HandlerResponseItem represents a response from the handler for each message.
@@ -299,7 +303,9 @@ func (s *Subscription) renewLocksBlocking(ctx context.Context, receiver Receiver
}
}
func (s *Subscription) doRenewLocks(ctx context.Context, msgReceiver *MessageReceiver) {
func (s *Subscription) doRenewLocks(ctx context.Context, receiver *MessageReceiver) {
s.logger.Debugf("Renewing message locks for %s", s.entity)
// Snapshot the messages to try to renew locks for.
s.mu.RLock()
msgs := make([]*azservicebus.ReceivedMessage, len(s.activeMessages))
@@ -315,12 +321,49 @@ func (s *Subscription) doRenewLocks(ctx context.Context, msgReceiver *MessageRec
return
}
err := msgReceiver.RenewMessageLocks(ctx, msgs, s.timeout)
// Renew the lock for each message, with at most maxConcurrentOps renewals in parallel
limitCh := make(chan struct{}, maxConcurrentOps)
errChan := make(chan error)
for _, msg := range msgs {
// Limit parallel executions
limitCh <- struct{}{}
go func(rMsg *azservicebus.ReceivedMessage) {
defer func() {
<-limitCh
}()
// Check again whether the message is still active, in case it was completed in the meantime
s.mu.RLock()
_, ok := s.activeMessages[*rMsg.SequenceNumber]
s.mu.RUnlock()
if !ok {
return
}
// Renew the lock for the message.
lockCtx, lockCancel := context.WithTimeout(ctx, s.timeout)
defer lockCancel()
rErr := receiver.RenewMessageLock(lockCtx, rMsg, nil)
switch {
case IsLockLostError(rErr):
errChan <- errors.New("couldn't renew active message lock for message " + rMsg.MessageID + ": lock has been lost (this often happens if the message has already been completed or abandoned)")
case rErr != nil:
errChan <- fmt.Errorf("couldn't renew active message lock for message %s: %w", rMsg.MessageID, rErr)
default:
errChan <- nil
}
}(msg)
}
var err error
for i := 0; i < len(msgs); i++ {
multierr.AppendInto(&err, <-errChan)
}
if err != nil {
s.logger.Warnf("Error renewing message locks for %s: %v", s.entity, err)
} else {
s.logger.Debugf("Renewed message locks for %s", s.entity)
return
}
s.logger.Debugf("Renewed message locks for %s", s.entity)
}
func (s *Subscription) doRenewLocksSession(ctx context.Context, sessionReceiver *SessionReceiver) {
@@ -347,8 +390,11 @@ func (s *Subscription) handleAsync(ctx context.Context, msgs []*azservicebus.Rec
s.logger.Debugf("Released message handle for %s on %s", msgs[0].MessageID, s.entity)
}
// Remove the messages from the map of active ones
s.removeActiveMessages(msgs)
// If we got a retriable error (app handler returned a retriable error, or a network error while connecting to the app, etc) consume a retriable error token
// We do it here, after the handler has been released but before removing the active message (which would allow us to retrieve more messages)
// We do it here, after the handler has been released but before releasing the active operation (which would allow us to retrieve more messages)
if consumeToken {
if s.logger.IsOutputLevelEnabled(logger.DebugLevel) {
s.logger.Debugf("Taking a retriable error token")
@@ -360,11 +406,6 @@ func (s *Subscription) handleAsync(ctx context.Context, msgs []*azservicebus.Rec
}
}
for _, msg := range msgs {
// Remove the message from the map of active ones
s.removeActiveMessage(msg.MessageID, *msg.SequenceNumber)
}
// Remove an entry from activeOperationsChan to allow processing more messages
<-s.activeOperationsChan
}()
@@ -403,28 +444,63 @@ func (s *Subscription) handleAsync(ctx context.Context, msgs []*azservicebus.Rec
// Handle the errors on bulk messages and mark messages accordingly.
// Note, the order of the responses match the order of the messages.
for i, resp := range resps {
// This context is used for the calls to service bus to finalize (i.e. complete/abandon) the message.
// If we fail to finalize the message, this message will eventually be reprocessed (at-least once delivery).
// This uses a background context in case ctx has been canceled already.
finalizeCtx, finalizeCancel := context.WithTimeout(context.Background(), s.timeout)
if resp.Error != nil {
// Log the error only, as we're running asynchronously.
s.logger.Errorf("App handler returned an error for message %s on %s: %s", msgs[i].MessageID, s.entity, resp.Error)
s.AbandonMessage(finalizeCtx, receiver, msgs[i])
} else {
s.CompleteMessage(finalizeCtx, receiver, msgs[i])
}
finalizeCancel()
// Perform the operations in background goroutines, with at most maxConcurrentOps running concurrently
limitCh := make(chan struct{}, maxConcurrentOps)
wg := sync.WaitGroup{}
for i := range resps {
// Limit parallel executions
limitCh <- struct{}{}
wg.Add(1)
go func(i int) {
defer func() {
wg.Done()
<-limitCh
}()
// This context is used for the calls to service bus to finalize (i.e. complete/abandon) the message.
// If we fail to finalize the message, this message will eventually be reprocessed (at-least once delivery).
// This uses a background context in case ctx has been canceled already.
finalizeCtx, finalizeCancel := context.WithTimeout(context.Background(), s.timeout)
if resps[i].Error != nil {
// Log the error only, as we're running asynchronously.
s.logger.Errorf("App handler returned an error for message %s on %s: %s", msgs[i].MessageID, s.entity, resps[i].Error)
s.AbandonMessage(finalizeCtx, receiver, msgs[i])
} else {
s.CompleteMessage(finalizeCtx, receiver, msgs[i])
}
finalizeCancel()
}(i)
}
return
}
// No error, so we can complete all messages.
for _, msg := range msgs {
if len(msgs) == 1 {
// Avoid spawning goroutines for 1 message
finalizeCtx, finalizeCancel := context.WithTimeout(context.Background(), s.timeout)
s.CompleteMessage(finalizeCtx, receiver, msg)
s.CompleteMessage(finalizeCtx, receiver, msgs[0])
finalizeCancel()
} else {
// Perform the operations in background goroutines, with at most maxConcurrentOps running concurrently
limitCh := make(chan struct{}, maxConcurrentOps)
wg := sync.WaitGroup{}
for _, msg := range msgs {
// Limit parallel executions
limitCh <- struct{}{}
wg.Add(1)
go func(msg *azservicebus.ReceivedMessage) {
defer func() {
wg.Done()
<-limitCh
}()
finalizeCtx, finalizeCancel := context.WithTimeout(context.Background(), s.timeout)
s.CompleteMessage(finalizeCtx, receiver, msg)
finalizeCancel()
}(msg)
}
}
}
@@ -471,9 +547,12 @@ func (s *Subscription) addActiveMessage(m *azservicebus.ReceivedMessage) error {
return nil
}
func (s *Subscription) removeActiveMessage(messageID string, messageKey int64) {
s.logger.Debugf("Removing message %s with sequence number %d from active messages on %s", messageID, messageKey, s.entity)
func (s *Subscription) removeActiveMessages(msgs []*azservicebus.ReceivedMessage) {
s.mu.Lock()
delete(s.activeMessages, messageKey)
s.mu.Unlock()
defer s.mu.Unlock()
for _, msg := range msgs {
s.logger.Debugf("Removing message %s with sequence number %d from active messages on %s", msg.MessageID, *msg.SequenceNumber, s.entity)
delete(s.activeMessages, *msg.SequenceNumber)
}
}