/* Copyright 2021 The Dapr Authors Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package storagequeues import ( "context" "encoding/base64" "sync" "testing" "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "github.com/dapr/components-contrib/bindings" "github.com/dapr/components-contrib/metadata" "github.com/dapr/kit/logger" "github.com/dapr/kit/ptr" ) type MockHelper struct { mock.Mock messages chan []byte metadata *storageQueuesMetadata closeCh chan struct{} wg sync.WaitGroup } func (m *MockHelper) Init(ctx context.Context, metadata bindings.Metadata) (*storageQueuesMetadata, error) { m.messages = make(chan []byte, 10) var err error m.metadata, err = parseMetadata(metadata) return m.metadata, err } func (m *MockHelper) Write(ctx context.Context, data []byte, ttl *time.Duration) error { m.messages <- data retvals := m.Called(data, ttl) return retvals.Error(0) } func (m *MockHelper) Read(ctx context.Context, consumer *consumer) error { retvals := m.Called(ctx, consumer) readCtx, cancel := context.WithCancel(ctx) m.wg.Add(2) go func() { defer m.wg.Done() defer cancel() select { case <-readCtx.Done(): case <-m.closeCh: } }() go func() { defer m.wg.Done() for msg := range m.messages { if m.metadata.DecodeBase64 { msg, _ = base64.StdEncoding.DecodeString(string(msg)) } go consumer.callback(readCtx, &bindings.ReadResponse{ Data: msg, }) } }() return retvals.Error(0) } func (m *MockHelper) Close() error { defer m.wg.Wait() close(m.closeCh) return nil } func TestWriteQueue(t *testing.T) { mm := new(MockHelper) mm.On("Write", mock.AnythingOfType("[]uint8"), mock.MatchedBy(func(in *time.Duration) bool { return in == nil })).Return(nil) a := AzureStorageQueues{helper: mm, logger: logger.NewLogger("test"), closeCh: make(chan struct{})} m := bindings.Metadata{} m.Properties = map[string]string{"storageAccessKey": "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==", "queue": "queue1", "storageAccount": "devstoreaccount1"} err := a.Init(context.Background(), m) require.NoError(t, err) r := bindings.InvokeRequest{Data: []byte("This is my message")} _, err = a.Invoke(context.Background(), &r) require.NoError(t, err) assert.NoError(t, a.Close()) } func TestWriteWithTTLInQueue(t *testing.T) { mm := new(MockHelper) mm.On("Write", mock.AnythingOfTypeArgument("[]uint8"), mock.MatchedBy(func(in *time.Duration) bool { return in != nil && *in == time.Second })).Return(nil) a := AzureStorageQueues{helper: mm, logger: logger.NewLogger("test"), closeCh: make(chan struct{})} m := bindings.Metadata{} m.Properties = map[string]string{"storageAccessKey": "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==", "queue": "queue1", "storageAccount": "devstoreaccount1", metadata.TTLMetadataKey: "1"} err := a.Init(context.Background(), m) require.NoError(t, err) r := bindings.InvokeRequest{Data: []byte("This is my message")} _, err = a.Invoke(context.Background(), &r) require.NoError(t, err) assert.NoError(t, a.Close()) } func TestWriteWithTTLInWrite(t *testing.T) { mm := new(MockHelper) mm.On("Write", mock.AnythingOfTypeArgument("[]uint8"), mock.MatchedBy(func(in *time.Duration) bool { return in != nil && *in == time.Second })).Return(nil) a := AzureStorageQueues{helper: mm, logger: logger.NewLogger("test"), closeCh: make(chan struct{})} m := bindings.Metadata{} m.Properties = map[string]string{"storageAccessKey": "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==", "queue": "queue1", "storageAccount": "devstoreaccount1", metadata.TTLMetadataKey: "1"} err := a.Init(context.Background(), m) require.NoError(t, err) r := bindings.InvokeRequest{ Data: []byte("This is my message"), Metadata: map[string]string{metadata.TTLMetadataKey: "1"}, } _, err = a.Invoke(context.Background(), &r) require.NoError(t, err) assert.NoError(t, a.Close()) } // Uncomment this function to write a message to local storage queue /* func TestWriteLocalQueue(t *testing.T) { a := AzureStorageQueues{helper: &AzureQueueHelper{reqURI: "http://127.0.0.1:10001/%s/%s"}} m := bindings.Metadata{} m.Properties = map[string]string{"storageAccessKey": "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==", "queue": "queue1", "storageAccount": "devstoreaccount1"} err := a.Init(context.Background(), m) require.NoError(t, err) r := bindings.InvokeRequest{Data: []byte("This is my message")} err = a.Write(&r) require.NoError(t, err) } */ func TestReadQueue(t *testing.T) { mm := new(MockHelper) mm.On("Write", mock.AnythingOfType("[]uint8"), mock.AnythingOfType("*time.Duration")).Return(nil) mm.On("Read", mock.AnythingOfType("*context.cancelCtx"), mock.AnythingOfType("*storagequeues.consumer")).Return(nil) a := AzureStorageQueues{helper: mm, logger: logger.NewLogger("test"), closeCh: make(chan struct{})} m := bindings.Metadata{} m.Properties = map[string]string{"storageAccessKey": "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==", "queue": "queue1", "storageAccount": "devstoreaccount1"} err := a.Init(context.Background(), m) require.NoError(t, err) r := bindings.InvokeRequest{Data: []byte("This is my message")} ctx, cancel := context.WithCancel(context.Background()) _, err = a.Invoke(ctx, &r) require.NoError(t, err) received := 0 handler := func(ctx context.Context, data *bindings.ReadResponse) ([]byte, error) { received++ s := string(data.Data) assert.Equal(t, s, "This is my message") cancel() return nil, nil } a.Read(ctx, handler) select { case <-ctx.Done(): // do nothing case <-time.After(10 * time.Second): cancel() t.Fatal("Timeout waiting for messages") } assert.Equal(t, 1, received) assert.NoError(t, a.Close()) } func TestReadQueueDecode(t *testing.T) { mm := new(MockHelper) mm.On("Write", mock.AnythingOfType("[]uint8"), mock.AnythingOfType("*time.Duration")).Return(nil) mm.On("Read", mock.AnythingOfType("*context.cancelCtx"), mock.AnythingOfType("*storagequeues.consumer")).Return(nil) a := AzureStorageQueues{helper: mm, logger: logger.NewLogger("test"), closeCh: make(chan struct{})} m := bindings.Metadata{} m.Properties = map[string]string{"storageAccessKey": "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==", "queue": "queue1", "storageAccount": "devstoreaccount1", "decodeBase64": "true"} err := a.Init(context.Background(), m) require.NoError(t, err) r := bindings.InvokeRequest{Data: []byte("VGhpcyBpcyBteSBtZXNzYWdl")} ctx, cancel := context.WithCancel(context.Background()) _, err = a.Invoke(ctx, &r) require.NoError(t, err) received := 0 handler := func(ctx context.Context, data *bindings.ReadResponse) ([]byte, error) { received++ s := string(data.Data) assert.Equal(t, s, "This is my message") cancel() return nil, nil } a.Read(ctx, handler) select { case <-ctx.Done(): // do nothing case <-time.After(10 * time.Second): cancel() t.Fatal("Timeout waiting for messages") } assert.Equal(t, 1, received) assert.NoError(t, a.Close()) } // Uncomment this function to test reding from local queue //nolint:godot /* func TestReadLocalQueue(t *testing.T) { a := AzureStorageQueues{helper: &AzureQueueHelper{reqURI: "http://127.0.0.1:10001/%s/%s"}} m := bindings.Metadata{} m.Properties = map[string]string{"storageAccessKey": "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==", "queue": "queue1", "storageAccount": "devstoreaccount1"} err := a.Init(context.Background(), m) require.NoError(t, err) r := bindings.InvokeRequest{Data: []byte("This is my message")} err = a.Write(&r) require.NoError(t, err) var handler = func(data *bindings.ReadResponse) ([]byte, error) { s := string(data.Data) assert.Equal(t, s, "This is my message") return nil, nil } _ = a.Read(handler) time.Sleep(30 * time.Second) } */ func TestReadQueueNoMessage(t *testing.T) { mm := new(MockHelper) mm.On("Write", mock.AnythingOfType("[]uint8"), mock.AnythingOfType("*time.Duration")).Return(nil) mm.On("Read", mock.AnythingOfType("*context.cancelCtx"), mock.AnythingOfType("*storagequeues.consumer")).Return(nil) a := AzureStorageQueues{helper: mm, logger: logger.NewLogger("test"), closeCh: make(chan struct{})} m := bindings.Metadata{} m.Properties = map[string]string{"storageAccessKey": "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==", "queue": "queue1", "storageAccount": "devstoreaccount1"} err := a.Init(context.Background(), m) require.NoError(t, err) ctx, cancel := context.WithCancel(context.Background()) received := 0 handler := func(ctx context.Context, data *bindings.ReadResponse) ([]byte, error) { received++ s := string(data.Data) assert.Equal(t, s, "This is my message") return nil, nil } a.Read(ctx, handler) time.Sleep(1 * time.Second) cancel() assert.Equal(t, 0, received) assert.NoError(t, a.Close()) } func TestParseMetadata(t *testing.T) { oneSecondDuration := time.Second testCases := []struct { name string properties map[string]string // Account key is parsed in azauth // expectedAccountKey string expectedQueueName string expectedQueueEndpointURL string expectedPollingInterval time.Duration expectedTTL *time.Duration expectedVisibilityTimeout *time.Duration }{ { name: "Account and key", properties: map[string]string{"storageAccessKey": "myKey", "queue": "queue1", "storageAccount": "devstoreaccount1"}, // expectedAccountKey: "myKey", expectedQueueName: "queue1", expectedQueueEndpointURL: "", expectedPollingInterval: defaultPollingInterval, expectedVisibilityTimeout: ptr.Of(defaultVisibilityTimeout), }, { name: "Accout, key, and endpoint", properties: map[string]string{"accountKey": "myKey", "queueName": "queue1", "storageAccount": "someAccount", "queueEndpointUrl": "https://foo.example.com:10001"}, // expectedAccountKey: "myKey", expectedQueueName: "queue1", expectedQueueEndpointURL: "https://foo.example.com:10001", expectedPollingInterval: defaultPollingInterval, expectedVisibilityTimeout: ptr.Of(defaultVisibilityTimeout), }, { name: "Empty TTL", properties: map[string]string{"storageAccessKey": "myKey", "queue": "queue1", "storageAccount": "devstoreaccount1", metadata.TTLMetadataKey: ""}, // expectedAccountKey: "myKey", expectedQueueName: "queue1", expectedQueueEndpointURL: "", expectedTTL: ptr.Of(time.Duration(0)), expectedPollingInterval: defaultPollingInterval, expectedVisibilityTimeout: ptr.Of(defaultVisibilityTimeout), }, { name: "With TTL", properties: map[string]string{"accessKey": "myKey", "storageAccountQueue": "queue1", "storageAccount": "devstoreaccount1", metadata.TTLMetadataKey: "1"}, // expectedAccountKey: "myKey", expectedQueueName: "queue1", expectedTTL: &oneSecondDuration, expectedQueueEndpointURL: "", expectedPollingInterval: defaultPollingInterval, expectedVisibilityTimeout: ptr.Of(defaultVisibilityTimeout), }, { name: "With visibility timeout", properties: map[string]string{"accessKey": "myKey", "storageAccountQueue": "queue1", "storageAccount": "devstoreaccount1", "visibilityTimeout": "5s"}, expectedQueueName: "queue1", expectedPollingInterval: defaultPollingInterval, expectedVisibilityTimeout: ptr.Of(5 * time.Second), }, { name: "With polling interval", properties: map[string]string{"accessKey": "myKey", "storageAccountQueue": "queue1", "storageAccount": "devstoreaccount1", "pollingInterval": "2s"}, // expectedAccountKey: "myKey", expectedQueueName: "queue1", expectedQueueEndpointURL: "", expectedPollingInterval: 2 * time.Second, expectedVisibilityTimeout: ptr.Of(defaultVisibilityTimeout), }, } for _, tt := range testCases { t.Run(tt.name, func(t *testing.T) { m := bindings.Metadata{} m.Properties = tt.properties meta, err := parseMetadata(m) require.NoError(t, err) // assert.Equal(t, tt.expectedAccountKey, meta.AccountKey) assert.Equal(t, tt.expectedQueueName, meta.QueueName) assert.Equal(t, tt.expectedTTL, meta.TTL) assert.Equal(t, tt.expectedQueueEndpointURL, meta.QueueEndpoint) assert.Equal(t, tt.expectedVisibilityTimeout, meta.VisibilityTimeout) }) } t.Run("invalid pollingInterval", func(t *testing.T) { m := bindings.Metadata{Base: metadata.Base{ Properties: map[string]string{ "accessKey": "myKey", "storageAccountQueue": "queue1", "storageAccount": "devstoreaccount1", "pollingInterval": "-1s", }, }} _, err := parseMetadata(m) require.Error(t, err) }) } func TestParseMetadataWithInvalidTTL(t *testing.T) { testCases := []struct { name string properties map[string]string }{ { name: "Whitespaces TTL", properties: map[string]string{"storageAccessKey": "myKey", "queue": "queue1", "storageAccount": "devstoreaccount1", metadata.TTLMetadataKey: " "}, }, { name: "Negative ttl", properties: map[string]string{"storageAccessKey": "myKey", "queue": "queue1", "storageAccount": "devstoreaccount1", metadata.TTLMetadataKey: "-1"}, }, { name: "Non-numeric ttl", properties: map[string]string{"storageAccessKey": "myKey", "queue": "queue1", "storageAccount": "devstoreaccount1", metadata.TTLMetadataKey: "abc"}, }, } for _, tt := range testCases { t.Run(tt.name, func(t *testing.T) { m := bindings.Metadata{} m.Properties = tt.properties _, err := parseMetadata(m) assert.NotNil(t, err) }) } }