feature/pubsub: add delay queue params for pulsar (#1112)

* feature/pubsub: add delay queue params for pulsar

* feature/pubsub: add delay queue params for pulsar

* feature/pubsub: add delay queue params for pulsar

* feature/pubsub: add delay queue params for pulsar

* feature/pubsub: add delay queue params for pulsar

Co-authored-by: Simon Leet <31784195+CodeMonkeyLeet@users.noreply.github.com>
Co-authored-by: Artur Souza <artursouza.ms@outlook.com>
This commit is contained in:
yellow chicks 2021-09-18 02:41:47 +08:00 committed by GitHub
parent c712e81adf
commit c425f1319a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 45 additions and 3 deletions

View File

@ -18,6 +18,8 @@ import (
const (
host = "host"
enableTLS = "enableTLS"
deliverAt = "deliverAt"
deliverAfter = "deliverAfter"
cachedNumProducer = 10
)
@ -106,6 +108,7 @@ func (p *Pulsar) Init(metadata pubsub.Metadata) error {
func (p *Pulsar) Publish(req *pubsub.PublishRequest) error {
var (
producer pulsar.Producer
msg *pulsar.ProducerMessage
err error
)
cache, _ := p.cache.Get(req.Topic)
@ -123,16 +126,39 @@ func (p *Pulsar) Publish(req *pubsub.PublishRequest) error {
producer = cache.(pulsar.Producer)
}
_, err = producer.Send(context.Background(), &pulsar.ProducerMessage{
Payload: req.Data,
})
msg, err = parsePublishMetadata(req)
if err != nil {
return err
}
if _, err = producer.Send(context.Background(), msg); err != nil {
return err
}
return nil
}
// parsePublishMetadata parse publish metadata
func parsePublishMetadata(req *pubsub.PublishRequest) (
msg *pulsar.ProducerMessage, err error) {
msg = &pulsar.ProducerMessage{
Payload: req.Data,
}
if val, ok := req.Metadata[deliverAt]; ok {
msg.DeliverAt, err = time.Parse(time.RFC3339, val)
if err != nil {
return nil, err
}
}
if val, ok := req.Metadata[deliverAfter]; ok {
msg.DeliverAfter, err = time.ParseDuration(val)
if err != nil {
return nil, err
}
}
return
}
func (p *Pulsar) Subscribe(req pubsub.SubscribeRequest, handler pubsub.Handler) error {
channel := make(chan pulsar.ConsumerMessage, 100)

View File

@ -2,6 +2,7 @@ package pulsar
import (
"testing"
"time"
"github.com/dapr/components-contrib/pubsub"
"github.com/stretchr/testify/assert"
@ -17,6 +18,21 @@ func TestParsePulsarMetadata(t *testing.T) {
assert.Equal(t, false, meta.EnableTLS)
}
func TestParsePublishMetadata(t *testing.T) {
m := &pubsub.PublishRequest{}
m.Metadata = map[string]string{
"deliverAt": "2021-08-31T11:45:02Z",
"deliverAfter": "60s",
}
msg, err := parsePublishMetadata(m)
assert.Nil(t, err)
val, _ := time.ParseDuration("60s")
assert.Equal(t, val, msg.DeliverAfter)
assert.Equal(t, "2021-08-31T11:45:02Z",
msg.DeliverAt.Format(time.RFC3339))
}
func TestMissingHost(t *testing.T) {
m := pubsub.Metadata{}
m.Properties = map[string]string{"host": ""}