Cleanup the opts

Signed-off-by: Tomasz Pietrek <tomasz@nats.io>
This commit is contained in:
Tomasz Pietrek 2022-10-31 21:07:36 +01:00
parent 90bf32280d
commit 54cb93c786
1 changed files with 2 additions and 18 deletions

View File

@ -96,65 +96,50 @@ func (js *jetstreamPubSub) Publish(req *pubsub.PublishRequest) error {
func (js *jetstreamPubSub) Subscribe(ctx context.Context, req pubsub.SubscribeRequest, handler pubsub.Handler) error {
var opts []nats.SubOpt
var consumerConfig nats.ConsumerConfig
consumerConfig.DeliverSubject = nats.NewInbox()
if v := js.meta.durableName; v != "" {
opts = append(opts, nats.Durable(v))
consumerConfig.Durable = v
}
if v := js.meta.startTime; !v.IsZero() {
opts = append(opts, nats.StartTime(v))
consumerConfig.OptStartTime = &v
} else if v := js.meta.startSequence; v > 0 {
opts = append(opts, nats.StartSequence(v))
consumerConfig.OptStartSeq = v
} else if js.meta.deliverAll {
opts = append(opts, nats.DeliverAll())
consumerConfig.DeliverPolicy = nats.DeliverAllPolicy
} else {
opts = append(opts, nats.DeliverLast())
consumerConfig.DeliverPolicy = nats.DeliverLastPolicy
}
if js.meta.flowControl {
opts = append(opts, nats.EnableFlowControl())
consumerConfig.FlowControl = true
}
if js.meta.ackWait != 0 {
opts = append(opts, nats.AckWait(js.meta.ackWait))
consumerConfig.AckWait = js.meta.ackWait
}
if js.meta.maxDeliver != 0 {
opts = append(opts, nats.MaxDeliver(js.meta.maxDeliver))
consumerConfig.MaxDeliver = js.meta.maxDeliver
}
if len(js.meta.backOff) != 0 {
opts = append(opts, nats.BackOff(js.meta.backOff))
consumerConfig.BackOff = js.meta.backOff
}
if js.meta.maxAckPending != 0 {
opts = append(opts, nats.MaxAckPending(js.meta.maxAckPending))
consumerConfig.MaxAckPending = js.meta.maxAckPending
}
if js.meta.replicas != 0 {
opts = append(opts, nats.ConsumerReplicas(js.meta.replicas))
consumerConfig.Replicas = js.meta.replicas
}
if js.meta.memoryStorage {
opts = append(opts, nats.ConsumerMemoryStorage())
consumerConfig.MemoryStorage = true
}
if js.meta.rateLimit != 0 {
opts = append(opts, nats.RateLimit(js.meta.rateLimit))
consumerConfig.RateLimit = js.meta.rateLimit
}
if js.meta.hearbeat != 0 {
opts = append(opts, nats.IdleHeartbeat(js.meta.hearbeat))
consumerConfig.Heartbeat = js.meta.hearbeat
}
@ -207,15 +192,14 @@ func (js *jetstreamPubSub) Subscribe(ctx context.Context, req pubsub.SubscribeRe
if err != nil {
return err
}
opts = append(opts, nats.Bind(streamName, consumerInfo.Name))
if queue := js.meta.queueGroupName; queue != "" {
js.l.Debugf("nats: subscribed to subject %s with queue group %s",
req.Topic, js.meta.queueGroupName)
subscription, err = js.jsc.QueueSubscribe(req.Topic, queue, natsHandler, opts...)
subscription, err = js.jsc.QueueSubscribe(req.Topic, queue, natsHandler, nats.Bind(streamName, consumerInfo.Name))
} else {
js.l.Debugf("nats: subscribed to subject %s", req.Topic)
subscription, err = js.jsc.Subscribe(req.Topic, natsHandler, opts...)
subscription, err = js.jsc.Subscribe(req.Topic, natsHandler, nats.Bind(streamName, consumerInfo.Name))
}
if err != nil {
return err