From dc7ee55e2ab7006f9d216f677f3a05c25d7e03fd Mon Sep 17 00:00:00 2001 From: Simon Leet <31784195+CodeMonkeyLeet@users.noreply.github.com> Date: Mon, 2 Aug 2021 18:03:24 -0700 Subject: [PATCH] Add Event Hubs pubsub and bindings conformance tests (#1040) * Add conformance test for EventHubs bindings * Update EventHubs pubsub to pass conformance tests - Add conformance tests for Azure EventHubs PubSub component - Add retry & backoff handling on subscriber handling error to EventHubs PubSub component for conformance tests. - Add cancellation context to Azure EventHubs and update Close() to invoke cancel prior to closing the hub, which cleans up both senders and receivers. --- .../azure/conf-test-azure-eventhubs.bicep | 40 +++++++++++++++++ .../conformance/azure/conf-test-azure.bicep | 14 ++++++ .../azure/setup-azure-conf-test.sh | 22 ++++++++++ pubsub/azure/eventhubs/eventhubs.go | 43 +++++++++++++++---- .../bindings/azure/eventhubs/bindings.yml | 20 +++++++++ tests/config/bindings/tests.yml | 2 + .../config/pubsub/azure/eventhubs/pubsub.yml | 20 +++++++++ tests/config/pubsub/tests.yml | 4 ++ tests/conformance/common.go | 9 ++++ 9 files changed, 165 insertions(+), 9 deletions(-) create mode 100644 .github/infrastructure/conformance/azure/conf-test-azure-eventhubs.bicep create mode 100644 tests/config/bindings/azure/eventhubs/bindings.yml create mode 100644 tests/config/pubsub/azure/eventhubs/pubsub.yml diff --git a/.github/infrastructure/conformance/azure/conf-test-azure-eventhubs.bicep b/.github/infrastructure/conformance/azure/conf-test-azure-eventhubs.bicep new file mode 100644 index 000000000..6e3630228 --- /dev/null +++ b/.github/infrastructure/conformance/azure/conf-test-azure-eventhubs.bicep @@ -0,0 +1,40 @@ +// ------------------------------------------------------------ +// Copyright (c) Microsoft Corporation and Dapr Contributors. +// Licensed under the MIT License. +// ------------------------------------------------------------ + +param eventHubsNamespaceName string +param rgLocation string = resourceGroup().location +param confTestTags object = {} + +var eventHubName = '${eventHubsNamespaceName}-topic' +var eventHubPolicyName = '${eventHubName}-policy' +var eventHubConsumerGroupName = '${eventHubName}-cg' + +resource eventHubsNamespace 'Microsoft.EventHub/namespaces@2017-04-01' = { + name: eventHubsNamespaceName + location: rgLocation + tags: confTestTags + sku: { + name: 'Standard' // For > 1 consumer group + } + resource eventHub 'eventhubs' = { + name: eventHubName + resource eventHubPolicy 'authorizationRules' = { + name: eventHubPolicyName + properties: { + rights: [ + 'Send' + 'Listen' + ] + } + } + resource consumerGroup 'consumergroups' = { + name: eventHubConsumerGroupName + } + } +} + +output eventHubName string = eventHubsNamespace::eventHub.name +output eventHubPolicyName string = eventHubsNamespace::eventHub::eventHubPolicy.name +output eventHubConsumerGroupName string = eventHubsNamespace::eventHub::consumerGroup.name diff --git a/.github/infrastructure/conformance/azure/conf-test-azure.bicep b/.github/infrastructure/conformance/azure/conf-test-azure.bicep index 435e2c086..98efa239e 100644 --- a/.github/infrastructure/conformance/azure/conf-test-azure.bicep +++ b/.github/infrastructure/conformance/azure/conf-test-azure.bicep @@ -36,6 +36,7 @@ param certAuthSpId string var confTestRgName = '${toLower(namePrefix)}-conf-test-rg' var cosmosDbName = '${toLower(namePrefix)}-conf-test-db' var eventGridTopicName = '${toLower(namePrefix)}-conf-test-eventgrid-topic' +var eventHubsNamespaceName = '${toLower(namePrefix)}-conf-test-eventhubs' var keyVaultName = '${toLower(namePrefix)}-conf-test-kv' var serviceBusName = '${toLower(namePrefix)}-conf-test-servicebus' var storageName = '${toLower(namePrefix)}ctstorage' @@ -64,6 +65,15 @@ module eventGridTopic 'conf-test-azure-eventGrid.bicep' = { } } +module eventHubsNamespace 'conf-test-azure-eventHubs.bicep' = { + name: eventHubsNamespaceName + scope: resourceGroup(confTestRg.name) + params: { + confTestTags: confTestTags + eventHubsNamespaceName: eventHubsNamespaceName + } +} + module keyVault 'conf-test-azure-keyVault.bicep' = { name: keyVaultName scope: resourceGroup(confTestRg.name) @@ -99,6 +109,10 @@ output cosmosDbName string = cosmosDb.name output cosmosDbSqlName string = cosmosDb.outputs.cosmosDbSqlName output cosmosDbSqlContainerName string = cosmosDb.outputs.cosmosDbSqlContainerName output eventGridTopicName string = eventGridTopic.name +output eventHubsNamespace string = eventHubsNamespace.name +output eventHubName string = eventHubsNamespace.outputs.eventHubName +output eventHubPolicyName string = eventHubsNamespace.outputs.eventHubPolicyName +output eventHubConsumerGroupName string = eventHubsNamespace.outputs.eventHubConsumerGroupName output keyVaultName string = keyVault.name output serviceBusName string = serviceBus.name output storageName string = storage.name diff --git a/.github/infrastructure/conformance/azure/setup-azure-conf-test.sh b/.github/infrastructure/conformance/azure/setup-azure-conf-test.sh index f253f6fda..a0d62c991 100644 --- a/.github/infrastructure/conformance/azure/setup-azure-conf-test.sh +++ b/.github/infrastructure/conformance/azure/setup-azure-conf-test.sh @@ -151,6 +151,9 @@ EVENT_GRID_SUB_ID_VAR_NAME="AzureEventGridSubscriptionId" EVENT_GRID_TENANT_ID_VAR_NAME="AzureEventGridTenantId" EVENT_GRID_TOPIC_ENDPOINT_VAR_NAME="AzureEventGridTopicEndpoint" +EVENT_HUBS_CONNECTION_STRING_VAR_NAME="AzureEventHubsConnectionString" +EVENT_HUBS_CONSUMER_GROUP_VAR_NAME="AzureEventHubsConsumerGroup" + KEYVAULT_CERT_NAME="AzureKeyVaultSecretStoreCert" KEYVAULT_CLIENT_ID_VAR_NAME="AzureKeyVaultSecretStoreClientId" KEYVAULT_TENANT_ID_VAR_NAME="AzureKeyVaultSecretStoreTenantId" @@ -214,6 +217,14 @@ COSMOS_DB_CONTAINER_NAME="$(az deployment sub show --name "${DEPLOY_NAME}" --que echo "INFO: COSMOS_DB_CONTAINER_NAME=${COSMOS_DB_CONTAINER_NAME}" EVENT_GRID_TOPIC_NAME="$(az deployment sub show --name "${DEPLOY_NAME}" --query "properties.outputs.eventGridTopicName.value" | sed -E 's/[[:space:]]|\"//g')" echo "INFO: EVENT_GRID_TOPIC_NAME=${EVENT_GRID_TOPIC_NAME}" +EVENT_HUBS_NAMESPACE="$(az deployment sub show --name "${DEPLOY_NAME}" --query "properties.outputs.eventHubsNamespace.value" | sed -E 's/[[:space:]]|\"//g')" +echo "INFO: EVENT_HUBS_NAMESPACE=${EVENT_HUBS_NAMESPACE}" +EVENT_HUB_NAME="$(az deployment sub show --name "${DEPLOY_NAME}" --query "properties.outputs.eventHubName.value" | sed -E 's/[[:space:]]|\"//g')" +echo "INFO: EVENT_HUB_NAME=${EVENT_HUB_NAME}" +EVENT_HUB_POLICY_NAME="$(az deployment sub show --name "${DEPLOY_NAME}" --query "properties.outputs.eventHubPolicyName.value" | sed -E 's/[[:space:]]|\"//g')" +echo "INFO: EVENT_HUB_POLICY_NAME=${EVENT_HUB_POLICY_NAME}" +EVENT_HUBS_CONSUMER_GROUP_NAME="$(az deployment sub show --name "${DEPLOY_NAME}" --query "properties.outputs.eventHubConsumerGroupName.value" | sed -E 's/[[:space:]]|\"//g')" +echo "INFO: EVENT_HUBS_CONSUMER_GROUP_NAME=${EVENT_HUBS_CONSUMER_GROUP_NAME}" # Update service principal credentials and roles for created resources echo "Creating ${CERT_AUTH_SP_NAME} certificate ..." @@ -389,6 +400,17 @@ SERVICE_BUS_CONNECTION_STRING="$(az servicebus namespace authorization-rule keys echo export ${SERVICE_BUS_CONNECTION_STRING_VAR_NAME}=\"${SERVICE_BUS_CONNECTION_STRING}\" >> "${ENV_CONFIG_FILENAME}" az keyvault secret set --name "${SERVICE_BUS_CONNECTION_STRING_VAR_NAME}" --vault-name "${KEYVAULT_NAME}" --value "${SERVICE_BUS_CONNECTION_STRING}" +# ---------------------------------- +# Populate Event Hubs test settings +# ---------------------------------- +echo "Configuring Event Hub test settings ..." +EVENT_HUBS_CONNECTION_STRING="$(az eventhubs eventhub authorization-rule keys list --name "${EVENT_HUB_POLICY_NAME}" --namespace-name "${EVENT_HUBS_NAMESPACE}" --eventhub-name "${EVENT_HUB_NAME}" --resource-group "${RESOURCE_GROUP_NAME}" --query "primaryConnectionString" | sed -E 's/[[:space:]]|\"//g')" +echo export ${EVENT_HUBS_CONNECTION_STRING_VAR_NAME}=\"${EVENT_HUBS_CONNECTION_STRING}\" >> "${ENV_CONFIG_FILENAME}" +az keyvault secret set --name "${EVENT_HUBS_CONNECTION_STRING_VAR_NAME}" --vault-name "${KEYVAULT_NAME}" --value "${EVENT_HUBS_CONNECTION_STRING}" + +echo export ${EVENT_HUBS_CONSUMER_GROUP_VAR_NAME}=\"${EVENT_HUBS_CONSUMER_GROUP_NAME}\" >> "${ENV_CONFIG_FILENAME}" +az keyvault secret set --name "${EVENT_HUBS_CONSUMER_GROUP_VAR_NAME}" --vault-name "${KEYVAULT_NAME}" --value "${EVENT_HUBS_CONSUMER_GROUP_NAME}" + echo "INFO: setup-azure-conf-test completed." echo "INFO: Remember to \`source ${ENV_CONFIG_FILENAME}\` before running local conformance tests." echo "INFO: ${AZURE_CREDENTIALS_FILENAME} contains the repository secret to set to run the GitHub conformance test workflow." diff --git a/pubsub/azure/eventhubs/eventhubs.go b/pubsub/azure/eventhubs/eventhubs.go index ac0a6898d..81d117b99 100644 --- a/pubsub/azure/eventhubs/eventhubs.go +++ b/pubsub/azure/eventhubs/eventhubs.go @@ -19,6 +19,7 @@ import ( "github.com/Azure/go-autorest/autorest/azure" "github.com/dapr/components-contrib/pubsub" "github.com/dapr/kit/logger" + "github.com/dapr/kit/retry" ) const ( @@ -51,7 +52,7 @@ const ( sysPropIotHubEnqueuedTime = "iothub-enqueuedtime" ) -func subscribeHandler(topic string, e *eventhub.Event, handler pubsub.Handler) error { +func subscribeHandler(ctx context.Context, topic string, e *eventhub.Event, handler pubsub.Handler) error { res := pubsub.NewMessage{Data: e.Data, Topic: topic, Metadata: map[string]string{}} if e.SystemProperties.SequenceNumber != nil { res.Metadata[sysPropSequenceNumber] = strconv.FormatInt(*e.SystemProperties.SequenceNumber, 10) @@ -86,7 +87,7 @@ func subscribeHandler(topic string, e *eventhub.Event, handler pubsub.Handler) e res.Metadata[sysPropIotHubEnqueuedTime] = e.SystemProperties.IoTHubEnqueuedTime.Format(time.RFC3339) } - return handler(context.Background(), &res) + return handler(ctx, &res) } // AzureEventHubs allows sending/receiving Azure Event Hubs events @@ -94,7 +95,10 @@ type AzureEventHubs struct { hub *eventhub.Hub metadata azureEventHubsMetadata - logger logger.Logger + logger logger.Logger + ctx context.Context + cancel context.CancelFunc + backOffConfig retry.Config } type azureEventHubsMetadata struct { @@ -159,13 +163,22 @@ func (aeh *AzureEventHubs) Init(metadata pubsub.Metadata) error { } aeh.hub = hub + aeh.ctx, aeh.cancel = context.WithCancel(context.Background()) + + // Default retry configuration is used if no backOff properties are set. + if err := retry.DecodeConfigWithPrefix( + &aeh.backOffConfig, + metadata.Properties, + "backOff"); err != nil { + return err + } return nil } // Publish sends data to Azure Event Hubs func (aeh *AzureEventHubs) Publish(req *pubsub.PublishRequest) error { - err := aeh.hub.Send(context.Background(), &eventhub.Event{Data: req.Data}) + err := aeh.hub.Send(aeh.ctx, &eventhub.Event{Data: req.Data}) if err != nil { return fmt.Errorf("error from publish: %s", err) } @@ -185,20 +198,30 @@ func (aeh *AzureEventHubs) Subscribe(req pubsub.SubscribeRequest, handler pubsub return err } - processor, err := eph.NewFromConnectionString(context.Background(), aeh.metadata.connectionString, leaserCheckpointer, leaserCheckpointer, eph.WithNoBanner(), eph.WithConsumerGroup(aeh.metadata.consumerGroup)) + processor, err := eph.NewFromConnectionString(aeh.ctx, aeh.metadata.connectionString, leaserCheckpointer, leaserCheckpointer, eph.WithNoBanner(), eph.WithConsumerGroup(aeh.metadata.consumerGroup)) if err != nil { return err } - _, err = processor.RegisterHandler(context.Background(), + _, err = processor.RegisterHandler(aeh.ctx, func(c context.Context, e *eventhub.Event) error { - return subscribeHandler(req.Topic, e, handler) + b := aeh.backOffConfig.NewBackOffWithContext(aeh.ctx) + + return retry.NotifyRecover(func() error { + aeh.logger.Debugf("Processing EventHubs event %s/%s", req.Topic, e.ID) + + return subscribeHandler(aeh.ctx, req.Topic, e, handler) + }, b, func(err error, d time.Duration) { + aeh.logger.Errorf("Error processing EventHubs event: %s/%s. Retrying...", req.Topic, e.ID) + }, func() { + aeh.logger.Errorf("Successfully processed EventHubs event after it previously failed: %s/%s", req.Topic, e.ID) + }) }) if err != nil { return err } - err = processor.StartNonBlocking(context.Background()) + err = processor.StartNonBlocking(aeh.ctx) if err != nil { return err } @@ -207,7 +230,9 @@ func (aeh *AzureEventHubs) Subscribe(req pubsub.SubscribeRequest, handler pubsub } func (aeh *AzureEventHubs) Close() error { - return aeh.hub.Close(context.TODO()) + aeh.cancel() + + return aeh.hub.Close(aeh.ctx) } func (aeh *AzureEventHubs) Features() []pubsub.Feature { diff --git a/tests/config/bindings/azure/eventhubs/bindings.yml b/tests/config/bindings/azure/eventhubs/bindings.yml new file mode 100644 index 000000000..f00aa36c7 --- /dev/null +++ b/tests/config/bindings/azure/eventhubs/bindings.yml @@ -0,0 +1,20 @@ +apiVersion: dapr.io/v1alpha1 +kind: Component +metadata: + name: azure-eventhubs + namespace: default +spec: + type: bindings.azure.eventhubs + version: v1 + metadata: + - name: connectionString + value: ${{AzureEventHubsConnectionString}} + - name: consumerGroup + value: ${{AzureEventHubsConsumerGroup}} + # Reuse the blob storage account from the storage bindings conformance test + - name: storageAccountName + value: ${{AzureBlobStorageAccount}} + - name: storageAccountKey + value: ${{AzureBlobStorageAccessKey}} + - name: storageContainerName + value: ${{AzureBlobStorageContainer}} diff --git a/tests/config/bindings/tests.yml b/tests/config/bindings/tests.yml index 7b73f79db..04d041a44 100644 --- a/tests/config/bindings/tests.yml +++ b/tests/config/bindings/tests.yml @@ -16,6 +16,8 @@ components: config: output: blobName: $((uuid)) + - component: azure.eventhubs + operations: ["create", "operations", "read"] - component: azure.eventgrid operations: ["create", "operations", "read"] config: diff --git a/tests/config/pubsub/azure/eventhubs/pubsub.yml b/tests/config/pubsub/azure/eventhubs/pubsub.yml new file mode 100644 index 000000000..b5cfaac99 --- /dev/null +++ b/tests/config/pubsub/azure/eventhubs/pubsub.yml @@ -0,0 +1,20 @@ +apiVersion: dapr.io/v1alpha1 +kind: Component +metadata: + name: azure-eventhubs + namespace: default +spec: + type: pubsub.azure.eventhubs + version: v1 + metadata: + - name: connectionString + value: ${{AzureEventHubsConnectionString}} + - name: consumerID + value: ${{AzureEventHubsConsumerGroup}} + # Reuse the blob storage account from the storage bindings conformance test + - name: storageAccountName + value: ${{AzureBlobStorageAccount}} + - name: storageAccountKey + value: ${{AzureBlobStorageAccessKey}} + - name: storageContainerName + value: ${{AzureBlobStorageContainer}} diff --git a/tests/config/pubsub/tests.yml b/tests/config/pubsub/tests.yml index 48c070105..6e2158fc1 100644 --- a/tests/config/pubsub/tests.yml +++ b/tests/config/pubsub/tests.yml @@ -9,6 +9,10 @@ ## checkInOrderProcessing: false disables in-order message processing checking componentType: pubsub components: + - component: azure.eventhubs + allOperations: true + config: + checkInOrderProcessing: false - component: azure.servicebus allOperations: true config: diff --git a/tests/conformance/common.go b/tests/conformance/common.go index 5cee82fc3..0ecd132ae 100644 --- a/tests/conformance/common.go +++ b/tests/conformance/common.go @@ -28,12 +28,14 @@ import ( b_azure_blobstorage "github.com/dapr/components-contrib/bindings/azure/blobstorage" b_azure_eventgrid "github.com/dapr/components-contrib/bindings/azure/eventgrid" + b_azure_eventhubs "github.com/dapr/components-contrib/bindings/azure/eventhubs" b_azure_servicebusqueues "github.com/dapr/components-contrib/bindings/azure/servicebusqueues" b_azure_storagequeues "github.com/dapr/components-contrib/bindings/azure/storagequeues" b_http "github.com/dapr/components-contrib/bindings/http" b_kafka "github.com/dapr/components-contrib/bindings/kafka" b_mqtt "github.com/dapr/components-contrib/bindings/mqtt" b_redis "github.com/dapr/components-contrib/bindings/redis" + p_eventhubs "github.com/dapr/components-contrib/pubsub/azure/eventhubs" p_servicebus "github.com/dapr/components-contrib/pubsub/azure/servicebus" p_hazelcast "github.com/dapr/components-contrib/pubsub/hazelcast" p_kafka "github.com/dapr/components-contrib/pubsub/kafka" @@ -56,6 +58,7 @@ import ( ) const ( + eventhubs = "azure.eventhubs" redis = "redis" kafka = "kafka" mqtt = "mqtt" @@ -310,6 +313,8 @@ func loadPubSub(tc TestComponent) pubsub.PubSub { switch tc.Component { case redis: pubsub = p_redis.NewRedisStreams(testLogger) + case eventhubs: + pubsub = p_eventhubs.NewAzureEventHubs(testLogger) case "azure.servicebus": pubsub = p_servicebus.NewAzureServiceBus(testLogger) case "natsstreaming": @@ -379,6 +384,8 @@ func loadOutputBindings(tc TestComponent) bindings.OutputBinding { binding = b_azure_servicebusqueues.NewAzureServiceBusQueues(testLogger) case "azure.eventgrid": binding = b_azure_eventgrid.NewAzureEventGrid(testLogger) + case eventhubs: + binding = b_azure_eventhubs.NewAzureEventHubs(testLogger) case kafka: binding = b_kafka.NewKafka(testLogger) case "http": @@ -402,6 +409,8 @@ func loadInputBindings(tc TestComponent) bindings.InputBinding { binding = b_azure_storagequeues.NewAzureStorageQueues(testLogger) case "azure.eventgrid": binding = b_azure_eventgrid.NewAzureEventGrid(testLogger) + case eventhubs: + binding = b_azure_eventhubs.NewAzureEventHubs(testLogger) case kafka: binding = b_kafka.NewKafka(testLogger) case mqtt: