Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>
This commit is contained in:
ItalyPaleAle 2023-01-19 00:12:10 +00:00
parent 9ef0bb9441
commit 0157bf0b72
30 changed files with 1090 additions and 704 deletions

3
go.mod
View File

@ -17,6 +17,7 @@ require (
github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos v0.3.2
github.com/Azure/azure-sdk-for-go/sdk/data/aztables v1.0.1
github.com/Azure/azure-sdk-for-go/sdk/keyvault/azsecrets v0.11.0
github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs v0.4.0
github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus v1.1.3
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v0.6.1
github.com/Azure/azure-storage-blob-go v0.15.0
@ -140,7 +141,7 @@ require (
github.com/99designs/keyring v1.2.1 // indirect
github.com/AthenZ/athenz v1.10.39 // indirect
github.com/Azure/azure-pipeline-go v0.2.3 // indirect
github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.1 // indirect
github.com/Azure/azure-sdk-for-go/sdk/internal v1.1.2 // indirect
github.com/Azure/azure-sdk-for-go/sdk/keyvault/internal v0.7.0 // indirect
github.com/Azure/go-autorest v14.2.0+incompatible // indirect
github.com/Azure/go-autorest/autorest/azure/cli v0.4.5 // indirect

9
go.sum
View File

@ -317,14 +317,17 @@ github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos v0.3.2/go.mod h1:Fy3bbChFm4c
github.com/Azure/azure-sdk-for-go/sdk/data/aztables v1.0.1 h1:bFa9IcjvrCber6gGgDAUZ+I2bO8J7s8JxXmu9fhi2ss=
github.com/Azure/azure-sdk-for-go/sdk/data/aztables v1.0.1/go.mod h1:l3wvZkG9oW07GLBW5Cd0WwG5asOfJ8aqE8raUvNzLpk=
github.com/Azure/azure-sdk-for-go/sdk/internal v0.7.0/go.mod h1:yqy467j36fJxcRV2TzfVZ1pCb5vxm4BtZPUdYWe/Xo8=
github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.1 h1:XUNQ4mw+zJmaA2KXzP9JlQiecy1SI+Eog7xVkPiqIbg=
github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.1/go.mod h1:eWRD7oawr1Mu1sLCawqVc0CUiF43ia3qQMxLscsKQ9w=
github.com/Azure/azure-sdk-for-go/sdk/internal v1.1.2 h1:+5VZ72z0Qan5Bog5C+ZkgSqUbeVUd9wgtHOrIKuc5b8=
github.com/Azure/azure-sdk-for-go/sdk/internal v1.1.2/go.mod h1:eWRD7oawr1Mu1sLCawqVc0CUiF43ia3qQMxLscsKQ9w=
github.com/Azure/azure-sdk-for-go/sdk/keyvault/azsecrets v0.11.0 h1:82w8tzLcOwDP/Q35j/wEBPt0n0kVC3cjtPdD62G8UAk=
github.com/Azure/azure-sdk-for-go/sdk/keyvault/azsecrets v0.11.0/go.mod h1:S78i9yTr4o/nXlH76bKjGUye9Z2wSxO5Tz7GoDr4vfI=
github.com/Azure/azure-sdk-for-go/sdk/keyvault/internal v0.7.0 h1:Lg6BW0VPmCwcMlvOviL3ruHFO+H9tZNqscK0AeuFjGM=
github.com/Azure/azure-sdk-for-go/sdk/keyvault/internal v0.7.0/go.mod h1:9V2j0jn9jDEkCkv8w/bKTNppX/d0FVA1ud77xCIP4KA=
github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs v0.4.0 h1:X/ePaAG8guM7j5WORT5eEIw7cGUxe9Ah1jEQJKLmmSo=
github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs v0.4.0/go.mod h1:5dog28UP3dd1BnCPFYvyHfsmA+Phmoezt+KWT5cZnyc=
github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus v1.1.3 h1:27HVgIcvrKkRs5eJzHnyZdt71/EyB3clkiJQB0qyIa8=
github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus v1.1.3/go.mod h1:Eo6WMP/iw9sp06+v8y030eReUwX6sULn5i3fxCDWPag=
github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/eventhub/armeventhub v1.0.0 h1:BWeAAEzkCnL0ABVJqs+4mYudNch7oFGPtTlSmIWL8ms=
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v0.6.1 h1:YvQv9Mz6T8oR5ypQOL6erY0Z5t71ak1uHV4QFokCOZk=
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v0.6.1/go.mod h1:c6WvOhtmjNUWbLfOG1qxM/q0SPvQNSVJvolm+C52dIU=
github.com/Azure/azure-storage-blob-go v0.15.0 h1:rXtgp8tN1p29GvpGgfJetavIG0V7OgcSXPpwp3tx6qk=
@ -1136,8 +1139,8 @@ github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9Y
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8=
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
github.com/joho/godotenv v1.3.0 h1:Zjp+RcGpHhGlrMbJzXTrZZPrWj+1vfm90La1wgB6Bhc=
github.com/joho/godotenv v1.3.0/go.mod h1:7hK45KPybAkOC6peb+G5yklZfMxEjkZhHbwpqxOKXbg=
github.com/joho/godotenv v1.4.0 h1:3l4+N6zfMWnkbPEXKng2o2/MR5mSwTrBih4ZEkkz1lg=
github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo=
github.com/jonboulle/clockwork v0.2.2/go.mod h1:Pkfl5aHPm1nk2H9h0bjmnJD/BcgbGXUBGnn1kMkgxc8=
github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY=

View File

@ -66,7 +66,7 @@ func NewClient(metadata *Metadata, rawMetadata map[string]string) (*Client, erro
}
}
} else {
settings, err := azauth.NewEnvironmentSettings(azauth.AzureServiceBusResourceName, rawMetadata)
settings, err := azauth.NewEnvironmentSettings("servicebus", rawMetadata)
if err != nil {
return nil, err
}

View File

@ -0,0 +1,8 @@
# conn
This package is copied from the Azure SDK as it is an internal package in the upstream SDK.
We can remove this once [Azure/azure-sdk-for-go#19840](https://github.com/Azure/azure-sdk-for-go/issues/19840) is fixed.
- Source: https://github.com/Azure/azure-sdk-for-go/tree/sdk/messaging/azeventhubs/v0.4.0/sdk/messaging/azeventhubs/internal/conn
- License: Copyright (c) Microsoft Corporation. All rights reserved. Licensed under the MIT License.

View File

@ -0,0 +1,92 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package conn
import (
"errors"
"fmt"
"net/url"
"strings"
)
type (
// ParsedConn is the structure of a parsed Service Bus or Event Hub connection string.
ParsedConn struct {
Namespace string
HubName string
KeyName string
Key string
SAS string
}
)
// ParsedConnectionFromStr takes a string connection string from the Azure portal and returns the parsed representation.
// The method will return an error if the Endpoint, SharedAccessKeyName or SharedAccessKey is empty.
func ParsedConnectionFromStr(connStr string) (*ParsedConn, error) {
const (
endpointKey = "Endpoint"
sharedAccessKeyNameKey = "SharedAccessKeyName"
sharedAccessKeyKey = "SharedAccessKey"
entityPathKey = "EntityPath"
sharedAccessSignatureKey = "SharedAccessSignature"
)
// We can parse two types of connection strings.
// 1. Connection strings generated from the portal (or elsewhere) that contain an embedded key and keyname.
// 2. A specially formatted connection string with an embedded SharedAccessSignature:
// Endpoint=sb://<sb>.servicebus.windows.net;SharedAccessSignature=SharedAccessSignature sr=<sb>.servicebus.windows.net&sig=<base64-sig>&se=<expiry>&skn=<keyname>"
var namespace, hubName, keyName, secret, sas string
splits := strings.Split(connStr, ";")
for _, split := range splits {
keyAndValue := strings.SplitN(split, "=", 2)
if len(keyAndValue) < 2 {
return nil, errors.New("failed parsing connection string due to unmatched key value separated by '='")
}
// if a key value pair has `=` in the value, recombine them
key := keyAndValue[0]
value := strings.Join(keyAndValue[1:], "=")
switch {
case strings.EqualFold(endpointKey, key):
u, err := url.Parse(value)
if err != nil {
return nil, errors.New("failed parsing connection string due to an incorrectly formatted Endpoint value")
}
namespace = u.Host
case strings.EqualFold(sharedAccessKeyNameKey, key):
keyName = value
case strings.EqualFold(sharedAccessKeyKey, key):
secret = value
case strings.EqualFold(entityPathKey, key):
hubName = value
case strings.EqualFold(sharedAccessSignatureKey, key):
sas = value
}
}
parsed := &ParsedConn{
Namespace: namespace,
KeyName: keyName,
Key: secret,
HubName: hubName,
SAS: sas,
}
if namespace == "" {
return parsed, fmt.Errorf("key %q must not be empty", endpointKey)
}
if sas == "" && keyName == "" {
return parsed, fmt.Errorf("key %q must not be empty", sharedAccessKeyNameKey)
}
if secret == "" && sas == "" {
return parsed, fmt.Errorf("key %q or %q cannot both be empty", sharedAccessKeyKey, sharedAccessSignatureKey)
}
return parsed, nil
}

View File

@ -1,5 +1,5 @@
/*
Copyright 2021 The Dapr Authors
Copyright 2023 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
@ -17,500 +17,72 @@ import (
"context"
"errors"
"fmt"
"strconv"
"strings"
"time"
"sync"
"github.com/Azure/azure-amqp-common-go/v4/aad"
"github.com/Azure/azure-amqp-common-go/v4/conn"
eventhub "github.com/Azure/azure-event-hubs-go/v3"
"github.com/Azure/azure-event-hubs-go/v3/eph"
"github.com/Azure/azure-event-hubs-go/v3/storage"
mgmt "github.com/Azure/azure-sdk-for-go/services/eventhub/mgmt/2017-04-01/eventhub"
"github.com/Azure/azure-storage-blob-go/azblob"
"github.com/Azure/go-autorest/autorest/azure"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
azauth "github.com/dapr/components-contrib/internal/authentication/azure"
"github.com/dapr/components-contrib/internal/utils"
"github.com/dapr/components-contrib/metadata"
contribMetadata "github.com/dapr/components-contrib/metadata"
"github.com/dapr/components-contrib/pubsub"
"github.com/dapr/components-contrib/pubsub/azure/eventhubs/conn"
"github.com/dapr/kit/logger"
"github.com/dapr/kit/retry"
)
const (
// connection string entity path key.
entityPathKey = "EntityPath"
// metadata partitionKey key.
partitionKeyMetadataKey = "partitionKey"
// errors.
hubManagerCreationErrorMsg = "failed to create eventHub manager client"
invalidConnectionStringErrorMsg = "connectionString is invalid"
missingConnectionStringNamespaceErrorMsg = "connectionString or eventHubNamespace is required"
missingStorageAccountNameErrorMsg = "storageAccountName is a required attribute for subscribe"
missingStorageAccountKeyErrorMsg = "storageAccountKey is required for subscribe when connectionString is provided"
missingStorageContainerNameErrorMsg = "storageContainerName is a required attribute for subscribe"
missingConsumerIDErrorMsg = "missing consumerID attribute for subscribe"
bothConnectionStringNamespaceErrorMsg = "both connectionString and eventHubNamespace are given, only one should be given"
missingResourceGroupNameMsg = "missing resourceGroupName attribute required for entityManagement"
missingSubscriptionIDMsg = "missing subscriptionID attribute required for entityManagement"
entityManagementConnectionStrMsg = "entity management support is not available with connectionString"
differentTopicConnectionStringErrorTmpl = "specified topic %s does not match the Event Hub name in the provided connectionString"
// Event Hubs SystemProperties names for metadata passthrough.
sysPropSequenceNumber = "x-opt-sequence-number"
sysPropEnqueuedTime = "x-opt-enqueued-time"
sysPropOffset = "x-opt-offset"
sysPropPartitionID = "x-opt-partition-id"
sysPropPartitionKey = "x-opt-partition-key"
sysPropIotHubDeviceConnectionID = "iothub-connection-device-id"
sysPropIotHubAuthGenerationID = "iothub-connection-auth-generation-id"
sysPropIotHubConnectionAuthMethod = "iothub-connection-auth-method"
sysPropIotHubConnectionModuleID = "iothub-connection-module-id"
sysPropIotHubEnqueuedTime = "iothub-enqueuedtime"
sysPropMessageID = "message-id"
// Metadata field to ensure all Event Hub properties pass through
requireAllProperties = "requireAllProperties"
defaultMessageRetentionInDays = 1
defaultPartitionCount = 1
resourceCheckMaxRetry = 5
resourceCheckMaxRetryInterval time.Duration = 5 * time.Minute
resourceCreationTimeout time.Duration = 15 * time.Second
resourceGetTimeout time.Duration = 5 * time.Second
// See https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-quotas for numbers.
maxMessageRetention int32 = 90
maxPartitionCount int32 = 1024
)
func subscribeHandler(ctx context.Context, topic string, getAllProperties bool, e *eventhub.Event, handler pubsub.Handler) error {
res := pubsub.NewMessage{Data: e.Data, Topic: topic, Metadata: map[string]string{}}
if e.SystemProperties.SequenceNumber != nil {
res.Metadata[sysPropSequenceNumber] = strconv.FormatInt(*e.SystemProperties.SequenceNumber, 10)
}
if e.SystemProperties.EnqueuedTime != nil {
res.Metadata[sysPropEnqueuedTime] = e.SystemProperties.EnqueuedTime.Format(time.RFC3339)
}
if e.SystemProperties.Offset != nil {
res.Metadata[sysPropOffset] = strconv.FormatInt(*e.SystemProperties.Offset, 10)
}
// According to azure-event-hubs-go docs, this will always be nil.
if e.SystemProperties.PartitionID != nil {
res.Metadata[sysPropPartitionID] = strconv.Itoa(int(*e.SystemProperties.PartitionID))
}
// The following metadata properties are only present if event was generated by Azure IoT Hub.
if e.SystemProperties.PartitionKey != nil {
res.Metadata[sysPropPartitionKey] = *e.SystemProperties.PartitionKey
}
if e.SystemProperties.IoTHubDeviceConnectionID != nil {
res.Metadata[sysPropIotHubDeviceConnectionID] = *e.SystemProperties.IoTHubDeviceConnectionID
}
if e.SystemProperties.IoTHubAuthGenerationID != nil {
res.Metadata[sysPropIotHubAuthGenerationID] = *e.SystemProperties.IoTHubAuthGenerationID
}
if e.SystemProperties.IoTHubConnectionAuthMethod != nil {
res.Metadata[sysPropIotHubConnectionAuthMethod] = *e.SystemProperties.IoTHubConnectionAuthMethod
}
if e.SystemProperties.IoTHubConnectionModuleID != nil {
res.Metadata[sysPropIotHubConnectionModuleID] = *e.SystemProperties.IoTHubConnectionModuleID
}
if e.SystemProperties.IoTHubEnqueuedTime != nil {
res.Metadata[sysPropIotHubEnqueuedTime] = e.SystemProperties.IoTHubEnqueuedTime.Format(time.RFC3339)
}
// azure-event-hubs-go SDK pulls out the AMQP message-id property to the Event.ID property, map it from there.
if e.ID != "" {
res.Metadata[sysPropMessageID] = e.ID
}
// added properties if any ( includes application properties from iot-hub)
if getAllProperties {
if e.Properties != nil && len(e.Properties) > 0 {
for key, value := range e.Properties {
if str, ok := value.(string); ok {
res.Metadata[key] = str
}
}
}
}
return handler(ctx, &res)
}
// AzureEventHubs allows sending/receiving Azure Event Hubs events.
type AzureEventHubs struct {
metadata *azureEventHubsMetadata
logger logger.Logger
backOffConfig retry.Config
hubClients map[string]*eventhub.Hub
eventProcessors map[string]*eph.EventProcessorHost
hubManager *eventhub.HubManager
eventHubSettings azauth.EnvironmentSettings
managementSettings azauth.EnvironmentSettings
cgClient *mgmt.ConsumerGroupsClient
tokenProvider *aad.TokenProvider
storageCredential azblob.Credential
azureEnvironment *azure.Environment
}
metadata *azureEventHubsMetadata
logger logger.Logger
type azureEventHubsMetadata struct {
ConnectionString string `json:"connectionString,omitempty" mapstructure:"connectionString"`
EventHubNamespace string `json:"eventHubNamespace,omitempty" mapstructure:"eventHubNamespace"`
ConsumerGroup string `json:"consumerID" mapstructure:"consumerID"`
StorageAccountName string `json:"storageAccountName,omitempty" mapstructure:"storageAccountName"`
StorageAccountKey string `json:"storageAccountKey,omitempty" mapstructure:"storageAccountKey"`
StorageContainerName string `json:"storageContainerName,omitempty" mapstructure:"storageContainerName"`
EnableEntityManagement bool `json:"enableEntityManagement,omitempty,string" mapstructure:"enableEntityManagement"`
MessageRetentionInDays int32 `json:"messageRetentionInDays,omitempty,string" mapstructure:"messageRetentionInDays"`
PartitionCount int32 `json:"partitionCount,omitempty,string" mapstructure:"partitionCount"`
SubscriptionID string `json:"subscriptionID,omitempty" mapstructure:"subscriptionID"`
ResourceGroupName string `json:"resourceGroupName,omitempty" mapstructure:"resourceGroupName"`
clientsLock *sync.RWMutex
producerClients map[string]*azeventhubs.ProducerClient
}
// NewAzureEventHubs returns a new Azure Event hubs instance.
func NewAzureEventHubs(logger logger.Logger) pubsub.PubSub {
return &AzureEventHubs{logger: logger}
}
func parseEventHubsMetadata(meta pubsub.Metadata) (*azureEventHubsMetadata, error) {
var m azureEventHubsMetadata
err := metadata.DecodeMetadata(meta.Properties, &m)
if err != nil {
return nil, fmt.Errorf("failed to decode metada: %w", err)
return &AzureEventHubs{
logger: logger,
clientsLock: &sync.RWMutex{},
producerClients: make(map[string]*azeventhubs.ProducerClient, 1),
}
if m.ConnectionString == "" && m.EventHubNamespace == "" {
return nil, errors.New(missingConnectionStringNamespaceErrorMsg)
}
if m.ConnectionString != "" && m.EventHubNamespace != "" {
return nil, errors.New(bothConnectionStringNamespaceErrorMsg)
}
return &m, nil
}
func validateAndGetHubName(connectionString string) (string, error) {
parsed, err := conn.ParsedConnectionFromStr(connectionString)
if err != nil {
return "", err
}
return parsed.HubName, nil
}
func (aeh *AzureEventHubs) ensureEventHub(ctx context.Context, hubName string) error {
if aeh.hubManager == nil {
aeh.logger.Errorf("hubManager client not initialized properly")
return fmt.Errorf("hubManager client not initialized properly")
}
entity, err := aeh.getHubEntity(ctx, hubName)
if err != nil {
return err
}
if entity == nil {
if err := aeh.createHubEntity(ctx, hubName); err != nil {
return err
}
}
return nil
}
func (aeh *AzureEventHubs) ensureSubscription(ctx context.Context, hubName string) error {
err := aeh.ensureEventHub(ctx, hubName)
if err != nil {
return err
}
_, err = aeh.getConsumerGroupsClient()
if err != nil {
return err
}
return aeh.createConsumerGroup(ctx, hubName)
}
func (aeh *AzureEventHubs) getConsumerGroupsClient() (*mgmt.ConsumerGroupsClient, error) {
if aeh.cgClient != nil {
return aeh.cgClient, nil
}
client := mgmt.NewConsumerGroupsClientWithBaseURI(aeh.managementSettings.AzureEnvironment.ResourceManagerEndpoint,
aeh.metadata.SubscriptionID)
a, err := aeh.managementSettings.GetAuthorizer()
if err != nil {
return nil, err
}
client.Authorizer = a
aeh.cgClient = &client
return aeh.cgClient, nil
}
func (aeh *AzureEventHubs) createConsumerGroup(parentCtx context.Context, hubName string) error {
create := false
backOffConfig := retry.DefaultConfig()
backOffConfig.Policy = retry.PolicyExponential
backOffConfig.MaxInterval = resourceCheckMaxRetryInterval
backOffConfig.MaxRetries = resourceCheckMaxRetry
b := backOffConfig.NewBackOffWithContext(parentCtx)
err := retry.NotifyRecover(func() error {
c, err := aeh.shouldCreateConsumerGroup(parentCtx, hubName)
if err == nil {
create = c
return nil
}
return err
}, b, func(_ error, _ time.Duration) {
aeh.logger.Errorf("Error checking for consumer group for EventHub : %s. Retrying...", hubName)
}, func() {
aeh.logger.Warnf("Successfully checked for consumer group in EventHub %s after it previously failed.", hubName)
})
if err != nil {
return err
}
if create {
ctx, cancel := context.WithTimeout(parentCtx, resourceCreationTimeout)
_, err = aeh.cgClient.CreateOrUpdate(ctx, aeh.metadata.ResourceGroupName, aeh.metadata.EventHubNamespace, hubName, aeh.metadata.ConsumerGroup, mgmt.ConsumerGroup{})
cancel()
if err != nil {
return err
}
}
return nil
}
func (aeh *AzureEventHubs) shouldCreateConsumerGroup(parentCtx context.Context, hubName string) (bool, error) {
ctx, cancel := context.WithTimeout(parentCtx, resourceGetTimeout)
g, err := aeh.cgClient.Get(ctx, aeh.metadata.ResourceGroupName, aeh.metadata.EventHubNamespace, hubName, aeh.metadata.ConsumerGroup)
cancel()
if err != nil {
if g.HasHTTPStatus(404) {
return true, nil
}
return false, err
}
if *g.Name == aeh.metadata.ConsumerGroup {
aeh.logger.Infof("consumer group %s exists for the requested topic/eventHub %s", aeh.metadata.ConsumerGroup, hubName)
}
return false, nil
}
func (aeh *AzureEventHubs) getHubEntity(parentCtx context.Context, hubName string) (*eventhub.HubEntity, error) {
ctx, cancel := context.WithTimeout(parentCtx, resourceGetTimeout)
defer cancel()
return aeh.hubManager.Get(ctx, hubName)
}
func (aeh *AzureEventHubs) createHubEntity(parentCtx context.Context, hubName string) error {
ctx, cancel := context.WithTimeout(parentCtx, resourceCreationTimeout)
_, err := aeh.hubManager.Put(ctx, hubName,
eventhub.HubWithMessageRetentionInDays(aeh.metadata.MessageRetentionInDays),
eventhub.HubWithPartitionCount(aeh.metadata.PartitionCount))
cancel()
if err != nil {
aeh.logger.Errorf("error creating event hub %s: %s", hubName, err)
return fmt.Errorf("error creating event hub %s: %s", hubName, err)
}
return nil
}
func (aeh *AzureEventHubs) ensurePublisherClient(ctx context.Context, hubName string) error {
if aeh.metadata.EnableEntityManagement {
if err := aeh.ensureEventHub(ctx, hubName); err != nil {
return err
}
}
userAgent := "dapr-" + logger.DaprVersion
if aeh.metadata.ConnectionString != "" {
// Connect with connection string.
newConnectionString, err := aeh.constructConnectionStringFromTopic(hubName)
if err != nil {
return err
}
hub, err := eventhub.NewHubFromConnectionString(newConnectionString,
eventhub.HubWithUserAgent(userAgent))
if err != nil {
aeh.logger.Debugf("unable to connect to azure event hubs: %v", err)
return fmt.Errorf("unable to connect to azure event hubs: %v", err)
}
aeh.hubClients[hubName] = hub
} else {
if hubName == "" {
return errors.New("error: missing topic/hubName attribute with AAD connection")
}
hub, err := eventhub.NewHub(aeh.metadata.EventHubNamespace, hubName, aeh.tokenProvider, eventhub.HubWithUserAgent(userAgent))
if err != nil {
return fmt.Errorf("unable to connect to azure event hubs: %v", err)
}
aeh.hubClients[hubName] = hub
}
return nil
}
func (aeh *AzureEventHubs) ensureSubscriberClient(ctx context.Context, topic string, leaserCheckpointer *storage.LeaserCheckpointer) (*eph.EventProcessorHost, error) {
// connectionString given.
if aeh.metadata.ConnectionString != "" {
hubName, err := validateAndGetHubName(aeh.metadata.ConnectionString)
if err != nil {
return nil, fmt.Errorf("error parsing connection string %s", err)
}
if hubName != "" && hubName != topic {
return nil, fmt.Errorf("error: component cannot subscribe to requested topic %s with the given connectionString", topic)
}
if hubName == "" {
aeh.logger.Debugf("eventhub namespace connection string given. using topic as event hub entity path")
}
connectionString, err := aeh.constructConnectionStringFromTopic(topic)
if err != nil {
return nil, err
}
processor, err := eph.NewFromConnectionString(
ctx, connectionString,
leaserCheckpointer,
leaserCheckpointer,
eph.WithNoBanner(),
eph.WithConsumerGroup(aeh.metadata.ConsumerGroup),
)
if err != nil {
return nil, err
}
aeh.logger.Debugf("processor initialized via connection string for topic %s", topic)
return processor, nil
}
// AAD connection.
processor, err := eph.New(ctx,
aeh.metadata.EventHubNamespace,
topic,
aeh.tokenProvider,
leaserCheckpointer,
leaserCheckpointer,
eph.WithNoBanner(),
eph.WithConsumerGroup(aeh.metadata.ConsumerGroup),
)
if err != nil {
return nil, err
}
aeh.logger.Debugf("processor initialized via AAD for topic %s", topic)
return processor, nil
}
func (aeh *AzureEventHubs) createHubManager() error {
// Only AAD based authentication supported.
hubManager, err := eventhub.NewHubManagerFromAzureEnvironment(aeh.metadata.EventHubNamespace, aeh.tokenProvider, *aeh.eventHubSettings.AzureEnvironment)
if err != nil {
return fmt.Errorf("%s %s", hubManagerCreationErrorMsg, err)
}
aeh.hubManager = hubManager
return nil
}
func (aeh *AzureEventHubs) constructConnectionStringFromTopic(requestedTopic string) (string, error) {
hubName, err := validateAndGetHubName(aeh.metadata.ConnectionString)
if err != nil {
return "", err
}
if hubName != "" && hubName == requestedTopic {
return aeh.metadata.ConnectionString, nil
} else if hubName != "" {
return "", fmt.Errorf(differentTopicConnectionStringErrorTmpl, requestedTopic)
}
return aeh.metadata.ConnectionString + ";" + entityPathKey + "=" + requestedTopic, nil
}
func (aeh *AzureEventHubs) validateEnitityManagementMetadata() error {
if aeh.metadata.MessageRetentionInDays <= 0 || aeh.metadata.MessageRetentionInDays > maxMessageRetention {
aeh.logger.Warnf("invalid/no message retention time period is given with entity management enabled, default value of %d is used", defaultMessageRetentionInDays)
aeh.metadata.MessageRetentionInDays = defaultMessageRetentionInDays
}
if aeh.metadata.PartitionCount <= 0 || aeh.metadata.PartitionCount > maxPartitionCount {
aeh.logger.Warnf("invalid/no partition count is given with entity management enabled, default value of %d is used", defaultPartitionCount)
aeh.metadata.PartitionCount = defaultPartitionCount
}
if aeh.metadata.ResourceGroupName == "" {
return errors.New(missingResourceGroupNameMsg)
}
if aeh.metadata.SubscriptionID == "" {
return errors.New(missingSubscriptionIDMsg)
}
return nil
}
func (aeh *AzureEventHubs) validateSubscriptionAttributes() error {
m := *aeh.metadata
if m.StorageAccountName == "" {
return errors.New(missingStorageAccountNameErrorMsg)
}
if m.StorageAccountKey == "" && m.ConnectionString != "" {
return errors.New(missingStorageAccountKeyErrorMsg)
}
if m.StorageContainerName == "" {
return errors.New(missingStorageContainerNameErrorMsg)
}
if m.ConsumerGroup == "" {
return errors.New(missingConsumerIDErrorMsg)
}
return nil
}
func (aeh *AzureEventHubs) getStoragePrefixString(topic string) string {
// empty string in the end of slice to have a suffix "-".
return strings.Join([]string{"dapr", topic, aeh.metadata.ConsumerGroup, ""}, "-")
}
// Init connects to Azure Event Hubs.
func (aeh *AzureEventHubs) Init(metadata pubsub.Metadata) error {
m, parseErr := parseEventHubsMetadata(metadata)
if parseErr != nil {
return parseErr
m, err := parseEventHubsMetadata(metadata, aeh.logger)
if err != nil {
return err
}
aeh.metadata = m
aeh.eventProcessors = map[string]*eph.EventProcessorHost{}
aeh.hubClients = map[string]*eventhub.Hub{}
if aeh.metadata.ConnectionString != "" {
// Validate connectionString.
hubName, validateErr := validateAndGetHubName(aeh.metadata.ConnectionString)
if validateErr != nil {
return errors.New(invalidConnectionStringErrorMsg)
}
if hubName != "" {
aeh.logger.Infof("connectionString provided is specific to event hub %q. Publishing or subscribing to a topic that does not match this event hub will fail when attempted.", hubName)
} else {
aeh.logger.Infof("hubName not given in connectionString. connection established on first publish/subscribe")
aeh.logger.Debugf("req.Topic field in incoming requests honored")
}
if aeh.metadata.EnableEntityManagement {
// See https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-management-libraries
return errors.New(entityManagementConnectionStrMsg)
}
} else {
// Connect via AAD.
settings, sErr := azauth.NewEnvironmentSettings(azauth.AzureEventHubsResourceName, metadata.Properties)
if sErr != nil {
return sErr
}
aeh.eventHubSettings = settings
tokenProvider, err := aeh.eventHubSettings.GetAMQPTokenProvider()
// Connect using the connection string
var parsedConn *conn.ParsedConn
parsedConn, err = conn.ParsedConnectionFromStr(aeh.metadata.ConnectionString)
if err != nil {
return fmt.Errorf("%s %s", hubManagerCreationErrorMsg, err)
return fmt.Errorf("connectionString is invalid: %w", err)
}
aeh.tokenProvider = tokenProvider
aeh.logger.Info("connecting to Azure EventHubs via AAD. connection established on first publish/subscribe")
aeh.logger.Debugf("req.Topic field in incoming requests honored")
if aeh.metadata.EnableEntityManagement {
if parsedConn.HubName != "" {
aeh.logger.Infof(`The provided connection string is specific to the Event Hub ("entity path") '%s'; publishing or subscribing to a topic that does not match this Event Hub will fail when attempted`, parsedConn.HubName)
} else {
aeh.logger.Infof(`The provided connection string does not contain an Event Hub name ("entity path"); the connection will be established on first publish/subscribe and req.Topic field in incoming requests will be honored`)
}
aeh.metadata.hubName = parsedConn.HubName
} else {
// Connect via Azure AD
var env azauth.EnvironmentSettings
env, err = azauth.NewEnvironmentSettings("eventhubs", metadata.Properties)
if err != nil {
return fmt.Errorf("failed to initialize Azure AD credentials: %w", err)
}
aeh.metadata.aadTokenProvider, err = env.GetTokenCredential()
if err != nil {
return fmt.Errorf("failed to get Azure AD token credentials provider: %w", err)
}
aeh.logger.Info("connecting to Azure Event Hub using Azure AD; the connection will be established on first publish/subscribe and req.Topic field in incoming requests will be honored")
/*if aeh.metadata.EnableEntityManagement {
if err := aeh.validateEnitityManagementMetadata(); err != nil {
return err
}
@ -526,193 +98,137 @@ func (aeh *AzureEventHubs) Init(metadata pubsub.Metadata) error {
return err
}
aeh.managementSettings = settings
}
}*/
}
// connect to the storage account.
if m.StorageAccountKey != "" {
// Connect to the Azure Storage account
/*if m.StorageAccountKey != "" {
metadata.Properties["accountKey"] = m.StorageAccountKey
}
var storageCredsErr error
aeh.storageCredential, aeh.azureEnvironment, storageCredsErr = azauth.GetAzureStorageBlobCredentials(aeh.logger, m.StorageAccountName, metadata.Properties)
if storageCredsErr != nil {
return fmt.Errorf("invalid storage credentials with error: %w", storageCredsErr)
}
}*/
// Default retry configuration is used if no backOff properties are set.
if err := retry.DecodeConfigWithPrefix(
&aeh.backOffConfig,
metadata.Properties,
"backOff"); err != nil {
return err
}
return nil
}
// Publish sends data to Azure Event Hubs.
func (aeh *AzureEventHubs) Publish(ctx context.Context, req *pubsub.PublishRequest) error {
if _, ok := aeh.hubClients[req.Topic]; !ok {
if err := aeh.ensurePublisherClient(ctx, req.Topic); err != nil {
return fmt.Errorf("error on establishing hub connection: %s", err)
}
}
event := &eventhub.Event{Data: req.Data}
val, ok := req.Metadata[partitionKeyMetadataKey]
if ok {
event.PartitionKey = &val
}
err := aeh.hubClients[req.Topic].Send(ctx, event)
if err != nil {
return fmt.Errorf("error from publish: %s", err)
}
return nil
}
// BulkPublish sends data to Azure Event Hubs in bulk.
func (aeh *AzureEventHubs) BulkPublish(ctx context.Context, req *pubsub.BulkPublishRequest) (pubsub.BulkPublishResponse, error) {
if _, ok := aeh.hubClients[req.Topic]; !ok {
if err := aeh.ensurePublisherClient(ctx, req.Topic); err != nil {
err = fmt.Errorf("error on establishing hub connection: %s", err)
return pubsub.NewBulkPublishResponse(req.Entries, err), err
}
}
// Create a slice of events to send.
events := make([]*eventhub.Event, len(req.Entries))
for i, entry := range req.Entries {
events[i] = &eventhub.Event{Data: entry.Event}
if val, ok := entry.Metadata[partitionKeyMetadataKey]; ok {
events[i].PartitionKey = &val
}
}
// Configure options for sending events.
opts := []eventhub.BatchOption{
eventhub.BatchWithMaxSizeInBytes(utils.GetElemOrDefaultFromMap(
req.Metadata, contribMetadata.MaxBulkPubBytesKey, int(eventhub.DefaultMaxMessageSizeInBytes))),
}
// Send events.
err := aeh.hubClients[req.Topic].SendBatch(ctx, eventhub.NewEventBatchIterator(events...), opts...)
if err != nil {
// Partial success is not supported by Azure Event Hubs.
// If an error occurs, all events are considered failed.
return pubsub.NewBulkPublishResponse(req.Entries, err), err
}
return pubsub.BulkPublishResponse{}, nil
}
// Subscribe receives data from Azure Event Hubs.
func (aeh *AzureEventHubs) Subscribe(subscribeCtx context.Context, req pubsub.SubscribeRequest, handler pubsub.Handler) error {
err := aeh.validateSubscriptionAttributes()
if err != nil {
return fmt.Errorf("error : error on subscribe %s", err)
}
if aeh.metadata.EnableEntityManagement {
if err = aeh.ensureSubscription(subscribeCtx, req.Topic); err != nil {
return err
}
}
// Set topic name, consumerID prefix for partition checkpoint lease blob path.
// This is needed to support multiple consumers for the topic using the same storage container.
leaserPrefixOpt := storage.WithPrefixInBlobPath(aeh.getStoragePrefixString(req.Topic))
leaserCheckpointer, err := storage.NewStorageLeaserCheckpointer(aeh.storageCredential, aeh.metadata.StorageAccountName, aeh.metadata.StorageContainerName, *aeh.azureEnvironment, leaserPrefixOpt)
if err != nil {
return err
}
processor, err := aeh.ensureSubscriberClient(subscribeCtx, req.Topic, leaserCheckpointer)
if err != nil {
return err
}
getAllProperties := false
if req.Metadata[requireAllProperties] != "" {
getAllProperties, err = strconv.ParseBool(req.Metadata[requireAllProperties])
if err != nil {
aeh.logger.Errorf("invalid value for metadata : %s . Error: %v.", requireAllProperties, err)
}
}
aeh.logger.Debugf("registering handler for topic %s", req.Topic)
_, err = processor.RegisterHandler(subscribeCtx,
func(_ context.Context, e *eventhub.Event) error {
// This component has built-in retries because Event Hubs doesn't support N/ACK for messages
b := aeh.backOffConfig.NewBackOffWithContext(subscribeCtx)
retryerr := retry.NotifyRecover(func() error {
aeh.logger.Debugf("Processing EventHubs event %s/%s", req.Topic, e.ID)
return subscribeHandler(subscribeCtx, req.Topic, getAllProperties, e, handler)
}, b, func(_ error, _ time.Duration) {
aeh.logger.Warnf("Error processing EventHubs event: %s/%s. Retrying...", req.Topic, e.ID)
}, func() {
aeh.logger.Warnf("Successfully processed EventHubs event after it previously failed: %s/%s", req.Topic, e.ID)
})
if retryerr != nil {
aeh.logger.Errorf("Too many failed attempts at processing Eventhubs event: %s/%s. Error: %v.", req.Topic, e.ID, err)
}
return retryerr
})
if err != nil {
return err
}
err = processor.StartNonBlocking(subscribeCtx)
if err != nil {
return err
}
aeh.eventProcessors[req.Topic] = processor
// Listen for context cancelation and stop processing messages
// This seems to be necessary because otherwise the processor isn't automatically closed on context cancelation
go func() {
<-subscribeCtx.Done()
stopCtx, stopCancel := context.WithTimeout(context.Background(), resourceGetTimeout)
stopErr := processor.Close(stopCtx)
stopCancel()
if stopErr != nil {
aeh.logger.Warnf("Error closing subscribe processor: %v", stopErr)
}
}()
return nil
}
func (aeh *AzureEventHubs) Close() (err error) {
flag := false
var ctx context.Context
var cancel context.CancelFunc
for topic, client := range aeh.hubClients {
ctx, cancel = context.WithTimeout(context.Background(), resourceGetTimeout)
err = client.Close(ctx)
cancel()
if err != nil {
flag = true
aeh.logger.Warnf("error closing publish client properly for topic/eventHub %s: %s", topic, err)
}
}
aeh.hubClients = map[string]*eventhub.Hub{}
for topic, client := range aeh.eventProcessors {
ctx, cancel = context.WithTimeout(context.Background(), resourceGetTimeout)
err = client.Close(ctx)
cancel()
if err != nil {
flag = true
aeh.logger.Warnf("error closing event processor host client properly for topic/eventHub %s: %s", topic, err)
}
}
aeh.eventProcessors = map[string]*eph.EventProcessorHost{}
if flag {
return errors.New("error closing event hub clients in a proper fashion")
}
return nil
}
func (aeh *AzureEventHubs) Features() []pubsub.Feature {
return nil
}
// Publish sends a message to Azure Event Hubs.
func (aeh *AzureEventHubs) Publish(ctx context.Context, req *pubsub.PublishRequest) error {
if req.Topic == "" {
return errors.New("parameter 'topic' is required")
}
// Get the producer client
client, err := aeh.getProducerClientForTopic(ctx, req.Topic)
if err != nil {
return fmt.Errorf("error trying to establish a connection: %w", err)
}
// Build the batch of messages
batchOpts := &azeventhubs.EventDataBatchOptions{}
if pk := req.Metadata["partitionKey"]; pk != "" {
batchOpts.PartitionKey = &pk
}
batch, err := client.NewEventDataBatch(ctx, batchOpts)
if err != nil {
return fmt.Errorf("error creating batch: %w", err)
}
err = batch.AddEventData(&azeventhubs.EventData{
Body: req.Data,
ContentType: req.ContentType,
}, nil)
if err != nil {
return fmt.Errorf("error adding message to batch: %w", err)
}
// Send the message
client.SendEventDataBatch(ctx, batch, nil)
if err != nil {
return fmt.Errorf("error publishing batch: %w", err)
}
return nil
}
func (aeh *AzureEventHubs) getProducerClientForTopic(ctx context.Context, topic string) (client *azeventhubs.ProducerClient, err error) {
// Check if we have the producer client in the cache
aeh.clientsLock.RLock()
client = aeh.producerClients[topic]
aeh.clientsLock.RUnlock()
if client != nil {
return client, nil
}
// After acquiring a write lock, check again if the producer exists in the cache just in case another goroutine created it in the meanwhile
aeh.clientsLock.Lock()
defer aeh.clientsLock.Unlock()
client = aeh.producerClients[topic]
if client != nil {
return client, nil
}
// Start by creating a new entity if needed
if aeh.metadata.EnableEntityManagement {
// TODO: Create a new entity
}
clientOpts := &azeventhubs.ProducerClientOptions{
ApplicationID: "dapr-" + logger.DaprVersion,
}
// Check if we're authenticating using a connection string
if aeh.metadata.ConnectionString != "" {
var connString string
connString, err = aeh.constructConnectionStringFromTopic(topic)
if err != nil {
return nil, err
}
client, err = azeventhubs.NewProducerClientFromConnectionString(connString, "", clientOpts)
if err != nil {
return nil, fmt.Errorf("unable to connect to Azure Event Hub using a connection string: %w", err)
}
} else {
client, err = azeventhubs.NewProducerClient(aeh.metadata.EventHubNamespace, topic, aeh.metadata.aadTokenProvider, nil)
if err != nil {
return nil, fmt.Errorf("unable to connect to Azure Event Hub using Azure AD: %w", err)
}
}
// Store in the cache before returning
aeh.producerClients[topic] = client
return client, nil
}
// BulkPublish sends data to Azure Event Hubs in bulk.
func (aeh *AzureEventHubs) BulkPublish(ctx context.Context, req *pubsub.BulkPublishRequest) (pubsub.BulkPublishResponse, error) {
return pubsub.BulkPublishResponse{}, nil
}
// Subscribe receives data from Azure Event Hubs.
func (aeh *AzureEventHubs) Subscribe(subscribeCtx context.Context, req pubsub.SubscribeRequest, handler pubsub.Handler) error {
return nil
}
func (aeh *AzureEventHubs) Close() (err error) {
return nil
}
// Returns a connection string with the Event Hub name (entity path) set if not present
func (aeh *AzureEventHubs) constructConnectionStringFromTopic(topic string) (string, error) {
if aeh.metadata.hubName != "" {
if aeh.metadata.hubName != topic {
return "", fmt.Errorf("the requested topic '%s' does not match the Event Hub name in the connection string", topic)
}
return aeh.metadata.ConnectionString, nil
}
connString := aeh.metadata.ConnectionString + ";EntityPath=" + topic
return connString, nil
}

View File

@ -0,0 +1,716 @@
/*
Copyright 2021 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package eventhubs
import (
"context"
"errors"
"fmt"
"strconv"
"strings"
"time"
"github.com/Azure/azure-amqp-common-go/v4/aad"
"github.com/Azure/azure-amqp-common-go/v4/conn"
eventhub "github.com/Azure/azure-event-hubs-go/v3"
"github.com/Azure/azure-event-hubs-go/v3/eph"
"github.com/Azure/azure-event-hubs-go/v3/storage"
mgmt "github.com/Azure/azure-sdk-for-go/services/eventhub/mgmt/2017-04-01/eventhub"
"github.com/Azure/azure-storage-blob-go/azblob"
"github.com/Azure/go-autorest/autorest/azure"
azauth "github.com/dapr/components-contrib/internal/authentication/azure"
"github.com/dapr/components-contrib/internal/utils"
"github.com/dapr/components-contrib/metadata"
contribMetadata "github.com/dapr/components-contrib/metadata"
"github.com/dapr/components-contrib/pubsub"
"github.com/dapr/kit/logger"
"github.com/dapr/kit/retry"
)
const (
// connection string entity path key.
entityPathKey = "EntityPath"
// metadata partitionKey key.
partitionKeyMetadataKey = "partitionKey"
// errors.
hubManagerCreationErrorMsg = "failed to create eventHub manager client"
invalidConnectionStringErrorMsg = "connectionString is invalid"
missingConnectionStringNamespaceErrorMsg = "connectionString or eventHubNamespace is required"
missingStorageAccountNameErrorMsg = "storageAccountName is a required attribute for subscribe"
missingStorageAccountKeyErrorMsg = "storageAccountKey is required for subscribe when connectionString is provided"
missingStorageContainerNameErrorMsg = "storageContainerName is a required attribute for subscribe"
missingConsumerIDErrorMsg = "missing consumerID attribute for subscribe"
bothConnectionStringNamespaceErrorMsg = "both connectionString and eventHubNamespace are given, only one should be given"
missingResourceGroupNameMsg = "missing resourceGroupName attribute required for entityManagement"
missingSubscriptionIDMsg = "missing subscriptionID attribute required for entityManagement"
entityManagementConnectionStrMsg = "entity management support is not available with connectionString"
differentTopicConnectionStringErrorTmpl = "specified topic %s does not match the Event Hub name in the provided connectionString"
// Event Hubs SystemProperties names for metadata passthrough.
sysPropSequenceNumber = "x-opt-sequence-number"
sysPropEnqueuedTime = "x-opt-enqueued-time"
sysPropOffset = "x-opt-offset"
sysPropPartitionID = "x-opt-partition-id"
sysPropPartitionKey = "x-opt-partition-key"
sysPropIotHubDeviceConnectionID = "iothub-connection-device-id"
sysPropIotHubAuthGenerationID = "iothub-connection-auth-generation-id"
sysPropIotHubConnectionAuthMethod = "iothub-connection-auth-method"
sysPropIotHubConnectionModuleID = "iothub-connection-module-id"
sysPropIotHubEnqueuedTime = "iothub-enqueuedtime"
sysPropMessageID = "message-id"
// Metadata field to ensure all Event Hub properties pass through
requireAllProperties = "requireAllProperties"
defaultMessageRetentionInDays = 1
defaultPartitionCount = 1
resourceCheckMaxRetry = 5
resourceCheckMaxRetryInterval time.Duration = 5 * time.Minute
resourceCreationTimeout time.Duration = 15 * time.Second
resourceGetTimeout time.Duration = 5 * time.Second
// See https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-quotas for numbers.
maxMessageRetention int32 = 90
maxPartitionCount int32 = 1024
)
func subscribeHandler(ctx context.Context, topic string, getAllProperties bool, e *eventhub.Event, handler pubsub.Handler) error {
res := pubsub.NewMessage{Data: e.Data, Topic: topic, Metadata: map[string]string{}}
if e.SystemProperties.SequenceNumber != nil {
res.Metadata[sysPropSequenceNumber] = strconv.FormatInt(*e.SystemProperties.SequenceNumber, 10)
}
if e.SystemProperties.EnqueuedTime != nil {
res.Metadata[sysPropEnqueuedTime] = e.SystemProperties.EnqueuedTime.Format(time.RFC3339)
}
if e.SystemProperties.Offset != nil {
res.Metadata[sysPropOffset] = strconv.FormatInt(*e.SystemProperties.Offset, 10)
}
// According to azure-event-hubs-go docs, this will always be nil.
if e.SystemProperties.PartitionID != nil {
res.Metadata[sysPropPartitionID] = strconv.Itoa(int(*e.SystemProperties.PartitionID))
}
// The following metadata properties are only present if event was generated by Azure IoT Hub.
if e.SystemProperties.PartitionKey != nil {
res.Metadata[sysPropPartitionKey] = *e.SystemProperties.PartitionKey
}
if e.SystemProperties.IoTHubDeviceConnectionID != nil {
res.Metadata[sysPropIotHubDeviceConnectionID] = *e.SystemProperties.IoTHubDeviceConnectionID
}
if e.SystemProperties.IoTHubAuthGenerationID != nil {
res.Metadata[sysPropIotHubAuthGenerationID] = *e.SystemProperties.IoTHubAuthGenerationID
}
if e.SystemProperties.IoTHubConnectionAuthMethod != nil {
res.Metadata[sysPropIotHubConnectionAuthMethod] = *e.SystemProperties.IoTHubConnectionAuthMethod
}
if e.SystemProperties.IoTHubConnectionModuleID != nil {
res.Metadata[sysPropIotHubConnectionModuleID] = *e.SystemProperties.IoTHubConnectionModuleID
}
if e.SystemProperties.IoTHubEnqueuedTime != nil {
res.Metadata[sysPropIotHubEnqueuedTime] = e.SystemProperties.IoTHubEnqueuedTime.Format(time.RFC3339)
}
// azure-event-hubs-go SDK pulls out the AMQP message-id property to the Event.ID property, map it from there.
if e.ID != "" {
res.Metadata[sysPropMessageID] = e.ID
}
// added properties if any ( includes application properties from iot-hub)
if getAllProperties {
if e.Properties != nil && len(e.Properties) > 0 {
for key, value := range e.Properties {
if str, ok := value.(string); ok {
res.Metadata[key] = str
}
}
}
}
return handler(ctx, &res)
}
// AzureEventHubs allows sending/receiving Azure Event Hubs events.
type AzureEventHubs struct {
metadata *azureEventHubsMetadata
logger logger.Logger
backOffConfig retry.Config
hubClients map[string]*eventhub.Hub
eventProcessors map[string]*eph.EventProcessorHost
hubManager *eventhub.HubManager
eventHubSettings azauth.EnvironmentSettings
managementSettings azauth.EnvironmentSettings
cgClient *mgmt.ConsumerGroupsClient
tokenProvider *aad.TokenProvider
storageCredential azblob.Credential
azureEnvironment *azure.Environment
}
type azureEventHubsMetadata struct {
ConnectionString string `json:"connectionString,omitempty" mapstructure:"connectionString"`
EventHubNamespace string `json:"eventHubNamespace,omitempty" mapstructure:"eventHubNamespace"`
ConsumerGroup string `json:"consumerID" mapstructure:"consumerID"`
StorageAccountName string `json:"storageAccountName,omitempty" mapstructure:"storageAccountName"`
StorageAccountKey string `json:"storageAccountKey,omitempty" mapstructure:"storageAccountKey"`
StorageContainerName string `json:"storageContainerName,omitempty" mapstructure:"storageContainerName"`
EnableEntityManagement bool `json:"enableEntityManagement,omitempty,string" mapstructure:"enableEntityManagement"`
MessageRetentionInDays int32 `json:"messageRetentionInDays,omitempty,string" mapstructure:"messageRetentionInDays"`
PartitionCount int32 `json:"partitionCount,omitempty,string" mapstructure:"partitionCount"`
SubscriptionID string `json:"subscriptionID,omitempty" mapstructure:"subscriptionID"`
ResourceGroupName string `json:"resourceGroupName,omitempty" mapstructure:"resourceGroupName"`
}
// NewAzureEventHubs returns a new Azure Event hubs instance.
func NewAzureEventHubs(logger logger.Logger) pubsub.PubSub {
return &AzureEventHubs{logger: logger}
}
func parseEventHubsMetadata(meta pubsub.Metadata) (*azureEventHubsMetadata, error) {
var m azureEventHubsMetadata
err := metadata.DecodeMetadata(meta.Properties, &m)
if err != nil {
return nil, fmt.Errorf("failed to decode metada: %w", err)
}
if m.ConnectionString == "" && m.EventHubNamespace == "" {
return nil, errors.New(missingConnectionStringNamespaceErrorMsg)
}
if m.ConnectionString != "" && m.EventHubNamespace != "" {
return nil, errors.New(bothConnectionStringNamespaceErrorMsg)
}
return &m, nil
}
func validateAndGetHubName(connectionString string) (string, error) {
parsed, err := conn.ParsedConnectionFromStr(connectionString)
if err != nil {
return "", err
}
return parsed.HubName, nil
}
func (aeh *AzureEventHubs) ensureEventHub(ctx context.Context, hubName string) error {
if aeh.hubManager == nil {
aeh.logger.Errorf("hubManager client not initialized properly")
return fmt.Errorf("hubManager client not initialized properly")
}
entity, err := aeh.getHubEntity(ctx, hubName)
if err != nil {
return err
}
if entity == nil {
if err := aeh.createHubEntity(ctx, hubName); err != nil {
return err
}
}
return nil
}
func (aeh *AzureEventHubs) ensureSubscription(ctx context.Context, hubName string) error {
err := aeh.ensureEventHub(ctx, hubName)
if err != nil {
return err
}
_, err = aeh.getConsumerGroupsClient()
if err != nil {
return err
}
return aeh.createConsumerGroup(ctx, hubName)
}
func (aeh *AzureEventHubs) getConsumerGroupsClient() (*mgmt.ConsumerGroupsClient, error) {
if aeh.cgClient != nil {
return aeh.cgClient, nil
}
client := mgmt.NewConsumerGroupsClientWithBaseURI(aeh.managementSettings.AzureEnvironment.ResourceManagerEndpoint,
aeh.metadata.SubscriptionID)
a, err := aeh.managementSettings.GetAuthorizer()
if err != nil {
return nil, err
}
client.Authorizer = a
aeh.cgClient = &client
return aeh.cgClient, nil
}
func (aeh *AzureEventHubs) createConsumerGroup(parentCtx context.Context, hubName string) error {
create := false
backOffConfig := retry.DefaultConfig()
backOffConfig.Policy = retry.PolicyExponential
backOffConfig.MaxInterval = resourceCheckMaxRetryInterval
backOffConfig.MaxRetries = resourceCheckMaxRetry
b := backOffConfig.NewBackOffWithContext(parentCtx)
err := retry.NotifyRecover(func() error {
c, err := aeh.shouldCreateConsumerGroup(parentCtx, hubName)
if err == nil {
create = c
return nil
}
return err
}, b, func(_ error, _ time.Duration) {
aeh.logger.Errorf("Error checking for consumer group for EventHub : %s. Retrying...", hubName)
}, func() {
aeh.logger.Warnf("Successfully checked for consumer group in EventHub %s after it previously failed.", hubName)
})
if err != nil {
return err
}
if create {
ctx, cancel := context.WithTimeout(parentCtx, resourceCreationTimeout)
_, err = aeh.cgClient.CreateOrUpdate(ctx, aeh.metadata.ResourceGroupName, aeh.metadata.EventHubNamespace, hubName, aeh.metadata.ConsumerGroup, mgmt.ConsumerGroup{})
cancel()
if err != nil {
return err
}
}
return nil
}
func (aeh *AzureEventHubs) shouldCreateConsumerGroup(parentCtx context.Context, hubName string) (bool, error) {
ctx, cancel := context.WithTimeout(parentCtx, resourceGetTimeout)
g, err := aeh.cgClient.Get(ctx, aeh.metadata.ResourceGroupName, aeh.metadata.EventHubNamespace, hubName, aeh.metadata.ConsumerGroup)
cancel()
if err != nil {
if g.HasHTTPStatus(404) {
return true, nil
}
return false, err
}
if *g.Name == aeh.metadata.ConsumerGroup {
aeh.logger.Infof("consumer group %s exists for the requested topic/eventHub %s", aeh.metadata.ConsumerGroup, hubName)
}
return false, nil
}
func (aeh *AzureEventHubs) getHubEntity(parentCtx context.Context, hubName string) (*eventhub.HubEntity, error) {
ctx, cancel := context.WithTimeout(parentCtx, resourceGetTimeout)
defer cancel()
return aeh.hubManager.Get(ctx, hubName)
}
func (aeh *AzureEventHubs) createHubEntity(parentCtx context.Context, hubName string) error {
ctx, cancel := context.WithTimeout(parentCtx, resourceCreationTimeout)
_, err := aeh.hubManager.Put(ctx, hubName,
eventhub.HubWithMessageRetentionInDays(aeh.metadata.MessageRetentionInDays),
eventhub.HubWithPartitionCount(aeh.metadata.PartitionCount))
cancel()
if err != nil {
aeh.logger.Errorf("error creating event hub %s: %s", hubName, err)
return fmt.Errorf("error creating event hub %s: %s", hubName, err)
}
return nil
}
func (aeh *AzureEventHubs) ensurePublisherClient(ctx context.Context, hubName string) error {
if aeh.metadata.EnableEntityManagement {
if err := aeh.ensureEventHub(ctx, hubName); err != nil {
return err
}
}
userAgent := "dapr-" + logger.DaprVersion
if aeh.metadata.ConnectionString != "" {
// Connect with connection string.
newConnectionString, err := aeh.constructConnectionStringFromTopic(hubName)
if err != nil {
return err
}
hub, err := eventhub.NewHubFromConnectionString(newConnectionString,
eventhub.HubWithUserAgent(userAgent))
if err != nil {
aeh.logger.Debugf("unable to connect to azure event hubs: %v", err)
return fmt.Errorf("unable to connect to azure event hubs: %v", err)
}
aeh.hubClients[hubName] = hub
} else {
if hubName == "" {
return errors.New("error: missing topic/hubName attribute with AAD connection")
}
hub, err := eventhub.NewHub(aeh.metadata.EventHubNamespace, hubName, aeh.tokenProvider, eventhub.HubWithUserAgent(userAgent))
if err != nil {
return fmt.Errorf("unable to connect to azure event hubs: %v", err)
}
aeh.hubClients[hubName] = hub
}
return nil
}
func (aeh *AzureEventHubs) ensureSubscriberClient(ctx context.Context, topic string, leaserCheckpointer *storage.LeaserCheckpointer) (*eph.EventProcessorHost, error) {
// connectionString given.
if aeh.metadata.ConnectionString != "" {
hubName, err := validateAndGetHubName(aeh.metadata.ConnectionString)
if err != nil {
return nil, fmt.Errorf("error parsing connection string %s", err)
}
if hubName != "" && hubName != topic {
return nil, fmt.Errorf("error: component cannot subscribe to requested topic %s with the given connectionString", topic)
}
if hubName == "" {
aeh.logger.Debugf("eventhub namespace connection string given. using topic as event hub entity path")
}
connectionString, err := aeh.constructConnectionStringFromTopic(topic)
if err != nil {
return nil, err
}
processor, err := eph.NewFromConnectionString(
ctx, connectionString,
leaserCheckpointer,
leaserCheckpointer,
eph.WithNoBanner(),
eph.WithConsumerGroup(aeh.metadata.ConsumerGroup),
)
if err != nil {
return nil, err
}
aeh.logger.Debugf("processor initialized via connection string for topic %s", topic)
return processor, nil
}
// AAD connection.
processor, err := eph.New(ctx,
aeh.metadata.EventHubNamespace,
topic,
aeh.tokenProvider,
leaserCheckpointer,
leaserCheckpointer,
eph.WithNoBanner(),
eph.WithConsumerGroup(aeh.metadata.ConsumerGroup),
)
if err != nil {
return nil, err
}
aeh.logger.Debugf("processor initialized via AAD for topic %s", topic)
return processor, nil
}
func (aeh *AzureEventHubs) createHubManager() error {
// Only AAD based authentication supported.
hubManager, err := eventhub.NewHubManagerFromAzureEnvironment(aeh.metadata.EventHubNamespace, aeh.tokenProvider, *aeh.eventHubSettings.AzureEnvironment)
if err != nil {
return fmt.Errorf("%s %s", hubManagerCreationErrorMsg, err)
}
aeh.hubManager = hubManager
return nil
}
func (aeh *AzureEventHubs) constructConnectionStringFromTopic(requestedTopic string) (string, error) {
hubName, err := validateAndGetHubName(aeh.metadata.ConnectionString)
if err != nil {
return "", err
}
if hubName != "" && hubName == requestedTopic {
return aeh.metadata.ConnectionString, nil
} else if hubName != "" {
return "", fmt.Errorf(differentTopicConnectionStringErrorTmpl, requestedTopic)
}
return aeh.metadata.ConnectionString + ";" + entityPathKey + "=" + requestedTopic, nil
}
func (aeh *AzureEventHubs) validateEnitityManagementMetadata() error {
if aeh.metadata.MessageRetentionInDays <= 0 || aeh.metadata.MessageRetentionInDays > maxMessageRetention {
aeh.logger.Warnf("invalid/no message retention time period is given with entity management enabled, default value of %d is used", defaultMessageRetentionInDays)
aeh.metadata.MessageRetentionInDays = defaultMessageRetentionInDays
}
if aeh.metadata.PartitionCount <= 0 || aeh.metadata.PartitionCount > maxPartitionCount {
aeh.logger.Warnf("invalid/no partition count is given with entity management enabled, default value of %d is used", defaultPartitionCount)
aeh.metadata.PartitionCount = defaultPartitionCount
}
if aeh.metadata.ResourceGroupName == "" {
return errors.New(missingResourceGroupNameMsg)
}
if aeh.metadata.SubscriptionID == "" {
return errors.New(missingSubscriptionIDMsg)
}
return nil
}
func (aeh *AzureEventHubs) validateSubscriptionAttributes() error {
m := *aeh.metadata
if m.StorageAccountName == "" {
return errors.New("storageAccountName is a required attribute for subscribe")
}
if m.StorageAccountKey == "" && m.ConnectionString != "" {
return errors.New("storageAccountKey is required for subscribe when connectionString is provided")
}
if m.StorageContainerName == "" {
return errors.New("storageContainerName is a required attribute for subscribe")
}
if m.ConsumerGroup == "" {
return errors.New("consumerID is a required attribute for subscribe")
}
return nil
}
func (aeh *AzureEventHubs) getStoragePrefixString(topic string) string {
return fmt.Sprintf("dapr-%s-%s-", topic, aeh.metadata.ConsumerGroup)
}
// Init connects to Azure Event Hubs.
func (aeh *AzureEventHubs) Init(metadata pubsub.Metadata) error {
m, parseErr := parseEventHubsMetadata(metadata)
if parseErr != nil {
return parseErr
}
aeh.metadata = m
aeh.eventProcessors = map[string]*eph.EventProcessorHost{}
aeh.hubClients = map[string]*eventhub.Hub{}
if aeh.metadata.ConnectionString != "" {
// Validate connectionString.
hubName, validateErr := validateAndGetHubName(aeh.metadata.ConnectionString)
if validateErr != nil {
return errors.New(invalidConnectionStringErrorMsg)
}
if hubName != "" {
aeh.logger.Infof("connectionString provided is specific to event hub %q. Publishing or subscribing to a topic that does not match this event hub will fail when attempted.", hubName)
} else {
aeh.logger.Infof("hubName not given in connectionString. connection established on first publish/subscribe")
aeh.logger.Debugf("req.Topic field in incoming requests honored")
}
if aeh.metadata.EnableEntityManagement {
// See https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-management-libraries
return errors.New(entityManagementConnectionStrMsg)
}
} else {
// Connect via AAD.
settings, sErr := azauth.NewEnvironmentSettings(azauth.AzureEventHubsResourceName, metadata.Properties)
if sErr != nil {
return sErr
}
aeh.eventHubSettings = settings
tokenProvider, err := aeh.eventHubSettings.GetAMQPTokenProvider()
if err != nil {
return fmt.Errorf("%s %s", hubManagerCreationErrorMsg, err)
}
aeh.tokenProvider = tokenProvider
aeh.logger.Info("connecting to Azure EventHubs via AAD. connection established on first publish/subscribe")
aeh.logger.Debugf("req.Topic field in incoming requests honored")
if aeh.metadata.EnableEntityManagement {
if err := aeh.validateEnitityManagementMetadata(); err != nil {
return err
}
// Create hubManager for eventHub management with AAD.
if managerCreateErr := aeh.createHubManager(); managerCreateErr != nil {
return managerCreateErr
}
// Get Azure Management plane settings for creating consumer groups using event hubs management client.
settings, err := azauth.NewEnvironmentSettings("azure", metadata.Properties)
if err != nil {
return err
}
aeh.managementSettings = settings
}
}
// connect to the storage account.
if m.StorageAccountKey != "" {
metadata.Properties["accountKey"] = m.StorageAccountKey
}
var storageCredsErr error
aeh.storageCredential, aeh.azureEnvironment, storageCredsErr = azauth.GetAzureStorageBlobCredentials(aeh.logger, m.StorageAccountName, metadata.Properties)
if storageCredsErr != nil {
return fmt.Errorf("invalid storage credentials with error: %w", storageCredsErr)
}
// Default retry configuration is used if no backOff properties are set.
if err := retry.DecodeConfigWithPrefix(
&aeh.backOffConfig,
metadata.Properties,
"backOff"); err != nil {
return err
}
return nil
}
// Publish sends data to Azure Event Hubs.
func (aeh *AzureEventHubs) Publish(ctx context.Context, req *pubsub.PublishRequest) error {
if _, ok := aeh.hubClients[req.Topic]; !ok {
if err := aeh.ensurePublisherClient(ctx, req.Topic); err != nil {
return fmt.Errorf("error on establishing hub connection: %s", err)
}
}
event := &eventhub.Event{Data: req.Data}
val, ok := req.Metadata[partitionKeyMetadataKey]
if ok {
event.PartitionKey = &val
}
err := aeh.hubClients[req.Topic].Send(ctx, event)
if err != nil {
return fmt.Errorf("error from publish: %s", err)
}
return nil
}
// BulkPublish sends data to Azure Event Hubs in bulk.
func (aeh *AzureEventHubs) BulkPublish(ctx context.Context, req *pubsub.BulkPublishRequest) (pubsub.BulkPublishResponse, error) {
if _, ok := aeh.hubClients[req.Topic]; !ok {
if err := aeh.ensurePublisherClient(ctx, req.Topic); err != nil {
err = fmt.Errorf("error on establishing hub connection: %s", err)
return pubsub.NewBulkPublishResponse(req.Entries, err), err
}
}
// Create a slice of events to send.
events := make([]*eventhub.Event, len(req.Entries))
for i, entry := range req.Entries {
events[i] = &eventhub.Event{Data: entry.Event}
if val, ok := entry.Metadata[partitionKeyMetadataKey]; ok {
events[i].PartitionKey = &val
}
}
// Configure options for sending events.
opts := []eventhub.BatchOption{
eventhub.BatchWithMaxSizeInBytes(utils.GetElemOrDefaultFromMap(
req.Metadata, contribMetadata.MaxBulkPubBytesKey, int(eventhub.DefaultMaxMessageSizeInBytes))),
}
// Send events.
err := aeh.hubClients[req.Topic].SendBatch(ctx, eventhub.NewEventBatchIterator(events...), opts...)
if err != nil {
// Partial success is not supported by Azure Event Hubs.
// If an error occurs, all events are considered failed.
return pubsub.NewBulkPublishResponse(req.Entries, err), err
}
return pubsub.BulkPublishResponse{}, nil
}
// Subscribe receives data from Azure Event Hubs.
func (aeh *AzureEventHubs) Subscribe(subscribeCtx context.Context, req pubsub.SubscribeRequest, handler pubsub.Handler) error {
err := aeh.validateSubscriptionAttributes()
if err != nil {
return fmt.Errorf("error : error on subscribe %s", err)
}
if aeh.metadata.EnableEntityManagement {
if err = aeh.ensureSubscription(subscribeCtx, req.Topic); err != nil {
return err
}
}
// Set topic name, consumerID prefix for partition checkpoint lease blob path.
// This is needed to support multiple consumers for the topic using the same storage container.
leaserPrefixOpt := storage.WithPrefixInBlobPath(aeh.getStoragePrefixString(req.Topic))
leaserCheckpointer, err := storage.NewStorageLeaserCheckpointer(aeh.storageCredential, aeh.metadata.StorageAccountName, aeh.metadata.StorageContainerName, *aeh.azureEnvironment, leaserPrefixOpt)
if err != nil {
return err
}
processor, err := aeh.ensureSubscriberClient(subscribeCtx, req.Topic, leaserCheckpointer)
if err != nil {
return err
}
getAllProperties := false
if req.Metadata[requireAllProperties] != "" {
getAllProperties, err = strconv.ParseBool(req.Metadata[requireAllProperties])
if err != nil {
aeh.logger.Errorf("invalid value for metadata : %s . Error: %v.", requireAllProperties, err)
}
}
aeh.logger.Debugf("registering handler for topic %s", req.Topic)
_, err = processor.RegisterHandler(subscribeCtx,
func(_ context.Context, e *eventhub.Event) error {
// This component has built-in retries because Event Hubs doesn't support N/ACK for messages
b := aeh.backOffConfig.NewBackOffWithContext(subscribeCtx)
retryerr := retry.NotifyRecover(func() error {
aeh.logger.Debugf("Processing EventHubs event %s/%s", req.Topic, e.ID)
return subscribeHandler(subscribeCtx, req.Topic, getAllProperties, e, handler)
}, b, func(_ error, _ time.Duration) {
aeh.logger.Warnf("Error processing EventHubs event: %s/%s. Retrying...", req.Topic, e.ID)
}, func() {
aeh.logger.Warnf("Successfully processed EventHubs event after it previously failed: %s/%s", req.Topic, e.ID)
})
if retryerr != nil {
aeh.logger.Errorf("Too many failed attempts at processing Eventhubs event: %s/%s. Error: %v.", req.Topic, e.ID, err)
}
return retryerr
})
if err != nil {
return err
}
err = processor.StartNonBlocking(subscribeCtx)
if err != nil {
return err
}
aeh.eventProcessors[req.Topic] = processor
// Listen for context cancelation and stop processing messages
// This seems to be necessary because otherwise the processor isn't automatically closed on context cancelation
go func() {
<-subscribeCtx.Done()
stopCtx, stopCancel := context.WithTimeout(context.Background(), resourceGetTimeout)
stopErr := processor.Close(stopCtx)
stopCancel()
if stopErr != nil {
aeh.logger.Warnf("Error closing subscribe processor: %v", stopErr)
}
}()
return nil
}
func (aeh *AzureEventHubs) Close() (err error) {
flag := false
var ctx context.Context
var cancel context.CancelFunc
for topic, client := range aeh.hubClients {
ctx, cancel = context.WithTimeout(context.Background(), resourceGetTimeout)
err = client.Close(ctx)
cancel()
if err != nil {
flag = true
aeh.logger.Warnf("error closing publish client properly for topic/eventHub %s: %s", topic, err)
}
}
aeh.hubClients = map[string]*eventhub.Hub{}
for topic, client := range aeh.eventProcessors {
ctx, cancel = context.WithTimeout(context.Background(), resourceGetTimeout)
err = client.Close(ctx)
cancel()
if err != nil {
flag = true
aeh.logger.Warnf("error closing event processor host client properly for topic/eventHub %s: %s", topic, err)
}
}
aeh.eventProcessors = map[string]*eph.EventProcessorHost{}
if flag {
return errors.New("error closing event hub clients in a proper fashion")
}
return nil
}
func (aeh *AzureEventHubs) Features() []pubsub.Feature {
return nil
}

View File

@ -0,0 +1,66 @@
/*
Copyright 2023 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package eventhubs
import (
"errors"
"fmt"
"github.com/Azure/azure-sdk-for-go/sdk/azcore"
"github.com/dapr/components-contrib/metadata"
"github.com/dapr/components-contrib/pubsub"
"github.com/dapr/kit/logger"
)
type azureEventHubsMetadata struct {
ConnectionString string `json:"connectionString" mapstructure:"connectionString"`
EventHubNamespace string `json:"eventHubNamespace" mapstructure:"eventHubNamespace"`
ConsumerGroup string `json:"consumerID" mapstructure:"consumerID"`
StorageAccountName string `json:"storageAccountName" mapstructure:"storageAccountName"`
StorageAccountKey string `json:"storageAccountKey" mapstructure:"storageAccountKey"`
StorageContainerName string `json:"storageContainerName" mapstructure:"storageContainerName"`
EnableEntityManagement bool `json:"enableEntityManagement,string" mapstructure:"enableEntityManagement"`
MessageRetentionInDays int32 `json:"messageRetentionInDays,string" mapstructure:"messageRetentionInDays"`
PartitionCount int32 `json:"partitionCount,string" mapstructure:"partitionCount"`
SubscriptionID string `json:"subscriptionID" mapstructure:"subscriptionID"`
ResourceGroupName string `json:"resourceGroupName" mapstructure:"resourceGroupName"`
// Internal properties
hubName string
aadTokenProvider azcore.TokenCredential
}
func parseEventHubsMetadata(meta pubsub.Metadata, log logger.Logger) (*azureEventHubsMetadata, error) {
var m azureEventHubsMetadata
err := metadata.DecodeMetadata(meta.Properties, &m)
if err != nil {
return nil, fmt.Errorf("failed to decode metada: %w", err)
}
if m.ConnectionString == "" && m.EventHubNamespace == "" {
return nil, errors.New("one of connectionString or eventHubNamespace is required")
}
if m.ConnectionString != "" && m.EventHubNamespace != "" {
return nil, errors.New("only one of connectionString or eventHubNamespace should be passed")
}
if m.EnableEntityManagement && m.ConnectionString != "" {
m.EnableEntityManagement = false
log.Warn("entity management support is not available when connecting with a connection string")
}
return &m, nil
}

View File

@ -19,7 +19,7 @@ require (
github.com/Azure/azure-pipeline-go v0.2.3 // indirect
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.2.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.2.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.1 // indirect
github.com/Azure/azure-sdk-for-go/sdk/internal v1.1.2 // indirect
github.com/Azure/azure-storage-blob-go v0.15.0 // indirect
github.com/Azure/azure-storage-queue-go v0.0.0-20191125232315-636801874cdd // indirect
github.com/Azure/go-autorest v14.2.0+incompatible // indirect

View File

@ -44,8 +44,8 @@ github.com/Azure/azure-sdk-for-go/sdk/azcore v1.2.0 h1:sVW/AFBTGyJxDaMYlq0ct3jUX
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.2.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U=
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.2.0 h1:t/W5MYAuQy81cvM8VUNfRLzhtKpXhVUAN7Cd7KVbTyc=
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.2.0/go.mod h1:NBanQUfSWiWn3QEpWDTCU0IjBECKOYvl2R8xdRtMtiM=
github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.1 h1:XUNQ4mw+zJmaA2KXzP9JlQiecy1SI+Eog7xVkPiqIbg=
github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.1/go.mod h1:eWRD7oawr1Mu1sLCawqVc0CUiF43ia3qQMxLscsKQ9w=
github.com/Azure/azure-sdk-for-go/sdk/internal v1.1.2 h1:+5VZ72z0Qan5Bog5C+ZkgSqUbeVUd9wgtHOrIKuc5b8=
github.com/Azure/azure-sdk-for-go/sdk/internal v1.1.2/go.mod h1:eWRD7oawr1Mu1sLCawqVc0CUiF43ia3qQMxLscsKQ9w=
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v0.6.1 h1:YvQv9Mz6T8oR5ypQOL6erY0Z5t71ak1uHV4QFokCOZk=
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v0.6.1/go.mod h1:c6WvOhtmjNUWbLfOG1qxM/q0SPvQNSVJvolm+C52dIU=
github.com/Azure/azure-storage-blob-go v0.15.0 h1:rXtgp8tN1p29GvpGgfJetavIG0V7OgcSXPpwp3tx6qk=

View File

@ -22,7 +22,7 @@ require (
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.2.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.2.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos v0.3.2 // indirect
github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.1 // indirect
github.com/Azure/azure-sdk-for-go/sdk/internal v1.1.2 // indirect
github.com/Azure/azure-storage-blob-go v0.15.0 // indirect
github.com/Azure/azure-storage-queue-go v0.0.0-20191125232315-636801874cdd // indirect
github.com/Azure/go-autorest v14.2.0+incompatible // indirect

View File

@ -48,8 +48,8 @@ github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.2.0 h1:t/W5MYAuQy81cvM8VUNfR
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.2.0/go.mod h1:NBanQUfSWiWn3QEpWDTCU0IjBECKOYvl2R8xdRtMtiM=
github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos v0.3.2 h1:yJegJqjhrMJ3Oe5s43jOTGL2AsE7pJyx+7Yqls/65tw=
github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos v0.3.2/go.mod h1:Fy3bbChFm4cZn6oIxYYqKB2FG3rBDxk3NZDLDJCHl+Q=
github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.1 h1:XUNQ4mw+zJmaA2KXzP9JlQiecy1SI+Eog7xVkPiqIbg=
github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.1/go.mod h1:eWRD7oawr1Mu1sLCawqVc0CUiF43ia3qQMxLscsKQ9w=
github.com/Azure/azure-sdk-for-go/sdk/internal v1.1.2 h1:+5VZ72z0Qan5Bog5C+ZkgSqUbeVUd9wgtHOrIKuc5b8=
github.com/Azure/azure-sdk-for-go/sdk/internal v1.1.2/go.mod h1:eWRD7oawr1Mu1sLCawqVc0CUiF43ia3qQMxLscsKQ9w=
github.com/Azure/azure-storage-blob-go v0.15.0 h1:rXtgp8tN1p29GvpGgfJetavIG0V7OgcSXPpwp3tx6qk=
github.com/Azure/azure-storage-blob-go v0.15.0/go.mod h1:vbjsVbX0dlxnRc4FFMPsS9BsJWPcne7GB7onqlPvz58=
github.com/Azure/azure-storage-queue-go v0.0.0-20191125232315-636801874cdd h1:b3wyxBl3vvr15tUAziPBPK354y+LSdfPCpex5oBttHo=

View File

@ -22,7 +22,7 @@ require (
github.com/Azure/azure-sdk-for-go v67.0.0+incompatible // indirect
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.2.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.2.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.1 // indirect
github.com/Azure/azure-sdk-for-go/sdk/internal v1.1.2 // indirect
github.com/Azure/azure-storage-blob-go v0.15.0 // indirect
github.com/Azure/azure-storage-queue-go v0.0.0-20191125232315-636801874cdd // indirect
github.com/Azure/go-amqp v0.18.0 // indirect

View File

@ -48,8 +48,8 @@ github.com/Azure/azure-sdk-for-go/sdk/azcore v1.2.0 h1:sVW/AFBTGyJxDaMYlq0ct3jUX
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.2.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U=
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.2.0 h1:t/W5MYAuQy81cvM8VUNfRLzhtKpXhVUAN7Cd7KVbTyc=
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.2.0/go.mod h1:NBanQUfSWiWn3QEpWDTCU0IjBECKOYvl2R8xdRtMtiM=
github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.1 h1:XUNQ4mw+zJmaA2KXzP9JlQiecy1SI+Eog7xVkPiqIbg=
github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.1/go.mod h1:eWRD7oawr1Mu1sLCawqVc0CUiF43ia3qQMxLscsKQ9w=
github.com/Azure/azure-sdk-for-go/sdk/internal v1.1.2 h1:+5VZ72z0Qan5Bog5C+ZkgSqUbeVUd9wgtHOrIKuc5b8=
github.com/Azure/azure-sdk-for-go/sdk/internal v1.1.2/go.mod h1:eWRD7oawr1Mu1sLCawqVc0CUiF43ia3qQMxLscsKQ9w=
github.com/Azure/azure-storage-blob-go v0.15.0 h1:rXtgp8tN1p29GvpGgfJetavIG0V7OgcSXPpwp3tx6qk=
github.com/Azure/azure-storage-blob-go v0.15.0/go.mod h1:vbjsVbX0dlxnRc4FFMPsS9BsJWPcne7GB7onqlPvz58=
github.com/Azure/azure-storage-queue-go v0.0.0-20191125232315-636801874cdd h1:b3wyxBl3vvr15tUAziPBPK354y+LSdfPCpex5oBttHo=

View File

@ -19,7 +19,7 @@ require (
github.com/Azure/azure-pipeline-go v0.2.3 // indirect
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.2.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.2.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.1 // indirect
github.com/Azure/azure-sdk-for-go/sdk/internal v1.1.2 // indirect
github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus v1.1.3 // indirect
github.com/Azure/azure-storage-blob-go v0.15.0 // indirect
github.com/Azure/azure-storage-queue-go v0.0.0-20191125232315-636801874cdd // indirect

View File

@ -44,8 +44,8 @@ github.com/Azure/azure-sdk-for-go/sdk/azcore v1.2.0 h1:sVW/AFBTGyJxDaMYlq0ct3jUX
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.2.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U=
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.2.0 h1:t/W5MYAuQy81cvM8VUNfRLzhtKpXhVUAN7Cd7KVbTyc=
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.2.0/go.mod h1:NBanQUfSWiWn3QEpWDTCU0IjBECKOYvl2R8xdRtMtiM=
github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.1 h1:XUNQ4mw+zJmaA2KXzP9JlQiecy1SI+Eog7xVkPiqIbg=
github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.1/go.mod h1:eWRD7oawr1Mu1sLCawqVc0CUiF43ia3qQMxLscsKQ9w=
github.com/Azure/azure-sdk-for-go/sdk/internal v1.1.2 h1:+5VZ72z0Qan5Bog5C+ZkgSqUbeVUd9wgtHOrIKuc5b8=
github.com/Azure/azure-sdk-for-go/sdk/internal v1.1.2/go.mod h1:eWRD7oawr1Mu1sLCawqVc0CUiF43ia3qQMxLscsKQ9w=
github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus v1.1.3 h1:27HVgIcvrKkRs5eJzHnyZdt71/EyB3clkiJQB0qyIa8=
github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus v1.1.3/go.mod h1:Eo6WMP/iw9sp06+v8y030eReUwX6sULn5i3fxCDWPag=
github.com/Azure/azure-storage-blob-go v0.15.0 h1:rXtgp8tN1p29GvpGgfJetavIG0V7OgcSXPpwp3tx6qk=

View File

@ -19,7 +19,7 @@ require (
github.com/Azure/azure-pipeline-go v0.2.3 // indirect
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.2.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.2.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.1 // indirect
github.com/Azure/azure-sdk-for-go/sdk/internal v1.1.2 // indirect
github.com/Azure/azure-storage-blob-go v0.15.0 // indirect
github.com/Azure/azure-storage-queue-go v0.0.0-20191125232315-636801874cdd // indirect
github.com/Azure/go-autorest v14.2.0+incompatible // indirect

View File

@ -44,8 +44,8 @@ github.com/Azure/azure-sdk-for-go/sdk/azcore v1.2.0 h1:sVW/AFBTGyJxDaMYlq0ct3jUX
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.2.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U=
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.2.0 h1:t/W5MYAuQy81cvM8VUNfRLzhtKpXhVUAN7Cd7KVbTyc=
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.2.0/go.mod h1:NBanQUfSWiWn3QEpWDTCU0IjBECKOYvl2R8xdRtMtiM=
github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.1 h1:XUNQ4mw+zJmaA2KXzP9JlQiecy1SI+Eog7xVkPiqIbg=
github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.1/go.mod h1:eWRD7oawr1Mu1sLCawqVc0CUiF43ia3qQMxLscsKQ9w=
github.com/Azure/azure-sdk-for-go/sdk/internal v1.1.2 h1:+5VZ72z0Qan5Bog5C+ZkgSqUbeVUd9wgtHOrIKuc5b8=
github.com/Azure/azure-sdk-for-go/sdk/internal v1.1.2/go.mod h1:eWRD7oawr1Mu1sLCawqVc0CUiF43ia3qQMxLscsKQ9w=
github.com/Azure/azure-storage-blob-go v0.15.0 h1:rXtgp8tN1p29GvpGgfJetavIG0V7OgcSXPpwp3tx6qk=
github.com/Azure/azure-storage-blob-go v0.15.0/go.mod h1:vbjsVbX0dlxnRc4FFMPsS9BsJWPcne7GB7onqlPvz58=
github.com/Azure/azure-storage-queue-go v0.0.0-20191125232315-636801874cdd h1:b3wyxBl3vvr15tUAziPBPK354y+LSdfPCpex5oBttHo=

View File

@ -17,23 +17,19 @@ require (
contrib.go.opencensus.io/exporter/prometheus v0.4.2 // indirect
github.com/AdhityaRamadhanus/fasthttpcors v0.0.0-20170121111917-d4c07198763a // indirect
github.com/Azure/azure-amqp-common-go/v4 v4.0.0 // indirect
github.com/Azure/azure-event-hubs-go/v3 v3.4.0 // indirect
github.com/Azure/azure-pipeline-go v0.2.3 // indirect
github.com/Azure/azure-sdk-for-go v67.0.0+incompatible // indirect
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.2.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.2.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.1 // indirect
github.com/Azure/azure-sdk-for-go/sdk/internal v1.1.2 // indirect
github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs v0.4.0 // indirect
github.com/Azure/azure-storage-blob-go v0.15.0 // indirect
github.com/Azure/azure-storage-queue-go v0.0.0-20191125232315-636801874cdd // indirect
github.com/Azure/go-amqp v0.18.0 // indirect
github.com/Azure/go-autorest v14.2.0+incompatible // indirect
github.com/Azure/go-autorest/autorest v0.11.28 // indirect
github.com/Azure/go-autorest/autorest/adal v0.9.21 // indirect
github.com/Azure/go-autorest/autorest/azure/auth v0.5.11 // indirect
github.com/Azure/go-autorest/autorest/azure/cli v0.4.5 // indirect
github.com/Azure/go-autorest/autorest/date v0.3.0 // indirect
github.com/Azure/go-autorest/autorest/to v0.4.0 // indirect
github.com/Azure/go-autorest/autorest/validation v0.3.1 // indirect
github.com/Azure/go-autorest/logger v0.2.1 // indirect
github.com/Azure/go-autorest/tracing v0.6.0 // indirect
github.com/AzureAD/microsoft-authentication-library-for-go v0.7.0 // indirect
@ -47,7 +43,6 @@ require (
github.com/cenkalti/backoff/v4 v4.2.0 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/devigned/tab v0.1.1 // indirect
github.com/dimchansky/utfbom v1.1.1 // indirect
github.com/emicklei/go-restful/v3 v3.9.0 // indirect
github.com/evanphx/json-patch/v5 v5.6.0 // indirect
@ -87,7 +82,6 @@ require (
github.com/imdario/mergo v0.3.12 // indirect
github.com/jhump/protoreflect v1.13.0 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/jpillora/backoff v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/compress v1.15.12 // indirect
github.com/kylelemons/godebug v1.1.0 // indirect

View File

@ -37,25 +37,23 @@ github.com/AdhityaRamadhanus/fasthttpcors v0.0.0-20170121111917-d4c07198763a h1:
github.com/AdhityaRamadhanus/fasthttpcors v0.0.0-20170121111917-d4c07198763a/go.mod h1:C0A1KeiVHs+trY6gUTPhhGammbrZ30ZfXRW/nuT7HLw=
github.com/Azure/azure-amqp-common-go/v4 v4.0.0 h1:mV5O74KYmonn0ZXtwfMjGUtZ9Z+Hv7AIFVS1s03sRvo=
github.com/Azure/azure-amqp-common-go/v4 v4.0.0/go.mod h1:4+qRvizIo4+CbGG552O6a8ONkEwRgWXqes3SUt1Ftrc=
github.com/Azure/azure-event-hubs-go/v3 v3.4.0 h1:LtH0nHkXivyV/GajOu5ZFC5sb/5KZ8j+9U8UsfHVTOo=
github.com/Azure/azure-event-hubs-go/v3 v3.4.0/go.mod h1:ODgt5C1/c73FToYj+mWojUJLXF877ALc6G4XnfRFlAY=
github.com/Azure/azure-pipeline-go v0.1.8/go.mod h1:XA1kFWRVhSK+KNFiOhfv83Fv8L9achrP7OxIzeTn1Yg=
github.com/Azure/azure-pipeline-go v0.2.3 h1:7U9HBg1JFK3jHl5qmo4CTZKFTVgMwdFHMVtCdfBE21U=
github.com/Azure/azure-pipeline-go v0.2.3/go.mod h1:x841ezTBIMG6O3lAcl8ATHnsOPVl2bqk7S3ta6S6u4k=
github.com/Azure/azure-sdk-for-go v67.0.0+incompatible h1:SVBwznSETB0Sipd0uyGJr7khLhJOFRUEUb+0JgkCvDo=
github.com/Azure/azure-sdk-for-go v67.0.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc=
github.com/Azure/azure-sdk-for-go v65.0.0+incompatible h1:HzKLt3kIwMm4KeJYTdx9EbjRYTySD/t8i1Ee/W5EGXw=
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.2.0 h1:sVW/AFBTGyJxDaMYlq0ct3jUXTtj12tQ6zE2GZUgVQw=
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.2.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U=
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.2.0 h1:t/W5MYAuQy81cvM8VUNfRLzhtKpXhVUAN7Cd7KVbTyc=
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.2.0/go.mod h1:NBanQUfSWiWn3QEpWDTCU0IjBECKOYvl2R8xdRtMtiM=
github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.1 h1:XUNQ4mw+zJmaA2KXzP9JlQiecy1SI+Eog7xVkPiqIbg=
github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.1/go.mod h1:eWRD7oawr1Mu1sLCawqVc0CUiF43ia3qQMxLscsKQ9w=
github.com/Azure/azure-sdk-for-go/sdk/internal v1.1.2 h1:+5VZ72z0Qan5Bog5C+ZkgSqUbeVUd9wgtHOrIKuc5b8=
github.com/Azure/azure-sdk-for-go/sdk/internal v1.1.2/go.mod h1:eWRD7oawr1Mu1sLCawqVc0CUiF43ia3qQMxLscsKQ9w=
github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs v0.4.0 h1:X/ePaAG8guM7j5WORT5eEIw7cGUxe9Ah1jEQJKLmmSo=
github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs v0.4.0/go.mod h1:5dog28UP3dd1BnCPFYvyHfsmA+Phmoezt+KWT5cZnyc=
github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/eventhub/armeventhub v1.0.0 h1:BWeAAEzkCnL0ABVJqs+4mYudNch7oFGPtTlSmIWL8ms=
github.com/Azure/azure-storage-blob-go v0.15.0 h1:rXtgp8tN1p29GvpGgfJetavIG0V7OgcSXPpwp3tx6qk=
github.com/Azure/azure-storage-blob-go v0.15.0/go.mod h1:vbjsVbX0dlxnRc4FFMPsS9BsJWPcne7GB7onqlPvz58=
github.com/Azure/azure-storage-queue-go v0.0.0-20191125232315-636801874cdd h1:b3wyxBl3vvr15tUAziPBPK354y+LSdfPCpex5oBttHo=
github.com/Azure/azure-storage-queue-go v0.0.0-20191125232315-636801874cdd/go.mod h1:K6am8mT+5iFXgingS9LUc7TmbsW6XBw3nxaRyaMyWc8=
github.com/Azure/go-amqp v0.18.0 h1:95bTiJq0oxjK1RUlt5T3HF/THj6jWTRZpSXMPSOJLz8=
github.com/Azure/go-amqp v0.18.0/go.mod h1:+bg0x3ce5+Q3ahCEXnCsGG3ETpDQe3MEVnOuT2ywPwc=
github.com/Azure/go-autorest v14.2.0+incompatible h1:V5VMDjClD3GiElqLWO7mz2MxNAK/vTfRHdAubSIPRgs=
github.com/Azure/go-autorest v14.2.0+incompatible/go.mod h1:r+4oMnoxhatjLLJ6zxSWATqVooLgysK6ZNox3g/xq24=
github.com/Azure/go-autorest/autorest v0.11.24/go.mod h1:G6kyRlFnTuSbEYkQGawPfsCswgme4iYf6rfSKUDzbCc=
@ -74,10 +72,6 @@ github.com/Azure/go-autorest/autorest/date v0.3.0/go.mod h1:BI0uouVdmngYNUzGWeSY
github.com/Azure/go-autorest/autorest/mocks v0.4.1/go.mod h1:LTp+uSrOhSkaKrUy935gNZuuIPPVsHlr9DSOxSayd+k=
github.com/Azure/go-autorest/autorest/mocks v0.4.2 h1:PGN4EDXnuQbojHbU0UWoNvmu9AGVwYHG9/fkDYhtAfw=
github.com/Azure/go-autorest/autorest/mocks v0.4.2/go.mod h1:Vy7OitM9Kei0i1Oj+LvyAWMXJHeKH1MVlzFugfVrmyU=
github.com/Azure/go-autorest/autorest/to v0.4.0 h1:oXVqrxakqqV1UZdSazDOPOLvOIz+XA683u8EctwboHk=
github.com/Azure/go-autorest/autorest/to v0.4.0/go.mod h1:fE8iZBn7LQR7zH/9XU2NcPR4o9jEImooCeWJcYV/zLE=
github.com/Azure/go-autorest/autorest/validation v0.3.1 h1:AgyqjAd94fwNAoTjl/WQXg4VvFeRFpO+UhNyRXqF1ac=
github.com/Azure/go-autorest/autorest/validation v0.3.1/go.mod h1:yhLgjC0Wda5DYXl6JAsWyUe4KVNffhoDhG0zVzUMo3E=
github.com/Azure/go-autorest/logger v0.2.1 h1:IG7i4p/mDa2Ce4TRyAO8IHnVhAVF3RFU+ZtXWSmf4Tg=
github.com/Azure/go-autorest/logger v0.2.1/go.mod h1:T9E3cAhj2VqvPOtCYAvby9aBXkZmbF5NWuPV8+WeEW8=
github.com/Azure/go-autorest/tracing v0.6.0 h1:TYi4+3m5t6K48TGI9AUdb+IzbnSxvnvUMfuitfgcfuo=
@ -146,8 +140,6 @@ github.com/dapr/kit v0.0.3/go.mod h1:+vh2UIRT0KzFm5YJWfj7az4XVSdodys1OCz1WzNe1Eo
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/devigned/tab v0.1.1 h1:3mD6Kb1mUOYeLpJvTVSDwSg5ZsfSxfvxGRTxRsJsITA=
github.com/devigned/tab v0.1.1/go.mod h1:XG9mPq0dFghrYvoBF3xdRrJzSTX1b7IQrvaL9mzjeJY=
github.com/dimchansky/utfbom v1.1.1 h1:vV6w1AhK4VMnhBno/TPVCoK9U/LP0PkLCS9tbxHdi/U=
github.com/dimchansky/utfbom v1.1.1/go.mod h1:SxdoEBH5qIqFocHMyGOXVAybYJdr71b1Q/j0mACtrfE=
github.com/dnaeon/go-vcr v1.1.0 h1:ReYa/UBrRyQdant9B4fNHGoCNKw6qh6P0fsdGmZpR7c=
@ -173,7 +165,6 @@ github.com/fatih/color v1.9.0/go.mod h1:eQcE1qtQxscV5RaZvpXrrb8Drkc3/DdQ+uUYCNjL
github.com/fatih/color v1.13.0 h1:8LOYc1KYPPmyKMuN8QV2DNRWNbLo6LZ0iLs8+mlH53w=
github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk=
github.com/form3tech-oss/jwt-go v3.2.2+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k=
github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw=
github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY=
github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw=
github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk=
@ -353,10 +344,9 @@ github.com/jhump/goprotoc v0.5.0/go.mod h1:VrbvcYrQOrTi3i0Vf+m+oqQWk9l72mjkJCYo7
github.com/jhump/protoreflect v1.11.0/go.mod h1:U7aMIjN0NWq9swDP7xDdoMfRHb35uiuTd3Z9nFXJf5E=
github.com/jhump/protoreflect v1.13.0 h1:zrrZqa7JAc2YGgPSzZZkmUXJ5G6NRPdxOg/9t7ISImA=
github.com/jhump/protoreflect v1.13.0/go.mod h1:JytZfP5d0r8pVNLZvai7U/MCuTWITgrI4tTg7puQFKI=
github.com/joho/godotenv v1.3.0 h1:Zjp+RcGpHhGlrMbJzXTrZZPrWj+1vfm90La1wgB6Bhc=
github.com/joho/godotenv v1.4.0 h1:3l4+N6zfMWnkbPEXKng2o2/MR5mSwTrBih4ZEkkz1lg=
github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY=
github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y=
github.com/jpillora/backoff v1.0.0 h1:uvFg412JmmHBHw7iwprIxkPMI+sGQ4kzOWsMeHnm2EA=
github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
github.com/json-iterator/go v1.1.9/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=

View File

@ -20,7 +20,7 @@ require (
github.com/Azure/azure-pipeline-go v0.2.3 // indirect
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.2.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.2.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.1 // indirect
github.com/Azure/azure-sdk-for-go/sdk/internal v1.1.2 // indirect
github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus v1.1.3 // indirect
github.com/Azure/azure-storage-blob-go v0.15.0 // indirect
github.com/Azure/azure-storage-queue-go v0.0.0-20191125232315-636801874cdd // indirect

View File

@ -44,8 +44,8 @@ github.com/Azure/azure-sdk-for-go/sdk/azcore v1.2.0 h1:sVW/AFBTGyJxDaMYlq0ct3jUX
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.2.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U=
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.2.0 h1:t/W5MYAuQy81cvM8VUNfRLzhtKpXhVUAN7Cd7KVbTyc=
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.2.0/go.mod h1:NBanQUfSWiWn3QEpWDTCU0IjBECKOYvl2R8xdRtMtiM=
github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.1 h1:XUNQ4mw+zJmaA2KXzP9JlQiecy1SI+Eog7xVkPiqIbg=
github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.1/go.mod h1:eWRD7oawr1Mu1sLCawqVc0CUiF43ia3qQMxLscsKQ9w=
github.com/Azure/azure-sdk-for-go/sdk/internal v1.1.2 h1:+5VZ72z0Qan5Bog5C+ZkgSqUbeVUd9wgtHOrIKuc5b8=
github.com/Azure/azure-sdk-for-go/sdk/internal v1.1.2/go.mod h1:eWRD7oawr1Mu1sLCawqVc0CUiF43ia3qQMxLscsKQ9w=
github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus v1.1.3 h1:27HVgIcvrKkRs5eJzHnyZdt71/EyB3clkiJQB0qyIa8=
github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus v1.1.3/go.mod h1:Eo6WMP/iw9sp06+v8y030eReUwX6sULn5i3fxCDWPag=
github.com/Azure/azure-storage-blob-go v0.15.0 h1:rXtgp8tN1p29GvpGgfJetavIG0V7OgcSXPpwp3tx6qk=

View File

@ -18,7 +18,7 @@ require (
github.com/Azure/azure-pipeline-go v0.2.3 // indirect
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.2.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.2.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.1 // indirect
github.com/Azure/azure-sdk-for-go/sdk/internal v1.1.2 // indirect
github.com/Azure/azure-sdk-for-go/sdk/keyvault/azsecrets v0.11.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/keyvault/internal v0.7.0 // indirect
github.com/Azure/azure-storage-blob-go v0.15.0 // indirect

View File

@ -44,8 +44,8 @@ github.com/Azure/azure-sdk-for-go/sdk/azcore v1.2.0 h1:sVW/AFBTGyJxDaMYlq0ct3jUX
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.2.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U=
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.2.0 h1:t/W5MYAuQy81cvM8VUNfRLzhtKpXhVUAN7Cd7KVbTyc=
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.2.0/go.mod h1:NBanQUfSWiWn3QEpWDTCU0IjBECKOYvl2R8xdRtMtiM=
github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.1 h1:XUNQ4mw+zJmaA2KXzP9JlQiecy1SI+Eog7xVkPiqIbg=
github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.1/go.mod h1:eWRD7oawr1Mu1sLCawqVc0CUiF43ia3qQMxLscsKQ9w=
github.com/Azure/azure-sdk-for-go/sdk/internal v1.1.2 h1:+5VZ72z0Qan5Bog5C+ZkgSqUbeVUd9wgtHOrIKuc5b8=
github.com/Azure/azure-sdk-for-go/sdk/internal v1.1.2/go.mod h1:eWRD7oawr1Mu1sLCawqVc0CUiF43ia3qQMxLscsKQ9w=
github.com/Azure/azure-sdk-for-go/sdk/keyvault/azsecrets v0.11.0 h1:82w8tzLcOwDP/Q35j/wEBPt0n0kVC3cjtPdD62G8UAk=
github.com/Azure/azure-sdk-for-go/sdk/keyvault/azsecrets v0.11.0/go.mod h1:S78i9yTr4o/nXlH76bKjGUye9Z2wSxO5Tz7GoDr4vfI=
github.com/Azure/azure-sdk-for-go/sdk/keyvault/internal v0.7.0 h1:Lg6BW0VPmCwcMlvOviL3ruHFO+H9tZNqscK0AeuFjGM=

View File

@ -18,7 +18,7 @@ require (
github.com/Azure/azure-pipeline-go v0.2.3 // indirect
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.2.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.2.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.1 // indirect
github.com/Azure/azure-sdk-for-go/sdk/internal v1.1.2 // indirect
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v0.6.1 // indirect
github.com/Azure/azure-storage-blob-go v0.15.0 // indirect
github.com/Azure/azure-storage-queue-go v0.0.0-20191125232315-636801874cdd // indirect

View File

@ -44,8 +44,8 @@ github.com/Azure/azure-sdk-for-go/sdk/azcore v1.2.0 h1:sVW/AFBTGyJxDaMYlq0ct3jUX
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.2.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U=
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.2.0 h1:t/W5MYAuQy81cvM8VUNfRLzhtKpXhVUAN7Cd7KVbTyc=
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.2.0/go.mod h1:NBanQUfSWiWn3QEpWDTCU0IjBECKOYvl2R8xdRtMtiM=
github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.1 h1:XUNQ4mw+zJmaA2KXzP9JlQiecy1SI+Eog7xVkPiqIbg=
github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.1/go.mod h1:eWRD7oawr1Mu1sLCawqVc0CUiF43ia3qQMxLscsKQ9w=
github.com/Azure/azure-sdk-for-go/sdk/internal v1.1.2 h1:+5VZ72z0Qan5Bog5C+ZkgSqUbeVUd9wgtHOrIKuc5b8=
github.com/Azure/azure-sdk-for-go/sdk/internal v1.1.2/go.mod h1:eWRD7oawr1Mu1sLCawqVc0CUiF43ia3qQMxLscsKQ9w=
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v0.6.1 h1:YvQv9Mz6T8oR5ypQOL6erY0Z5t71ak1uHV4QFokCOZk=
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v0.6.1/go.mod h1:c6WvOhtmjNUWbLfOG1qxM/q0SPvQNSVJvolm+C52dIU=
github.com/Azure/azure-storage-blob-go v0.15.0 h1:rXtgp8tN1p29GvpGgfJetavIG0V7OgcSXPpwp3tx6qk=

View File

@ -20,7 +20,7 @@ require (
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.2.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.2.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos v0.3.2 // indirect
github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.1 // indirect
github.com/Azure/azure-sdk-for-go/sdk/internal v1.1.2 // indirect
github.com/Azure/azure-storage-blob-go v0.15.0 // indirect
github.com/Azure/azure-storage-queue-go v0.0.0-20191125232315-636801874cdd // indirect
github.com/Azure/go-autorest v14.2.0+incompatible // indirect

View File

@ -48,8 +48,8 @@ github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.2.0 h1:t/W5MYAuQy81cvM8VUNfR
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.2.0/go.mod h1:NBanQUfSWiWn3QEpWDTCU0IjBECKOYvl2R8xdRtMtiM=
github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos v0.3.2 h1:yJegJqjhrMJ3Oe5s43jOTGL2AsE7pJyx+7Yqls/65tw=
github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos v0.3.2/go.mod h1:Fy3bbChFm4cZn6oIxYYqKB2FG3rBDxk3NZDLDJCHl+Q=
github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.1 h1:XUNQ4mw+zJmaA2KXzP9JlQiecy1SI+Eog7xVkPiqIbg=
github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.1/go.mod h1:eWRD7oawr1Mu1sLCawqVc0CUiF43ia3qQMxLscsKQ9w=
github.com/Azure/azure-sdk-for-go/sdk/internal v1.1.2 h1:+5VZ72z0Qan5Bog5C+ZkgSqUbeVUd9wgtHOrIKuc5b8=
github.com/Azure/azure-sdk-for-go/sdk/internal v1.1.2/go.mod h1:eWRD7oawr1Mu1sLCawqVc0CUiF43ia3qQMxLscsKQ9w=
github.com/Azure/azure-storage-blob-go v0.15.0 h1:rXtgp8tN1p29GvpGgfJetavIG0V7OgcSXPpwp3tx6qk=
github.com/Azure/azure-storage-blob-go v0.15.0/go.mod h1:vbjsVbX0dlxnRc4FFMPsS9BsJWPcne7GB7onqlPvz58=
github.com/Azure/azure-storage-queue-go v0.0.0-20191125232315-636801874cdd h1:b3wyxBl3vvr15tUAziPBPK354y+LSdfPCpex5oBttHo=

View File

@ -19,7 +19,7 @@ require (
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.2.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.2.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/data/aztables v1.0.1 // indirect
github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.1 // indirect
github.com/Azure/azure-sdk-for-go/sdk/internal v1.1.2 // indirect
github.com/Azure/azure-storage-blob-go v0.15.0 // indirect
github.com/Azure/azure-storage-queue-go v0.0.0-20191125232315-636801874cdd // indirect
github.com/Azure/go-autorest v14.2.0+incompatible // indirect

View File

@ -46,8 +46,8 @@ github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.2.0 h1:t/W5MYAuQy81cvM8VUNfR
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.2.0/go.mod h1:NBanQUfSWiWn3QEpWDTCU0IjBECKOYvl2R8xdRtMtiM=
github.com/Azure/azure-sdk-for-go/sdk/data/aztables v1.0.1 h1:bFa9IcjvrCber6gGgDAUZ+I2bO8J7s8JxXmu9fhi2ss=
github.com/Azure/azure-sdk-for-go/sdk/data/aztables v1.0.1/go.mod h1:l3wvZkG9oW07GLBW5Cd0WwG5asOfJ8aqE8raUvNzLpk=
github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.1 h1:XUNQ4mw+zJmaA2KXzP9JlQiecy1SI+Eog7xVkPiqIbg=
github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.1/go.mod h1:eWRD7oawr1Mu1sLCawqVc0CUiF43ia3qQMxLscsKQ9w=
github.com/Azure/azure-sdk-for-go/sdk/internal v1.1.2 h1:+5VZ72z0Qan5Bog5C+ZkgSqUbeVUd9wgtHOrIKuc5b8=
github.com/Azure/azure-sdk-for-go/sdk/internal v1.1.2/go.mod h1:eWRD7oawr1Mu1sLCawqVc0CUiF43ia3qQMxLscsKQ9w=
github.com/Azure/azure-storage-blob-go v0.15.0 h1:rXtgp8tN1p29GvpGgfJetavIG0V7OgcSXPpwp3tx6qk=
github.com/Azure/azure-storage-blob-go v0.15.0/go.mod h1:vbjsVbX0dlxnRc4FFMPsS9BsJWPcne7GB7onqlPvz58=
github.com/Azure/azure-storage-queue-go v0.0.0-20191125232315-636801874cdd h1:b3wyxBl3vvr15tUAziPBPK354y+LSdfPCpex5oBttHo=