Updated Event Hubs bindings implementation to use EPH (#253)

* update implementation to use EPH

* fixed minor lint error

Co-authored-by: Yaron Schneider <yaronsc@microsoft.com>
This commit is contained in:
Abhishek Gupta 2020-03-10 10:37:52 +05:30 committed by GitHub
parent 807e8893cb
commit cf077cca65
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 145 additions and 60 deletions

View File

@ -7,17 +7,39 @@ package eventhubs
import (
"context"
"encoding/json"
"errors"
"fmt"
"os"
"os/signal"
"syscall"
"time"
eventhub "github.com/Azure/azure-event-hubs-go"
"github.com/Azure/azure-event-hubs-go/eph"
"github.com/Azure/azure-event-hubs-go/storage"
"github.com/Azure/azure-storage-blob-go/azblob"
"github.com/Azure/go-autorest/autorest/azure"
"github.com/dapr/components-contrib/bindings"
"github.com/dapr/dapr/pkg/logger"
)
const (
// metadata
connectionString = "connectionString"
// required by subscriber
consumerGroup = "consumerGroup"
storageAccountName = "storageAccountName"
storageAccountKey = "storageAccountKey"
storageContainerName = "storageContainerName"
// errors
missingConnectionStringErrorMsg = "error: connectionString is a required attribute"
missingStorageAccountNameErrorMsg = "error: storageAccountName is a required attribute"
missingStorageAccountKeyErrorMsg = "error: storageAccountKey is a required attribute"
missingStorageContainerNameErrorMsg = "error: storageContainerName is a required attribute"
missingConsumerGroupErrorMsg = "error: consumerGroup is a required attribute"
)
// AzureEventHubs allows sending/receiving Azure Event Hubs events
type AzureEventHubs struct {
hub *eventhub.Hub
@ -27,9 +49,11 @@ type AzureEventHubs struct {
}
type azureEventHubsMetadata struct {
ConnectionString string `json:"connectionString"`
ConsumerGroup string `json:"consumerGroup"`
MessageAge string `json:"messageAge"`
connectionString string
consumerGroup string
storageAccountName string
storageAccountKey string
storageContainerName string
}
// NewAzureEventHubs returns a new Azure Event hubs instance
@ -39,32 +63,55 @@ func NewAzureEventHubs(logger logger.Logger) *AzureEventHubs {
// Init performs metadata init
func (a *AzureEventHubs) Init(metadata bindings.Metadata) error {
m, err := a.parseMetadata(metadata)
m, err := parseMetadata(metadata)
if err != nil {
return err
}
a.metadata = m
hub, err := eventhub.NewHubFromConnectionString(a.metadata.ConnectionString)
hub, err := eventhub.NewHubFromConnectionString(a.metadata.connectionString)
if err != nil {
return err
return fmt.Errorf("unable to connect to azure event hubs: %v", err)
}
a.hub = hub
return nil
}
func (a *AzureEventHubs) parseMetadata(metadata bindings.Metadata) (*azureEventHubsMetadata, error) {
b, err := json.Marshal(metadata.Properties)
if err != nil {
return nil, err
func parseMetadata(meta bindings.Metadata) (*azureEventHubsMetadata, error) {
m := &azureEventHubsMetadata{}
if val, ok := meta.Properties[connectionString]; ok && val != "" {
m.connectionString = val
} else {
return m, errors.New(missingConnectionStringErrorMsg)
}
var eventHubsMetadata azureEventHubsMetadata
err = json.Unmarshal(b, &eventHubsMetadata)
if err != nil {
return nil, err
if val, ok := meta.Properties[storageAccountName]; ok && val != "" {
m.storageAccountName = val
} else {
return m, errors.New(missingStorageAccountNameErrorMsg)
}
return &eventHubsMetadata, nil
if val, ok := meta.Properties[storageAccountKey]; ok && val != "" {
m.storageAccountKey = val
} else {
return m, errors.New(missingStorageAccountKeyErrorMsg)
}
if val, ok := meta.Properties[storageContainerName]; ok && val != "" {
m.storageContainerName = val
} else {
return m, errors.New(missingStorageContainerNameErrorMsg)
}
if val, ok := meta.Properties[consumerGroup]; ok && val != "" {
m.consumerGroup = val
} else {
return m, errors.New(missingConsumerGroupErrorMsg)
}
return m, nil
}
// Write posts an event hubs message
@ -81,52 +128,43 @@ func (a *AzureEventHubs) Write(req *bindings.WriteRequest) error {
// Read gets messages from eventhubs in a non-blocking fashion
func (a *AzureEventHubs) Read(handler func(*bindings.ReadResponse) error) error {
callback := func(c context.Context, event *eventhub.Event) error {
if a.metadata.MessageAge != "" && event.SystemProperties != nil && event.SystemProperties.EnqueuedTime != nil {
enqTime := *event.SystemProperties.EnqueuedTime
d, err := time.ParseDuration(a.metadata.MessageAge)
if err != nil {
a.logger.Errorf("error parsing duration: %s", err)
return nil
} else if time.Now().UTC().Sub(enqTime) > d {
return nil
}
}
if event != nil {
handler(&bindings.ReadResponse{
Data: event.Data,
})
}
return nil
}
ctx := context.Background()
runtimeInfo, err := a.hub.GetRuntimeInformation(ctx)
cred, err := azblob.NewSharedKeyCredential(a.metadata.storageAccountName, a.metadata.storageAccountKey)
if err != nil {
return err
}
ops := []eventhub.ReceiveOption{
eventhub.ReceiveWithLatestOffset(),
leaserCheckpointer, err := storage.NewStorageLeaserCheckpointer(cred, a.metadata.storageAccountName, a.metadata.storageContainerName, azure.PublicCloud)
if err != nil {
return err
}
if a.metadata.ConsumerGroup != "" {
a.logger.Infof("eventhubs: using consumer group %s", a.metadata.ConsumerGroup)
ops = append(ops, eventhub.ReceiveWithConsumerGroup(a.metadata.ConsumerGroup))
processor, err := eph.NewFromConnectionString(context.Background(), a.metadata.connectionString, leaserCheckpointer, leaserCheckpointer, eph.WithNoBanner(), eph.WithConsumerGroup(a.metadata.consumerGroup))
if err != nil {
return err
}
for _, partitionID := range runtimeInfo.PartitionIDs {
_, err := a.hub.Receive(ctx, partitionID, callback, ops...)
if err != nil {
return err
}
_, err = processor.RegisterHandler(context.Background(),
func(c context.Context, e *eventhub.Event) error {
return handler(&bindings.ReadResponse{Data: e.Data})
})
if err != nil {
return err
}
signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, os.Interrupt, syscall.SIGTERM)
<-signalChan
err = processor.StartNonBlocking(context.Background())
if err != nil {
return err
}
// close Event Hubs when application exits
exitChan := make(chan os.Signal, 1)
signal.Notify(exitChan, os.Interrupt, syscall.SIGTERM)
<-exitChan
a.hub.Close(context.Background())
return nil
}

View File

@ -9,17 +9,64 @@ import (
"testing"
"github.com/dapr/components-contrib/bindings"
"github.com/dapr/dapr/pkg/logger"
"github.com/stretchr/testify/assert"
)
func TestParseMetadata(t *testing.T) {
m := bindings.Metadata{}
m.Properties = map[string]string{"connectionString": "a", "consumerGroup": "a", "messageAge": "a"}
eh := AzureEventHubs{logger: logger.NewLogger("test")}
meta, err := eh.parseMetadata(m)
assert.Nil(t, err)
assert.Equal(t, "a", meta.ConnectionString)
assert.Equal(t, "a", meta.ConsumerGroup)
assert.Equal(t, "a", meta.MessageAge)
t.Run("test valid configuration", func(t *testing.T) {
props := map[string]string{connectionString: "fake", consumerGroup: "mygroup", storageAccountName: "account", storageAccountKey: "key", storageContainerName: "container"}
bindingsMetadata := bindings.Metadata{Properties: props}
m, err := parseMetadata(bindingsMetadata)
assert.NoError(t, err)
assert.Equal(t, m.connectionString, "fake")
assert.Equal(t, m.storageAccountName, "account")
assert.Equal(t, m.storageAccountKey, "key")
assert.Equal(t, m.storageContainerName, "container")
assert.Equal(t, m.consumerGroup, "mygroup")
})
type invalidConfigTestCase struct {
name string
config map[string]string
errMsg string
}
invalidConfigTestCases := []invalidConfigTestCase{
{
"missing consumerGroup",
map[string]string{connectionString: "fake", storageAccountName: "account", storageAccountKey: "key", storageContainerName: "container"},
missingConsumerGroupErrorMsg,
},
{
"missing connectionString",
map[string]string{consumerGroup: "fake", storageAccountName: "account", storageAccountKey: "key", storageContainerName: "container"},
missingConnectionStringErrorMsg,
},
{
"missing storageAccountName",
map[string]string{consumerGroup: "fake", connectionString: "fake", storageAccountKey: "key", storageContainerName: "container"},
missingStorageAccountNameErrorMsg,
},
{
"missing storageAccountKey",
map[string]string{consumerGroup: "fake", connectionString: "fake", storageAccountName: "name", storageContainerName: "container"},
missingStorageAccountKeyErrorMsg,
},
{
"missing storageContainerName",
map[string]string{consumerGroup: "fake", connectionString: "fake", storageAccountName: "name", storageAccountKey: "key"},
missingStorageContainerNameErrorMsg,
}}
for _, c := range invalidConfigTestCases {
t.Run(c.name, func(t *testing.T) {
bindingsMetadata := bindings.Metadata{Properties: c.config}
_, err := parseMetadata(bindingsMetadata)
assert.Error(t, err)
assert.Equal(t, err.Error(), c.errMsg)
})
}
}