Added entity management

Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>
This commit is contained in:
ItalyPaleAle 2023-01-20 00:18:22 +00:00
parent 4c12f40534
commit c10173b29f
6 changed files with 294 additions and 265 deletions

1
go.mod
View File

@ -144,6 +144,7 @@ require (
github.com/Azure/azure-pipeline-go v0.2.3 // indirect
github.com/Azure/azure-sdk-for-go/sdk/internal v1.1.2 // indirect
github.com/Azure/azure-sdk-for-go/sdk/keyvault/internal v0.7.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/eventhub/armeventhub v1.0.0 // indirect
github.com/Azure/go-autorest v14.2.0+incompatible // indirect
github.com/Azure/go-autorest/autorest/azure/cli v0.4.5 // indirect
github.com/Azure/go-autorest/autorest/date v0.3.0 // indirect

1
go.sum
View File

@ -437,6 +437,7 @@ github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs v0.4.0/go.mod h1:5do
github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus v1.1.4 h1:kaZamwZwmUqnECvnPkf1LBRBIFYYCy3E0gKHn/UFSD0=
github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus v1.1.4/go.mod h1:uDLwkzCJMvTrHsvtiVFeAp85hi3W77zvs61wrpc+6ho=
github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/eventhub/armeventhub v1.0.0 h1:BWeAAEzkCnL0ABVJqs+4mYudNch7oFGPtTlSmIWL8ms=
github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/eventhub/armeventhub v1.0.0/go.mod h1:Y3gnVwfaz8h6L1YHar+NfWORtBoVUSB5h4GlGkdeF7Q=
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v0.6.1 h1:YvQv9Mz6T8oR5ypQOL6erY0Z5t71ak1uHV4QFokCOZk=
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v0.6.1/go.mod h1:c6WvOhtmjNUWbLfOG1qxM/q0SPvQNSVJvolm+C52dIU=
github.com/Azure/azure-storage-blob-go v0.15.0 h1:rXtgp8tN1p29GvpGgfJetavIG0V7OgcSXPpwp3tx6qk=

View File

@ -0,0 +1,214 @@
/*
Copyright 2023 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package eventhubs
import (
"context"
"errors"
"fmt"
"net/http"
"time"
"github.com/Azure/azure-sdk-for-go/sdk/azcore"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/arm"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/policy"
"github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/eventhub/armeventhub"
azauth "github.com/dapr/components-contrib/internal/authentication/azure"
"github.com/dapr/kit/logger"
"github.com/dapr/kit/ptr"
"github.com/dapr/kit/retry"
)
const (
defaultMessageRetentionInDays = 1
defaultPartitionCount = 1
resourceCheckMaxRetry = 5
resourceCheckMaxRetryInterval = 5 * time.Minute
resourceCreationTimeout = 15 * time.Second
resourceGetTimeout = 5 * time.Second
// See https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-quotas for numbers.
maxMessageRetention = int32(90)
maxPartitionCount = int32(1024)
)
// Intializes the entity management capabilities. This method is invoked by Init.
func (aeh *AzureEventHubs) initEntityManagement(md map[string]string) error {
// Validate the metadata
err := aeh.validateEnitityManagementMetadata()
if err != nil {
return err
}
// Get Azure Management plane credentials object
settings, err := azauth.NewEnvironmentSettings("azure", md)
if err != nil {
return err
}
creds, err := settings.GetTokenCredential()
if err != nil {
return fmt.Errorf("failed to obtain Azure AD management credentials: %w", err)
}
aeh.managementCreds = creds
return nil
}
func (aeh *AzureEventHubs) validateEnitityManagementMetadata() error {
if aeh.metadata.MessageRetentionInDays <= 0 || aeh.metadata.MessageRetentionInDays > maxMessageRetention {
aeh.logger.Warnf("Property messageRetentionInDays for entity management has an empty or invalid value; using the default value %d", defaultMessageRetentionInDays)
aeh.metadata.MessageRetentionInDays = defaultMessageRetentionInDays
}
if aeh.metadata.PartitionCount <= 0 || aeh.metadata.PartitionCount > maxPartitionCount {
aeh.logger.Warnf("Property partitionCount for entity management has an empty or invalid value; using the default value %d", defaultPartitionCount)
aeh.metadata.PartitionCount = defaultPartitionCount
}
if aeh.metadata.ResourceGroupName == "" {
return errors.New("property resourceGroupName is required for entity management")
}
if aeh.metadata.SubscriptionID == "" {
return errors.New("property subscriptionID is required for entity management")
}
return nil
}
// Ensures that the Event Hub entity exists.
// This is used during the creation of both producers and consumers.
func (aeh *AzureEventHubs) ensureEventHubEntity(parentCtx context.Context, topic string) error {
client, err := armeventhub.NewEventHubsClient(aeh.metadata.SubscriptionID, aeh.managementCreds, &arm.ClientOptions{
ClientOptions: policy.ClientOptions{
Telemetry: policy.TelemetryOptions{
ApplicationID: "dapr-" + logger.DaprVersion,
},
},
})
if err != nil {
return fmt.Errorf("failed to create Event Hubs ARM client: %w", err)
}
aeh.logger.Debugf("Checking if entity %s exists on Event Hub namespace %s", topic, aeh.metadata.EventHubNamespace)
// Check if the entity exists
ctx, cancel := context.WithTimeout(parentCtx, resourceGetTimeout)
defer cancel()
_, err = client.Get(ctx, aeh.metadata.ResourceGroupName, aeh.metadata.EventHubNamespace, topic, nil)
if err == nil {
// If there's no error, the entity already exists, so all good
aeh.logger.Debugf("Entity %s already exists on Event Hub namespace %s", topic, aeh.metadata.EventHubNamespace)
return nil
}
// Check if the error is NotFound or something else
resErr := &azcore.ResponseError{}
if !errors.As(err, &resErr) || resErr.StatusCode != http.StatusNotFound {
// We have another error, just return it
return fmt.Errorf("failed to retrieve Event Hub entity %s: %w", topic, err)
}
// Create the entity
aeh.logger.Infof("Will create entity %s on Event Hub namespace %s", topic, aeh.metadata.EventHubNamespace)
params := armeventhub.Eventhub{
Properties: &armeventhub.Properties{
MessageRetentionInDays: ptr.Of(int64(aeh.metadata.MessageRetentionInDays)),
PartitionCount: ptr.Of(int64(aeh.metadata.PartitionCount)),
},
}
ctx, cancel = context.WithTimeout(parentCtx, resourceCreationTimeout)
defer cancel()
_, err = client.CreateOrUpdate(ctx, aeh.metadata.ResourceGroupName, aeh.metadata.EventHubNamespace, topic, params, nil)
if err != nil {
return fmt.Errorf("failed to create Event Hub entity %s: %w", topic, err)
}
aeh.logger.Infof("Entity %s created on Event Hub namespace %s", topic, aeh.metadata.EventHubNamespace)
return nil
}
// Ensures that the subscription (consumer group) exists.
// This is used during the creation of consumers only.
func (aeh *AzureEventHubs) ensureSubscription(parentCtx context.Context, hubName string) error {
client, err := armeventhub.NewConsumerGroupsClient(aeh.metadata.SubscriptionID, aeh.managementCreds, &arm.ClientOptions{
ClientOptions: policy.ClientOptions{
Telemetry: policy.TelemetryOptions{
ApplicationID: "dapr-" + logger.DaprVersion,
},
},
})
if err != nil {
return fmt.Errorf("failed to create consumer group ARM client: %w", err)
}
aeh.logger.Debugf("Checking if consumer group %s exists in entity %s", aeh.metadata.ConsumerGroup, hubName)
// Check if the consumer group exists
// We need to use a retry logic here
backOffConfig := retry.DefaultConfig()
backOffConfig.Policy = retry.PolicyExponential
backOffConfig.MaxInterval = resourceCheckMaxRetryInterval
backOffConfig.MaxRetries = resourceCheckMaxRetry
b := backOffConfig.NewBackOffWithContext(parentCtx)
create, err := retry.NotifyRecoverWithData(func() (bool, error) {
c, err := aeh.shouldCreateConsumerGroup(parentCtx, client, hubName)
if err != nil {
return false, err
}
return c, nil
}, b, func(_ error, _ time.Duration) {
aeh.logger.Errorf("Error checking for consumer group for Event Hub: %s. Retrying…", hubName)
}, func() {
aeh.logger.Warnf("Successfully checked for consumer group in Event Hub %s after it previously failed", hubName)
})
if err != nil {
return err
}
if !create {
// Already exists
aeh.logger.Debugf("Consumer group %s already exists in entity %s", aeh.metadata.ConsumerGroup, hubName)
return nil
}
// Need to create the consumer group
aeh.logger.Infof("Will create consumer group %s exists in entity %s", aeh.metadata.ConsumerGroup, hubName)
ctx, cancel := context.WithTimeout(parentCtx, resourceCreationTimeout)
defer cancel()
_, err = client.CreateOrUpdate(ctx, aeh.metadata.ResourceGroupName, aeh.metadata.EventHubNamespace, hubName, aeh.metadata.ConsumerGroup, armeventhub.ConsumerGroup{}, nil)
if err != nil {
return fmt.Errorf("failed to create consumer group %s: %w", aeh.metadata.ConsumerGroup, err)
}
aeh.logger.Infof("Consumer group %s created in entity %s", aeh.metadata.ConsumerGroup, hubName)
return nil
}
func (aeh *AzureEventHubs) shouldCreateConsumerGroup(parentCtx context.Context, client *armeventhub.ConsumerGroupsClient, hubName string) (bool, error) {
ctx, cancel := context.WithTimeout(parentCtx, resourceGetTimeout)
defer cancel()
_, err := client.Get(ctx, aeh.metadata.ResourceGroupName, aeh.metadata.EventHubNamespace, hubName, aeh.metadata.ConsumerGroup, nil)
if err == nil {
// If there's no error, the consumer group already exists, so all good
return true, nil
}
// Check if the error is NotFound or something else
resErr := &azcore.ResponseError{}
if !errors.As(err, &resErr) || resErr.StatusCode != http.StatusNotFound {
// We have another error, just return it
return false, err
}
// Consumer group doesn't exist
return false, nil
}

View File

@ -18,8 +18,8 @@ import (
"errors"
"fmt"
"sync"
"time"
"github.com/Azure/azure-sdk-for-go/sdk/azcore"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/policy"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/checkpoints"
@ -32,16 +32,6 @@ import (
"github.com/dapr/kit/logger"
)
const (
defaultMessageRetentionInDays = 1
defaultPartitionCount = 1
resourceCheckMaxRetry = 5
resourceCheckMaxRetryInterval = 5 * time.Minute
resourceCreationTimeout = 15 * time.Second
resourceGetTimeout = 5 * time.Second
)
// AzureEventHubs allows sending/receiving Azure Event Hubs events.
type AzureEventHubs struct {
metadata *azureEventHubsMetadata
@ -51,6 +41,8 @@ type AzureEventHubs struct {
producers map[string]*azeventhubs.ProducerClient
checkpointStoreCache azeventhubs.CheckpointStore
checkpointStoreLock *sync.RWMutex
managementCreds azcore.TokenCredential
}
// NewAzureEventHubs returns a new Azure Event hubs instance.
@ -100,23 +92,12 @@ func (aeh *AzureEventHubs) Init(metadata pubsub.Metadata) error {
aeh.logger.Info("connecting to Azure Event Hub using Azure AD; the connection will be established on first publish/subscribe and req.Topic field in incoming requests will be honored")
/*if aeh.metadata.EnableEntityManagement {
if err := aeh.validateEnitityManagementMetadata(); err != nil {
return err
}
// Create hubManager for eventHub management with AAD.
if managerCreateErr := aeh.createHubManager(); managerCreateErr != nil {
return managerCreateErr
}
// Get Azure Management plane settings for creating consumer groups using event hubs management client.
settings, err := azauth.NewEnvironmentSettings("azure", metadata.Properties)
if aeh.metadata.EnableEntityManagement {
err = aeh.initEntityManagement(metadata.Properties)
if err != nil {
return err
return fmt.Errorf("failed to initialize entity manager: %w", err)
}
aeh.managementSettings = settings
}*/
}
}
return nil
@ -338,7 +319,10 @@ func (aeh *AzureEventHubs) getProducerClientForTopic(ctx context.Context, topic
// Create a new entity if needed
if aeh.metadata.EnableEntityManagement {
// TODO: Create a new entity
err = aeh.ensureEventHubEntity(ctx, topic)
if err != nil {
return nil, fmt.Errorf("failed to create Event Hub entity %s: %w", topic, err)
}
}
clientOpts := &azeventhubs.ProducerClientOptions{
@ -379,7 +363,22 @@ func (aeh *AzureEventHubs) getProcessorForTopic(ctx context.Context, topic strin
// Create a new entity if needed
if aeh.metadata.EnableEntityManagement {
// TODO: Create a new entity
// First ensure that the Event Hub entity exists
// We need to acquire a lock on producers, as creating a producer can perform the same operations
aeh.producersLock.Lock()
err = aeh.ensureEventHubEntity(ctx, topic)
aeh.producersLock.Unlock()
if err != nil {
return nil, fmt.Errorf("failed to create Event Hub entity %s: %w", topic, err)
}
// Abuse on the lock on checkpoints which are used by all tasks creating processors
aeh.checkpointStoreLock.Lock()
err = aeh.ensureSubscription(ctx, topic)
aeh.checkpointStoreLock.Unlock()
if err != nil {
return nil, fmt.Errorf("failed to create Event Hub subscription to entity %s: %w", topic, err)
}
}
// Create a consumer client

View File

@ -203,7 +203,6 @@ func validateAndGetHubName(connectionString string) (string, error) {
func (aeh *AzureEventHubs) ensureEventHub(ctx context.Context, hubName string) error {
if aeh.hubManager == nil {
aeh.logger.Errorf("hubManager client not initialized properly")
return fmt.Errorf("hubManager client not initialized properly")
}
entity, err := aeh.getHubEntity(ctx, hubName)

View File

@ -14,7 +14,6 @@ limitations under the License.
package eventhubs
import (
"fmt"
"testing"
"github.com/stretchr/testify/assert"
@ -32,9 +31,9 @@ func TestParseEventHubsMetadata(t *testing.T) {
props := map[string]string{"connectionString": "fake"}
metadata := pubsub.Metadata{Base: metadata.Base{Properties: props}}
m, err := parseEventHubsMetadata(metadata)
m, err := parseEventHubsMetadata(metadata, testLogger)
assert.NoError(t, err)
require.NoError(t, err)
assert.Equal(t, "fake", m.ConnectionString)
})
@ -42,240 +41,33 @@ func TestParseEventHubsMetadata(t *testing.T) {
props := map[string]string{"eventHubNamespace": "fake"}
metadata := pubsub.Metadata{Base: metadata.Base{Properties: props}}
m, err := parseEventHubsMetadata(metadata)
m, err := parseEventHubsMetadata(metadata, testLogger)
assert.NoError(t, err)
require.NoError(t, err)
assert.Equal(t, "fake", m.EventHubNamespace)
})
t.Run("test both connectionString and eventHubNamespace given", func(t *testing.T) {
props := map[string]string{"connectionString": "fake", "eventHubNamespace": "fake"}
props := map[string]string{
"connectionString": "fake",
"eventHubNamespace": "fake",
}
metadata := pubsub.Metadata{Base: metadata.Base{Properties: props}}
_, err := parseEventHubsMetadata(metadata)
_, err := parseEventHubsMetadata(metadata, testLogger)
assert.Error(t, err)
assert.Equal(t, bothConnectionStringNamespaceErrorMsg, err.Error())
require.Error(t, err)
assert.ErrorContains(t, err, "only one of connectionString or eventHubNamespace should be passed")
})
t.Run("test missing metadata", func(t *testing.T) {
props := map[string]string{}
metadata := pubsub.Metadata{Base: metadata.Base{Properties: props}}
_, err := parseEventHubsMetadata(metadata)
_, err := parseEventHubsMetadata(metadata, testLogger)
assert.Error(t, err)
assert.Equal(t, missingConnectionStringNamespaceErrorMsg, err.Error())
})
}
func TestValidateSubscriptionAttributes(t *testing.T) {
t.Run("test valid configuration", func(t *testing.T) {
props := map[string]string{"connectionString": "fake", "consumerID": "fake", "storageAccountName": "account", "storageAccountKey": "key", "storageContainerName": "container"}
metadata := pubsub.Metadata{Base: metadata.Base{Properties: props}}
m, err := parseEventHubsMetadata(metadata)
assert.NoError(t, err)
aeh := &AzureEventHubs{logger: testLogger, metadata: m}
assert.Equal(t, m.ConnectionString, "fake")
assert.Equal(t, m.StorageAccountName, "account")
assert.Equal(t, m.StorageAccountKey, "key")
assert.Equal(t, m.StorageContainerName, "container")
assert.Equal(t, m.ConsumerGroup, "fake")
err = aeh.validateSubscriptionAttributes()
assert.NoError(t, err)
})
type invalidConfigTestCase struct {
name string
config map[string]string
errMsg string
}
invalidConfigTestCases := []invalidConfigTestCase{
{
"missing consumerID",
map[string]string{"connectionString": "fake", "storageAccountName": "account", "storageAccountKey": "key", "storageContainerName": "container"},
missingConsumerIDErrorMsg,
},
{
"missing storageAccountName",
map[string]string{"consumerID": "fake", "connectionString": "fake", "storageAccountKey": "key", "storageContainerName": "container"},
missingStorageAccountNameErrorMsg,
},
{
"missing storageAccountKey",
map[string]string{"consumerID": "fake", "connectionString": "fake", "storageAccountName": "name", "storageContainerName": "container"},
missingStorageAccountKeyErrorMsg,
},
{
"missing storageContainerName",
map[string]string{"consumerID": "fake", "connectionString": "fake", "storageAccountName": "name", "storageAccountKey": "key"},
missingStorageContainerNameErrorMsg,
},
}
for _, c := range invalidConfigTestCases {
t.Run(c.name, func(t *testing.T) {
metadata := pubsub.Metadata{Base: metadata.Base{Properties: c.config}}
m, err := parseEventHubsMetadata(metadata)
aeh := &AzureEventHubs{logger: testLogger, metadata: m}
require.NoError(t, err)
err = aeh.validateSubscriptionAttributes()
assert.Error(t, err)
assert.Equal(t, c.errMsg, err.Error())
})
}
}
func TestValidateEnitityManagementMetadata(t *testing.T) {
t.Run("test valid configuration", func(t *testing.T) {
props := map[string]string{"eventHubNamespace": "fake", "messageRetentionInDays": "2", "partitionCount": "3", "resourceGroupName": "rg", "subscriptionID": "id"}
metadata := pubsub.Metadata{Base: metadata.Base{Properties: props}}
m, err := parseEventHubsMetadata(metadata)
require.NoError(t, err)
aeh := &AzureEventHubs{logger: testLogger, metadata: m}
assert.Equal(t, "fake", m.EventHubNamespace)
assert.Equal(t, int32(2), m.MessageRetentionInDays)
assert.Equal(t, int32(3), m.PartitionCount)
err = aeh.validateEnitityManagementMetadata()
assert.NoError(t, err)
assert.Equal(t, int32(2), m.MessageRetentionInDays)
assert.Equal(t, int32(3), m.PartitionCount)
assert.Equal(t, "rg", m.ResourceGroupName)
assert.Equal(t, "id", m.SubscriptionID)
})
t.Run("test valid configuration", func(t *testing.T) {
props := map[string]string{"eventHubNamespace": "fake", "resourceGroupName": "rg", "subscriptionID": "id"}
metadata := pubsub.Metadata{Base: metadata.Base{Properties: props}}
m, err := parseEventHubsMetadata(metadata)
require.NoError(t, err)
aeh := &AzureEventHubs{logger: testLogger, metadata: m}
assert.Equal(t, "fake", m.EventHubNamespace)
assert.Equal(t, int32(0), m.MessageRetentionInDays)
assert.Equal(t, int32(0), m.PartitionCount)
err = aeh.validateEnitityManagementMetadata()
assert.NoError(t, err)
assert.Equal(t, int32(1), m.MessageRetentionInDays)
assert.Equal(t, int32(1), m.PartitionCount)
assert.Equal(t, "rg", m.ResourceGroupName)
assert.Equal(t, "id", m.SubscriptionID)
})
type invalidConfigTestCase struct {
name string
config map[string]string
messageRetentionInDays int32
partitionCount int32
errMsg string
}
invalidConfigTestCases := []invalidConfigTestCase{
{
"negative message rentention days",
map[string]string{"eventHubNamespace": "fake", "messageRetentionInDays": "-2", "resourceGroupName": "rg", "subscriptionID": "id"},
defaultMessageRetentionInDays,
defaultPartitionCount,
"",
},
{
"more than max message rentention days",
map[string]string{"eventHubNamespace": "fake", "messageRetentionInDays": "91", "resourceGroupName": "rg", "subscriptionID": "id"},
defaultMessageRetentionInDays,
defaultPartitionCount,
"",
},
{
"negative partition count",
map[string]string{"eventHubNamespace": "fake", "partitionCount": "-2", "resourceGroupName": "rg", "subscriptionID": "id"},
defaultMessageRetentionInDays,
defaultPartitionCount,
"",
},
{
"more than max partition count",
map[string]string{"eventHubNamespace": "fake", "partitionCount": "1030", "resourceGroupName": "rg", "subscriptionID": "id"},
defaultMessageRetentionInDays,
defaultPartitionCount,
"",
},
{
"missingResourceGroupName",
map[string]string{"eventHubNamespace": "fake", "subscriptionID": "id"},
defaultMessageRetentionInDays,
defaultPartitionCount,
missingResourceGroupNameMsg,
},
{
"missingSubscriptionID",
map[string]string{"eventHubNamespace": "fake", "resourceGroupName": "id"},
defaultMessageRetentionInDays,
defaultPartitionCount,
missingSubscriptionIDMsg,
},
}
for _, c := range invalidConfigTestCases {
t.Run(c.name, func(t *testing.T) {
metadata := pubsub.Metadata{Base: metadata.Base{Properties: c.config}}
m, err := parseEventHubsMetadata(metadata)
aeh := &AzureEventHubs{logger: testLogger, metadata: m}
require.NoError(t, err)
err = aeh.validateEnitityManagementMetadata()
if c.errMsg != "" {
assert.Error(t, err)
assert.Equal(t, c.errMsg, err.Error())
} else {
assert.NoError(t, err)
}
assert.Equal(t, c.messageRetentionInDays, aeh.metadata.MessageRetentionInDays)
assert.Equal(t, c.partitionCount, aeh.metadata.PartitionCount)
})
}
}
func TestGetStoragePrefixString(t *testing.T) {
props := map[string]string{"connectionString": "fake", "consumerID": "test"}
metadata := pubsub.Metadata{Base: metadata.Base{Properties: props}}
m, err := parseEventHubsMetadata(metadata)
require.NoError(t, err)
aeh := &AzureEventHubs{logger: testLogger, metadata: m}
actual := aeh.getStoragePrefixString("topic")
assert.Equal(t, "dapr-topic-test-", actual)
}
func TestValidateAndGetHubName(t *testing.T) {
t.Run("valid connectionString with hub name", func(t *testing.T) {
connectionString := "Endpoint=sb://fake.servicebus.windows.net/;SharedAccessKeyName=fakeKey;SharedAccessKey=key;EntityPath=testHub"
h, err := validateAndGetHubName(connectionString)
assert.NoError(t, err)
assert.Equal(t, "testHub", h)
})
t.Run("valid connectionString without hub name", func(t *testing.T) {
connectionString := "Endpoint=sb://fake.servicebus.windows.net/;SharedAccessKeyName=fakeKey;SharedAccessKey=key"
h, err := validateAndGetHubName(connectionString)
assert.NoError(t, err)
assert.Empty(t, h)
})
t.Run("invalid connectionString ", func(t *testing.T) {
connectionString := "Endpoint=sb://fake.servicebus.windows.net/;ShareKeyName=fakeKey;SharedAccessKey=key"
_, err := validateAndGetHubName(connectionString)
assert.Error(t, err)
require.Error(t, err)
assert.ErrorContains(t, err, "one of connectionString or eventHubNamespace is required")
})
}
@ -284,41 +76,64 @@ func TestConstructConnectionStringFromTopic(t *testing.T) {
connectionString := "Endpoint=sb://fake.servicebus.windows.net/;SharedAccessKeyName=fakeKey;SharedAccessKey=key"
topic := "testHub"
aeh := &AzureEventHubs{logger: testLogger, metadata: &azureEventHubsMetadata{ConnectionString: connectionString}}
metadata := pubsub.Metadata{Base: metadata.Base{
Properties: map[string]string{
"connectionString": connectionString,
},
}}
aeh := &AzureEventHubs{logger: testLogger}
err := aeh.Init(metadata)
require.NoError(t, err)
c, err := aeh.constructConnectionStringFromTopic(topic)
assert.NoError(t, err)
require.NoError(t, err)
assert.Equal(t, connectionString+";EntityPath=testHub", c)
})
t.Run("valid connectionString with hub name", func(t *testing.T) {
connectionString := "Endpoint=sb://fake.servicebus.windows.net/;SharedAccessKeyName=fakeKey;SharedAccessKey=key;EntityPath=testHub"
topic := "testHub"
aeh := &AzureEventHubs{logger: testLogger, metadata: &azureEventHubsMetadata{ConnectionString: connectionString}}
metadata := pubsub.Metadata{Base: metadata.Base{
Properties: map[string]string{
"connectionString": connectionString,
},
}}
aeh := &AzureEventHubs{logger: testLogger}
err := aeh.Init(metadata)
require.NoError(t, err)
c, err := aeh.constructConnectionStringFromTopic(topic)
assert.NoError(t, err)
require.NoError(t, err)
assert.Equal(t, connectionString, c)
})
t.Run("invalid connectionString with hub name", func(t *testing.T) {
connectionString := "Endpoint=sb://fake.servicebus.windows.net/;SharedAccessKeyName=fakeKey;ShareKey=key;EntityPath=testHub"
topic := "testHub"
connectionString := "Endpoint=sb://fake.servicebus.windows.net/;SharedAccessKeyName=fakeKey;NoKey=key;EntityPath=testHub"
aeh := &AzureEventHubs{logger: testLogger, metadata: &azureEventHubsMetadata{ConnectionString: connectionString}}
c, err := aeh.constructConnectionStringFromTopic(topic)
assert.Error(t, err)
assert.Equal(t, "", c)
metadata := pubsub.Metadata{Base: metadata.Base{
Properties: map[string]string{
"connectionString": connectionString,
},
}}
aeh := &AzureEventHubs{logger: testLogger}
err := aeh.Init(metadata)
require.Error(t, err)
})
t.Run("valid connectionString with different hub name", func(t *testing.T) {
connectionString := "Endpoint=sb://fake.servicebus.windows.net/;SharedAccessKeyName=fakeKey;SharedAccessKey=key;EntityPath=testHub"
topic := "differentHub"
aeh := &AzureEventHubs{logger: testLogger, metadata: &azureEventHubsMetadata{ConnectionString: connectionString}}
metadata := pubsub.Metadata{Base: metadata.Base{
Properties: map[string]string{
"connectionString": connectionString,
},
}}
aeh := &AzureEventHubs{logger: testLogger}
err := aeh.Init(metadata)
require.NoError(t, err)
c, err := aeh.constructConnectionStringFromTopic(topic)
assert.Error(t, err)
assert.Equal(t, (fmt.Sprintf(differentTopicConnectionStringErrorTmpl, topic)), err.Error())
require.Error(t, err)
assert.ErrorContains(t, err, "does not match the Event Hub name in the connection string")
assert.Equal(t, "", c)
})
}