components-contrib/pubsub/kubemq/kubemq_events_test.go

206 lines
4.5 KiB
Go

package kubemq
import (
"context"
"fmt"
"testing"
"time"
"github.com/kubemq-io/kubemq-go"
"github.com/stretchr/testify/require"
"github.com/dapr/components-contrib/pubsub"
"github.com/dapr/kit/logger"
)
type kubemqEventsMock struct {
resultError error
subscribeErr error
resultCh chan error
publishError error
publishTimeout time.Duration
}
func (k *kubemqEventsMock) publish(msg *kubemq.Event) error {
if k.publishError != nil {
return k.publishError
}
go func() {
if k.publishTimeout > 0 {
time.Sleep(k.publishTimeout)
}
k.resultCh <- k.resultError
}()
return nil
}
func (k *kubemqEventsMock) Stream(ctx context.Context, onError func(err error)) (func(msg *kubemq.Event) error, error) {
go func() {
for {
select {
case <-ctx.Done():
return
case result := <-k.resultCh:
onError(result)
}
}
}()
return k.publish, nil
}
func (k *kubemqEventsMock) Subscribe(ctx context.Context, request *kubemq.EventsSubscription, onEvent func(msg *kubemq.Event, err error)) error {
return k.subscribeErr
}
func (k *kubemqEventsMock) Close() error {
return nil
}
func (k *kubemqEventsMock) setResultError(err error) *kubemqEventsMock {
k.resultError = err
return k
}
func (k *kubemqEventsMock) setSubscribeError(err error) *kubemqEventsMock {
k.subscribeErr = err
return k
}
func (k *kubemqEventsMock) setPublishTimeout(timeout time.Duration) *kubemqEventsMock {
k.publishTimeout = timeout
return k
}
func (k *kubemqEventsMock) setPublishError(err error) *kubemqEventsMock {
k.publishError = err
return k
}
func newKubemqEventsMock() *kubemqEventsMock {
return &kubemqEventsMock{
resultError: nil,
subscribeErr: nil,
resultCh: make(chan error, 1),
}
}
func Test_kubeMQEvents_Publish(t *testing.T) {
tests := []struct {
name string
req *pubsub.PublishRequest
timeout time.Duration
publishErr error
resultError error
wantErr bool
}{
{
name: "publish with no error",
req: &pubsub.PublishRequest{
Data: []byte("data"),
Topic: "some-topic",
},
resultError: nil,
wantErr: false,
},
{
name: "publish with publish error",
req: &pubsub.PublishRequest{
Data: []byte("data"),
Topic: "some-topic",
},
resultError: nil,
publishErr: fmt.Errorf("some error"),
wantErr: true,
},
}
for _, tt := range tests {
k := newkubeMQEvents(logger.NewLogger("kubemq-test"))
k.ctx, k.ctxCancel = context.WithCancel(context.Background())
client := newKubemqEventsMock().
setResultError(tt.resultError).
setPublishError(tt.publishErr)
k.isInitialized = true
k.metadata = &kubemqMetadata{
internalHost: "",
internalPort: 0,
ClientID: "some-client-id",
AuthToken: "",
Group: "",
IsStore: false,
}
if tt.timeout > 0 {
k.waitForResultTimeout = tt.timeout - 1*time.Second
client.setPublishTimeout(tt.timeout)
}
k.client = client
_ = k.setPublishStream()
err := k.Publish(tt.req)
if tt.wantErr {
require.Error(t, err)
} else {
require.NoError(t, err)
}
_ = k.Features()
_ = k.Close()
}
}
func Test_kubeMQEvents_Subscribe(t *testing.T) {
tests := []struct {
name string
reqMsg *pubsub.NewMessage
subscribeError error
subscribeHandler pubsub.Handler
wantErr bool
}{
{
name: "subscribe with no error",
reqMsg: &pubsub.NewMessage{
Data: []byte("data"),
Topic: "some-topic",
},
subscribeHandler: func(ctx context.Context, msg *pubsub.NewMessage) error {
return nil
},
subscribeError: nil,
wantErr: false,
}, {
name: "subscribe with error",
reqMsg: &pubsub.NewMessage{
Data: []byte("data"),
Topic: "some-topic",
},
subscribeHandler: func(ctx context.Context, msg *pubsub.NewMessage) error {
return nil
},
subscribeError: fmt.Errorf("some error"),
wantErr: true,
},
}
for _, tt := range tests {
k := newkubeMQEvents(logger.NewLogger("kubemq-test"))
k.ctx, k.ctxCancel = context.WithCancel(context.Background())
k.client = newKubemqEventsMock().
setSubscribeError(tt.subscribeError)
k.isInitialized = true
k.metadata = &kubemqMetadata{
internalHost: "",
internalPort: 0,
ClientID: "some-client-id",
AuthToken: "",
Group: "",
IsStore: false,
}
err := k.Subscribe(k.ctx, pubsub.SubscribeRequest{Topic: "some-topic"}, tt.subscribeHandler)
if tt.wantErr {
require.Error(t, err)
} else {
require.NoError(t, err)
}
_ = k.Features()
_ = k.Close()
}
}