diff --git a/pubsub/aws/snssqs/snssqs.go b/pubsub/aws/snssqs/snssqs.go index 947a9b284..e3dec6942 100644 --- a/pubsub/aws/snssqs/snssqs.go +++ b/pubsub/aws/snssqs/snssqs.go @@ -621,6 +621,7 @@ func (s *snsSqs) consumeSubscription(queueInfo, deadLettersQueueInfo *sqsQueueIn } s.logger.Debugf("%v message(s) received", len(messageResponse.Messages)) + var wg sync.WaitGroup for _, message := range messageResponse.Messages { if err := s.validateMessage(message, queueInfo, deadLettersQueueInfo, handler); err != nil { s.logger.Errorf("message is not valid for further processing by the handler. error is: %w", err) @@ -631,7 +632,11 @@ func (s *snsSqs) consumeSubscription(queueInfo, deadLettersQueueInfo *sqsQueueIn if err := s.callHandler(message, queueInfo, handler); err != nil { s.logger.Errorf("error while handling received message. error is: %w", err) } + + wg.Done() } + + wg.Add(1) switch s.metadata.concurrencyMode { case pubsub.Single: f() @@ -639,6 +644,7 @@ func (s *snsSqs) consumeSubscription(queueInfo, deadLettersQueueInfo *sqsQueueIn go f() } } + wg.Wait() } }() }