diff --git a/go.mod b/go.mod index 47db6e5f4..d6ed8e6be 100644 --- a/go.mod +++ b/go.mod @@ -55,6 +55,7 @@ require ( github.com/grandcat/zeroconf v0.0.0-20190424104450-85eadb44205c github.com/hashicorp/consul/api v1.3.0 github.com/hashicorp/go-multierror v1.0.0 + github.com/hashicorp/golang-lru v0.5.4 github.com/hazelcast/hazelcast-go-client v0.0.0-20190530123621-6cf767c2f31a github.com/imkira/go-interpol v1.1.0 // indirect github.com/influxdata/influxdb-client-go v1.4.0 diff --git a/go.sum b/go.sum index d3f379118..20fd877f0 100644 --- a/go.sum +++ b/go.sum @@ -466,8 +466,9 @@ github.com/hashicorp/go-uuid v1.0.1 h1:fv1ep09latC32wFoVwnqcnKJGnMSdBanPczbHAYm1 github.com/hashicorp/go-uuid v1.0.1/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= github.com/hashicorp/go.net v0.0.1/go.mod h1:hjKkEWcCURg++eb33jQU7oqQcI9XDCnUzHA0oac0k90= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= -github.com/hashicorp/golang-lru v0.5.1 h1:0hERBMJE1eitiLkihrMvRVBYAkpHzc/J3QdDN+dAcgU= github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= +github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc= +github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= github.com/hashicorp/logutils v1.0.0/go.mod h1:QIAnNjmIWmVIIkWDTG1z5v++HQmx9WQRO+LraFDTW64= github.com/hashicorp/mdns v1.0.0/go.mod h1:tL+uN++7HEJ6SQLQ2/p+z2pH24WQKWjBPkE0mNTz8vQ= diff --git a/pubsub/pulsar/pulsar.go b/pubsub/pulsar/pulsar.go index 5e6814d9d..60827e4bd 100644 --- a/pubsub/pulsar/pulsar.go +++ b/pubsub/pulsar/pulsar.go @@ -9,24 +9,28 @@ import ( "github.com/apache/pulsar-client-go/pulsar" "github.com/cenkalti/backoff/v4" + lru "github.com/hashicorp/golang-lru" "github.com/dapr/components-contrib/pubsub" "github.com/dapr/kit/logger" ) const ( - host = "host" - enableTLS = "enableTLS" + host = "host" + enableTLS = "enableTLS" + cachedNumProducer = 10 ) type Pulsar struct { logger logger.Logger client pulsar.Client + producer pulsar.Producer metadata pulsarMetadata ctx context.Context cancel context.CancelFunc backOff backoff.BackOff + cache *lru.Cache } func NewPulsar(l logger.Logger) pubsub.PubSub { @@ -69,6 +73,20 @@ func (p *Pulsar) Init(metadata pubsub.Metadata) error { } defer client.Close() + // initialize lru cache with size 10 + // TODO: make this number configurable in pulsar metadata + c, err := lru.NewWithEvict(cachedNumProducer, func(k interface{}, v interface{}) { + producer := v.(pulsar.Producer) + if producer != nil { + producer.Close() + } + }) + if err != nil { + return fmt.Errorf("could not initialize pulsar lru cache for publisher") + } + p.cache = c + defer p.cache.Purge() + p.ctx, p.cancel = context.WithCancel(context.Background()) p.client = client @@ -82,22 +100,29 @@ func (p *Pulsar) Init(metadata pubsub.Metadata) error { } func (p *Pulsar) Publish(req *pubsub.PublishRequest) error { - producer, err := p.client.CreateProducer(pulsar.ProducerOptions{ - Topic: req.Topic, - }) - if err != nil { - return err + producer, _ := p.cache.Get(req.Topic) + if producer == nil { + p.logger.Debugf("creating producer for topic %s", req.Topic) + producer, err := p.client.CreateProducer(pulsar.ProducerOptions{ + Topic: req.Topic, + }) + if err != nil { + return err + } + + p.cache.Add(req.Topic, producer) + p.producer = producer + } else { + p.producer = producer.(pulsar.Producer) } - _, err = producer.Send(context.Background(), &pulsar.ProducerMessage{ + _, err := p.producer.Send(context.Background(), &pulsar.ProducerMessage{ Payload: req.Data, }) if err != nil { return err } - defer producer.Close() - return nil } @@ -166,6 +191,13 @@ func (p *Pulsar) handleMessage(msg pulsar.ConsumerMessage, handler pubsub.Handle func (p *Pulsar) Close() error { p.cancel() + for _, k := range p.cache.Keys() { + producer, _ := p.cache.Peek(k) + if producer != nil { + p.logger.Debugf("closing producer for topic %s", k) + producer.(pulsar.Producer).Close() + } + } p.client.Close() return nil