Signed-off-by: Joni Collinge <jonathancollinge@live.com>
This commit is contained in:
Joni Collinge 2023-01-05 14:15:00 +00:00
parent ce29a535d0
commit e097a04842
No known key found for this signature in database
GPG Key ID: BF9B59005264DD95
1 changed files with 12 additions and 15 deletions

View File

@ -134,34 +134,33 @@ func (s *Subscription) Connect(newReceiverFunc func() (Receiver, error)) (Receiv
config.MaxElapsedTime = 0
backoff := config.NewBackOffWithContext(s.ctx)
var receiver Receiver
err := retry.NotifyRecover(
func() error {
return retry.NotifyRecoverWithData(
func() (Receiver, error) {
var receiver Receiver
clientAttempt, innerErr := newReceiverFunc()
if innerErr != nil {
if s.requireSessions {
var sbErr *azservicebus.Error
if errors.As(innerErr, &sbErr) && sbErr.Code == azservicebus.CodeTimeout {
return errors.New("no sessions available")
return nil, errors.New("no sessions available")
}
}
return innerErr
return nil, innerErr
}
if s.requireSessions {
sessionReceiver, ok := clientAttempt.(*SessionReceiver)
if !ok {
return fmt.Errorf("expected a session receiver, got %T", clientAttempt)
return nil, fmt.Errorf("expected a session receiver, got %T", clientAttempt)
}
receiver = sessionReceiver
} else {
msgReciever, ok := clientAttempt.(*MessageReceiver)
if !ok {
return fmt.Errorf("expected a message receiver, got %T", clientAttempt)
return nil, fmt.Errorf("expected a message receiver, got %T", clientAttempt)
}
receiver = msgReciever
}
return nil
return receiver, nil
},
backoff,
func(err error, d time.Duration) {
@ -171,8 +170,6 @@ func (s *Subscription) Connect(newReceiverFunc func() (Receiver, error)) (Receiv
s.logger.Infof("Successfully reconnected to Azure Service Bus %s", s.entity)
},
)
return receiver, err
}
type ReceiveOptions struct {
@ -348,18 +345,18 @@ func (s *Subscription) RenewLocksBlocking(ctx context.Context, receiver Receiver
for {
select {
case <-ctx.Done():
s.logger.Infof("context canceled while renewing locks for %s", s.entity)
s.logger.Infof("Context canceled while renewing locks for %s", s.entity)
return nil
case <-t.C:
// Check if the context is still valid
if ctx.Err() != nil {
s.logger.Infof("context canceled while renewing locks for %s", s.entity)
s.logger.Infof("Context canceled while renewing locks for %s", s.entity)
return nil
}
if s.requireSessions {
sessionReceiver := receiver.(*SessionReceiver)
if err := sessionReceiver.RenewSessionLocks(ctx, opts.TimeoutInSec); err != nil {
s.logger.Errorf("error renewing session locks for %s: %s", s.entity, err)
s.logger.Warnf("Error renewing session locks for %s: %s", s.entity, err)
}
s.logger.Debugf("Renewed session %s locks for %s", sessionReceiver.SessionID(), s.entity)
} else {
@ -377,7 +374,7 @@ func (s *Subscription) RenewLocksBlocking(ctx context.Context, receiver Receiver
}
msgReceiver := receiver.(*MessageReceiver)
if err := msgReceiver.RenewMessageLocks(ctx, msgs, opts.TimeoutInSec); err != nil {
s.logger.Errorf("error renewing message locks for %s: %s", s.entity, err)
s.logger.Warnf("Error renewing message locks for %s: %s", s.entity, err)
}
s.logger.Debugf("Renewed message locks for %s", s.entity)
}