components-contrib/common/component/kafka/kafka.go

/*
Copyright 2021 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package kafka

import (
	"context"
	"encoding/binary"
	"errors"
	"fmt"
	"strings"
	"sync"
	"time"

	"github.com/IBM/sarama"
	"github.com/linkedin/goavro/v2"
	"github.com/riferrei/srclient"

	"github.com/dapr/components-contrib/pubsub"
	"github.com/dapr/kit/logger"
	kitmd "github.com/dapr/kit/metadata"
	"github.com/dapr/kit/retry"
)

// Kafka allows reading/writing to a Kafka consumer group.
type Kafka struct {
	producer        sarama.SyncProducer
	consumerGroup   string
	brokers         []string
	logger          logger.Logger
	authType        string
	saslUsername    string
	saslPassword    string
	initialOffset   int64
	cg              sarama.ConsumerGroup
	consumer        consumer
	config          *sarama.Config
	subscribeTopics TopicHandlerConfig
	subscribeLock   sync.Mutex

	// schema registry settings
	srClient                   srclient.ISchemaRegistryClient
	schemaCachingEnabled       bool
	latestSchemaCache          map[string]SchemaCacheEntry
	latestSchemaCacheTTL       time.Duration
	latestSchemaCacheWriteLock sync.RWMutex
	latestSchemaCacheReadLock  sync.Mutex

	// used for background logic that cannot use the context passed to the Init function
	internalContext       context.Context
	internalContextCancel func()

	backOffConfig retry.Config

	// The default value should be true for the Kafka pubsub component and false for the Kafka binding component.
	// This default value can be overridden by the consumeRetryEnabled metadata property.
	DefaultConsumeRetryEnabled bool
	consumeRetryEnabled        bool
	consumeRetryInterval       time.Duration
}
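
// SchemaType is the type of schema used to (de)serialize the Kafka message value.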
type SchemaType int

const (
	None SchemaType = iota
	Avro
)
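
// SchemaCacheEntry is a cached registry schema together with its pre-built Avro codec and the time at which the entry expires.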
type SchemaCacheEntry struct {
	schema         *srclient.Schema
	codec          *goavro.Codec
	expirationTime time.Time
}
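
// GetValueSchemaType returns the value schema type indicated by the message metadata, defaulting to None when not set.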
func GetValueSchemaType(metadata map[string]string) (SchemaType, error) {
	schemaTypeStr, ok := kitmd.GetMetadataProperty(metadata, valueSchemaType)
	if ok {
		v, err := parseSchemaType(schemaTypeStr)
		return v, err
	}
	return None, nil
}

func parseSchemaType(sVal string) (SchemaType, error) {
	switch strings.ToLower(sVal) {
	case "avro":
		return Avro, nil
	case "none":
		return None, nil
	default:
		return None, fmt.Errorf("error parsing schema type. '%s' is not a supported value", sVal)
	}
}
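
// NewKafka returns a new Kafka struct with the provided logger and an empty subscription topic config.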
func NewKafka(logger logger.Logger) *Kafka {
	return &Kafka{
		logger:          logger,
		subscribeTopics: make(TopicHandlerConfig),
		subscribeLock:   sync.Mutex{},
	}
}
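
// A minimal usage sketch (illustrative only, not part of this package's API): construct the helper,
// initialize it with component metadata, and close it when done. The metadata keys and values shown
// ("brokers", "authType": "none") are assumptions that depend on the concrete component wiring this package.
//
//	k := NewKafka(log) // log is a logger.Logger instance
//	if err := k.Init(ctx, map[string]string{"brokers": "localhost:9092", "authType": "none"}); err != nil {
//		// handle the initialization error
//	}
//	defer k.Close()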

// Init does metadata parsing and connection establishment.
func (k *Kafka) Init(ctx context.Context, metadata map[string]string) error {
	upgradedMetadata, err := k.upgradeMetadata(metadata)
	if err != nil {
		return err
	}

	meta, err := k.getKafkaMetadata(upgradedMetadata)
	if err != nil {
		return err
	}

	// This context can't use the context passed to Init because that context would be cancelled right after Init
	k.internalContext, k.internalContextCancel = context.WithCancel(context.Background())

	k.brokers = meta.internalBrokers
	k.consumerGroup = meta.ConsumerGroup
	k.initialOffset = meta.internalInitialOffset
	k.authType = meta.AuthType

	config := sarama.NewConfig()
	config.Version = meta.internalVersion
	config.Consumer.Offsets.Initial = k.initialOffset
	config.Consumer.Fetch.Min = meta.consumerFetchMin
	config.Consumer.Fetch.Default = meta.consumerFetchDefault
	config.ChannelBufferSize = meta.channelBufferSize

	if meta.ClientID != "" {
		config.ClientID = meta.ClientID
	}

	err = updateTLSConfig(config, meta)
	if err != nil {
		return err
	}

	switch strings.ToLower(k.authType) {
	case oidcAuthType:
		k.logger.Info("Configuring SASL OAuth2/OIDC authentication")
		err = updateOidcAuthInfo(config, meta)
		if err != nil {
			return err
		}
	case passwordAuthType:
		k.logger.Info("Configuring SASL Password authentication")
		k.saslUsername = meta.SaslUsername
		k.saslPassword = meta.SaslPassword
		updatePasswordAuthInfo(config, meta, k.saslUsername, k.saslPassword)
	case mtlsAuthType:
		k.logger.Info("Configuring mTLS authentication")
		err = updateMTLSAuthInfo(config, meta)
		if err != nil {
			return err
		}
	case certificateAuthType:
		// already handled in updateTLSConfig
	case awsIAMAuthType:
		k.logger.Info("Configuring AWS IAM authentication")
		err = updateAWSIAMAuthInfo(k.internalContext, config, meta)
		if err != nil {
			return err
		}
	}

	k.config = config
	sarama.Logger = SaramaLogBridge{daprLogger: k.logger}

	k.producer, err = getSyncProducer(*k.config, k.brokers, meta.MaxMessageBytes)
	if err != nil {
		return err
	}

	// Default retry configuration is used if no backOff properties are set.
	if err := retry.DecodeConfigWithPrefix(&k.backOffConfig, metadata, "backOff"); err != nil {
		return err
	}

	k.consumeRetryEnabled = meta.ConsumeRetryEnabled
	k.consumeRetryInterval = meta.ConsumeRetryInterval

	if meta.SchemaRegistryURL != "" {
		k.srClient = srclient.CreateSchemaRegistryClient(meta.SchemaRegistryURL)
		// Empty password is a possibility
		if meta.SchemaRegistryAPIKey != "" {
			k.srClient.SetCredentials(meta.SchemaRegistryAPIKey, meta.SchemaRegistryAPISecret)
		}
		k.srClient.CachingEnabled(meta.SchemaCachingEnabled)
		k.schemaCachingEnabled = meta.SchemaCachingEnabled
		if meta.SchemaCachingEnabled {
			k.latestSchemaCache = make(map[string]SchemaCacheEntry)
			k.latestSchemaCacheTTL = meta.SchemaLatestVersionCacheTTL
		}
	}

	k.logger.Debug("Kafka message bus initialization complete")
	return nil
}
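
// Close closes the subscription resources and the producer, and cancels the internal context.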
func (k *Kafka) Close() (err error) {
	k.closeSubscriptionResources()

	if k.producer != nil {
		err = k.producer.Close()
		k.producer = nil
	}

	if k.internalContext != nil {
		k.internalContextCancel()
	}

	return err
}

func getSchemaSubject(topic string) string {
	// For now assumes that subject is named after topic (e.g. `my-topic-value`)
	return topic + "-value"
}
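
// DeserializeValue deserializes the consumed message value according to the subscription's value schema type.
// For Avro, the value is expected in Confluent wire format (magic byte, 4-byte big-endian schema ID,
// Avro-encoded body) and is returned as standard JSON text.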
func (k *Kafka) DeserializeValue(message *sarama.ConsumerMessage, config SubscriptionHandlerConfig) ([]byte, error) {
	switch config.ValueSchemaType {
	case Avro:
		srClient, err := k.getSchemaRegistyClient()
		if err != nil {
			return nil, err
		}

		if len(message.Value) < 5 {
			return nil, fmt.Errorf("value is too short")
		}
		schemaID := binary.BigEndian.Uint32(message.Value[1:5])
		schema, err := srClient.GetSchema(int(schemaID))
		if err != nil {
			return nil, err
		}

		// The data coming through is standard JSON. The version currently supported by srclient doesn't support this yet.
		// Use this specific codec instead.
		codec, err := goavro.NewCodecForStandardJSONFull(schema.Schema())
		if err != nil {
			return nil, err
		}
		native, _, err := codec.NativeFromBinary(message.Value[5:])
		if err != nil {
			return nil, err
		}
		value, err := codec.TextualFromNative(nil, native)
		if err != nil {
			return nil, err
		}
		return value, nil
	default:
		return message.Value, nil
	}
}
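
// getLatestSchema fetches the latest schema and matching Avro codec for the topic's value subject,
// serving them from the local cache first when schema caching is enabled.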
func (k *Kafka) getLatestSchema(topic string) (*srclient.Schema, *goavro.Codec, error) {
	srClient, err := k.getSchemaRegistyClient()
	if err != nil {
		return nil, nil, err
	}
	subject := getSchemaSubject(topic)
	if k.schemaCachingEnabled {
		k.latestSchemaCacheReadLock.Lock()
		cacheEntry, ok := k.latestSchemaCache[subject]
		k.latestSchemaCacheReadLock.Unlock()

		// Cache entry is present and not expired
		if ok && cacheEntry.expirationTime.After(time.Now()) {
			return cacheEntry.schema, cacheEntry.codec, nil
		}
		schema, errSchema := srClient.GetLatestSchema(subject)
		if errSchema != nil {
			return nil, nil, errSchema
		}
		// Standard JSON serialization/deserialization is not integrated in srclient yet.
		// Since standard JSON is what Dapr passes through, use goavro's standard-JSON codec instead.
		codec, errCodec := goavro.NewCodecForStandardJSONFull(schema.Schema())
		if errCodec != nil {
			return nil, nil, errCodec
		}
		k.latestSchemaCacheWriteLock.Lock()
		k.latestSchemaCache[subject] = SchemaCacheEntry{schema: schema, codec: codec, expirationTime: time.Now().Add(k.latestSchemaCacheTTL)}
		k.latestSchemaCacheWriteLock.Unlock()
		return schema, codec, nil
	}

	schema, err := srClient.GetLatestSchema(subject)
	if err != nil {
		return nil, nil, err
	}
	codec, err := goavro.NewCodecForStandardJSONFull(schema.Schema())
	if err != nil {
		return nil, nil, err
	}
	return schema, codec, nil
}
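
// getSchemaRegistyClient returns the configured schema registry client, or an error when schema registry details were not set.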
func (k *Kafka) getSchemaRegistyClient() (srclient.ISchemaRegistryClient, error) {
	if k.srClient == nil {
		return nil, errors.New("schema registry details not set")
	}

	return k.srClient, nil
}
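
// SerializeValue serializes the payload for the given topic according to the valueSchemaType metadata.
// For Avro, the standard-JSON payload is encoded with the topic's latest registered schema and framed in the
// Confluent wire format: a zero magic byte, the 4-byte big-endian schema ID, then the Avro-encoded value.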
func (k *Kafka) SerializeValue(topic string, data []byte, metadata map[string]string) ([]byte, error) {
	valueSchemaType, err := GetValueSchemaType(metadata)
	if err != nil {
		return nil, err
	}

	switch valueSchemaType {
	case Avro:
		schema, codec, err := k.getLatestSchema(topic)
		if err != nil {
			return nil, err
		}

		native, _, err := codec.NativeFromTextual(data)
		if err != nil {
			return nil, err
		}
		valueBytes, err := codec.BinaryFromNative(nil, native)
		if err != nil {
			return nil, err
		}

		schemaIDBytes := make([]byte, 4)
		binary.BigEndian.PutUint32(schemaIDBytes, uint32(schema.ID()))

		recordValue := make([]byte, 0, len(schemaIDBytes)+len(valueBytes)+1)
		recordValue = append(recordValue, byte(0))
		recordValue = append(recordValue, schemaIDBytes...)
		recordValue = append(recordValue, valueBytes...)
		return recordValue, nil
	default:
		return data, nil
	}
}
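
// An illustrative call, assuming the valueSchemaType metadata key (the valueSchemaType constant is defined
// elsewhere in this package) resolves to "valueSchemaType"; the topic name and payload are placeholders:
//
//	out, err := k.SerializeValue("my-topic", []byte(`{"name":"test"}`), map[string]string{"valueSchemaType": "avro"})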

// EventHandler is the handler used to handle the subscribed event.
type EventHandler func(ctx context.Context, msg *NewEvent) error

// BulkEventHandler is the handler used to handle the subscribed bulk event.
type BulkEventHandler func(ctx context.Context, msg *KafkaBulkMessage) ([]pubsub.BulkSubscribeResponseEntry, error)

// SubscriptionHandlerConfig is the handler and configuration for a subscription.
type SubscriptionHandlerConfig struct {
	IsBulkSubscribe bool
	SubscribeConfig pubsub.BulkSubscribeConfig
	BulkHandler     BulkEventHandler
	Handler         EventHandler
	ValueSchemaType SchemaType
}

// NewEvent is an event arriving from a message bus instance.
type NewEvent struct {
	Data        []byte            `json:"data"`
	Topic       string            `json:"topic"`
	Metadata    map[string]string `json:"metadata"`
	ContentType *string           `json:"contentType,omitempty"`
}

// KafkaBulkMessage is a bulk event arriving from a message bus instance.
type KafkaBulkMessage struct {
	Entries  []KafkaBulkMessageEntry `json:"entries"`
	Topic    string                  `json:"topic"`
	Metadata map[string]string       `json:"metadata"`
}

// KafkaBulkMessageEntry is an item contained inside a bulk event arriving from a message bus instance.
type KafkaBulkMessageEntry struct {
	EntryId     string            `json:"entryId"` //nolint:stylecheck
	Event       []byte            `json:"event"`
	ContentType string            `json:"contentType,omitempty"`
	Metadata    map[string]string `json:"metadata"`
}