195 lines
5.1 KiB
Go
195 lines
5.1 KiB
Go
//go:build integration_test
|
|
// +build integration_test
|
|
|
|
package kubemq
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"os"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/stretchr/testify/require"
|
|
|
|
"github.com/dapr/components-contrib/bindings"
|
|
"github.com/dapr/components-contrib/metadata"
|
|
"github.com/dapr/kit/logger"
|
|
)
|
|
|
|
const (
|
|
// Environment variable containing the host name for KubeMQ integration tests
|
|
// To run using docker: docker run -d --hostname -kubemq --name test-kubemq -p 50000:50000 kubemq/kubemq-community:latest
|
|
// In that case the address string will be: "localhost:50000"
|
|
testKubeMQHostEnvKey = "DAPR_TEST_KUBEMQ_HOST"
|
|
)
|
|
|
|
func getTestKubeMQHost() string {
|
|
host := os.Getenv(testKubeMQHostEnvKey)
|
|
if host == "" {
|
|
host = "localhost:50000"
|
|
}
|
|
return host
|
|
}
|
|
|
|
func getDefaultMetadata(channel string) bindings.Metadata {
|
|
return bindings.Metadata{
|
|
Base: metadata.Base{
|
|
Name: "kubemq",
|
|
Properties: map[string]string{
|
|
"address": getTestKubeMQHost(),
|
|
"channel": channel,
|
|
"pollMaxItems": "1",
|
|
"autoAcknowledged": "true",
|
|
"pollTimeoutSeconds": "2",
|
|
},
|
|
},
|
|
}
|
|
}
|
|
|
|
func Test_kubeMQ_Init(t *testing.T) {
|
|
tests := []struct {
|
|
name string
|
|
meta bindings.Metadata
|
|
wantErr bool
|
|
}{
|
|
{
|
|
name: "init with valid options",
|
|
meta: bindings.Metadata{
|
|
Base: metadata.Base{
|
|
Name: "kubemq",
|
|
Properties: map[string]string{
|
|
"address": getTestKubeMQHost(),
|
|
"channel": "test",
|
|
"pollMaxItems": "1",
|
|
"autoAcknowledged": "true",
|
|
"pollTimeoutSeconds": "2",
|
|
},
|
|
},
|
|
},
|
|
wantErr: false,
|
|
},
|
|
{
|
|
name: "init with invalid options",
|
|
meta: bindings.Metadata{
|
|
Base: metadata.Base{
|
|
Name: "kubemq",
|
|
Properties: map[string]string{
|
|
"address": "localhost-bad:50000",
|
|
"channel": "test",
|
|
"pollMaxItems": "1",
|
|
"autoAcknowledged": "true",
|
|
"pollTimeoutSeconds": "2",
|
|
},
|
|
},
|
|
},
|
|
wantErr: true,
|
|
},
|
|
{
|
|
name: "init with invalid parsing options",
|
|
meta: bindings.Metadata{
|
|
Base: metadata.Base{
|
|
Name: "kubemq",
|
|
Properties: map[string]string{
|
|
"address": "bad",
|
|
"channel": "test",
|
|
"pollMaxItems": "1",
|
|
"autoAcknowledged": "true",
|
|
"pollTimeoutSeconds": "2",
|
|
},
|
|
},
|
|
},
|
|
wantErr: true,
|
|
},
|
|
}
|
|
|
|
for _, tt := range tests {
|
|
t.Run(tt.name, func(t *testing.T) {
|
|
kubemq := NewKubeMQ(logger.NewLogger("test"))
|
|
err := kubemq.Init(context.Background(), tt.meta)
|
|
if tt.wantErr {
|
|
require.Error(t, err)
|
|
} else {
|
|
require.NoError(t, err)
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func Test_kubeMQ_Invoke_Read_Single_Message(t *testing.T) {
|
|
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
|
|
defer cancel()
|
|
kubemq := NewKubeMQ(logger.NewLogger("test"))
|
|
err := kubemq.Init(context.Background(), getDefaultMetadata("test.read.single"))
|
|
require.NoError(t, err)
|
|
dataReadCh := make(chan []byte)
|
|
invokeRequest := &bindings.InvokeRequest{
|
|
Data: []byte("test"),
|
|
Metadata: map[string]string{},
|
|
}
|
|
_, err = kubemq.Invoke(ctx, invokeRequest)
|
|
require.NoError(t, err)
|
|
_ = kubemq.Read(ctx, func(ctx context.Context, req *bindings.ReadResponse) ([]byte, error) {
|
|
dataReadCh <- req.Data
|
|
return req.Data, nil
|
|
})
|
|
select {
|
|
case <-ctx.Done():
|
|
require.Fail(t, "timeout waiting for read response")
|
|
case data := <-dataReadCh:
|
|
require.Equal(t, invokeRequest.Data, data)
|
|
}
|
|
}
|
|
|
|
func Test_kubeMQ_Invoke_Read_Single_MessageWithHandlerError(t *testing.T) {
|
|
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
|
|
defer cancel()
|
|
kubemq := NewKubeMQ(logger.NewLogger("test"))
|
|
md := getDefaultMetadata("test.read.single.error")
|
|
md.Properties["autoAcknowledged"] = "false"
|
|
err := kubemq.Init(context.Background(), md)
|
|
require.NoError(t, err)
|
|
invokeRequest := &bindings.InvokeRequest{
|
|
Data: []byte("test"),
|
|
Metadata: map[string]string{},
|
|
}
|
|
|
|
_, err = kubemq.Invoke(ctx, invokeRequest)
|
|
require.NoError(t, err)
|
|
firstReadCtx, firstReadCancel := context.WithTimeout(context.Background(), time.Second*3)
|
|
defer firstReadCancel()
|
|
_ = kubemq.Read(firstReadCtx, func(ctx context.Context, req *bindings.ReadResponse) ([]byte, error) {
|
|
return nil, fmt.Errorf("handler error")
|
|
})
|
|
|
|
<-firstReadCtx.Done()
|
|
dataReadCh := make(chan []byte)
|
|
secondReadCtx, secondReadCancel := context.WithTimeout(context.Background(), time.Second*3)
|
|
defer secondReadCancel()
|
|
_ = kubemq.Read(secondReadCtx, func(ctx context.Context, req *bindings.ReadResponse) ([]byte, error) {
|
|
dataReadCh <- req.Data
|
|
return req.Data, nil
|
|
})
|
|
select {
|
|
case <-secondReadCtx.Done():
|
|
require.Fail(t, "timeout waiting for read response")
|
|
case data := <-dataReadCh:
|
|
require.Equal(t, invokeRequest.Data, data)
|
|
}
|
|
}
|
|
|
|
func Test_kubeMQ_Invoke_Error(t *testing.T) {
|
|
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
|
|
defer cancel()
|
|
kubemq := NewKubeMQ(logger.NewLogger("test"))
|
|
err := kubemq.Init(context.Background(), getDefaultMetadata("***test***"))
|
|
require.NoError(t, err)
|
|
|
|
invokeRequest := &bindings.InvokeRequest{
|
|
Data: []byte("test"),
|
|
Metadata: map[string]string{},
|
|
}
|
|
_, err = kubemq.Invoke(ctx, invokeRequest)
|
|
require.Error(t, err)
|
|
}
|