Add bulk publish for event hub

Signed-off-by: Shubham Sharma <shubhash@microsoft.com>
This commit is contained in:
Shubham Sharma 2022-09-16 17:51:23 +05:30
parent 3f9c42c4b8
commit c81eefcc46
2 changed files with 53 additions and 0 deletions

View File

@ -57,6 +57,7 @@ const (
missingSubscriptionIDMsg = "error: missing subscriptionID attribute required for entityManagement"
entityManagementConnectionStrMsg = "error: entity management support is not available with connectionString"
differentTopicConnectionStringErrorTmpl = "error: specified topic %s does not match the event hub name in the provided connectionString"
maxBulkSizeInBytesTooLargeErrorTmpl = "error: maxBulkSizeInBytes exceeds the maximum allowed value of %d"
// Event Hubs SystemProperties names for metadata passthrough.
sysPropSequenceNumber = "x-opt-sequence-number"
@ -156,6 +157,7 @@ type azureEventHubsMetadata struct {
PartitionCount int32 `json:"partitionCount,omitempty,string"`
SubscriptionID string `json:"subscriptionID,omitempty"`
ResourceGroupName string `json:"resourceGroupName,omitempty"`
MaxBulkSizeInBytes int `json:"maxBulkSizeInBytes,omitempty,string"`
}
// NewAzureEventHubs returns a new Azure Event hubs instance.
@ -183,6 +185,10 @@ func parseEventHubsMetadata(meta pubsub.Metadata) (*azureEventHubsMetadata, erro
return &m, errors.New(bothConnectionStringNamespaceErrorMsg)
}
if m.MaxBulkSizeInBytes > int(eventhub.DefaultMaxMessageSizeInBytes) {
return &m, fmt.Errorf(maxBulkSizeInBytesTooLargeErrorTmpl, eventhub.DefaultMaxMessageSizeInBytes)
}
return &m, nil
}
@ -564,6 +570,40 @@ func (aeh *AzureEventHubs) Publish(req *pubsub.PublishRequest) error {
return nil
}
// BulkPublish sends data to Azure Event Hubs in bulk.
func (aeh *AzureEventHubs) BulkPublish(req *pubsub.BulkPublishRequest) (pubsub.BulkPublishResponse, error) {
if _, ok := aeh.hubClients[req.Topic]; !ok {
if err := aeh.ensurePublisherClient(aeh.publishCtx, req.Topic); err != nil {
err = fmt.Errorf("error on establishing hub connection: %s", err)
return pubsub.NewBulkPublishResponse(req.Entries, pubsub.PublishFailed, err), err
}
}
// Create a slice of events to send.
events := make([]*eventhub.Event, len(req.Entries))
for i, entry := range req.Entries {
events[i] = &eventhub.Event{Data: entry.Event}
val, ok := entry.Metadata[partitionKeyMetadataKey]
if ok {
events[i].PartitionKey = &val
}
}
// Configure options for sending events.
opts := []eventhub.BatchOption{}
if aeh.metadata.MaxBulkSizeInBytes > 0 {
opts = append(opts, eventhub.BatchWithMaxSizeInBytes(aeh.metadata.MaxBulkSizeInBytes))
}
// Send events.
err := aeh.hubClients[req.Topic].SendBatch(aeh.publishCtx, eventhub.NewEventBatchIterator(events...), opts...)
if err != nil {
return pubsub.NewBulkPublishResponse(req.Entries, pubsub.PublishFailed, err), err
}
return pubsub.BulkPublishResponse{}, nil
}
// Subscribe receives data from Azure Event Hubs.
func (aeh *AzureEventHubs) Subscribe(subscribeCtx context.Context, req pubsub.SubscribeRequest, handler pubsub.Handler) error {
err := aeh.validateSubscriptionAttributes()

View File

@ -17,6 +17,7 @@ import (
"fmt"
"testing"
eventhub "github.com/Azure/azure-event-hubs-go/v3"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@ -67,6 +68,18 @@ func TestParseEventHubsMetadata(t *testing.T) {
assert.Error(t, err)
assert.Equal(t, missingConnectionStringNamespaceErrorMsg, err.Error())
})
t.Run("test maxBulkSizeInBytes limits", func(t *testing.T) {
val := fmt.Sprintf("%d", eventhub.DefaultMaxMessageSizeInBytes+1)
props := map[string]string{"connectionString": "fake", "maxBulkSizeInBytes": val}
metadata := pubsub.Metadata{Base: metadata.Base{Properties: props}}
_, err := parseEventHubsMetadata(metadata)
expected := fmt.Sprintf(maxBulkSizeInBytesTooLargeErrorTmpl, eventhub.DefaultMaxMessageSizeInBytes)
assert.Error(t, err)
assert.Equal(t, expected, err.Error())
})
}
func TestValidateSubscriptionAttributes(t *testing.T) {