concurrent call handler

Signed-off-by: Amit Mor <amit.mor@hotmail.com>
This commit is contained in:
Amit Mor 2022-04-07 14:12:56 +03:00
parent 13c97c9542
commit 7a83b28e7e
3 changed files with 21 additions and 4 deletions

View File

@ -50,7 +50,7 @@ type snsSqsMetadata struct {
// aws account ID. internally resolved if not given.
accountID string
// publish concurrency mode
concurrency pubsub.ConcurrencyMode
concurrencyMode pubsub.ConcurrencyMode
}
func getAliasedProperty(aliases []string, metadata pubsub.Metadata) (string, bool) {
@ -155,7 +155,7 @@ func (md *snsSqsMetadata) setConcurrencyMode(props map[string]string) error {
if err != nil {
return err
}
md.concurrency = c
md.concurrencyMode = c
return nil
}

View File

@ -629,10 +629,10 @@ func (s *snsSqs) consumeSubscription(queueInfo, deadLettersQueueInfo *sqsQueueIn
f := func() {
if err := s.callHandler(message, queueInfo, handler); err != nil {
s.logger.Errorf("error handling received message with error: %w", err)
s.logger.Errorf("error while handling received message. error is: %w", err)
}
}
switch s.metadata.concurrency {
switch s.metadata.concurrencyMode {
case pubsub.Single:
f()
case pubsub.Parallel:

View File

@ -48,6 +48,7 @@ func Test_getSnsSqsMetatdata_AllConfiguration(t *testing.T) {
md, err := ps.getSnsSqsMetatdata(pubsub.Metadata{Properties: map[string]string{
"consumerID": "consumer",
"Endpoint": "endpoint",
"concurrencyMode": string(pubsub.Single),
"accessKey": "a",
"secretKey": "s",
"sessionToken": "t",
@ -64,6 +65,7 @@ func Test_getSnsSqsMetatdata_AllConfiguration(t *testing.T) {
r.Equal("consumer", md.sqsQueueName)
r.Equal("endpoint", md.Endpoint)
r.Equal(pubsub.Single, md.concurrencyMode)
r.Equal("a", md.AccessKey)
r.Equal("s", md.SecretKey)
r.Equal("t", md.SessionToken)
@ -100,6 +102,7 @@ func Test_getSnsSqsMetatdata_defaults(t *testing.T) {
r.Equal("s", md.SecretKey)
r.Equal("", md.SessionToken)
r.Equal("r", md.Region)
r.Equal(pubsub.Parallel, md.concurrencyMode)
r.Equal(int64(10), md.messageVisibilityTimeout)
r.Equal(int64(10), md.messageRetryLimit)
r.Equal(int64(1), md.messageWaitTimeSeconds)
@ -280,6 +283,20 @@ func Test_getSnsSqsMetatdata_invalidMetadataSetup(t *testing.T) {
}},
name: "invalid message disableEntityManagement",
},
// invalid concurrencyMode
{
metadata: pubsub.Metadata{Properties: map[string]string{
"consumerID": "consumer",
"Endpoint": "endpoint",
"AccessKey": "acctId",
"SecretKey": "secret",
"awsToken": "token",
"Region": "region",
"messageRetryLimit": "10",
"concurrencyMode": "invalid",
}},
name: "invalid message concurrencyMode",
},
}
l := logger.NewLogger("SnsSqs unit test")