From cf077cca65694b53088dc726603d625f28135faa Mon Sep 17 00:00:00 2001 From: Abhishek Gupta Date: Tue, 10 Mar 2020 10:37:52 +0530 Subject: [PATCH] Updated Event Hubs bindings implementation to use EPH (#253) * update implementation to use EPH * fixed minor lint error Co-authored-by: Yaron Schneider --- bindings/azure/eventhubs/eventhubs.go | 140 +++++++++++++-------- bindings/azure/eventhubs/eventhubs_test.go | 65 ++++++++-- 2 files changed, 145 insertions(+), 60 deletions(-) diff --git a/bindings/azure/eventhubs/eventhubs.go b/bindings/azure/eventhubs/eventhubs.go index 86d7b2c4d..7f46c4c75 100644 --- a/bindings/azure/eventhubs/eventhubs.go +++ b/bindings/azure/eventhubs/eventhubs.go @@ -7,17 +7,39 @@ package eventhubs import ( "context" - "encoding/json" + "errors" + "fmt" "os" "os/signal" "syscall" - "time" eventhub "github.com/Azure/azure-event-hubs-go" + "github.com/Azure/azure-event-hubs-go/eph" + "github.com/Azure/azure-event-hubs-go/storage" + "github.com/Azure/azure-storage-blob-go/azblob" + "github.com/Azure/go-autorest/autorest/azure" "github.com/dapr/components-contrib/bindings" "github.com/dapr/dapr/pkg/logger" ) +const ( + // metadata + connectionString = "connectionString" + + // required by subscriber + consumerGroup = "consumerGroup" + storageAccountName = "storageAccountName" + storageAccountKey = "storageAccountKey" + storageContainerName = "storageContainerName" + + // errors + missingConnectionStringErrorMsg = "error: connectionString is a required attribute" + missingStorageAccountNameErrorMsg = "error: storageAccountName is a required attribute" + missingStorageAccountKeyErrorMsg = "error: storageAccountKey is a required attribute" + missingStorageContainerNameErrorMsg = "error: storageContainerName is a required attribute" + missingConsumerGroupErrorMsg = "error: consumerGroup is a required attribute" +) + // AzureEventHubs allows sending/receiving Azure Event Hubs events type AzureEventHubs struct { hub *eventhub.Hub @@ -27,9 +49,11 @@ type AzureEventHubs struct { } type azureEventHubsMetadata struct { - ConnectionString string `json:"connectionString"` - ConsumerGroup string `json:"consumerGroup"` - MessageAge string `json:"messageAge"` + connectionString string + consumerGroup string + storageAccountName string + storageAccountKey string + storageContainerName string } // NewAzureEventHubs returns a new Azure Event hubs instance @@ -39,32 +63,55 @@ func NewAzureEventHubs(logger logger.Logger) *AzureEventHubs { // Init performs metadata init func (a *AzureEventHubs) Init(metadata bindings.Metadata) error { - m, err := a.parseMetadata(metadata) + m, err := parseMetadata(metadata) if err != nil { return err } a.metadata = m - hub, err := eventhub.NewHubFromConnectionString(a.metadata.ConnectionString) + hub, err := eventhub.NewHubFromConnectionString(a.metadata.connectionString) + if err != nil { - return err + return fmt.Errorf("unable to connect to azure event hubs: %v", err) } a.hub = hub return nil } -func (a *AzureEventHubs) parseMetadata(metadata bindings.Metadata) (*azureEventHubsMetadata, error) { - b, err := json.Marshal(metadata.Properties) - if err != nil { - return nil, err +func parseMetadata(meta bindings.Metadata) (*azureEventHubsMetadata, error) { + m := &azureEventHubsMetadata{} + + if val, ok := meta.Properties[connectionString]; ok && val != "" { + m.connectionString = val + } else { + return m, errors.New(missingConnectionStringErrorMsg) } - var eventHubsMetadata azureEventHubsMetadata - err = json.Unmarshal(b, &eventHubsMetadata) - if err != nil { - return nil, err + if val, ok := meta.Properties[storageAccountName]; ok && val != "" { + m.storageAccountName = val + } else { + return m, errors.New(missingStorageAccountNameErrorMsg) } - return &eventHubsMetadata, nil + + if val, ok := meta.Properties[storageAccountKey]; ok && val != "" { + m.storageAccountKey = val + } else { + return m, errors.New(missingStorageAccountKeyErrorMsg) + } + + if val, ok := meta.Properties[storageContainerName]; ok && val != "" { + m.storageContainerName = val + } else { + return m, errors.New(missingStorageContainerNameErrorMsg) + } + + if val, ok := meta.Properties[consumerGroup]; ok && val != "" { + m.consumerGroup = val + } else { + return m, errors.New(missingConsumerGroupErrorMsg) + } + + return m, nil } // Write posts an event hubs message @@ -81,52 +128,43 @@ func (a *AzureEventHubs) Write(req *bindings.WriteRequest) error { // Read gets messages from eventhubs in a non-blocking fashion func (a *AzureEventHubs) Read(handler func(*bindings.ReadResponse) error) error { - callback := func(c context.Context, event *eventhub.Event) error { - if a.metadata.MessageAge != "" && event.SystemProperties != nil && event.SystemProperties.EnqueuedTime != nil { - enqTime := *event.SystemProperties.EnqueuedTime - d, err := time.ParseDuration(a.metadata.MessageAge) - if err != nil { - a.logger.Errorf("error parsing duration: %s", err) - return nil - } else if time.Now().UTC().Sub(enqTime) > d { - return nil - } - } - if event != nil { - handler(&bindings.ReadResponse{ - Data: event.Data, - }) - } - return nil - } - - ctx := context.Background() - runtimeInfo, err := a.hub.GetRuntimeInformation(ctx) + cred, err := azblob.NewSharedKeyCredential(a.metadata.storageAccountName, a.metadata.storageAccountKey) if err != nil { return err } - ops := []eventhub.ReceiveOption{ - eventhub.ReceiveWithLatestOffset(), + leaserCheckpointer, err := storage.NewStorageLeaserCheckpointer(cred, a.metadata.storageAccountName, a.metadata.storageContainerName, azure.PublicCloud) + if err != nil { + return err } - if a.metadata.ConsumerGroup != "" { - a.logger.Infof("eventhubs: using consumer group %s", a.metadata.ConsumerGroup) - ops = append(ops, eventhub.ReceiveWithConsumerGroup(a.metadata.ConsumerGroup)) + processor, err := eph.NewFromConnectionString(context.Background(), a.metadata.connectionString, leaserCheckpointer, leaserCheckpointer, eph.WithNoBanner(), eph.WithConsumerGroup(a.metadata.consumerGroup)) + + if err != nil { + return err } - for _, partitionID := range runtimeInfo.PartitionIDs { - _, err := a.hub.Receive(ctx, partitionID, callback, ops...) - if err != nil { - return err - } + _, err = processor.RegisterHandler(context.Background(), + func(c context.Context, e *eventhub.Event) error { + return handler(&bindings.ReadResponse{Data: e.Data}) + }) + if err != nil { + return err } - signalChan := make(chan os.Signal, 1) - signal.Notify(signalChan, os.Interrupt, syscall.SIGTERM) - <-signalChan + err = processor.StartNonBlocking(context.Background()) + if err != nil { + return err + } + + // close Event Hubs when application exits + + exitChan := make(chan os.Signal, 1) + signal.Notify(exitChan, os.Interrupt, syscall.SIGTERM) + <-exitChan a.hub.Close(context.Background()) + return nil } diff --git a/bindings/azure/eventhubs/eventhubs_test.go b/bindings/azure/eventhubs/eventhubs_test.go index 13e2e69e4..0175c84e1 100644 --- a/bindings/azure/eventhubs/eventhubs_test.go +++ b/bindings/azure/eventhubs/eventhubs_test.go @@ -9,17 +9,64 @@ import ( "testing" "github.com/dapr/components-contrib/bindings" - "github.com/dapr/dapr/pkg/logger" "github.com/stretchr/testify/assert" ) func TestParseMetadata(t *testing.T) { - m := bindings.Metadata{} - m.Properties = map[string]string{"connectionString": "a", "consumerGroup": "a", "messageAge": "a"} - eh := AzureEventHubs{logger: logger.NewLogger("test")} - meta, err := eh.parseMetadata(m) - assert.Nil(t, err) - assert.Equal(t, "a", meta.ConnectionString) - assert.Equal(t, "a", meta.ConsumerGroup) - assert.Equal(t, "a", meta.MessageAge) + t.Run("test valid configuration", func(t *testing.T) { + + props := map[string]string{connectionString: "fake", consumerGroup: "mygroup", storageAccountName: "account", storageAccountKey: "key", storageContainerName: "container"} + + bindingsMetadata := bindings.Metadata{Properties: props} + + m, err := parseMetadata(bindingsMetadata) + + assert.NoError(t, err) + assert.Equal(t, m.connectionString, "fake") + assert.Equal(t, m.storageAccountName, "account") + assert.Equal(t, m.storageAccountKey, "key") + assert.Equal(t, m.storageContainerName, "container") + assert.Equal(t, m.consumerGroup, "mygroup") + }) + + type invalidConfigTestCase struct { + name string + config map[string]string + errMsg string + } + invalidConfigTestCases := []invalidConfigTestCase{ + { + "missing consumerGroup", + map[string]string{connectionString: "fake", storageAccountName: "account", storageAccountKey: "key", storageContainerName: "container"}, + missingConsumerGroupErrorMsg, + }, + { + "missing connectionString", + map[string]string{consumerGroup: "fake", storageAccountName: "account", storageAccountKey: "key", storageContainerName: "container"}, + missingConnectionStringErrorMsg, + }, + { + "missing storageAccountName", + map[string]string{consumerGroup: "fake", connectionString: "fake", storageAccountKey: "key", storageContainerName: "container"}, + missingStorageAccountNameErrorMsg, + }, + { + "missing storageAccountKey", + map[string]string{consumerGroup: "fake", connectionString: "fake", storageAccountName: "name", storageContainerName: "container"}, + missingStorageAccountKeyErrorMsg, + }, + { + "missing storageContainerName", + map[string]string{consumerGroup: "fake", connectionString: "fake", storageAccountName: "name", storageAccountKey: "key"}, + missingStorageContainerNameErrorMsg, + }} + + for _, c := range invalidConfigTestCases { + t.Run(c.name, func(t *testing.T) { + bindingsMetadata := bindings.Metadata{Properties: c.config} + _, err := parseMetadata(bindingsMetadata) + assert.Error(t, err) + assert.Equal(t, err.Error(), c.errMsg) + }) + } }