Rework JetStream delivery policy (#2315)

* Rework JetStream delivery policy

Signed-off-by: Tomasz Pietrek <tomasz@nats.io>

* Add missing deliver policies to error

Co-authored-by: Piotr Piotrowski <piotrpiotrowski94@gmail.com>
Signed-off-by: Tomasz Pietrek <melgaer@gmail.com>
Signed-off-by: Tomasz Pietrek <tomasz@nats.io>

* Rearrange imports

Signed-off-by: Tomasz Pietrek <tomasz@nats.io>

Signed-off-by: Tomasz Pietrek <tomasz@nats.io>
Signed-off-by: Tomasz Pietrek <melgaer@gmail.com>
Co-authored-by: Piotr Piotrowski <piotrpiotrowski94@gmail.com>
Co-authored-by: Bernd Verst <4535280+berndverst@users.noreply.github.com>
Co-authored-by: Yaron Schneider <schneider.yaron@live.com>
This commit is contained in:
Tomasz Pietrek 2022-12-15 00:56:07 +01:00 committed by GitHub
parent 6a7d58436d
commit 72bd117e7e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 26 additions and 22 deletions

View File

@ -126,14 +126,11 @@ func (js *jetstreamPubSub) Subscribe(ctx context.Context, req pubsub.SubscribeRe
if v := js.meta.startTime; !v.IsZero() {
consumerConfig.OptStartTime = &v
} else if v := js.meta.startSequence; v > 0 {
consumerConfig.OptStartSeq = v
} else if js.meta.deliverAll {
consumerConfig.DeliverPolicy = nats.DeliverAllPolicy
} else {
consumerConfig.DeliverPolicy = nats.DeliverLastPolicy
}
if v := js.meta.startSequence; v > 0 {
consumerConfig.OptStartSeq = v
}
consumerConfig.DeliverPolicy = js.meta.deliverPolicy
if js.meta.flowControl {
consumerConfig.FlowControl = true
}

View File

@ -40,7 +40,6 @@ type metadata struct {
queueGroupName string
startSequence uint64
startTime time.Time
deliverAll bool
flowControl bool
ackWait time.Duration
maxDeliver int
@ -50,6 +49,7 @@ type metadata struct {
memoryStorage bool
rateLimit uint64
hearbeat time.Duration
deliverPolicy nats.DeliverPolicy
ackPolicy nats.AckPolicy
}
@ -104,10 +104,6 @@ func parseMetadata(psm pubsub.Metadata) (metadata, error) {
m.startTime = time.Unix(v, 0)
}
if v, err := strconv.ParseBool(psm.Properties["deliverAll"]); err == nil {
m.deliverAll = v
}
if v, err := strconv.ParseBool(psm.Properties["flowControl"]); err == nil {
m.flowControl = v
}
@ -147,6 +143,22 @@ func parseMetadata(psm pubsub.Metadata) (metadata, error) {
m.hearbeat = v
}
deliverPolicy := psm.Properties["deliverPolicy"]
switch deliverPolicy {
case "all", "":
m.deliverPolicy = nats.DeliverAllPolicy
case "last":
m.deliverPolicy = nats.DeliverLastPolicy
case "new":
m.deliverPolicy = nats.DeliverNewPolicy
case "sequence":
m.deliverPolicy = nats.DeliverByStartSequencePolicy
case "time":
m.deliverPolicy = nats.DeliverByStartTimePolicy
default:
return metadata{}, fmt.Errorf("deliver policy %s is not one of: all, last, new, sequence, time", deliverPolicy)
}
m.streamName = psm.Properties["streamName"]
switch psm.Properties["ackPolicy"] {

View File

@ -41,7 +41,6 @@ func TestParseMetadata(t *testing.T) {
"queueGroupName": "myQueue",
"startSequence": "1",
"startTime": "1629328511",
"deliverAll": "true",
"flowControl": "true",
"ackWait": "2s",
"maxDeliver": "10",
@ -60,7 +59,6 @@ func TestParseMetadata(t *testing.T) {
queueGroupName: "myQueue",
startSequence: 1,
startTime: time.Unix(1629328511, 0),
deliverAll: true,
flowControl: true,
ackWait: 2 * time.Second,
maxDeliver: 10,
@ -70,6 +68,7 @@ func TestParseMetadata(t *testing.T) {
memoryStorage: true,
rateLimit: 20000,
hearbeat: time.Second * 1,
deliverPolicy: nats.DeliverAllPolicy,
ackPolicy: nats.AckExplicitPolicy,
},
expectErr: false,
@ -82,9 +81,7 @@ func TestParseMetadata(t *testing.T) {
"name": "myName",
"durableName": "myDurable",
"queueGroupName": "myQueue",
"startSequence": "1",
"startTime": "1629328511",
"deliverAll": "true",
"flowControl": "true",
"ackWait": "2s",
"maxDeliver": "10",
@ -95,6 +92,8 @@ func TestParseMetadata(t *testing.T) {
"rateLimit": "20000",
"hearbeat": "1s",
"token": "myToken",
"deliverPolicy": "sequence",
"startSequence": "5",
"ackPolicy": "all",
},
}},
@ -103,9 +102,8 @@ func TestParseMetadata(t *testing.T) {
name: "myName",
durableName: "myDurable",
queueGroupName: "myQueue",
startSequence: 1,
startSequence: 5,
startTime: time.Unix(1629328511, 0),
deliverAll: true,
flowControl: true,
ackWait: 2 * time.Second,
maxDeliver: 10,
@ -116,6 +114,7 @@ func TestParseMetadata(t *testing.T) {
rateLimit: 20000,
hearbeat: time.Second * 1,
token: "myToken",
deliverPolicy: nats.DeliverByStartSequencePolicy,
ackPolicy: nats.AckAllPolicy,
},
expectErr: false,
@ -130,7 +129,6 @@ func TestParseMetadata(t *testing.T) {
"queueGroupName": "myQueue",
"startSequence": "1",
"startTime": "1629328511",
"deliverAll": "true",
"flowControl": "true",
"jwt": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c",
},
@ -148,7 +146,6 @@ func TestParseMetadata(t *testing.T) {
"queueGroupName": "myQueue",
"startSequence": "1",
"startTime": "1629328511",
"deliverAll": "true",
"flowControl": "true",
"seedKey": "SUACS34K232OKPRDOMKC6QEWXWUDJTT6R6RZM2WPMURUS5Z3POU7BNIL4Y",
},
@ -166,7 +163,6 @@ func TestParseMetadata(t *testing.T) {
"queueGroupName": "myQueue",
"startSequence": "1",
"startTime": "1629328511",
"deliverAll": "true",
"flowControl": "true",
"tls_client_cert": "/path/to/tls.pem",
},
@ -184,7 +180,6 @@ func TestParseMetadata(t *testing.T) {
"queueGroupName": "myQueue",
"startSequence": "1",
"startTime": "1629328511",
"deliverAll": "true",
"flowControl": "true",
"tls_client_key": "/path/to/tls.key",
},