544 lines
15 KiB
Go
544 lines
15 KiB
Go
/*
|
|
Copyright 2021 The Dapr Authors
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
*/
|
|
|
|
package pulsar
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"encoding/pem"
|
|
"errors"
|
|
"fmt"
|
|
"reflect"
|
|
"strings"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/apache/pulsar-client-go/pulsar"
|
|
"github.com/apache/pulsar-client-go/pulsar/crypto"
|
|
"github.com/hamba/avro/v2"
|
|
lru "github.com/hashicorp/golang-lru/v2"
|
|
|
|
"github.com/dapr/components-contrib/common/authentication/oauth2"
|
|
"github.com/dapr/components-contrib/metadata"
|
|
"github.com/dapr/components-contrib/pubsub"
|
|
"github.com/dapr/kit/logger"
|
|
kitmd "github.com/dapr/kit/metadata"
|
|
)
|
|
|
|
// Metadata property names, fixed values and defaults used by the Pulsar
// pub/sub component.
const (
	// Names of component-level and per-request metadata properties.
	host                    = "host"
	consumerID              = "consumerID"
	enableTLS               = "enableTLS"
	deliverAt               = "deliverAt"
	deliverAfter            = "deliverAfter"
	disableBatching         = "disableBatching"
	batchingMaxPublishDelay = "batchingMaxPublishDelay"
	batchingMaxSize         = "batchingMaxSize"
	batchingMaxMessages     = "batchingMaxMessages"
	tenant                  = "tenant"
	namespace               = "namespace"
	persistent              = "persistent"
	redeliveryDelay         = "redeliveryDelay"
	partitionKey            = "partitionKey"

	// Schema protocols that can be attached to a topic.
	avroProtocol  = "avro"
	jsonProtocol  = "json"
	protoProtocol = "proto"

	// Tenant and namespace applied when the metadata does not set them.
	defaultTenant    = "public"
	defaultNamespace = "default"
	// cachedNumProducer is the capacity of the producer LRU cache.
	cachedNumProducer = 10
	// pulsarPrefix is the URL scheme prepended to a host given without one.
	pulsarPrefix = "pulsar://"
	// pulsarToken is presumably the name of the token metadata property;
	// NOTE(review): confirm against the metadata struct.
	pulsarToken = "token"

	// topicFormat is the layout of a fully-qualified Pulsar topic name, which
	// has a well-defined structure: {persistent|non-persistent}://tenant/namespace/topic.
	// See https://pulsar.apache.org/docs/en/concepts-messaging/#topics for details.
	topicFormat      = "%s://%s/%s/%s"
	persistentStr    = "persistent"
	nonPersistentStr = "non-persistent"

	// Suffixes of metadata property names that carry a per-topic schema
	// definition; the part of the name before the suffix is the topic.
	topicJSONSchemaIdentifier  = ".jsonschema"
	topicAvroSchemaIdentifier  = ".avroschema"
	topicProtoSchemaIdentifier = ".protoschema"

	// defaultBatchingMaxPublishDelay is the default maximum delay to batch messages.
	defaultBatchingMaxPublishDelay = 10 * time.Millisecond
	// defaultMaxMessages is the default number of entries per batch.
	defaultMaxMessages = 1000
	// defaultMaxBatchSize is the default maximum number of bytes per batch.
	defaultMaxBatchSize = 128 * 1024
	// defaultRedeliveryDelay is the default delay before a nacked message is redelivered.
	defaultRedeliveryDelay = 30 * time.Second
	// defaultConcurrency controls the number of concurrent messages sent to the app.
	defaultConcurrency = 100

	// subscribeTypeKey is the subscribe-request metadata key selecting the
	// subscription type; the accepted values follow.
	subscribeTypeKey = "subscribeType"

	subscribeTypeExclusive = "exclusive"
	subscribeTypeShared    = "shared"
	subscribeTypeFailover  = "failover"
	subscribeTypeKeyShared = "key_shared"

	// processModeKey is the subscribe-request metadata key selecting how
	// received messages are processed; the accepted values follow.
	processModeKey = "processMode"

	processModeAsync = "async"
	processModeSync  = "sync"
)
|
|
|
|
// ProcessMode is a string type for message processing modes.
// NOTE(review): presumably holds the processModeAsync / processModeSync
// values; the type is not referenced in the code visible here — confirm usage.
type ProcessMode string
|
|
|
|
type Pulsar struct {
|
|
logger logger.Logger
|
|
client pulsar.Client
|
|
metadata pulsarMetadata
|
|
cache *lru.Cache[string, pulsar.Producer]
|
|
closed atomic.Bool
|
|
closeCh chan struct{}
|
|
wg sync.WaitGroup
|
|
}
|
|
|
|
func NewPulsar(l logger.Logger) pubsub.PubSub {
|
|
return &Pulsar{
|
|
logger: l,
|
|
closeCh: make(chan struct{}),
|
|
}
|
|
}
|
|
|
|
func parsePulsarMetadata(meta pubsub.Metadata) (*pulsarMetadata, error) {
|
|
m := pulsarMetadata{
|
|
Persistent: true,
|
|
Tenant: defaultTenant,
|
|
Namespace: defaultNamespace,
|
|
internalTopicSchemas: map[string]schemaMetadata{},
|
|
DisableBatching: false,
|
|
BatchingMaxPublishDelay: defaultBatchingMaxPublishDelay,
|
|
BatchingMaxMessages: defaultMaxMessages,
|
|
BatchingMaxSize: defaultMaxBatchSize,
|
|
RedeliveryDelay: defaultRedeliveryDelay,
|
|
MaxConcurrentHandlers: defaultConcurrency,
|
|
}
|
|
|
|
if err := kitmd.DecodeMetadata(meta.Properties, &m); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if m.Host == "" {
|
|
return nil, errors.New("pulsar error: missing pulsar host")
|
|
}
|
|
|
|
for k, v := range meta.Properties {
|
|
switch {
|
|
case strings.HasSuffix(k, topicJSONSchemaIdentifier):
|
|
topic := k[:len(k)-len(topicJSONSchemaIdentifier)]
|
|
m.internalTopicSchemas[topic] = schemaMetadata{
|
|
protocol: jsonProtocol,
|
|
value: v,
|
|
}
|
|
case strings.HasSuffix(k, topicAvroSchemaIdentifier):
|
|
topic := k[:len(k)-len(topicAvroSchemaIdentifier)]
|
|
m.internalTopicSchemas[topic] = schemaMetadata{
|
|
protocol: avroProtocol,
|
|
value: v,
|
|
}
|
|
case strings.HasSuffix(k, topicProtoSchemaIdentifier):
|
|
topic := k[:len(k)-len(topicProtoSchemaIdentifier)]
|
|
m.internalTopicSchemas[topic] = schemaMetadata{
|
|
protocol: protoProtocol,
|
|
value: v,
|
|
}
|
|
}
|
|
}
|
|
|
|
return &m, nil
|
|
}
|
|
|
|
func (p *Pulsar) Init(ctx context.Context, metadata pubsub.Metadata) error {
|
|
m, err := parsePulsarMetadata(metadata)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
pulsarURL := m.Host
|
|
if !strings.HasPrefix(m.Host, "http://") &&
|
|
!strings.HasPrefix(m.Host, "https://") {
|
|
pulsarURL = fmt.Sprintf("%s%s", pulsarPrefix, m.Host)
|
|
}
|
|
options := pulsar.ClientOptions{
|
|
URL: pulsarURL,
|
|
OperationTimeout: 30 * time.Second,
|
|
ConnectionTimeout: 30 * time.Second,
|
|
TLSAllowInsecureConnection: !m.EnableTLS,
|
|
}
|
|
|
|
switch {
|
|
case len(m.Token) > 0:
|
|
options.Authentication = pulsar.NewAuthenticationToken(m.Token)
|
|
case len(m.ClientCredentialsMetadata.TokenURL) > 0:
|
|
var cc *oauth2.ClientCredentials
|
|
cc, err = oauth2.NewClientCredentials(ctx, oauth2.ClientCredentialsOptions{
|
|
Logger: p.logger,
|
|
TokenURL: m.ClientCredentialsMetadata.TokenURL,
|
|
CAPEM: []byte(m.ClientCredentialsMetadata.TokenCAPEM),
|
|
ClientID: m.ClientCredentialsMetadata.ClientID,
|
|
ClientSecret: m.ClientCredentialsMetadata.ClientSecret,
|
|
Scopes: m.ClientCredentialsMetadata.Scopes,
|
|
Audiences: m.ClientCredentialsMetadata.Audiences,
|
|
})
|
|
if err != nil {
|
|
return fmt.Errorf("could not instantiate oauth2 token provider: %w", err)
|
|
}
|
|
|
|
options.Authentication = pulsar.NewAuthenticationTokenFromSupplier(cc.Token)
|
|
}
|
|
|
|
client, err := pulsar.NewClient(options)
|
|
if err != nil {
|
|
return fmt.Errorf("could not instantiate pulsar client: %v", err)
|
|
}
|
|
|
|
// initialize lru cache with size 10
|
|
// TODO: make this number configurable in pulsar metadata
|
|
c, err := lru.NewWithEvict(cachedNumProducer, func(k string, v pulsar.Producer) {
|
|
if v != nil {
|
|
v.Close()
|
|
}
|
|
})
|
|
if err != nil {
|
|
return fmt.Errorf("could not initialize pulsar lru cache for publisher")
|
|
}
|
|
p.cache = c
|
|
defer p.cache.Purge()
|
|
|
|
p.client = client
|
|
p.metadata = *m
|
|
|
|
return nil
|
|
}
|
|
|
|
func (p *Pulsar) useProducerEncryption() bool {
|
|
return p.metadata.PublicKey != "" && p.metadata.Keys != ""
|
|
}
|
|
|
|
func (p *Pulsar) useConsumerEncryption() bool {
|
|
return p.metadata.PublicKey != "" && p.metadata.PrivateKey != ""
|
|
}
|
|
|
|
func (p *Pulsar) Publish(ctx context.Context, req *pubsub.PublishRequest) error {
|
|
if p.closed.Load() {
|
|
return errors.New("component is closed")
|
|
}
|
|
|
|
var (
|
|
msg *pulsar.ProducerMessage
|
|
err error
|
|
)
|
|
topic := p.formatTopic(req.Topic)
|
|
producer, ok := p.cache.Get(topic)
|
|
|
|
sm, hasSchema := p.metadata.internalTopicSchemas[req.Topic]
|
|
|
|
if !ok || producer == nil {
|
|
p.logger.Debugf("creating producer for topic %s, full topic name in pulsar is %s", req.Topic, topic)
|
|
opts := pulsar.ProducerOptions{
|
|
Topic: topic,
|
|
DisableBatching: p.metadata.DisableBatching,
|
|
BatchingMaxPublishDelay: p.metadata.BatchingMaxPublishDelay,
|
|
BatchingMaxMessages: p.metadata.BatchingMaxMessages,
|
|
BatchingMaxSize: p.metadata.BatchingMaxSize,
|
|
}
|
|
|
|
if hasSchema {
|
|
opts.Schema = getPulsarSchema(sm)
|
|
}
|
|
|
|
if p.useProducerEncryption() {
|
|
var reader crypto.KeyReader
|
|
if isValidPEM(p.metadata.PublicKey) {
|
|
reader = NewDataKeyReader(p.metadata.PublicKey, "")
|
|
} else {
|
|
reader = crypto.NewFileKeyReader(p.metadata.PublicKey, "")
|
|
}
|
|
|
|
opts.Encryption = &pulsar.ProducerEncryptionInfo{
|
|
KeyReader: reader,
|
|
Keys: strings.Split(p.metadata.Keys, ","),
|
|
}
|
|
}
|
|
|
|
producer, err = p.client.CreateProducer(opts)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
p.cache.Add(topic, producer)
|
|
}
|
|
|
|
msg, err = parsePublishMetadata(req, sm)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if _, err = producer.Send(ctx, msg); err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func getPulsarSchema(metadata schemaMetadata) pulsar.Schema {
|
|
switch metadata.protocol {
|
|
case jsonProtocol:
|
|
return pulsar.NewJSONSchema(metadata.value, nil)
|
|
case avroProtocol:
|
|
return pulsar.NewAvroSchema(metadata.value, nil)
|
|
case protoProtocol:
|
|
return pulsar.NewProtoSchema(metadata.value, nil)
|
|
default:
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// parsePublishMetadata parse publish metadata.
|
|
func parsePublishMetadata(req *pubsub.PublishRequest, schema schemaMetadata) (
|
|
msg *pulsar.ProducerMessage, err error,
|
|
) {
|
|
msg = &pulsar.ProducerMessage{}
|
|
|
|
switch schema.protocol {
|
|
case "":
|
|
msg.Payload = req.Data
|
|
case jsonProtocol:
|
|
var obj interface{}
|
|
err = json.Unmarshal(req.Data, &obj)
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
msg.Value = obj
|
|
case avroProtocol:
|
|
var obj interface{}
|
|
avroSchema, parseErr := avro.Parse(schema.value)
|
|
if parseErr != nil {
|
|
return nil, parseErr
|
|
}
|
|
|
|
err = avro.Unmarshal(avroSchema, req.Data, &obj)
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
msg.Value = obj
|
|
}
|
|
|
|
for name, value := range req.Metadata {
|
|
if value == "" {
|
|
continue
|
|
}
|
|
|
|
switch name {
|
|
case partitionKey:
|
|
msg.Key = value
|
|
case deliverAt:
|
|
msg.DeliverAt, err = time.Parse(time.RFC3339, value)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
case deliverAfter:
|
|
msg.DeliverAfter, err = time.ParseDuration(value)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
default:
|
|
if msg.Properties == nil {
|
|
msg.Properties = make(map[string]string)
|
|
}
|
|
msg.Properties[name] = value
|
|
}
|
|
}
|
|
|
|
return msg, nil
|
|
}
|
|
|
|
// default: shared
|
|
func getSubscribeType(metadata map[string]string) pulsar.SubscriptionType {
|
|
var subsType pulsar.SubscriptionType
|
|
|
|
subsTypeStr := strings.ToLower(metadata[subscribeTypeKey])
|
|
switch subsTypeStr {
|
|
case subscribeTypeExclusive:
|
|
subsType = pulsar.Exclusive
|
|
case subscribeTypeFailover:
|
|
subsType = pulsar.Failover
|
|
case subscribeTypeShared:
|
|
subsType = pulsar.Shared
|
|
case subscribeTypeKeyShared:
|
|
subsType = pulsar.KeyShared
|
|
default:
|
|
subsType = pulsar.Shared
|
|
}
|
|
|
|
return subsType
|
|
}
|
|
|
|
func (p *Pulsar) Subscribe(ctx context.Context, req pubsub.SubscribeRequest, handler pubsub.Handler) error {
|
|
if p.closed.Load() {
|
|
return errors.New("component is closed")
|
|
}
|
|
|
|
channel := make(chan pulsar.ConsumerMessage, p.metadata.MaxConcurrentHandlers)
|
|
|
|
topic := p.formatTopic(req.Topic)
|
|
|
|
options := pulsar.ConsumerOptions{
|
|
Topic: topic,
|
|
SubscriptionName: p.metadata.ConsumerID,
|
|
Type: getSubscribeType(req.Metadata),
|
|
MessageChannel: channel,
|
|
NackRedeliveryDelay: p.metadata.RedeliveryDelay,
|
|
}
|
|
|
|
if p.useConsumerEncryption() {
|
|
var reader crypto.KeyReader
|
|
if isValidPEM(p.metadata.PublicKey) {
|
|
reader = NewDataKeyReader(p.metadata.PublicKey, p.metadata.PrivateKey)
|
|
} else {
|
|
reader = crypto.NewFileKeyReader(p.metadata.PublicKey, p.metadata.PrivateKey)
|
|
}
|
|
|
|
options.Decryption = &pulsar.MessageDecryptionInfo{
|
|
KeyReader: reader,
|
|
}
|
|
}
|
|
|
|
if sm, ok := p.metadata.internalTopicSchemas[req.Topic]; ok {
|
|
options.Schema = getPulsarSchema(sm)
|
|
}
|
|
consumer, err := p.client.Subscribe(options)
|
|
if err != nil {
|
|
p.logger.Debugf("Could not subscribe to %s, full topic name in pulsar is %s", req.Topic, topic)
|
|
return err
|
|
}
|
|
|
|
p.wg.Add(2)
|
|
listenCtx, cancel := context.WithCancel(ctx)
|
|
go func() {
|
|
defer p.wg.Done()
|
|
defer cancel()
|
|
select {
|
|
case <-listenCtx.Done():
|
|
case <-p.closeCh:
|
|
}
|
|
}()
|
|
go func() {
|
|
defer p.wg.Done()
|
|
defer cancel()
|
|
p.listenMessage(listenCtx, req, consumer, handler)
|
|
}()
|
|
|
|
return nil
|
|
}
|
|
|
|
func (p *Pulsar) listenMessage(ctx context.Context, req pubsub.SubscribeRequest, consumer pulsar.Consumer, handler pubsub.Handler) {
|
|
defer consumer.Close()
|
|
|
|
originTopic := req.Topic
|
|
var err error
|
|
for {
|
|
select {
|
|
case msg := <-consumer.Chan():
|
|
if strings.ToLower(req.Metadata[processModeKey]) == processModeSync { //nolint:gocritic
|
|
err = p.handleMessage(ctx, originTopic, msg, handler)
|
|
if err != nil && !errors.Is(err, context.Canceled) {
|
|
p.logger.Errorf("Error sync processing message: %s/%#v [key=%s]: %v", msg.Topic(), msg.ID(), msg.Key(), err)
|
|
}
|
|
} else { // async process mode by default
|
|
// Go routine to handle multiple messages at once.
|
|
p.wg.Add(1)
|
|
go func(msg pulsar.ConsumerMessage) {
|
|
defer p.wg.Done()
|
|
err = p.handleMessage(ctx, originTopic, msg, handler)
|
|
if err != nil && !errors.Is(err, context.Canceled) {
|
|
p.logger.Errorf("Error async processing message: %s/%#v [key=%s]: %v", msg.Topic(), msg.ID(), msg.Key(), err)
|
|
}
|
|
}(msg)
|
|
}
|
|
|
|
case <-ctx.Done():
|
|
p.logger.Errorf("Subscription context done. Closing consumer. Err: %s", ctx.Err())
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (p *Pulsar) handleMessage(ctx context.Context, originTopic string, msg pulsar.ConsumerMessage, handler pubsub.Handler) error {
|
|
pubsubMsg := pubsub.NewMessage{
|
|
Data: msg.Payload(),
|
|
Topic: originTopic,
|
|
Metadata: msg.Properties(),
|
|
}
|
|
|
|
p.logger.Debugf("Processing Pulsar message %s/%#v", msg.Topic(), msg.ID())
|
|
err := handler(ctx, &pubsubMsg)
|
|
if err != nil {
|
|
msg.Nack(msg.Message)
|
|
return err
|
|
}
|
|
|
|
msg.Ack(msg.Message)
|
|
return nil
|
|
}
|
|
|
|
func (p *Pulsar) Close() error {
|
|
defer p.wg.Wait()
|
|
if p.closed.CompareAndSwap(false, true) {
|
|
close(p.closeCh)
|
|
}
|
|
|
|
for _, k := range p.cache.Keys() {
|
|
producer, _ := p.cache.Peek(k)
|
|
if producer != nil {
|
|
p.logger.Debugf("closing producer for topic %s", k)
|
|
producer.Close()
|
|
}
|
|
}
|
|
p.client.Close()
|
|
|
|
return nil
|
|
}
|
|
|
|
func (p *Pulsar) Features() []pubsub.Feature {
|
|
return nil
|
|
}
|
|
|
|
// formatTopic formats the topic into pulsar's structure with tenant and namespace.
|
|
func (p *Pulsar) formatTopic(topic string) string {
|
|
persist := persistentStr
|
|
if !p.metadata.Persistent {
|
|
persist = nonPersistentStr
|
|
}
|
|
return fmt.Sprintf(topicFormat, persist, p.metadata.Tenant, p.metadata.Namespace, topic)
|
|
}
|
|
|
|
// GetComponentMetadata returns the metadata of the component.
|
|
func (p *Pulsar) GetComponentMetadata() (metadataInfo metadata.MetadataMap) {
|
|
metadataStruct := pulsarMetadata{}
|
|
metadata.GetMetadataInfoFromStructType(reflect.TypeOf(metadataStruct), &metadataInfo, metadata.PubSubType)
|
|
return
|
|
}
|
|
|
|
// isValidPEM reports whether val contains at least one PEM formatted block.
func isValidPEM(val string) bool {
	if block, _ := pem.Decode([]byte(val)); block == nil {
		return false
	}
	return true
}
|