Merge branch 'master' into merge19intomaster

This commit is contained in:
Dapr Bot 2022-10-14 13:56:59 -07:00 committed by GitHub
commit 961aa9da72
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 85 additions and 0 deletions

View File

@ -115,6 +115,31 @@ func (js *jetstreamPubSub) Subscribe(ctx context.Context, req pubsub.SubscribeRe
opts = append(opts, nats.EnableFlowControl())
}
if js.meta.ackWait != 0 {
opts = append(opts, nats.AckWait(js.meta.ackWait))
}
if js.meta.maxDeliver != 0 {
opts = append(opts, nats.MaxDeliver(js.meta.maxDeliver))
}
if len(js.meta.backOff) != 0 {
opts = append(opts, nats.BackOff(js.meta.backOff))
}
if js.meta.maxAckPending != 0 {
opts = append(opts, nats.MaxAckPending(js.meta.maxAckPending))
}
if js.meta.replicas != 0 {
opts = append(opts, nats.ConsumerReplicas(js.meta.replicas))
}
if js.meta.memoryStorage {
opts = append(opts, nats.ConsumerMemoryStorage())
}
if js.meta.rateLimit != 0 {
opts = append(opts, nats.RateLimit(js.meta.rateLimit))
}
if js.meta.hearbeat != 0 {
opts = append(opts, nats.IdleHeartbeat(js.meta.hearbeat))
}
natsHandler := func(m *nats.Msg) {
jsm, err := m.Metadata()
if err != nil {

View File

@ -16,6 +16,7 @@ package jetstream
import (
"fmt"
"strconv"
"strings"
"time"
"github.com/dapr/components-contrib/pubsub"
@ -37,6 +38,14 @@ type metadata struct {
startTime time.Time
deliverAll bool
flowControl bool
ackWait time.Duration
maxDeliver int
backOff []time.Duration
maxAckPending int
replicas int
memoryStorage bool
rateLimit uint64
hearbeat time.Duration
}
func parseMetadata(psm pubsub.Metadata) (metadata, error) {
@ -92,6 +101,41 @@ func parseMetadata(psm pubsub.Metadata) (metadata, error) {
if v, err := strconv.ParseBool(psm.Properties["flowControl"]); err == nil {
m.flowControl = v
}
if v, err := time.ParseDuration(psm.Properties["ackWait"]); err == nil {
m.ackWait = v
}
if v, err := strconv.Atoi(psm.Properties["maxDeliver"]); err == nil {
m.maxDeliver = v
}
backOffSlice := strings.Split(psm.Properties["backOff"], ",")
var backOff []time.Duration
for _, item := range backOffSlice {
trimmed := strings.TrimSpace(item)
if duration, err := time.ParseDuration(trimmed); err == nil {
backOff = append(backOff, duration)
}
}
m.backOff = backOff
if v, err := strconv.Atoi(psm.Properties["maxAckPending"]); err == nil {
m.maxAckPending = v
}
if v, err := strconv.Atoi(psm.Properties["replicas"]); err == nil {
m.replicas = v
}
if v, err := strconv.ParseBool(psm.Properties["memoryStorage"]); err == nil {
m.memoryStorage = v
}
if v, err := strconv.ParseUint(psm.Properties["rateLimit"], 10, 64); err == nil {
m.rateLimit = v
}
if v, err := time.ParseDuration(psm.Properties["hearbeat"]); err == nil {
m.hearbeat = v
}
return m, nil
}

View File

@ -41,6 +41,14 @@ func TestParseMetadata(t *testing.T) {
"startTime": "1629328511",
"deliverAll": "true",
"flowControl": "true",
"ackWait": "2s",
"maxDeliver": "10",
"backOff": "500ms, 2s, 10s",
"maxAckPending": "5000",
"replicas": "3",
"memoryStorage": "true",
"rateLimit": "20000",
"hearbeat": "1s",
},
}},
want: metadata{
@ -52,6 +60,14 @@ func TestParseMetadata(t *testing.T) {
startTime: time.Unix(1629328511, 0),
deliverAll: true,
flowControl: true,
ackWait: 2 * time.Second,
maxDeliver: 10,
backOff: []time.Duration{time.Millisecond * 500, time.Second * 2, time.Second * 10},
maxAckPending: 5000,
replicas: 3,
memoryStorage: true,
rateLimit: 20000,
hearbeat: time.Second * 1,
},
expectErr: false,
},