Add schema support to apache pulsar (#2475)
* add schema support to apache pulsar Signed-off-by: yaron2 <schneider.yaron@live.com> * linter and avro support Signed-off-by: yaron2 <schneider.yaron@live.com> * fix nit Signed-off-by: yaron2 <schneider.yaron@live.com> * fix go.mod Signed-off-by: yaron2 <schneider.yaron@live.com> * update cert test go.mod Signed-off-by: yaron2 <schneider.yaron@live.com> * update cert test metadata to latest Signed-off-by: yaron2 <schneider.yaron@live.com> * schema identifier refactor to handle edge case Signed-off-by: yaron2 <schneider.yaron@live.com> * fix shadow Signed-off-by: yaron2 <schneider.yaron@live.com> Signed-off-by: yaron2 <schneider.yaron@live.com>
This commit is contained in:
parent
d0eed92986
commit
1dc20e6c69
1
go.mod
1
go.mod
|
@ -67,6 +67,7 @@ require (
|
||||||
github.com/googleapis/gax-go/v2 v2.7.0
|
github.com/googleapis/gax-go/v2 v2.7.0
|
||||||
github.com/gorilla/mux v1.8.0
|
github.com/gorilla/mux v1.8.0
|
||||||
github.com/grandcat/zeroconf v1.0.0
|
github.com/grandcat/zeroconf v1.0.0
|
||||||
|
github.com/hamba/avro/v2 v2.4.0
|
||||||
github.com/hashicorp/consul/api v1.13.0
|
github.com/hashicorp/consul/api v1.13.0
|
||||||
github.com/hashicorp/go-multierror v1.1.1
|
github.com/hashicorp/go-multierror v1.1.1
|
||||||
github.com/hashicorp/golang-lru/v2 v2.0.1
|
github.com/hashicorp/golang-lru/v2 v2.0.1
|
||||||
|
|
3
go.sum
3
go.sum
|
@ -794,6 +794,7 @@ github.com/envoyproxy/go-control-plane v0.9.10-0.20210907150352-cf90f659a021/go.
|
||||||
github.com/envoyproxy/go-control-plane v0.10.0/go.mod h1:AY7fTTXNdv/aJ2O5jwpxAPOWUZ7hQAEvzN5Pf27BkQQ=
|
github.com/envoyproxy/go-control-plane v0.10.0/go.mod h1:AY7fTTXNdv/aJ2O5jwpxAPOWUZ7hQAEvzN5Pf27BkQQ=
|
||||||
github.com/envoyproxy/go-control-plane v0.10.2-0.20220325020618-49ff273808a1/go.mod h1:KJwIaB5Mv44NWtYuAOFCVOjcI94vtpEz2JU/D2v6IjE=
|
github.com/envoyproxy/go-control-plane v0.10.2-0.20220325020618-49ff273808a1/go.mod h1:KJwIaB5Mv44NWtYuAOFCVOjcI94vtpEz2JU/D2v6IjE=
|
||||||
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
|
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
|
||||||
|
github.com/ettle/strcase v0.1.1/go.mod h1:hzDLsPC7/lwKyBOywSHEP89nt2pDgdy+No1NBA9o9VY=
|
||||||
github.com/evanphx/json-patch/v5 v5.5.0/go.mod h1:G79N1coSVB93tBe7j6PhzjmR3/2VvlbKOFpnXhI9Bw4=
|
github.com/evanphx/json-patch/v5 v5.5.0/go.mod h1:G79N1coSVB93tBe7j6PhzjmR3/2VvlbKOFpnXhI9Bw4=
|
||||||
github.com/facebookgo/clock v0.0.0-20150410010913-600d898af40a h1:yDWHCSQ40h88yih2JAcL6Ls/kVkSE8GFACTGVnMPruw=
|
github.com/facebookgo/clock v0.0.0-20150410010913-600d898af40a h1:yDWHCSQ40h88yih2JAcL6Ls/kVkSE8GFACTGVnMPruw=
|
||||||
github.com/facebookgo/clock v0.0.0-20150410010913-600d898af40a/go.mod h1:7Ga40egUymuWXxAe151lTNnCv97MddSOVsjpPPkityA=
|
github.com/facebookgo/clock v0.0.0-20150410010913-600d898af40a/go.mod h1:7Ga40egUymuWXxAe151lTNnCv97MddSOVsjpPPkityA=
|
||||||
|
@ -1095,6 +1096,8 @@ github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c h1:6rhixN/i8
|
||||||
github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c/go.mod h1:NMPJylDgVpX0MLRlPy15sqSwOFv/U1GZ2m21JhFfek0=
|
github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c/go.mod h1:NMPJylDgVpX0MLRlPy15sqSwOFv/U1GZ2m21JhFfek0=
|
||||||
github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed h1:5upAirOpQc1Q53c0bnx2ufif5kANL7bfZWcc6VJWJd8=
|
github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed h1:5upAirOpQc1Q53c0bnx2ufif5kANL7bfZWcc6VJWJd8=
|
||||||
github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed/go.mod h1:tMWxXQ9wFIaZeTI9F+hmhFiGpFmhOHzyShyFUhRm0H4=
|
github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed/go.mod h1:tMWxXQ9wFIaZeTI9F+hmhFiGpFmhOHzyShyFUhRm0H4=
|
||||||
|
github.com/hamba/avro/v2 v2.4.0 h1:w/XucdXkKCc2Bna8Ra9MK1KubaLEOnk4vcTVfXP2AKw=
|
||||||
|
github.com/hamba/avro/v2 v2.4.0/go.mod h1:6MapKiXjILKSuR/z7SMwkihv2f//wahd/l2bUDHHqI4=
|
||||||
github.com/hashicorp/consul/api v1.1.0/go.mod h1:VmuI/Lkw1nC05EYQWNKwWGbkg+FbDBtguAZLlVdkD9Q=
|
github.com/hashicorp/consul/api v1.1.0/go.mod h1:VmuI/Lkw1nC05EYQWNKwWGbkg+FbDBtguAZLlVdkD9Q=
|
||||||
github.com/hashicorp/consul/api v1.3.0/go.mod h1:MmDNSzIMUjNpY/mQ398R4bk2FnqQLoPndWW5VkKPlCE=
|
github.com/hashicorp/consul/api v1.3.0/go.mod h1:MmDNSzIMUjNpY/mQ398R4bk2FnqQLoPndWW5VkKPlCE=
|
||||||
github.com/hashicorp/consul/api v1.13.0 h1:2hnLQ0GjQvw7f3O61jMO8gbasZviZTrt9R8WzgiirHc=
|
github.com/hashicorp/consul/api v1.13.0 h1:2hnLQ0GjQvw7f3O61jMO8gbasZviZTrt9R8WzgiirHc=
|
||||||
|
|
|
@ -28,4 +28,10 @@ type pulsarMetadata struct {
|
||||||
Persistent bool `json:"persistent"`
|
Persistent bool `json:"persistent"`
|
||||||
Token string `json:"token"`
|
Token string `json:"token"`
|
||||||
RedeliveryDelay time.Duration `json:"redeliveryDelay"`
|
RedeliveryDelay time.Duration `json:"redeliveryDelay"`
|
||||||
|
topicSchemas map[string]schemaMetadata
|
||||||
|
}
|
||||||
|
|
||||||
|
type schemaMetadata struct {
|
||||||
|
protocol string
|
||||||
|
value string
|
||||||
}
|
}
|
||||||
|
|
|
@ -15,12 +15,15 @@ package pulsar
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/hamba/avro/v2"
|
||||||
|
|
||||||
"github.com/apache/pulsar-client-go/pulsar"
|
"github.com/apache/pulsar-client-go/pulsar"
|
||||||
lru "github.com/hashicorp/golang-lru/v2"
|
lru "github.com/hashicorp/golang-lru/v2"
|
||||||
|
|
||||||
|
@ -42,6 +45,8 @@ const (
|
||||||
namespace = "namespace"
|
namespace = "namespace"
|
||||||
persistent = "persistent"
|
persistent = "persistent"
|
||||||
redeliveryDelay = "redeliveryDelay"
|
redeliveryDelay = "redeliveryDelay"
|
||||||
|
avroProtocol = "avro"
|
||||||
|
jsonProtocol = "json"
|
||||||
|
|
||||||
defaultTenant = "public"
|
defaultTenant = "public"
|
||||||
defaultNamespace = "default"
|
defaultNamespace = "default"
|
||||||
|
@ -50,9 +55,11 @@ const (
|
||||||
pulsarToken = "token"
|
pulsarToken = "token"
|
||||||
// topicFormat is the format for pulsar, which have a well-defined structure: {persistent|non-persistent}://tenant/namespace/topic,
|
// topicFormat is the format for pulsar, which have a well-defined structure: {persistent|non-persistent}://tenant/namespace/topic,
|
||||||
// see https://pulsar.apache.org/docs/en/concepts-messaging/#topics for details.
|
// see https://pulsar.apache.org/docs/en/concepts-messaging/#topics for details.
|
||||||
topicFormat = "%s://%s/%s/%s"
|
topicFormat = "%s://%s/%s/%s"
|
||||||
persistentStr = "persistent"
|
persistentStr = "persistent"
|
||||||
nonPersistentStr = "non-persistent"
|
nonPersistentStr = "non-persistent"
|
||||||
|
topicJSONSchemaIdentifier = ".jsonschema"
|
||||||
|
topicAvroSchemaIdentifier = ".avroschema"
|
||||||
|
|
||||||
// defaultBatchingMaxPublishDelay init default for maximum delay to batch messages.
|
// defaultBatchingMaxPublishDelay init default for maximum delay to batch messages.
|
||||||
defaultBatchingMaxPublishDelay = 10 * time.Millisecond
|
defaultBatchingMaxPublishDelay = 10 * time.Millisecond
|
||||||
|
@ -76,7 +83,7 @@ func NewPulsar(l logger.Logger) pubsub.PubSub {
|
||||||
}
|
}
|
||||||
|
|
||||||
func parsePulsarMetadata(meta pubsub.Metadata) (*pulsarMetadata, error) {
|
func parsePulsarMetadata(meta pubsub.Metadata) (*pulsarMetadata, error) {
|
||||||
m := pulsarMetadata{Persistent: true, Tenant: defaultTenant, Namespace: defaultNamespace}
|
m := pulsarMetadata{Persistent: true, Tenant: defaultTenant, Namespace: defaultNamespace, topicSchemas: map[string]schemaMetadata{}}
|
||||||
m.ConsumerID = meta.Properties[consumerID]
|
m.ConsumerID = meta.Properties[consumerID]
|
||||||
|
|
||||||
if val, ok := meta.Properties[host]; ok && val != "" {
|
if val, ok := meta.Properties[host]; ok && val != "" {
|
||||||
|
@ -149,6 +156,22 @@ func parsePulsarMetadata(meta pubsub.Metadata) (*pulsarMetadata, error) {
|
||||||
m.Token = val
|
m.Token = val
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for k, v := range meta.Properties {
|
||||||
|
if strings.HasSuffix(k, topicJSONSchemaIdentifier) {
|
||||||
|
topic := k[:len(k)-len(topicJSONSchemaIdentifier)]
|
||||||
|
m.topicSchemas[topic] = schemaMetadata{
|
||||||
|
protocol: jsonProtocol,
|
||||||
|
value: v,
|
||||||
|
}
|
||||||
|
} else if strings.HasSuffix(k, topicAvroSchemaIdentifier) {
|
||||||
|
topic := k[:len(k)-len(topicJSONSchemaIdentifier)]
|
||||||
|
m.topicSchemas[topic] = schemaMetadata{
|
||||||
|
protocol: avroProtocol,
|
||||||
|
value: v,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return &m, nil
|
return &m, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -202,15 +225,24 @@ func (p *Pulsar) Publish(ctx context.Context, req *pubsub.PublishRequest) error
|
||||||
)
|
)
|
||||||
topic := p.formatTopic(req.Topic)
|
topic := p.formatTopic(req.Topic)
|
||||||
producer, ok := p.cache.Get(topic)
|
producer, ok := p.cache.Get(topic)
|
||||||
|
|
||||||
|
sm, hasSchema := p.metadata.topicSchemas[req.Topic]
|
||||||
|
|
||||||
if !ok || producer == nil {
|
if !ok || producer == nil {
|
||||||
p.logger.Debugf("creating producer for topic %s, full topic name in pulsar is %s", req.Topic, topic)
|
p.logger.Debugf("creating producer for topic %s, full topic name in pulsar is %s", req.Topic, topic)
|
||||||
producer, err = p.client.CreateProducer(pulsar.ProducerOptions{
|
opts := pulsar.ProducerOptions{
|
||||||
Topic: topic,
|
Topic: topic,
|
||||||
DisableBatching: p.metadata.DisableBatching,
|
DisableBatching: p.metadata.DisableBatching,
|
||||||
BatchingMaxPublishDelay: p.metadata.BatchingMaxPublishDelay,
|
BatchingMaxPublishDelay: p.metadata.BatchingMaxPublishDelay,
|
||||||
BatchingMaxMessages: p.metadata.BatchingMaxMessages,
|
BatchingMaxMessages: p.metadata.BatchingMaxMessages,
|
||||||
BatchingMaxSize: p.metadata.BatchingMaxSize,
|
BatchingMaxSize: p.metadata.BatchingMaxSize,
|
||||||
})
|
}
|
||||||
|
|
||||||
|
if hasSchema {
|
||||||
|
opts.Schema = getPulsarSchema(sm)
|
||||||
|
}
|
||||||
|
|
||||||
|
producer, err = p.client.CreateProducer(opts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -218,7 +250,7 @@ func (p *Pulsar) Publish(ctx context.Context, req *pubsub.PublishRequest) error
|
||||||
p.cache.Add(topic, producer)
|
p.cache.Add(topic, producer)
|
||||||
}
|
}
|
||||||
|
|
||||||
msg, err = parsePublishMetadata(req)
|
msg, err = parsePublishMetadata(req, sm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -229,13 +261,51 @@ func (p *Pulsar) Publish(ctx context.Context, req *pubsub.PublishRequest) error
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func getPulsarSchema(metadata schemaMetadata) pulsar.Schema {
|
||||||
|
switch metadata.protocol {
|
||||||
|
case jsonProtocol:
|
||||||
|
return pulsar.NewJSONSchema(metadata.value, nil)
|
||||||
|
case avroProtocol:
|
||||||
|
return pulsar.NewAvroSchema(metadata.value, nil)
|
||||||
|
default:
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// parsePublishMetadata parse publish metadata.
|
// parsePublishMetadata parse publish metadata.
|
||||||
func parsePublishMetadata(req *pubsub.PublishRequest) (
|
func parsePublishMetadata(req *pubsub.PublishRequest, schema schemaMetadata) (
|
||||||
msg *pulsar.ProducerMessage, err error,
|
msg *pulsar.ProducerMessage, err error,
|
||||||
) {
|
) {
|
||||||
msg = &pulsar.ProducerMessage{
|
msg = &pulsar.ProducerMessage{}
|
||||||
Payload: req.Data,
|
|
||||||
|
switch schema.protocol {
|
||||||
|
case "":
|
||||||
|
msg.Payload = req.Data
|
||||||
|
case jsonProtocol:
|
||||||
|
var obj interface{}
|
||||||
|
err = json.Unmarshal(req.Data, &obj)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
msg.Value = obj
|
||||||
|
case avroProtocol:
|
||||||
|
var obj interface{}
|
||||||
|
avroSchema, parseErr := avro.Parse(schema.value)
|
||||||
|
if parseErr != nil {
|
||||||
|
return nil, parseErr
|
||||||
|
}
|
||||||
|
|
||||||
|
err = avro.Unmarshal(avroSchema, req.Data, &obj)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
msg.Value = obj
|
||||||
}
|
}
|
||||||
|
|
||||||
if val, ok := req.Metadata[deliverAt]; ok {
|
if val, ok := req.Metadata[deliverAt]; ok {
|
||||||
msg.DeliverAt, err = time.Parse(time.RFC3339, val)
|
msg.DeliverAt, err = time.Parse(time.RFC3339, val)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -249,13 +319,14 @@ func parsePublishMetadata(req *pubsub.PublishRequest) (
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return
|
return msg, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *Pulsar) Subscribe(ctx context.Context, req pubsub.SubscribeRequest, handler pubsub.Handler) error {
|
func (p *Pulsar) Subscribe(ctx context.Context, req pubsub.SubscribeRequest, handler pubsub.Handler) error {
|
||||||
channel := make(chan pulsar.ConsumerMessage, 100)
|
channel := make(chan pulsar.ConsumerMessage, 100)
|
||||||
|
|
||||||
topic := p.formatTopic(req.Topic)
|
topic := p.formatTopic(req.Topic)
|
||||||
|
|
||||||
options := pulsar.ConsumerOptions{
|
options := pulsar.ConsumerOptions{
|
||||||
Topic: topic,
|
Topic: topic,
|
||||||
SubscriptionName: p.metadata.ConsumerID,
|
SubscriptionName: p.metadata.ConsumerID,
|
||||||
|
@ -264,6 +335,9 @@ func (p *Pulsar) Subscribe(ctx context.Context, req pubsub.SubscribeRequest, han
|
||||||
NackRedeliveryDelay: p.metadata.RedeliveryDelay,
|
NackRedeliveryDelay: p.metadata.RedeliveryDelay,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if sm, ok := p.metadata.topicSchemas[req.Topic]; ok {
|
||||||
|
options.Schema = getPulsarSchema(sm)
|
||||||
|
}
|
||||||
consumer, err := p.client.Subscribe(options)
|
consumer, err := p.client.Subscribe(options)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
p.logger.Debugf("Could not subscribe to %s, full topic name in pulsar is %s", req.Topic, topic)
|
p.logger.Debugf("Could not subscribe to %s, full topic name in pulsar is %s", req.Topic, topic)
|
||||||
|
|
|
@ -17,6 +17,7 @@ import (
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/apache/pulsar-client-go/pulsar"
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
|
|
||||||
"github.com/dapr/components-contrib/pubsub"
|
"github.com/dapr/components-contrib/pubsub"
|
||||||
|
@ -43,6 +44,93 @@ func TestParsePulsarMetadata(t *testing.T) {
|
||||||
assert.Equal(t, 5*time.Second, meta.BatchingMaxPublishDelay)
|
assert.Equal(t, 5*time.Second, meta.BatchingMaxPublishDelay)
|
||||||
assert.Equal(t, uint(100), meta.BatchingMaxSize)
|
assert.Equal(t, uint(100), meta.BatchingMaxSize)
|
||||||
assert.Equal(t, uint(200), meta.BatchingMaxMessages)
|
assert.Equal(t, uint(200), meta.BatchingMaxMessages)
|
||||||
|
assert.Empty(t, meta.topicSchemas)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestParsePulsarSchemaMetadata(t *testing.T) {
|
||||||
|
t.Run("test json", func(t *testing.T) {
|
||||||
|
m := pubsub.Metadata{}
|
||||||
|
m.Properties = map[string]string{
|
||||||
|
"host": "a",
|
||||||
|
"obiwan.jsonschema": "1",
|
||||||
|
"kenobi.jsonschema.jsonschema": "2",
|
||||||
|
}
|
||||||
|
meta, err := parsePulsarMetadata(m)
|
||||||
|
|
||||||
|
assert.Nil(t, err)
|
||||||
|
assert.Equal(t, "a", meta.Host)
|
||||||
|
assert.Len(t, meta.topicSchemas, 2)
|
||||||
|
assert.Equal(t, "1", meta.topicSchemas["obiwan"].value)
|
||||||
|
assert.Equal(t, "2", meta.topicSchemas["kenobi.jsonschema"].value)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("test avro", func(t *testing.T) {
|
||||||
|
m := pubsub.Metadata{}
|
||||||
|
m.Properties = map[string]string{
|
||||||
|
"host": "a",
|
||||||
|
"obiwan.avroschema": "1",
|
||||||
|
"kenobi.avroschema.avroschema": "2",
|
||||||
|
}
|
||||||
|
meta, err := parsePulsarMetadata(m)
|
||||||
|
|
||||||
|
assert.Nil(t, err)
|
||||||
|
assert.Equal(t, "a", meta.Host)
|
||||||
|
assert.Len(t, meta.topicSchemas, 2)
|
||||||
|
assert.Equal(t, "1", meta.topicSchemas["obiwan"].value)
|
||||||
|
assert.Equal(t, "2", meta.topicSchemas["kenobi.avroschema"].value)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("test combined avro/json", func(t *testing.T) {
|
||||||
|
m := pubsub.Metadata{}
|
||||||
|
m.Properties = map[string]string{
|
||||||
|
"host": "a",
|
||||||
|
"obiwan.avroschema": "1",
|
||||||
|
"kenobi.jsonschema": "2",
|
||||||
|
}
|
||||||
|
meta, err := parsePulsarMetadata(m)
|
||||||
|
|
||||||
|
assert.Nil(t, err)
|
||||||
|
assert.Equal(t, "a", meta.Host)
|
||||||
|
assert.Len(t, meta.topicSchemas, 2)
|
||||||
|
assert.Equal(t, "1", meta.topicSchemas["obiwan"].value)
|
||||||
|
assert.Equal(t, "2", meta.topicSchemas["kenobi"].value)
|
||||||
|
assert.Equal(t, avroProtocol, meta.topicSchemas["obiwan"].protocol)
|
||||||
|
assert.Equal(t, jsonProtocol, meta.topicSchemas["kenobi"].protocol)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("test funky edge case", func(t *testing.T) {
|
||||||
|
m := pubsub.Metadata{}
|
||||||
|
m.Properties = map[string]string{
|
||||||
|
"host": "a",
|
||||||
|
"obiwan.jsonschema.avroschema": "1",
|
||||||
|
}
|
||||||
|
meta, err := parsePulsarMetadata(m)
|
||||||
|
|
||||||
|
assert.Nil(t, err)
|
||||||
|
assert.Equal(t, "a", meta.Host)
|
||||||
|
assert.Len(t, meta.topicSchemas, 1)
|
||||||
|
assert.Equal(t, "1", meta.topicSchemas["obiwan.jsonschema"].value)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestGetPulsarSchema(t *testing.T) {
|
||||||
|
t.Run("json schema", func(t *testing.T) {
|
||||||
|
s := getPulsarSchema(schemaMetadata{
|
||||||
|
protocol: "json",
|
||||||
|
value: "{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"test\"," +
|
||||||
|
"\"fields\":[{\"name\":\"ID\",\"type\":\"int\"},{\"name\":\"Name\",\"type\":\"string\"}]}",
|
||||||
|
})
|
||||||
|
assert.IsType(t, &pulsar.JSONSchema{}, s)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("avro schema", func(t *testing.T) {
|
||||||
|
s := getPulsarSchema(schemaMetadata{
|
||||||
|
protocol: "avro",
|
||||||
|
value: "{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"test\"," +
|
||||||
|
"\"fields\":[{\"name\":\"ID\",\"type\":\"int\"},{\"name\":\"Name\",\"type\":\"string\"}]}",
|
||||||
|
})
|
||||||
|
assert.IsType(t, &pulsar.AvroSchema{}, s)
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestParsePublishMetadata(t *testing.T) {
|
func TestParsePublishMetadata(t *testing.T) {
|
||||||
|
@ -51,7 +139,7 @@ func TestParsePublishMetadata(t *testing.T) {
|
||||||
"deliverAt": "2021-08-31T11:45:02Z",
|
"deliverAt": "2021-08-31T11:45:02Z",
|
||||||
"deliverAfter": "60s",
|
"deliverAfter": "60s",
|
||||||
}
|
}
|
||||||
msg, err := parsePublishMetadata(m)
|
msg, err := parsePublishMetadata(m, schemaMetadata{})
|
||||||
assert.Nil(t, err)
|
assert.Nil(t, err)
|
||||||
|
|
||||||
val, _ := time.ParseDuration("60s")
|
val, _ := time.ParseDuration("60s")
|
||||||
|
|
|
@ -14,6 +14,7 @@ limitations under the License.
|
||||||
package watcher
|
package watcher
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"sync"
|
"sync"
|
||||||
|
@ -319,6 +320,29 @@ func (w *Watcher) Observe(data ...interface{}) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ObserveJSON adds any json data that is in `remaining` to
|
||||||
|
// the `observed` slice. If the the watcher is closable
|
||||||
|
// (all expected data captured) and there is no more
|
||||||
|
// remaining data to observe, then the finish channel
|
||||||
|
// is closed.
|
||||||
|
func (w *Watcher) ObserveJSON(data ...interface{}) {
|
||||||
|
w.mu.Lock()
|
||||||
|
defer w.mu.Unlock()
|
||||||
|
|
||||||
|
for _, item := range data {
|
||||||
|
b, _ := json.Marshal(&item)
|
||||||
|
str := string(b)
|
||||||
|
if _, ok := w.remaining[str]; ok {
|
||||||
|
w.observed = append(w.observed, str)
|
||||||
|
delete(w.remaining, str)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if w.closable && len(w.remaining) == 0 {
|
||||||
|
w.finish()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// WaitForResult waits for up to `timeout` for all
|
// WaitForResult waits for up to `timeout` for all
|
||||||
// expected data to be observed and returns an error
|
// expected data to be observed and returns an error
|
||||||
// if expected and observed data differ.
|
// if expected and observed data differ.
|
||||||
|
|
|
@ -0,0 +1,16 @@
|
||||||
|
apiVersion: dapr.io/v1alpha1
|
||||||
|
kind: Component
|
||||||
|
metadata:
|
||||||
|
name: messagebus
|
||||||
|
spec:
|
||||||
|
type: pubsub.pulsar
|
||||||
|
version: v1
|
||||||
|
metadata:
|
||||||
|
- name: host
|
||||||
|
value: "localhost:6650"
|
||||||
|
- name: consumerID
|
||||||
|
value: certification4
|
||||||
|
- name: redeliveryDelay
|
||||||
|
value: 200ms
|
||||||
|
- name: certification-pubsub-topic-active.jsonschema
|
||||||
|
value: "{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"test\",\"fields\":[{\"name\":\"ID\",\"type\":\"int\"},{\"name\":\"Name\",\"type\":\"string\"}]}"
|
|
@ -67,6 +67,7 @@ require (
|
||||||
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 // indirect
|
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 // indirect
|
||||||
github.com/grpc-ecosystem/grpc-gateway/v2 v2.7.0 // indirect
|
github.com/grpc-ecosystem/grpc-gateway/v2 v2.7.0 // indirect
|
||||||
github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c // indirect
|
github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c // indirect
|
||||||
|
github.com/hamba/avro/v2 v2.4.0 // indirect
|
||||||
github.com/hashicorp/consul/api v1.18.0 // indirect
|
github.com/hashicorp/consul/api v1.18.0 // indirect
|
||||||
github.com/hashicorp/errwrap v1.1.0 // indirect
|
github.com/hashicorp/errwrap v1.1.0 // indirect
|
||||||
github.com/hashicorp/go-cleanhttp v0.5.2 // indirect
|
github.com/hashicorp/go-cleanhttp v0.5.2 // indirect
|
||||||
|
|
|
@ -141,6 +141,7 @@ github.com/envoyproxy/go-control-plane v0.9.9-0.20210217033140-668b12f5399d/go.m
|
||||||
github.com/envoyproxy/go-control-plane v0.9.9-0.20210512163311-63b5d3c536b0/go.mod h1:hliV/p42l8fGbc6Y9bQ70uLwIvmJyVE5k4iMKlh8wCQ=
|
github.com/envoyproxy/go-control-plane v0.9.9-0.20210512163311-63b5d3c536b0/go.mod h1:hliV/p42l8fGbc6Y9bQ70uLwIvmJyVE5k4iMKlh8wCQ=
|
||||||
github.com/envoyproxy/go-control-plane v0.9.10-0.20210907150352-cf90f659a021/go.mod h1:AFq3mo9L8Lqqiid3OhADV3RfLJnjiw63cSpi+fDTRC0=
|
github.com/envoyproxy/go-control-plane v0.9.10-0.20210907150352-cf90f659a021/go.mod h1:AFq3mo9L8Lqqiid3OhADV3RfLJnjiw63cSpi+fDTRC0=
|
||||||
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
|
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
|
||||||
|
github.com/ettle/strcase v0.1.1/go.mod h1:hzDLsPC7/lwKyBOywSHEP89nt2pDgdy+No1NBA9o9VY=
|
||||||
github.com/evanphx/json-patch v0.5.2/go.mod h1:ZWS5hhDbVDyob71nXKNL0+PWn6ToqBHMikGIFbs31qQ=
|
github.com/evanphx/json-patch v0.5.2/go.mod h1:ZWS5hhDbVDyob71nXKNL0+PWn6ToqBHMikGIFbs31qQ=
|
||||||
github.com/evanphx/json-patch v4.12.0+incompatible h1:4onqiflcdA9EOZ4RxV643DvftH5pOlLGNtQ5lPWQu84=
|
github.com/evanphx/json-patch v4.12.0+incompatible h1:4onqiflcdA9EOZ4RxV643DvftH5pOlLGNtQ5lPWQu84=
|
||||||
github.com/evanphx/json-patch/v5 v5.6.0 h1:b91NhWfaz02IuVxO9faSllyAtNXHMPkC5J8sJCLunww=
|
github.com/evanphx/json-patch/v5 v5.6.0 h1:b91NhWfaz02IuVxO9faSllyAtNXHMPkC5J8sJCLunww=
|
||||||
|
@ -298,6 +299,8 @@ github.com/grpc-ecosystem/grpc-gateway/v2 v2.7.0 h1:BZHcxBETFHIdVyhyEfOvn/RdU/QG
|
||||||
github.com/grpc-ecosystem/grpc-gateway/v2 v2.7.0/go.mod h1:hgWBS7lorOAVIJEQMi4ZsPv9hVvWI6+ch50m39Pf2Ks=
|
github.com/grpc-ecosystem/grpc-gateway/v2 v2.7.0/go.mod h1:hgWBS7lorOAVIJEQMi4ZsPv9hVvWI6+ch50m39Pf2Ks=
|
||||||
github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c h1:6rhixN/i8ZofjG1Y75iExal34USq5p+wiN1tpie8IrU=
|
github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c h1:6rhixN/i8ZofjG1Y75iExal34USq5p+wiN1tpie8IrU=
|
||||||
github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c/go.mod h1:NMPJylDgVpX0MLRlPy15sqSwOFv/U1GZ2m21JhFfek0=
|
github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c/go.mod h1:NMPJylDgVpX0MLRlPy15sqSwOFv/U1GZ2m21JhFfek0=
|
||||||
|
github.com/hamba/avro/v2 v2.4.0 h1:w/XucdXkKCc2Bna8Ra9MK1KubaLEOnk4vcTVfXP2AKw=
|
||||||
|
github.com/hamba/avro/v2 v2.4.0/go.mod h1:6MapKiXjILKSuR/z7SMwkihv2f//wahd/l2bUDHHqI4=
|
||||||
github.com/hashicorp/consul/api v1.1.0/go.mod h1:VmuI/Lkw1nC05EYQWNKwWGbkg+FbDBtguAZLlVdkD9Q=
|
github.com/hashicorp/consul/api v1.1.0/go.mod h1:VmuI/Lkw1nC05EYQWNKwWGbkg+FbDBtguAZLlVdkD9Q=
|
||||||
github.com/hashicorp/consul/api v1.18.0 h1:R7PPNzTCeN6VuQNDwwhZWJvzCtGSrNpJqfb22h3yH9g=
|
github.com/hashicorp/consul/api v1.18.0 h1:R7PPNzTCeN6VuQNDwwhZWJvzCtGSrNpJqfb22h3yH9g=
|
||||||
github.com/hashicorp/consul/api v1.18.0/go.mod h1:owRRGJ9M5xReDC5nfT8FTJrNAPbT4NM6p/k+d03q2v4=
|
github.com/hashicorp/consul/api v1.18.0/go.mod h1:owRRGJ9M5xReDC5nfT8FTJrNAPbT4NM6p/k+d03q2v4=
|
||||||
|
@ -449,6 +452,7 @@ github.com/mitchellh/iochan v1.0.0/go.mod h1:JwYml1nuB7xOzsp52dPpHFffvOCDupsG0Qu
|
||||||
github.com/mitchellh/mapstructure v0.0.0-20160808181253-ca63d7c062ee/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
|
github.com/mitchellh/mapstructure v0.0.0-20160808181253-ca63d7c062ee/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
|
||||||
github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
|
github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
|
||||||
github.com/mitchellh/mapstructure v1.4.1/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
|
github.com/mitchellh/mapstructure v1.4.1/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
|
||||||
|
github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
|
||||||
github.com/mitchellh/mapstructure v1.5.1-0.20220423185008-bf980b35cac4 h1:BpfhmLKZf+SjVanKKhCgf3bg+511DmU9eDQTen7LLbY=
|
github.com/mitchellh/mapstructure v1.5.1-0.20220423185008-bf980b35cac4 h1:BpfhmLKZf+SjVanKKhCgf3bg+511DmU9eDQTen7LLbY=
|
||||||
github.com/mitchellh/mapstructure v1.5.1-0.20220423185008-bf980b35cac4/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
|
github.com/mitchellh/mapstructure v1.5.1-0.20220423185008-bf980b35cac4/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
|
||||||
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
|
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
|
||||||
|
|
|
@ -15,6 +15,7 @@ package pulsar_test
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
@ -90,6 +91,24 @@ func subscriberApplication(appID string, topicName string, messagesWatcher *watc
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func subscriberSchemaApplication(appID string, topicName string, messagesWatcher *watcher.Watcher) app.SetupFn {
|
||||||
|
return func(ctx flow.Context, s common.Service) error {
|
||||||
|
// Setup the /orders event handler.
|
||||||
|
return multierr.Combine(
|
||||||
|
s.AddTopicEventHandler(&common.Subscription{
|
||||||
|
PubsubName: pubsubName,
|
||||||
|
Topic: topicName,
|
||||||
|
Route: "/orders",
|
||||||
|
}, func(_ context.Context, e *common.TopicEvent) (retry bool, err error) {
|
||||||
|
// Track/Observe the data of the event.
|
||||||
|
messagesWatcher.ObserveJSON(e.Data)
|
||||||
|
ctx.Logf("Message Received appID: %s,pubsub: %s, topic: %s, id: %s, data: %s", appID, e.PubsubName, e.Topic, e.ID, e.Data)
|
||||||
|
return false, nil
|
||||||
|
}),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func publishMessages(metadata map[string]string, sidecarName string, topicName string, messageWatchers ...*watcher.Watcher) flow.Runnable {
|
func publishMessages(metadata map[string]string, sidecarName string, topicName string, messageWatchers ...*watcher.Watcher) flow.Runnable {
|
||||||
return func(ctx flow.Context) error {
|
return func(ctx flow.Context) error {
|
||||||
// prepare the messages
|
// prepare the messages
|
||||||
|
@ -669,6 +688,87 @@ func TestPulsarDelay(t *testing.T) {
|
||||||
Run()
|
Run()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type schemaTest struct {
|
||||||
|
ID int `json:"id"`
|
||||||
|
Name string `json:"name"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestPulsarSchema(t *testing.T) {
|
||||||
|
consumerGroup1 := watcher.NewUnordered()
|
||||||
|
|
||||||
|
publishMessages := func(sidecarName string, topicName string, messageWatchers ...*watcher.Watcher) flow.Runnable {
|
||||||
|
return func(ctx flow.Context) error {
|
||||||
|
// prepare the messages
|
||||||
|
messages := make([]string, numMessages)
|
||||||
|
for i := range messages {
|
||||||
|
test := &schemaTest{
|
||||||
|
ID: i,
|
||||||
|
Name: uuid.New().String(),
|
||||||
|
}
|
||||||
|
|
||||||
|
b, _ := json.Marshal(test)
|
||||||
|
messages[i] = string(b)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, messageWatcher := range messageWatchers {
|
||||||
|
messageWatcher.ExpectStrings(messages...)
|
||||||
|
}
|
||||||
|
|
||||||
|
// get the sidecar (dapr) client
|
||||||
|
client := sidecar.GetClient(ctx, sidecarName)
|
||||||
|
|
||||||
|
// publish messages
|
||||||
|
ctx.Logf("Publishing messages. sidecarName: %s, topicName: %s", sidecarName, topicName)
|
||||||
|
|
||||||
|
for _, message := range messages {
|
||||||
|
ctx.Logf("Publishing: %q", message)
|
||||||
|
|
||||||
|
err := client.PublishEvent(ctx, pubsubName, topicName, message)
|
||||||
|
require.NoError(ctx, err, "error publishing message")
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
flow.New(t, "pulsar certification schema test").
|
||||||
|
|
||||||
|
// Run subscriberApplication app1
|
||||||
|
Step(app.Run(appID1, fmt.Sprintf(":%d", appPort),
|
||||||
|
subscriberSchemaApplication(appID1, topicActiveName, consumerGroup1))).
|
||||||
|
Step(dockercompose.Run(clusterName, dockerComposeYAML)).
|
||||||
|
Step("wait", flow.Sleep(10*time.Second)).
|
||||||
|
Step("wait for pulsar readiness", retry.Do(10*time.Second, 30, func(ctx flow.Context) error {
|
||||||
|
client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("could not create pulsar client: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
defer client.Close()
|
||||||
|
|
||||||
|
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
|
||||||
|
Topic: "topic-1",
|
||||||
|
SubscriptionName: "my-sub",
|
||||||
|
Type: pulsar.Shared,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("could not create pulsar Topic: %v", err)
|
||||||
|
}
|
||||||
|
defer consumer.Close()
|
||||||
|
|
||||||
|
return err
|
||||||
|
})).
|
||||||
|
Step(sidecar.Run(sidecarName1,
|
||||||
|
embedded.WithComponentsPath("./components/consumer_four"),
|
||||||
|
embedded.WithAppProtocol(runtime.HTTPProtocol, appPort),
|
||||||
|
embedded.WithDaprGRPCPort(runtime.DefaultDaprAPIGRPCPort),
|
||||||
|
embedded.WithDaprHTTPPort(runtime.DefaultDaprHTTPPort),
|
||||||
|
componentRuntimeOptions(),
|
||||||
|
)).
|
||||||
|
Step("publish messages to topic1", publishMessages(sidecarName1, topicActiveName, consumerGroup1)).
|
||||||
|
Step("verify if app1 has received messages published to topic", assertMessages(10*time.Second, consumerGroup1)).
|
||||||
|
Run()
|
||||||
|
}
|
||||||
|
|
||||||
func componentRuntimeOptions() []runtime.Option {
|
func componentRuntimeOptions() []runtime.Option {
|
||||||
log := logger.NewLogger("dapr.components")
|
log := logger.NewLogger("dapr.components")
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue