From 55ad67d5104c333896b2ffb7935446c5f1e81401 Mon Sep 17 00:00:00 2001 From: Deepanshu Agarwal Date: Fri, 16 Sep 2022 21:44:37 +0530 Subject: [PATCH] When UUID could not be generated Signed-off-by: Deepanshu Agarwal --- internal/component/kafka/consumer.go | 31 ++++++++++++---------------- 1 file changed, 13 insertions(+), 18 deletions(-) diff --git a/internal/component/kafka/consumer.go b/internal/component/kafka/consumer.go index 1933fce8e..a03319cbf 100644 --- a/internal/component/kafka/consumer.go +++ b/internal/component/kafka/consumer.go @@ -17,6 +17,7 @@ import ( "context" "errors" "fmt" + "strconv" "sync" "time" @@ -126,12 +127,16 @@ func (consumer *consumer) doBulkCallback(session sarama.ConsumerGroupSession, metadata[string(t.Key)] = string(t.Value) } } + var entryIDStr string entryID, entryIDErr := uuid.NewRandom() if entryIDErr != nil { - consumer.k.logger.Errorf("Failed to generate entryID for sending message for bulk subscribe event on topic: %s. Error: %v.", topic, entryIDErr) + consumer.k.logger.Errorf("Failed to generate UUID for sending as entryID in message for bulk subscribe event on topic: %s. Error: %v.", topic, entryIDErr) + entryIDStr = strconv.Itoa(i) + } else { + entryIDStr = entryID.String() } childMessage := KafkaBulkMessageEntry{ - EntryID: entryID.String(), + EntryID: entryIDStr, Event: message.Value, Metadata: metadata, } @@ -208,13 +213,6 @@ func (k *Kafka) RemoveTopicHandler(topic string) { k.subscribeLock.Unlock() } -// // AddBulkSubscribeConfig adds bulk subscribe config -// func (k *Kafka) AddBulkSubscribeConfig(maxBulkCount int, maxBulkAwaitDurationMilliSeconds int, maxBulkSizeBytes int) { -// k.bulkSubscribeConfig.MaxBulkCount = maxBulkCount -// k.bulkSubscribeConfig.MaxBulkAwaitDurationMilliSeconds = maxBulkAwaitDurationMilliSeconds -// k.bulkSubscribeConfig.MaxBulkSizeBytes = maxBulkSizeBytes -// } - // AddTopicBulkHandler adds a bulk handler and configuration for a topic func (k *Kafka) AddTopicBulkHandler(topic string, handlerConfig BulkSubscriptionHandlerConfig) { k.bulkSubscribeLock.Lock() @@ -229,17 +227,14 @@ func (k *Kafka) RemoveTopicBulkHandler(topic string) { k.bulkSubscribeLock.Unlock() } -// checkBulkSubscribe checks if a bulk handler registered for provided topic +// checkBulkSubscribe checks if a bulk handler and config are correctly registered for provided topic func (k *Kafka) checkBulkSubscribe(topic string) bool { - topicBulkHandler, ok := k.bulkSubscribeTopics[topic] - if !ok { - return false - } - if topicBulkHandler.Handler != nil { + if bulkHandlerConfig, ok := k.bulkSubscribeTopics[topic]; ok && + bulkHandlerConfig.Handler != nil && (bulkHandlerConfig.SubscribeConfig.MaxBulkCount > 0) && + bulkHandlerConfig.SubscribeConfig.MaxBulkAwaitDurationMilliSeconds > 0 { return true - } else { - return false } + return false } // GetTopicHandler returns the handler for a topic @@ -376,7 +371,7 @@ func (k *Kafka) BulkSubscribe(ctx context.Context) error { } go func() { - k.logger.Debugf("Subscribed and listening to topics: %s", topics) + k.logger.Debugf("Bulk subscribed and listening to topics: %s", topics) for { // If the context was cancelled, as is the case when handling SIGINT and SIGTERM below, then this pops