fix linter issues
Signed-off-by: Bernd Verst <4535280+berndverst@users.noreply.github.com>
This commit is contained in:
parent
ea0e9770d4
commit
e9e750bce6
|
@ -62,7 +62,8 @@ func (consumer *consumer) ConsumeClaim(session sarama.ConsumerGroupSession, clai
|
|||
}
|
||||
|
||||
func (consumer *consumer) processBulkMessages(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim,
|
||||
b backoff.BackOff) error {
|
||||
b backoff.BackOff,
|
||||
) error {
|
||||
handlerConfig, err := consumer.k.GetTopicBulkHandlerConfig(claim.Topic())
|
||||
if err != nil {
|
||||
return fmt.Errorf("error getting bulk handler config for topic %s: %w", claim.Topic(), err)
|
||||
|
@ -89,7 +90,8 @@ func (consumer *consumer) processBulkMessages(session sarama.ConsumerGroupSessio
|
|||
|
||||
func (consumer *consumer) flushBulkMessages(claim sarama.ConsumerGroupClaim,
|
||||
messages []*sarama.ConsumerMessage, session sarama.ConsumerGroupSession,
|
||||
handler BulkEventHandler, b backoff.BackOff) error {
|
||||
handler BulkEventHandler, b backoff.BackOff,
|
||||
) error {
|
||||
if len(messages) > 0 {
|
||||
if consumer.k.consumeRetryEnabled {
|
||||
if err := retry.NotifyRecover(func() error {
|
||||
|
@ -113,7 +115,8 @@ func (consumer *consumer) flushBulkMessages(claim sarama.ConsumerGroupClaim,
|
|||
}
|
||||
|
||||
func (consumer *consumer) doBulkCallback(session sarama.ConsumerGroupSession,
|
||||
messages []*sarama.ConsumerMessage, handler BulkEventHandler, topic string) error {
|
||||
messages []*sarama.ConsumerMessage, handler BulkEventHandler, topic string,
|
||||
) error {
|
||||
consumer.k.logger.Debugf("Processing Kafka bulk message: %s", topic)
|
||||
messageValues := make([]KafkaBulkMessageEntry, (len(messages)))
|
||||
|
||||
|
|
|
@ -177,7 +177,7 @@ type KafkaBulkMessage struct {
|
|||
|
||||
// KafkaBulkMessageEntry is an item contained inside bulk event arriving from a message bus instance.
|
||||
type KafkaBulkMessageEntry struct {
|
||||
EntryID string `json:entryID`
|
||||
EntryID string `json:"entryID"`
|
||||
Event []byte `json:"event"`
|
||||
ContentType string `json:"contentType,omitempty"`
|
||||
Metadata map[string]string `json:"metadata"`
|
||||
|
|
|
@ -65,7 +65,8 @@ func (p *PubSub) Subscribe(ctx context.Context, req pubsub.SubscribeRequest, han
|
|||
}
|
||||
|
||||
func (p *PubSub) BulkSubscribe(ctx context.Context, req pubsub.SubscribeRequest,
|
||||
handler pubsub.BulkHandler) error {
|
||||
handler pubsub.BulkHandler,
|
||||
) error {
|
||||
subConfig := pubsub.BulkSubscribeConfig{
|
||||
MaxBulkCount: kafka.GetIntFromMetadata(req.Metadata, metadata.MaxBulkCountKey,
|
||||
kafka.DefaultMaxBulkCount),
|
||||
|
|
Loading…
Reference in New Issue