Merge branch 'master' into upgradesdks
This commit is contained in:
commit
08125153c1
|
|
@ -126,14 +126,11 @@ func (js *jetstreamPubSub) Subscribe(ctx context.Context, req pubsub.SubscribeRe
|
||||||
|
|
||||||
if v := js.meta.startTime; !v.IsZero() {
|
if v := js.meta.startTime; !v.IsZero() {
|
||||||
consumerConfig.OptStartTime = &v
|
consumerConfig.OptStartTime = &v
|
||||||
} else if v := js.meta.startSequence; v > 0 {
|
|
||||||
consumerConfig.OptStartSeq = v
|
|
||||||
} else if js.meta.deliverAll {
|
|
||||||
consumerConfig.DeliverPolicy = nats.DeliverAllPolicy
|
|
||||||
} else {
|
|
||||||
consumerConfig.DeliverPolicy = nats.DeliverLastPolicy
|
|
||||||
}
|
}
|
||||||
|
if v := js.meta.startSequence; v > 0 {
|
||||||
|
consumerConfig.OptStartSeq = v
|
||||||
|
}
|
||||||
|
consumerConfig.DeliverPolicy = js.meta.deliverPolicy
|
||||||
if js.meta.flowControl {
|
if js.meta.flowControl {
|
||||||
consumerConfig.FlowControl = true
|
consumerConfig.FlowControl = true
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -40,7 +40,6 @@ type metadata struct {
|
||||||
queueGroupName string
|
queueGroupName string
|
||||||
startSequence uint64
|
startSequence uint64
|
||||||
startTime time.Time
|
startTime time.Time
|
||||||
deliverAll bool
|
|
||||||
flowControl bool
|
flowControl bool
|
||||||
ackWait time.Duration
|
ackWait time.Duration
|
||||||
maxDeliver int
|
maxDeliver int
|
||||||
|
|
@ -50,6 +49,7 @@ type metadata struct {
|
||||||
memoryStorage bool
|
memoryStorage bool
|
||||||
rateLimit uint64
|
rateLimit uint64
|
||||||
hearbeat time.Duration
|
hearbeat time.Duration
|
||||||
|
deliverPolicy nats.DeliverPolicy
|
||||||
ackPolicy nats.AckPolicy
|
ackPolicy nats.AckPolicy
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -104,10 +104,6 @@ func parseMetadata(psm pubsub.Metadata) (metadata, error) {
|
||||||
m.startTime = time.Unix(v, 0)
|
m.startTime = time.Unix(v, 0)
|
||||||
}
|
}
|
||||||
|
|
||||||
if v, err := strconv.ParseBool(psm.Properties["deliverAll"]); err == nil {
|
|
||||||
m.deliverAll = v
|
|
||||||
}
|
|
||||||
|
|
||||||
if v, err := strconv.ParseBool(psm.Properties["flowControl"]); err == nil {
|
if v, err := strconv.ParseBool(psm.Properties["flowControl"]); err == nil {
|
||||||
m.flowControl = v
|
m.flowControl = v
|
||||||
}
|
}
|
||||||
|
|
@ -147,6 +143,22 @@ func parseMetadata(psm pubsub.Metadata) (metadata, error) {
|
||||||
m.hearbeat = v
|
m.hearbeat = v
|
||||||
}
|
}
|
||||||
|
|
||||||
|
deliverPolicy := psm.Properties["deliverPolicy"]
|
||||||
|
switch deliverPolicy {
|
||||||
|
case "all", "":
|
||||||
|
m.deliverPolicy = nats.DeliverAllPolicy
|
||||||
|
case "last":
|
||||||
|
m.deliverPolicy = nats.DeliverLastPolicy
|
||||||
|
case "new":
|
||||||
|
m.deliverPolicy = nats.DeliverNewPolicy
|
||||||
|
case "sequence":
|
||||||
|
m.deliverPolicy = nats.DeliverByStartSequencePolicy
|
||||||
|
case "time":
|
||||||
|
m.deliverPolicy = nats.DeliverByStartTimePolicy
|
||||||
|
default:
|
||||||
|
return metadata{}, fmt.Errorf("deliver policy %s is not one of: all, last, new, sequence, time", deliverPolicy)
|
||||||
|
}
|
||||||
|
|
||||||
m.streamName = psm.Properties["streamName"]
|
m.streamName = psm.Properties["streamName"]
|
||||||
|
|
||||||
switch psm.Properties["ackPolicy"] {
|
switch psm.Properties["ackPolicy"] {
|
||||||
|
|
|
||||||
|
|
@ -41,7 +41,6 @@ func TestParseMetadata(t *testing.T) {
|
||||||
"queueGroupName": "myQueue",
|
"queueGroupName": "myQueue",
|
||||||
"startSequence": "1",
|
"startSequence": "1",
|
||||||
"startTime": "1629328511",
|
"startTime": "1629328511",
|
||||||
"deliverAll": "true",
|
|
||||||
"flowControl": "true",
|
"flowControl": "true",
|
||||||
"ackWait": "2s",
|
"ackWait": "2s",
|
||||||
"maxDeliver": "10",
|
"maxDeliver": "10",
|
||||||
|
|
@ -60,7 +59,6 @@ func TestParseMetadata(t *testing.T) {
|
||||||
queueGroupName: "myQueue",
|
queueGroupName: "myQueue",
|
||||||
startSequence: 1,
|
startSequence: 1,
|
||||||
startTime: time.Unix(1629328511, 0),
|
startTime: time.Unix(1629328511, 0),
|
||||||
deliverAll: true,
|
|
||||||
flowControl: true,
|
flowControl: true,
|
||||||
ackWait: 2 * time.Second,
|
ackWait: 2 * time.Second,
|
||||||
maxDeliver: 10,
|
maxDeliver: 10,
|
||||||
|
|
@ -70,6 +68,7 @@ func TestParseMetadata(t *testing.T) {
|
||||||
memoryStorage: true,
|
memoryStorage: true,
|
||||||
rateLimit: 20000,
|
rateLimit: 20000,
|
||||||
hearbeat: time.Second * 1,
|
hearbeat: time.Second * 1,
|
||||||
|
deliverPolicy: nats.DeliverAllPolicy,
|
||||||
ackPolicy: nats.AckExplicitPolicy,
|
ackPolicy: nats.AckExplicitPolicy,
|
||||||
},
|
},
|
||||||
expectErr: false,
|
expectErr: false,
|
||||||
|
|
@ -82,9 +81,7 @@ func TestParseMetadata(t *testing.T) {
|
||||||
"name": "myName",
|
"name": "myName",
|
||||||
"durableName": "myDurable",
|
"durableName": "myDurable",
|
||||||
"queueGroupName": "myQueue",
|
"queueGroupName": "myQueue",
|
||||||
"startSequence": "1",
|
|
||||||
"startTime": "1629328511",
|
"startTime": "1629328511",
|
||||||
"deliverAll": "true",
|
|
||||||
"flowControl": "true",
|
"flowControl": "true",
|
||||||
"ackWait": "2s",
|
"ackWait": "2s",
|
||||||
"maxDeliver": "10",
|
"maxDeliver": "10",
|
||||||
|
|
@ -95,6 +92,8 @@ func TestParseMetadata(t *testing.T) {
|
||||||
"rateLimit": "20000",
|
"rateLimit": "20000",
|
||||||
"hearbeat": "1s",
|
"hearbeat": "1s",
|
||||||
"token": "myToken",
|
"token": "myToken",
|
||||||
|
"deliverPolicy": "sequence",
|
||||||
|
"startSequence": "5",
|
||||||
"ackPolicy": "all",
|
"ackPolicy": "all",
|
||||||
},
|
},
|
||||||
}},
|
}},
|
||||||
|
|
@ -103,9 +102,8 @@ func TestParseMetadata(t *testing.T) {
|
||||||
name: "myName",
|
name: "myName",
|
||||||
durableName: "myDurable",
|
durableName: "myDurable",
|
||||||
queueGroupName: "myQueue",
|
queueGroupName: "myQueue",
|
||||||
startSequence: 1,
|
startSequence: 5,
|
||||||
startTime: time.Unix(1629328511, 0),
|
startTime: time.Unix(1629328511, 0),
|
||||||
deliverAll: true,
|
|
||||||
flowControl: true,
|
flowControl: true,
|
||||||
ackWait: 2 * time.Second,
|
ackWait: 2 * time.Second,
|
||||||
maxDeliver: 10,
|
maxDeliver: 10,
|
||||||
|
|
@ -116,6 +114,7 @@ func TestParseMetadata(t *testing.T) {
|
||||||
rateLimit: 20000,
|
rateLimit: 20000,
|
||||||
hearbeat: time.Second * 1,
|
hearbeat: time.Second * 1,
|
||||||
token: "myToken",
|
token: "myToken",
|
||||||
|
deliverPolicy: nats.DeliverByStartSequencePolicy,
|
||||||
ackPolicy: nats.AckAllPolicy,
|
ackPolicy: nats.AckAllPolicy,
|
||||||
},
|
},
|
||||||
expectErr: false,
|
expectErr: false,
|
||||||
|
|
@ -130,7 +129,6 @@ func TestParseMetadata(t *testing.T) {
|
||||||
"queueGroupName": "myQueue",
|
"queueGroupName": "myQueue",
|
||||||
"startSequence": "1",
|
"startSequence": "1",
|
||||||
"startTime": "1629328511",
|
"startTime": "1629328511",
|
||||||
"deliverAll": "true",
|
|
||||||
"flowControl": "true",
|
"flowControl": "true",
|
||||||
"jwt": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c",
|
"jwt": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c",
|
||||||
},
|
},
|
||||||
|
|
@ -148,7 +146,6 @@ func TestParseMetadata(t *testing.T) {
|
||||||
"queueGroupName": "myQueue",
|
"queueGroupName": "myQueue",
|
||||||
"startSequence": "1",
|
"startSequence": "1",
|
||||||
"startTime": "1629328511",
|
"startTime": "1629328511",
|
||||||
"deliverAll": "true",
|
|
||||||
"flowControl": "true",
|
"flowControl": "true",
|
||||||
"seedKey": "SUACS34K232OKPRDOMKC6QEWXWUDJTT6R6RZM2WPMURUS5Z3POU7BNIL4Y",
|
"seedKey": "SUACS34K232OKPRDOMKC6QEWXWUDJTT6R6RZM2WPMURUS5Z3POU7BNIL4Y",
|
||||||
},
|
},
|
||||||
|
|
@ -166,7 +163,6 @@ func TestParseMetadata(t *testing.T) {
|
||||||
"queueGroupName": "myQueue",
|
"queueGroupName": "myQueue",
|
||||||
"startSequence": "1",
|
"startSequence": "1",
|
||||||
"startTime": "1629328511",
|
"startTime": "1629328511",
|
||||||
"deliverAll": "true",
|
|
||||||
"flowControl": "true",
|
"flowControl": "true",
|
||||||
"tls_client_cert": "/path/to/tls.pem",
|
"tls_client_cert": "/path/to/tls.pem",
|
||||||
},
|
},
|
||||||
|
|
@ -184,7 +180,6 @@ func TestParseMetadata(t *testing.T) {
|
||||||
"queueGroupName": "myQueue",
|
"queueGroupName": "myQueue",
|
||||||
"startSequence": "1",
|
"startSequence": "1",
|
||||||
"startTime": "1629328511",
|
"startTime": "1629328511",
|
||||||
"deliverAll": "true",
|
|
||||||
"flowControl": "true",
|
"flowControl": "true",
|
||||||
"tls_client_key": "/path/to/tls.key",
|
"tls_client_key": "/path/to/tls.key",
|
||||||
},
|
},
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue