Complete Eventhubs bindings certification test (#1716)
* Complete Eventhubs bindings certification test Signed-off-by: tanvigour <tanvi.gour@gmail.com> * check if single test point runs all good Signed-off-by: tanvigour <tanvi.gour@gmail.com> * check if single test point runs all good Signed-off-by: tanvigour <tanvi.gour@gmail.com> * increase the sleep time to fix timeout Signed-off-by: tanvigour <tanvi.gour@gmail.com> * increase the sleep time to fix timeout Signed-off-by: tanvigour <tanvi.gour@gmail.com> * increase sleep for all other testpoints too Signed-off-by: tanvigour <tanvi.gour@gmail.com> * test behavior of single partition test Signed-off-by: tanvigour <tanvi.gour@gmail.com> * delete eventhub and container afer run Signed-off-by: tanvigour <tanvi.gour@gmail.com> * delete eventhub and container afer run Signed-off-by: tanvigour <tanvi.gour@gmail.com> * delete eventhub and container afer run Signed-off-by: tanvigour <tanvi.gour@gmail.com> * test reconnection in one test point only Signed-off-by: tanvigour <tanvi.gour@gmail.com> * test reconnection in one test point only Signed-off-by: tanvigour <tanvi.gour@gmail.com> * test reconnection in one test point only Signed-off-by: tanvigour <tanvi.gour@gmail.com> * test reconnection in one test point only Signed-off-by: tanvigour <tanvi.gour@gmail.com> * test reconnection in one test point only Signed-off-by: tanvigour <tanvi.gour@gmail.com> * delete the container after every test run for cleanup Signed-off-by: tanvigour <tanvi.gour@gmail.com> * run service principal test Signed-off-by: tanvigour <tanvi.gour@gmail.com> * Go mod tidy-all Signed-off-by: tanvigour <tanvi.gour@gmail.com> * revert back changes to last clean state Signed-off-by: tanvigour <tanvi.gour@gmail.com> * add unique consumer id per component yaml Signed-off-by: tanvigour <tanvi.gour@gmail.com> * remove consumer id from singlepartition test component yaml Signed-off-by: tanvigour <tanvi.gour@gmail.com> * remove repetitive test case, test service principal case Signed-off-by: tanvigour <tanvi.gour@gmail.com> * remove repetitive test case, test service principal case Signed-off-by: tanvigour <tanvi.gour@gmail.com> * test iothub Signed-off-by: tanvigour <tanvi.gour@gmail.com> * test iothub Signed-off-by: tanvigour <tanvi.gour@gmail.com> * run service principal test Signed-off-by: tanvigour <tanvi.gour@gmail.com> * run service principal test Signed-off-by: tanvigour <tanvi.gour@gmail.com> * run service principal test Signed-off-by: tanvigour <tanvi.gour@gmail.com> * run service principal test Signed-off-by: tanvigour <tanvi.gour@gmail.com> * add connection string to eventhub service principal yaml Signed-off-by: tanvigour <tanvi.gour@gmail.com> * use a particular partition Signed-off-by: tanvigour <tanvi.gour@gmail.com> * enable iothub test Signed-off-by: tanvigour <tanvi.gour@gmail.com> * enable iothub test Signed-off-by: tanvigour <tanvi.gour@gmail.com> * enable iothub test Signed-off-by: tanvigour <tanvi.gour@gmail.com> * final cleanup and test run Signed-off-by: tanvigour <tanvi.gour@gmail.com> * final cleanup and test run Signed-off-by: tanvigour <tanvi.gour@gmail.com> * final cleanup and test run Signed-off-by: tanvigour <tanvi.gour@gmail.com> * add test plan readme Signed-off-by: tanvigour <tanvi.gour@gmail.com> * rollback the bicep change that is not needed Signed-off-by: tanvigour <tanvi.gour@gmail.com> * make sure through test that a specific partition receives specific data Signed-off-by: tanvigour <tanvi.gour@gmail.com> * clean the container between two partiton assertions Signed-off-by: tanvigour <tanvi.gour@gmail.com> * put back partition id in yaml Signed-off-by: tanvigour <tanvi.gour@gmail.com> * add same level of partition testing for both partitions Signed-off-by: tanvigour <tanvi.gour@gmail.com> * address feedback Signed-off-by: tanvigour <tanvi.gour@gmail.com> Co-authored-by: Bernd Verst <4535280+berndverst@users.noreply.github.com>
This commit is contained in:
parent
9d7f4867b4
commit
55a3d7f7d4
|
|
@ -0,0 +1,39 @@
|
|||
# Azure Event Hubs Bindings certification testing
|
||||
|
||||
This project aims to test the Azure Event Hubs bindings component under various conditions.
|
||||
|
||||
## Test Plan
|
||||
|
||||
- Test sending /receiving data between single partition
|
||||
- Start an app with 1 sender and 1 receiver
|
||||
- Provide multiple partitions but store data in one partition
|
||||
- Receiver should only receive message from one partition
|
||||
- Sends 100+ unique messages
|
||||
- Simulates periodic errors
|
||||
- Confirm that all expected messages were received
|
||||
- Confirm that receiver does not receive messages from other than one partition
|
||||
|
||||
- Test sending /receiving data multiple partitions/sender and receivers
|
||||
- Start an app with 1 sender and 1 receiver
|
||||
- Send data from 2 partitions
|
||||
- Sends 100+ unique messages
|
||||
- Simulates periodic errors
|
||||
- Confirm that all expected messages were received
|
||||
- Confirm messages were received from all partitions
|
||||
|
||||
- Test reconnection
|
||||
- Start an app with 1 senders and 1 receivers
|
||||
- Send 100+ unique messages from 1 sender
|
||||
- Interrupt the connection
|
||||
- Confirm that all expected messages were received by respective receiver
|
||||
|
||||
- IoT hub testing
|
||||
- Move existing IoT test from integration to conformance test
|
||||
|
||||
- Auth testing
|
||||
- Test connection string based authentication mechanism
|
||||
- Test service principal based authentication mechanism
|
||||
|
||||
### Running the tests
|
||||
|
||||
This must be run in the GitHub Actions Workflow configured for test infrastructure setup.
|
||||
|
|
@ -7,6 +7,8 @@ spec:
|
|||
type: bindings.azure.eventhubs
|
||||
version: v1
|
||||
metadata:
|
||||
- name: consumerID
|
||||
value: ehcertification1
|
||||
- name: connectionString # Azure EventHubs connection string
|
||||
secretKeyRef:
|
||||
name: AzureEventHubsBindingsConnectionString
|
||||
|
|
|
|||
|
|
@ -7,6 +7,8 @@ spec:
|
|||
type: bindings.azure.eventhubs
|
||||
version: v1
|
||||
metadata:
|
||||
- name: consumerID
|
||||
value: ehcertification2
|
||||
- name: connectionString # Azure EventHubs connection string
|
||||
secretKeyRef:
|
||||
name: AzureEventHubsBindingsConnectionString
|
||||
|
|
|
|||
|
|
@ -7,6 +7,8 @@ spec:
|
|||
type: bindings.azure.eventhubs
|
||||
version: v1
|
||||
metadata:
|
||||
- name: consumerID
|
||||
value: ehcertification1
|
||||
- name: connectionString # Azure EventHubs connection string
|
||||
secretKeyRef:
|
||||
name: AzureEventHubsBindingsConnectionString
|
||||
|
|
|
|||
|
|
@ -7,6 +7,8 @@ spec:
|
|||
type: bindings.azure.eventhubs
|
||||
version: v1
|
||||
metadata:
|
||||
- name: consumerID
|
||||
value: ehcertification2
|
||||
- name: connectionString # Azure EventHubs connection string
|
||||
secretKeyRef:
|
||||
name: AzureEventHubsBindingsConnectionString
|
||||
|
|
|
|||
|
|
@ -7,10 +7,16 @@ spec:
|
|||
type: bindings.azure.eventhubs
|
||||
version: v1
|
||||
metadata:
|
||||
- name: consumerID
|
||||
value: ehcertification1
|
||||
- name: connectionString
|
||||
secretKeyRef:
|
||||
name: AzureIotHubEventHubConnectionString
|
||||
key: AzureIotHubEventHubConnectionString
|
||||
- name: consumerGroup
|
||||
secretKeyRef:
|
||||
name: AzureEventHubsBindingsConsumerGroup
|
||||
value: AzureEventHubsBindingsConsumerGroup
|
||||
- name: storageAccountName
|
||||
secretKeyRef:
|
||||
name: AzureBlobStorageAccount
|
||||
|
|
@ -23,13 +29,7 @@ spec:
|
|||
secretKeyRef:
|
||||
name: AzureEventHubsBindingsContainer
|
||||
key: AzureEventHubsBindingsContainer
|
||||
- name: iotHub
|
||||
secretKeyRef:
|
||||
name: AzureIotHubName
|
||||
key: AzureIotHubName
|
||||
- name: consumerGroup
|
||||
secretKeyRef:
|
||||
name: AzureIotHubBindingsConsumerGroup
|
||||
key: AzureIotHubBindingsConsumerGroup
|
||||
- name: PartitionID
|
||||
value: 0
|
||||
auth:
|
||||
secretStore: envvar-secret-store
|
||||
|
|
|
|||
|
|
@ -10,7 +10,11 @@ spec:
|
|||
- name: eventHubNamespace
|
||||
secretKeyRef:
|
||||
name: AzureEventHubsBindingsNamespace
|
||||
key: AzureEventHubsBindingsNamespace
|
||||
key: AzureEventHubsBindingsNamespace
|
||||
- name: connectionString
|
||||
secretKeyRef:
|
||||
name: AzureEventHubsBindingsConnectionString
|
||||
value: AzureEventHubsBindingsConnectionString
|
||||
- name: azureTenantId
|
||||
secretKeyRef:
|
||||
name: AzureCertificationTenantId
|
||||
|
|
@ -31,6 +35,10 @@ spec:
|
|||
secretKeyRef:
|
||||
name: AzureCertificationSubscriptionId
|
||||
key: AzureCertificationSubscriptionId
|
||||
- name: consumerGroup
|
||||
secretKeyRef:
|
||||
name: AzureEventHubsBindingsConsumerGroup
|
||||
value: AzureEventHubsBindingsConsumerGroup
|
||||
- name: storageAccountName
|
||||
secretKeyRef:
|
||||
name: AzureBlobStorageAccount
|
||||
|
|
@ -42,14 +50,8 @@ spec:
|
|||
- name: storageContainerName
|
||||
secretKeyRef:
|
||||
name: AzureEventHubsBindingsContainer
|
||||
key: AzureEventHubsBindingsContainer
|
||||
- name: eventHub
|
||||
secretKeyRef:
|
||||
name: AzureEventHubsBindingsHub
|
||||
key: AzureEventHubsBindingsHub
|
||||
- name: consumerGroup
|
||||
secretKeyRef:
|
||||
name: AzureEventHubsBindingsConsumerGroup
|
||||
key: AzureEventHubsBindingsConsumerGroup
|
||||
value: AzureEventHubsBindingsContainer
|
||||
- name: partitionID
|
||||
value: 0
|
||||
auth:
|
||||
secretStore: envvar-secret-store
|
||||
|
|
|
|||
|
|
@ -0,0 +1,18 @@
|
|||
# ------------------------------------------------------------
|
||||
# Copyright 2021 The Dapr Authors
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
# ------------------------------------------------------------
|
||||
|
||||
# login to azure
|
||||
az login --service-principal -u $AzureCertificationServicePrincipalClientId -p $AzureCertificationServicePrincipalClientSecret --tenant $AzureCertificationTenantId
|
||||
|
||||
# delete container used by the consumer
|
||||
az storage container delete --account-key $AzureBlobStorageAccessKey --account-name $AzureBlobStorageAccount --name $AzureEventHubsBindingsContainer
|
||||
|
|
@ -16,13 +16,13 @@ package eventhubs_test
|
|||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
//"os/exec"
|
||||
"os"
|
||||
"os/exec"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
//"github.com/stretchr/testify/assert"
|
||||
"github.com/google/uuid"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.uber.org/multierr"
|
||||
|
||||
|
|
@ -50,8 +50,11 @@ import (
|
|||
)
|
||||
|
||||
const (
|
||||
numMessages = 100
|
||||
messageKey = "partitionKey"
|
||||
numMessages = 100
|
||||
messageKey = "partitionKey"
|
||||
iotHubNameEnvKey = "AzureIotHubName"
|
||||
partition0 = "0"
|
||||
partition1 = "1"
|
||||
)
|
||||
|
||||
func TestSinglePartition(t *testing.T) {
|
||||
|
|
@ -129,58 +132,13 @@ func TestSinglePartition(t *testing.T) {
|
|||
}))
|
||||
return err
|
||||
}
|
||||
|
||||
/* TODO: Verify IOT Hub :
|
||||
consumerGroup2 := watcher.NewUnordered()
|
||||
publishMessages := func(ctx flow.Context) error {
|
||||
// Define what is expected
|
||||
outputmsg := make([]string, numMessages)
|
||||
for i := 0; i < numMessages; i++ {
|
||||
outputmsg[i] = fmt.Sprintf("publish messages to device: Message %03d", i)
|
||||
}
|
||||
consumerGroup2.ExpectStrings(outputmsg...)
|
||||
cmd := exec.Command("/bin/bash", "send-iot-device-events.sh")
|
||||
out, err := cmd.CombinedOutput()
|
||||
assert.Nil(t, err, "Error in send-iot-device-events.sh:\n%s", out)
|
||||
consumerGroup2.Assert(ctx, time.Minute)
|
||||
return nil
|
||||
|
||||
flow.New(t, "eventhubs binding IoTHub testing").
|
||||
Step("sleep", flow.Sleep(10*time.Second)).
|
||||
Step(app.Run("app", fmt.Sprintf(":%d", appPort), application)).
|
||||
Step(sidecar.Run("sidecar",
|
||||
embedded.WithAppProtocol(runtime.HTTPProtocol, appPort),
|
||||
embedded.WithDaprGRPCPort(grpcPort),
|
||||
embedded.WithDaprHTTPPort(httpPort),
|
||||
embedded.WithComponentsPath("./components/binding/iothub"),
|
||||
runtime.WithSecretStores(secrets_components),
|
||||
runtime.WithOutputBindings(out_component),
|
||||
runtime.WithInputBindings(in_component),
|
||||
)).
|
||||
Step("Send messages to IoT", publishMessages).
|
||||
Run()
|
||||
|
||||
//TODO: Verfiy service principal
|
||||
// Flow of events: Start app, sidecar, interrupt network to check reconnection, send and receive
|
||||
flow.New(t, "eventhubs binding authentication using service principal").
|
||||
Step("sleep", flow.Sleep(10*time.Second)).
|
||||
Step(app.Run("app", fmt.Sprintf(":%d", appPort), application)).
|
||||
Step(sidecar.Run("sidecar",
|
||||
embedded.WithAppProtocol(runtime.HTTPProtocol, appPort),
|
||||
embedded.WithDaprGRPCPort(grpcPort),
|
||||
embedded.WithDaprHTTPPort(httpPort),
|
||||
embedded.WithComponentsPath("./components/binding/serviceprincipal"),
|
||||
runtime.WithSecretStores(secrets_components),
|
||||
runtime.WithOutputBindings(out_component),
|
||||
runtime.WithInputBindings(in_component),
|
||||
)).
|
||||
Step("interrupt network", network.InterruptNetwork(30*time.Second, nil, nil, "443", "5671", "5672")).
|
||||
Step("send and wait", sendAndReceive).
|
||||
Run()*/
|
||||
|
||||
deleteEventhub := func(ctx flow.Context) error {
|
||||
output, err := exec.Command("/bin/sh", "deleteeventhub.sh").Output()
|
||||
assert.Nil(t, err, "Error in deleteeventhub.sh.:\n%s", string(output))
|
||||
return nil
|
||||
}
|
||||
// Flow of events: Start app, sidecar, interrupt network to check reconnection, send and receive
|
||||
flow.New(t, "eventhubs binding authentication using connection string single partition").
|
||||
// Step("sleep", flow.Sleep(10*time.Second)).
|
||||
Step(app.Run("app", fmt.Sprintf(":%d", appPort), application)).
|
||||
Step(sidecar.Run("sidecar",
|
||||
embedded.WithAppProtocol(runtime.HTTPProtocol, appPort),
|
||||
|
|
@ -193,11 +151,11 @@ func TestSinglePartition(t *testing.T) {
|
|||
)).
|
||||
Step("interrupt network", network.InterruptNetwork(30*time.Second, nil, nil, "443", "5671", "5672")).
|
||||
Step("send and wait", sendAndReceive(metadata)).
|
||||
Step("delete containers", deleteEventhub).
|
||||
Run()
|
||||
}
|
||||
|
||||
func TestEventhubBindingMultipleSenders(t *testing.T) {
|
||||
|
||||
func TestEventhubBindingSerivcePrincipalAuth(t *testing.T) {
|
||||
logger := logger.NewLogger("dapr.components")
|
||||
out_component := bindings_loader.NewOutput("azure.eventhubs", func() bindings.OutputBinding {
|
||||
return eventhubs.NewAzureEventHubs(logger)
|
||||
|
|
@ -215,60 +173,45 @@ func TestEventhubBindingMultipleSenders(t *testing.T) {
|
|||
appPort := ports[2]
|
||||
|
||||
consumerGroup1 := watcher.NewUnordered()
|
||||
consumerGroup2 := watcher.NewUnordered()
|
||||
|
||||
sendAndReceive := func(ctx flow.Context) error {
|
||||
metadata := map[string]string{
|
||||
messageKey: "test",
|
||||
}
|
||||
|
||||
client, err := dapr.NewClientWithPort(fmt.Sprintf("%d", grpcPort))
|
||||
require.NoError(t, err, "dapr init failed")
|
||||
sendAndReceive := func(metadata map[string]string, messages ...*watcher.Watcher) flow.Runnable {
|
||||
_, hasKey := metadata[messageKey]
|
||||
return func(ctx flow.Context) error {
|
||||
client, err := dapr.NewClientWithPort(fmt.Sprintf("%d", grpcPort))
|
||||
require.NoError(t, err, "dapr init failed")
|
||||
|
||||
// Define what is expected
|
||||
outputmsg := make([]string, numMessages)
|
||||
for i := 0; i < numMessages; i++ {
|
||||
outputmsg[i] = fmt.Sprintf("input binding: Message %03d", i)
|
||||
// Define what is expected
|
||||
outputmsg := make([]string, numMessages)
|
||||
for i := 0; i < numMessages; i++ {
|
||||
outputmsg[i] = fmt.Sprintf("output binding: Message %03d", i)
|
||||
}
|
||||
consumerGroup1.ExpectStrings(outputmsg...)
|
||||
time.Sleep(20 * time.Second)
|
||||
if !hasKey {
|
||||
metadata[messageKey] = uuid.NewString()
|
||||
}
|
||||
// Send events from output binding
|
||||
for _, msg := range outputmsg {
|
||||
ctx.Logf("Sending eventhub message: %q", msg)
|
||||
|
||||
err := client.InvokeOutputBinding(
|
||||
ctx, &dapr.InvokeBindingRequest{
|
||||
Name: "azure-eventhubs-binding",
|
||||
Operation: "create",
|
||||
Data: []byte(msg),
|
||||
Metadata: metadata,
|
||||
})
|
||||
require.NoError(ctx, err, "error publishing message")
|
||||
}
|
||||
|
||||
// Assert the observed messages
|
||||
consumerGroup1.Assert(ctx, time.Minute)
|
||||
return nil
|
||||
}
|
||||
consumerGroup1.ExpectStrings(outputmsg...)
|
||||
time.Sleep(40 * time.Second)
|
||||
|
||||
// Send events from input binding
|
||||
for _, msg := range outputmsg {
|
||||
ctx.Logf("Sending eventhub message: %q", msg)
|
||||
|
||||
err := client.InvokeOutputBinding(
|
||||
ctx, &dapr.InvokeBindingRequest{
|
||||
Name: "azure-input-binding",
|
||||
Operation: "create",
|
||||
Data: []byte(msg),
|
||||
})
|
||||
require.NoError(ctx, err, "error publishing message")
|
||||
}
|
||||
|
||||
// Assert the observed messages
|
||||
consumerGroup1.Assert(ctx, time.Minute)
|
||||
|
||||
outputmsg2 := make([]string, numMessages)
|
||||
for i := 0; i < numMessages; i++ {
|
||||
outputmsg2[i] = fmt.Sprintf("output binding: Message %03d", i)
|
||||
}
|
||||
consumerGroup2.ExpectStrings(outputmsg2...)
|
||||
time.Sleep(40 * time.Second)
|
||||
|
||||
// Send events from output binding
|
||||
for _, msg2 := range outputmsg2 {
|
||||
ctx.Logf("Sending eventhub message: %q", msg2)
|
||||
|
||||
err := client.InvokeOutputBinding(
|
||||
ctx, &dapr.InvokeBindingRequest{
|
||||
Name: "azure-output-binding",
|
||||
Operation: "create",
|
||||
Data: []byte(msg2),
|
||||
})
|
||||
require.NoError(ctx, err, "error publishing message")
|
||||
}
|
||||
|
||||
// Assert the observed messages
|
||||
consumerGroup2.Assert(ctx, time.Minute)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Application logic that tracks messages from eventhub.
|
||||
|
|
@ -277,40 +220,110 @@ func TestEventhubBindingMultipleSenders(t *testing.T) {
|
|||
sim := simulate.PeriodicError(ctx, 100)
|
||||
// Setup the binding endpoints
|
||||
err = multierr.Combine(err,
|
||||
s.AddBindingInvocationHandler("azure-output-binding", func(_ context.Context, in *common.BindingEvent) ([]byte, error) {
|
||||
consumerGroup2.Observe(string(in.Data))
|
||||
if err := sim(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ctx.Logf("Output binding - Receiving eventhubs message: %s", string(in.Data))
|
||||
return []byte("{}"), nil
|
||||
}),
|
||||
|
||||
s.AddBindingInvocationHandler("azure-input-binding", func(_ context.Context, in *common.BindingEvent) ([]byte, error) {
|
||||
s.AddBindingInvocationHandler("azure-eventhubs-binding", func(_ context.Context, in *common.BindingEvent) ([]byte, error) {
|
||||
consumerGroup1.Observe(string(in.Data))
|
||||
if err := sim(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ctx.Logf("Input binding: Receiving eventhubs message: %s", string(in.Data))
|
||||
ctx.Logf("Receiving eventhubs message: %s", string(in.Data))
|
||||
return []byte("{}"), nil
|
||||
}))
|
||||
|
||||
return err
|
||||
}
|
||||
flow.New(t, "eventhubs binding authentication using multiple senders and receivers").
|
||||
Step("sleep", flow.Sleep(10*time.Second)).
|
||||
|
||||
deleteEventhub := func(ctx flow.Context) error {
|
||||
output, err := exec.Command("/bin/sh", "deleteeventhub.sh").Output()
|
||||
assert.Nil(t, err, "Error in deleteeventhub.sh.:\n%s", string(output))
|
||||
return nil
|
||||
}
|
||||
// Flow of events: Start app, sidecar, interrupt network to check reconnection, send and receive
|
||||
flow.New(t, "eventhubs binding authentication using service principal").
|
||||
Step(app.Run("app", fmt.Sprintf(":%d", appPort), application)).
|
||||
Step(sidecar.Run("sidecar",
|
||||
embedded.WithAppProtocol(runtime.HTTPProtocol, appPort),
|
||||
embedded.WithDaprGRPCPort(grpcPort),
|
||||
embedded.WithDaprHTTPPort(httpPort),
|
||||
embedded.WithComponentsPath("./components/binding/consumer2"),
|
||||
embedded.WithComponentsPath("./components/binding/serviceprincipal"),
|
||||
runtime.WithSecretStores(secrets_components),
|
||||
runtime.WithOutputBindings(out_component),
|
||||
runtime.WithInputBindings(in_component),
|
||||
)).
|
||||
Step("interrupt network", network.InterruptNetwork(30*time.Second, nil, nil, "443", "5671", "5672")).
|
||||
Step("send and wait", sendAndReceive).
|
||||
Step("send and wait", sendAndReceive(metadata)).
|
||||
Step("delete containers", deleteEventhub).
|
||||
Run()
|
||||
}
|
||||
|
||||
func TestEventhubBindingIOTHub(t *testing.T) {
|
||||
logger := logger.NewLogger("dapr.components")
|
||||
out_component := bindings_loader.NewOutput("azure.eventhubs", func() bindings.OutputBinding {
|
||||
return eventhubs.NewAzureEventHubs(logger)
|
||||
})
|
||||
in_component := bindings_loader.NewInput("azure.eventhubs", func() bindings.InputBinding {
|
||||
return eventhubs.NewAzureEventHubs(logger)
|
||||
})
|
||||
secrets_components := secretstores_loader.New("local.env", func() secretstores.SecretStore {
|
||||
return secretstore_env.NewEnvSecretStore(logger)
|
||||
})
|
||||
|
||||
ports, _ := dapr_testing.GetFreePorts(3)
|
||||
grpcPort := ports[0]
|
||||
httpPort := ports[1]
|
||||
appPort := ports[2]
|
||||
|
||||
consumerGroup1 := watcher.NewUnordered()
|
||||
|
||||
// Application logic that tracks messages from eventhub.
|
||||
application := func(ctx flow.Context, s common.Service) (err error) {
|
||||
// Simulate periodic errors.
|
||||
sim := simulate.PeriodicError(ctx, 100)
|
||||
// Setup the binding endpoints
|
||||
err = multierr.Combine(err,
|
||||
s.AddBindingInvocationHandler("azure-eventhubs-binding", func(_ context.Context, in *common.BindingEvent) ([]byte, error) {
|
||||
consumerGroup1.Observe(string(in.Data))
|
||||
if err := sim(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ctx.Logf("Receiving eventhubs message: %s", string(in.Data))
|
||||
return []byte("{}"), nil
|
||||
}))
|
||||
return err
|
||||
}
|
||||
|
||||
iotHubName := os.Getenv(iotHubNameEnvKey)
|
||||
consumerGroup3 := watcher.NewUnordered()
|
||||
sendIOTDevice := func(messages *watcher.Watcher) flow.Runnable {
|
||||
return func(ctx flow.Context) error {
|
||||
// Define what is expected
|
||||
outputmsg := make([]string, numMessages)
|
||||
for i := 0; i < numMessages; i++ {
|
||||
outputmsg[i] = fmt.Sprintf("messages to test iothub: Message %03d", i)
|
||||
}
|
||||
messages.ExpectStrings(outputmsg...)
|
||||
|
||||
cmd := exec.Command("/bin/bash", "send-iot-device-events.sh")
|
||||
cmd.Env = append(os.Environ(), fmt.Sprintf("IOT_HUB_NAME=%s", iotHubName))
|
||||
cmd.CombinedOutput()
|
||||
return nil
|
||||
}
|
||||
}
|
||||
deleteEventhub := func(ctx flow.Context) error {
|
||||
output, err := exec.Command("/bin/sh", "deleteeventhub.sh").Output()
|
||||
assert.Nil(t, err, "Error in deleteeventhub.sh.:\n%s", string(output))
|
||||
return nil
|
||||
}
|
||||
flow.New(t, "eventhubs binding IoTHub testing").
|
||||
Step(app.Run("app", fmt.Sprintf(":%d", appPort), application)).
|
||||
Step(sidecar.Run("sidecar",
|
||||
embedded.WithAppProtocol(runtime.HTTPProtocol, appPort),
|
||||
embedded.WithDaprGRPCPort(grpcPort),
|
||||
embedded.WithDaprHTTPPort(httpPort),
|
||||
embedded.WithComponentsPath("./components/binding/iothub"),
|
||||
runtime.WithSecretStores(secrets_components),
|
||||
runtime.WithOutputBindings(out_component),
|
||||
runtime.WithInputBindings(in_component),
|
||||
)).
|
||||
Step("Send messages to IoT", sendIOTDevice(consumerGroup3)).
|
||||
Step("delete containers", deleteEventhub).
|
||||
Run()
|
||||
}
|
||||
|
||||
|
|
@ -334,61 +347,68 @@ func TestEventhubBindingMultiplePartition(t *testing.T) {
|
|||
consumerGroup1 := watcher.NewUnordered()
|
||||
consumerGroup2 := watcher.NewUnordered()
|
||||
|
||||
sendAndReceive := func(ctx flow.Context) error {
|
||||
|
||||
client, err := dapr.NewClientWithPort(fmt.Sprintf("%d", grpcPort))
|
||||
require.NoError(t, err, "dapr init failed")
|
||||
|
||||
// Define what is expected
|
||||
outputmsg := make([]string, numMessages)
|
||||
for i := 0; i < numMessages; i++ {
|
||||
outputmsg[i] = fmt.Sprintf("output binding: Message %03d", i)
|
||||
}
|
||||
consumerGroup1.ExpectStrings(outputmsg...)
|
||||
time.Sleep(40 * time.Second)
|
||||
|
||||
// Send events from output binding
|
||||
for _, msg := range outputmsg {
|
||||
ctx.Logf("Sending eventhub message: %q", msg)
|
||||
|
||||
err := client.InvokeOutputBinding(
|
||||
ctx, &dapr.InvokeBindingRequest{
|
||||
Name: "azure-partition0-binding",
|
||||
Operation: "create",
|
||||
Data: []byte(msg),
|
||||
})
|
||||
require.NoError(ctx, err, "error publishing message")
|
||||
}
|
||||
|
||||
// Assert the observed messages
|
||||
consumerGroup1.Assert(ctx, time.Minute)
|
||||
|
||||
// Define what is expected
|
||||
outputmsg2 := make([]string, numMessages)
|
||||
for i := 0; i < numMessages; i++ {
|
||||
outputmsg2[i] = fmt.Sprintf("output binding: Message %03d", i)
|
||||
}
|
||||
consumerGroup2.ExpectStrings(outputmsg2...)
|
||||
time.Sleep(40 * time.Second)
|
||||
|
||||
// Send events from output binding
|
||||
for _, msg2 := range outputmsg2 {
|
||||
ctx.Logf("Sending eventhub message: %q", msg2)
|
||||
|
||||
err := client.InvokeOutputBinding(
|
||||
ctx, &dapr.InvokeBindingRequest{
|
||||
Name: "azure-partition1-binding",
|
||||
Operation: "create",
|
||||
Data: []byte(msg2),
|
||||
})
|
||||
require.NoError(ctx, err, "error publishing message")
|
||||
}
|
||||
|
||||
// Assert the observed messages
|
||||
consumerGroup2.Assert(ctx, time.Minute)
|
||||
return nil
|
||||
metadata0 := map[string]string{
|
||||
messageKey: partition0,
|
||||
}
|
||||
|
||||
metadata1 := map[string]string{
|
||||
messageKey: partition1,
|
||||
}
|
||||
sendAndReceive := func(metadata0 map[string]string, metadata1 map[string]string) flow.Runnable {
|
||||
return func(ctx flow.Context) error {
|
||||
client, err := dapr.NewClientWithPort(fmt.Sprintf("%d", grpcPort))
|
||||
require.NoError(t, err, "dapr init failed")
|
||||
|
||||
// Define what is expected
|
||||
outputmsg := make([]string, 50)
|
||||
for i := 0; i < 50; i++ {
|
||||
outputmsg[i] = fmt.Sprintf("output binding: Message %d, partitionkey: %s", i, metadata0[messageKey])
|
||||
}
|
||||
consumerGroup1.ExpectStrings(outputmsg...)
|
||||
time.Sleep(40 * time.Second)
|
||||
|
||||
// Send events from output binding
|
||||
for _, msg := range outputmsg {
|
||||
ctx.Logf("Sending eventhub message: %q", msg)
|
||||
|
||||
err := client.InvokeOutputBinding(
|
||||
ctx, &dapr.InvokeBindingRequest{
|
||||
Name: "azure-partition0-binding",
|
||||
Operation: "create",
|
||||
Data: []byte(msg),
|
||||
Metadata: metadata0,
|
||||
})
|
||||
require.NoError(ctx, err, "error publishing message")
|
||||
}
|
||||
|
||||
// Define what is expected
|
||||
outputmsg2 := make([]string, 50)
|
||||
for i := 0; i < 50; i++ {
|
||||
outputmsg2[i] = fmt.Sprintf("output binding: Message %d, partitionkey: %s", i+50, metadata1[messageKey])
|
||||
}
|
||||
consumerGroup2.ExpectStrings(outputmsg2...)
|
||||
time.Sleep(120 * time.Second)
|
||||
|
||||
// Send events from output binding
|
||||
for _, msg2 := range outputmsg2 {
|
||||
ctx.Logf("Sending eventhub message: %q", msg2)
|
||||
|
||||
err := client.InvokeOutputBinding(
|
||||
ctx, &dapr.InvokeBindingRequest{
|
||||
Name: "azure-partition1-binding",
|
||||
Operation: "create",
|
||||
Data: []byte(msg2),
|
||||
Metadata: metadata1,
|
||||
})
|
||||
require.NoError(ctx, err, "error publishing message")
|
||||
}
|
||||
|
||||
// Assert the observed messages
|
||||
consumerGroup1.Assert(ctx, time.Minute)
|
||||
consumerGroup2.Assert(ctx, time.Minute)
|
||||
return nil
|
||||
}
|
||||
}
|
||||
// Application logic that tracks messages from eventhub.
|
||||
application := func(ctx flow.Context, s common.Service) (err error) {
|
||||
// Simulate periodic errors.
|
||||
|
|
@ -400,6 +420,7 @@ func TestEventhubBindingMultiplePartition(t *testing.T) {
|
|||
if err := sim(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
consumerGroup1.FailIfNotExpected(t, string(in.Data))
|
||||
ctx.Logf("Receiving eventhubs message: %s", string(in.Data))
|
||||
return []byte("{}"), nil
|
||||
}),
|
||||
|
|
@ -409,11 +430,19 @@ func TestEventhubBindingMultiplePartition(t *testing.T) {
|
|||
if err := sim(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
consumerGroup2.FailIfNotExpected(t, string(in.Data))
|
||||
ctx.Logf("Receiving eventhubs message: %s", string(in.Data))
|
||||
return []byte("{}"), nil
|
||||
}))
|
||||
return err
|
||||
}
|
||||
|
||||
deleteEventhub := func(ctx flow.Context) error {
|
||||
output, err := exec.Command("/bin/sh", "deleteeventhub.sh").Output()
|
||||
assert.Nil(t, err, "Error in deleteeventhub.sh.:\n%s", string(output))
|
||||
return nil
|
||||
}
|
||||
|
||||
flow.New(t, "eventhubs binding authentication using connection string all partitions").
|
||||
Step("sleep", flow.Sleep(10*time.Second)).
|
||||
Step(app.Run("app", fmt.Sprintf(":%d", appPort), application)).
|
||||
|
|
@ -426,7 +455,7 @@ func TestEventhubBindingMultiplePartition(t *testing.T) {
|
|||
runtime.WithOutputBindings(out_component),
|
||||
runtime.WithInputBindings(in_component),
|
||||
)).
|
||||
Step("interrupt network", network.InterruptNetwork(30*time.Second, nil, nil, "443", "5671", "5672")).
|
||||
Step("send and wait", sendAndReceive).
|
||||
Step("send and wait", sendAndReceive(metadata0, metadata1)).
|
||||
Step("delete containers", deleteEventhub).
|
||||
Run()
|
||||
}
|
||||
|
|
|
|||
|
|
@ -14,12 +14,8 @@
|
|||
|
||||
set -e
|
||||
|
||||
if [[ -z "${IOT_HUB_NAME}" ]]; then
|
||||
echo "ERROR: IOT_HUB_NAME environment variable not defined."
|
||||
exit 1
|
||||
fi
|
||||
if [[ -z "${AZURE_CREDENTIALS}" ]]; then
|
||||
echo "ERROR: AZURE_CREDENTIALS environment variable not defined."
|
||||
if [[ -z "${AzureIotHubName}" ]]; then
|
||||
echo "ERROR: AzureIotHubName environment variable not defined."
|
||||
exit 1
|
||||
fi
|
||||
|
||||
|
|
@ -29,18 +25,14 @@ fi
|
|||
az config set extension.use_dynamic_install=yes_without_prompt
|
||||
|
||||
# Log in to Azure using provided Service Principal (SP) credentials
|
||||
# The provided SP must have Contributor role access to the IoT Hub specified by IOT_HUB_NAME
|
||||
SDK_AUTH_SP_APPID="$(echo "${AZURE_CREDENTIALS}" | grep 'clientId' | sed -E 's/(.*clientId\"\: \")|\",//g')"
|
||||
SDK_AUTH_SP_CLIENT_SECRET="$(echo "${AZURE_CREDENTIALS}" | grep 'clientSecret' | sed -E 's/(.*clientSecret\"\: \")|\",//g')"
|
||||
SDK_AUTH_SP_TENANT="$(echo "${AZURE_CREDENTIALS}" | grep 'tenantId' | sed -E 's/(.*tenantId\"\: \")|\",//g')"
|
||||
az login --service-principal -u ${SDK_AUTH_SP_APPID} -p ${SDK_AUTH_SP_CLIENT_SECRET} --tenant ${SDK_AUTH_SP_TENANT}
|
||||
az login --service-principal -u $AzureCertificationServicePrincipalClientId -p $AzureCertificationServicePrincipalClientSecret --tenant $AzureCertificationTenantId
|
||||
|
||||
# Create test device ID if not already present
|
||||
IOT_HUB_TEST_DEVICE_NAME="test-device"
|
||||
if [[ -z "$(az iot hub device-identity show -n ${IOT_HUB_NAME} -d ${IOT_HUB_TEST_DEVICE_NAME})" ]]; then
|
||||
az iot hub device-identity create -n ${IOT_HUB_NAME} -d ${IOT_HUB_TEST_DEVICE_NAME}
|
||||
az iot hub device-identity create -n ${AzureIotHubName} -d ${IOT_HUB_TEST_DEVICE_NAME}
|
||||
sleep 5
|
||||
fi
|
||||
|
||||
# Send the test IoT device messages to the IoT Hub
|
||||
az iot device simulate -n ${IOT_HUB_NAME} -d ${IOT_HUB_TEST_DEVICE_NAME} --data '{ "data": "Integration test message" }' --msg-count 2 --msg-interval 1 --protocol http --properties "iothub-userid=dapr-user-id;iothub-messageid=dapr-message-id"
|
||||
az iot device simulate -n ${AzureIotHubName} -d ${IOT_HUB_TEST_DEVICE_NAME} --data '{ "data": "Integration test message" }' --msg-count 2 --msg-interval 1 --protocol http --properties "iothub-userid=dapr-user-id;iothub-messageid=dapr-message-id"
|
||||
|
|
|
|||
Loading…
Reference in New Issue