Config Per Topic and using maxAwaitDuration
Signed-off-by: Deepanshu Agarwal <deepanshu.agarwal1984@gmail.com>
parent d31f4d99f5 → commit 81898d5ee5
@@ -36,17 +36,11 @@ type consumer struct {
 func (consumer *consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
 	b := consumer.k.backOffConfig.NewBackOffWithContext(session.Context())
-	var messages []*sarama.ConsumerMessage
-	messages = make([]*sarama.ConsumerMessage, 0)
 	isBulkSubscribe := consumer.k.checkBulkSubscribe(claim.Topic())
-	for message := range claim.Messages() {
-		if isBulkSubscribe {
-			if len(messages) < consumer.k.bulkSubscribeConfig.MaxBulkCount {
-				messages = append(messages, message)
-			} else {
-				break
-			}
-		} else {
+	if isBulkSubscribe {
+		return consumer.processBulkMessages(session.Context(), session, claim, b)
+	} else {
+		for message := range claim.Messages() {
 			if consumer.k.consumeRetryEnabled {
 				if err := retry.NotifyRecover(func() error {
 					return consumer.doCallback(session, message)
@@ -65,10 +59,42 @@ func (consumer *consumer) ConsumeClaim(session sarama.ConsumerGroupSession, clai
 			}
 		}
 	}
-	if isBulkSubscribe {
-		return nil
-	}

+func (consumer *consumer) processBulkMessages(ctx context.Context, session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim,
+	b backoff.BackOff) error {
+	handlerConfig, err := consumer.k.GetTopicBulkHandlerConfig(claim.Topic())
+	if err != nil {
+		return err
+	}
+	ticker := time.NewTicker(time.Duration(handlerConfig.SubscribeConfig.MaxBulkAwaitDurationMilliSeconds) * time.Millisecond)
+	defer ticker.Stop()
+	messages := make([]*sarama.ConsumerMessage, 0, handlerConfig.SubscribeConfig.MaxBulkCount)
+	for {
+		select {
+		case <-ctx.Done():
+			return consumer.flushBulkMessages(claim, messages, session, handlerConfig.Handler, b)
+		case message := <-(claim.Messages()):
+			if message != nil {
+				messages = append(messages, message)
+				if len(messages) >= handlerConfig.SubscribeConfig.MaxBulkCount {
+					return consumer.flushBulkMessages(claim, messages, session, handlerConfig.Handler, b)
+				}
+			}
+		case <-ticker.C:
+			return consumer.flushBulkMessages(claim, messages, session, handlerConfig.Handler, b)
+		}
+	}
+}

 func (consumer *consumer) flushBulkMessages(claim sarama.ConsumerGroupClaim,
 	messages []*sarama.ConsumerMessage, session sarama.ConsumerGroupSession,
 	handler BulkEventHandler, b backoff.BackOff) error {
 	if len(messages) > 0 {
 		if consumer.k.consumeRetryEnabled {
 			if err := retry.NotifyRecover(func() error {
-				return consumer.doBulkCallback(session, messages, claim.Topic())
+				return consumer.doBulkCallback(session, messages, handler, claim.Topic())
 			}, b, func(err error, d time.Duration) {
 				consumer.k.logger.Warnf("Error processing Kafka bulk messages: %s. Error: %v. Retrying...", claim.Topic(), err)
 			}, func() {
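For context, the new processBulkMessages above follows a common count-or-timeout batching pattern: it flushes when MaxBulkCount messages have accumulated, when MaxBulkAwaitDurationMilliSeconds elapses, or when the session context is cancelled. A minimal, self-contained sketch of the same pattern (hypothetical helper, not part of this commit):

package example

import (
	"context"
	"time"
)

// batch buffers items from in until maxCount is reached or maxWait elapses,
// then hands the buffer to flush; cancelling ctx or closing in also flushes.
// One batch per call, mirroring how processBulkMessages returns after a flush.
func batch(ctx context.Context, in <-chan string, maxCount int, maxWait time.Duration, flush func([]string)) {
	ticker := time.NewTicker(maxWait)
	defer ticker.Stop()
	buf := make([]string, 0, maxCount)
	for {
		select {
		case <-ctx.Done():
			flush(buf)
			return
		case msg, ok := <-in:
			if !ok { // channel closed (e.g. rebalance): flush what we have
				flush(buf)
				return
			}
			buf = append(buf, msg)
			if len(buf) >= maxCount {
				flush(buf)
				return
			}
		case <-ticker.C:
			flush(buf)
			return
		}
	}
}
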
@@ -77,21 +103,19 @@ func (consumer *consumer) ConsumeClaim(session sarama.ConsumerGroupSession, clai
 				consumer.k.logger.Errorf("Too many failed attempts at processing Kafka message: %s. Error: %v.", claim.Topic(), err)
 			}
 		} else {
-			err := consumer.doBulkCallback(session, messages, claim.Topic())
+			err := consumer.doBulkCallback(session, messages, handler, claim.Topic())
 			if err != nil {
 				consumer.k.logger.Errorf("Error processing Kafka message: %s. Error: %v.", claim.Topic(), err)
 			}
 			return err
 		}
 	}
 	return nil
 }

-func (consumer *consumer) doBulkCallback(session sarama.ConsumerGroupSession, messages []*sarama.ConsumerMessage, topic string) error {
+func (consumer *consumer) doBulkCallback(session sarama.ConsumerGroupSession,
+	messages []*sarama.ConsumerMessage, handler BulkEventHandler, topic string) error {
 	consumer.k.logger.Debugf("Processing Kafka bulk message: %s", topic)
-	handler, err := consumer.k.GetTopicBulkHandler(topic)
-	if err != nil {
-		return err
-	}
 	messageValues := make([]KafkaBulkMessageEntry, (len(messages)))

 	for i, message := range messages {

@@ -104,7 +128,7 @@ func (consumer *consumer) doBulkCallback(session sarama.ConsumerGroupSession, me
 		}
 		entryID, entryIDErr := uuid.NewRandom()
 		if entryIDErr != nil {
-			consumer.k.logger.Errorf("Failed to generate entryID for sending message for bulk subscribe event on topic: %s. Error: %v.", topic, err)
+			consumer.k.logger.Errorf("Failed to generate entryID for sending message for bulk subscribe event on topic: %s. Error: %v.", topic, entryIDErr)
 		}
 		childMessage := KafkaBulkMessageEntry{
 			EntryID: entryID.String(),
@@ -184,17 +208,17 @@ func (k *Kafka) RemoveTopicHandler(topic string) {
 	k.subscribeLock.Unlock()
 }

-// AddBulkSubscribeConfig adds bulk subscribe config
-func (k *Kafka) AddBulkSubscribeConfig(maxBulkCount int, maxBulkAwaitDurationMilliSeconds int, maxBulkSizeBytes int) {
-	k.bulkSubscribeConfig.MaxBulkCount = maxBulkCount
-	k.bulkSubscribeConfig.MaxBulkAwaitDurationMilliSeconds = maxBulkAwaitDurationMilliSeconds
-	k.bulkSubscribeConfig.MaxBulkSizeBytes = maxBulkSizeBytes
-}
+// // AddBulkSubscribeConfig adds bulk subscribe config
+// func (k *Kafka) AddBulkSubscribeConfig(maxBulkCount int, maxBulkAwaitDurationMilliSeconds int, maxBulkSizeBytes int) {
+// 	k.bulkSubscribeConfig.MaxBulkCount = maxBulkCount
+// 	k.bulkSubscribeConfig.MaxBulkAwaitDurationMilliSeconds = maxBulkAwaitDurationMilliSeconds
+// 	k.bulkSubscribeConfig.MaxBulkSizeBytes = maxBulkSizeBytes
+// }

-// AddTopicBulkHandler adds a bulk handler for a topic
-func (k *Kafka) AddTopicBulkHandler(topic string, handler BulkEventHandler) {
+// AddTopicBulkHandler adds a bulk handler and configuration for a topic
+func (k *Kafka) AddTopicBulkHandler(topic string, handlerConfig BulkSubscriptionHandlerConfig) {
 	k.bulkSubscribeLock.Lock()
-	k.bulkSubscribeTopics[topic] = handler
+	k.bulkSubscribeTopics[topic] = handlerConfig
 	k.bulkSubscribeLock.Unlock()
 }
@@ -211,7 +235,7 @@ func (k *Kafka) checkBulkSubscribe(topic string) bool {
 	if !ok {
 		return false
 	}
-	if topicBulkHandler != nil {
+	if topicBulkHandler.Handler != nil {
 		return true
 	} else {
 		return false
@@ -229,12 +253,13 @@ func (k *Kafka) GetTopicHandler(topic string) (EventHandler, error) {
 }

 // GetTopicBulkHandler returns the bulk handler for a topic
-func (k *Kafka) GetTopicBulkHandler(topic string) (BulkEventHandler, error) {
-	handler, ok := k.bulkSubscribeTopics[topic]
-	if !ok || handler == nil {
-		return nil, fmt.Errorf("bulk handler for messages of topic %s not found", topic)
+func (k *Kafka) GetTopicBulkHandlerConfig(topic string) (BulkSubscriptionHandlerConfig, error) {
+	handlerConfig, ok := k.bulkSubscribeTopics[topic]
+	if !ok || handlerConfig.Handler == nil {
+		return BulkSubscriptionHandlerConfig{},
+			fmt.Errorf("bulk handler for messages of topic %s not found", topic)
 	}
-	return handler, nil
+	return handlerConfig, nil
 }

 // Subscribe to topic in the Kafka cluster, in a background goroutine

@@ -40,7 +40,7 @@ type Kafka struct {
 	consumer            consumer
 	config              *sarama.Config
 	subscribeTopics     TopicHandlers
-	bulkSubscribeTopics TopicBulkHandlers
+	bulkSubscribeTopics TopicBulkHandlerConfig
 	subscribeLock       sync.Mutex
 	bulkSubscribeLock   sync.Mutex
@@ -51,14 +51,13 @@ type Kafka struct {
 	DefaultConsumeRetryEnabled bool
 	consumeRetryEnabled        bool
 	consumeRetryInterval       time.Duration
-	bulkSubscribeConfig        pubsub.BulkSubscribeConfig
 }

 func NewKafka(logger logger.Logger) *Kafka {
 	return &Kafka{
 		logger:              logger,
 		subscribeTopics:     make(TopicHandlers),
-		bulkSubscribeTopics: make(TopicBulkHandlers),
+		bulkSubscribeTopics: make(TopicBulkHandlerConfig),
 		subscribeLock:       sync.Mutex{},
 		bulkSubscribeLock:   sync.Mutex{},
 	}
@@ -84,9 +83,6 @@ func (k *Kafka) Init(metadata map[string]string) error {
 	config := sarama.NewConfig()
 	config.Version = meta.Version
 	config.Consumer.Offsets.Initial = k.initialOffset
-	if meta.MinFetchBytes > 0 {
-		config.Consumer.Fetch.Min = meta.MinFetchBytes
-	}

 	if meta.ClientID != "" {
 		config.ClientID = meta.ClientID
@@ -158,6 +154,12 @@ type EventHandler func(ctx context.Context, msg *NewEvent) error
 // BulkEventHandler is the handler used to handle the subscribed event.
 type BulkEventHandler func(ctx context.Context, msg *KafkaBulkMessage) ([]pubsub.BulkSubscribeResponseEntry, error)

+// BulkSubscriptionHandlerConfig is the bulk handler and configuration for bulk subscription.
+type BulkSubscriptionHandlerConfig struct {
+	SubscribeConfig pubsub.BulkSubscribeConfig
+	Handler         BulkEventHandler
+}
+
 // NewEvent is an event arriving from a message bus instance.
 type NewEvent struct {
 	Data []byte `json:"data"`

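Taken together, the new BulkSubscriptionHandlerConfig lets each topic carry its own bulk handler plus batching limits, registered via AddTopicBulkHandler. A rough usage sketch; the import paths, topic name, and limit values are assumptions, not part of this commit:

package example

import (
	"context"

	"github.com/dapr/components-contrib/internal/component/kafka"
	"github.com/dapr/components-contrib/pubsub"
)

// registerOrdersHandler registers a bulk handler for one topic with
// per-topic batching limits (sketch only).
func registerOrdersHandler(k *kafka.Kafka) {
	k.AddTopicBulkHandler("orders", kafka.BulkSubscriptionHandlerConfig{
		SubscribeConfig: pubsub.BulkSubscribeConfig{
			MaxBulkCount:                     50,   // flush after 50 messages
			MaxBulkAwaitDurationMilliSeconds: 2000, // or after 2 seconds
		},
		Handler: func(ctx context.Context, msg *kafka.KafkaBulkMessage) ([]pubsub.BulkSubscribeResponseEntry, error) {
			// Process the batch of entries here.
			return nil, nil
		},
	})
}
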
@@ -60,7 +60,6 @@ type kafkaMetadata struct {
 	ConsumeRetryEnabled  bool
 	ConsumeRetryInterval time.Duration
 	Version              sarama.KafkaVersion
-	MinFetchBytes        int32
 }

 // upgradeMetadata updates metadata properties based on deprecated usage.
@@ -140,13 +139,6 @@ func (k *Kafka) getKafkaMetadata(metadata map[string]string) (*kafkaMetadata, er

 	k.logger.Debugf("Found brokers: %v", meta.Brokers)

-	if val, ok := metadata["minFetchBytes"]; ok && val != "" {
-		val64, _ := strconv.ParseInt(val, 10, 32)
-		meta.MinFetchBytes = int32(val64)
-	} else {
-		meta.MinFetchBytes = 0
-	}
-
 	val, ok := metadata["authType"]
 	if !ok {
 		return nil, errors.New("kafka error: missing 'authType' attribute")

@@ -17,6 +17,7 @@ import (
 	"encoding/base64"
 	"encoding/pem"
 	"fmt"
+	"strconv"
 	"strings"

 	"github.com/Shopify/sarama"
@@ -55,8 +56,8 @@ func isValidPEM(val string) bool {
 // Map of topics and their handlers
 type TopicHandlers map[string]EventHandler

-// Map of topics and their bulk handlers
-type TopicBulkHandlers map[string]BulkEventHandler
+// TopicBulkHandlerConfig is the map of topics and struct containing bulk handler and their config.
+type TopicBulkHandlerConfig map[string]BulkSubscriptionHandlerConfig

 // TopicList returns the list of topics
 func (th TopicHandlers) TopicList() []string {
@@ -69,7 +70,7 @@ func (th TopicHandlers) TopicList() []string {
 	return topics
 }

-func (tbh TopicBulkHandlers) TopicList() []string {
+func (tbh TopicBulkHandlerConfig) TopicList() []string {
 	topics := make([]string, len(tbh))
 	i := 0
 	for topic := range tbh {
@@ -78,3 +79,14 @@ func (tbh TopicBulkHandlers) TopicList() []string {
 	}
 	return topics
 }
+
+// GetIntFromMetadata returns an int value from metadata OR default value if key not found or if its
+// value not convertible to int.
+func GetIntFromMetadata(metadata map[string]string, key string, defaultValue int) int {
+	if val, ok := metadata[key]; ok {
+		if intVal, err := strconv.Atoi(val); err == nil {
+			return intVal
+		}
+	}
+	return defaultValue
+}

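The fallback behaviour of GetIntFromMetadata is what the pubsub wrapper below relies on. A tiny illustration, with made-up values and an assumed import path for the internal kafka package:

package example

import "github.com/dapr/components-contrib/internal/component/kafka"

// bulkLimits shows the helper's contract: parsed value when numeric,
// the supplied default when the key is missing or not convertible.
func bulkLimits(meta map[string]string) (count, awaitMs int) {
	count = kafka.GetIntFromMetadata(meta, "maxBulkCount", 80)                          // e.g. "100" -> 100; "abc" or absent -> 80
	awaitMs = kafka.GetIntFromMetadata(meta, "maxBulkAwaitDurationMilliSeconds", 10000) // absent -> 10000
	return count, awaitMs
}
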
@@ -15,7 +15,6 @@ package kafka

 import (
 	"context"
-	"strconv"

 	"github.com/dapr/kit/logger"
@@ -28,9 +27,21 @@ type PubSub struct {
 	logger          logger.Logger
 	subscribeCtx    context.Context
 	subscribeCancel context.CancelFunc
-	pubsub.BulkSubscribeConfig
 }

+const (
+	// maxBulkCountKey is the key for the max bulk count in the metadata.
+	maxBulkCountKey string = "maxBulkCount"
+	// maxBulkAwaitDurationKey is the key for the max bulk await duration in the metadata.
+	maxBulkAwaitDurationMilliSecondsKey string = "maxBulkAwaitDurationMilliSeconds"
+	// defaultMaxBulkCount is the default max bulk count for kafka pubsub component
+	// if the maxBulkCountKey is not set in the metadata.
+	defaultMaxBulkCount = 80
+	// defaultMaxBulkAwaitDurationMilliSeconds is the default max bulk await duration for kafka pubsub component
+	// if the maxBulkAwaitDurationKey is not set in the metadata.
+	defaultMaxBulkAwaitDurationMilliSeconds = 10000
+)
+
 func (p *PubSub) Init(metadata pubsub.Metadata) error {
 	p.subscribeCtx, p.subscribeCancel = context.WithCancel(context.Background())
@@ -64,21 +75,19 @@ func (p *PubSub) Subscribe(ctx context.Context, req pubsub.SubscribeRequest, han
 	return p.kafka.Subscribe(p.subscribeCtx)
 }

-func (p *PubSub) BulkSubscribe(ctx context.Context, req pubsub.SubscribeRequest, handler pubsub.BulkHandler) error {
-	p.kafka.AddTopicBulkHandler(req.Topic, adaptBulkHandler(handler))
-	maxBulkCount, err := strconv.Atoi(req.Metadata["maxBulkCount"])
-	if err != nil {
-		maxBulkCount = 20
-	}
-	maxBulkAwaitDurationMilliSeconds, err := strconv.Atoi(req.Metadata["maxBulkAwaitDurationMilliSeconds"])
-	if err != nil {
-		maxBulkAwaitDurationMilliSeconds = 20
-	}
-	maxBulkSizeBytes, err := strconv.Atoi(req.Metadata["maxBulkSizeBytes"])
-	if err != nil {
-		maxBulkSizeBytes = 20
-	}
-	p.kafka.AddBulkSubscribeConfig(maxBulkCount, maxBulkAwaitDurationMilliSeconds, maxBulkSizeBytes)
+func (p *PubSub) BulkSubscribe(ctx context.Context, req pubsub.SubscribeRequest,
+	handler pubsub.BulkHandler) error {
+	subConfig := pubsub.BulkSubscribeConfig{
+		MaxBulkCount: kafka.GetIntFromMetadata(req.Metadata, maxBulkCountKey,
+			defaultMaxBulkCount),
+		MaxBulkAwaitDurationMilliSeconds: kafka.GetIntFromMetadata(req.Metadata,
+			maxBulkAwaitDurationMilliSecondsKey, defaultMaxBulkAwaitDurationMilliSeconds),
+	}
+	handlerConfig := kafka.BulkSubscriptionHandlerConfig{
+		SubscribeConfig: subConfig,
+		Handler:         adaptBulkHandler(handler),
+	}
+	p.kafka.AddTopicBulkHandler(req.Topic, handlerConfig)

 	go func() {
 		// Wait for context cancelation
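With this change the per-topic limits come straight from the subscription request metadata; absent or non-numeric values fall back to the defaults defined above (80 messages / 10000 ms). A hedged caller-side sketch, with a made-up topic name and values:

package example

import "github.com/dapr/components-contrib/pubsub"

// ordersBulkRequest builds a SubscribeRequest whose metadata carries the
// per-topic bulk limits read by BulkSubscribe (sketch only).
func ordersBulkRequest() pubsub.SubscribeRequest {
	return pubsub.SubscribeRequest{
		Topic: "orders",
		Metadata: map[string]string{
			"maxBulkCount":                     "100",
			"maxBulkAwaitDurationMilliSeconds": "5000",
		},
	}
}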