Add domain and apiPrefix for JetStream

Signed-off-by: Tomasz Pietrek <tomasz@nats.io>
This commit is contained in:
Tomasz Pietrek 2023-01-02 22:05:11 +01:00
parent 5922b602d4
commit 94894c465b
3 changed files with 14 additions and 1 deletions

View File

@ -68,7 +68,7 @@ func (js *jetstreamPubSub) Init(metadata pubsub.Metadata) error {
}
js.l.Debugf("Connected to nats at %s", js.meta.natsURL)
js.jsc, err = js.nc.JetStream()
js.jsc, err = js.nc.JetStream(nats.Domain(js.meta.domain), nats.APIPrefix(js.meta.apiPrefix))
if err != nil {
return err
}

View File

@ -51,6 +51,8 @@ type metadata struct {
hearbeat time.Duration
deliverPolicy nats.DeliverPolicy
ackPolicy nats.AckPolicy
domain string
apiPrefix string
}
func parseMetadata(psm pubsub.Metadata) (metadata, error) {
@ -143,6 +145,13 @@ func parseMetadata(psm pubsub.Metadata) (metadata, error) {
m.hearbeat = v
}
if domain := psm.Properties["domain"]; domain != "" {
m.domain = domain
}
if apiPrefix := psm.Properties["apiPrefix"]; apiPrefix != "" {
m.apiPrefix = apiPrefix
}
deliverPolicy := psm.Properties["deliverPolicy"]
switch deliverPolicy {
case "all", "":

View File

@ -50,6 +50,7 @@ func TestParseMetadata(t *testing.T) {
"memoryStorage": "true",
"rateLimit": "20000",
"hearbeat": "1s",
"domain": "hub",
},
}},
want: metadata{
@ -70,6 +71,7 @@ func TestParseMetadata(t *testing.T) {
hearbeat: time.Second * 1,
deliverPolicy: nats.DeliverAllPolicy,
ackPolicy: nats.AckExplicitPolicy,
domain: "hub",
},
expectErr: false,
},
@ -95,6 +97,7 @@ func TestParseMetadata(t *testing.T) {
"deliverPolicy": "sequence",
"startSequence": "5",
"ackPolicy": "all",
"apiPrefix": "HUB",
},
}},
want: metadata{
@ -116,6 +119,7 @@ func TestParseMetadata(t *testing.T) {
token: "myToken",
deliverPolicy: nats.DeliverByStartSequencePolicy,
ackPolicy: nats.AckAllPolicy,
apiPrefix: "HUB",
},
expectErr: false,
},