components-contrib/bindings/kubemq/kubemq_integration_test.go

195 lines
5.1 KiB
Go

//go:build integration_test
// +build integration_test
package kubemq
import (
"context"
"fmt"
"os"
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/dapr/components-contrib/bindings"
"github.com/dapr/components-contrib/metadata"
"github.com/dapr/kit/logger"
)
const (
// Environment variable containing the host name for KubeMQ integration tests
// To run using docker: docker run -d --hostname -kubemq --name test-kubemq -p 50000:50000 kubemq/kubemq-community:latest
// In that case the address string will be: "localhost:50000"
testKubeMQHostEnvKey = "DAPR_TEST_KUBEMQ_HOST"
)
func getTestKubeMQHost() string {
host := os.Getenv(testKubeMQHostEnvKey)
if host == "" {
host = "localhost:50000"
}
return host
}
func getDefaultMetadata(channel string) bindings.Metadata {
return bindings.Metadata{
Base: metadata.Base{
Name: "kubemq",
Properties: map[string]string{
"address": getTestKubeMQHost(),
"channel": channel,
"pollMaxItems": "1",
"autoAcknowledged": "true",
"pollTimeoutSeconds": "2",
},
},
}
}
func Test_kubeMQ_Init(t *testing.T) {
tests := []struct {
name string
meta bindings.Metadata
wantErr bool
}{
{
name: "init with valid options",
meta: bindings.Metadata{
Base: metadata.Base{
Name: "kubemq",
Properties: map[string]string{
"address": getTestKubeMQHost(),
"channel": "test",
"pollMaxItems": "1",
"autoAcknowledged": "true",
"pollTimeoutSeconds": "2",
},
},
},
wantErr: false,
},
{
name: "init with invalid options",
meta: bindings.Metadata{
Base: metadata.Base{
Name: "kubemq",
Properties: map[string]string{
"address": "localhost-bad:50000",
"channel": "test",
"pollMaxItems": "1",
"autoAcknowledged": "true",
"pollTimeoutSeconds": "2",
},
},
},
wantErr: true,
},
{
name: "init with invalid parsing options",
meta: bindings.Metadata{
Base: metadata.Base{
Name: "kubemq",
Properties: map[string]string{
"address": "bad",
"channel": "test",
"pollMaxItems": "1",
"autoAcknowledged": "true",
"pollTimeoutSeconds": "2",
},
},
},
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
kubemq := NewKubeMQ(logger.NewLogger("test"))
err := kubemq.Init(context.Background(), tt.meta)
if tt.wantErr {
require.Error(t, err)
} else {
require.NoError(t, err)
}
})
}
}
func Test_kubeMQ_Invoke_Read_Single_Message(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
kubemq := NewKubeMQ(logger.NewLogger("test"))
err := kubemq.Init(context.Background(), getDefaultMetadata("test.read.single"))
require.NoError(t, err)
dataReadCh := make(chan []byte)
invokeRequest := &bindings.InvokeRequest{
Data: []byte("test"),
Metadata: map[string]string{},
}
_, err = kubemq.Invoke(ctx, invokeRequest)
require.NoError(t, err)
_ = kubemq.Read(ctx, func(ctx context.Context, req *bindings.ReadResponse) ([]byte, error) {
dataReadCh <- req.Data
return req.Data, nil
})
select {
case <-ctx.Done():
require.Fail(t, "timeout waiting for read response")
case data := <-dataReadCh:
require.Equal(t, invokeRequest.Data, data)
}
}
func Test_kubeMQ_Invoke_Read_Single_MessageWithHandlerError(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
kubemq := NewKubeMQ(logger.NewLogger("test"))
md := getDefaultMetadata("test.read.single.error")
md.Properties["autoAcknowledged"] = "false"
err := kubemq.Init(context.Background(), md)
require.NoError(t, err)
invokeRequest := &bindings.InvokeRequest{
Data: []byte("test"),
Metadata: map[string]string{},
}
_, err = kubemq.Invoke(ctx, invokeRequest)
require.NoError(t, err)
firstReadCtx, firstReadCancel := context.WithTimeout(context.Background(), time.Second*3)
defer firstReadCancel()
_ = kubemq.Read(firstReadCtx, func(ctx context.Context, req *bindings.ReadResponse) ([]byte, error) {
return nil, fmt.Errorf("handler error")
})
<-firstReadCtx.Done()
dataReadCh := make(chan []byte)
secondReadCtx, secondReadCancel := context.WithTimeout(context.Background(), time.Second*3)
defer secondReadCancel()
_ = kubemq.Read(secondReadCtx, func(ctx context.Context, req *bindings.ReadResponse) ([]byte, error) {
dataReadCh <- req.Data
return req.Data, nil
})
select {
case <-secondReadCtx.Done():
require.Fail(t, "timeout waiting for read response")
case data := <-dataReadCh:
require.Equal(t, invokeRequest.Data, data)
}
}
func Test_kubeMQ_Invoke_Error(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
kubemq := NewKubeMQ(logger.NewLogger("test"))
err := kubemq.Init(context.Background(), getDefaultMetadata("***test***"))
require.NoError(t, err)
invokeRequest := &bindings.InvokeRequest{
Data: []byte("test"),
Metadata: map[string]string{},
}
_, err = kubemq.Invoke(ctx, invokeRequest)
require.Error(t, err)
}