diff --git a/pubsub/jetstream/jetstream.go b/pubsub/jetstream/jetstream.go index ca397f85a..23f034ca5 100644 --- a/pubsub/jetstream/jetstream.go +++ b/pubsub/jetstream/jetstream.go @@ -198,10 +198,8 @@ func (js *jetstreamPubSub) Subscribe(ctx context.Context, req pubsub.SubscribeRe natsHandler := func(m *nats.Msg) { jsm, err := m.Metadata() if err != nil { - // If we get an error, then we don't have a valid JetStream - // message. + // If we get an error, then we don't have a valid JetStream message. js.l.Error(err) - return } @@ -260,7 +258,22 @@ func (js *jetstreamPubSub) Subscribe(ctx context.Context, req pubsub.SubscribeRe subscription, err = js.jsc.QueueSubscribe(req.Topic, queue, natsHandler, nats.Bind(streamName, consumerInfo.Name)) } else { js.l.Debugf("nats: subscribed to subject %s", req.Topic) - subscription, err = js.jsc.Subscribe(req.Topic, natsHandler, nats.Bind(streamName, consumerInfo.Name)) + subscription, err = js.jsc.Subscribe( + req.Topic, + func(msg *nats.Msg) { + switch js.meta.Concurrency { + case pubsub.Single: + natsHandler(msg) + case pubsub.Parallel: + js.wg.Add(1) + go func() { + natsHandler(msg) + js.wg.Done() + }() + } + }, + nats.Bind(streamName, consumerInfo.Name), + ) } if err != nil { return err diff --git a/pubsub/jetstream/metadata.go b/pubsub/jetstream/metadata.go index 1a82cfe54..4c91d7f8e 100644 --- a/pubsub/jetstream/metadata.go +++ b/pubsub/jetstream/metadata.go @@ -55,10 +55,14 @@ type metadata struct { internalAckPolicy nats.AckPolicy `mapstructure:"-"` Domain string `mapstructure:"domain"` APIPrefix string `mapstructure:"apiPrefix"` + + Concurrency pubsub.ConcurrencyMode `mapstructure:"concurrency"` } func parseMetadata(psm pubsub.Metadata) (metadata, error) { - var m metadata + m := metadata{ + Concurrency: pubsub.Single, + } err := kitmd.DecodeMetadata(psm.Properties, &m) if err != nil { @@ -119,5 +123,16 @@ func parseMetadata(psm pubsub.Metadata) (metadata, error) { m.internalAckPolicy = nats.AckExplicitPolicy } + // Explicit check to prevent overriding the Single default + // (the previous behavior) if not set. + // TODO: See https://github.com/dapr/components-contrib/pull/3222#discussion_r1389772053 + if psm.Properties[pubsub.ConcurrencyKey] != "" { + c, err := pubsub.Concurrency(psm.Properties) + if err != nil { + return metadata{}, err + } + m.Concurrency = c + } + return m, nil } diff --git a/pubsub/jetstream/metadata_test.go b/pubsub/jetstream/metadata_test.go index 4167b63de..2d08e5173 100644 --- a/pubsub/jetstream/metadata_test.go +++ b/pubsub/jetstream/metadata_test.go @@ -74,6 +74,7 @@ func TestParseMetadata(t *testing.T) { internalDeliverPolicy: nats.DeliverAllPolicy, internalAckPolicy: nats.AckExplicitPolicy, Domain: "hub", + Concurrency: pubsub.Single, }, expectErr: false, }, @@ -81,25 +82,26 @@ func TestParseMetadata(t *testing.T) { desc: "Valid Metadata with token", input: pubsub.Metadata{Base: mdata.Base{ Properties: map[string]string{ - "natsURL": "nats://localhost:4222", - "name": "myName", - "durableName": "myDurable", - "queueGroupName": "myQueue", - "startTime": "1629328511", - "flowControl": "true", - "ackWait": "2s", - "maxDeliver": "10", - "backOff": "500ms, 2s, 10s", - "maxAckPending": "5000", - "replicas": "3", - "memoryStorage": "true", - "rateLimit": "20000", - "heartbeat": "1s", - "token": "myToken", - "deliverPolicy": "sequence", - "startSequence": "5", - "ackPolicy": "all", - "apiPrefix": "HUB", + "natsURL": "nats://localhost:4222", + "name": "myName", + "durableName": "myDurable", + "queueGroupName": "myQueue", + "startTime": "1629328511", + "flowControl": "true", + "ackWait": "2s", + "maxDeliver": "10", + "backOff": "500ms, 2s, 10s", + "maxAckPending": "5000", + "replicas": "3", + "memoryStorage": "true", + "rateLimit": "20000", + "heartbeat": "1s", + "token": "myToken", + "deliverPolicy": "sequence", + "startSequence": "5", + "ackPolicy": "all", + "apiPrefix": "HUB", + "concurrencyMode": "parallel", }, }}, want: metadata{ @@ -125,6 +127,7 @@ func TestParseMetadata(t *testing.T) { internalDeliverPolicy: nats.DeliverByStartSequencePolicy, internalAckPolicy: nats.AckAllPolicy, APIPrefix: "HUB", + Concurrency: pubsub.Parallel, }, expectErr: false, },