/*
Copyright 2021 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
    http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package kafka

import (
	"context"
	"encoding/binary"
	"errors"
	"fmt"
	"strings"
	"sync"
	"time"

	"github.com/IBM/sarama"
	"github.com/linkedin/goavro/v2"
	"github.com/riferrei/srclient"

	"github.com/dapr/components-contrib/pubsub"
	"github.com/dapr/kit/logger"
	kitmd "github.com/dapr/kit/metadata"
	"github.com/dapr/kit/retry"
)

// Kafka allows reading/writing to a Kafka consumer group.
type Kafka struct {
	producer        sarama.SyncProducer
	consumerGroup   string
	brokers         []string
	logger          logger.Logger
	authType        string
	saslUsername    string
	saslPassword    string
	initialOffset   int64
	cg              sarama.ConsumerGroup
	consumer        consumer
	config          *sarama.Config
	subscribeTopics TopicHandlerConfig
	subscribeLock   sync.Mutex

	// schema registry settings
	srClient                   srclient.ISchemaRegistryClient
	schemaCachingEnabled       bool
	latestSchemaCache          map[string]SchemaCacheEntry
	latestSchemaCacheTTL       time.Duration
	latestSchemaCacheWriteLock sync.RWMutex
	latestSchemaCacheReadLock  sync.Mutex

	// used for background logic that cannot use the context passed to the Init function
	internalContext       context.Context
	internalContextCancel func()

	backOffConfig retry.Config

	// The default should be true for the Kafka pubsub component and false for the Kafka binding component.
	// It can be overridden by the consumeRetryEnabled metadata field.
	DefaultConsumeRetryEnabled bool
	consumeRetryEnabled        bool
	consumeRetryInterval       time.Duration
}

type SchemaType int

const (
	None SchemaType = iota
	Avro
)

type SchemaCacheEntry struct {
	schema         *srclient.Schema
	codec          *goavro.Codec
	expirationTime time.Time
}
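
// GetValueSchemaType reads the requested value schema type from message metadata.
// Illustrative usage (assuming the metadata key constant valueSchemaType, defined
// elsewhere in this package, resolves to the "valueSchemaType" field):
//
//	schemaType, err := GetValueSchemaType(map[string]string{"valueSchemaType": "avro"})
//	// schemaType == Avro, err == nil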
func GetValueSchemaType(metadata map[string]string) (SchemaType, error) {
	schemaTypeStr, ok := kitmd.GetMetadataProperty(metadata, valueSchemaType)
	if ok {
		return parseSchemaType(schemaTypeStr)
	}
	return None, nil
}

func parseSchemaType(sVal string) (SchemaType, error) {
	switch strings.ToLower(sVal) {
	case "avro":
		return Avro, nil
	case "none":
		return None, nil
	default:
		return None, fmt.Errorf("error parsing schema type. '%s' is not a supported value", sVal)
	}
}

func NewKafka(logger logger.Logger) *Kafka {
	return &Kafka{
		logger:          logger,
		subscribeTopics: make(TopicHandlerConfig),
		subscribeLock:   sync.Mutex{},
	}
}

// Init does metadata parsing and connection establishment.
func (k *Kafka) Init(ctx context.Context, metadata map[string]string) error {
	upgradedMetadata, err := k.upgradeMetadata(metadata)
	if err != nil {
		return err
	}

	meta, err := k.getKafkaMetadata(upgradedMetadata)
	if err != nil {
		return err
	}

	// This context can't use the context passed to Init because that context would be cancelled right after Init.
	k.internalContext, k.internalContextCancel = context.WithCancel(context.Background())

	k.brokers = meta.internalBrokers
	k.consumerGroup = meta.ConsumerGroup
	k.initialOffset = meta.internalInitialOffset
	k.authType = meta.AuthType

	config := sarama.NewConfig()
	config.Version = meta.internalVersion
	config.Consumer.Offsets.Initial = k.initialOffset
	config.Consumer.Fetch.Min = meta.consumerFetchMin
	config.Consumer.Fetch.Default = meta.consumerFetchDefault
	config.ChannelBufferSize = meta.channelBufferSize

	if meta.ClientID != "" {
		config.ClientID = meta.ClientID
	}

	err = updateTLSConfig(config, meta)
	if err != nil {
		return err
	}

	switch strings.ToLower(k.authType) {
	case oidcAuthType:
		k.logger.Info("Configuring SASL OAuth2/OIDC authentication")
		err = updateOidcAuthInfo(config, meta)
		if err != nil {
			return err
		}
	case passwordAuthType:
		k.logger.Info("Configuring SASL Password authentication")
		k.saslUsername = meta.SaslUsername
		k.saslPassword = meta.SaslPassword
		updatePasswordAuthInfo(config, meta, k.saslUsername, k.saslPassword)
	case mtlsAuthType:
		k.logger.Info("Configuring mTLS authentication")
		err = updateMTLSAuthInfo(config, meta)
		if err != nil {
			return err
		}
	case certificateAuthType:
		// already handled in updateTLSConfig
	case awsIAMAuthType:
		k.logger.Info("Configuring AWS IAM authentication")
		err = updateAWSIAMAuthInfo(k.internalContext, config, meta)
		if err != nil {
			return err
		}
	}

	k.config = config
	sarama.Logger = SaramaLogBridge{daprLogger: k.logger}

	k.producer, err = getSyncProducer(*k.config, k.brokers, meta.MaxMessageBytes)
	if err != nil {
		return err
	}
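
	// Note (illustrative): DecodeConfigWithPrefix decodes metadata fields that share
	// the "backOff" prefix, for example "backOffMaxRetries" or "backOffInitialInterval"
	// (assuming the key names supported by dapr/kit/retry), into k.backOffConfig.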
	// Default retry configuration is used if no
	// backOff properties are set.
	if err := retry.DecodeConfigWithPrefix(
		&k.backOffConfig,
		metadata,
		"backOff"); err != nil {
		return err
	}
	k.consumeRetryEnabled = meta.ConsumeRetryEnabled
	k.consumeRetryInterval = meta.ConsumeRetryInterval

	if meta.SchemaRegistryURL != "" {
		k.srClient = srclient.CreateSchemaRegistryClient(meta.SchemaRegistryURL)
		// Empty password is a possibility
		if meta.SchemaRegistryAPIKey != "" {
			k.srClient.SetCredentials(meta.SchemaRegistryAPIKey, meta.SchemaRegistryAPISecret)
		}
		k.srClient.CachingEnabled(meta.SchemaCachingEnabled)
		// Record the caching preference so getLatestSchema can consult the local cache.
		k.schemaCachingEnabled = meta.SchemaCachingEnabled
		if k.schemaCachingEnabled {
			k.latestSchemaCache = make(map[string]SchemaCacheEntry)
			k.latestSchemaCacheTTL = meta.SchemaLatestVersionCacheTTL
		}
	}
	k.logger.Debug("Kafka message bus initialization complete")

	return nil
}
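
// Illustrative usage (not part of this file): a caller would typically construct the
// component and initialize it with Dapr-style metadata. The exact keys shown here
// ("brokers", "consumerGroup", "authType") are assumptions based on how this package
// parses metadata elsewhere.
//
//	k := NewKafka(logger.NewLogger("kafka"))
//	err := k.Init(context.Background(), map[string]string{
//		"brokers":       "localhost:9092",
//		"consumerGroup": "group1",
//		"authType":      "none",
//	})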

func (k *Kafka) Close() (err error) {
	k.closeSubscriptionResources()

	if k.producer != nil {
		err = k.producer.Close()
		k.producer = nil
	}

	if k.internalContext != nil {
		k.internalContextCancel()
	}

	return err
}
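
// getSchemaSubject maps a topic to its schema registry subject. This matches
// Confluent's default TopicNameStrategy: the value schema for topic "my-topic"
// is registered under the subject "my-topic-value".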
func getSchemaSubject(topic string) string {
	// For now this assumes that the subject is named after the topic (e.g. `my-topic-value`)
	return topic + "-value"
}

func (k *Kafka) DeserializeValue(message *sarama.ConsumerMessage, config SubscriptionHandlerConfig) ([]byte, error) {
	switch config.ValueSchemaType {
	case Avro:
		srClient, err := k.getSchemaRegistyClient()
		if err != nil {
			return nil, err
		}
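		// Messages are expected in the Confluent Schema Registry wire format:
		// byte 0 is a magic byte (0x00), bytes 1-4 carry the schema ID as a
		// big-endian uint32, and the Avro-encoded payload starts at byte 5.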
		if len(message.Value) < 5 {
			return nil, errors.New("value is too short")
		}
		schemaID := binary.BigEndian.Uint32(message.Value[1:5])
		schema, err := srClient.GetSchema(int(schemaID))
		if err != nil {
			return nil, err
		}
		// The data handed to the consumer must be standard JSON. The srclient version
		// currently in use doesn't support this yet, so use this specific codec instead.
		codec, err := goavro.NewCodecForStandardJSONFull(schema.Schema())
		if err != nil {
			return nil, err
		}
		native, _, err := codec.NativeFromBinary(message.Value[5:])
		if err != nil {
			return nil, err
		}
		value, err := codec.TextualFromNative(nil, native)
		if err != nil {
			return nil, err
		}
		return value, nil
	default:
		return message.Value, nil
	}
}
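
// getLatestSchema returns the latest registered value schema and codec for a topic.
// When schema caching is enabled, cache lookups are guarded by latestSchemaCacheReadLock,
// insertions by latestSchemaCacheWriteLock, and entries expire after latestSchemaCacheTTL.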
func (k *Kafka) getLatestSchema(topic string) (*srclient.Schema, *goavro.Codec, error) {
	srClient, err := k.getSchemaRegistyClient()
	if err != nil {
		return nil, nil, err
	}

	subject := getSchemaSubject(topic)
	if k.schemaCachingEnabled {
		k.latestSchemaCacheReadLock.Lock()
		cacheEntry, ok := k.latestSchemaCache[subject]
		k.latestSchemaCacheReadLock.Unlock()

		// Cache entry present and not expired
		if ok && cacheEntry.expirationTime.After(time.Now()) {
			return cacheEntry.schema, cacheEntry.codec, nil
		}
		schema, errSchema := srClient.GetLatestSchema(subject)
		if errSchema != nil {
			return nil, nil, errSchema
		}
		// Standard-JSON serialization/deserialization is not integrated in srclient yet.
		// Since standard JSON is what Dapr passes through, this codec is needed.
		codec, errCodec := goavro.NewCodecForStandardJSONFull(schema.Schema())
		if errCodec != nil {
			return nil, nil, errCodec
		}
		k.latestSchemaCacheWriteLock.Lock()
		k.latestSchemaCache[subject] = SchemaCacheEntry{schema: schema, codec: codec, expirationTime: time.Now().Add(k.latestSchemaCacheTTL)}
		k.latestSchemaCacheWriteLock.Unlock()
		return schema, codec, nil
	}
	schema, err := srClient.GetLatestSchema(subject)
	if err != nil {
		return nil, nil, err
	}
	codec, err := goavro.NewCodecForStandardJSONFull(schema.Schema())
	if err != nil {
		return nil, nil, err
	}

	return schema, codec, nil
}

func (k *Kafka) getSchemaRegistyClient() (srclient.ISchemaRegistryClient, error) {
	if k.srClient == nil {
		return nil, errors.New("schema registry details not set")
	}

	return k.srClient, nil
}

func (k *Kafka) SerializeValue(topic string, data []byte, metadata map[string]string) ([]byte, error) {
	valueSchemaType, err := GetValueSchemaType(metadata)
	if err != nil {
		return nil, err
	}

	switch valueSchemaType {
	case Avro:
		schema, codec, err := k.getLatestSchema(topic)
		if err != nil {
			return nil, err
		}

		native, _, err := codec.NativeFromTextual(data)
		if err != nil {
			return nil, err
		}

		valueBytes, err := codec.BinaryFromNative(nil, native)
		if err != nil {
			return nil, err
		}
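		// Build the Confluent wire-format record: one zero magic byte, the 4-byte
		// big-endian schema ID, then the Avro binary payload.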
		schemaIDBytes := make([]byte, 4)
		binary.BigEndian.PutUint32(schemaIDBytes, uint32(schema.ID()))

		recordValue := make([]byte, 0, len(schemaIDBytes)+len(valueBytes)+1)
		recordValue = append(recordValue, byte(0))
		recordValue = append(recordValue, schemaIDBytes...)
		recordValue = append(recordValue, valueBytes...)
		return recordValue, nil
	default:
		return data, nil
	}
}

// EventHandler is the handler used to handle the subscribed event.
type EventHandler func(ctx context.Context, msg *NewEvent) error
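
// Illustrative example (hypothetical handler, not part of this file): a subscriber
// supplies an EventHandler such as:
//
//	handler := func(ctx context.Context, e *NewEvent) error {
//		log.Printf("received message on topic %s", e.Topic)
//		return nil
//	}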

// BulkEventHandler is the handler used to handle the subscribed bulk event.
type BulkEventHandler func(ctx context.Context, msg *KafkaBulkMessage) ([]pubsub.BulkSubscribeResponseEntry, error)

// SubscriptionHandlerConfig is the handler and configuration for a subscription.
type SubscriptionHandlerConfig struct {
	IsBulkSubscribe bool
	SubscribeConfig pubsub.BulkSubscribeConfig
	BulkHandler     BulkEventHandler
	Handler         EventHandler
	ValueSchemaType SchemaType
}

// NewEvent is an event arriving from a message bus instance.
type NewEvent struct {
	Data        []byte            `json:"data"`
	Topic       string            `json:"topic"`
	Metadata    map[string]string `json:"metadata"`
	ContentType *string           `json:"contentType,omitempty"`
}

// KafkaBulkMessage is a bulk event arriving from a message bus instance.
type KafkaBulkMessage struct {
	Entries  []KafkaBulkMessageEntry `json:"entries"`
	Topic    string                  `json:"topic"`
	Metadata map[string]string       `json:"metadata"`
}

// KafkaBulkMessageEntry is an item contained inside a bulk event arriving from a message bus instance.
type KafkaBulkMessageEntry struct {
	EntryId     string            `json:"entryId"` //nolint:stylecheck
	Event       []byte            `json:"event"`
	ContentType string            `json:"contentType,omitempty"`
	Metadata    map[string]string `json:"metadata"`
}