From f4a5722806a48ad1237225a71406a510534edbf3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jaime=20Pi=C3=B1a?= Date: Wed, 1 Sep 2021 11:50:16 -0700 Subject: [PATCH] Add JetStream support (#1101) * Add JetStream support * Add conformance tests * Fix lint Co-authored-by: Yaron Schneider --- go.mod | 2 +- go.sum | 3 +- pubsub/jetstream/jetstream.go | 154 +++++++++++++++++++++++ pubsub/jetstream/metadata.go | 56 +++++++++ pubsub/jetstream/metadata_test.go | 45 +++++++ tests/config/pubsub/jetstream/pubsub.yml | 14 +++ tests/config/pubsub/tests.yml | 6 +- tests/conformance/common.go | 3 + 8 files changed, 279 insertions(+), 4 deletions(-) create mode 100644 pubsub/jetstream/jetstream.go create mode 100644 pubsub/jetstream/metadata.go create mode 100644 pubsub/jetstream/metadata_test.go create mode 100644 tests/config/pubsub/jetstream/pubsub.yml diff --git a/go.mod b/go.mod index 37c9c2cac..26f443f7a 100644 --- a/go.mod +++ b/go.mod @@ -96,7 +96,7 @@ require ( github.com/nacos-group/nacos-sdk-go v1.0.8 github.com/nats-io/nats-server/v2 v2.2.1 // indirect github.com/nats-io/nats-streaming-server v0.21.2 // indirect - github.com/nats-io/nats.go v1.10.1-0.20210330225420-a0b1f60162f8 + github.com/nats-io/nats.go v1.12.0 github.com/nats-io/stan.go v0.8.3 github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect github.com/nxadm/tail v1.4.8 // indirect diff --git a/go.sum b/go.sum index 0a9f25ac4..47e92758b 100644 --- a/go.sum +++ b/go.sum @@ -831,8 +831,9 @@ github.com/nats-io/nats.go v1.10.1-0.20201021145452-94be476ad6e0/go.mod h1:VU2zE github.com/nats-io/nats.go v1.10.1-0.20210127212649-5b4924938a9a/go.mod h1:Sa3kLIonafChP5IF0b55i9uvGR10I3hPETFbi4+9kOI= github.com/nats-io/nats.go v1.10.1-0.20210211000709-75ded9c77585/go.mod h1:uBWnCKg9luW1g7hgzPxUjHFRI40EuTSX7RCzgnc74Jk= github.com/nats-io/nats.go v1.10.1-0.20210228004050-ed743748acac/go.mod h1:hxFvLNbNmT6UppX5B5Tr/r3g+XSwGjJzFn6mxPNJEHc= -github.com/nats-io/nats.go v1.10.1-0.20210330225420-a0b1f60162f8 h1:z/0dTBxMgMfWOtmpyHrbIDKx2duzrxkUeQYJMUnRPj4= github.com/nats-io/nats.go v1.10.1-0.20210330225420-a0b1f60162f8/go.mod h1:Zq9IEHy7zurF0kFbU5aLIknnFI7guh8ijHk+2v+Vf5g= +github.com/nats-io/nats.go v1.12.0 h1:n0oZzK2aIZDMKuEiMKJ9qkCUgVY5vTAAksSXtLlz5Xc= +github.com/nats-io/nats.go v1.12.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= github.com/nats-io/nkeys v0.1.0/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w= github.com/nats-io/nkeys v0.1.3/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w= github.com/nats-io/nkeys v0.1.4/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s= diff --git a/pubsub/jetstream/jetstream.go b/pubsub/jetstream/jetstream.go new file mode 100644 index 000000000..22887e4a5 --- /dev/null +++ b/pubsub/jetstream/jetstream.go @@ -0,0 +1,154 @@ +package jetstream + +import ( + "context" + "errors" + "time" + + "github.com/dapr/components-contrib/pubsub" + "github.com/dapr/kit/logger" + "github.com/dapr/kit/retry" + "github.com/nats-io/nats.go" +) + +type jetstreamPubSub struct { + nc *nats.Conn + jsc nats.JetStreamContext + l logger.Logger + meta metadata + + ctx context.Context + ctxCancel context.CancelFunc + backOffConfig retry.Config +} + +func NewJetStream(logger logger.Logger) pubsub.PubSub { + return &jetstreamPubSub{l: logger} +} + +func (js *jetstreamPubSub) Init(metadata pubsub.Metadata) error { + var err error + js.meta, err = parseMetadata(metadata) + if err != nil { + return err + } + + var opts []nats.Option + opts = append(opts, nats.Name(js.meta.name)) + + js.nc, err = nats.Connect(js.meta.natsURL, opts...) + if err != nil { + return err + } + js.l.Debugf("Connected to nats at %s", js.meta.natsURL) + + js.jsc, err = js.nc.JetStream() + if err != nil { + return err + } + + js.ctx, js.ctxCancel = context.WithCancel(context.Background()) + + // Default retry configuration is used if no backOff properties are set. + if err := retry.DecodeConfigWithPrefix( + &js.backOffConfig, + metadata.Properties, + "backOff"); err != nil { + return err + } + + js.l.Debug("JetStream initialization complete") + + return nil +} + +func (js *jetstreamPubSub) Features() []pubsub.Feature { + return nil +} + +func (js *jetstreamPubSub) Publish(req *pubsub.PublishRequest) error { + js.l.Debugf("Publishing topic %v with data: %v", req.Topic, req.Data) + _, err := js.jsc.Publish(req.Topic, req.Data) + + return err +} + +func (js *jetstreamPubSub) Subscribe(req pubsub.SubscribeRequest, handler pubsub.Handler) error { + var opts []nats.SubOpt + + if v := js.meta.durableName; v != "" { + opts = append(opts, nats.Durable(v)) + } + + if v := js.meta.startTime; !v.IsZero() { + opts = append(opts, nats.StartTime(v)) + } else if v := js.meta.startSequence; v > 0 { + opts = append(opts, nats.StartSequence(v)) + } else if js.meta.deliverAll { + opts = append(opts, nats.DeliverAll()) + } else { + opts = append(opts, nats.DeliverLast()) + } + + if js.meta.flowControl { + opts = append(opts, nats.EnableFlowControl()) + } + + natsHandler := func(m *nats.Msg) { + jsm, err := m.Metadata() + if err != nil { + // If we get an error, then we don't have a valid JetStream + // message. + js.l.Error(err) + + return + } + + operation := func() error { + js.l.Debugf("Processing JetStream message %s/%d", m.Subject, + jsm.Sequence) + opErr := handler(js.ctx, &pubsub.NewMessage{ + Topic: m.Subject, + Data: m.Data, + }) + if opErr != nil { + return opErr + } + + return m.Ack() + } + notify := func(nerr error, d time.Duration) { + js.l.Errorf("Error processing JetStream message: %s/%d. Retrying...", + m.Subject, jsm.Sequence) + } + recovered := func() { + js.l.Infof("Successfully processed JetStream message after it previously failed: %s/%d", + m.Subject, jsm.Sequence) + } + backOff := js.backOffConfig.NewBackOffWithContext(js.ctx) + + err = retry.NotifyRecover(operation, backOff, notify, recovered) + if err != nil && !errors.Is(err, context.Canceled) { + js.l.Errorf("Error processing message and retries are exhausted: %s/%d.", + m.Subject, jsm.Sequence) + } + } + + var err error + if queue := js.meta.queueGroupName; queue != "" { + js.l.Debugf("nats: subscribed to subject %s with queue group %s", + req.Topic, js.meta.queueGroupName) + _, err = js.jsc.QueueSubscribe(req.Topic, queue, natsHandler, opts...) + } else { + js.l.Debugf("nats: subscribed to subject %s", req.Topic) + _, err = js.jsc.Subscribe(req.Topic, natsHandler, opts...) + } + + return err +} + +func (js *jetstreamPubSub) Close() error { + js.ctxCancel() + + return js.nc.Drain() +} diff --git a/pubsub/jetstream/metadata.go b/pubsub/jetstream/metadata.go new file mode 100644 index 000000000..bb10b5496 --- /dev/null +++ b/pubsub/jetstream/metadata.go @@ -0,0 +1,56 @@ +package jetstream + +import ( + "fmt" + "strconv" + "time" + + "github.com/dapr/components-contrib/pubsub" +) + +type metadata struct { + natsURL string + + name string + durableName string + queueGroupName string + startSequence uint64 + startTime time.Time + deliverAll bool + flowControl bool +} + +func parseMetadata(psm pubsub.Metadata) (metadata, error) { + var m metadata + + if v, ok := psm.Properties["natsURL"]; ok && v != "" { + m.natsURL = v + } else { + return metadata{}, fmt.Errorf("missing nats URL") + } + + if m.name = psm.Properties["name"]; m.name == "" { + m.name = "dapr.io - pubsub.jetstream" + } + + m.durableName = psm.Properties["durableName"] + m.queueGroupName = psm.Properties["queueGroupName"] + + if v, err := strconv.ParseUint(psm.Properties["startSequence"], 10, 64); err == nil { + m.startSequence = v + } + + if v, err := strconv.ParseInt(psm.Properties["startTime"], 10, 64); err == nil { + m.startTime = time.Unix(v, 0) + } + + if v, err := strconv.ParseBool(psm.Properties["deliverAll"]); err == nil { + m.deliverAll = v + } + + if v, err := strconv.ParseBool(psm.Properties["flowControl"]); err == nil { + m.flowControl = v + } + + return m, nil +} diff --git a/pubsub/jetstream/metadata_test.go b/pubsub/jetstream/metadata_test.go new file mode 100644 index 000000000..d0cc3d03a --- /dev/null +++ b/pubsub/jetstream/metadata_test.go @@ -0,0 +1,45 @@ +package jetstream + +import ( + "reflect" + "testing" + "time" + + "github.com/dapr/components-contrib/pubsub" +) + +func TestParseMetadata(t *testing.T) { + psm := pubsub.Metadata{ + Properties: map[string]string{ + "natsURL": "nats://localhost:4222", + "name": "myName", + "durableName": "myDurable", + "queueGroupName": "myQueue", + "startSequence": "1", + "startTime": "1629328511", + "deliverAll": "true", + "flowControl": "true", + }, + } + + ts := time.Unix(1629328511, 0) + + want := metadata{ + natsURL: "nats://localhost:4222", + name: "myName", + durableName: "myDurable", + queueGroupName: "myQueue", + startSequence: 1, + startTime: ts, + deliverAll: true, + flowControl: true, + } + + got, err := parseMetadata(psm) + if err != nil { + t.Fatal(err) + } + if !reflect.DeepEqual(got, want) { + t.Fatalf("unexpected metadata: got=%v, want=%v", got, want) + } +} diff --git a/tests/config/pubsub/jetstream/pubsub.yml b/tests/config/pubsub/jetstream/pubsub.yml new file mode 100644 index 000000000..631023f9f --- /dev/null +++ b/tests/config/pubsub/jetstream/pubsub.yml @@ -0,0 +1,14 @@ +apiVersion: dapr.io/v1alpha1 +kind: Component +metadata: + name: pubsub +spec: + type: pubsub.jetstream + version: v1 + metadata: + - name: natsURL + value: "nats://localhost:4222" + - name: name + value: config-test + - name: flowControl + value: true diff --git a/tests/config/pubsub/tests.yml b/tests/config/pubsub/tests.yml index 6e2158fc1..1beec1893 100644 --- a/tests/config/pubsub/tests.yml +++ b/tests/config/pubsub/tests.yml @@ -1,6 +1,6 @@ -# Supported operation: publish, subscribe +# Supported operation: publish, subscribe # Config map: -## pubsubName : name of the pubsub +## pubsubName : name of the pubsub ## testTopicName: name of the test topic to use ## publish: A map of strings that will be part of the publish metadata in the Publish call ## subscribe: A map of strings that will be part of the subscribe metadata in the Subscribe call @@ -25,6 +25,8 @@ components: checkInOrderProcessing: false - component: natsstreaming allOperations: true + - component: jetstream + allOperations: true - component: kafka allOperations: true - component: pulsar diff --git a/tests/conformance/common.go b/tests/conformance/common.go index 4e7e7acf0..f3f251365 100644 --- a/tests/conformance/common.go +++ b/tests/conformance/common.go @@ -38,6 +38,7 @@ import ( p_eventhubs "github.com/dapr/components-contrib/pubsub/azure/eventhubs" p_servicebus "github.com/dapr/components-contrib/pubsub/azure/servicebus" p_hazelcast "github.com/dapr/components-contrib/pubsub/hazelcast" + p_jetstream "github.com/dapr/components-contrib/pubsub/jetstream" p_kafka "github.com/dapr/components-contrib/pubsub/kafka" p_mqtt "github.com/dapr/components-contrib/pubsub/mqtt" p_natsstreaming "github.com/dapr/components-contrib/pubsub/natsstreaming" @@ -320,6 +321,8 @@ func loadPubSub(tc TestComponent) pubsub.PubSub { pubsub = p_servicebus.NewAzureServiceBus(testLogger) case "natsstreaming": pubsub = p_natsstreaming.NewNATSStreamingPubSub(testLogger) + case "jetstream": + pubsub = p_jetstream.NewJetStream(testLogger) case kafka: pubsub = p_kafka.NewKafka(testLogger) case "pulsar":