binding.eventhub: use same name schema for storage leases like on pubsub.eventhub (#1940)

* binding.eventhub: use same name schema for storage leases like on pubsub.eventhub

Signed-off-by: Christian Leinweber <christian.leinweber@maibornwolff.de>

* binding.eventhub: rename AzureEventshub local var

Signed-off-by: Christian Leinweber <christian.leinweber@maibornwolff.de>
This commit is contained in:
Christian Leinweber 2022-08-08 18:53:04 +02:00 committed by GitHub
parent 66d92cd7ff
commit 1148776d37
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 65 additions and 1 deletions

View File

@ -18,6 +18,7 @@ import (
"errors"
"fmt"
"strconv"
"strings"
"time"
"github.com/Azure/azure-amqp-common-go/v3/aad"
@ -154,6 +155,28 @@ func validate(connectionString string) error {
return err
}
func (a *AzureEventHubs) getStoragePrefixString() (string, error) {
hubName, err := a.validateAndGetHubName()
if err != nil {
return "", err
}
// empty string in the end of slice to have a suffix "-".
return strings.Join([]string{"dapr", hubName, a.metadata.consumerGroup, ""}, "-"), nil
}
func (a *AzureEventHubs) validateAndGetHubName() (string, error) {
hubName := a.metadata.eventHubName
if hubName == "" {
parsed, err := conn.ParsedConnectionFromStr(a.metadata.connectionString)
if err != nil {
return "", err
}
hubName = parsed.HubName
}
return hubName, nil
}
// Init performs metadata init.
func (a *AzureEventHubs) Init(metadata bindings.Metadata) error {
m, err := parseMetadata(metadata)
@ -360,7 +383,13 @@ func (a *AzureEventHubs) RegisterPartitionedEventProcessor(ctx context.Context,
// RegisterEventProcessor - receive eventhub messages by eventprocessor
// host by balancing partitions.
func (a *AzureEventHubs) RegisterEventProcessor(ctx context.Context, handler bindings.Handler) error {
leaserCheckpointer, err := storage.NewStorageLeaserCheckpointer(a.storageCredential, a.metadata.storageAccountName, a.metadata.storageContainerName, *a.azureEnvironment)
storagePrefix, err := a.getStoragePrefixString()
if err != nil {
return err
}
leaserPrefixOpt := storage.WithPrefixInBlobPath(storagePrefix)
leaserCheckpointer, err := storage.NewStorageLeaserCheckpointer(a.storageCredential, a.metadata.storageAccountName, a.metadata.storageContainerName, *a.azureEnvironment, leaserPrefixOpt)
if err != nil {
return err
}

View File

@ -17,10 +17,45 @@ import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/dapr/components-contrib/bindings"
"github.com/dapr/kit/logger"
)
var testLogger = logger.NewLogger("test")
func TestGetStoragePrefixString(t *testing.T) {
props := map[string]string{"storageAccountName": "fake", "storageAccountKey": "fake", "consumerGroup": "default", "storageContainerName": "test", "eventHub": "hubName", "eventHubNamespace": "fake"}
metadata := bindings.Metadata{Properties: props}
m, err := parseMetadata(metadata)
require.NoError(t, err)
aeh := &AzureEventHubs{logger: testLogger, metadata: m}
actual, _ := aeh.getStoragePrefixString()
assert.Equal(t, "dapr-hubName-default-", actual)
}
func TestGetStoragePrefixStringWithHubNameFromConnectionString(t *testing.T) {
connectionString := "Endpoint=sb://fake.servicebus.windows.net/;SharedAccessKeyName=fakeKey;SharedAccessKey=key;EntityPath=hubName"
props := map[string]string{"storageAccountName": "fake", "storageAccountKey": "fake", "consumerGroup": "default", "storageContainerName": "test", "connectionString": connectionString}
metadata := bindings.Metadata{Properties: props}
m, err := parseMetadata(metadata)
require.NoError(t, err)
aeh := &AzureEventHubs{logger: testLogger, metadata: m}
actual, _ := aeh.getStoragePrefixString()
assert.Equal(t, "dapr-hubName-default-", actual)
}
func TestParseMetadata(t *testing.T) {
t.Run("test valid configuration", func(t *testing.T) {
props := map[string]string{connectionString: "fake", consumerGroup: "mygroup", storageAccountName: "account", storageAccountKey: "key", storageContainerName: "container"}