diff --git a/pubsub/jetstream/jetstream.go b/pubsub/jetstream/jetstream.go index 2ead8a94c..bcc095acd 100644 --- a/pubsub/jetstream/jetstream.go +++ b/pubsub/jetstream/jetstream.go @@ -126,14 +126,11 @@ func (js *jetstreamPubSub) Subscribe(ctx context.Context, req pubsub.SubscribeRe if v := js.meta.startTime; !v.IsZero() { consumerConfig.OptStartTime = &v - } else if v := js.meta.startSequence; v > 0 { - consumerConfig.OptStartSeq = v - } else if js.meta.deliverAll { - consumerConfig.DeliverPolicy = nats.DeliverAllPolicy - } else { - consumerConfig.DeliverPolicy = nats.DeliverLastPolicy } - + if v := js.meta.startSequence; v > 0 { + consumerConfig.OptStartSeq = v + } + consumerConfig.DeliverPolicy = js.meta.deliverPolicy if js.meta.flowControl { consumerConfig.FlowControl = true } diff --git a/pubsub/jetstream/metadata.go b/pubsub/jetstream/metadata.go index 926a3c645..c7cf78288 100644 --- a/pubsub/jetstream/metadata.go +++ b/pubsub/jetstream/metadata.go @@ -40,7 +40,6 @@ type metadata struct { queueGroupName string startSequence uint64 startTime time.Time - deliverAll bool flowControl bool ackWait time.Duration maxDeliver int @@ -50,6 +49,7 @@ type metadata struct { memoryStorage bool rateLimit uint64 hearbeat time.Duration + deliverPolicy nats.DeliverPolicy ackPolicy nats.AckPolicy } @@ -104,10 +104,6 @@ func parseMetadata(psm pubsub.Metadata) (metadata, error) { m.startTime = time.Unix(v, 0) } - if v, err := strconv.ParseBool(psm.Properties["deliverAll"]); err == nil { - m.deliverAll = v - } - if v, err := strconv.ParseBool(psm.Properties["flowControl"]); err == nil { m.flowControl = v } @@ -147,6 +143,22 @@ func parseMetadata(psm pubsub.Metadata) (metadata, error) { m.hearbeat = v } + deliverPolicy := psm.Properties["deliverPolicy"] + switch deliverPolicy { + case "all", "": + m.deliverPolicy = nats.DeliverAllPolicy + case "last": + m.deliverPolicy = nats.DeliverLastPolicy + case "new": + m.deliverPolicy = nats.DeliverNewPolicy + case "sequence": + m.deliverPolicy = nats.DeliverByStartSequencePolicy + case "time": + m.deliverPolicy = nats.DeliverByStartTimePolicy + default: + return metadata{}, fmt.Errorf("deliver policy %s is not one of: all, last, new, sequence, time", deliverPolicy) + } + m.streamName = psm.Properties["streamName"] switch psm.Properties["ackPolicy"] { diff --git a/pubsub/jetstream/metadata_test.go b/pubsub/jetstream/metadata_test.go index 2775b18d9..65a2fab52 100644 --- a/pubsub/jetstream/metadata_test.go +++ b/pubsub/jetstream/metadata_test.go @@ -41,7 +41,6 @@ func TestParseMetadata(t *testing.T) { "queueGroupName": "myQueue", "startSequence": "1", "startTime": "1629328511", - "deliverAll": "true", "flowControl": "true", "ackWait": "2s", "maxDeliver": "10", @@ -60,7 +59,6 @@ func TestParseMetadata(t *testing.T) { queueGroupName: "myQueue", startSequence: 1, startTime: time.Unix(1629328511, 0), - deliverAll: true, flowControl: true, ackWait: 2 * time.Second, maxDeliver: 10, @@ -70,6 +68,7 @@ func TestParseMetadata(t *testing.T) { memoryStorage: true, rateLimit: 20000, hearbeat: time.Second * 1, + deliverPolicy: nats.DeliverAllPolicy, ackPolicy: nats.AckExplicitPolicy, }, expectErr: false, @@ -82,9 +81,7 @@ func TestParseMetadata(t *testing.T) { "name": "myName", "durableName": "myDurable", "queueGroupName": "myQueue", - "startSequence": "1", "startTime": "1629328511", - "deliverAll": "true", "flowControl": "true", "ackWait": "2s", "maxDeliver": "10", @@ -95,6 +92,8 @@ func TestParseMetadata(t *testing.T) { "rateLimit": "20000", "hearbeat": "1s", "token": "myToken", + "deliverPolicy": "sequence", + "startSequence": "5", "ackPolicy": "all", }, }}, @@ -103,9 +102,8 @@ func TestParseMetadata(t *testing.T) { name: "myName", durableName: "myDurable", queueGroupName: "myQueue", - startSequence: 1, + startSequence: 5, startTime: time.Unix(1629328511, 0), - deliverAll: true, flowControl: true, ackWait: 2 * time.Second, maxDeliver: 10, @@ -116,6 +114,7 @@ func TestParseMetadata(t *testing.T) { rateLimit: 20000, hearbeat: time.Second * 1, token: "myToken", + deliverPolicy: nats.DeliverByStartSequencePolicy, ackPolicy: nats.AckAllPolicy, }, expectErr: false, @@ -130,7 +129,6 @@ func TestParseMetadata(t *testing.T) { "queueGroupName": "myQueue", "startSequence": "1", "startTime": "1629328511", - "deliverAll": "true", "flowControl": "true", "jwt": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c", }, @@ -148,7 +146,6 @@ func TestParseMetadata(t *testing.T) { "queueGroupName": "myQueue", "startSequence": "1", "startTime": "1629328511", - "deliverAll": "true", "flowControl": "true", "seedKey": "SUACS34K232OKPRDOMKC6QEWXWUDJTT6R6RZM2WPMURUS5Z3POU7BNIL4Y", }, @@ -166,7 +163,6 @@ func TestParseMetadata(t *testing.T) { "queueGroupName": "myQueue", "startSequence": "1", "startTime": "1629328511", - "deliverAll": "true", "flowControl": "true", "tls_client_cert": "/path/to/tls.pem", }, @@ -184,7 +180,6 @@ func TestParseMetadata(t *testing.T) { "queueGroupName": "myQueue", "startSequence": "1", "startTime": "1629328511", - "deliverAll": "true", "flowControl": "true", "tls_client_key": "/path/to/tls.key", },