Add JetStream support (#1101)
* Add JetStream support * Add conformance tests * Fix lint Co-authored-by: Yaron Schneider <yaronsc@microsoft.com>
This commit is contained in:
parent
8fd1ee57a0
commit
f4a5722806
2
go.mod
2
go.mod
|
@ -96,7 +96,7 @@ require (
|
|||
github.com/nacos-group/nacos-sdk-go v1.0.8
|
||||
github.com/nats-io/nats-server/v2 v2.2.1 // indirect
|
||||
github.com/nats-io/nats-streaming-server v0.21.2 // indirect
|
||||
github.com/nats-io/nats.go v1.10.1-0.20210330225420-a0b1f60162f8
|
||||
github.com/nats-io/nats.go v1.12.0
|
||||
github.com/nats-io/stan.go v0.8.3
|
||||
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect
|
||||
github.com/nxadm/tail v1.4.8 // indirect
|
||||
|
|
3
go.sum
3
go.sum
|
@ -831,8 +831,9 @@ github.com/nats-io/nats.go v1.10.1-0.20201021145452-94be476ad6e0/go.mod h1:VU2zE
|
|||
github.com/nats-io/nats.go v1.10.1-0.20210127212649-5b4924938a9a/go.mod h1:Sa3kLIonafChP5IF0b55i9uvGR10I3hPETFbi4+9kOI=
|
||||
github.com/nats-io/nats.go v1.10.1-0.20210211000709-75ded9c77585/go.mod h1:uBWnCKg9luW1g7hgzPxUjHFRI40EuTSX7RCzgnc74Jk=
|
||||
github.com/nats-io/nats.go v1.10.1-0.20210228004050-ed743748acac/go.mod h1:hxFvLNbNmT6UppX5B5Tr/r3g+XSwGjJzFn6mxPNJEHc=
|
||||
github.com/nats-io/nats.go v1.10.1-0.20210330225420-a0b1f60162f8 h1:z/0dTBxMgMfWOtmpyHrbIDKx2duzrxkUeQYJMUnRPj4=
|
||||
github.com/nats-io/nats.go v1.10.1-0.20210330225420-a0b1f60162f8/go.mod h1:Zq9IEHy7zurF0kFbU5aLIknnFI7guh8ijHk+2v+Vf5g=
|
||||
github.com/nats-io/nats.go v1.12.0 h1:n0oZzK2aIZDMKuEiMKJ9qkCUgVY5vTAAksSXtLlz5Xc=
|
||||
github.com/nats-io/nats.go v1.12.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
|
||||
github.com/nats-io/nkeys v0.1.0/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w=
|
||||
github.com/nats-io/nkeys v0.1.3/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w=
|
||||
github.com/nats-io/nkeys v0.1.4/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s=
|
||||
|
|
|
@ -0,0 +1,154 @@
|
|||
package jetstream
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"time"
|
||||
|
||||
"github.com/dapr/components-contrib/pubsub"
|
||||
"github.com/dapr/kit/logger"
|
||||
"github.com/dapr/kit/retry"
|
||||
"github.com/nats-io/nats.go"
|
||||
)
|
||||
|
||||
type jetstreamPubSub struct {
|
||||
nc *nats.Conn
|
||||
jsc nats.JetStreamContext
|
||||
l logger.Logger
|
||||
meta metadata
|
||||
|
||||
ctx context.Context
|
||||
ctxCancel context.CancelFunc
|
||||
backOffConfig retry.Config
|
||||
}
|
||||
|
||||
func NewJetStream(logger logger.Logger) pubsub.PubSub {
|
||||
return &jetstreamPubSub{l: logger}
|
||||
}
|
||||
|
||||
func (js *jetstreamPubSub) Init(metadata pubsub.Metadata) error {
|
||||
var err error
|
||||
js.meta, err = parseMetadata(metadata)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var opts []nats.Option
|
||||
opts = append(opts, nats.Name(js.meta.name))
|
||||
|
||||
js.nc, err = nats.Connect(js.meta.natsURL, opts...)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
js.l.Debugf("Connected to nats at %s", js.meta.natsURL)
|
||||
|
||||
js.jsc, err = js.nc.JetStream()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
js.ctx, js.ctxCancel = context.WithCancel(context.Background())
|
||||
|
||||
// Default retry configuration is used if no backOff properties are set.
|
||||
if err := retry.DecodeConfigWithPrefix(
|
||||
&js.backOffConfig,
|
||||
metadata.Properties,
|
||||
"backOff"); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
js.l.Debug("JetStream initialization complete")
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (js *jetstreamPubSub) Features() []pubsub.Feature {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (js *jetstreamPubSub) Publish(req *pubsub.PublishRequest) error {
|
||||
js.l.Debugf("Publishing topic %v with data: %v", req.Topic, req.Data)
|
||||
_, err := js.jsc.Publish(req.Topic, req.Data)
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (js *jetstreamPubSub) Subscribe(req pubsub.SubscribeRequest, handler pubsub.Handler) error {
|
||||
var opts []nats.SubOpt
|
||||
|
||||
if v := js.meta.durableName; v != "" {
|
||||
opts = append(opts, nats.Durable(v))
|
||||
}
|
||||
|
||||
if v := js.meta.startTime; !v.IsZero() {
|
||||
opts = append(opts, nats.StartTime(v))
|
||||
} else if v := js.meta.startSequence; v > 0 {
|
||||
opts = append(opts, nats.StartSequence(v))
|
||||
} else if js.meta.deliverAll {
|
||||
opts = append(opts, nats.DeliverAll())
|
||||
} else {
|
||||
opts = append(opts, nats.DeliverLast())
|
||||
}
|
||||
|
||||
if js.meta.flowControl {
|
||||
opts = append(opts, nats.EnableFlowControl())
|
||||
}
|
||||
|
||||
natsHandler := func(m *nats.Msg) {
|
||||
jsm, err := m.Metadata()
|
||||
if err != nil {
|
||||
// If we get an error, then we don't have a valid JetStream
|
||||
// message.
|
||||
js.l.Error(err)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
operation := func() error {
|
||||
js.l.Debugf("Processing JetStream message %s/%d", m.Subject,
|
||||
jsm.Sequence)
|
||||
opErr := handler(js.ctx, &pubsub.NewMessage{
|
||||
Topic: m.Subject,
|
||||
Data: m.Data,
|
||||
})
|
||||
if opErr != nil {
|
||||
return opErr
|
||||
}
|
||||
|
||||
return m.Ack()
|
||||
}
|
||||
notify := func(nerr error, d time.Duration) {
|
||||
js.l.Errorf("Error processing JetStream message: %s/%d. Retrying...",
|
||||
m.Subject, jsm.Sequence)
|
||||
}
|
||||
recovered := func() {
|
||||
js.l.Infof("Successfully processed JetStream message after it previously failed: %s/%d",
|
||||
m.Subject, jsm.Sequence)
|
||||
}
|
||||
backOff := js.backOffConfig.NewBackOffWithContext(js.ctx)
|
||||
|
||||
err = retry.NotifyRecover(operation, backOff, notify, recovered)
|
||||
if err != nil && !errors.Is(err, context.Canceled) {
|
||||
js.l.Errorf("Error processing message and retries are exhausted: %s/%d.",
|
||||
m.Subject, jsm.Sequence)
|
||||
}
|
||||
}
|
||||
|
||||
var err error
|
||||
if queue := js.meta.queueGroupName; queue != "" {
|
||||
js.l.Debugf("nats: subscribed to subject %s with queue group %s",
|
||||
req.Topic, js.meta.queueGroupName)
|
||||
_, err = js.jsc.QueueSubscribe(req.Topic, queue, natsHandler, opts...)
|
||||
} else {
|
||||
js.l.Debugf("nats: subscribed to subject %s", req.Topic)
|
||||
_, err = js.jsc.Subscribe(req.Topic, natsHandler, opts...)
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (js *jetstreamPubSub) Close() error {
|
||||
js.ctxCancel()
|
||||
|
||||
return js.nc.Drain()
|
||||
}
|
|
@ -0,0 +1,56 @@
|
|||
package jetstream
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/dapr/components-contrib/pubsub"
|
||||
)
|
||||
|
||||
type metadata struct {
|
||||
natsURL string
|
||||
|
||||
name string
|
||||
durableName string
|
||||
queueGroupName string
|
||||
startSequence uint64
|
||||
startTime time.Time
|
||||
deliverAll bool
|
||||
flowControl bool
|
||||
}
|
||||
|
||||
func parseMetadata(psm pubsub.Metadata) (metadata, error) {
|
||||
var m metadata
|
||||
|
||||
if v, ok := psm.Properties["natsURL"]; ok && v != "" {
|
||||
m.natsURL = v
|
||||
} else {
|
||||
return metadata{}, fmt.Errorf("missing nats URL")
|
||||
}
|
||||
|
||||
if m.name = psm.Properties["name"]; m.name == "" {
|
||||
m.name = "dapr.io - pubsub.jetstream"
|
||||
}
|
||||
|
||||
m.durableName = psm.Properties["durableName"]
|
||||
m.queueGroupName = psm.Properties["queueGroupName"]
|
||||
|
||||
if v, err := strconv.ParseUint(psm.Properties["startSequence"], 10, 64); err == nil {
|
||||
m.startSequence = v
|
||||
}
|
||||
|
||||
if v, err := strconv.ParseInt(psm.Properties["startTime"], 10, 64); err == nil {
|
||||
m.startTime = time.Unix(v, 0)
|
||||
}
|
||||
|
||||
if v, err := strconv.ParseBool(psm.Properties["deliverAll"]); err == nil {
|
||||
m.deliverAll = v
|
||||
}
|
||||
|
||||
if v, err := strconv.ParseBool(psm.Properties["flowControl"]); err == nil {
|
||||
m.flowControl = v
|
||||
}
|
||||
|
||||
return m, nil
|
||||
}
|
|
@ -0,0 +1,45 @@
|
|||
package jetstream
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/dapr/components-contrib/pubsub"
|
||||
)
|
||||
|
||||
func TestParseMetadata(t *testing.T) {
|
||||
psm := pubsub.Metadata{
|
||||
Properties: map[string]string{
|
||||
"natsURL": "nats://localhost:4222",
|
||||
"name": "myName",
|
||||
"durableName": "myDurable",
|
||||
"queueGroupName": "myQueue",
|
||||
"startSequence": "1",
|
||||
"startTime": "1629328511",
|
||||
"deliverAll": "true",
|
||||
"flowControl": "true",
|
||||
},
|
||||
}
|
||||
|
||||
ts := time.Unix(1629328511, 0)
|
||||
|
||||
want := metadata{
|
||||
natsURL: "nats://localhost:4222",
|
||||
name: "myName",
|
||||
durableName: "myDurable",
|
||||
queueGroupName: "myQueue",
|
||||
startSequence: 1,
|
||||
startTime: ts,
|
||||
deliverAll: true,
|
||||
flowControl: true,
|
||||
}
|
||||
|
||||
got, err := parseMetadata(psm)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if !reflect.DeepEqual(got, want) {
|
||||
t.Fatalf("unexpected metadata: got=%v, want=%v", got, want)
|
||||
}
|
||||
}
|
|
@ -0,0 +1,14 @@
|
|||
apiVersion: dapr.io/v1alpha1
|
||||
kind: Component
|
||||
metadata:
|
||||
name: pubsub
|
||||
spec:
|
||||
type: pubsub.jetstream
|
||||
version: v1
|
||||
metadata:
|
||||
- name: natsURL
|
||||
value: "nats://localhost:4222"
|
||||
- name: name
|
||||
value: config-test
|
||||
- name: flowControl
|
||||
value: true
|
|
@ -1,6 +1,6 @@
|
|||
# Supported operation: publish, subscribe
|
||||
# Supported operation: publish, subscribe
|
||||
# Config map:
|
||||
## pubsubName : name of the pubsub
|
||||
## pubsubName : name of the pubsub
|
||||
## testTopicName: name of the test topic to use
|
||||
## publish: A map of strings that will be part of the publish metadata in the Publish call
|
||||
## subscribe: A map of strings that will be part of the subscribe metadata in the Subscribe call
|
||||
|
@ -25,6 +25,8 @@ components:
|
|||
checkInOrderProcessing: false
|
||||
- component: natsstreaming
|
||||
allOperations: true
|
||||
- component: jetstream
|
||||
allOperations: true
|
||||
- component: kafka
|
||||
allOperations: true
|
||||
- component: pulsar
|
||||
|
|
|
@ -38,6 +38,7 @@ import (
|
|||
p_eventhubs "github.com/dapr/components-contrib/pubsub/azure/eventhubs"
|
||||
p_servicebus "github.com/dapr/components-contrib/pubsub/azure/servicebus"
|
||||
p_hazelcast "github.com/dapr/components-contrib/pubsub/hazelcast"
|
||||
p_jetstream "github.com/dapr/components-contrib/pubsub/jetstream"
|
||||
p_kafka "github.com/dapr/components-contrib/pubsub/kafka"
|
||||
p_mqtt "github.com/dapr/components-contrib/pubsub/mqtt"
|
||||
p_natsstreaming "github.com/dapr/components-contrib/pubsub/natsstreaming"
|
||||
|
@ -320,6 +321,8 @@ func loadPubSub(tc TestComponent) pubsub.PubSub {
|
|||
pubsub = p_servicebus.NewAzureServiceBus(testLogger)
|
||||
case "natsstreaming":
|
||||
pubsub = p_natsstreaming.NewNATSStreamingPubSub(testLogger)
|
||||
case "jetstream":
|
||||
pubsub = p_jetstream.NewJetStream(testLogger)
|
||||
case kafka:
|
||||
pubsub = p_kafka.NewKafka(testLogger)
|
||||
case "pulsar":
|
||||
|
|
Loading…
Reference in New Issue