wg to flush bulks

Signed-off-by: Amit Mor <amit.mor@hotmail.com>
This commit is contained in:
Amit Mor 2022-04-11 11:12:17 +03:00
parent 212d0c1a29
commit c31866099c
No known key found for this signature in database
GPG Key ID: 6ED17DBA7E1AD71B
1 changed files with 6 additions and 0 deletions

View File

@ -621,6 +621,7 @@ func (s *snsSqs) consumeSubscription(queueInfo, deadLettersQueueInfo *sqsQueueIn
}
s.logger.Debugf("%v message(s) received", len(messageResponse.Messages))
var wg sync.WaitGroup
for _, message := range messageResponse.Messages {
if err := s.validateMessage(message, queueInfo, deadLettersQueueInfo, handler); err != nil {
s.logger.Errorf("message is not valid for further processing by the handler. error is: %w", err)
@ -631,7 +632,11 @@ func (s *snsSqs) consumeSubscription(queueInfo, deadLettersQueueInfo *sqsQueueIn
if err := s.callHandler(message, queueInfo, handler); err != nil {
s.logger.Errorf("error while handling received message. error is: %w", err)
}
wg.Done()
}
wg.Add(1)
switch s.metadata.concurrencyMode {
case pubsub.Single:
f()
@ -639,6 +644,7 @@ func (s *snsSqs) consumeSubscription(queueInfo, deadLettersQueueInfo *sqsQueueIn
go f()
}
}
wg.Wait()
}
}()
}