Change marking message logic
Signed-off-by: Deepanshu Agarwal <deepanshu.agarwal1984@gmail.com>
This commit is contained in:
parent
e483dfe334
commit
31a38e5aaa
|
|
@ -17,7 +17,6 @@ import (
|
|||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"sort"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
|
|
@ -118,20 +117,17 @@ func (consumer *consumer) doBulkCallback(session sarama.ConsumerGroupSession, me
|
|||
Entries: messageValues,
|
||||
}
|
||||
responses, err := handler(session.Context(), &event)
|
||||
sort.SliceStable(responses, func(i, j int) bool {
|
||||
return responses[i].EntryID < responses[j].EntryID
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
for i, resp := range responses {
|
||||
id, e := strconv.Atoi(resp.EntryID)
|
||||
if e != nil {
|
||||
id = i
|
||||
}
|
||||
if resp.Error == nil {
|
||||
session.MarkMessage(messages[id], "")
|
||||
if resp.EntryID == messageValues[i].EntryID {
|
||||
if resp.Error == nil {
|
||||
session.MarkMessage(messages[i], "")
|
||||
} else {
|
||||
break
|
||||
}
|
||||
} else {
|
||||
break
|
||||
return errors.New("entry id mismatch while processing bulk messages")
|
||||
}
|
||||
}
|
||||
} else {
|
||||
|
|
|
|||
Loading…
Reference in New Issue