components-contrib/common/component/azure/eventhubs/metadata.go

144 lines
5.9 KiB
Go

/*
Copyright 2023 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package eventhubs
import (
"errors"
"fmt"
"strings"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
azauth "github.com/dapr/components-contrib/common/authentication/azure"
"github.com/dapr/kit/logger"
"github.com/dapr/kit/metadata"
)
type AzureEventHubsMetadata struct {
ConnectionString string `json:"connectionString" mapstructure:"connectionString"`
EventHubNamespace string `json:"eventHubNamespace" mapstructure:"eventHubNamespace"`
ConsumerID string `json:"consumerID" mapstructure:"consumerID"`
StorageConnectionString string `json:"storageConnectionString" mapstructure:"storageConnectionString"`
StorageAccountName string `json:"storageAccountName" mapstructure:"storageAccountName"`
StorageAccountKey string `json:"storageAccountKey" mapstructure:"storageAccountKey"`
StorageContainerName string `json:"storageContainerName" mapstructure:"storageContainerName"`
EnableEntityManagement bool `json:"enableEntityManagement,string" mapstructure:"enableEntityManagement"`
MessageRetentionInDays int32 `json:"messageRetentionInDays,string" mapstructure:"messageRetentionInDays"`
PartitionCount int32 `json:"partitionCount,string" mapstructure:"partitionCount"`
SubscriptionID string `json:"subscriptionID" mapstructure:"subscriptionID"`
ResourceGroupName string `json:"resourceGroupName" mapstructure:"resourceGroupName"`
// Binding only
EventHub string `json:"eventHub" mapstructure:"eventHub" mdonly:"bindings"`
ConsumerGroup string `json:"consumerGroup" mapstructure:"consumerGroup" mdonly:"bindings"` // Alias for ConsumerID
// Internal properties
namespaceName string
hubName string
azEnvSettings azauth.EnvironmentSettings
properties map[string]string
}
func parseEventHubsMetadata(meta map[string]string, isBinding bool, log logger.Logger) (*AzureEventHubsMetadata, error) {
var m AzureEventHubsMetadata
err := metadata.DecodeMetadata(meta, &m)
if err != nil {
return nil, fmt.Errorf("failed to decode metada: %w", err)
}
// Store the raw properties in the object
m.properties = meta
// One and only one of connectionString and eventHubNamespace is required
if m.ConnectionString == "" && m.EventHubNamespace == "" {
return nil, errors.New("one of connectionString or eventHubNamespace is required")
}
if m.ConnectionString != "" && m.EventHubNamespace != "" {
return nil, errors.New("only one of connectionString or eventHubNamespace should be passed")
}
// ConsumerGroup is an alias for ConsumerID
if m.ConsumerID != "" && m.ConsumerGroup == "" {
m.ConsumerGroup = m.ConsumerID
}
// For the binding, we need to have a property "eventHub" which is the topic name unless it's included in the connection string
if isBinding {
if m.ConnectionString == "" {
if m.EventHub == "" {
return nil, errors.New("property 'eventHub' is required when connecting with Azure AD")
}
m.hubName = m.EventHub
} else {
hubName := hubNameFromConnString(m.ConnectionString)
if hubName != "" {
m.hubName = hubName
} else if m.EventHub != "" {
m.hubName = m.EventHub
} else {
return nil, errors.New("the provided connection string does not contain a value for 'EntityPath' and no 'eventHub' property was passed")
}
}
} else {
// Ignored when not a binding
m.EventHub = ""
// If connecting using a connection string, parse hubName
if m.ConnectionString != "" {
hubName := hubNameFromConnString(m.ConnectionString)
if hubName != "" {
log.Infof(`The provided connection string is specific to the Event Hub ("entity path") '%s'; publishing or subscribing to a topic that does not match this Event Hub will fail when attempted`, hubName)
} else {
log.Info(`The provided connection string does not contain an Event Hub name ("entity path"); the connection will be established on first publish/subscribe and req.Topic field in incoming requests will be honored`)
}
m.hubName = hubName
}
}
// If both storageConnectionString and storageAccountName are specified, show a warning because the connection string will take priority
if m.StorageConnectionString != "" && m.StorageAccountName != "" {
log.Warn("Property storageAccountName is ignored when storageConnectionString is present")
}
// Entity management is only possible when using Azure AD
if m.EnableEntityManagement && m.ConnectionString != "" {
m.EnableEntityManagement = false
log.Warn("Entity management support is not available when connecting with a connection string")
}
if m.EventHubNamespace != "" {
// Older versions of Dapr required the namespace name to be just the name and not a FQDN
// Automatically append ".servicebus.windows.net" to make them a FQDN if not present, but show a log
if !strings.ContainsRune(m.EventHubNamespace, '.') {
m.EventHubNamespace += ".servicebus.windows.net"
log.Info("Property eventHubNamespace is not a FQDN; the suffix '.servicebus.windows.net' will be added automatically")
}
// The namespace name is the first part of the FQDN, until the first dot
m.namespaceName = m.EventHubNamespace[0:strings.IndexRune(m.EventHubNamespace, '.')]
}
return &m, nil
}
// Returns the hub name (topic) from the connection string.
func hubNameFromConnString(connString string) string {
props, err := azeventhubs.ParseConnectionString(connString)
if err != nil || props.EntityPath == nil {
return ""
}
return *props.EntityPath
}