From eaa74d77d9747b4ff02aed87dcfe8d149edab5a0 Mon Sep 17 00:00:00 2001 From: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com> Date: Tue, 10 Jan 2023 18:51:50 +0000 Subject: [PATCH] Metadata parsing Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com> --- pubsub/azure/eventhubs/eventhubs.go | 47 +++++++++++++---------------- 1 file changed, 21 insertions(+), 26 deletions(-) diff --git a/pubsub/azure/eventhubs/eventhubs.go b/pubsub/azure/eventhubs/eventhubs.go index 512ff61a9..83494cd00 100644 --- a/pubsub/azure/eventhubs/eventhubs.go +++ b/pubsub/azure/eventhubs/eventhubs.go @@ -15,7 +15,6 @@ package eventhubs import ( "context" - "encoding/json" "errors" "fmt" "strconv" @@ -33,6 +32,7 @@ import ( azauth "github.com/dapr/components-contrib/internal/authentication/azure" "github.com/dapr/components-contrib/internal/utils" + "github.com/dapr/components-contrib/metadata" contribMetadata "github.com/dapr/components-contrib/metadata" "github.com/dapr/components-contrib/pubsub" "github.com/dapr/kit/logger" @@ -47,18 +47,18 @@ const ( partitionKeyMetadataKey = "partitionKey" // errors. - hubManagerCreationErrorMsg = "error: creating eventHub manager client" - invalidConnectionStringErrorMsg = "error: connectionString is invalid" - missingConnectionStringNamespaceErrorMsg = "error: connectionString or eventHubNamespace is required" - missingStorageAccountNameErrorMsg = "error: storageAccountName is a required attribute for subscribe" - missingStorageAccountKeyErrorMsg = "error: storageAccountKey is required for subscribe when connectionString is provided" - missingStorageContainerNameErrorMsg = "error: storageContainerName is a required attribute for subscribe" - missingConsumerIDErrorMsg = "error: missing consumerID attribute for subscribe" - bothConnectionStringNamespaceErrorMsg = "error: both connectionString and eventHubNamespace are given, only one should be given" - missingResourceGroupNameMsg = "error: missing resourceGroupName attribute required for entityManagement" - missingSubscriptionIDMsg = "error: missing subscriptionID attribute required for entityManagement" - entityManagementConnectionStrMsg = "error: entity management support is not available with connectionString" - differentTopicConnectionStringErrorTmpl = "error: specified topic %s does not match the event hub name in the provided connectionString" + hubManagerCreationErrorMsg = "failed to create eventHub manager client" + invalidConnectionStringErrorMsg = "connectionString is invalid" + missingConnectionStringNamespaceErrorMsg = "connectionString or eventHubNamespace is required" + missingStorageAccountNameErrorMsg = "storageAccountName is a required attribute for subscribe" + missingStorageAccountKeyErrorMsg = "storageAccountKey is required for subscribe when connectionString is provided" + missingStorageContainerNameErrorMsg = "storageContainerName is a required attribute for subscribe" + missingConsumerIDErrorMsg = "missing consumerID attribute for subscribe" + bothConnectionStringNamespaceErrorMsg = "both connectionString and eventHubNamespace are given, only one should be given" + missingResourceGroupNameMsg = "missing resourceGroupName attribute required for entityManagement" + missingSubscriptionIDMsg = "missing subscriptionID attribute required for entityManagement" + entityManagementConnectionStrMsg = "entity management support is not available with connectionString" + differentTopicConnectionStringErrorTmpl = "specified topic %s does not match the Event Hub name in the provided connectionString" // Event Hubs SystemProperties names for metadata passthrough. sysPropSequenceNumber = "x-opt-sequence-number" @@ -85,8 +85,8 @@ const ( resourceGetTimeout time.Duration = 5 * time.Second // See https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-quotas for numbers. - maxMessageRetention = int32(90) - maxPartitionCount = int32(1024) + maxMessageRetention int32 = 90 + maxPartitionCount int32 = 1024 ) func subscribeHandler(ctx context.Context, topic string, getAllProperties bool, e *eventhub.Event, handler pubsub.Handler) error { @@ -177,23 +177,18 @@ func NewAzureEventHubs(logger logger.Logger) pubsub.PubSub { } func parseEventHubsMetadata(meta pubsub.Metadata) (*azureEventHubsMetadata, error) { - b, err := json.Marshal(meta.Properties) + var m azureEventHubsMetadata + err := metadata.DecodeMetadata(meta.Properties, &m) if err != nil { - return nil, err - } - - m := azureEventHubsMetadata{} - err = json.Unmarshal(b, &m) - if err != nil { - return nil, err + return nil, fmt.Errorf("failed to decode metada: %w", err) } if m.ConnectionString == "" && m.EventHubNamespace == "" { - return &m, errors.New(missingConnectionStringNamespaceErrorMsg) + return nil, errors.New(missingConnectionStringNamespaceErrorMsg) } if m.ConnectionString != "" && m.EventHubNamespace != "" { - return &m, errors.New(bothConnectionStringNamespaceErrorMsg) + return nil, errors.New(bothConnectionStringNamespaceErrorMsg) } return &m, nil @@ -209,7 +204,7 @@ func validateAndGetHubName(connectionString string) (string, error) { func (aeh *AzureEventHubs) ensureEventHub(ctx context.Context, hubName string) error { if aeh.hubManager == nil { - aeh.logger.Errorf("hubManager client not initialized properly.") + aeh.logger.Errorf("hubManager client not initialized properly") return fmt.Errorf("hubManager client not initialized properly") } entity, err := aeh.getHubEntity(ctx, hubName)