Apache Pulsar - pubsub component (#346)
* created pulsar component * fixed multi topic issue * moved metadata struct * finished writing metadata tests * fixed linter issues * modifies consumer type to failover * uncapitalized errors * changed subscriptionName to consumerID * acknowledge message only if there's no error returned from handler Co-authored-by: john verdonck <John@johns-MacBook-Pro.local>
This commit is contained in:
parent
8eff31b191
commit
b54585c4f0
1
go.mod
1
go.mod
|
@ -21,6 +21,7 @@ require (
|
|||
github.com/a8m/documentdb v1.2.0
|
||||
github.com/aerospike/aerospike-client-go v2.7.0+incompatible
|
||||
github.com/aliyun/aliyun-oss-go-sdk v2.0.7+incompatible
|
||||
github.com/apache/pulsar-client-go v0.1.0
|
||||
github.com/aws/aws-sdk-go v1.25.0
|
||||
github.com/baiyubin/aliyun-sts-go-sdk v0.0.0-20180326062324-cfa1a18b161f // indirect
|
||||
github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b
|
||||
|
|
14
go.sum
14
go.sum
|
@ -39,8 +39,6 @@ github.com/Azure/azure-sdk-for-go v29.0.0+incompatible/go.mod h1:9XXNKU+eRnpl9mo
|
|||
github.com/Azure/azure-sdk-for-go v30.1.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc=
|
||||
github.com/Azure/azure-sdk-for-go v33.4.0+incompatible h1:yzJKzcKTX0WwDdZC8kAqxiGVZz66uqpajhgphstEUN0=
|
||||
github.com/Azure/azure-sdk-for-go v33.4.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc=
|
||||
github.com/Azure/azure-sdk-for-go v37.1.0+incompatible h1:aFlw3lP7ZHQi4m1kWCpcwYtczhDkGhDoRaMTaxcOf68=
|
||||
github.com/Azure/azure-sdk-for-go v37.1.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc=
|
||||
github.com/Azure/azure-sdk-for-go v42.0.0+incompatible h1:yz6sFf5bHZ+gEOQVuK5JhPqTTAmv+OvSLSaqgzqaCwY=
|
||||
github.com/Azure/azure-sdk-for-go v42.0.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc=
|
||||
github.com/Azure/azure-service-bus-go v0.9.1 h1:G1qBLQvHCFDv9pcpgwgFkspzvnGknJRR0PYJ9ytY/JA=
|
||||
|
@ -122,6 +120,8 @@ github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRF
|
|||
github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
|
||||
github.com/aliyun/aliyun-oss-go-sdk v2.0.7+incompatible h1:HXvOJsZw8JT/ldxjX74Aq4H2IY4ojV/mXMDPWFitpv8=
|
||||
github.com/aliyun/aliyun-oss-go-sdk v2.0.7+incompatible/go.mod h1:T/Aws4fEfogEE9v+HPhhw+CntffsBHJ8nXQCwKr0/g8=
|
||||
github.com/apache/pulsar-client-go v0.1.0 h1:2BFZztxtNgFyOzBc+5On84CX6aIZW5xwh7KM0MWigGI=
|
||||
github.com/apache/pulsar-client-go v0.1.0/go.mod h1:G+CQVHnh2EPfNEQXOuisIDAyPMiKnzz4Vim/kjtj4U4=
|
||||
github.com/apache/thrift v0.13.0 h1:5hryIiq9gtn+MiLVn0wP37kb/uTeRZgN08WoCsAhIhI=
|
||||
github.com/apache/thrift v0.13.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ=
|
||||
github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o=
|
||||
|
@ -136,6 +136,7 @@ github.com/aws/aws-sdk-go v1.25.0 h1:MyXUdCesJLBvSSKYcaKeeEwxNUwUpG6/uqVYeH/Zzfo
|
|||
github.com/aws/aws-sdk-go v1.25.0/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo=
|
||||
github.com/baiyubin/aliyun-sts-go-sdk v0.0.0-20180326062324-cfa1a18b161f h1:ZNv7On9kyUzm7fvRZumSyy/IUiSC7AzL0I1jKKtwooA=
|
||||
github.com/baiyubin/aliyun-sts-go-sdk v0.0.0-20180326062324-cfa1a18b161f/go.mod h1:AuiFmCCPBSrqvVMvuqFuk0qogytodnVFVSN5CeJB8Gc=
|
||||
github.com/beefsack/go-rate v0.0.0-20180408011153-efa7637bb9b6/go.mod h1:6YNgTHLutezwnBvyneBbwvB8C82y3dcoOj5EQJIdGXA=
|
||||
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
|
||||
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
|
||||
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
|
||||
|
@ -145,6 +146,7 @@ github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932 h1:mXoPYz/Ul5HYE
|
|||
github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932/go.mod h1:NOuUCSz6Q9T7+igc/hlvDOUdtWKryOrtFyIVABv/p7k=
|
||||
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4YnC6+E63dPcxHo2sUxDIu8g3QgEJdRY=
|
||||
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4=
|
||||
github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b/go.mod h1:ac9efd0D1fsDb3EJvhqgXRbFx7bs2wqZ10HQPeU8U/Q=
|
||||
github.com/boltdb/bolt v1.3.1/go.mod h1:clJnj/oiGkjum5o1McbSZDSLxVThjynRyGBgiAx27Ps=
|
||||
github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b h1:L/QXpzIa3pOvUGt1D1lA5KjYhPBAN/3iWdP7xeFS9F0=
|
||||
github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b/go.mod h1:H0wQNHz2YrLsuXOZozoeDmnHXkNCRmMW0gwFWDfEZDA=
|
||||
|
@ -475,6 +477,8 @@ github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQL
|
|||
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
|
||||
github.com/klauspost/compress v1.8.2 h1:Bx0qjetmNjdFXASH02NSAREKpiaDwkO1DRZ3dV2KCcs=
|
||||
github.com/klauspost/compress v1.8.2/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
|
||||
github.com/klauspost/compress v1.9.2 h1:LfVyl+ZlLlLDeQ/d2AqfGIIH4qEDu0Ed2S5GyhCWIWY=
|
||||
github.com/klauspost/compress v1.9.2/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
|
||||
github.com/klauspost/cpuid v1.2.1 h1:vJi+O/nMdFt0vqm8NZBI6wzALWdA2X+egi0ogNyrC/w=
|
||||
github.com/klauspost/cpuid v1.2.1/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek=
|
||||
github.com/konsorten/go-windows-terminal-sequences v0.0.0-20180402223658-b729f2633dfe/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
|
||||
|
@ -667,10 +671,8 @@ github.com/satori/go.uuid v1.2.0 h1:0uYX9dsZ2yD7q2RtLRtPSdGDWzjeM3TbMJP9utgA0ww=
|
|||
github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0=
|
||||
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 h1:nn5Wsu0esKSJiIVhscUtVbo7ada43DJhG55ua/hjS5I=
|
||||
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
|
||||
github.com/sendgrid/rest v1.0.2 h1:xdfALkR1m9eqf41/zEnUmV0fw4b31ZzGZ4Dj5f2/w04=
|
||||
github.com/sendgrid/rest v2.4.1+incompatible h1:HDib/5xzQREPq34lN3YMhQtMkdXxS/qLp5G3k9a5++4=
|
||||
github.com/sendgrid/rest v2.4.1+incompatible/go.mod h1:kXX7q3jZtJXK5c5qK83bSGMdV6tsOE70KbHoqJls4lE=
|
||||
github.com/sendgrid/sendgrid-go v1.2.0 h1:2K3teZdhaPe12ftFyFL4AWDH4QmNPc+sCi6mWFx5+oo=
|
||||
github.com/sendgrid/sendgrid-go v3.5.0+incompatible h1:kosbgHyNVYVaqECDYvFVLVD9nvThweBd6xp7vaCT3GI=
|
||||
github.com/sendgrid/sendgrid-go v3.5.0+incompatible/go.mod h1:QRQt+LX/NmgVEvmdRw0VT/QgUn499+iza2FnDca9fg8=
|
||||
github.com/sergi/go-diff v1.0.0 h1:Kpca3qRNrduNnOQeazBd0ysaKrUJiIuISHxogkT9RPQ=
|
||||
|
@ -682,6 +684,7 @@ github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeV
|
|||
github.com/sirupsen/logrus v1.1.1 h1:VzGj7lhU7KEB9e9gMpAV/v5XT2NVSvLJhJLCWbnkgXg=
|
||||
github.com/sirupsen/logrus v1.1.1/go.mod h1:zrgwTnHtNr00buQ1vSptGe8m1f/BbgsPukg8qsT7A+A=
|
||||
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
|
||||
github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q=
|
||||
github.com/sirupsen/logrus v1.4.2 h1:SPIRibHv4MatM3XXNO2BJeFLZwZ2LvZgfQ5+UNI2im4=
|
||||
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
|
||||
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d h1:zE9ykElWQ6/NYmHa3jpm/yHnI4xSofP+UP6SpjHcSeM=
|
||||
|
@ -691,9 +694,12 @@ github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9
|
|||
github.com/soheilhy/cmux v0.1.4 h1:0HKaf1o97UwFjHH9o5XsHUOF+tqmdA7KEzXLpiyaw0E=
|
||||
github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM=
|
||||
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
|
||||
github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI=
|
||||
github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
|
||||
github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ=
|
||||
github.com/spf13/afero v1.2.2/go.mod h1:9ZxEEn6pIJ8Rxe320qSDBk6AsU0r9pR7Q4OcevTdifk=
|
||||
github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE=
|
||||
github.com/spf13/cobra v0.0.3/go.mod h1:1l0Ry5zgKvJasoi3XT1TypsSe7PqH0Sj9dhYf7v3XqQ=
|
||||
github.com/spf13/cobra v0.0.5 h1:f0B+LkLX6DtmRH1isoNA9VTtNUK9K8xYd28JNNfOv/s=
|
||||
github.com/spf13/cobra v0.0.5/go.mod h1:3K3wKZymM7VvHMDS9+Akkh4K60UwM26emMESw8tLCHU=
|
||||
github.com/spf13/jwalterweatherman v1.0.0/go.mod h1:cQK4TGJAtQXfYWX+Ddv3mKDzgVb68N+wFjFa4jdeBTo=
|
||||
|
|
|
@ -0,0 +1,7 @@
|
|||
package pulsar
|
||||
|
||||
type pulsarMetadata struct {
|
||||
Host string `json:"host"`
|
||||
ConsumerID string `json:"consumerID"`
|
||||
EnableTLS bool `json:"enableTLS"`
|
||||
}
|
|
@ -0,0 +1,128 @@
|
|||
package pulsar
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/apache/pulsar-client-go/pulsar"
|
||||
"github.com/dapr/dapr/pkg/logger"
|
||||
|
||||
"github.com/dapr/components-contrib/pubsub"
|
||||
)
|
||||
|
||||
const (
|
||||
host = "host"
|
||||
enableTLS = "enableTLS"
|
||||
)
|
||||
|
||||
type Pulsar struct {
|
||||
logger logger.Logger
|
||||
client pulsar.Client
|
||||
metadata pulsarMetadata
|
||||
}
|
||||
|
||||
func NewPulsar(l logger.Logger) pubsub.PubSub {
|
||||
return &Pulsar{logger: l}
|
||||
}
|
||||
|
||||
func parsePulsarMetadata(meta pubsub.Metadata) (*pulsarMetadata, error) {
|
||||
m := pulsarMetadata{}
|
||||
m.ConsumerID = meta.Properties["consumerID"]
|
||||
|
||||
if val, ok := meta.Properties[host]; ok && val != "" {
|
||||
m.Host = val
|
||||
} else {
|
||||
return nil, errors.New("pulsar error: missing pulsar host")
|
||||
}
|
||||
if val, ok := meta.Properties[enableTLS]; ok && val != "" {
|
||||
tls, err := strconv.ParseBool(val)
|
||||
if err != nil {
|
||||
return nil, errors.New("pulsar error: invalid value for enableTLS")
|
||||
}
|
||||
m.EnableTLS = tls
|
||||
}
|
||||
|
||||
return &m, nil
|
||||
}
|
||||
|
||||
func (p *Pulsar) Init(metadata pubsub.Metadata) error {
|
||||
m, err := parsePulsarMetadata(metadata)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
client, err := pulsar.NewClient(pulsar.ClientOptions{
|
||||
URL: fmt.Sprintf("pulsar://%s", m.Host),
|
||||
OperationTimeout: 30 * time.Second,
|
||||
ConnectionTimeout: 30 * time.Second,
|
||||
TLSAllowInsecureConnection: m.EnableTLS,
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not instantiate pulsar client: %v", err)
|
||||
}
|
||||
defer client.Close()
|
||||
|
||||
p.client = client
|
||||
p.metadata = *m
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *Pulsar) Publish(req *pubsub.PublishRequest) error {
|
||||
producer, err := p.client.CreateProducer(pulsar.ProducerOptions{
|
||||
Topic: req.Topic,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = producer.Send(context.Background(), &pulsar.ProducerMessage{
|
||||
Payload: req.Data,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
defer producer.Close()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *Pulsar) Subscribe(req pubsub.SubscribeRequest, handler func(msg *pubsub.NewMessage) error) error {
|
||||
channel := make(chan pulsar.ConsumerMessage, 100)
|
||||
|
||||
options := pulsar.ConsumerOptions{
|
||||
Topic: req.Topic,
|
||||
SubscriptionName: p.metadata.ConsumerID,
|
||||
Type: pulsar.Failover,
|
||||
}
|
||||
|
||||
options.MessageChannel = channel
|
||||
consumer, err := p.client.Subscribe(options)
|
||||
if err != nil {
|
||||
p.logger.Debugf("Could not subscribe %s", req.Topic)
|
||||
}
|
||||
|
||||
go p.ListenMessage(consumer, req.Topic, handler)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *Pulsar) ListenMessage(msgs pulsar.Consumer, topic string, handler func(msg *pubsub.NewMessage) error) {
|
||||
for cm := range msgs.Chan() {
|
||||
p.HandleMessage(cm, topic, handler)
|
||||
}
|
||||
}
|
||||
|
||||
func (p *Pulsar) HandleMessage(m pulsar.ConsumerMessage, topic string, handler func(msg *pubsub.NewMessage) error) {
|
||||
err := handler(&pubsub.NewMessage{
|
||||
Data: m.Payload(),
|
||||
Topic: topic,
|
||||
})
|
||||
if err != nil {
|
||||
p.logger.Debugf("Could not handle topic %s", topic)
|
||||
} else {
|
||||
m.Ack(m.Message)
|
||||
}
|
||||
}
|
|
@ -0,0 +1,39 @@
|
|||
package pulsar
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
||||
"github.com/dapr/components-contrib/pubsub"
|
||||
)
|
||||
|
||||
func TestParsePulsarMetadata(t *testing.T) {
|
||||
m := pubsub.Metadata{}
|
||||
m.Properties = map[string]string{"host": "a", "enableTLS": "false"}
|
||||
meta, err := parsePulsarMetadata(m)
|
||||
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, "a", meta.Host)
|
||||
assert.Equal(t, false, meta.EnableTLS)
|
||||
}
|
||||
|
||||
func TestMissingHost(t *testing.T) {
|
||||
m := pubsub.Metadata{}
|
||||
m.Properties = map[string]string{"host": ""}
|
||||
meta, err := parsePulsarMetadata(m)
|
||||
|
||||
assert.Error(t, err)
|
||||
assert.Nil(t, meta)
|
||||
assert.Equal(t, "pulsar error: missing pulsar host", err.Error())
|
||||
}
|
||||
|
||||
func TestInvalidTLSInput(t *testing.T) {
|
||||
m := pubsub.Metadata{}
|
||||
m.Properties = map[string]string{"host": "a", "enableTLS": "honk"}
|
||||
meta, err := parsePulsarMetadata(m)
|
||||
|
||||
assert.Error(t, err)
|
||||
assert.Nil(t, meta)
|
||||
assert.Equal(t, "pulsar error: invalid value for enableTLS", err.Error())
|
||||
}
|
Loading…
Reference in New Issue