Fix the Route of subscription API does not work on pubsub.pulsar (#1283)
* feat(pulsar): add tenant and namesapce. fix topic parsing. * add test * fix review add topic format unit test * fix review: add persistent to metadata, fix nits * fix lint Co-authored-by: Long Dai <long.dai@intel.com> Co-authored-by: Mukundan Sundararajan <musundar@microsoft.com> Co-authored-by: Simon Leet <31784195+CodeMonkeyLeet@users.noreply.github.com> Co-authored-by: Dapr Bot <56698301+dapr-bot@users.noreply.github.com>
This commit is contained in:
parent
d5a68041c9
commit
4bf0dcbfcf
|
|
@ -4,4 +4,7 @@ type pulsarMetadata struct {
|
|||
Host string `json:"host"`
|
||||
ConsumerID string `json:"consumerID"`
|
||||
EnableTLS bool `json:"enableTLS"`
|
||||
Tenant string `json:"tenant"`
|
||||
Namespace string `json:"namespace"`
|
||||
Persistent bool `json:"persistent"`
|
||||
}
|
||||
|
|
|
|||
|
|
@ -16,11 +16,22 @@ import (
|
|||
)
|
||||
|
||||
const (
|
||||
host = "host"
|
||||
enableTLS = "enableTLS"
|
||||
deliverAt = "deliverAt"
|
||||
deliverAfter = "deliverAfter"
|
||||
host = "host"
|
||||
enableTLS = "enableTLS"
|
||||
deliverAt = "deliverAt"
|
||||
deliverAfter = "deliverAfter"
|
||||
tenant = "tenant"
|
||||
namespace = "namespace"
|
||||
persistent = "persistent"
|
||||
|
||||
defaultTenant = "public"
|
||||
defaultNamespace = "default"
|
||||
cachedNumProducer = 10
|
||||
// topicFormat is the format for pulsar, which have a well-defined structure: {persistent|non-persistent}://tenant/namespace/topic,
|
||||
// see https://pulsar.apache.org/docs/en/concepts-messaging/#topics for details.
|
||||
topicFormat = "%s://%s/%s/%s"
|
||||
persistentStr = "persistent"
|
||||
nonPersistentStr = "non-persistent"
|
||||
)
|
||||
|
||||
type Pulsar struct {
|
||||
|
|
@ -39,7 +50,7 @@ func NewPulsar(l logger.Logger) pubsub.PubSub {
|
|||
}
|
||||
|
||||
func parsePulsarMetadata(meta pubsub.Metadata) (*pulsarMetadata, error) {
|
||||
m := pulsarMetadata{}
|
||||
m := pulsarMetadata{Persistent: true, Tenant: defaultTenant, Namespace: defaultNamespace}
|
||||
m.ConsumerID = meta.Properties["consumerID"]
|
||||
|
||||
if val, ok := meta.Properties[host]; ok && val != "" {
|
||||
|
|
@ -55,6 +66,20 @@ func parsePulsarMetadata(meta pubsub.Metadata) (*pulsarMetadata, error) {
|
|||
m.EnableTLS = tls
|
||||
}
|
||||
|
||||
if val, ok := meta.Properties[persistent]; ok && val != "" {
|
||||
per, err := strconv.ParseBool(val)
|
||||
if err != nil {
|
||||
return nil, errors.New("pulsar error: invalid value for persistent")
|
||||
}
|
||||
m.Persistent = per
|
||||
}
|
||||
if val, ok := meta.Properties[tenant]; ok && val != "" {
|
||||
m.Tenant = val
|
||||
}
|
||||
if val, ok := meta.Properties[namespace]; ok && val != "" {
|
||||
m.Namespace = val
|
||||
}
|
||||
|
||||
return &m, nil
|
||||
}
|
||||
|
||||
|
|
@ -111,17 +136,18 @@ func (p *Pulsar) Publish(req *pubsub.PublishRequest) error {
|
|||
msg *pulsar.ProducerMessage
|
||||
err error
|
||||
)
|
||||
cache, _ := p.cache.Get(req.Topic)
|
||||
topic := p.formatTopic(req.Topic)
|
||||
cache, _ := p.cache.Get(topic)
|
||||
if cache == nil {
|
||||
p.logger.Debugf("creating producer for topic %s", req.Topic)
|
||||
p.logger.Debugf("creating producer for topic %s, full topic name in pulsar is %s", req.Topic, topic)
|
||||
producer, err = p.client.CreateProducer(pulsar.ProducerOptions{
|
||||
Topic: req.Topic,
|
||||
Topic: topic,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
p.cache.Add(req.Topic, producer)
|
||||
p.cache.Add(topic, producer)
|
||||
} else {
|
||||
producer = cache.(pulsar.Producer)
|
||||
}
|
||||
|
|
@ -162,8 +188,9 @@ func parsePublishMetadata(req *pubsub.PublishRequest) (
|
|||
func (p *Pulsar) Subscribe(req pubsub.SubscribeRequest, handler pubsub.Handler) error {
|
||||
channel := make(chan pulsar.ConsumerMessage, 100)
|
||||
|
||||
topic := p.formatTopic(req.Topic)
|
||||
options := pulsar.ConsumerOptions{
|
||||
Topic: req.Topic,
|
||||
Topic: topic,
|
||||
SubscriptionName: p.metadata.ConsumerID,
|
||||
Type: pulsar.Failover,
|
||||
MessageChannel: channel,
|
||||
|
|
@ -171,23 +198,23 @@ func (p *Pulsar) Subscribe(req pubsub.SubscribeRequest, handler pubsub.Handler)
|
|||
|
||||
consumer, err := p.client.Subscribe(options)
|
||||
if err != nil {
|
||||
p.logger.Debugf("Could not subscribe %s", req.Topic)
|
||||
p.logger.Debugf("Could not subscribe to %s, full topic name in pulsar is %s", req.Topic, topic)
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
go p.listenMessage(consumer, handler)
|
||||
go p.listenMessage(req.Topic, consumer, handler)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *Pulsar) listenMessage(consumer pulsar.Consumer, handler pubsub.Handler) {
|
||||
func (p *Pulsar) listenMessage(originTopic string, consumer pulsar.Consumer, handler pubsub.Handler) {
|
||||
defer consumer.Close()
|
||||
|
||||
for {
|
||||
select {
|
||||
case msg := <-consumer.Chan():
|
||||
if err := p.handleMessage(msg, handler); err != nil && !errors.Is(err, context.Canceled) {
|
||||
if err := p.handleMessage(originTopic, msg, handler); err != nil && !errors.Is(err, context.Canceled) {
|
||||
p.logger.Errorf("Error processing message and retries are exhausted: %s/%#v [key=%s]. Closing consumer.", msg.Topic(), msg.ID(), msg.Key())
|
||||
|
||||
return
|
||||
|
|
@ -200,10 +227,10 @@ func (p *Pulsar) listenMessage(consumer pulsar.Consumer, handler pubsub.Handler)
|
|||
}
|
||||
}
|
||||
|
||||
func (p *Pulsar) handleMessage(msg pulsar.ConsumerMessage, handler pubsub.Handler) error {
|
||||
func (p *Pulsar) handleMessage(originTopic string, msg pulsar.ConsumerMessage, handler pubsub.Handler) error {
|
||||
pubsubMsg := pubsub.NewMessage{
|
||||
Data: msg.Payload(),
|
||||
Topic: msg.Topic(),
|
||||
Topic: originTopic,
|
||||
Metadata: msg.Properties(),
|
||||
}
|
||||
|
||||
|
|
@ -241,3 +268,12 @@ func (p *Pulsar) Close() error {
|
|||
func (p *Pulsar) Features() []pubsub.Feature {
|
||||
return nil
|
||||
}
|
||||
|
||||
// formatTopic formats the topic into pulsar's structure with tenant and namespace.
|
||||
func (p *Pulsar) formatTopic(topic string) string {
|
||||
persist := persistentStr
|
||||
if !p.metadata.Persistent {
|
||||
persist = nonPersistentStr
|
||||
}
|
||||
return fmt.Sprintf(topicFormat, persist, p.metadata.Tenant, p.metadata.Namespace, topic)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -17,6 +17,8 @@ func TestParsePulsarMetadata(t *testing.T) {
|
|||
assert.Nil(t, err)
|
||||
assert.Equal(t, "a", meta.Host)
|
||||
assert.Equal(t, false, meta.EnableTLS)
|
||||
assert.Equal(t, defaultTenant, meta.Tenant)
|
||||
assert.Equal(t, defaultNamespace, meta.Namespace)
|
||||
}
|
||||
|
||||
func TestParsePublishMetadata(t *testing.T) {
|
||||
|
|
@ -53,3 +55,44 @@ func TestInvalidTLSInput(t *testing.T) {
|
|||
assert.Nil(t, meta)
|
||||
assert.Equal(t, "pulsar error: invalid value for enableTLS", err.Error())
|
||||
}
|
||||
|
||||
func TestValidTenantAndNS(t *testing.T) {
|
||||
var (
|
||||
testTenant = "testTenant"
|
||||
testNamespace = "testNamespace"
|
||||
testTopic = "testTopic"
|
||||
expectPersistentResult = "persistent://testTenant/testNamespace/testTopic"
|
||||
expectNonPersistentResult = "non-persistent://testTenant/testNamespace/testTopic"
|
||||
)
|
||||
m := pubsub.Metadata{}
|
||||
m.Properties = map[string]string{"host": "a", tenant: testTenant, namespace: testNamespace}
|
||||
|
||||
t.Run("test vaild tenant and namespace", func(t *testing.T) {
|
||||
meta, err := parsePulsarMetadata(m)
|
||||
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, testTenant, meta.Tenant)
|
||||
assert.Equal(t, testNamespace, meta.Namespace)
|
||||
})
|
||||
|
||||
t.Run("test persistent format topic", func(t *testing.T) {
|
||||
meta, err := parsePulsarMetadata(m)
|
||||
p := Pulsar{metadata: *meta}
|
||||
res := p.formatTopic(testTopic)
|
||||
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, true, meta.Persistent)
|
||||
assert.Equal(t, expectPersistentResult, res)
|
||||
})
|
||||
|
||||
t.Run("test non-persistent format topic", func(t *testing.T) {
|
||||
m.Properties[persistent] = "false"
|
||||
meta, err := parsePulsarMetadata(m)
|
||||
p := Pulsar{metadata: *meta}
|
||||
res := p.formatTopic(testTopic)
|
||||
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, false, meta.Persistent)
|
||||
assert.Equal(t, expectNonPersistentResult, res)
|
||||
})
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue