Merge pull request #2244 from Jarema/explicitly-add-consumer

Add `streamName` and epxlicitly bind to consumer and stream
This commit is contained in:
Bernd Verst 2022-11-11 00:28:43 -08:00 committed by GitHub
commit acb51bbbd3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 41 additions and 23 deletions

2
go.mod
View File

@ -83,7 +83,7 @@ require (
github.com/mrz1836/postmark v1.3.0
github.com/nacos-group/nacos-sdk-go/v2 v2.1.2
github.com/nats-io/nats-server/v2 v2.9.4
github.com/nats-io/nats.go v1.19.0
github.com/nats-io/nats.go v1.19.1
github.com/nats-io/nkeys v0.3.0
github.com/nats-io/stan.go v0.10.3
github.com/open-policy-agent/opa v0.45.0

4
go.sum
View File

@ -1136,8 +1136,8 @@ github.com/nats-io/nats-streaming-server v0.25.2/go.mod h1:bRbgx+iCG6EZEXpqVMroR
github.com/nats-io/nats.go v1.9.1/go.mod h1:ZjDU1L/7fJ09jvUSRVBR2e7+RnLiiIQyqyzEE/Zbp4w=
github.com/nats-io/nats.go v1.16.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
github.com/nats-io/nats.go v1.17.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
github.com/nats-io/nats.go v1.19.0 h1:H6j8aBnTQFoVrTGB6Xjd903UMdE7jz6DS4YkmAqgZ9Q=
github.com/nats-io/nats.go v1.19.0/go.mod h1:tLqubohF7t4z3du1QDPYJIQQyhb4wl6DhjxEajSI7UA=
github.com/nats-io/nats.go v1.19.1 h1:pDQZthDfxRMSJ0ereExAM9ODf3JyS42Exk7iCMdbpec=
github.com/nats-io/nats.go v1.19.1/go.mod h1:tLqubohF7t4z3du1QDPYJIQQyhb4wl6DhjxEajSI7UA=
github.com/nats-io/nkeys v0.1.0/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w=
github.com/nats-io/nkeys v0.1.3/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w=
github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8=

View File

@ -113,49 +113,51 @@ func (js *jetstreamPubSub) Publish(req *pubsub.PublishRequest) error {
}
func (js *jetstreamPubSub) Subscribe(ctx context.Context, req pubsub.SubscribeRequest, handler pubsub.Handler) error {
var opts []nats.SubOpt
var consumerConfig nats.ConsumerConfig
consumerConfig.DeliverSubject = nats.NewInbox()
if v := js.meta.durableName; v != "" {
opts = append(opts, nats.Durable(v))
consumerConfig.Durable = v
}
if v := js.meta.startTime; !v.IsZero() {
opts = append(opts, nats.StartTime(v))
consumerConfig.OptStartTime = &v
} else if v := js.meta.startSequence; v > 0 {
opts = append(opts, nats.StartSequence(v))
consumerConfig.OptStartSeq = v
} else if js.meta.deliverAll {
opts = append(opts, nats.DeliverAll())
consumerConfig.DeliverPolicy = nats.DeliverAllPolicy
} else {
opts = append(opts, nats.DeliverLast())
consumerConfig.DeliverPolicy = nats.DeliverLastPolicy
}
if js.meta.flowControl {
opts = append(opts, nats.EnableFlowControl())
consumerConfig.FlowControl = true
}
if js.meta.ackWait != 0 {
opts = append(opts, nats.AckWait(js.meta.ackWait))
consumerConfig.AckWait = js.meta.ackWait
}
if js.meta.maxDeliver != 0 {
opts = append(opts, nats.MaxDeliver(js.meta.maxDeliver))
consumerConfig.MaxDeliver = js.meta.maxDeliver
}
if len(js.meta.backOff) != 0 {
opts = append(opts, nats.BackOff(js.meta.backOff))
consumerConfig.BackOff = js.meta.backOff
}
if js.meta.maxAckPending != 0 {
opts = append(opts, nats.MaxAckPending(js.meta.maxAckPending))
consumerConfig.MaxAckPending = js.meta.maxAckPending
}
if js.meta.replicas != 0 {
opts = append(opts, nats.ConsumerReplicas(js.meta.replicas))
consumerConfig.Replicas = js.meta.replicas
}
if js.meta.memoryStorage {
opts = append(opts, nats.ConsumerMemoryStorage())
consumerConfig.MemoryStorage = true
}
if js.meta.rateLimit != 0 {
opts = append(opts, nats.RateLimit(js.meta.rateLimit))
consumerConfig.RateLimit = js.meta.rateLimit
}
if js.meta.hearbeat != 0 {
opts = append(opts, nats.IdleHeartbeat(js.meta.hearbeat))
consumerConfig.Heartbeat = js.meta.hearbeat
}
natsHandler := func(m *nats.Msg) {
@ -194,14 +196,27 @@ func (js *jetstreamPubSub) Subscribe(ctx context.Context, req pubsub.SubscribeRe
}
var err error
streamName := js.meta.streamName
if streamName == "" {
streamName, err = js.jsc.StreamNameBySubject(req.Topic)
if err != nil {
return err
}
}
var subscription *nats.Subscription
consumerInfo, err := js.jsc.AddConsumer(streamName, &consumerConfig)
if err != nil {
return err
}
if queue := js.meta.queueGroupName; queue != "" {
js.l.Debugf("nats: subscribed to subject %s with queue group %s",
req.Topic, js.meta.queueGroupName)
subscription, err = js.jsc.QueueSubscribe(req.Topic, queue, natsHandler, opts...)
subscription, err = js.jsc.QueueSubscribe(req.Topic, queue, natsHandler, nats.Bind(streamName, consumerInfo.Name))
} else {
js.l.Debugf("nats: subscribed to subject %s", req.Topic)
subscription, err = js.jsc.Subscribe(req.Topic, natsHandler, opts...)
subscription, err = js.jsc.Subscribe(req.Topic, natsHandler, nats.Bind(streamName, consumerInfo.Name))
}
if err != nil {
return err

View File

@ -32,6 +32,7 @@ type metadata struct {
tlsClientKey string
name string
streamName string
durableName string
queueGroupName string
startSequence uint64
@ -141,5 +142,7 @@ func parseMetadata(psm pubsub.Metadata) (metadata, error) {
m.hearbeat = v
}
m.streamName = psm.Properties["streamName"]
return m, nil
}

View File

@ -11,7 +11,7 @@ require (
github.com/cenkalti/backoff/v4 v4.1.3 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/mitchellh/mapstructure v1.5.1-0.20220423185008-bf980b35cac4 // indirect
github.com/nats-io/nats.go v1.19.0 // indirect
github.com/nats-io/nats.go v1.19.1 // indirect
github.com/nats-io/nkeys v0.3.0 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/pkg/errors v0.9.1 // indirect

View File

@ -14,8 +14,8 @@ github.com/mitchellh/mapstructure v1.5.1-0.20220423185008-bf980b35cac4 h1:BpfhmL
github.com/mitchellh/mapstructure v1.5.1-0.20220423185008-bf980b35cac4/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
github.com/nats-io/jwt/v2 v2.3.0 h1:z2mA1a7tIf5ShggOFlR1oBPgd6hGqcDYsISxZByUzdI=
github.com/nats-io/nats-server/v2 v2.9.4 h1:GvRgv1936J/zYUwMg/cqtYaJ6L+bgeIOIvPslbesdow=
github.com/nats-io/nats.go v1.19.0 h1:H6j8aBnTQFoVrTGB6Xjd903UMdE7jz6DS4YkmAqgZ9Q=
github.com/nats-io/nats.go v1.19.0/go.mod h1:tLqubohF7t4z3du1QDPYJIQQyhb4wl6DhjxEajSI7UA=
github.com/nats-io/nats.go v1.19.1 h1:pDQZthDfxRMSJ0ereExAM9ODf3JyS42Exk7iCMdbpec=
github.com/nats-io/nats.go v1.19.1/go.mod h1:tLqubohF7t4z3du1QDPYJIQQyhb4wl6DhjxEajSI7UA=
github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8=
github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=