Add Event Hubs pubsub and bindings conformance tests (#1040)
* Add conformance test for EventHubs bindings * Update EventHubs pubsub to pass conformance tests - Add conformance tests for Azure EventHubs PubSub component - Add retry & backoff handling on subscriber handling error to EventHubs PubSub component for conformance tests. - Add cancellation context to Azure EventHubs and update Close() to invoke cancel prior to closing the hub, which cleans up both senders and receivers.
This commit is contained in:
parent
be88343a5b
commit
dc7ee55e2a
|
@ -0,0 +1,40 @@
|
|||
// ------------------------------------------------------------
|
||||
// Copyright (c) Microsoft Corporation and Dapr Contributors.
|
||||
// Licensed under the MIT License.
|
||||
// ------------------------------------------------------------
|
||||
|
||||
param eventHubsNamespaceName string
|
||||
param rgLocation string = resourceGroup().location
|
||||
param confTestTags object = {}
|
||||
|
||||
var eventHubName = '${eventHubsNamespaceName}-topic'
|
||||
var eventHubPolicyName = '${eventHubName}-policy'
|
||||
var eventHubConsumerGroupName = '${eventHubName}-cg'
|
||||
|
||||
resource eventHubsNamespace 'Microsoft.EventHub/namespaces@2017-04-01' = {
|
||||
name: eventHubsNamespaceName
|
||||
location: rgLocation
|
||||
tags: confTestTags
|
||||
sku: {
|
||||
name: 'Standard' // For > 1 consumer group
|
||||
}
|
||||
resource eventHub 'eventhubs' = {
|
||||
name: eventHubName
|
||||
resource eventHubPolicy 'authorizationRules' = {
|
||||
name: eventHubPolicyName
|
||||
properties: {
|
||||
rights: [
|
||||
'Send'
|
||||
'Listen'
|
||||
]
|
||||
}
|
||||
}
|
||||
resource consumerGroup 'consumergroups' = {
|
||||
name: eventHubConsumerGroupName
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
output eventHubName string = eventHubsNamespace::eventHub.name
|
||||
output eventHubPolicyName string = eventHubsNamespace::eventHub::eventHubPolicy.name
|
||||
output eventHubConsumerGroupName string = eventHubsNamespace::eventHub::consumerGroup.name
|
|
@ -36,6 +36,7 @@ param certAuthSpId string
|
|||
var confTestRgName = '${toLower(namePrefix)}-conf-test-rg'
|
||||
var cosmosDbName = '${toLower(namePrefix)}-conf-test-db'
|
||||
var eventGridTopicName = '${toLower(namePrefix)}-conf-test-eventgrid-topic'
|
||||
var eventHubsNamespaceName = '${toLower(namePrefix)}-conf-test-eventhubs'
|
||||
var keyVaultName = '${toLower(namePrefix)}-conf-test-kv'
|
||||
var serviceBusName = '${toLower(namePrefix)}-conf-test-servicebus'
|
||||
var storageName = '${toLower(namePrefix)}ctstorage'
|
||||
|
@ -64,6 +65,15 @@ module eventGridTopic 'conf-test-azure-eventGrid.bicep' = {
|
|||
}
|
||||
}
|
||||
|
||||
module eventHubsNamespace 'conf-test-azure-eventHubs.bicep' = {
|
||||
name: eventHubsNamespaceName
|
||||
scope: resourceGroup(confTestRg.name)
|
||||
params: {
|
||||
confTestTags: confTestTags
|
||||
eventHubsNamespaceName: eventHubsNamespaceName
|
||||
}
|
||||
}
|
||||
|
||||
module keyVault 'conf-test-azure-keyVault.bicep' = {
|
||||
name: keyVaultName
|
||||
scope: resourceGroup(confTestRg.name)
|
||||
|
@ -99,6 +109,10 @@ output cosmosDbName string = cosmosDb.name
|
|||
output cosmosDbSqlName string = cosmosDb.outputs.cosmosDbSqlName
|
||||
output cosmosDbSqlContainerName string = cosmosDb.outputs.cosmosDbSqlContainerName
|
||||
output eventGridTopicName string = eventGridTopic.name
|
||||
output eventHubsNamespace string = eventHubsNamespace.name
|
||||
output eventHubName string = eventHubsNamespace.outputs.eventHubName
|
||||
output eventHubPolicyName string = eventHubsNamespace.outputs.eventHubPolicyName
|
||||
output eventHubConsumerGroupName string = eventHubsNamespace.outputs.eventHubConsumerGroupName
|
||||
output keyVaultName string = keyVault.name
|
||||
output serviceBusName string = serviceBus.name
|
||||
output storageName string = storage.name
|
||||
|
|
|
@ -151,6 +151,9 @@ EVENT_GRID_SUB_ID_VAR_NAME="AzureEventGridSubscriptionId"
|
|||
EVENT_GRID_TENANT_ID_VAR_NAME="AzureEventGridTenantId"
|
||||
EVENT_GRID_TOPIC_ENDPOINT_VAR_NAME="AzureEventGridTopicEndpoint"
|
||||
|
||||
EVENT_HUBS_CONNECTION_STRING_VAR_NAME="AzureEventHubsConnectionString"
|
||||
EVENT_HUBS_CONSUMER_GROUP_VAR_NAME="AzureEventHubsConsumerGroup"
|
||||
|
||||
KEYVAULT_CERT_NAME="AzureKeyVaultSecretStoreCert"
|
||||
KEYVAULT_CLIENT_ID_VAR_NAME="AzureKeyVaultSecretStoreClientId"
|
||||
KEYVAULT_TENANT_ID_VAR_NAME="AzureKeyVaultSecretStoreTenantId"
|
||||
|
@ -214,6 +217,14 @@ COSMOS_DB_CONTAINER_NAME="$(az deployment sub show --name "${DEPLOY_NAME}" --que
|
|||
echo "INFO: COSMOS_DB_CONTAINER_NAME=${COSMOS_DB_CONTAINER_NAME}"
|
||||
EVENT_GRID_TOPIC_NAME="$(az deployment sub show --name "${DEPLOY_NAME}" --query "properties.outputs.eventGridTopicName.value" | sed -E 's/[[:space:]]|\"//g')"
|
||||
echo "INFO: EVENT_GRID_TOPIC_NAME=${EVENT_GRID_TOPIC_NAME}"
|
||||
EVENT_HUBS_NAMESPACE="$(az deployment sub show --name "${DEPLOY_NAME}" --query "properties.outputs.eventHubsNamespace.value" | sed -E 's/[[:space:]]|\"//g')"
|
||||
echo "INFO: EVENT_HUBS_NAMESPACE=${EVENT_HUBS_NAMESPACE}"
|
||||
EVENT_HUB_NAME="$(az deployment sub show --name "${DEPLOY_NAME}" --query "properties.outputs.eventHubName.value" | sed -E 's/[[:space:]]|\"//g')"
|
||||
echo "INFO: EVENT_HUB_NAME=${EVENT_HUB_NAME}"
|
||||
EVENT_HUB_POLICY_NAME="$(az deployment sub show --name "${DEPLOY_NAME}" --query "properties.outputs.eventHubPolicyName.value" | sed -E 's/[[:space:]]|\"//g')"
|
||||
echo "INFO: EVENT_HUB_POLICY_NAME=${EVENT_HUB_POLICY_NAME}"
|
||||
EVENT_HUBS_CONSUMER_GROUP_NAME="$(az deployment sub show --name "${DEPLOY_NAME}" --query "properties.outputs.eventHubConsumerGroupName.value" | sed -E 's/[[:space:]]|\"//g')"
|
||||
echo "INFO: EVENT_HUBS_CONSUMER_GROUP_NAME=${EVENT_HUBS_CONSUMER_GROUP_NAME}"
|
||||
|
||||
# Update service principal credentials and roles for created resources
|
||||
echo "Creating ${CERT_AUTH_SP_NAME} certificate ..."
|
||||
|
@ -389,6 +400,17 @@ SERVICE_BUS_CONNECTION_STRING="$(az servicebus namespace authorization-rule keys
|
|||
echo export ${SERVICE_BUS_CONNECTION_STRING_VAR_NAME}=\"${SERVICE_BUS_CONNECTION_STRING}\" >> "${ENV_CONFIG_FILENAME}"
|
||||
az keyvault secret set --name "${SERVICE_BUS_CONNECTION_STRING_VAR_NAME}" --vault-name "${KEYVAULT_NAME}" --value "${SERVICE_BUS_CONNECTION_STRING}"
|
||||
|
||||
# ----------------------------------
|
||||
# Populate Event Hubs test settings
|
||||
# ----------------------------------
|
||||
echo "Configuring Event Hub test settings ..."
|
||||
EVENT_HUBS_CONNECTION_STRING="$(az eventhubs eventhub authorization-rule keys list --name "${EVENT_HUB_POLICY_NAME}" --namespace-name "${EVENT_HUBS_NAMESPACE}" --eventhub-name "${EVENT_HUB_NAME}" --resource-group "${RESOURCE_GROUP_NAME}" --query "primaryConnectionString" | sed -E 's/[[:space:]]|\"//g')"
|
||||
echo export ${EVENT_HUBS_CONNECTION_STRING_VAR_NAME}=\"${EVENT_HUBS_CONNECTION_STRING}\" >> "${ENV_CONFIG_FILENAME}"
|
||||
az keyvault secret set --name "${EVENT_HUBS_CONNECTION_STRING_VAR_NAME}" --vault-name "${KEYVAULT_NAME}" --value "${EVENT_HUBS_CONNECTION_STRING}"
|
||||
|
||||
echo export ${EVENT_HUBS_CONSUMER_GROUP_VAR_NAME}=\"${EVENT_HUBS_CONSUMER_GROUP_NAME}\" >> "${ENV_CONFIG_FILENAME}"
|
||||
az keyvault secret set --name "${EVENT_HUBS_CONSUMER_GROUP_VAR_NAME}" --vault-name "${KEYVAULT_NAME}" --value "${EVENT_HUBS_CONSUMER_GROUP_NAME}"
|
||||
|
||||
echo "INFO: setup-azure-conf-test completed."
|
||||
echo "INFO: Remember to \`source ${ENV_CONFIG_FILENAME}\` before running local conformance tests."
|
||||
echo "INFO: ${AZURE_CREDENTIALS_FILENAME} contains the repository secret to set to run the GitHub conformance test workflow."
|
||||
|
|
|
@ -19,6 +19,7 @@ import (
|
|||
"github.com/Azure/go-autorest/autorest/azure"
|
||||
"github.com/dapr/components-contrib/pubsub"
|
||||
"github.com/dapr/kit/logger"
|
||||
"github.com/dapr/kit/retry"
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -51,7 +52,7 @@ const (
|
|||
sysPropIotHubEnqueuedTime = "iothub-enqueuedtime"
|
||||
)
|
||||
|
||||
func subscribeHandler(topic string, e *eventhub.Event, handler pubsub.Handler) error {
|
||||
func subscribeHandler(ctx context.Context, topic string, e *eventhub.Event, handler pubsub.Handler) error {
|
||||
res := pubsub.NewMessage{Data: e.Data, Topic: topic, Metadata: map[string]string{}}
|
||||
if e.SystemProperties.SequenceNumber != nil {
|
||||
res.Metadata[sysPropSequenceNumber] = strconv.FormatInt(*e.SystemProperties.SequenceNumber, 10)
|
||||
|
@ -86,7 +87,7 @@ func subscribeHandler(topic string, e *eventhub.Event, handler pubsub.Handler) e
|
|||
res.Metadata[sysPropIotHubEnqueuedTime] = e.SystemProperties.IoTHubEnqueuedTime.Format(time.RFC3339)
|
||||
}
|
||||
|
||||
return handler(context.Background(), &res)
|
||||
return handler(ctx, &res)
|
||||
}
|
||||
|
||||
// AzureEventHubs allows sending/receiving Azure Event Hubs events
|
||||
|
@ -94,7 +95,10 @@ type AzureEventHubs struct {
|
|||
hub *eventhub.Hub
|
||||
metadata azureEventHubsMetadata
|
||||
|
||||
logger logger.Logger
|
||||
logger logger.Logger
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
backOffConfig retry.Config
|
||||
}
|
||||
|
||||
type azureEventHubsMetadata struct {
|
||||
|
@ -159,13 +163,22 @@ func (aeh *AzureEventHubs) Init(metadata pubsub.Metadata) error {
|
|||
}
|
||||
|
||||
aeh.hub = hub
|
||||
aeh.ctx, aeh.cancel = context.WithCancel(context.Background())
|
||||
|
||||
// Default retry configuration is used if no backOff properties are set.
|
||||
if err := retry.DecodeConfigWithPrefix(
|
||||
&aeh.backOffConfig,
|
||||
metadata.Properties,
|
||||
"backOff"); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Publish sends data to Azure Event Hubs
|
||||
func (aeh *AzureEventHubs) Publish(req *pubsub.PublishRequest) error {
|
||||
err := aeh.hub.Send(context.Background(), &eventhub.Event{Data: req.Data})
|
||||
err := aeh.hub.Send(aeh.ctx, &eventhub.Event{Data: req.Data})
|
||||
if err != nil {
|
||||
return fmt.Errorf("error from publish: %s", err)
|
||||
}
|
||||
|
@ -185,20 +198,30 @@ func (aeh *AzureEventHubs) Subscribe(req pubsub.SubscribeRequest, handler pubsub
|
|||
return err
|
||||
}
|
||||
|
||||
processor, err := eph.NewFromConnectionString(context.Background(), aeh.metadata.connectionString, leaserCheckpointer, leaserCheckpointer, eph.WithNoBanner(), eph.WithConsumerGroup(aeh.metadata.consumerGroup))
|
||||
processor, err := eph.NewFromConnectionString(aeh.ctx, aeh.metadata.connectionString, leaserCheckpointer, leaserCheckpointer, eph.WithNoBanner(), eph.WithConsumerGroup(aeh.metadata.consumerGroup))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = processor.RegisterHandler(context.Background(),
|
||||
_, err = processor.RegisterHandler(aeh.ctx,
|
||||
func(c context.Context, e *eventhub.Event) error {
|
||||
return subscribeHandler(req.Topic, e, handler)
|
||||
b := aeh.backOffConfig.NewBackOffWithContext(aeh.ctx)
|
||||
|
||||
return retry.NotifyRecover(func() error {
|
||||
aeh.logger.Debugf("Processing EventHubs event %s/%s", req.Topic, e.ID)
|
||||
|
||||
return subscribeHandler(aeh.ctx, req.Topic, e, handler)
|
||||
}, b, func(err error, d time.Duration) {
|
||||
aeh.logger.Errorf("Error processing EventHubs event: %s/%s. Retrying...", req.Topic, e.ID)
|
||||
}, func() {
|
||||
aeh.logger.Errorf("Successfully processed EventHubs event after it previously failed: %s/%s", req.Topic, e.ID)
|
||||
})
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = processor.StartNonBlocking(context.Background())
|
||||
err = processor.StartNonBlocking(aeh.ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -207,7 +230,9 @@ func (aeh *AzureEventHubs) Subscribe(req pubsub.SubscribeRequest, handler pubsub
|
|||
}
|
||||
|
||||
func (aeh *AzureEventHubs) Close() error {
|
||||
return aeh.hub.Close(context.TODO())
|
||||
aeh.cancel()
|
||||
|
||||
return aeh.hub.Close(aeh.ctx)
|
||||
}
|
||||
|
||||
func (aeh *AzureEventHubs) Features() []pubsub.Feature {
|
||||
|
|
|
@ -0,0 +1,20 @@
|
|||
apiVersion: dapr.io/v1alpha1
|
||||
kind: Component
|
||||
metadata:
|
||||
name: azure-eventhubs
|
||||
namespace: default
|
||||
spec:
|
||||
type: bindings.azure.eventhubs
|
||||
version: v1
|
||||
metadata:
|
||||
- name: connectionString
|
||||
value: ${{AzureEventHubsConnectionString}}
|
||||
- name: consumerGroup
|
||||
value: ${{AzureEventHubsConsumerGroup}}
|
||||
# Reuse the blob storage account from the storage bindings conformance test
|
||||
- name: storageAccountName
|
||||
value: ${{AzureBlobStorageAccount}}
|
||||
- name: storageAccountKey
|
||||
value: ${{AzureBlobStorageAccessKey}}
|
||||
- name: storageContainerName
|
||||
value: ${{AzureBlobStorageContainer}}
|
|
@ -16,6 +16,8 @@ components:
|
|||
config:
|
||||
output:
|
||||
blobName: $((uuid))
|
||||
- component: azure.eventhubs
|
||||
operations: ["create", "operations", "read"]
|
||||
- component: azure.eventgrid
|
||||
operations: ["create", "operations", "read"]
|
||||
config:
|
||||
|
|
|
@ -0,0 +1,20 @@
|
|||
apiVersion: dapr.io/v1alpha1
|
||||
kind: Component
|
||||
metadata:
|
||||
name: azure-eventhubs
|
||||
namespace: default
|
||||
spec:
|
||||
type: pubsub.azure.eventhubs
|
||||
version: v1
|
||||
metadata:
|
||||
- name: connectionString
|
||||
value: ${{AzureEventHubsConnectionString}}
|
||||
- name: consumerID
|
||||
value: ${{AzureEventHubsConsumerGroup}}
|
||||
# Reuse the blob storage account from the storage bindings conformance test
|
||||
- name: storageAccountName
|
||||
value: ${{AzureBlobStorageAccount}}
|
||||
- name: storageAccountKey
|
||||
value: ${{AzureBlobStorageAccessKey}}
|
||||
- name: storageContainerName
|
||||
value: ${{AzureBlobStorageContainer}}
|
|
@ -9,6 +9,10 @@
|
|||
## checkInOrderProcessing: false disables in-order message processing checking
|
||||
componentType: pubsub
|
||||
components:
|
||||
- component: azure.eventhubs
|
||||
allOperations: true
|
||||
config:
|
||||
checkInOrderProcessing: false
|
||||
- component: azure.servicebus
|
||||
allOperations: true
|
||||
config:
|
||||
|
|
|
@ -28,12 +28,14 @@ import (
|
|||
|
||||
b_azure_blobstorage "github.com/dapr/components-contrib/bindings/azure/blobstorage"
|
||||
b_azure_eventgrid "github.com/dapr/components-contrib/bindings/azure/eventgrid"
|
||||
b_azure_eventhubs "github.com/dapr/components-contrib/bindings/azure/eventhubs"
|
||||
b_azure_servicebusqueues "github.com/dapr/components-contrib/bindings/azure/servicebusqueues"
|
||||
b_azure_storagequeues "github.com/dapr/components-contrib/bindings/azure/storagequeues"
|
||||
b_http "github.com/dapr/components-contrib/bindings/http"
|
||||
b_kafka "github.com/dapr/components-contrib/bindings/kafka"
|
||||
b_mqtt "github.com/dapr/components-contrib/bindings/mqtt"
|
||||
b_redis "github.com/dapr/components-contrib/bindings/redis"
|
||||
p_eventhubs "github.com/dapr/components-contrib/pubsub/azure/eventhubs"
|
||||
p_servicebus "github.com/dapr/components-contrib/pubsub/azure/servicebus"
|
||||
p_hazelcast "github.com/dapr/components-contrib/pubsub/hazelcast"
|
||||
p_kafka "github.com/dapr/components-contrib/pubsub/kafka"
|
||||
|
@ -56,6 +58,7 @@ import (
|
|||
)
|
||||
|
||||
const (
|
||||
eventhubs = "azure.eventhubs"
|
||||
redis = "redis"
|
||||
kafka = "kafka"
|
||||
mqtt = "mqtt"
|
||||
|
@ -310,6 +313,8 @@ func loadPubSub(tc TestComponent) pubsub.PubSub {
|
|||
switch tc.Component {
|
||||
case redis:
|
||||
pubsub = p_redis.NewRedisStreams(testLogger)
|
||||
case eventhubs:
|
||||
pubsub = p_eventhubs.NewAzureEventHubs(testLogger)
|
||||
case "azure.servicebus":
|
||||
pubsub = p_servicebus.NewAzureServiceBus(testLogger)
|
||||
case "natsstreaming":
|
||||
|
@ -379,6 +384,8 @@ func loadOutputBindings(tc TestComponent) bindings.OutputBinding {
|
|||
binding = b_azure_servicebusqueues.NewAzureServiceBusQueues(testLogger)
|
||||
case "azure.eventgrid":
|
||||
binding = b_azure_eventgrid.NewAzureEventGrid(testLogger)
|
||||
case eventhubs:
|
||||
binding = b_azure_eventhubs.NewAzureEventHubs(testLogger)
|
||||
case kafka:
|
||||
binding = b_kafka.NewKafka(testLogger)
|
||||
case "http":
|
||||
|
@ -402,6 +409,8 @@ func loadInputBindings(tc TestComponent) bindings.InputBinding {
|
|||
binding = b_azure_storagequeues.NewAzureStorageQueues(testLogger)
|
||||
case "azure.eventgrid":
|
||||
binding = b_azure_eventgrid.NewAzureEventGrid(testLogger)
|
||||
case eventhubs:
|
||||
binding = b_azure_eventhubs.NewAzureEventHubs(testLogger)
|
||||
case kafka:
|
||||
binding = b_kafka.NewKafka(testLogger)
|
||||
case mqtt:
|
||||
|
|
Loading…
Reference in New Issue