Add certification tests for ASB Queue binding (#1251)

* Add certification tests for ASB Queue binding

This commit adds certification tests for the Azure Service Bus Queue
Input/Output binding. This utilizes the new certification framework
and performs a series of tests as described in the test plan readme.

The general purpose of these tests is to serve as an integration test
and as such requires an actual Azure Service Bus connection and a
Dapr sidecar.

https://github.com/dapr/components-contrib/issues/957

* Add to github workflow and fix formatting

* Fix dependencies

Co-authored-by: Dapr Bot <56698301+dapr-bot@users.noreply.github.com>
This commit is contained in:
halspang 2021-12-06 14:20:53 -05:00 committed by GitHub
parent 4918900c09
commit 3395bb7316
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 2444 additions and 0 deletions

View File

@ -80,6 +80,8 @@ jobs:
required-certs: AzureKeyVaultSecretStoreCert
- component: state.sqlserver
required-secrets: AzureSqlServerConnectionString
- component: bindings.azure.servicebusqueues
required-secrets: AzureServiceBusConnectionString
EOF
)
echo "::set-output name=cloud-components::$CRON_COMPONENTS"

View File

@ -0,0 +1,39 @@
# Azure Service Bus Queue Input/Output Binding Certification
The purpose of this module is to provide tests that certify the Azure Service Bus Queue Input/Output Binding as a stable component.
## Test Plan
### Certification Tests
- Verify Queue level TTL is respected
- Create a component spec with the field `ttlInSeconds`
- Run dapr application with component
- Ensure a queue was created in Azure Service Bus Namespace with TTL set
- Send a message, wait TTL seconds, and verify the message is deleted
- Verify Message level TTL is respected
- Create a component spec without the field `ttlInSeconds`
- Run dapr application with component
- Ensure a queue was created in Azure Service Bus Namespace with TTL set to default (very large)
- Send a message with `ttlInSeconds` metadata set, wait TTL seconds, and verify the message is deleted
- Verify Queue/Message TTL interaction
- Create a component spec with the field `ttlInSeconds`
- Run dapr application with component
- Ensure a queue was created in Azure Service Bus Namespace with TTL set
- Send a message with `ttlInSeconds` and ensure message field is respected over queue field
- Verify create and receive accuracy
- Create multiple input/output bindings with different queues
- Run dapr application with components
- Send/receive messages across queues, ensure target/receiver is always correct
- Additionally, ensure that the messages are in order
- Verify Network Resiliency
- During the create and receive accuracy test, simulate a network error before sending any messages
- After recovery send messages to the queue
- Ensure that all messages are received
- Verify App Failure Resiliency
- Start an application that is guaranteed to fail
- Ensure the binding continues to read incoming messages
- Ensure the messages that are failed are retried
### Future Tests
1. Provide iterations around the different auth mechanisms supported by Azure Service Bus.
1. Utilize a connection string (covered).
1. Utilize a service principal with the appropriate roles granted.
1. Utilize Managed Identity.

View File

@ -0,0 +1,19 @@
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: retry-binding
namespace: default
spec:
type: bindings.azure.servicebusqueues
version: v1
metadata:
- name: connectionString
secretKeyRef:
name: AzureServiceBusConnectionString
key: AzureServiceBusConnectionString
- name: queueName
value: retryqueue
- name: ttlInSeconds
value: 300 # ttl is just for cleanup in case the test goes bad.
auth:
secretStore: envvar-secret-store

View File

@ -0,0 +1,9 @@
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: envvar-secret-store
namespace: default
spec:
type: secretstores.local.env
version: v1
metadata:

View File

@ -0,0 +1,19 @@
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: sb-binding-1
namespace: default
spec:
type: bindings.azure.servicebusqueues
version: v1
metadata:
- name: connectionString
secretKeyRef:
name: AzureServiceBusConnectionString
key: AzureServiceBusConnectionString
- name: queueName
value: certqueue1
- name: ttlInSeconds
value: 300 # ttl is just for cleanup in case the test goes bad.
auth:
secretStore: envvar-secret-store

View File

@ -0,0 +1,19 @@
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: sb-binding-2
namespace: default
spec:
type: bindings.azure.servicebusqueues
version: v1
metadata:
- name: connectionString
secretKeyRef:
name: AzureServiceBusConnectionString
key: AzureServiceBusConnectionString
- name: queueName
value: certqueue2
- name: ttlInSeconds
value: 300 # ttl is just for cleanup in case the test goes bad.
auth:
secretStore: envvar-secret-store

View File

@ -0,0 +1,9 @@
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: envvar-secret-store
namespace: default
spec:
type: secretstores.local.env
version: v1
metadata:

View File

@ -0,0 +1,17 @@
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: messagettl
namespace: default
spec:
type: bindings.azure.servicebusqueues
version: v1
metadata:
- name: connectionString
secretKeyRef:
name: AzureServiceBusConnectionString
key: AzureServiceBusConnectionString
- name: queueName
value: messagettlqueue
auth:
secretStore: envvar-secret-store

View File

@ -0,0 +1,19 @@
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: mixedttl
namespace: default
spec:
type: bindings.azure.servicebusqueues
version: v1
metadata:
- name: connectionString
secretKeyRef:
name: AzureServiceBusConnectionString
key: AzureServiceBusConnectionString
- name: queueName
value: mixedttlqueue
- name: ttlInSeconds
value: 300 # Long ttl to allow message expiration first.
auth:
secretStore: envvar-secret-store

View File

@ -0,0 +1,19 @@
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: queuettl
namespace: default
spec:
type: bindings.azure.servicebusqueues
version: v1
metadata:
- name: connectionString
secretKeyRef:
name: AzureServiceBusConnectionString
key: AzureServiceBusConnectionString
- name: queueName
value: queuettlqueue
- name: ttlInSeconds
value: 10 # Short TTL for easier testing
auth:
secretStore: envvar-secret-store

View File

@ -0,0 +1,9 @@
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: envvar-secret-store
namespace: default
spec:
type: secretstores.local.env
version: v1
metadata:

View File

@ -0,0 +1,6 @@
apiVersion: dapr.io/v1alpha1
kind: Configuration
metadata:
name: daprConfig
spec:
features:

View File

@ -0,0 +1,122 @@
module servicebusqueue_test
go 1.17
require (
github.com/dapr/components-contrib v1.5.0
github.com/dapr/components-contrib/tests/certification v0.0.0-20211026011813-36b75e9ae272
github.com/dapr/dapr v1.5.0
github.com/dapr/go-sdk v1.3.0
github.com/dapr/kit v0.0.2-0.20210614175626-b9074b64d233
github.com/stretchr/testify v1.7.0
go.uber.org/multierr v1.7.0
)
require (
contrib.go.opencensus.io/exporter/prometheus v0.2.0 // indirect
contrib.go.opencensus.io/exporter/zipkin v0.1.1 // indirect
github.com/AdhityaRamadhanus/fasthttpcors v0.0.0-20170121111917-d4c07198763a // indirect
github.com/Azure/azure-amqp-common-go/v3 v3.2.0 // indirect
github.com/Azure/azure-service-bus-go v0.11.1 // indirect
github.com/Azure/go-amqp v0.13.13 // indirect
github.com/Azure/go-autorest v14.2.0+incompatible // indirect
github.com/Azure/go-autorest/autorest v0.11.21 // indirect
github.com/Azure/go-autorest/autorest/adal v0.9.16 // indirect
github.com/Azure/go-autorest/autorest/date v0.3.0 // indirect
github.com/Azure/go-autorest/autorest/to v0.4.0 // indirect
github.com/Azure/go-autorest/logger v0.2.1 // indirect
github.com/Azure/go-autorest/tracing v0.6.0 // indirect
github.com/PuerkitoBio/purell v1.1.1 // indirect
github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 // indirect
github.com/andybalholm/brotli v1.0.2 // indirect
github.com/antlr/antlr4 v0.0.0-20200503195918-621b933c7a7f // indirect
github.com/armon/go-metrics v0.0.0-20190430140413-ec5e00d3c878 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cenkalti/backoff v2.2.1+incompatible // indirect
github.com/cenkalti/backoff/v4 v4.1.1 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/devigned/tab v0.1.1 // indirect
github.com/fasthttp/router v1.3.8 // indirect
github.com/ghodss/yaml v1.0.0 // indirect
github.com/go-kit/log v0.1.0 // indirect
github.com/go-logfmt/logfmt v0.5.0 // indirect
github.com/go-logr/logr v0.3.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang-jwt/jwt/v4 v4.0.0 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/google/cel-go v0.7.3 // indirect
github.com/google/go-cmp v0.5.6 // indirect
github.com/google/gofuzz v1.1.0 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/googleapis/gnostic v0.5.1 // indirect
github.com/gorilla/mux v1.8.0 // indirect
github.com/grandcat/zeroconf v0.0.0-20190424104450-85eadb44205c // indirect
github.com/grpc-ecosystem/go-grpc-middleware v1.2.2 // indirect
github.com/hashicorp/consul/api v1.3.0 // indirect
github.com/hashicorp/errwrap v1.0.0 // indirect
github.com/hashicorp/go-cleanhttp v0.5.1 // indirect
github.com/hashicorp/go-immutable-radix v1.0.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/go-rootcerts v1.0.0 // indirect
github.com/hashicorp/golang-lru v0.5.4 // indirect
github.com/hashicorp/serf v0.8.2 // indirect
github.com/imdario/mergo v0.3.10 // indirect
github.com/json-iterator/go v1.1.11 // indirect
github.com/klauspost/compress v1.12.2 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 // indirect
github.com/miekg/dns v1.1.35 // indirect
github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1 // indirect
github.com/mitchellh/go-homedir v1.1.0 // indirect
github.com/mitchellh/mapstructure v1.4.1 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.1 // indirect
github.com/openzipkin/zipkin-go v0.2.2 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_golang v1.11.0 // indirect
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common v0.32.1 // indirect
github.com/prometheus/procfs v0.7.3 // indirect
github.com/prometheus/statsd_exporter v0.22.3 // indirect
github.com/savsgio/gotils v0.0.0-20210217112953-d4a072536008 // indirect
github.com/sirupsen/logrus v1.8.1 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/stoewer/go-strcase v1.2.0 // indirect
github.com/stretchr/objx v0.2.0 // indirect
github.com/tylertreat/comcast v1.0.1 // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect
github.com/valyala/fasthttp v1.28.0 // indirect
go.opencensus.io v0.22.5 // indirect
go.opentelemetry.io/otel v0.19.0 // indirect
go.uber.org/atomic v1.9.0 // indirect
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519 // indirect
golang.org/x/net v0.0.0-20210614182718-04defd469f4e // indirect
golang.org/x/oauth2 v0.0.0-20210514164344-f6687ab2804c // indirect
golang.org/x/sys v0.0.0-20211007075335-d3039528d8ac // indirect
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1 // indirect
golang.org/x/text v0.3.6 // indirect
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba // indirect
google.golang.org/appengine v1.6.6 // indirect
google.golang.org/genproto v0.0.0-20210524171403-669157292da3 // indirect
google.golang.org/grpc v1.40.0 // indirect
google.golang.org/protobuf v1.27.1 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
k8s.io/api v0.20.0 // indirect
k8s.io/apiextensions-apiserver v0.20.0 // indirect
k8s.io/apimachinery v0.20.0 // indirect
k8s.io/client-go v0.20.0 // indirect
k8s.io/klog/v2 v2.4.0 // indirect
k8s.io/utils v0.0.0-20201110183641-67b214c5f920 // indirect
nhooyr.io/websocket v1.8.6 // indirect
sigs.k8s.io/controller-runtime v0.7.0 // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.0.2 // indirect
sigs.k8s.io/yaml v1.2.0 // indirect
)
replace github.com/dapr/components-contrib => ../../../../..
replace github.com/dapr/components-contrib/tests/certification => ../../..

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,348 @@
// ------------------------------------------------------------
// Copyright (c) Microsoft Corporation and Dapr Contributors.
// Licensed under the MIT License.
// ------------------------------------------------------------
package servicebusqueue_test
import (
"context"
"fmt"
"testing"
"time"
"github.com/stretchr/testify/require"
"go.uber.org/multierr"
"github.com/dapr/components-contrib/bindings"
binding_asb "github.com/dapr/components-contrib/bindings/azure/servicebusqueues"
"github.com/dapr/components-contrib/secretstores"
secretstore_env "github.com/dapr/components-contrib/secretstores/local/env"
binding_loader "github.com/dapr/dapr/pkg/components/bindings"
secretstores_loader "github.com/dapr/dapr/pkg/components/secretstores"
"github.com/dapr/dapr/pkg/runtime"
dapr_testing "github.com/dapr/dapr/pkg/testing"
daprClient "github.com/dapr/go-sdk/client"
"github.com/dapr/go-sdk/service/common"
"github.com/dapr/kit/logger"
"github.com/dapr/components-contrib/tests/certification/embedded"
"github.com/dapr/components-contrib/tests/certification/flow"
"github.com/dapr/components-contrib/tests/certification/flow/app"
"github.com/dapr/components-contrib/tests/certification/flow/network"
"github.com/dapr/components-contrib/tests/certification/flow/sidecar"
"github.com/dapr/components-contrib/tests/certification/flow/simulate"
"github.com/dapr/components-contrib/tests/certification/flow/watcher"
)
const (
numMessages = 100
)
func TestServiceBusQueue(t *testing.T) {
log := logger.NewLogger("dapr.components")
messagesFor1 := watcher.NewOrdered()
messagesFor2 := watcher.NewOrdered()
ports, _ := dapr_testing.GetFreePorts(3)
grpcPort := ports[0]
httpPort := ports[1]
appPort := ports[2]
test := func(ctx flow.Context) error {
client, err := daprClient.NewClientWithPort(fmt.Sprintf("%d", grpcPort))
require.NoError(t, err, "Could not initialize dapr client.")
// Declare what is expected BEFORE performing any steps
// that will satisfy the test.
msgsFor1 := make([]string, numMessages/2)
msgsFor2 := make([]string, numMessages/2)
for i := 0; i < numMessages/2; i++ {
msgsFor1[i] = fmt.Sprintf("sb-binding-1: Message %03d", i)
}
for i := numMessages / 2; i < numMessages; i++ {
msgsFor2[i-(numMessages/2)] = fmt.Sprintf("sb-binding-2: Message %03d", i)
}
messagesFor1.ExpectStrings(msgsFor1...)
messagesFor2.ExpectStrings(msgsFor2...)
// Send events that the application above will observe.
ctx.Log("Invoking binding 1!")
for _, msg := range msgsFor1 {
ctx.Logf("Sending: %q", msg)
req := &daprClient.InvokeBindingRequest{Name: "sb-binding-1", Operation: "create", Data: []byte(msg)}
err := client.InvokeOutputBinding(ctx, req)
require.NoError(ctx, err, "error publishing message")
}
ctx.Log("Invoking binding 2!")
for _, msg := range msgsFor2 {
ctx.Logf("Sending: %q", msg)
req := &daprClient.InvokeBindingRequest{Name: "sb-binding-2", Operation: "create", Data: []byte(msg)}
err := client.InvokeOutputBinding(ctx, req)
require.NoError(ctx, err, "error publishing message")
}
// Do the messages we observed match what we expect?
messagesFor1.Assert(ctx, time.Minute)
messagesFor2.Assert(ctx, time.Minute)
return nil
}
// Application logic that tracks messages from a topic.
application := func(ctx flow.Context, s common.Service) (err error) {
// Setup the input binding endpoints
err = multierr.Combine(err,
s.AddBindingInvocationHandler("sb-binding-1", func(_ context.Context, in *common.BindingEvent) ([]byte, error) {
messagesFor1.Observe(string(in.Data))
ctx.Logf("Got message: %s", string(in.Data))
return []byte("{}"), nil
}),
s.AddBindingInvocationHandler("sb-binding-2", func(_ context.Context, in *common.BindingEvent) ([]byte, error) {
messagesFor2.Observe(string(in.Data))
ctx.Logf("Got message: %s", string(in.Data))
return []byte("{}"), nil
}))
return err
}
flow.New(t, "servicebusqueue certification").
// Run the application logic above.
Step(app.Run("basicApp", fmt.Sprintf(":%d", appPort), application)).
Step(sidecar.Run("basicSidecar",
embedded.WithAppProtocol(runtime.HTTPProtocol, appPort),
embedded.WithDaprGRPCPort(grpcPort),
embedded.WithDaprHTTPPort(httpPort),
embedded.WithComponentsPath("./components/standard"),
runtime.WithOutputBindings(
binding_loader.NewOutput("azure.servicebusqueues", func() bindings.OutputBinding {
return binding_asb.NewAzureServiceBusQueues(log)
}),
),
runtime.WithInputBindings(
binding_loader.NewInput("azure.servicebusqueues", func() bindings.InputBinding {
return binding_asb.NewAzureServiceBusQueues(log)
}),
),
runtime.WithSecretStores(
secretstores_loader.New("local.env", func() secretstores.SecretStore {
return secretstore_env.NewEnvSecretStore(log)
}),
))).
// Block the standard AMPQ ports.
Step("interrupt network", network.InterruptNetwork(time.Minute, []string{}, []string{}, "5671", "5672")).
Step("send and wait", test).
Run()
}
func TestAzureServiceBusQueuesTTLs(t *testing.T) {
log := logger.NewLogger("dapr.components")
ttlMessages := watcher.NewUnordered()
ports, _ := dapr_testing.GetFreePorts(3)
grpcPort := ports[0]
httpPort := ports[1]
appPort := ports[2]
sendTTLMessages := func(ctx flow.Context) error {
client, err := daprClient.NewClientWithPort(fmt.Sprintf("%d", grpcPort))
require.NoError(t, err, "Could not initialize dapr client.")
ctx.Logf("Sending messages for expiration.")
for i := 0; i < numMessages; i++ {
msg := fmt.Sprintf("Expiring message %d", i)
metadata := make(map[string]string)
// Send to the queue with TTL.
queueTTLReq := &daprClient.InvokeBindingRequest{Name: "queuettl", Operation: "create", Data: []byte(msg), Metadata: metadata}
err := client.InvokeOutputBinding(ctx, queueTTLReq)
require.NoError(ctx, err, "error publishing message")
// Send message with TTL.
messageTTLReq := &daprClient.InvokeBindingRequest{Name: "messagettl", Operation: "create", Data: []byte(msg), Metadata: metadata}
messageTTLReq.Metadata["ttlInSeconds"] = "10"
err = client.InvokeOutputBinding(ctx, messageTTLReq)
require.NoError(ctx, err, "error publishing message")
// Send message with TTL to ensure it overwrites Queue TTL.
mixedTTLReq := &daprClient.InvokeBindingRequest{Name: "mixedttl", Operation: "create", Data: []byte(msg), Metadata: metadata}
mixedTTLReq.Metadata["ttlInSeconds"] = "10"
err = client.InvokeOutputBinding(ctx, mixedTTLReq)
require.NoError(ctx, err, "error publishing message")
}
// Wait for double the TTL after sending the last message.
time.Sleep(time.Second * 20)
return nil
}
ttlApplication := func(ctx flow.Context, s common.Service) (err error) {
// Setup the input binding endpoints
err = multierr.Combine(err,
s.AddBindingInvocationHandler("queuettl", func(_ context.Context, in *common.BindingEvent) ([]byte, error) {
ctx.Logf("Oh no! Got message: %s", string(in.Data))
ttlMessages.FailIfNotExpected(t, string(in.Data))
return []byte("{}"), nil
}),
s.AddBindingInvocationHandler("messagettl", func(_ context.Context, in *common.BindingEvent) ([]byte, error) {
ctx.Logf("Oh no! Got message: %s", string(in.Data))
ttlMessages.FailIfNotExpected(t, string(in.Data))
return []byte("{}"), nil
}),
s.AddBindingInvocationHandler("mixedttl", func(_ context.Context, in *common.BindingEvent) ([]byte, error) {
ctx.Logf("Oh no! Got message: %s", string(in.Data))
ttlMessages.FailIfNotExpected(t, string(in.Data))
return []byte("{}"), nil
}))
return err
}
freshPorts, _ := dapr_testing.GetFreePorts(2)
flow.New(t, "servicebusqueue ttl certification").
Step(sidecar.Run("ttlSidecar",
embedded.WithoutApp(),
embedded.WithDaprGRPCPort(grpcPort),
embedded.WithDaprHTTPPort(httpPort),
embedded.WithComponentsPath("./components/ttl"),
runtime.WithOutputBindings(
binding_loader.NewOutput("azure.servicebusqueues", func() bindings.OutputBinding {
return binding_asb.NewAzureServiceBusQueues(log)
}),
),
runtime.WithInputBindings(
binding_loader.NewInput("azure.servicebusqueues", func() bindings.InputBinding {
return binding_asb.NewAzureServiceBusQueues(log)
}),
),
runtime.WithSecretStores(
secretstores_loader.New("local.env", func() secretstores.SecretStore {
return secretstore_env.NewEnvSecretStore(log)
}),
))).
Step("send ttl messages", sendTTLMessages).
Step("stop initial sidecar", sidecar.Stop("ttlSidecar")).
Step(app.Run("ttlApp", fmt.Sprintf(":%d", appPort), ttlApplication)).
Step(sidecar.Run("appSidecar",
embedded.WithAppProtocol(runtime.HTTPProtocol, appPort),
embedded.WithDaprGRPCPort(freshPorts[0]),
embedded.WithDaprHTTPPort(freshPorts[1]),
runtime.WithOutputBindings(
binding_loader.NewOutput("azure.servicebusqueues", func() bindings.OutputBinding {
return binding_asb.NewAzureServiceBusQueues(log)
}),
),
runtime.WithInputBindings(
binding_loader.NewInput("azure.servicebusqueues", func() bindings.InputBinding {
return binding_asb.NewAzureServiceBusQueues(log)
}),
),
runtime.WithSecretStores(
secretstores_loader.New("local.env", func() secretstores.SecretStore {
return secretstore_env.NewEnvSecretStore(log)
}),
))).
Step("verify no messages", func(ctx flow.Context) error {
ttlMessages.Assert(t, time.Minute)
return nil
}).
Run()
}
func TestAzureServiceBusQueueRetriesOnError(t *testing.T) {
log := logger.NewLogger("dapr.components")
messages := watcher.NewOrdered()
ports, _ := dapr_testing.GetFreePorts(3)
grpcPort := ports[0]
httpPort := ports[1]
appPort := ports[2]
test := func(ctx flow.Context) error {
client, err := daprClient.NewClientWithPort(fmt.Sprintf("%d", grpcPort))
require.NoError(t, err, "Could not initialize dapr client.")
// Declare what is expected BEFORE performing any steps
// that will satisfy the test.
msgs := make([]string, numMessages/2)
for i := 0; i < numMessages/2; i++ {
msgs[i] = fmt.Sprintf("Message %03d", i)
}
messages.ExpectStrings(msgs...)
// Send events that the application above will observe.
ctx.Log("Invoking binding 1!")
for _, msg := range msgs {
ctx.Logf("Sending: %q", msg)
req := &daprClient.InvokeBindingRequest{Name: "retry-binding", Operation: "create", Data: []byte(msg)}
err := client.InvokeOutputBinding(ctx, req)
require.NoError(ctx, err, "error publishing message")
}
// Do the messages we observed match what we expect?
messages.Assert(ctx, time.Minute)
return nil
}
// Application logic that tracks messages from a topic.
application := func(ctx flow.Context, s common.Service) (err error) {
// Simulate periodic errors.
sim := simulate.PeriodicError(ctx, 10)
// Setup the input binding endpoint
err = multierr.Combine(err,
s.AddBindingInvocationHandler("retry-binding", func(_ context.Context, in *common.BindingEvent) ([]byte, error) {
if err := sim(); err != nil {
ctx.Logf("Failing message: %s", string(in.Data))
return nil, err
}
messages.Observe(string(in.Data))
ctx.Logf("Got message: %s", string(in.Data))
return []byte("{}"), nil
}))
return err
}
flow.New(t, "servicebusqueue retry certification").
// Run the application logic above.
Step(app.Run("retryApp", fmt.Sprintf(":%d", appPort), application)).
Step(sidecar.Run("retrySidecar",
embedded.WithAppProtocol(runtime.HTTPProtocol, appPort),
embedded.WithDaprGRPCPort(grpcPort),
embedded.WithDaprHTTPPort(httpPort),
embedded.WithComponentsPath("./components/retry"),
runtime.WithOutputBindings(
binding_loader.NewOutput("azure.servicebusqueues", func() bindings.OutputBinding {
return binding_asb.NewAzureServiceBusQueues(log)
}),
),
runtime.WithInputBindings(
binding_loader.NewInput("azure.servicebusqueues", func() bindings.InputBinding {
return binding_asb.NewAzureServiceBusQueues(log)
}),
),
runtime.WithSecretStores(
secretstores_loader.New("local.env", func() secretstores.SecretStore {
return secretstore_env.NewEnvSecretStore(log)
}),
))).
Step("send and wait", test).
Run()
}

View File

@ -317,6 +317,16 @@ func (w *Watcher) WaitForResult(timeout time.Duration) error {
return nil
}
func (w *Watcher) FailIfNotExpected(t TestingT, data ...interface{}) {
w.mu.Lock()
defer w.mu.Unlock()
for _, item := range data {
val, ok := w.remaining[item]
assert.False(t, ok, "Encountered an unexpected item: %v", val)
}
}
// Result waits for up to `timeout` for all
// expected data to be observed and returns
// the expected and observed slices.