Merge pull request #2286 from berndverst/asqbindingvisibilitytimeout

ASQ Binding: Cert Test: Add queue visibilityTimeout test
This commit is contained in:
Bernd Verst 2022-11-16 15:43:58 -08:00 committed by GitHub
commit 592ff74f75
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 138 additions and 22 deletions

View File

@ -0,0 +1,23 @@
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: visibilityBinding
namespace: default
spec:
type: bindings.azure.storagequeues
version: v1
metadata:
- name: storageAccount
secretKeyRef:
name: AzureBlobStorageAccount
key: AzureBlobStorageAccount
- name: storageAccessKey
secretKeyRef:
name: AzureBlobStorageAccessKey
key: AzureBlobStorageAccessKey
- name: queue
value: "visibility"
- name: visibilityTimeout
value: "20s"
auth:
secretStore: envvar-secret-store

View File

@ -0,0 +1,9 @@
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: envvar-secret-store
namespace: default
spec:
type: secretstores.local.env
version: v1
metadata:

View File

@ -17,6 +17,7 @@ package storagequeue_test
import (
"context"
"fmt"
"strings"
"testing"
"time"
@ -26,17 +27,6 @@ import (
"github.com/dapr/components-contrib/bindings"
binding_asq "github.com/dapr/components-contrib/bindings/azure/storagequeues"
secretstore_env "github.com/dapr/components-contrib/secretstores/local/env"
bindings_loader "github.com/dapr/dapr/pkg/components/bindings"
secretstores_loader "github.com/dapr/dapr/pkg/components/secretstores"
"github.com/dapr/dapr/pkg/runtime"
dapr_testing "github.com/dapr/dapr/pkg/testing"
daprClient "github.com/dapr/go-sdk/client"
"github.com/dapr/go-sdk/service/common"
"github.com/dapr/kit/logger"
"github.com/dapr/components-contrib/tests/certification/embedded"
"github.com/dapr/components-contrib/tests/certification/flow"
"github.com/dapr/components-contrib/tests/certification/flow/app"
@ -44,6 +34,13 @@ import (
"github.com/dapr/components-contrib/tests/certification/flow/sidecar"
"github.com/dapr/components-contrib/tests/certification/flow/simulate"
"github.com/dapr/components-contrib/tests/certification/flow/watcher"
bindings_loader "github.com/dapr/dapr/pkg/components/bindings"
secretstores_loader "github.com/dapr/dapr/pkg/components/secretstores"
"github.com/dapr/dapr/pkg/runtime"
dapr_testing "github.com/dapr/dapr/pkg/testing"
daprClient "github.com/dapr/go-sdk/client"
"github.com/dapr/go-sdk/service/common"
"github.com/dapr/kit/logger"
)
const (
@ -131,6 +128,7 @@ func TestStorageQueue(t *testing.T) {
componentRuntimeOptions(),
)).
Step("send and wait", test).
Step("wait for messages to be deleted", flow.Sleep(time.Second*3)).
Run()
}
@ -169,9 +167,6 @@ func TestAzureStorageQueueTTLs(t *testing.T) {
err = client.InvokeOutputBinding(ctx, mixedTTLReq)
require.NoError(ctx, err, "error publishing message")
}
// Wait for double the TTL after sending the last message.
time.Sleep(time.Second * 20)
return nil
}
@ -200,10 +195,8 @@ func TestAzureStorageQueueTTLs(t *testing.T) {
freshPorts, _ := dapr_testing.GetFreePorts(2)
flow.New(t, "storagequeue ttl certification").
// Run the application logic above.
Step(app.Run("ttlApp", fmt.Sprintf(":%d", appPort), ttlApplication)).
Step(sidecar.Run("ttlSidecar",
embedded.WithAppProtocol(runtime.HTTPProtocol, appPort),
embedded.WithoutApp(),
embedded.WithDaprGRPCPort(grpcPort),
embedded.WithDaprHTTPPort(httpPort),
embedded.WithComponentsPath("./components/ttl"),
@ -211,6 +204,7 @@ func TestAzureStorageQueueTTLs(t *testing.T) {
)).
Step("send ttl messages", ttlTest).
Step("stop initial sidecar", sidecar.Stop("ttlSidecar")).
Step("wait for messages to expire", flow.Sleep(time.Second*20)).
Step(app.Run("ttlApp", fmt.Sprintf(":%d", appPort), ttlApplication)).
Step(sidecar.Run("appSidecar",
embedded.WithAppProtocol(runtime.HTTPProtocol, appPort),
@ -223,6 +217,7 @@ func TestAzureStorageQueueTTLs(t *testing.T) {
ttlMessages.Assert(t, time.Minute)
return nil
}).
Step("wait for messages to be deleted", flow.Sleep(time.Second*3)).
Run()
}
@ -250,9 +245,6 @@ func TestAzureStorageQueueTTLsWithLessSleepTime(t *testing.T) {
err = client.InvokeOutputBinding(ctx, messageTTLReq)
require.NoError(ctx, err, "error publishing message")
}
// Wait for double the TTL after sending the last message.
time.Sleep(time.Second * 1)
return nil
}
@ -280,6 +272,7 @@ func TestAzureStorageQueueTTLsWithLessSleepTime(t *testing.T) {
componentRuntimeOptions(),
)).
Step("send ttl messages", ttlTest).
Step("wait a brief moment - messages will not have expired", flow.Sleep(time.Second*1)).
Step("stop initial sidecar", sidecar.Stop("ttlSidecar")).
Step(app.Run("ttlApp", fmt.Sprintf(":%d", appPort), ttlApplication)).
Step(sidecar.Run("appSidecar",
@ -293,6 +286,7 @@ func TestAzureStorageQueueTTLsWithLessSleepTime(t *testing.T) {
ttlMessages.Assert(t, time.Minute)
return nil
}).
Step("wait for messages to be deleted", flow.Sleep(time.Second*3)).
Run()
}
@ -354,6 +348,92 @@ func TestAzureStorageQueueForDecode(t *testing.T) {
componentRuntimeOptions(),
)).
Step("send and wait", testDecode).
Step("wait for messages to be deleted", flow.Sleep(time.Second*3)).
Run()
}
func TestAzureStorageQueueForVisibility(t *testing.T) {
allmessages := watcher.NewOrdered()
ports, _ := dapr_testing.GetFreePorts(3)
grpcPort := ports[0]
httpPort := ports[1]
appPort := ports[2]
messageRetryMap := make(map[string]bool)
testVisibility := func(ctx flow.Context) error {
client, err := daprClient.NewClientWithPort(fmt.Sprintf("%d", grpcPort))
require.NoError(t, err, "Could not initialize dapr client.")
numMessages := 3
// Declare the expected data.
msgs := make([]string, numMessages)
for i := 0; i < 3; i++ {
msgs[i] = fmt.Sprintf("Message %d", i)
}
retryMessages := make([]string, numMessages)
for i := 0; i < 3; i++ {
retryMessages[i] = fmt.Sprintf("Retry Message %d", i)
}
// combine retryMessages and msgs
combineMessages := make([]string, 0)
combineMessages = append(combineMessages, msgs...)
combineMessages = append(combineMessages, retryMessages...)
allmessages.ExpectStrings(combineMessages...)
metadata := make(map[string]string)
ctx.Log("Invoking output binding!")
for i := 0; i < numMessages; i++ {
dataBytes := []byte(msgs[i])
req := &daprClient.InvokeBindingRequest{Name: "visibilityBinding", Operation: "create", Data: dataBytes, Metadata: metadata}
err := client.InvokeOutputBinding(ctx, req)
require.NoError(ctx, err, "error publishing message")
// we alternate between message we will accept and message we will intentionally reject / fail just once for visibility testing
dataBytes = []byte(retryMessages[i])
req = &daprClient.InvokeBindingRequest{Name: "visibilityBinding", Operation: "create", Data: dataBytes, Metadata: metadata}
err = client.InvokeOutputBinding(ctx, req)
require.NoError(ctx, err, "error publishing message")
}
// check that we eventually got all messages
// this verifies that the retried messages were delivered after the other messages
allmessages.Assert(ctx, time.Second*60)
return nil
}
visibilityApplication := func(ctx flow.Context, s common.Service) (err error) {
// Setup the input binding endpoints.
err = multierr.Combine(err,
s.AddBindingInvocationHandler("visibilityBinding", func(_ context.Context, in *common.BindingEvent) ([]byte, error) {
ctx.Logf("Reading message: %s", string(in.Data))
if strings.HasPrefix(string(in.Data), "Retry") && !messageRetryMap[string(in.Data)] {
ctx.Logf("Intentionally retrying message: %s", string(in.Data))
messageRetryMap[string(in.Data)] = true
return nil, fmt.Errorf("retry")
}
allmessages.Observe(string(in.Data))
ctx.Logf("Successfully handled message: %s", string(in.Data))
return []byte("{}"), nil
}))
return err
}
flow.New(t, "storagequeue visibilityTimeout certification").
// Run the application logic above.
Step(app.Run("standardApp", fmt.Sprintf(":%d", appPort), visibilityApplication)).
Step(sidecar.Run("standardSidecar",
embedded.WithAppProtocol(runtime.HTTPProtocol, appPort),
embedded.WithDaprGRPCPort(grpcPort),
embedded.WithDaprHTTPPort(httpPort),
embedded.WithComponentsPath("./components/visibilityTimeout"),
componentRuntimeOptions(),
)).
Step("send and wait", testVisibility).
Step("wait for messages to be deleted", flow.Sleep(time.Second*3)).
Run()
}
@ -426,6 +506,7 @@ func TestAzureStorageQueueRetriesOnError(t *testing.T) {
)).
Step("interrupt network", network.InterruptNetwork(time.Minute, []string{}, []string{}, "443")).
Step("send and wait", testRetry).
Step("wait for messages to be deleted", flow.Sleep(time.Second*3)).
Run()
}

View File

@ -15,6 +15,7 @@ package watcher
import (
"errors"
"fmt"
"sync"
"time"
@ -344,8 +345,10 @@ func (w *Watcher) FailIfNotExpected(t TestingT, data ...interface{}) {
defer w.mu.Unlock()
for _, item := range data {
val, ok := w.remaining[item]
assert.False(t, ok, "Encountered an unexpected item: %v", val)
_, ok := w.remaining[item]
if !ok {
assert.Fail(t, fmt.Sprintf("Encountered an unexpected item: %v", item), item)
}
}
}