Improve pulsar publisher performance. (#827)

* Improve pulsar publisher performance.

* Addressed comments and added some debug logs

* Use peek instead of get for closing producers

* Fixed lint issue

* Added debug log in close

* Change back o 100

* Close producer before client

* Fixed cached number of producer and added todo for making it configurable

Co-authored-by: Yaron Schneider <yaronsc@microsoft.com>
This commit is contained in:
Xueqian Wang 2021-04-21 13:53:07 -04:00 committed by GitHub
parent 2320824409
commit f761b51c77
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 45 additions and 11 deletions

1
go.mod
View File

@ -55,6 +55,7 @@ require (
github.com/grandcat/zeroconf v0.0.0-20190424104450-85eadb44205c
github.com/hashicorp/consul/api v1.3.0
github.com/hashicorp/go-multierror v1.0.0
github.com/hashicorp/golang-lru v0.5.4
github.com/hazelcast/hazelcast-go-client v0.0.0-20190530123621-6cf767c2f31a
github.com/imkira/go-interpol v1.1.0 // indirect
github.com/influxdata/influxdb-client-go v1.4.0

3
go.sum
View File

@ -466,8 +466,9 @@ github.com/hashicorp/go-uuid v1.0.1 h1:fv1ep09latC32wFoVwnqcnKJGnMSdBanPczbHAYm1
github.com/hashicorp/go-uuid v1.0.1/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
github.com/hashicorp/go.net v0.0.1/go.mod h1:hjKkEWcCURg++eb33jQU7oqQcI9XDCnUzHA0oac0k90=
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.5.1 h1:0hERBMJE1eitiLkihrMvRVBYAkpHzc/J3QdDN+dAcgU=
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc=
github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4=
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
github.com/hashicorp/logutils v1.0.0/go.mod h1:QIAnNjmIWmVIIkWDTG1z5v++HQmx9WQRO+LraFDTW64=
github.com/hashicorp/mdns v1.0.0/go.mod h1:tL+uN++7HEJ6SQLQ2/p+z2pH24WQKWjBPkE0mNTz8vQ=

View File

@ -9,24 +9,28 @@ import (
"github.com/apache/pulsar-client-go/pulsar"
"github.com/cenkalti/backoff/v4"
lru "github.com/hashicorp/golang-lru"
"github.com/dapr/components-contrib/pubsub"
"github.com/dapr/kit/logger"
)
const (
host = "host"
enableTLS = "enableTLS"
host = "host"
enableTLS = "enableTLS"
cachedNumProducer = 10
)
type Pulsar struct {
logger logger.Logger
client pulsar.Client
producer pulsar.Producer
metadata pulsarMetadata
ctx context.Context
cancel context.CancelFunc
backOff backoff.BackOff
cache *lru.Cache
}
func NewPulsar(l logger.Logger) pubsub.PubSub {
@ -69,6 +73,20 @@ func (p *Pulsar) Init(metadata pubsub.Metadata) error {
}
defer client.Close()
// initialize lru cache with size 10
// TODO: make this number configurable in pulsar metadata
c, err := lru.NewWithEvict(cachedNumProducer, func(k interface{}, v interface{}) {
producer := v.(pulsar.Producer)
if producer != nil {
producer.Close()
}
})
if err != nil {
return fmt.Errorf("could not initialize pulsar lru cache for publisher")
}
p.cache = c
defer p.cache.Purge()
p.ctx, p.cancel = context.WithCancel(context.Background())
p.client = client
@ -82,22 +100,29 @@ func (p *Pulsar) Init(metadata pubsub.Metadata) error {
}
func (p *Pulsar) Publish(req *pubsub.PublishRequest) error {
producer, err := p.client.CreateProducer(pulsar.ProducerOptions{
Topic: req.Topic,
})
if err != nil {
return err
producer, _ := p.cache.Get(req.Topic)
if producer == nil {
p.logger.Debugf("creating producer for topic %s", req.Topic)
producer, err := p.client.CreateProducer(pulsar.ProducerOptions{
Topic: req.Topic,
})
if err != nil {
return err
}
p.cache.Add(req.Topic, producer)
p.producer = producer
} else {
p.producer = producer.(pulsar.Producer)
}
_, err = producer.Send(context.Background(), &pulsar.ProducerMessage{
_, err := p.producer.Send(context.Background(), &pulsar.ProducerMessage{
Payload: req.Data,
})
if err != nil {
return err
}
defer producer.Close()
return nil
}
@ -166,6 +191,13 @@ func (p *Pulsar) handleMessage(msg pulsar.ConsumerMessage, handler pubsub.Handle
func (p *Pulsar) Close() error {
p.cancel()
for _, k := range p.cache.Keys() {
producer, _ := p.cache.Peek(k)
if producer != nil {
p.logger.Debugf("closing producer for topic %s", k)
producer.(pulsar.Producer).Close()
}
}
p.client.Close()
return nil