From 827c949c999759181e7541dd5112199b8fe73130 Mon Sep 17 00:00:00 2001 From: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com> Date: Fri, 20 Jan 2023 21:34:03 +0000 Subject: [PATCH] Address review comments Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com> --- pubsub/azure/eventhubs/eventhubs.go | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/pubsub/azure/eventhubs/eventhubs.go b/pubsub/azure/eventhubs/eventhubs.go index 312de4907..ab68b7fbd 100644 --- a/pubsub/azure/eventhubs/eventhubs.go +++ b/pubsub/azure/eventhubs/eventhubs.go @@ -145,10 +145,11 @@ func (aeh *AzureEventHubs) Publish(ctx context.Context, req *pubsub.PublishReque // BulkPublish sends data to Azure Event Hubs in bulk. func (aeh *AzureEventHubs) BulkPublish(ctx context.Context, req *pubsub.BulkPublishRequest) (pubsub.BulkPublishResponse, error) { - res := pubsub.BulkPublishResponse{} + var err error if req.Topic == "" { - return res, errors.New("parameter 'topic' is required") + err = errors.New("parameter 'topic' is required") + return pubsub.NewBulkPublishResponse(req.Entries, err), err } // Batch options @@ -171,21 +172,22 @@ func (aeh *AzureEventHubs) BulkPublish(ctx context.Context, req *pubsub.BulkPubl } if val := entry.Metadata["partitionKey"]; val != "" { if batchOpts.PartitionKey != nil && *batchOpts.PartitionKey != val { - return res, errors.New("cannot send messages to different partitions") + err = errors.New("cannot send messages to different partitions") + return pubsub.NewBulkPublishResponse(req.Entries, err), err } batchOpts.PartitionKey = &val } } // Publish the message - err := aeh.doPublish(ctx, req.Topic, messages, batchOpts) + err = aeh.doPublish(ctx, req.Topic, messages, batchOpts) if err != nil { // Partial success is not supported by Azure Event Hubs. // If an error occurs, all events are considered failed. return pubsub.NewBulkPublishResponse(req.Entries, err), err } - return res, nil + return pubsub.BulkPublishResponse{}, nil } // Internal method used by Publish and BulkPublish to send messages @@ -199,7 +201,7 @@ func (aeh *AzureEventHubs) doPublish(ctx context.Context, topic string, messages // Build the batch of messages batch, err := client.NewEventDataBatch(ctx, batchOpts) if err != nil { - return fmt.Errorf("error creating batch: %w", err) + return fmt.Errorf("error creating event batch: %w", err) } // Add all messages @@ -357,7 +359,7 @@ func (aeh *AzureEventHubs) processEvents(subscribeCtx context.Context, topic str for { // TODO: Support setting a batch size const batchSize = 1 - ctx, cancel = context.WithCancel(subscribeCtx) + ctx, cancel = context.WithTimeout(subscribeCtx, time.Minute) events, err = partitionClient.ReceiveEvents(ctx, batchSize, nil) cancel() @@ -367,7 +369,7 @@ func (aeh *AzureEventHubs) processEvents(subscribeCtx context.Context, topic str // We'll just stop this subscription and return eventHubError := (*azeventhubs.Error)(nil) if errors.As(err, &eventHubError) && eventHubError.Code == azeventhubs.ErrorCodeOwnershipLost { - aeh.logger.Debug("Client lost ownership of partition %s for topic %s", partitionClient.PartitionID(), topic) + aeh.logger.Debugf("Client lost ownership of partition %s for topic %s", partitionClient.PartitionID(), topic) return nil }