optimize bulk pub response to contain only failed entries

Signed-off-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com>
This commit is contained in:
Mukundan Sundararajan 2022-11-11 11:10:44 +05:30
parent a4b27ae49b
commit 72695529f6
8 changed files with 63 additions and 87 deletions

View File

@ -81,7 +81,7 @@ func (k *Kafka) Publish(topic string, data []byte, metadata map[string]string) e
func (k *Kafka) BulkPublish(_ context.Context, topic string, entries []pubsub.BulkMessageEntry, metadata map[string]string) (pubsub.BulkPublishResponse, error) { func (k *Kafka) BulkPublish(_ context.Context, topic string, entries []pubsub.BulkMessageEntry, metadata map[string]string) (pubsub.BulkPublishResponse, error) {
if k.producer == nil { if k.producer == nil {
err := errors.New("component is closed") err := errors.New("component is closed")
return pubsub.NewBulkPublishResponse(entries, pubsub.PublishFailed, err), err return pubsub.NewBulkPublishResponse(entries, err), err
} }
k.logger.Debugf("Bulk Publishing on topic %v", topic) k.logger.Debugf("Bulk Publishing on topic %v", topic)
@ -122,7 +122,7 @@ func (k *Kafka) BulkPublish(_ context.Context, topic string, entries []pubsub.Bu
return k.mapKafkaProducerErrors(err, entries), err return k.mapKafkaProducerErrors(err, entries), err
} }
return pubsub.NewBulkPublishResponse(entries, pubsub.PublishSucceeded, nil), nil return pubsub.BulkPublishResponse{}, nil
} }
// mapKafkaProducerErrors to correct response statuses // mapKafkaProducerErrors to correct response statuses
@ -131,10 +131,10 @@ func (k *Kafka) mapKafkaProducerErrors(err error, entries []pubsub.BulkMessageEn
if !errors.As(err, &pErrs) { if !errors.As(err, &pErrs) {
// Ideally this condition should not be executed, but in the scenario that the err is not of sarama.ProducerErrors type // Ideally this condition should not be executed, but in the scenario that the err is not of sarama.ProducerErrors type
// return a default error that all messages have failed // return a default error that all messages have failed
return pubsub.NewBulkPublishResponse(entries, pubsub.PublishFailed, err) return pubsub.NewBulkPublishResponse(entries, err)
} }
resp := pubsub.BulkPublishResponse{ resp := pubsub.BulkPublishResponse{
Statuses: make([]pubsub.BulkPublishResponseEntry, 0, len(entries)), FailedEntries: make([]pubsub.BulkPublishResponseEntry, 0, len(entries)),
} }
// used in the case of the partial success scenario // used in the case of the partial success scenario
alreadySeen := map[string]struct{}{} alreadySeen := map[string]struct{}{}
@ -142,8 +142,7 @@ func (k *Kafka) mapKafkaProducerErrors(err error, entries []pubsub.BulkMessageEn
for _, pErr := range pErrs { for _, pErr := range pErrs {
if entryId, ok := pErr.Msg.Metadata.(string); ok { //nolint:stylecheck if entryId, ok := pErr.Msg.Metadata.(string); ok { //nolint:stylecheck
alreadySeen[entryId] = struct{}{} alreadySeen[entryId] = struct{}{}
resp.Statuses = append(resp.Statuses, pubsub.BulkPublishResponseEntry{ resp.FailedEntries = append(resp.FailedEntries, pubsub.BulkPublishResponseEntry{
Status: pubsub.PublishFailed,
EntryId: entryId, EntryId: entryId,
Error: pErr.Err, Error: pErr.Err,
}) })
@ -151,21 +150,7 @@ func (k *Kafka) mapKafkaProducerErrors(err error, entries []pubsub.BulkMessageEn
// Ideally this condition should not be executed, but in the scenario that the Metadata field // Ideally this condition should not be executed, but in the scenario that the Metadata field
// is not of string type return a default error that all messages have failed // is not of string type return a default error that all messages have failed
k.logger.Warnf("error parsing bulk errors from Kafka, returning default error response of all failed") k.logger.Warnf("error parsing bulk errors from Kafka, returning default error response of all failed")
return pubsub.NewBulkPublishResponse(entries, pubsub.PublishFailed, err) return pubsub.NewBulkPublishResponse(entries, err)
}
}
// Check if all the messages have failed
if len(pErrs) != len(entries) {
// This is a partial success scenario
for _, entry := range entries {
// Check if the entryId was not seen in the pErrs list
if _, ok := alreadySeen[entry.EntryId]; !ok {
// this is a message that has succeeded
resp.Statuses = append(resp.Statuses, pubsub.BulkPublishResponseEntry{
Status: pubsub.PublishSucceeded,
EntryId: entry.EntryId,
})
}
} }
} }
return resp return resp

View File

@ -571,7 +571,7 @@ func (aeh *AzureEventHubs) BulkPublish(ctx context.Context, req *pubsub.BulkPubl
if _, ok := aeh.hubClients[req.Topic]; !ok { if _, ok := aeh.hubClients[req.Topic]; !ok {
if err := aeh.ensurePublisherClient(ctx, req.Topic); err != nil { if err := aeh.ensurePublisherClient(ctx, req.Topic); err != nil {
err = fmt.Errorf("error on establishing hub connection: %s", err) err = fmt.Errorf("error on establishing hub connection: %s", err)
return pubsub.NewBulkPublishResponse(req.Entries, pubsub.PublishFailed, err), err return pubsub.NewBulkPublishResponse(req.Entries, err), err
} }
} }
@ -595,10 +595,10 @@ func (aeh *AzureEventHubs) BulkPublish(ctx context.Context, req *pubsub.BulkPubl
if err != nil { if err != nil {
// Partial success is not supported by Azure Event Hubs. // Partial success is not supported by Azure Event Hubs.
// If an error occurs, all events are considered failed. // If an error occurs, all events are considered failed.
return pubsub.NewBulkPublishResponse(req.Entries, pubsub.PublishFailed, err), err return pubsub.NewBulkPublishResponse(req.Entries, err), err
} }
return pubsub.NewBulkPublishResponse(req.Entries, pubsub.PublishSucceeded, nil), nil return pubsub.BulkPublishResponse{}, nil
} }
// Subscribe receives data from Azure Event Hubs. // Subscribe receives data from Azure Event Hubs.

View File

@ -135,7 +135,7 @@ func (a *azureServiceBus) BulkPublish(ctx context.Context, req *pubsub.BulkPubli
// Return an empty response to avoid this. // Return an empty response to avoid this.
if len(req.Entries) == 0 { if len(req.Entries) == 0 {
a.logger.Warnf("Empty bulk publish request, skipping") a.logger.Warnf("Empty bulk publish request, skipping")
return pubsub.NewBulkPublishResponse(req.Entries, pubsub.PublishSucceeded, nil), nil return pubsub.BulkPublishResponse{}, nil
} }
// Ensure the queue exists the first time it is referenced // Ensure the queue exists the first time it is referenced
@ -143,13 +143,13 @@ func (a *azureServiceBus) BulkPublish(ctx context.Context, req *pubsub.BulkPubli
// Note that the parameter is called "Topic" but we're publishing to a queue // Note that the parameter is called "Topic" but we're publishing to a queue
err := a.client.EnsureQueue(a.publishCtx, req.Topic) err := a.client.EnsureQueue(a.publishCtx, req.Topic)
if err != nil { if err != nil {
return pubsub.NewBulkPublishResponse(req.Entries, pubsub.PublishFailed, err), err return pubsub.NewBulkPublishResponse(req.Entries, err), err
} }
// Get the sender // Get the sender
sender, err := a.client.GetSender(ctx, req.Topic) sender, err := a.client.GetSender(ctx, req.Topic)
if err != nil { if err != nil {
return pubsub.NewBulkPublishResponse(req.Entries, pubsub.PublishFailed, err), err return pubsub.NewBulkPublishResponse(req.Entries, err), err
} }
// Create a new batch of messages with batch options. // Create a new batch of messages with batch options.
@ -159,22 +159,22 @@ func (a *azureServiceBus) BulkPublish(ctx context.Context, req *pubsub.BulkPubli
batchMsg, err := sender.NewMessageBatch(ctx, batchOpts) batchMsg, err := sender.NewMessageBatch(ctx, batchOpts)
if err != nil { if err != nil {
return pubsub.NewBulkPublishResponse(req.Entries, pubsub.PublishFailed, err), err return pubsub.NewBulkPublishResponse(req.Entries, err), err
} }
// Add messages from the bulk publish request to the batch. // Add messages from the bulk publish request to the batch.
err = impl.UpdateASBBatchMessageWithBulkPublishRequest(batchMsg, req) err = impl.UpdateASBBatchMessageWithBulkPublishRequest(batchMsg, req)
if err != nil { if err != nil {
return pubsub.NewBulkPublishResponse(req.Entries, pubsub.PublishFailed, err), err return pubsub.NewBulkPublishResponse(req.Entries, err), err
} }
// Azure Service Bus does not return individual status for each message in the request. // Azure Service Bus does not return individual status for each message in the request.
err = sender.SendMessageBatch(ctx, batchMsg, nil) err = sender.SendMessageBatch(ctx, batchMsg, nil)
if err != nil { if err != nil {
return pubsub.NewBulkPublishResponse(req.Entries, pubsub.PublishFailed, err), err return pubsub.NewBulkPublishResponse(req.Entries, err), err
} }
return pubsub.NewBulkPublishResponse(req.Entries, pubsub.PublishSucceeded, nil), nil return pubsub.BulkPublishResponse{}, nil
} }
func (a *azureServiceBus) Subscribe(subscribeCtx context.Context, req pubsub.SubscribeRequest, handler pubsub.Handler) error { func (a *azureServiceBus) Subscribe(subscribeCtx context.Context, req pubsub.SubscribeRequest, handler pubsub.Handler) error {

View File

@ -134,20 +134,20 @@ func (a *azureServiceBus) BulkPublish(ctx context.Context, req *pubsub.BulkPubli
// Return an empty response to avoid this. // Return an empty response to avoid this.
if len(req.Entries) == 0 { if len(req.Entries) == 0 {
a.logger.Warnf("Empty bulk publish request, skipping") a.logger.Warnf("Empty bulk publish request, skipping")
return pubsub.NewBulkPublishResponse(req.Entries, pubsub.PublishSucceeded, nil), nil return pubsub.BulkPublishResponse{}, nil
} }
// Ensure the queue or topic exists the first time it is referenced // Ensure the queue or topic exists the first time it is referenced
// This does nothing if DisableEntityManagement is true // This does nothing if DisableEntityManagement is true
err := a.client.EnsureTopic(a.publishCtx, req.Topic) err := a.client.EnsureTopic(a.publishCtx, req.Topic)
if err != nil { if err != nil {
return pubsub.NewBulkPublishResponse(req.Entries, pubsub.PublishFailed, err), err return pubsub.NewBulkPublishResponse(req.Entries, err), err
} }
// Get the sender // Get the sender
sender, err := a.client.GetSender(ctx, req.Topic) sender, err := a.client.GetSender(ctx, req.Topic)
if err != nil { if err != nil {
return pubsub.NewBulkPublishResponse(req.Entries, pubsub.PublishFailed, err), err return pubsub.NewBulkPublishResponse(req.Entries, err), err
} }
// Create a new batch of messages with batch options. // Create a new batch of messages with batch options.
@ -157,22 +157,22 @@ func (a *azureServiceBus) BulkPublish(ctx context.Context, req *pubsub.BulkPubli
batchMsg, err := sender.NewMessageBatch(ctx, batchOpts) batchMsg, err := sender.NewMessageBatch(ctx, batchOpts)
if err != nil { if err != nil {
return pubsub.NewBulkPublishResponse(req.Entries, pubsub.PublishFailed, err), err return pubsub.NewBulkPublishResponse(req.Entries, err), err
} }
// Add messages from the bulk publish request to the batch. // Add messages from the bulk publish request to the batch.
err = impl.UpdateASBBatchMessageWithBulkPublishRequest(batchMsg, req) err = impl.UpdateASBBatchMessageWithBulkPublishRequest(batchMsg, req)
if err != nil { if err != nil {
return pubsub.NewBulkPublishResponse(req.Entries, pubsub.PublishFailed, err), err return pubsub.NewBulkPublishResponse(req.Entries, err), err
} }
// Azure Service Bus does not return individual status for each message in the request. // Azure Service Bus does not return individual status for each message in the request.
err = sender.SendMessageBatch(ctx, batchMsg, nil) err = sender.SendMessageBatch(ctx, batchMsg, nil)
if err != nil { if err != nil {
return pubsub.NewBulkPublishResponse(req.Entries, pubsub.PublishFailed, err), err return pubsub.NewBulkPublishResponse(req.Entries, err), err
} }
return pubsub.NewBulkPublishResponse(req.Entries, pubsub.PublishSucceeded, nil), nil return pubsub.BulkPublishResponse{}, nil
} }
func (a *azureServiceBus) Subscribe(subscribeCtx context.Context, req pubsub.SubscribeRequest, handler pubsub.Handler) error { func (a *azureServiceBus) Subscribe(subscribeCtx context.Context, req pubsub.SubscribeRequest, handler pubsub.Handler) error {

View File

@ -16,9 +16,6 @@ package pubsub
// AppResponseStatus represents a status of a PubSub response. // AppResponseStatus represents a status of a PubSub response.
type AppResponseStatus string type AppResponseStatus string
// BulkPublishStatus represents a status of a Bulk Publish response.
type BulkPublishStatus string
const ( const (
// Success means the message is received and processed correctly. // Success means the message is received and processed correctly.
Success AppResponseStatus = "SUCCESS" Success AppResponseStatus = "SUCCESS"
@ -26,10 +23,6 @@ const (
Retry AppResponseStatus = "RETRY" Retry AppResponseStatus = "RETRY"
// Drop means the message is received but should not be processed. // Drop means the message is received but should not be processed.
Drop AppResponseStatus = "DROP" Drop AppResponseStatus = "DROP"
// PublishSucceeded represents that message was published successfully.
PublishSucceeded BulkPublishStatus = "SUCCESS"
// PublishFailed represents that message publishing failed.
PublishFailed BulkPublishStatus = "FAILED"
) )
// AppResponse is the object describing the response from user code after a pubsub event. // AppResponse is the object describing the response from user code after a pubsub event.
@ -53,13 +46,12 @@ type AppBulkResponse struct {
// to be sent to publishing App for the corresponding single message during bulk publish // to be sent to publishing App for the corresponding single message during bulk publish
type BulkPublishResponseEntry struct { type BulkPublishResponseEntry struct {
EntryId string `json:"entryId"` //nolint:stylecheck EntryId string `json:"entryId"` //nolint:stylecheck
Status BulkPublishStatus `json:"status"`
Error error `json:"error"` Error error `json:"error"`
} }
// BulkPublishResponse is the whole bulk publish response sent to App // BulkPublishResponse contains the list of failed entries in a bulk publish request.
type BulkPublishResponse struct { type BulkPublishResponse struct {
Statuses []BulkPublishResponseEntry `json:"statuses"` FailedEntries []BulkPublishResponseEntry `json:"failedEntries"`
} }
// BulkSubscribeResponseEntry Represents single subscribe response item, as part of BulkSubscribeResponse // BulkSubscribeResponseEntry Represents single subscribe response item, as part of BulkSubscribeResponse
@ -75,19 +67,18 @@ type BulkSubscribeResponse struct {
Statuses []BulkSubscribeResponseEntry `json:"statuses"` Statuses []BulkSubscribeResponseEntry `json:"statuses"`
} }
// NewBulkPublishResponse returns a BulkPublishResponse with each entry having same status and error. // NewBulkPublishResponse returns a BulkPublishResponse with each entry having same error.
// This method is a helper method to map a single error/success response on BulkPublish to multiple events. // This method is a helper method to map a single error response on BulkPublish to multiple events.
func NewBulkPublishResponse(messages []BulkMessageEntry, status BulkPublishStatus, err error) BulkPublishResponse { func NewBulkPublishResponse(messages []BulkMessageEntry, err error) BulkPublishResponse {
response := BulkPublishResponse{} response := BulkPublishResponse{}
response.Statuses = make([]BulkPublishResponseEntry, len(messages)) response.FailedEntries = make([]BulkPublishResponseEntry, 0, len(messages))
for i, msg := range messages { for _, msg := range messages {
st := BulkPublishResponseEntry{} en := BulkPublishResponseEntry{}
st.EntryId = msg.EntryId en.EntryId = msg.EntryId
st.Status = status
if err != nil { if err != nil {
st.Error = err en.Error = err
} }
response.Statuses[i] = st response.FailedEntries = append(response.FailedEntries, en)
} }
return response return response
} }

View File

@ -38,42 +38,22 @@ func TestNewBulkPublishResponse(t *testing.T) {
ContentType: "text/plain", ContentType: "text/plain",
}, },
} }
t.Run("populate success", func(t *testing.T) {
res := NewBulkPublishResponse(messages, PublishSucceeded, nil)
assert.NotEmpty(t, res, "expected res to be populated")
assert.Equal(t, 2, len(res.Statuses), "expected two statuses")
expectedRes := BulkPublishResponse{
Statuses: []BulkPublishResponseEntry{
{
EntryId: "1",
Status: PublishSucceeded,
},
{
EntryId: "2",
Status: PublishSucceeded,
},
},
}
assert.ElementsMatch(t, expectedRes.Statuses, res.Statuses, "expected output to match")
})
t.Run("populate failure", func(t *testing.T) { t.Run("populate failure", func(t *testing.T) {
res := NewBulkPublishResponse(messages, PublishFailed, assert.AnError) res := NewBulkPublishResponse(messages, assert.AnError)
assert.NotEmpty(t, res, "expected res to be populated") assert.NotEmpty(t, res, "expected res to be populated")
assert.Equal(t, 2, len(res.Statuses), "expected two statuses") assert.Equal(t, 2, len(res.FailedEntries), "expected two statuses")
expectedRes := BulkPublishResponse{ expectedRes := BulkPublishResponse{
Statuses: []BulkPublishResponseEntry{ FailedEntries: []BulkPublishResponseEntry{
{ {
EntryId: "1", EntryId: "1",
Status: PublishFailed,
Error: assert.AnError, Error: assert.AnError,
}, },
{ {
EntryId: "2", EntryId: "2",
Status: PublishFailed,
Error: assert.AnError, Error: assert.AnError,
}, },
}, },
} }
assert.ElementsMatch(t, expectedRes.Statuses, res.Statuses, "expected output to match") assert.ElementsMatch(t, expectedRes.FailedEntries, res.FailedEntries, "expected output to match")
}) })
} }

View File

@ -386,11 +386,13 @@ func ConformanceTests(t *testing.T, props map[string]string, ps pubsub.PubSub, c
} }
t.Logf("Calling Bulk Publish on component %s", config.ComponentName) t.Logf("Calling Bulk Publish on component %s", config.ComponentName)
// Making use of entryMap defined above here to iterate through entryIds of messages published.
res, err := bP.BulkPublish(context.Background(), &req) res, err := bP.BulkPublish(context.Background(), &req)
faileEntries := convertBulkPublishResponseToStringSlice(res)
if err == nil { if err == nil {
for _, status := range res.Statuses { for k := range entryMap {
if status.Status == pubsub.PublishSucceeded { if !utils.Contains(faileEntries, k) {
data := entryMap[status.EntryId] data := entryMap[k]
t.Logf("adding to awaited messages %s", data) t.Logf("adding to awaited messages %s", data)
awaitingMessages[string(data)] = struct{}{} awaitingMessages[string(data)] = struct{}{}
} }
@ -618,3 +620,11 @@ func createMultiSubscriber(t *testing.T, subscribeCtx context.Context, ch chan<-
}) })
require.NoError(t, err, "expected no error on subscribe") require.NoError(t, err, "expected no error on subscribe")
} }
func convertBulkPublishResponseToStringSlice(res pubsub.BulkPublishResponse) []string {
failedEntries := make([]string, 0, len(res.FailedEntries))
for _, failedEntry := range res.FailedEntries {
failedEntries = append(failedEntries, failedEntry.EntryId)
}
return failedEntries
}

View File

@ -134,3 +134,13 @@ func NewStringSet(values ...string) map[string]struct{} {
return set return set
} }
func Contains[V comparable](arr []V, str V) bool {
for _, a := range arr {
if a == str {
return true
}
}
return false
}