From 84742aa1a87b2f89d71e3b49e88c819a5dec0d9a Mon Sep 17 00:00:00 2001 From: Tomasz Pietrek Date: Wed, 12 Oct 2022 11:23:03 +0200 Subject: [PATCH] Add more JetStream options Signed-off-by: Tomasz Pietrek --- pubsub/jetstream/jetstream.go | 25 +++++++++++++++++ pubsub/jetstream/metadata.go | 45 +++++++++++++++++++++++++++++++ pubsub/jetstream/metadata_test.go | 16 +++++++++++ 3 files changed, 86 insertions(+) diff --git a/pubsub/jetstream/jetstream.go b/pubsub/jetstream/jetstream.go index fe85a6d7e..970329ac8 100644 --- a/pubsub/jetstream/jetstream.go +++ b/pubsub/jetstream/jetstream.go @@ -115,6 +115,31 @@ func (js *jetstreamPubSub) Subscribe(ctx context.Context, req pubsub.SubscribeRe opts = append(opts, nats.EnableFlowControl()) } + if js.meta.ackWait != 0 { + opts = append(opts, nats.AckWait(js.meta.ackWait)) + } + if js.meta.maxDeliver != 0 { + opts = append(opts, nats.MaxDeliver(js.meta.maxDeliver)) + } + if len(js.meta.backOff) != 0 { + opts = append(opts, nats.BackOff(js.meta.backOff)) + } + if js.meta.maxAckPending != 0 { + opts = append(opts, nats.MaxAckPending(js.meta.maxAckPending)) + } + if js.meta.replicas != 0 { + opts = append(opts, nats.ConsumerReplicas(js.meta.replicas)) + } + if js.meta.memoryStorage { + opts = append(opts, nats.ConsumerMemoryStorage()) + } + if js.meta.rateLimit != 0 { + opts = append(opts, nats.RateLimit(js.meta.rateLimit)) + } + if js.meta.hearbeat != 0 { + opts = append(opts, nats.IdleHeartbeat(js.meta.hearbeat)) + } + natsHandler := func(m *nats.Msg) { jsm, err := m.Metadata() if err != nil { diff --git a/pubsub/jetstream/metadata.go b/pubsub/jetstream/metadata.go index c4ec7784e..b4d48114b 100644 --- a/pubsub/jetstream/metadata.go +++ b/pubsub/jetstream/metadata.go @@ -16,6 +16,7 @@ package jetstream import ( "fmt" "strconv" + "strings" "time" "github.com/dapr/components-contrib/pubsub" @@ -37,6 +38,14 @@ type metadata struct { startTime time.Time deliverAll bool flowControl bool + ackWait time.Duration + maxDeliver int + backOff []time.Duration + maxAckPending int + replicas int + memoryStorage bool + rateLimit uint64 + hearbeat time.Duration } func parseMetadata(psm pubsub.Metadata) (metadata, error) { @@ -92,6 +101,42 @@ func parseMetadata(psm pubsub.Metadata) (metadata, error) { if v, err := strconv.ParseBool(psm.Properties["flowControl"]); err == nil { m.flowControl = v } + if v, err := time.ParseDuration(psm.Properties["ackWait"]); err == nil { + m.ackWait = v + } + + if v, err := strconv.Atoi(psm.Properties["maxDeliver"]); err == nil { + m.maxDeliver = v + } + + backOffSlice := strings.Split(psm.Properties["backOff"], ",") + var backOff []time.Duration + + for _, item := range backOffSlice { + trimmed := strings.TrimSpace(item) + if duration, err := time.ParseDuration(trimmed); err == nil { + backOff = append(backOff, duration) + } + + } + m.backOff = backOff + + if v, err := strconv.Atoi(psm.Properties["maxAckPending"]); err == nil { + m.maxAckPending = v + } + if v, err := strconv.Atoi(psm.Properties["replicas"]); err == nil { + m.replicas = v + } + if v, err := strconv.ParseBool(psm.Properties["memoryStorage"]); err == nil { + m.memoryStorage = v + } + if v, err := strconv.ParseUint(psm.Properties["rateLimit"], 10, 64); err == nil { + m.rateLimit = v + } + + if v, err := time.ParseDuration(psm.Properties["hearbeat"]); err == nil { + m.hearbeat = v + } return m, nil } diff --git a/pubsub/jetstream/metadata_test.go b/pubsub/jetstream/metadata_test.go index 5e3681f32..cf99607ea 100644 --- a/pubsub/jetstream/metadata_test.go +++ b/pubsub/jetstream/metadata_test.go @@ -41,6 +41,14 @@ func TestParseMetadata(t *testing.T) { "startTime": "1629328511", "deliverAll": "true", "flowControl": "true", + "ackWait": "2s", + "maxDeliver": "10", + "backOff": "500ms, 2s, 10s", + "maxAckPending": "5000", + "replicas": "3", + "memoryStorage": "true", + "rateLimit": "20000", + "hearbeat": "1s", }, }}, want: metadata{ @@ -52,6 +60,14 @@ func TestParseMetadata(t *testing.T) { startTime: time.Unix(1629328511, 0), deliverAll: true, flowControl: true, + ackWait: time.Duration(2 * time.Second), + maxDeliver: 10, + backOff: []time.Duration{time.Duration(time.Millisecond * 500), time.Duration(time.Second * 2), time.Duration(time.Second * 10)}, + maxAckPending: 5000, + replicas: 3, + memoryStorage: true, + rateLimit: 20000, + hearbeat: time.Duration(time.Second * 1), }, expectErr: false, },