Address review comments

Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>
This commit is contained in:
ItalyPaleAle 2023-01-20 21:34:03 +00:00
parent d97dac3b98
commit 827c949c99
1 changed files with 10 additions and 8 deletions

View File

@ -145,10 +145,11 @@ func (aeh *AzureEventHubs) Publish(ctx context.Context, req *pubsub.PublishReque
// BulkPublish sends data to Azure Event Hubs in bulk.
func (aeh *AzureEventHubs) BulkPublish(ctx context.Context, req *pubsub.BulkPublishRequest) (pubsub.BulkPublishResponse, error) {
res := pubsub.BulkPublishResponse{}
var err error
if req.Topic == "" {
return res, errors.New("parameter 'topic' is required")
err = errors.New("parameter 'topic' is required")
return pubsub.NewBulkPublishResponse(req.Entries, err), err
}
// Batch options
@ -171,21 +172,22 @@ func (aeh *AzureEventHubs) BulkPublish(ctx context.Context, req *pubsub.BulkPubl
}
if val := entry.Metadata["partitionKey"]; val != "" {
if batchOpts.PartitionKey != nil && *batchOpts.PartitionKey != val {
return res, errors.New("cannot send messages to different partitions")
err = errors.New("cannot send messages to different partitions")
return pubsub.NewBulkPublishResponse(req.Entries, err), err
}
batchOpts.PartitionKey = &val
}
}
// Publish the message
err := aeh.doPublish(ctx, req.Topic, messages, batchOpts)
err = aeh.doPublish(ctx, req.Topic, messages, batchOpts)
if err != nil {
// Partial success is not supported by Azure Event Hubs.
// If an error occurs, all events are considered failed.
return pubsub.NewBulkPublishResponse(req.Entries, err), err
}
return res, nil
return pubsub.BulkPublishResponse{}, nil
}
// Internal method used by Publish and BulkPublish to send messages
@ -199,7 +201,7 @@ func (aeh *AzureEventHubs) doPublish(ctx context.Context, topic string, messages
// Build the batch of messages
batch, err := client.NewEventDataBatch(ctx, batchOpts)
if err != nil {
return fmt.Errorf("error creating batch: %w", err)
return fmt.Errorf("error creating event batch: %w", err)
}
// Add all messages
@ -357,7 +359,7 @@ func (aeh *AzureEventHubs) processEvents(subscribeCtx context.Context, topic str
for {
// TODO: Support setting a batch size
const batchSize = 1
ctx, cancel = context.WithCancel(subscribeCtx)
ctx, cancel = context.WithTimeout(subscribeCtx, time.Minute)
events, err = partitionClient.ReceiveEvents(ctx, batchSize, nil)
cancel()
@ -367,7 +369,7 @@ func (aeh *AzureEventHubs) processEvents(subscribeCtx context.Context, topic str
// We'll just stop this subscription and return
eventHubError := (*azeventhubs.Error)(nil)
if errors.As(err, &eventHubError) && eventHubError.Code == azeventhubs.ErrorCodeOwnershipLost {
aeh.logger.Debug("Client lost ownership of partition %s for topic %s", partitionClient.PartitionID(), topic)
aeh.logger.Debugf("Client lost ownership of partition %s for topic %s", partitionClient.PartitionID(), topic)
return nil
}