From ca36ecc556e5d2efebf963b88b781746c3958dc5 Mon Sep 17 00:00:00 2001 From: yaron2 Date: Thu, 26 May 2022 11:44:56 -0700 Subject: [PATCH] add parllel processing, fix retry bug Signed-off-by: yaron2 --- pubsub/natsstreaming/metadata.go | 7 ++++- pubsub/natsstreaming/natsstreaming.go | 31 ++++++++++++---------- pubsub/natsstreaming/natsstreaming_test.go | 20 ++++++++++++++ 3 files changed, 43 insertions(+), 15 deletions(-) diff --git a/pubsub/natsstreaming/metadata.go b/pubsub/natsstreaming/metadata.go index 3f398114e..827cd43c5 100644 --- a/pubsub/natsstreaming/metadata.go +++ b/pubsub/natsstreaming/metadata.go @@ -13,7 +13,11 @@ limitations under the License. package natsstreaming -import "time" +import ( + "time" + + "github.com/dapr/components-contrib/pubsub" +) type metadata struct { natsURL string @@ -30,4 +34,5 @@ type metadata struct { startAtTimeFormat string ackWaitTime time.Duration maxInFlight uint64 + concurrencyMode pubsub.ConcurrencyMode } diff --git a/pubsub/natsstreaming/natsstreaming.go b/pubsub/natsstreaming/natsstreaming.go index 2c6a47018..a98cf47e8 100644 --- a/pubsub/natsstreaming/natsstreaming.go +++ b/pubsub/natsstreaming/natsstreaming.go @@ -180,6 +180,12 @@ func parseNATSStreamingMetadata(meta pubsub.Metadata) (metadata, error) { } } + c, err := pubsub.Concurrency(meta.Properties) + if err != nil { + return m, fmt.Errorf("nats-streaming error: can't parse %s: %s", pubsub.ConcurrencyKey, err) + } + + m.concurrencyMode = c return m, nil } @@ -237,24 +243,21 @@ func (n *natsStreamingPubSub) Subscribe(req pubsub.SubscribeRequest, handler pub Topic: req.Topic, Data: natsMsg.Data, } - b := n.backOffConfig.NewBackOffWithContext(n.ctx) - rerr := retry.NotifyRecover(func() error { - n.logger.Debugf("Processing NATS Streaming message %s/%d", natsMsg.Subject, natsMsg.Sequence) - herr := handler(n.ctx, &msg) - if herr == nil { - // we only send a successful ACK if there is no error from Dapr runtime + n.logger.Debugf("Processing NATS Streaming message %s/%d", natsMsg.Subject, natsMsg.Sequence) + + f := func() { + err := handler(n.ctx, &msg) + if err == nil { natsMsg.Ack() } + } - return herr - }, b, func(err error, d time.Duration) { - n.logger.Errorf("Error processing NATS Streaming message: %s/%d. Retrying...", natsMsg.Subject, natsMsg.Sequence) - }, func() { - n.logger.Infof("Successfully processed NATS Streaming message after it previously failed: %s/%d", natsMsg.Subject, natsMsg.Sequence) - }) - if rerr != nil && !errors.Is(rerr, context.Canceled) { - n.logger.Errorf("Error processing message and retries are exhausted: %s/%d.", natsMsg.Subject, natsMsg.Sequence) + switch n.metadata.concurrencyMode { + case pubsub.Single: + f() + case pubsub.Parallel: + go f() } } diff --git a/pubsub/natsstreaming/natsstreaming_test.go b/pubsub/natsstreaming/natsstreaming_test.go index 78e972974..0f1794e87 100644 --- a/pubsub/natsstreaming/natsstreaming_test.go +++ b/pubsub/natsstreaming/natsstreaming_test.go @@ -141,6 +141,7 @@ func TestParseNATSStreamingMetadataForValidSubscriptionOptions(t *testing.T) { natsStreamingClusterID: "testcluster", consumerID: "consumer1", subscriptionType: "topic", + pubsub.ConcurrencyKey: "single", startWithLastReceived: "true", }, "startWithLastReceived", "true", @@ -153,6 +154,7 @@ func TestParseNATSStreamingMetadataForValidSubscriptionOptions(t *testing.T) { natsStreamingClusterID: "testcluster", consumerID: "consumer1", subscriptionType: "topic", + pubsub.ConcurrencyKey: "single", deliverAll: "true", }, "deliverAll", "true", @@ -165,6 +167,7 @@ func TestParseNATSStreamingMetadataForValidSubscriptionOptions(t *testing.T) { natsStreamingClusterID: "testcluster", consumerID: "consumer1", subscriptionType: "topic", + pubsub.ConcurrencyKey: "single", deliverNew: "true", }, "deliverNew", "true", @@ -177,6 +180,7 @@ func TestParseNATSStreamingMetadataForValidSubscriptionOptions(t *testing.T) { natsStreamingClusterID: "testcluster", consumerID: "consumer1", subscriptionType: "topic", + pubsub.ConcurrencyKey: "single", startAtSequence: "42", }, "startAtSequence", "42", @@ -189,10 +193,24 @@ func TestParseNATSStreamingMetadataForValidSubscriptionOptions(t *testing.T) { natsStreamingClusterID: "testcluster", consumerID: "consumer1", subscriptionType: "topic", + pubsub.ConcurrencyKey: "single", startAtTimeDelta: "1h", }, "startAtTimeDelta", "1h", }, + + { + "using concurrencyMode", + map[string]string{ + natsURL: "nats://foo.bar:4222", + natsStreamingClusterID: "testcluster", + consumerID: "consumer1", + subscriptionType: "topic", + startAtTimeDelta: "1h", + pubsub.ConcurrencyKey: "single", + }, + "concurrencyMode", "single", + }, } for _, _test := range tests { @@ -208,12 +226,14 @@ func TestParseNATSStreamingMetadataForValidSubscriptionOptions(t *testing.T) { assert.NotEmpty(t, m.natsStreamingClusterID) assert.NotEmpty(t, m.subscriptionType) assert.NotEmpty(t, m.natsQueueGroupName) + assert.NotEmpty(t, m.concurrencyMode) assert.NotEmpty(t, _test.expectedMetadataValue) assert.Equal(t, _test.properties[natsURL], m.natsURL) assert.Equal(t, _test.properties[natsStreamingClusterID], m.natsStreamingClusterID) assert.Equal(t, _test.properties[subscriptionType], m.subscriptionType) assert.Equal(t, _test.properties[consumerID], m.natsQueueGroupName) + assert.Equal(t, _test.properties[pubsub.ConcurrencyKey], string(m.concurrencyMode)) assert.Equal(t, _test.properties[_test.expectedMetadataName], _test.expectedMetadataValue) }) }