Initial Certification test for eventhubs binding [incomplete] (#1670)

* certification test for eventhubs binding

Signed-off-by: tanvigour <tanvi.gour@gmail.com>

* modified go.mod and go.sum

Signed-off-by: tanvigour <tanvi.gour@gmail.com>

* Add connection string testing

Signed-off-by: tanvigour <tanvi.gour@gmail.com>

* iothub testing

Signed-off-by: tanvigour <tanvi.gour@gmail.com>

* address feedback and run test

Signed-off-by: tanvigour <tanvi.gour@gmail.com>

* Install Azure CLI IOT hub extension

Signed-off-by: Bernd Verst <4535280+berndverst@users.noreply.github.com>

* make modtidy-all

Signed-off-by: Bernd Verst <4535280+berndverst@users.noreply.github.com>

* covering all eventhubs test cases

Signed-off-by: tanvigour <tanvi.gour@gmail.com>

* dependency changes after go modtidy-all

Signed-off-by: tanvigour <tanvi.gour@gmail.com>

Co-authored-by: Bernd Verst <4535280+berndverst@users.noreply.github.com>
Co-authored-by: Yaron Schneider <schneider.yaron@live.com>
Co-authored-by: Looong Dai <long.dai@intel.com>
This commit is contained in:
tanvigour 2022-05-05 22:33:23 -04:00 committed by GitHub
parent bfd87eb7a5
commit 47db6cc2b1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 2143 additions and 0 deletions

View File

@ -93,6 +93,8 @@ jobs:
required-secrets: AzureServiceBusConnectionString
- component: bindings.azure.cosmosdb
required-secrets: AzureCosmosDBUrl,AzureCosmosDB,AzureCosmosDBCollection,AzureCosmosDBMasterKey,AzureCertificationTenantId,AzureCertificationServicePrincipalClientId,AzureCertificationServicePrincipalClientSecret
- component: bindings.azure.eventhubs
required-secrets: AzureEventHubsBindingsConnectionString,AzureBlobStorageAccount,AzureBlobStorageAccessKey,AzureEventHubsBindingsHub,AzureEventHubsBindingsNamespace,AzureCertificationServicePrincipalClientId,AzureCertificationTenantId,AzureCertificationServicePrincipalClientSecret,AzureResourceGroupName,AzureCertificationSubscriptionId,AzureEventHubsBindingsContainer,AzureIotHubEventHubConnectionString,AzureIotHubName,AzureIotHubBindingsConsumerGroup
- component: pubsub.azure.eventhubs
required-secrets: AzureEventHubsPubsubTopicActiveConnectionString,AzureEventHubsPubsubNamespace,AzureEventHubsPubsubNamespaceConnectionString,AzureBlobStorageAccount,AzureBlobStorageAccessKey,AzureEventHubsPubsubContainer,AzureIotHubName,AzureIotHubEventHubConnectionString,AzureCertificationTenantId,AzureCertificationServicePrincipalClientId,AzureCertificationServicePrincipalClientSecret,AzureResourceGroupName,AzureCertificationSubscriptionId
EOF

View File

@ -0,0 +1,33 @@
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: azure-single-partition-binding
namespace: default
spec:
type: bindings.azure.eventhubs
version: v1
metadata:
- name: connectionString # Azure EventHubs connection string
secretKeyRef:
name: AzureEventHubsBindingsConnectionString
value: AzureEventHubsBindingsConnectionString
- name: consumerGroup # EventHubs consumer group
secretKeyRef:
name: AzureEventHubsBindingsConsumerGroup
value: AzureEventHubsBindingsConsumerGroup
- name: storageAccountName # Azure Storage Account Name
secretKeyRef:
name: AzureBlobStorageAccount
value: AzureBlobStorageAccount
- name: storageAccountKey # Azure Storage Account Key
secretKeyRef:
name: AzureBlobStorageAccessKey
value: AzureBlobStorageAccessKey
- name: storageContainerName # Azure Storage Container Name
secretKeyRef:
name: AzureEventHubsBindingsContainer
value: AzureEventHubsBindingsContainer
- name: partitionID
value: 0
auth:
secretStore: envvar-secret-store

View File

@ -0,0 +1,9 @@
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: envvar-secret-store
namespace: default
spec:
type: secretstores.local.env
version: v1
metadata:

View File

@ -0,0 +1,31 @@
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: azure-input-binding
namespace: default
spec:
type: bindings.azure.eventhubs
version: v1
metadata:
- name: connectionString # Azure EventHubs connection string
secretKeyRef:
name: AzureEventHubsBindingsConnectionString
value: AzureEventHubsBindingsConnectionString
- name: consumerGroup # EventHubs consumer group
secretKeyRef:
name: AzureEventHubsBindingsConsumerGroup
value: AzureEventHubsBindingsConsumerGroup
- name: storageAccountName # Azure Storage Account Name
secretKeyRef:
name: AzureBlobStorageAccount
value: AzureBlobStorageAccount
- name: storageAccountKey # Azure Storage Account Key
secretKeyRef:
name: AzureBlobStorageAccessKey
value: AzureBlobStorageAccessKey
- name: storageContainerName # Azure Storage Container Name
secretKeyRef:
name: AzureEventHubsBindingsContainer
value: AzureEventHubsBindingsContainer
auth:
secretStore: envvar-secret-store

View File

@ -0,0 +1,31 @@
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: azure-output-binding
namespace: default
spec:
type: bindings.azure.eventhubs
version: v1
metadata:
- name: connectionString # Azure EventHubs connection string
secretKeyRef:
name: AzureEventHubsBindingsConnectionString
value: AzureEventHubsBindingsConnectionString
- name: consumerGroup # EventHubs consumer group
secretKeyRef:
name: AzureEventHubsBindingsConsumerGroup
value: AzureEventHubsBindingsConsumerGroup
- name: storageAccountName # Azure Storage Account Name
secretKeyRef:
name: AzureBlobStorageAccount
value: AzureBlobStorageAccount
- name: storageAccountKey # Azure Storage Account Key
secretKeyRef:
name: AzureBlobStorageAccessKey
value: AzureBlobStorageAccessKey
- name: storageContainerName # Azure Storage Container Name
secretKeyRef:
name: AzureEventHubsBindingsContainer
value: AzureEventHubsBindingsContainer
auth:
secretStore: envvar-secret-store

View File

@ -0,0 +1,9 @@
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: envvar-secret-store
namespace: default
spec:
type: secretstores.local.env
version: v1
metadata:

View File

@ -0,0 +1,33 @@
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: azure-partition0-binding
namespace: default
spec:
type: bindings.azure.eventhubs
version: v1
metadata:
- name: connectionString # Azure EventHubs connection string
secretKeyRef:
name: AzureEventHubsBindingsConnectionString
value: AzureEventHubsBindingsConnectionString
- name: consumerGroup # EventHubs consumer group
secretKeyRef:
name: AzureEventHubsBindingsConsumerGroup
value: AzureEventHubsBindingsConsumerGroup
- name: storageAccountName # Azure Storage Account Name
secretKeyRef:
name: AzureBlobStorageAccount
value: AzureBlobStorageAccount
- name: storageAccountKey # Azure Storage Account Key
secretKeyRef:
name: AzureBlobStorageAccessKey
value: AzureBlobStorageAccessKey
- name: storageContainerName # Azure Storage Container Name
secretKeyRef:
name: AzureEventHubsBindingsContainer
value: AzureEventHubsBindingsContainer
- name: partitionID
value: 0
auth:
secretStore: envvar-secret-store

View File

@ -0,0 +1,34 @@
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: azure-partition1-binding
namespace: default
spec:
type: bindings.azure.eventhubs
version: v1
metadata:
- name: connectionString # Azure EventHubs connection string
secretKeyRef:
name: AzureEventHubsBindingsConnectionString
value: AzureEventHubsBindingsConnectionString
- name: consumerGroup # EventHubs consumer group
secretKeyRef:
name: AzureEventHubsBindingsConsumerGroup
value: AzureEventHubsBindingsConsumerGroup
- name: storageAccountName # Azure Storage Account Name
secretKeyRef:
name: AzureBlobStorageAccount
value: AzureBlobStorageAccount
- name: storageAccountKey # Azure Storage Account Key
secretKeyRef:
name: AzureBlobStorageAccessKey
value: AzureBlobStorageAccessKey
- name: storageContainerName # Azure Storage Container Name
secretKeyRef:
name: AzureEventHubsBindingsContainer
value: AzureEventHubsBindingsContainer
- name: partitionID
value: 1
auth:
secretStore: envvar-secret-store

View File

@ -0,0 +1,9 @@
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: envvar-secret-store
namespace: default
spec:
type: secretstores.local.env
version: v1
metadata:

View File

@ -0,0 +1,35 @@
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: azure-eventhubs-binding
namespace: default
spec:
type: bindings.azure.eventhubs
version: v1
metadata:
- name: connectionString
secretKeyRef:
name: AzureIotHubEventHubConnectionString
key: AzureIotHubEventHubConnectionString
- name: storageAccountName
secretKeyRef:
name: AzureBlobStorageAccount
key: AzureBlobStorageAccount
- name: storageAccountKey
secretKeyRef:
name: AzureBlobStorageAccessKey
key: AzureBlobStorageAccessKey
- name: storageContainerName
secretKeyRef:
name: AzureEventHubsBindingsContainer
key: AzureEventHubsBindingsContainer
- name: iotHub
secretKeyRef:
name: AzureIotHubName
key: AzureIotHubName
- name: consumerGroup
secretKeyRef:
name: AzureIotHubBindingsConsumerGroup
key: AzureIotHubBindingsConsumerGroup
auth:
secretStore: envvar-secret-store

View File

@ -0,0 +1,9 @@
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: envvar-secret-store
namespace: default
spec:
type: secretstores.local.env
version: v1
metadata:

View File

@ -0,0 +1,55 @@
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: azure-eventhubs-binding
namespace: default
spec:
type: bindings.azure.eventhubs
version: v1
metadata:
- name: eventHubNamespace
secretKeyRef:
name: AzureEventHubsBindingsNamespace
key: AzureEventHubsBindingsNamespace
- name: azureTenantId
secretKeyRef:
name: AzureCertificationTenantId
key: AzureCertificationTenantId
- name: azureClientId
secretKeyRef:
name: AzureCertificationServicePrincipalClientId
key: AzureCertificationServicePrincipalClientId
- name: azureClientSecret
secretKeyRef:
name: AzureCertificationServicePrincipalClientSecret
key: AzureCertificationServicePrincipalClientSecret
- name: resourceGroupName
secretKeyRef:
name: AzureResourceGroupName
key: AzureResourceGroupName
- name: subscriptionID
secretKeyRef:
name: AzureCertificationSubscriptionId
key: AzureCertificationSubscriptionId
- name: storageAccountName
secretKeyRef:
name: AzureBlobStorageAccount
key: AzureBlobStorageAccount
- name: storageAccountKey
secretKeyRef:
name: AzureBlobStorageAccessKey
key: AzureBlobStorageAccessKey
- name: storageContainerName
secretKeyRef:
name: AzureEventHubsBindingsContainer
key: AzureEventHubsBindingsContainer
- name: eventHub
secretKeyRef:
name: AzureEventHubsBindingsHub
key: AzureEventHubsBindingsHub
- name: consumerGroup
secretKeyRef:
name: AzureEventHubsBindingsConsumerGroup
key: AzureEventHubsBindingsConsumerGroup
auth:
secretStore: envvar-secret-store

View File

@ -0,0 +1,9 @@
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: envvar-secret-store
namespace: default
spec:
type: secretstores.local.env
version: v1
metadata:

View File

@ -0,0 +1,6 @@
apiVersion: dapr.io/v1alpha1
kind: Configuration
metadata:
name: eventhubsconfig
spec:
features:

View File

@ -0,0 +1,426 @@
/*
Copyright 2021 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package eventhubs_test
import (
"context"
"fmt"
//"os/exec"
"testing"
"time"
//"github.com/stretchr/testify/assert"
"github.com/google/uuid"
"github.com/stretchr/testify/require"
"go.uber.org/multierr"
"github.com/dapr/components-contrib/bindings"
bindings_loader "github.com/dapr/dapr/pkg/components/bindings"
"github.com/dapr/dapr/pkg/runtime"
dapr_testing "github.com/dapr/dapr/pkg/testing"
"github.com/dapr/kit/logger"
"github.com/dapr/components-contrib/secretstores"
secretstore_env "github.com/dapr/components-contrib/secretstores/local/env"
secretstores_loader "github.com/dapr/dapr/pkg/components/secretstores"
"github.com/dapr/components-contrib/bindings/azure/eventhubs"
"github.com/dapr/components-contrib/tests/certification/embedded"
"github.com/dapr/components-contrib/tests/certification/flow"
"github.com/dapr/components-contrib/tests/certification/flow/app"
"github.com/dapr/components-contrib/tests/certification/flow/network"
"github.com/dapr/components-contrib/tests/certification/flow/sidecar"
"github.com/dapr/components-contrib/tests/certification/flow/simulate"
"github.com/dapr/components-contrib/tests/certification/flow/watcher"
dapr "github.com/dapr/go-sdk/client"
"github.com/dapr/go-sdk/service/common"
)
const (
numMessages = 100
messageKey = "partitionKey"
)
func TestSinglePartition(t *testing.T) {
logger := logger.NewLogger("dapr.components")
out_component := bindings_loader.NewOutput("azure.eventhubs", func() bindings.OutputBinding {
return eventhubs.NewAzureEventHubs(logger)
})
in_component := bindings_loader.NewInput("azure.eventhubs", func() bindings.InputBinding {
return eventhubs.NewAzureEventHubs(logger)
})
secrets_components := secretstores_loader.New("local.env", func() secretstores.SecretStore {
return secretstore_env.NewEnvSecretStore(logger)
})
ports, _ := dapr_testing.GetFreePorts(3)
grpcPort := ports[0]
httpPort := ports[1]
appPort := ports[2]
consumerGroup1 := watcher.NewUnordered()
metadata := map[string]string{
messageKey: "test",
}
sendAndReceive := func(metadata map[string]string, messages ...*watcher.Watcher) flow.Runnable {
_, hasKey := metadata[messageKey]
return func(ctx flow.Context) error {
client, err := dapr.NewClientWithPort(fmt.Sprintf("%d", grpcPort))
require.NoError(t, err, "dapr init failed")
// Define what is expected
outputmsg := make([]string, numMessages)
for i := 0; i < numMessages; i++ {
outputmsg[i] = fmt.Sprintf("output binding: Message %03d", i)
}
consumerGroup1.ExpectStrings(outputmsg...)
time.Sleep(20 * time.Second)
if !hasKey {
metadata[messageKey] = uuid.NewString()
}
// Send events from output binding
for _, msg := range outputmsg {
ctx.Logf("Sending eventhub message: %q", msg)
err := client.InvokeOutputBinding(
ctx, &dapr.InvokeBindingRequest{
Name: "azure-single-partition-binding",
Operation: "create",
Data: []byte(msg),
Metadata: metadata,
})
require.NoError(ctx, err, "error publishing message")
}
// Assert the observed messages
consumerGroup1.Assert(ctx, time.Minute)
return nil
}
}
// Application logic that tracks messages from eventhub.
application := func(ctx flow.Context, s common.Service) (err error) {
// Simulate periodic errors.
sim := simulate.PeriodicError(ctx, 100)
// Setup the binding endpoints
err = multierr.Combine(err,
s.AddBindingInvocationHandler("azure-single-partition-binding", func(_ context.Context, in *common.BindingEvent) ([]byte, error) {
consumerGroup1.Observe(string(in.Data))
if err := sim(); err != nil {
return nil, err
}
ctx.Logf("Receiving eventhubs message: %s", string(in.Data))
return []byte("{}"), nil
}))
return err
}
/* TODO: Verify IOT Hub :
consumerGroup2 := watcher.NewUnordered()
publishMessages := func(ctx flow.Context) error {
// Define what is expected
outputmsg := make([]string, numMessages)
for i := 0; i < numMessages; i++ {
outputmsg[i] = fmt.Sprintf("publish messages to device: Message %03d", i)
}
consumerGroup2.ExpectStrings(outputmsg...)
cmd := exec.Command("/bin/bash", "send-iot-device-events.sh")
out, err := cmd.CombinedOutput()
assert.Nil(t, err, "Error in send-iot-device-events.sh:\n%s", out)
consumerGroup2.Assert(ctx, time.Minute)
return nil
flow.New(t, "eventhubs binding IoTHub testing").
Step(app.Run("app", fmt.Sprintf(":%d", appPort), application)).
Step(sidecar.Run("sidecar",
embedded.WithAppProtocol(runtime.HTTPProtocol, appPort),
embedded.WithDaprGRPCPort(grpcPort),
embedded.WithDaprHTTPPort(httpPort),
embedded.WithComponentsPath("./components/binding/iothub"),
runtime.WithSecretStores(secrets_components),
runtime.WithOutputBindings(out_component),
runtime.WithInputBindings(in_component),
)).
Step("Send messages to IoT", publishMessages).
Run()
//TODO: Verfiy service principal
// Flow of events: Start app, sidecar, interrupt network to check reconnection, send and receive
flow.New(t, "eventhubs binding authentication using service principal").
Step(app.Run("app", fmt.Sprintf(":%d", appPort), application)).
Step(sidecar.Run("sidecar",
embedded.WithAppProtocol(runtime.HTTPProtocol, appPort),
embedded.WithDaprGRPCPort(grpcPort),
embedded.WithDaprHTTPPort(httpPort),
embedded.WithComponentsPath("./components/binding/serviceprincipal"),
runtime.WithSecretStores(secrets_components),
runtime.WithOutputBindings(out_component),
runtime.WithInputBindings(in_component),
)).
Step("interrupt network", network.InterruptNetwork(30*time.Second, nil, nil, "443", "5671", "5672")).
Step("send and wait", sendAndReceive).
Run()*/
// Flow of events: Start app, sidecar, interrupt network to check reconnection, send and receive
flow.New(t, "eventhubs binding authentication using connection string single partition").
Step(app.Run("app", fmt.Sprintf(":%d", appPort), application)).
Step(sidecar.Run("sidecar",
embedded.WithAppProtocol(runtime.HTTPProtocol, appPort),
embedded.WithDaprGRPCPort(grpcPort),
embedded.WithDaprHTTPPort(httpPort),
embedded.WithComponentsPath("./components/binding/consumer1"),
runtime.WithSecretStores(secrets_components),
runtime.WithOutputBindings(out_component),
runtime.WithInputBindings(in_component),
)).
Step("interrupt network", network.InterruptNetwork(30*time.Second, nil, nil, "443", "5671", "5672")).
Step("send and wait", sendAndReceive(metadata)).
Run()
}
func TestEventhubBindingMultipleSenders(t *testing.T) {
logger := logger.NewLogger("dapr.components")
out_component := bindings_loader.NewOutput("azure.eventhubs", func() bindings.OutputBinding {
return eventhubs.NewAzureEventHubs(logger)
})
in_component := bindings_loader.NewInput("azure.eventhubs", func() bindings.InputBinding {
return eventhubs.NewAzureEventHubs(logger)
})
secrets_components := secretstores_loader.New("local.env", func() secretstores.SecretStore {
return secretstore_env.NewEnvSecretStore(logger)
})
ports, _ := dapr_testing.GetFreePorts(3)
grpcPort := ports[0]
httpPort := ports[1]
appPort := ports[2]
consumerGroup1 := watcher.NewUnordered()
consumerGroup2 := watcher.NewUnordered()
sendAndReceive := func(ctx flow.Context) error {
client, err := dapr.NewClientWithPort(fmt.Sprintf("%d", grpcPort))
require.NoError(t, err, "dapr init failed")
// Define what is expected
outputmsg := make([]string, numMessages)
for i := 0; i < numMessages; i++ {
outputmsg[i] = fmt.Sprintf("input binding: Message %03d", i)
}
consumerGroup1.ExpectStrings(outputmsg...)
time.Sleep(40 * time.Second)
// Send events from input binding
for _, msg := range outputmsg {
ctx.Logf("Sending eventhub message: %q", msg)
err := client.InvokeOutputBinding(
ctx, &dapr.InvokeBindingRequest{
Name: "azure-input-binding",
Operation: "create",
Data: []byte(msg),
})
require.NoError(ctx, err, "error publishing message")
}
// Assert the observed messages
consumerGroup1.Assert(ctx, time.Minute)
outputmsg2 := make([]string, numMessages)
for i := 0; i < numMessages; i++ {
outputmsg2[i] = fmt.Sprintf("output binding: Message %03d", i)
}
consumerGroup2.ExpectStrings(outputmsg2...)
time.Sleep(40 * time.Second)
// Send events from output binding
for _, msg2 := range outputmsg2 {
ctx.Logf("Sending eventhub message: %q", msg2)
err := client.InvokeOutputBinding(
ctx, &dapr.InvokeBindingRequest{
Name: "azure-output-binding",
Operation: "create",
Data: []byte(msg2),
})
require.NoError(ctx, err, "error publishing message")
}
// Assert the observed messages
consumerGroup2.Assert(ctx, time.Minute)
return nil
}
// Application logic that tracks messages from eventhub.
application := func(ctx flow.Context, s common.Service) (err error) {
// Simulate periodic errors.
sim := simulate.PeriodicError(ctx, 100)
// Setup the binding endpoints
err = multierr.Combine(err,
s.AddBindingInvocationHandler("azure-output-binding", func(_ context.Context, in *common.BindingEvent) ([]byte, error) {
consumerGroup2.Observe(string(in.Data))
if err := sim(); err != nil {
return nil, err
}
ctx.Logf("Output binding - Receiving eventhubs message: %s", string(in.Data))
return []byte("{}"), nil
}),
s.AddBindingInvocationHandler("azure-input-binding", func(_ context.Context, in *common.BindingEvent) ([]byte, error) {
consumerGroup1.Observe(string(in.Data))
if err := sim(); err != nil {
return nil, err
}
ctx.Logf("Input binding: Receiving eventhubs message: %s", string(in.Data))
return []byte("{}"), nil
}))
return err
}
flow.New(t, "eventhubs binding authentication using multiple senders and receivers").
Step(app.Run("app", fmt.Sprintf(":%d", appPort), application)).
Step(sidecar.Run("sidecar",
embedded.WithAppProtocol(runtime.HTTPProtocol, appPort),
embedded.WithDaprGRPCPort(grpcPort),
embedded.WithDaprHTTPPort(httpPort),
embedded.WithComponentsPath("./components/binding/consumer2"),
runtime.WithSecretStores(secrets_components),
runtime.WithOutputBindings(out_component),
runtime.WithInputBindings(in_component),
)).
Step("interrupt network", network.InterruptNetwork(30*time.Second, nil, nil, "443", "5671", "5672")).
Step("send and wait", sendAndReceive).
Run()
}
func TestEventhubBindingMultiplePartition(t *testing.T) {
logger := logger.NewLogger("dapr.components")
out_component := bindings_loader.NewOutput("azure.eventhubs", func() bindings.OutputBinding {
return eventhubs.NewAzureEventHubs(logger)
})
in_component := bindings_loader.NewInput("azure.eventhubs", func() bindings.InputBinding {
return eventhubs.NewAzureEventHubs(logger)
})
secrets_components := secretstores_loader.New("local.env", func() secretstores.SecretStore {
return secretstore_env.NewEnvSecretStore(logger)
})
ports, _ := dapr_testing.GetFreePorts(3)
grpcPort := ports[0]
httpPort := ports[1]
appPort := ports[2]
consumerGroup1 := watcher.NewUnordered()
consumerGroup2 := watcher.NewUnordered()
sendAndReceive := func(ctx flow.Context) error {
client, err := dapr.NewClientWithPort(fmt.Sprintf("%d", grpcPort))
require.NoError(t, err, "dapr init failed")
// Define what is expected
outputmsg := make([]string, numMessages)
for i := 0; i < numMessages; i++ {
outputmsg[i] = fmt.Sprintf("output binding: Message %03d", i)
}
consumerGroup1.ExpectStrings(outputmsg...)
time.Sleep(40 * time.Second)
// Send events from output binding
for _, msg := range outputmsg {
ctx.Logf("Sending eventhub message: %q", msg)
err := client.InvokeOutputBinding(
ctx, &dapr.InvokeBindingRequest{
Name: "azure-partition0-binding",
Operation: "create",
Data: []byte(msg),
})
require.NoError(ctx, err, "error publishing message")
}
// Assert the observed messages
consumerGroup1.Assert(ctx, time.Minute)
// Define what is expected
outputmsg2 := make([]string, numMessages)
for i := 0; i < numMessages; i++ {
outputmsg2[i] = fmt.Sprintf("output binding: Message %03d", i)
}
consumerGroup2.ExpectStrings(outputmsg2...)
time.Sleep(40 * time.Second)
// Send events from output binding
for _, msg2 := range outputmsg2 {
ctx.Logf("Sending eventhub message: %q", msg2)
err := client.InvokeOutputBinding(
ctx, &dapr.InvokeBindingRequest{
Name: "azure-partition1-binding",
Operation: "create",
Data: []byte(msg2),
})
require.NoError(ctx, err, "error publishing message")
}
// Assert the observed messages
consumerGroup2.Assert(ctx, time.Minute)
return nil
}
// Application logic that tracks messages from eventhub.
application := func(ctx flow.Context, s common.Service) (err error) {
// Simulate periodic errors.
sim := simulate.PeriodicError(ctx, 100)
// Setup the binding endpoints
err = multierr.Combine(err,
s.AddBindingInvocationHandler("azure-partition0-binding", func(_ context.Context, in *common.BindingEvent) ([]byte, error) {
consumerGroup1.Observe(string(in.Data))
if err := sim(); err != nil {
return nil, err
}
ctx.Logf("Receiving eventhubs message: %s", string(in.Data))
return []byte("{}"), nil
}),
s.AddBindingInvocationHandler("azure-partition1-binding", func(_ context.Context, in *common.BindingEvent) ([]byte, error) {
consumerGroup2.Observe(string(in.Data))
if err := sim(); err != nil {
return nil, err
}
ctx.Logf("Receiving eventhubs message: %s", string(in.Data))
return []byte("{}"), nil
}))
return err
}
flow.New(t, "eventhubs binding authentication using connection string all partitions").
Step(app.Run("app", fmt.Sprintf(":%d", appPort), application)).
Step(sidecar.Run("sidecar",
embedded.WithAppProtocol(runtime.HTTPProtocol, appPort),
embedded.WithDaprGRPCPort(grpcPort),
embedded.WithDaprHTTPPort(httpPort),
embedded.WithComponentsPath("./components/binding/consumer3"),
runtime.WithSecretStores(secrets_components),
runtime.WithOutputBindings(out_component),
runtime.WithInputBindings(in_component),
)).
Step("interrupt network", network.InterruptNetwork(30*time.Second, nil, nil, "443", "5671", "5672")).
Step("send and wait", sendAndReceive).
Run()
}

View File

@ -0,0 +1,145 @@
module servicebusqueue_test
go 1.17
require (
github.com/dapr/components-contrib v1.7.1-0.20220426033643-068938c67654
github.com/dapr/components-contrib/tests/certification v0.0.0-20211026011813-36b75e9ae272
github.com/dapr/dapr v1.7.1-0.20220426092903-063b1611d1cb
github.com/dapr/go-sdk v1.4.0
github.com/dapr/kit v0.0.2-0.20210614175626-b9074b64d233
github.com/google/uuid v1.3.0
github.com/stretchr/testify v1.7.0
go.uber.org/multierr v1.7.0
)
require (
contrib.go.opencensus.io/exporter/prometheus v0.4.0 // indirect
contrib.go.opencensus.io/exporter/zipkin v0.1.1 // indirect
github.com/AdhityaRamadhanus/fasthttpcors v0.0.0-20170121111917-d4c07198763a // indirect
github.com/Azure/azure-amqp-common-go/v3 v3.2.0 // indirect
github.com/Azure/azure-event-hubs-go/v3 v3.3.10 // indirect
github.com/Azure/azure-pipeline-go v0.2.2 // indirect
github.com/Azure/azure-sdk-for-go v59.3.0+incompatible // indirect
github.com/Azure/azure-sdk-for-go/sdk/azcore v0.20.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/azidentity v0.12.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/internal v0.8.1 // indirect
github.com/Azure/azure-storage-blob-go v0.10.0 // indirect
github.com/Azure/go-amqp v0.13.13 // indirect
github.com/Azure/go-autorest v14.2.0+incompatible // indirect
github.com/Azure/go-autorest/autorest v0.11.23 // indirect
github.com/Azure/go-autorest/autorest/adal v0.9.16 // indirect
github.com/Azure/go-autorest/autorest/azure/auth v0.5.8 // indirect
github.com/Azure/go-autorest/autorest/azure/cli v0.4.2 // indirect
github.com/Azure/go-autorest/autorest/date v0.3.0 // indirect
github.com/Azure/go-autorest/autorest/to v0.4.0 // indirect
github.com/Azure/go-autorest/autorest/validation v0.3.1 // indirect
github.com/Azure/go-autorest/logger v0.2.1 // indirect
github.com/Azure/go-autorest/tracing v0.6.0 // indirect
github.com/PuerkitoBio/purell v1.1.1 // indirect
github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 // indirect
github.com/andybalholm/brotli v1.0.2 // indirect
github.com/antlr/antlr4 v0.0.0-20200503195918-621b933c7a7f // indirect
github.com/armon/go-metrics v0.0.0-20190430140413-ec5e00d3c878 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cenkalti/backoff v2.2.1+incompatible // indirect
github.com/cenkalti/backoff/v4 v4.1.1 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/devigned/tab v0.1.1 // indirect
github.com/dimchansky/utfbom v1.1.1 // indirect
github.com/evanphx/json-patch v4.12.0+incompatible // indirect
github.com/fasthttp/router v1.3.8 // indirect
github.com/fatih/color v1.10.0 // indirect
github.com/fsnotify/fsnotify v1.5.1 // indirect
github.com/ghodss/yaml v1.0.0 // indirect
github.com/go-kit/log v0.1.0 // indirect
github.com/go-logfmt/logfmt v0.5.1 // indirect
github.com/go-logr/logr v1.2.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang-jwt/jwt/v4 v4.0.0 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/google/cel-go v0.9.0 // indirect
github.com/google/go-cmp v0.5.6 // indirect
github.com/google/gofuzz v1.1.0 // indirect
github.com/googleapis/gnostic v0.5.5 // indirect
github.com/gorilla/mux v1.8.0 // indirect
github.com/grandcat/zeroconf v0.0.0-20190424104450-85eadb44205c // indirect
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 // indirect
github.com/hashicorp/consul/api v1.11.0 // indirect
github.com/hashicorp/errwrap v1.0.0 // indirect
github.com/hashicorp/go-cleanhttp v0.5.1 // indirect
github.com/hashicorp/go-hclog v0.14.1 // indirect
github.com/hashicorp/go-immutable-radix v1.0.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/go-rootcerts v1.0.2 // indirect
github.com/hashicorp/golang-lru v0.5.4 // indirect
github.com/hashicorp/serf v0.9.5 // indirect
github.com/imdario/mergo v0.3.12 // indirect
github.com/jpillora/backoff v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/compress v1.14.4 // indirect
github.com/mattn/go-colorable v0.1.8 // indirect
github.com/mattn/go-ieproxy v0.0.0-20190702010315-6dee0af9227d // indirect
github.com/mattn/go-isatty v0.0.13 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 // indirect
github.com/miekg/dns v1.1.35 // indirect
github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1 // indirect
github.com/mitchellh/go-homedir v1.1.0 // indirect
github.com/mitchellh/mapstructure v1.4.1 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/openzipkin/zipkin-go v0.2.2 // indirect
github.com/pkg/browser v0.0.0-20180916011732-0a3d74bf9ce4 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_golang v1.11.1 // indirect
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common v0.32.1 // indirect
github.com/prometheus/procfs v0.7.3 // indirect
github.com/prometheus/statsd_exporter v0.22.3 // indirect
github.com/savsgio/gotils v0.0.0-20210217112953-d4a072536008 // indirect
github.com/sirupsen/logrus v1.8.1 // indirect
github.com/sony/gobreaker v0.4.2-0.20210216022020-dd874f9dd33b // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/stoewer/go-strcase v1.2.0 // indirect
github.com/stretchr/objx v0.3.0 // indirect
github.com/tylertreat/comcast v1.0.1 // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect
github.com/valyala/fasthttp v1.31.1-0.20211216042702-258a4c17b4f4 // indirect
go.opencensus.io v0.23.0 // indirect
go.opentelemetry.io/otel v0.20.0 // indirect
go.uber.org/atomic v1.9.0 // indirect
golang.org/x/crypto v0.0.0-20220131195533-30dcbda58838 // indirect
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2 // indirect
golang.org/x/oauth2 v0.0.0-20210819190943-2bc19b11175f // indirect
golang.org/x/sys v0.0.0-20220204135822-1c1b9b1eba6a // indirect
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 // indirect
golang.org/x/text v0.3.7 // indirect
golang.org/x/time v0.0.0-20211116232009-f0f3c7e86c11 // indirect
gomodules.xyz/jsonpatch/v2 v2.2.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20210831024726-fe130286e0e2 // indirect
google.golang.org/grpc v1.40.0 // indirect
google.golang.org/protobuf v1.27.1 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
k8s.io/api v0.23.0 // indirect
k8s.io/apiextensions-apiserver v0.23.0 // indirect
k8s.io/apimachinery v0.23.0 // indirect
k8s.io/client-go v0.23.0 // indirect
k8s.io/component-base v0.23.0 // indirect
k8s.io/klog/v2 v2.30.0 // indirect
k8s.io/kube-openapi v0.0.0-20211115234752-e816edb12b65 // indirect
k8s.io/utils v0.0.0-20210930125809-cb0fa318a74b // indirect
sigs.k8s.io/controller-runtime v0.11.0 // indirect
sigs.k8s.io/json v0.0.0-20211020170558-c049b76a60c6 // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.2.0 // indirect
sigs.k8s.io/yaml v1.3.0 // indirect
)
replace github.com/dapr/components-contrib => ../../../../..
replace github.com/dapr/components-contrib/tests/certification => ../../..

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,46 @@
#!/usr/bin/env bash
# ------------------------------------------------------------
# Copyright 2021 The Dapr Authors
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ------------------------------------------------------------
set -e
if [[ -z "${IOT_HUB_NAME}" ]]; then
echo "ERROR: IOT_HUB_NAME environment variable not defined."
exit 1
fi
if [[ -z "${AZURE_CREDENTIALS}" ]]; then
echo "ERROR: AZURE_CREDENTIALS environment variable not defined."
exit 1
fi
# Install azure-iot extension without prompt
# https://docs.microsoft.com/en-us/cli/azure/azure-cli-extensions-overview
# https://github.com/Azure/azure-iot-cli-extension
az config set extension.use_dynamic_install=yes_without_prompt
# Log in to Azure using provided Service Principal (SP) credentials
# The provided SP must have Contributor role access to the IoT Hub specified by IOT_HUB_NAME
SDK_AUTH_SP_APPID="$(echo "${AZURE_CREDENTIALS}" | grep 'clientId' | sed -E 's/(.*clientId\"\: \")|\",//g')"
SDK_AUTH_SP_CLIENT_SECRET="$(echo "${AZURE_CREDENTIALS}" | grep 'clientSecret' | sed -E 's/(.*clientSecret\"\: \")|\",//g')"
SDK_AUTH_SP_TENANT="$(echo "${AZURE_CREDENTIALS}" | grep 'tenantId' | sed -E 's/(.*tenantId\"\: \")|\",//g')"
az login --service-principal -u ${SDK_AUTH_SP_APPID} -p ${SDK_AUTH_SP_CLIENT_SECRET} --tenant ${SDK_AUTH_SP_TENANT}
# Create test device ID if not already present
IOT_HUB_TEST_DEVICE_NAME="test-device"
if [[ -z "$(az iot hub device-identity show -n ${IOT_HUB_NAME} -d ${IOT_HUB_TEST_DEVICE_NAME})" ]]; then
az iot hub device-identity create -n ${IOT_HUB_NAME} -d ${IOT_HUB_TEST_DEVICE_NAME}
sleep 5
fi
# Send the test IoT device messages to the IoT Hub
az iot device simulate -n ${IOT_HUB_NAME} -d ${IOT_HUB_TEST_DEVICE_NAME} --data '{ "data": "Integration test message" }' --msg-count 2 --msg-interval 1 --protocol http --properties "iothub-userid=dapr-user-id;iothub-messageid=dapr-message-id"