[pubsub/jetstream] Add support for concurrencyMode (#3222)

Signed-off-by: Byron Ruth <byron@nats.io>
This commit is contained in:
Byron Ruth 2023-11-13 14:33:48 -06:00 committed by GitHub
parent 3bcd0c7451
commit dd8d2ba185
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 55 additions and 24 deletions

View File

@ -198,10 +198,8 @@ func (js *jetstreamPubSub) Subscribe(ctx context.Context, req pubsub.SubscribeRe
natsHandler := func(m *nats.Msg) { natsHandler := func(m *nats.Msg) {
jsm, err := m.Metadata() jsm, err := m.Metadata()
if err != nil { if err != nil {
// If we get an error, then we don't have a valid JetStream // If we get an error, then we don't have a valid JetStream message.
// message.
js.l.Error(err) js.l.Error(err)
return return
} }
@ -260,7 +258,22 @@ func (js *jetstreamPubSub) Subscribe(ctx context.Context, req pubsub.SubscribeRe
subscription, err = js.jsc.QueueSubscribe(req.Topic, queue, natsHandler, nats.Bind(streamName, consumerInfo.Name)) subscription, err = js.jsc.QueueSubscribe(req.Topic, queue, natsHandler, nats.Bind(streamName, consumerInfo.Name))
} else { } else {
js.l.Debugf("nats: subscribed to subject %s", req.Topic) js.l.Debugf("nats: subscribed to subject %s", req.Topic)
subscription, err = js.jsc.Subscribe(req.Topic, natsHandler, nats.Bind(streamName, consumerInfo.Name)) subscription, err = js.jsc.Subscribe(
req.Topic,
func(msg *nats.Msg) {
switch js.meta.Concurrency {
case pubsub.Single:
natsHandler(msg)
case pubsub.Parallel:
js.wg.Add(1)
go func() {
natsHandler(msg)
js.wg.Done()
}()
}
},
nats.Bind(streamName, consumerInfo.Name),
)
} }
if err != nil { if err != nil {
return err return err

View File

@ -55,10 +55,14 @@ type metadata struct {
internalAckPolicy nats.AckPolicy `mapstructure:"-"` internalAckPolicy nats.AckPolicy `mapstructure:"-"`
Domain string `mapstructure:"domain"` Domain string `mapstructure:"domain"`
APIPrefix string `mapstructure:"apiPrefix"` APIPrefix string `mapstructure:"apiPrefix"`
Concurrency pubsub.ConcurrencyMode `mapstructure:"concurrency"`
} }
func parseMetadata(psm pubsub.Metadata) (metadata, error) { func parseMetadata(psm pubsub.Metadata) (metadata, error) {
var m metadata m := metadata{
Concurrency: pubsub.Single,
}
err := kitmd.DecodeMetadata(psm.Properties, &m) err := kitmd.DecodeMetadata(psm.Properties, &m)
if err != nil { if err != nil {
@ -119,5 +123,16 @@ func parseMetadata(psm pubsub.Metadata) (metadata, error) {
m.internalAckPolicy = nats.AckExplicitPolicy m.internalAckPolicy = nats.AckExplicitPolicy
} }
// Explicit check to prevent overriding the Single default
// (the previous behavior) if not set.
// TODO: See https://github.com/dapr/components-contrib/pull/3222#discussion_r1389772053
if psm.Properties[pubsub.ConcurrencyKey] != "" {
c, err := pubsub.Concurrency(psm.Properties)
if err != nil {
return metadata{}, err
}
m.Concurrency = c
}
return m, nil return m, nil
} }

View File

@ -74,6 +74,7 @@ func TestParseMetadata(t *testing.T) {
internalDeliverPolicy: nats.DeliverAllPolicy, internalDeliverPolicy: nats.DeliverAllPolicy,
internalAckPolicy: nats.AckExplicitPolicy, internalAckPolicy: nats.AckExplicitPolicy,
Domain: "hub", Domain: "hub",
Concurrency: pubsub.Single,
}, },
expectErr: false, expectErr: false,
}, },
@ -81,25 +82,26 @@ func TestParseMetadata(t *testing.T) {
desc: "Valid Metadata with token", desc: "Valid Metadata with token",
input: pubsub.Metadata{Base: mdata.Base{ input: pubsub.Metadata{Base: mdata.Base{
Properties: map[string]string{ Properties: map[string]string{
"natsURL": "nats://localhost:4222", "natsURL": "nats://localhost:4222",
"name": "myName", "name": "myName",
"durableName": "myDurable", "durableName": "myDurable",
"queueGroupName": "myQueue", "queueGroupName": "myQueue",
"startTime": "1629328511", "startTime": "1629328511",
"flowControl": "true", "flowControl": "true",
"ackWait": "2s", "ackWait": "2s",
"maxDeliver": "10", "maxDeliver": "10",
"backOff": "500ms, 2s, 10s", "backOff": "500ms, 2s, 10s",
"maxAckPending": "5000", "maxAckPending": "5000",
"replicas": "3", "replicas": "3",
"memoryStorage": "true", "memoryStorage": "true",
"rateLimit": "20000", "rateLimit": "20000",
"heartbeat": "1s", "heartbeat": "1s",
"token": "myToken", "token": "myToken",
"deliverPolicy": "sequence", "deliverPolicy": "sequence",
"startSequence": "5", "startSequence": "5",
"ackPolicy": "all", "ackPolicy": "all",
"apiPrefix": "HUB", "apiPrefix": "HUB",
"concurrencyMode": "parallel",
}, },
}}, }},
want: metadata{ want: metadata{
@ -125,6 +127,7 @@ func TestParseMetadata(t *testing.T) {
internalDeliverPolicy: nats.DeliverByStartSequencePolicy, internalDeliverPolicy: nats.DeliverByStartSequencePolicy,
internalAckPolicy: nats.AckAllPolicy, internalAckPolicy: nats.AckAllPolicy,
APIPrefix: "HUB", APIPrefix: "HUB",
Concurrency: pubsub.Parallel,
}, },
expectErr: false, expectErr: false,
}, },