diff --git a/tests/certification/bindings/azure/servicebusqueues/servicebusqueue_test.go b/tests/certification/bindings/azure/servicebusqueues/servicebusqueue_test.go index ad7d2af96..40a7e5002 100644 --- a/tests/certification/bindings/azure/servicebusqueues/servicebusqueue_test.go +++ b/tests/certification/bindings/azure/servicebusqueues/servicebusqueue_test.go @@ -14,8 +14,12 @@ limitations under the License. package servicebusqueue_test import ( + "bytes" "context" + "crypto/rand" + "encoding/base64" "fmt" + "io" "testing" "time" @@ -49,6 +53,15 @@ const ( numMessages = 100 ) +var testprefix string + +func init() { + // Generate a random test prefix + rnd := make([]byte, 7) + io.ReadFull(rand.Reader, rnd) + testprefix = base64.RawURLEncoding.EncodeToString(rnd) +} + func TestServiceBusQueue(t *testing.T) { messagesFor1 := watcher.NewOrdered() messagesFor2 := watcher.NewOrdered() @@ -67,11 +80,11 @@ func TestServiceBusQueue(t *testing.T) { msgsFor1 := make([]string, numMessages/2) msgsFor2 := make([]string, numMessages/2) for i := 0; i < numMessages/2; i++ { - msgsFor1[i] = fmt.Sprintf("sb-binding-1: Message %03d", i) + msgsFor1[i] = fmt.Sprintf("%s: sb-binding-1: Message %03d", testprefix, i) } for i := numMessages / 2; i < numMessages; i++ { - msgsFor2[i-(numMessages/2)] = fmt.Sprintf("sb-binding-2: Message %03d", i) + msgsFor2[i-(numMessages/2)] = fmt.Sprintf("%s: sb-binding-2: Message %03d", testprefix, i) } messagesFor1.ExpectStrings(msgsFor1...) @@ -108,11 +121,19 @@ func TestServiceBusQueue(t *testing.T) { // Setup the input binding endpoints err = multierr.Combine(err, s.AddBindingInvocationHandler("sb-binding-1", func(_ context.Context, in *common.BindingEvent) ([]byte, error) { + if !bytes.HasPrefix(in.Data, []byte(testprefix)) { + return []byte("{}"), nil + } + messagesFor1.Observe(string(in.Data)) ctx.Logf("Got message: %s", string(in.Data)) return []byte("{}"), nil }), s.AddBindingInvocationHandler("sb-binding-2", func(_ context.Context, in *common.BindingEvent) ([]byte, error) { + if !bytes.HasPrefix(in.Data, []byte(testprefix)) { + return []byte("{}"), nil + } + messagesFor2.Observe(string(in.Data)) ctx.Logf("Got message: %s", string(in.Data)) return []byte("{}"), nil @@ -128,7 +149,7 @@ func TestServiceBusQueue(t *testing.T) { embedded.WithAppProtocol(runtime.HTTPProtocol, appPort), embedded.WithDaprGRPCPort(grpcPort), embedded.WithDaprHTTPPort(httpPort), - embedded.WithComponentsPath("./components/standard"), + embedded.WithResourcesPath("./components/standard"), componentRuntimeOptions(), )). // Block the standard AMPQ ports. @@ -151,23 +172,38 @@ func TestAzureServiceBusQueuesTTLs(t *testing.T) { ctx.Logf("Sending messages for expiration.") for i := 0; i < numMessages; i++ { - msg := fmt.Sprintf("Expiring message %d", i) + msg := fmt.Sprintf("%s: Expiring message %d", testprefix, i) metadata := make(map[string]string) // Send to the queue with TTL. - queueTTLReq := &daprClient.InvokeBindingRequest{Name: "queuettl", Operation: "create", Data: []byte(msg), Metadata: metadata} + queueTTLReq := &daprClient.InvokeBindingRequest{ + Name: "queuettl", + Operation: "create", + Data: []byte(msg), + Metadata: metadata, + } err := client.InvokeOutputBinding(ctx, queueTTLReq) require.NoError(ctx, err, "error publishing message") // Send message with TTL. - messageTTLReq := &daprClient.InvokeBindingRequest{Name: "messagettl", Operation: "create", Data: []byte(msg), Metadata: metadata} + messageTTLReq := &daprClient.InvokeBindingRequest{ + Name: "messagettl", + Operation: "create", + Data: []byte(msg), + Metadata: metadata, + } messageTTLReq.Metadata["ttlInSeconds"] = "10" err = client.InvokeOutputBinding(ctx, messageTTLReq) require.NoError(ctx, err, "error publishing message") // Send message with TTL to ensure it overwrites Queue TTL. - mixedTTLReq := &daprClient.InvokeBindingRequest{Name: "mixedttl", Operation: "create", Data: []byte(msg), Metadata: metadata} + mixedTTLReq := &daprClient.InvokeBindingRequest{ + Name: "mixedttl", + Operation: "create", + Data: []byte(msg), + Metadata: metadata, + } mixedTTLReq.Metadata["ttlInSeconds"] = "10" err = client.InvokeOutputBinding(ctx, mixedTTLReq) require.NoError(ctx, err, "error publishing message") @@ -182,16 +218,28 @@ func TestAzureServiceBusQueuesTTLs(t *testing.T) { // Setup the input binding endpoints err = multierr.Combine(err, s.AddBindingInvocationHandler("queuettl", func(_ context.Context, in *common.BindingEvent) ([]byte, error) { + if !bytes.HasPrefix(in.Data, []byte(testprefix)) { + return []byte("{}"), nil + } + ctx.Logf("Oh no! Got message: %s", string(in.Data)) ttlMessages.FailIfNotExpected(t, string(in.Data)) return []byte("{}"), nil }), s.AddBindingInvocationHandler("messagettl", func(_ context.Context, in *common.BindingEvent) ([]byte, error) { + if !bytes.HasPrefix(in.Data, []byte(testprefix)) { + return []byte("{}"), nil + } + ctx.Logf("Oh no! Got message: %s", string(in.Data)) ttlMessages.FailIfNotExpected(t, string(in.Data)) return []byte("{}"), nil }), s.AddBindingInvocationHandler("mixedttl", func(_ context.Context, in *common.BindingEvent) ([]byte, error) { + if !bytes.HasPrefix(in.Data, []byte(testprefix)) { + return []byte("{}"), nil + } + ctx.Logf("Oh no! Got message: %s", string(in.Data)) ttlMessages.FailIfNotExpected(t, string(in.Data)) return []byte("{}"), nil @@ -207,7 +255,7 @@ func TestAzureServiceBusQueuesTTLs(t *testing.T) { embedded.WithoutApp(), embedded.WithDaprGRPCPort(grpcPort), embedded.WithDaprHTTPPort(httpPort), - embedded.WithComponentsPath("./components/ttl"), + embedded.WithResourcesPath("./components/ttl"), componentRuntimeOptions(), )). Step("send ttl messages", sendTTLMessages). @@ -242,7 +290,7 @@ func TestAzureServiceBusQueueRetriesOnError(t *testing.T) { // that will satisfy the test. msgs := make([]string, numMessages/2) for i := 0; i < numMessages/2; i++ { - msgs[i] = fmt.Sprintf("Message %03d", i) + msgs[i] = fmt.Sprintf("%s: Message %03d", testprefix, i) } messages.ExpectStrings(msgs...) @@ -252,7 +300,11 @@ func TestAzureServiceBusQueueRetriesOnError(t *testing.T) { for _, msg := range msgs { ctx.Logf("Sending: %q", msg) - req := &daprClient.InvokeBindingRequest{Name: "retry-binding", Operation: "create", Data: []byte(msg)} + req := &daprClient.InvokeBindingRequest{ + Name: "retry-binding", + Operation: "create", + Data: []byte(msg), + } err := client.InvokeOutputBinding(ctx, req) require.NoError(ctx, err, "error publishing message") } @@ -271,6 +323,10 @@ func TestAzureServiceBusQueueRetriesOnError(t *testing.T) { // Setup the input binding endpoint err = multierr.Combine(err, s.AddBindingInvocationHandler("retry-binding", func(_ context.Context, in *common.BindingEvent) ([]byte, error) { + if !bytes.HasPrefix(in.Data, []byte(testprefix)) { + return []byte("{}"), nil + } + if err := sim(); err != nil { ctx.Logf("Failing message: %s", string(in.Data)) return nil, err @@ -291,7 +347,7 @@ func TestAzureServiceBusQueueRetriesOnError(t *testing.T) { embedded.WithAppProtocol(runtime.HTTPProtocol, appPort), embedded.WithDaprGRPCPort(grpcPort), embedded.WithDaprHTTPPort(httpPort), - embedded.WithComponentsPath("./components/retry"), + embedded.WithResourcesPath("./components/retry"), componentRuntimeOptions(), )). Step("send and wait", test). @@ -312,10 +368,17 @@ func TestServiceBusQueueMetadata(t *testing.T) { // Send events that the application above will observe. ctx.Log("Invoking binding!") - req := &daprClient.InvokeBindingRequest{Name: "sb-binding-1", Operation: "create", Data: []byte("test msg"), Metadata: map[string]string{"Testmetadata": "Some Metadata"}} + req := &daprClient.InvokeBindingRequest{ + Name: "sb-binding-1", + Operation: "create", + Data: []byte(testprefix + ": test msg"), + Metadata: map[string]string{"Testmetadata": "Some Metadata"}, + } err = client.InvokeOutputBinding(ctx, req) require.NoError(ctx, err, "error publishing message") + messages.ExpectStrings(string(req.Data)) + // Do the messages we observed match what we expect? messages.Assert(ctx, time.Minute) @@ -327,11 +390,15 @@ func TestServiceBusQueueMetadata(t *testing.T) { // Setup the input binding endpoints err = multierr.Combine(err, s.AddBindingInvocationHandler("sb-binding-1", func(_ context.Context, in *common.BindingEvent) ([]byte, error) { + if !bytes.HasPrefix(in.Data, []byte(testprefix)) { + return []byte("{}"), nil + } + messages.Observe(string(in.Data)) - ctx.Logf("Got message: %s - %+v", string(in.Data), in.Metadata) - require.NotEmpty(t, in.Metadata) - require.Contains(t, in.Metadata, "Testmetadata") - require.Equal(t, "Some Metadata", in.Metadata["Testmetadata"]) + ctx.Logf("Got message: %s - %#v", string(in.Data), in.Metadata) + require.NotEmptyf(t, in.Metadata, "Data: %s - Metadata: %#v", in.Data, in.Metadata) + require.Containsf(t, in.Metadata, "Testmetadata", "Data: %s - Metadata: %#v", in.Data, in.Metadata) + require.Equalf(t, "Some+Metadata", in.Metadata["Testmetadata"], "Data: %s - Metadata: %#v", in.Data, in.Metadata) // + because the message is encoded for HTTP headers return []byte("{}"), nil })) @@ -346,7 +413,7 @@ func TestServiceBusQueueMetadata(t *testing.T) { embedded.WithAppProtocol(runtime.HTTPProtocol, appPort), embedded.WithDaprGRPCPort(grpcPort), embedded.WithDaprHTTPPort(httpPort), - embedded.WithComponentsPath("./components/standard"), + embedded.WithResourcesPath("./components/standard"), componentRuntimeOptions(), )). Step("send and wait", test). @@ -364,7 +431,12 @@ func TestServiceBusQueueDisableEntityManagement(t *testing.T) { // Send events that the application above will observe. ctx.Log("Invoking binding!") - req := &daprClient.InvokeBindingRequest{Name: "mgmt-binding", Operation: "create", Data: []byte("test msg"), Metadata: map[string]string{"TestMetadata": "Some Metadata"}} + req := &daprClient.InvokeBindingRequest{ + Name: "mgmt-binding", + Operation: "create", + Data: []byte(testprefix + ": test msg"), + Metadata: map[string]string{"TestMetadata": "Some Metadata"}, + } err = client.InvokeOutputBinding(ctx, req) require.Error(ctx, err, "error publishing message") return nil @@ -376,7 +448,7 @@ func TestServiceBusQueueDisableEntityManagement(t *testing.T) { embedded.WithoutApp(), embedded.WithDaprGRPCPort(grpcPort), embedded.WithDaprHTTPPort(httpPort), - embedded.WithComponentsPath("./components/disable_entity_mgmt"), + embedded.WithResourcesPath("./components/disable_entity_mgmt"), componentRuntimeOptions(), )). Step("send and wait", testWithExpectedFailure).