Explicitly use AddConsumer

Signed-off-by: Tomasz Pietrek <tomasz@nats.io>
This commit is contained in:
Tomasz Pietrek 2022-10-31 20:11:17 +01:00
parent e04a44895f
commit 83d7d09ab8
1 changed files with 12 additions and 11 deletions

View File

@ -99,6 +99,8 @@ func (js *jetstreamPubSub) Subscribe(ctx context.Context, req pubsub.SubscribeRe
var opts []nats.SubOpt
var consumerConfig nats.ConsumerConfig
consumerConfig.DeliverSubject = nats.NewInbox()
if v := js.meta.durableName; v != "" {
opts = append(opts, nats.Durable(v))
consumerConfig.Durable = v
@ -192,21 +194,20 @@ func (js *jetstreamPubSub) Subscribe(ctx context.Context, req pubsub.SubscribeRe
}
var err error
var subscription *nats.Subscription
if js.meta.streamName != "" {
js.l.Debug("Stream name provided. Explicitly creating Consumer")
info, err := js.jsc.StreamInfo(js.meta.streamName)
streamName := js.meta.streamName
if streamName == "" {
streamName, err = js.jsc.StreamNameBySubject(req.Topic)
if err != nil {
return err
}
consumerInfo, err := js.jsc.AddConsumer(info.Config.Name, &consumerConfig)
if err != nil {
return err
}
opts = append(opts, nats.Bind(info.Config.Name, consumerInfo.Name))
}
var subscription *nats.Subscription
consumerInfo, err := js.jsc.AddConsumer(streamName, &consumerConfig)
if err != nil {
return err
}
opts = append(opts, nats.Bind(streamName, consumerInfo.Name))
if queue := js.meta.queueGroupName; queue != "" {
js.l.Debugf("nats: subscribed to subject %s with queue group %s",