use sync.Map instead of map and mutex for active message tracking
Signed-off-by: Joni Collinge <jonathancollinge@live.com>
This commit is contained in:
parent
77504a8289
commit
58daacbdcd
|
@ -49,8 +49,7 @@ type HandlerFn func(ctx context.Context, msgs []*azservicebus.ReceivedMessage) (
|
||||||
// Subscription is an object that manages a subscription to an Azure Service Bus receiver, for a topic or queue.
|
// Subscription is an object that manages a subscription to an Azure Service Bus receiver, for a topic or queue.
|
||||||
type Subscription struct {
|
type Subscription struct {
|
||||||
entity string
|
entity string
|
||||||
mu sync.RWMutex
|
activeMessages sync.Map
|
||||||
activeMessages map[int64]*azservicebus.ReceivedMessage
|
|
||||||
activeOperationsChan chan struct{}
|
activeOperationsChan chan struct{}
|
||||||
requireSessions bool // read-only once set
|
requireSessions bool // read-only once set
|
||||||
timeout time.Duration
|
timeout time.Duration
|
||||||
|
@ -99,7 +98,6 @@ func NewSubscription(
|
||||||
|
|
||||||
s := &Subscription{
|
s := &Subscription{
|
||||||
entity: opts.Entity,
|
entity: opts.Entity,
|
||||||
activeMessages: make(map[int64]*azservicebus.ReceivedMessage),
|
|
||||||
timeout: time.Duration(opts.TimeoutInSec) * time.Second,
|
timeout: time.Duration(opts.TimeoutInSec) * time.Second,
|
||||||
maxBulkSubCount: *opts.MaxBulkSubCount,
|
maxBulkSubCount: *opts.MaxBulkSubCount,
|
||||||
requireSessions: opts.RequireSessions,
|
requireSessions: opts.RequireSessions,
|
||||||
|
@ -361,14 +359,11 @@ func (s *Subscription) RenewLocksBlocking(ctx context.Context, receiver Receiver
|
||||||
s.logger.Debugf("Renewed session %s locks for %s", sessionReceiver.SessionID(), s.entity)
|
s.logger.Debugf("Renewed session %s locks for %s", sessionReceiver.SessionID(), s.entity)
|
||||||
} else {
|
} else {
|
||||||
// Snapshot the messages to try to renew locks for.
|
// Snapshot the messages to try to renew locks for.
|
||||||
s.mu.RLock()
|
msgs := make([]*azservicebus.ReceivedMessage, 0)
|
||||||
msgs := make([]*azservicebus.ReceivedMessage, 0, len(s.activeMessages))
|
s.activeMessages.Range(func(key, value interface{}) bool {
|
||||||
var i int
|
msgs = append(msgs, value.(*azservicebus.ReceivedMessage))
|
||||||
for _, m := range s.activeMessages {
|
return true
|
||||||
msgs[i] = m
|
})
|
||||||
i++
|
|
||||||
}
|
|
||||||
s.mu.RUnlock()
|
|
||||||
|
|
||||||
if len(msgs) == 0 {
|
if len(msgs) == 0 {
|
||||||
s.logger.Debugf("No active messages require lock renewal for %s", s.entity)
|
s.logger.Debugf("No active messages require lock renewal for %s", s.entity)
|
||||||
|
@ -485,15 +480,11 @@ func (s *Subscription) addActiveMessage(m *azservicebus.ReceivedMessage) error {
|
||||||
logSuffix = fmt.Sprintf(" with session id %s", *m.SessionID)
|
logSuffix = fmt.Sprintf(" with session id %s", *m.SessionID)
|
||||||
}
|
}
|
||||||
s.logger.Debugf("Adding message %s with sequence number %d to active messages on %s%s", m.MessageID, *m.SequenceNumber, s.entity, logSuffix)
|
s.logger.Debugf("Adding message %s with sequence number %d to active messages on %s%s", m.MessageID, *m.SequenceNumber, s.entity, logSuffix)
|
||||||
s.mu.Lock()
|
s.activeMessages.Store(*m.SequenceNumber, m)
|
||||||
s.activeMessages[*m.SequenceNumber] = m
|
|
||||||
s.mu.Unlock()
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Subscription) removeActiveMessage(messageID string, messageKey int64) {
|
func (s *Subscription) removeActiveMessage(messageID string, messageKey int64) {
|
||||||
s.logger.Debugf("Removing message %s with sequence number %d from active messages on %s", messageID, messageKey, s.entity)
|
s.logger.Debugf("Removing message %s with sequence number %d from active messages on %s", messageID, messageKey, s.entity)
|
||||||
s.mu.Lock()
|
s.activeMessages.Delete(messageKey)
|
||||||
delete(s.activeMessages, messageKey)
|
|
||||||
s.mu.Unlock()
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue