Metadata parsing

Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>
This commit is contained in:
ItalyPaleAle 2023-01-10 18:51:50 +00:00
parent b2d8e6013b
commit eaa74d77d9
1 changed files with 21 additions and 26 deletions

View File

@ -15,7 +15,6 @@ package eventhubs
import ( import (
"context" "context"
"encoding/json"
"errors" "errors"
"fmt" "fmt"
"strconv" "strconv"
@ -33,6 +32,7 @@ import (
azauth "github.com/dapr/components-contrib/internal/authentication/azure" azauth "github.com/dapr/components-contrib/internal/authentication/azure"
"github.com/dapr/components-contrib/internal/utils" "github.com/dapr/components-contrib/internal/utils"
"github.com/dapr/components-contrib/metadata"
contribMetadata "github.com/dapr/components-contrib/metadata" contribMetadata "github.com/dapr/components-contrib/metadata"
"github.com/dapr/components-contrib/pubsub" "github.com/dapr/components-contrib/pubsub"
"github.com/dapr/kit/logger" "github.com/dapr/kit/logger"
@ -47,18 +47,18 @@ const (
partitionKeyMetadataKey = "partitionKey" partitionKeyMetadataKey = "partitionKey"
// errors. // errors.
hubManagerCreationErrorMsg = "error: creating eventHub manager client" hubManagerCreationErrorMsg = "failed to create eventHub manager client"
invalidConnectionStringErrorMsg = "error: connectionString is invalid" invalidConnectionStringErrorMsg = "connectionString is invalid"
missingConnectionStringNamespaceErrorMsg = "error: connectionString or eventHubNamespace is required" missingConnectionStringNamespaceErrorMsg = "connectionString or eventHubNamespace is required"
missingStorageAccountNameErrorMsg = "error: storageAccountName is a required attribute for subscribe" missingStorageAccountNameErrorMsg = "storageAccountName is a required attribute for subscribe"
missingStorageAccountKeyErrorMsg = "error: storageAccountKey is required for subscribe when connectionString is provided" missingStorageAccountKeyErrorMsg = "storageAccountKey is required for subscribe when connectionString is provided"
missingStorageContainerNameErrorMsg = "error: storageContainerName is a required attribute for subscribe" missingStorageContainerNameErrorMsg = "storageContainerName is a required attribute for subscribe"
missingConsumerIDErrorMsg = "error: missing consumerID attribute for subscribe" missingConsumerIDErrorMsg = "missing consumerID attribute for subscribe"
bothConnectionStringNamespaceErrorMsg = "error: both connectionString and eventHubNamespace are given, only one should be given" bothConnectionStringNamespaceErrorMsg = "both connectionString and eventHubNamespace are given, only one should be given"
missingResourceGroupNameMsg = "error: missing resourceGroupName attribute required for entityManagement" missingResourceGroupNameMsg = "missing resourceGroupName attribute required for entityManagement"
missingSubscriptionIDMsg = "error: missing subscriptionID attribute required for entityManagement" missingSubscriptionIDMsg = "missing subscriptionID attribute required for entityManagement"
entityManagementConnectionStrMsg = "error: entity management support is not available with connectionString" entityManagementConnectionStrMsg = "entity management support is not available with connectionString"
differentTopicConnectionStringErrorTmpl = "error: specified topic %s does not match the event hub name in the provided connectionString" differentTopicConnectionStringErrorTmpl = "specified topic %s does not match the Event Hub name in the provided connectionString"
// Event Hubs SystemProperties names for metadata passthrough. // Event Hubs SystemProperties names for metadata passthrough.
sysPropSequenceNumber = "x-opt-sequence-number" sysPropSequenceNumber = "x-opt-sequence-number"
@ -85,8 +85,8 @@ const (
resourceGetTimeout time.Duration = 5 * time.Second resourceGetTimeout time.Duration = 5 * time.Second
// See https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-quotas for numbers. // See https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-quotas for numbers.
maxMessageRetention = int32(90) maxMessageRetention int32 = 90
maxPartitionCount = int32(1024) maxPartitionCount int32 = 1024
) )
func subscribeHandler(ctx context.Context, topic string, getAllProperties bool, e *eventhub.Event, handler pubsub.Handler) error { func subscribeHandler(ctx context.Context, topic string, getAllProperties bool, e *eventhub.Event, handler pubsub.Handler) error {
@ -177,23 +177,18 @@ func NewAzureEventHubs(logger logger.Logger) pubsub.PubSub {
} }
func parseEventHubsMetadata(meta pubsub.Metadata) (*azureEventHubsMetadata, error) { func parseEventHubsMetadata(meta pubsub.Metadata) (*azureEventHubsMetadata, error) {
b, err := json.Marshal(meta.Properties) var m azureEventHubsMetadata
err := metadata.DecodeMetadata(meta.Properties, &m)
if err != nil { if err != nil {
return nil, err return nil, fmt.Errorf("failed to decode metada: %w", err)
}
m := azureEventHubsMetadata{}
err = json.Unmarshal(b, &m)
if err != nil {
return nil, err
} }
if m.ConnectionString == "" && m.EventHubNamespace == "" { if m.ConnectionString == "" && m.EventHubNamespace == "" {
return &m, errors.New(missingConnectionStringNamespaceErrorMsg) return nil, errors.New(missingConnectionStringNamespaceErrorMsg)
} }
if m.ConnectionString != "" && m.EventHubNamespace != "" { if m.ConnectionString != "" && m.EventHubNamespace != "" {
return &m, errors.New(bothConnectionStringNamespaceErrorMsg) return nil, errors.New(bothConnectionStringNamespaceErrorMsg)
} }
return &m, nil return &m, nil
@ -209,7 +204,7 @@ func validateAndGetHubName(connectionString string) (string, error) {
func (aeh *AzureEventHubs) ensureEventHub(ctx context.Context, hubName string) error { func (aeh *AzureEventHubs) ensureEventHub(ctx context.Context, hubName string) error {
if aeh.hubManager == nil { if aeh.hubManager == nil {
aeh.logger.Errorf("hubManager client not initialized properly.") aeh.logger.Errorf("hubManager client not initialized properly")
return fmt.Errorf("hubManager client not initialized properly") return fmt.Errorf("hubManager client not initialized properly")
} }
entity, err := aeh.getHubEntity(ctx, hubName) entity, err := aeh.getHubEntity(ctx, hubName)