From 4354f036e40bdbccc3397c38437dc68f85c4c8f7 Mon Sep 17 00:00:00 2001 From: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com> Date: Thu, 22 Sep 2022 16:51:34 +0530 Subject: [PATCH] properly parse response errors from kafka on sending multiple messages in bulk publish Signed-off-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com> --- internal/component/kafka/producer.go | 59 +++++++++++++++++++++++++++- 1 file changed, 58 insertions(+), 1 deletion(-) diff --git a/internal/component/kafka/producer.go b/internal/component/kafka/producer.go index 68fa5fb61..25603d167 100644 --- a/internal/component/kafka/producer.go +++ b/internal/component/kafka/producer.go @@ -91,6 +91,16 @@ func (k *Kafka) BulkPublish(_ context.Context, topic string, entries []pubsub.Bu Topic: topic, Value: sarama.ByteEncoder(entry.Event), } + // From Sarama documentation + // This field is used to hold arbitrary data you wish to include so it + // will be available when receiving on the Successes and Errors channels. + // Sarama completely ignores this field and is only to be used for + // pass-through data. + // This pass thorugh field is used for mapping errors, as seen in the mapKafkaProducerErrors method + // The EntryID will be unique for this request and the ProducerMessage is returned on the Errros channel, + // the metadata in that field is compared to the entry metadata to generate the right response on partial failures + msg.Metadata = entry.EntryID + for name, value := range metadata { if name == key { msg.Key = sarama.StringEncoder(value) @@ -108,8 +118,55 @@ func (k *Kafka) BulkPublish(_ context.Context, topic string, entries []pubsub.Bu } if err := k.producer.SendMessages(msgs); err != nil { - return pubsub.NewBulkPublishResponse(entries, pubsub.PublishFailed, err), err + // map the returned error to different entries + return k.mapKafkaProducerErrors(err, entries), err } return pubsub.NewBulkPublishResponse(entries, pubsub.PublishSucceeded, nil), nil } + +// mapKafkaProducerErrors to correct response statuses +func (k *Kafka) mapKafkaProducerErrors(err error, entries []pubsub.BulkMessageEntry) pubsub.BulkPublishResponse { + var pErrs sarama.ProducerErrors + if !errors.As(err, &pErrs) { + // Ideally this condition should not be executed, but in the scenario that the err is not of sarama.ProducerErrors type + // return a default error that all messages have failed + return pubsub.NewBulkPublishResponse(entries, pubsub.PublishFailed, err) + } + resp := pubsub.BulkPublishResponse{ + Statuses: make([]pubsub.BulkPublishResponseEntry, 0, len(entries)), + } + // used in the case of the partial success scenario + alreadySeen := map[string]struct{}{} + + for _, pErr := range pErrs { + if entryID, ok := pErr.Msg.Metadata.(string); ok { + alreadySeen[entryID] = struct{}{} + resp.Statuses = append(resp.Statuses, pubsub.BulkPublishResponseEntry{ + Status: pubsub.PublishFailed, + EntryID: entryID, + Error: pErr.Err, + }) + } else { + // Ideally this condition should not be executed, but in the scenario that the Metadata field + // is not of string type return a default error that all messages have failed + k.logger.Warnf("error parsing bulk errors from Kafka, returning default error response of all failed") + return pubsub.NewBulkPublishResponse(entries, pubsub.PublishFailed, err) + } + } + // Check if all the messages have failed + if len(pErrs) != len(entries) { + // This is a partial success scenario + for _, entry := range entries { + // Check if the entryID was not seen in the pErrs list + if _, ok := alreadySeen[entry.EntryID]; !ok { + // this is a message that has succeeded + resp.Statuses = append(resp.Statuses, pubsub.BulkPublishResponseEntry{ + Status: pubsub.PublishSucceeded, + EntryID: entry.EntryID, + }) + } + } + } + return resp +}