components-contrib/pubsub/kubemq/kubemq_eventstore_test.go

229 lines
5.1 KiB
Go

package kubemq
import (
"context"
"fmt"
"testing"
"time"
"github.com/kubemq-io/kubemq-go"
"github.com/stretchr/testify/require"
"github.com/dapr/components-contrib/pubsub"
"github.com/dapr/kit/logger"
)
// kubemqEventsStoreMock is the stand-in assigned to k.client by the tests in
// this file. Every field is a canned outcome that one of its methods hands back.
type kubemqEventsStoreMock struct {
	// publishError, when non-nil, is returned by publish straight away.
	publishError error

	// publishTimeout is how long publish's background goroutine sleeps before
	// it delivers a result; values <= 0 mean no delay.
	publishTimeout time.Duration

	// resultError is the value publish sends on resultCh.
	resultError error

	// resultCh carries publish results to the goroutine started by Stream.
	resultCh chan error

	// subscribeErr is returned by Subscribe.
	subscribeErr error
}
// publish is the send function that Stream hands back to its caller.
//
// A configured publishError is returned immediately and nothing is queued.
// Otherwise publish returns nil at once while a background goroutine sends
// resultError on resultCh, first sleeping for publishTimeout when that is
// positive. The msg argument is not inspected.
func (k *kubemqEventsStoreMock) publish(msg *kubemq.EventStore) error {
	if err := k.publishError; err != nil {
		return err
	}

	// The result is delivered asynchronously so the caller can be made to wait
	// (or give up) on it independently of this call returning.
	go func() {
		if delay := k.publishTimeout; delay > 0 {
			time.Sleep(delay)
		}
		k.resultCh <- k.resultError
	}()

	return nil
}
// Stream starts a goroutine that relays every value received on resultCh to
// onResult until ctx is done, and returns k.publish as the send function.
//
// Each relayed value is wrapped in a kubemq.EventStoreResult whose Err field
// holds it; Id and Sent are left at their zero values, and the callback's own
// error argument is always nil. The returned error is always nil.
func (k *kubemqEventsStoreMock) Stream(ctx context.Context, onResult func(result *kubemq.EventStoreResult, err error)) (func(msg *kubemq.EventStore) error, error) {
	relay := func() {
		for {
			select {
			case resErr := <-k.resultCh:
				onResult(&kubemq.EventStoreResult{Err: resErr}, nil)
			case <-ctx.Done():
				return
			}
		}
	}
	go relay()

	return k.publish, nil
}
// Subscribe reports the configured subscribeErr and does nothing else: ctx and
// request are ignored and onEvent is never invoked.
func (k *kubemqEventsStoreMock) Subscribe(ctx context.Context, request *kubemq.EventsStoreSubscription, onEvent func(msg *kubemq.EventStoreReceive, err error)) error {
	return k.subscribeErr
}
// Close is a no-op for the mock; it always returns nil.
func (k *kubemqEventsStoreMock) Close() error {
	return nil
}
// setResultError stores err in resultError and returns the same mock so that
// setter calls can be chained.
func (k *kubemqEventsStoreMock) setResultError(err error) *kubemqEventsStoreMock {
	k.resultError = err
	return k
}
// setSubscribeError stores err in subscribeErr and returns the same mock so
// that setter calls can be chained.
func (k *kubemqEventsStoreMock) setSubscribeError(err error) *kubemqEventsStoreMock {
	k.subscribeErr = err
	return k
}
// setPublishTimeout stores timeout in publishTimeout and returns the same mock
// so that setter calls can be chained.
func (k *kubemqEventsStoreMock) setPublishTimeout(timeout time.Duration) *kubemqEventsStoreMock {
	k.publishTimeout = timeout
	return k
}
// setPublishError stores err in publishError and returns the same mock so that
// setter calls can be chained.
func (k *kubemqEventsStoreMock) setPublishError(err error) *kubemqEventsStoreMock {
	k.publishError = err
	return k
}
// newKubemqEventsStoreMock builds a mock with no errors configured and a
// result channel that can buffer a single value.
func newKubemqEventsStoreMock() *kubemqEventsStoreMock {
	mock := new(kubemqEventsStoreMock)
	mock.resultCh = make(chan error, 1)
	return mock
}
// Test_kubeMQEventsStore_Publish runs Publish against the mock client for a
// table of outcomes: a clean publish, an error delivered as the publish result,
// a result that arrives after the wait timeout has elapsed, and an error
// returned by the send function itself.
//
// Each case runs as its own subtest named after the case, so a failure
// identifies the scenario and does not stop the remaining cases.
func Test_kubeMQEventsStore_Publish(t *testing.T) {
	tests := []struct {
		name        string
		req         *pubsub.PublishRequest
		timeout     time.Duration
		publishErr  error
		resultError error
		wantErr     bool
	}{
		{
			name: "publish with no error",
			req: &pubsub.PublishRequest{
				Data:  []byte("data"),
				Topic: "some-topic",
			},
			resultError: nil,
			wantErr:     false,
		},
		{
			name: "publish with error",
			req: &pubsub.PublishRequest{
				Data:  []byte("data"),
				Topic: "some-topic",
			},
			resultError: fmt.Errorf("some error"),
			wantErr:     true,
		},
		{
			name: "publish with timeout error",
			req: &pubsub.PublishRequest{
				Data:  []byte("data"),
				Topic: "some-topic",
			},
			resultError: nil,
			timeout:     3 * time.Second,
			wantErr:     true,
		},
		{
			name: "publish with publish error",
			req: &pubsub.PublishRequest{
				Data:  []byte("data"),
				Topic: "some-topic",
			},
			resultError: nil,
			publishErr:  fmt.Errorf("some error"),
			wantErr:     true,
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			k := newKubeMQEventsStore(logger.NewLogger("kubemq-test"))
			k.ctx, k.ctxCancel = context.WithCancel(context.Background())
			client := newKubemqEventsStoreMock().
				setResultError(tt.resultError).
				setPublishError(tt.publishErr)
			k.isInitialized = true
			k.metadata = &kubemqMetadata{
				internalHost: "",
				internalPort: 0,
				ClientID:     "some-client-id",
				AuthToken:    "",
				Group:        "",
				IsStore:      true,
			}
			if tt.timeout > 0 {
				// The component waits one second less than the mock takes to
				// deliver its result, so the wait must expire first.
				k.waitForResultTimeout = tt.timeout - 1*time.Second
				client.setPublishTimeout(tt.timeout)
			}
			k.client = client
			// Deferred so the component is closed even when an assertion below
			// aborts the subtest. Close's result is not under test here.
			defer func() { _ = k.Close() }()

			// The mock's Stream never fails, so an error here means the setup
			// itself is broken; report it at its source.
			require.NoError(t, k.setPublishStream())

			err := k.Publish(tt.req)
			if tt.wantErr {
				require.Error(t, err)
			} else {
				require.NoError(t, err)
			}
			// Features is only exercised; its result is not asserted.
			_ = k.Features()
		})
	}
}
// Test_kubeMQkubeMQEventsStore_Subscribe runs Subscribe against the mock client
// for two outcomes: the client accepts the subscription, and the client returns
// an error.
//
// Each case runs as its own subtest named after the case, so a failure
// identifies the scenario and does not stop the remaining cases.
func Test_kubeMQkubeMQEventsStore_Subscribe(t *testing.T) {
	tests := []struct {
		name string
		// reqMsg is populated by the cases but is not read by the loop below.
		reqMsg           *pubsub.NewMessage
		subscribeError   error
		subscribeHandler pubsub.Handler
		wantErr          bool
	}{
		{
			name: "subscribe with no error",
			reqMsg: &pubsub.NewMessage{
				Data:  []byte("data"),
				Topic: "some-topic",
			},
			subscribeHandler: func(ctx context.Context, msg *pubsub.NewMessage) error {
				return nil
			},
			subscribeError: nil,
			wantErr:        false,
		}, {
			name: "subscribe with error",
			reqMsg: &pubsub.NewMessage{
				Data:  []byte("data"),
				Topic: "some-topic",
			},
			subscribeHandler: func(ctx context.Context, msg *pubsub.NewMessage) error {
				return nil
			},
			subscribeError: fmt.Errorf("some error"),
			wantErr:        true,
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			k := newKubeMQEventsStore(logger.NewLogger("kubemq-test"))
			k.ctx, k.ctxCancel = context.WithCancel(context.Background())
			k.client = newKubemqEventsStoreMock().
				setSubscribeError(tt.subscribeError)
			k.isInitialized = true
			k.metadata = &kubemqMetadata{
				internalHost: "",
				internalPort: 0,
				ClientID:     "some-client-id",
				AuthToken:    "",
				Group:        "",
				IsStore:      true,
			}
			// Deferred so the component is closed even when an assertion below
			// aborts the subtest. Close's result is not under test here.
			defer func() { _ = k.Close() }()

			err := k.Subscribe(k.ctx, pubsub.SubscribeRequest{Topic: "some-topic"}, tt.subscribeHandler)
			if tt.wantErr {
				require.Error(t, err)
			} else {
				require.NoError(t, err)
			}
			// Features is only exercised; its result is not asserted.
			_ = k.Features()
		})
	}
}