When UUID could not be generated

Signed-off-by: Deepanshu Agarwal <deepanshu.agarwal1984@gmail.com>
This commit is contained in:
Deepanshu Agarwal 2022-09-16 21:44:37 +05:30
parent 2f70a0529b
commit 55ad67d510
1 changed files with 13 additions and 18 deletions

View File

@ -17,6 +17,7 @@ import (
"context" "context"
"errors" "errors"
"fmt" "fmt"
"strconv"
"sync" "sync"
"time" "time"
@ -126,12 +127,16 @@ func (consumer *consumer) doBulkCallback(session sarama.ConsumerGroupSession,
metadata[string(t.Key)] = string(t.Value) metadata[string(t.Key)] = string(t.Value)
} }
} }
var entryIDStr string
entryID, entryIDErr := uuid.NewRandom() entryID, entryIDErr := uuid.NewRandom()
if entryIDErr != nil { if entryIDErr != nil {
consumer.k.logger.Errorf("Failed to generate entryID for sending message for bulk subscribe event on topic: %s. Error: %v.", topic, entryIDErr) consumer.k.logger.Errorf("Failed to generate UUID for sending as entryID in message for bulk subscribe event on topic: %s. Error: %v.", topic, entryIDErr)
entryIDStr = strconv.Itoa(i)
} else {
entryIDStr = entryID.String()
} }
childMessage := KafkaBulkMessageEntry{ childMessage := KafkaBulkMessageEntry{
EntryID: entryID.String(), EntryID: entryIDStr,
Event: message.Value, Event: message.Value,
Metadata: metadata, Metadata: metadata,
} }
@ -208,13 +213,6 @@ func (k *Kafka) RemoveTopicHandler(topic string) {
k.subscribeLock.Unlock() k.subscribeLock.Unlock()
} }
// // AddBulkSubscribeConfig adds bulk subscribe config
// func (k *Kafka) AddBulkSubscribeConfig(maxBulkCount int, maxBulkAwaitDurationMilliSeconds int, maxBulkSizeBytes int) {
// k.bulkSubscribeConfig.MaxBulkCount = maxBulkCount
// k.bulkSubscribeConfig.MaxBulkAwaitDurationMilliSeconds = maxBulkAwaitDurationMilliSeconds
// k.bulkSubscribeConfig.MaxBulkSizeBytes = maxBulkSizeBytes
// }
// AddTopicBulkHandler adds a bulk handler and configuration for a topic // AddTopicBulkHandler adds a bulk handler and configuration for a topic
func (k *Kafka) AddTopicBulkHandler(topic string, handlerConfig BulkSubscriptionHandlerConfig) { func (k *Kafka) AddTopicBulkHandler(topic string, handlerConfig BulkSubscriptionHandlerConfig) {
k.bulkSubscribeLock.Lock() k.bulkSubscribeLock.Lock()
@ -229,17 +227,14 @@ func (k *Kafka) RemoveTopicBulkHandler(topic string) {
k.bulkSubscribeLock.Unlock() k.bulkSubscribeLock.Unlock()
} }
// checkBulkSubscribe checks if a bulk handler registered for provided topic // checkBulkSubscribe checks if a bulk handler and config are correctly registered for provided topic
func (k *Kafka) checkBulkSubscribe(topic string) bool { func (k *Kafka) checkBulkSubscribe(topic string) bool {
topicBulkHandler, ok := k.bulkSubscribeTopics[topic] if bulkHandlerConfig, ok := k.bulkSubscribeTopics[topic]; ok &&
if !ok { bulkHandlerConfig.Handler != nil && (bulkHandlerConfig.SubscribeConfig.MaxBulkCount > 0) &&
return false bulkHandlerConfig.SubscribeConfig.MaxBulkAwaitDurationMilliSeconds > 0 {
}
if topicBulkHandler.Handler != nil {
return true return true
} else {
return false
} }
return false
} }
// GetTopicHandler returns the handler for a topic // GetTopicHandler returns the handler for a topic
@ -376,7 +371,7 @@ func (k *Kafka) BulkSubscribe(ctx context.Context) error {
} }
go func() { go func() {
k.logger.Debugf("Subscribed and listening to topics: %s", topics) k.logger.Debugf("Bulk subscribed and listening to topics: %s", topics)
for { for {
// If the context was cancelled, as is the case when handling SIGINT and SIGTERM below, then this pops // If the context was cancelled, as is the case when handling SIGINT and SIGTERM below, then this pops