properly parse response errors from kafka on sending multiple messages in bulk publish

Signed-off-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com>
This commit is contained in:
Mukundan Sundararajan 2022-09-22 16:51:34 +05:30
parent db0ace71bc
commit 4354f036e4
1 changed files with 58 additions and 1 deletions

View File

@ -91,6 +91,16 @@ func (k *Kafka) BulkPublish(_ context.Context, topic string, entries []pubsub.Bu
Topic: topic,
Value: sarama.ByteEncoder(entry.Event),
}
// From Sarama documentation
// This field is used to hold arbitrary data you wish to include so it
// will be available when receiving on the Successes and Errors channels.
// Sarama completely ignores this field and is only to be used for
// pass-through data.
// This pass-through field is used for mapping errors, as seen in the mapKafkaProducerErrors method
// The EntryID will be unique for this request and the ProducerMessage is returned on the Errors channel,
// the metadata in that field is compared to the entry metadata to generate the right response on partial failures
msg.Metadata = entry.EntryID
for name, value := range metadata {
if name == key {
msg.Key = sarama.StringEncoder(value)
@ -108,8 +118,55 @@ func (k *Kafka) BulkPublish(_ context.Context, topic string, entries []pubsub.Bu
}
if err := k.producer.SendMessages(msgs); err != nil {
return pubsub.NewBulkPublishResponse(entries, pubsub.PublishFailed, err), err
// map the returned error to different entries
return k.mapKafkaProducerErrors(err, entries), err
}
return pubsub.NewBulkPublishResponse(entries, pubsub.PublishSucceeded, nil), nil
}
// mapKafkaProducerErrors translates the error returned by SendMessages into
// per-entry publish statuses, so callers see exactly which entries failed and
// which succeeded on a partial failure.
func (k *Kafka) mapKafkaProducerErrors(err error, entries []pubsub.BulkMessageEntry) pubsub.BulkPublishResponse {
	var producerErrs sarama.ProducerErrors
	if !errors.As(err, &producerErrs) {
		// Unexpected error shape (should not normally happen): fall back to
		// reporting every entry as failed with the original error.
		return pubsub.NewBulkPublishResponse(entries, pubsub.PublishFailed, err)
	}

	response := pubsub.BulkPublishResponse{
		Statuses: make([]pubsub.BulkPublishResponseEntry, 0, len(entries)),
	}

	// Record which entry IDs failed so the remaining entries can be reported
	// as successes in the partial-failure case below.
	failed := make(map[string]struct{}, len(producerErrs))
	for _, perr := range producerErrs {
		entryID, ok := perr.Msg.Metadata.(string)
		if !ok {
			// The Metadata field should carry the entry ID attached when the
			// message was built; if it does not (should not normally happen),
			// report everything as failed rather than guess.
			k.logger.Warnf("error parsing bulk errors from Kafka, returning default error response of all failed")
			return pubsub.NewBulkPublishResponse(entries, pubsub.PublishFailed, err)
		}
		failed[entryID] = struct{}{}
		response.Statuses = append(response.Statuses, pubsub.BulkPublishResponseEntry{
			Status:  pubsub.PublishFailed,
			EntryID: entryID,
			Error:   perr.Err,
		})
	}

	// Partial success: any entry without a recorded failure was published.
	if len(producerErrs) != len(entries) {
		for _, entry := range entries {
			if _, seen := failed[entry.EntryID]; !seen {
				response.Statuses = append(response.Statuses, pubsub.BulkPublishResponseEntry{
					Status:  pubsub.PublishSucceeded,
					EntryID: entry.EntryID,
				})
			}
		}
	}
	return response
}