From c81eefcc466d292328ff64001ca569103131c065 Mon Sep 17 00:00:00 2001 From: Shubham Sharma Date: Fri, 16 Sep 2022 17:51:23 +0530 Subject: [PATCH] Add bulk publish for event hub Signed-off-by: Shubham Sharma --- pubsub/azure/eventhubs/eventhubs.go | 40 ++++++++++++++++++++++++ pubsub/azure/eventhubs/eventhubs_test.go | 13 ++++++++ 2 files changed, 53 insertions(+) diff --git a/pubsub/azure/eventhubs/eventhubs.go b/pubsub/azure/eventhubs/eventhubs.go index f2d986f71..d04a1ad34 100644 --- a/pubsub/azure/eventhubs/eventhubs.go +++ b/pubsub/azure/eventhubs/eventhubs.go @@ -57,6 +57,7 @@ const ( missingSubscriptionIDMsg = "error: missing subscriptionID attribute required for entityManagement" entityManagementConnectionStrMsg = "error: entity management support is not available with connectionString" differentTopicConnectionStringErrorTmpl = "error: specified topic %s does not match the event hub name in the provided connectionString" + maxBulkSizeInBytesTooLargeErrorTmpl = "error: maxBulkSizeInBytes exceeds the maximum allowed value of %d" // Event Hubs SystemProperties names for metadata passthrough. sysPropSequenceNumber = "x-opt-sequence-number" @@ -156,6 +157,7 @@ type azureEventHubsMetadata struct { PartitionCount int32 `json:"partitionCount,omitempty,string"` SubscriptionID string `json:"subscriptionID,omitempty"` ResourceGroupName string `json:"resourceGroupName,omitempty"` + MaxBulkSizeInBytes int `json:"maxBulkSizeInBytes,omitempty,string"` } // NewAzureEventHubs returns a new Azure Event hubs instance. @@ -183,6 +185,10 @@ func parseEventHubsMetadata(meta pubsub.Metadata) (*azureEventHubsMetadata, erro return &m, errors.New(bothConnectionStringNamespaceErrorMsg) } + if m.MaxBulkSizeInBytes > int(eventhub.DefaultMaxMessageSizeInBytes) { + return &m, fmt.Errorf(maxBulkSizeInBytesTooLargeErrorTmpl, eventhub.DefaultMaxMessageSizeInBytes) + } + return &m, nil } @@ -564,6 +570,40 @@ func (aeh *AzureEventHubs) Publish(req *pubsub.PublishRequest) error { return nil } +// BulkPublish sends data to Azure Event Hubs in bulk. +func (aeh *AzureEventHubs) BulkPublish(req *pubsub.BulkPublishRequest) (pubsub.BulkPublishResponse, error) { + if _, ok := aeh.hubClients[req.Topic]; !ok { + if err := aeh.ensurePublisherClient(aeh.publishCtx, req.Topic); err != nil { + err = fmt.Errorf("error on establishing hub connection: %s", err) + return pubsub.NewBulkPublishResponse(req.Entries, pubsub.PublishFailed, err), err + } + } + + // Create a slice of events to send. + events := make([]*eventhub.Event, len(req.Entries)) + for i, entry := range req.Entries { + events[i] = &eventhub.Event{Data: entry.Event} + val, ok := entry.Metadata[partitionKeyMetadataKey] + if ok { + events[i].PartitionKey = &val + } + } + + // Configure options for sending events. + opts := []eventhub.BatchOption{} + if aeh.metadata.MaxBulkSizeInBytes > 0 { + opts = append(opts, eventhub.BatchWithMaxSizeInBytes(aeh.metadata.MaxBulkSizeInBytes)) + } + + // Send events. + err := aeh.hubClients[req.Topic].SendBatch(aeh.publishCtx, eventhub.NewEventBatchIterator(events...), opts...) + if err != nil { + return pubsub.NewBulkPublishResponse(req.Entries, pubsub.PublishFailed, err), err + } + + return pubsub.BulkPublishResponse{}, nil +} + // Subscribe receives data from Azure Event Hubs. func (aeh *AzureEventHubs) Subscribe(subscribeCtx context.Context, req pubsub.SubscribeRequest, handler pubsub.Handler) error { err := aeh.validateSubscriptionAttributes() diff --git a/pubsub/azure/eventhubs/eventhubs_test.go b/pubsub/azure/eventhubs/eventhubs_test.go index 1d46ca2ef..2f5484e53 100644 --- a/pubsub/azure/eventhubs/eventhubs_test.go +++ b/pubsub/azure/eventhubs/eventhubs_test.go @@ -17,6 +17,7 @@ import ( "fmt" "testing" + eventhub "github.com/Azure/azure-event-hubs-go/v3" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -67,6 +68,18 @@ func TestParseEventHubsMetadata(t *testing.T) { assert.Error(t, err) assert.Equal(t, missingConnectionStringNamespaceErrorMsg, err.Error()) }) + + t.Run("test maxBulkSizeInBytes limits", func(t *testing.T) { + val := fmt.Sprintf("%d", eventhub.DefaultMaxMessageSizeInBytes+1) + props := map[string]string{"connectionString": "fake", "maxBulkSizeInBytes": val} + + metadata := pubsub.Metadata{Base: metadata.Base{Properties: props}} + _, err := parseEventHubsMetadata(metadata) + + expected := fmt.Sprintf(maxBulkSizeInBytesTooLargeErrorTmpl, eventhub.DefaultMaxMessageSizeInBytes) + assert.Error(t, err) + assert.Equal(t, expected, err.Error()) + }) } func TestValidateSubscriptionAttributes(t *testing.T) {