Fix linter issues, update test config, rebase upstream

Signed-off-by: Bernd Verst <4535280+berndverst@users.noreply.github.com>
This commit is contained in:
Bernd Verst 2022-11-02 15:15:45 -07:00
parent bfc482d3e4
commit 6df7f09927
12 changed files with 642 additions and 1026 deletions

123
go.mod
View File

@ -4,8 +4,8 @@ go 1.19
require (
cloud.google.com/go/datastore v1.8.0
cloud.google.com/go/pubsub v1.25.1
cloud.google.com/go/secretmanager v1.7.0
cloud.google.com/go/pubsub v1.26.0
cloud.google.com/go/secretmanager v1.8.0
cloud.google.com/go/storage v1.27.0
dubbo.apache.org/dubbo-go/v3 v3.0.3-0.20220610080020-48691a404537
github.com/Azure/azure-amqp-common-go/v3 v3.2.3
@ -13,7 +13,7 @@ require (
github.com/Azure/azure-sdk-for-go v65.0.0+incompatible
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.1.4
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.1.0
github.com/Azure/azure-sdk-for-go/sdk/data/azappconfig v0.4.1
github.com/Azure/azure-sdk-for-go/sdk/data/azappconfig v0.4.3
github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos v0.3.2
github.com/Azure/azure-sdk-for-go/sdk/data/aztables v1.0.1
github.com/Azure/azure-sdk-for-go/sdk/keyvault/azsecrets v0.10.1
@ -32,18 +32,18 @@ require (
github.com/alibaba/sentinel-golang v1.0.4
github.com/alibabacloud-go/darabonba-openapi v0.2.1
github.com/alibabacloud-go/oos-20190601 v1.0.4
github.com/alibabacloud-go/tea v1.1.19
github.com/alibabacloud-go/tea v1.1.20
github.com/alibabacloud-go/tea-utils v1.4.5
github.com/alicebob/miniredis/v2 v2.23.0
github.com/aliyun/aliyun-log-go-sdk v0.1.38
github.com/aliyun/aliyun-log-go-sdk v0.1.39
github.com/aliyun/aliyun-oss-go-sdk v2.2.5+incompatible
github.com/aliyun/aliyun-tablestore-go-sdk v1.7.7
github.com/apache/dubbo-go-hessian2 v1.11.1
github.com/apache/dubbo-go-hessian2 v1.11.3
github.com/apache/pulsar-client-go v0.9.0
github.com/apache/rocketmq-client-go/v2 v2.1.1-rc2
github.com/aws/aws-sdk-go v1.44.119
github.com/bradfitz/gomemcache v0.0.0-20220106215444-fb4bf637b56d
github.com/camunda/zeebe/clients/go/v8 v8.1.2
github.com/aws/aws-sdk-go v1.44.128
github.com/bradfitz/gomemcache v0.0.0-20221031212613-62deef7fc822
github.com/camunda/zeebe/clients/go/v8 v8.1.3
github.com/cenkalti/backoff/v4 v4.1.3
github.com/cinience/go_rocketmq v0.0.2
github.com/coreos/go-oidc v2.2.1+incompatible
@ -51,10 +51,10 @@ require (
github.com/dancannon/gorethink v4.0.0+incompatible
github.com/dapr/kit v0.0.2
github.com/denisenkom/go-mssqldb v0.12.3
github.com/dghubble/go-twitter v0.0.0-20220816163853-8a0df96f1e6d
github.com/dghubble/go-twitter v0.0.0-20221024160433-0cc1e72ed6d8
github.com/dghubble/oauth1 v0.7.1
github.com/didip/tollbooth v4.0.2+incompatible
github.com/eclipse/paho.mqtt.golang v1.4.2-0.20221018190109-a1800d8df9a4
github.com/eclipse/paho.mqtt.golang v1.4.2
github.com/fasthttp-contrib/sessions v0.0.0-20160905201309-74f6ac73d5d5
github.com/ghodss/yaml v1.0.0
github.com/go-redis/redis/v8 v8.11.5
@ -71,19 +71,23 @@ require (
github.com/hashicorp/golang-lru v0.5.4
github.com/hazelcast/hazelcast-go-client v0.0.0-20190530123621-6cf767c2f31a
github.com/huaweicloud/huaweicloud-sdk-go-obs v3.21.12+incompatible
github.com/huaweicloud/huaweicloud-sdk-go-v3 v0.1.5
github.com/huaweicloud/huaweicloud-sdk-go-v3 v0.1.6
github.com/influxdata/influxdb-client-go v1.4.0
github.com/jackc/pgx/v5 v5.0.3
github.com/jackc/pgx/v5 v5.0.4
github.com/json-iterator/go v1.1.12
github.com/kubemq-io/kubemq-go v1.7.6
github.com/labd/commercetools-go-sdk v1.1.0
github.com/kataras/go-errors v0.0.3 // indirect
github.com/kataras/go-serializer v0.0.4 // indirect
github.com/machinebox/graphql v0.2.2
github.com/matoous/go-nanoid/v2 v2.0.0
github.com/matryer/is v1.4.0 // indirect
github.com/mattn/go-isatty v0.0.14 // indirect
github.com/microcosm-cc/bluemonday v1.0.21 // indirect
github.com/miekg/dns v1.1.50 // indirect
github.com/mitchellh/mapstructure v1.5.1-0.20220423185008-bf980b35cac4
github.com/mrz1836/postmark v1.2.11
github.com/nacos-group/nacos-sdk-go/v2 v2.1.0
github.com/nats-io/nats-server/v2 v2.9.3
github.com/nats-io/nats.go v1.18.0
github.com/mrz1836/postmark v1.3.0
github.com/nacos-group/nacos-sdk-go/v2 v2.1.2
github.com/nats-io/nats-server/v2 v2.9.4
github.com/nats-io/nats.go v1.19.0
github.com/nats-io/nkeys v0.3.0
github.com/nats-io/stan.go v0.10.3
github.com/open-policy-agent/opa v0.45.0
@ -91,42 +95,52 @@ require (
github.com/pashagolub/pgxmock/v2 v2.1.0
github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/pkg/errors v0.9.1
github.com/rabbitmq/amqp091-go v1.5.0
github.com/pquerna/cachecontrol v0.0.0-20180517163645-1555304b9b35 // indirect
github.com/robfig/cron/v3 v3.0.1
github.com/samuel/go-zookeeper v0.0.0-20201211165307-7117e9ea2414
github.com/sendgrid/sendgrid-go v3.12.0+incompatible
github.com/sijms/go-ora/v2 v2.5.3
github.com/stretchr/testify v1.8.0
github.com/stretchr/testify v1.8.1
github.com/supplyon/gremcos v0.1.38
github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/common v1.0.518
github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/ssm v1.0.518
github.com/valyala/fasthttp v1.40.0
github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/common v1.0.527
github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/ssm v1.0.527
github.com/valyala/fasthttp v1.41.0
github.com/vmware/vmware-go-kcl v1.5.0
github.com/wapc/wapc-go v0.5.4
github.com/wapc/wapc-go v0.5.5
github.com/xdg-go/scram v1.1.1
go.mongodb.org/mongo-driver v1.10.3
go.temporal.io/api v1.12.0
go.temporal.io/sdk v1.17.0
go.uber.org/atomic v1.10.0
go.uber.org/ratelimit v0.2.0
golang.org/x/crypto v0.1.0
golang.org/x/net v0.1.0
golang.org/x/oauth2 v0.1.0
google.golang.org/api v0.100.0
google.golang.org/genproto v0.0.0-20221018160656-63c7b68cfc55
google.golang.org/api v0.101.0
google.golang.org/grpc v1.50.1
gopkg.in/couchbase/gocb.v1 v1.6.7
gopkg.in/gomail.v2 v2.0.0-20160411212932-81ebce5c23df
gopkg.in/gorethink/gorethink.v4 v4.1.0 // indirect
gopkg.in/kataras/go-serializer.v0 v0.0.4 // indirect
gopkg.in/square/go-jose.v2 v2.5.1 // indirect
gopkg.in/yaml.v3 v3.0.1
k8s.io/api v0.25.3
k8s.io/apiextensions-apiserver v0.25.3
k8s.io/apimachinery v0.25.3
k8s.io/client-go v0.25.3
k8s.io/utils v0.0.0-20221012122500-cfd413dd9e85
k8s.io/api v0.23.0
k8s.io/apiextensions-apiserver v0.23.0
k8s.io/apimachinery v0.23.0
k8s.io/client-go v0.23.0
)
require (
cloud.google.com/go v0.104.0 // indirect
github.com/kubemq-io/kubemq-go v1.7.6
github.com/labd/commercetools-go-sdk v1.1.0
github.com/rabbitmq/amqp091-go v1.5.0
k8s.io/utils v0.0.0-20210930125809-cb0fa318a74b
)
require (
cloud.google.com/go v0.105.0 // indirect
cloud.google.com/go/compute v1.10.0 // indirect
cloud.google.com/go/iam v0.3.0 // indirect
cloud.google.com/go/iam v0.6.0 // indirect
contrib.go.opencensus.io/exporter/prometheus v0.4.1 // indirect
github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4 // indirect
github.com/99designs/keyring v1.2.1 // indirect
@ -143,8 +157,6 @@ require (
github.com/AzureAD/microsoft-authentication-library-for-go v0.5.1 // indirect
github.com/DataDog/zstd v1.5.0 // indirect
github.com/OneOfOne/xxhash v1.2.8 // indirect
github.com/PuerkitoBio/purell v1.1.1 // indirect
github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 // indirect
github.com/RoaringBitmap/roaring v1.1.0 // indirect
github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d // indirect
github.com/Workiva/go-datastructures v1.0.53 // indirect
@ -188,8 +200,8 @@ require (
github.com/eapache/go-resiliency v1.3.0 // indirect
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 // indirect
github.com/eapache/queue v1.1.0 // indirect
github.com/emicklei/go-restful/v3 v3.8.0 // indirect
github.com/emirpasic/gods v1.12.0 // indirect
github.com/facebookgo/clock v0.0.0-20150410010913-600d898af40a // indirect
github.com/fatih/color v1.13.0 // indirect
github.com/gavv/httpexpect v2.0.0+incompatible // indirect
github.com/go-kit/kit v0.10.0 // indirect
@ -197,9 +209,6 @@ require (
github.com/go-logfmt/logfmt v0.5.1 // indirect
github.com/go-logr/logr v1.2.3 // indirect
github.com/go-ole/go-ole v1.2.4 // indirect
github.com/go-openapi/jsonpointer v0.19.5 // indirect
github.com/go-openapi/jsonreference v0.19.5 // indirect
github.com/go-openapi/swag v0.19.14 // indirect
github.com/go-ozzo/ozzo-validation/v4 v4.3.0 // indirect
github.com/go-playground/locales v0.14.0 // indirect
github.com/go-playground/universal-translator v0.18.0 // indirect
@ -210,7 +219,9 @@ require (
github.com/gofrs/uuid v3.3.0+incompatible // indirect
github.com/gogap/errors v0.0.0-20200228125012-531a6449b28c // indirect
github.com/gogap/stack v0.0.0-20150131034635-fef68dddd4f8 // indirect
github.com/gogo/googleapis v1.4.1 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/gogo/status v1.1.1 // indirect
github.com/golang-jwt/jwt v3.2.2+incompatible // indirect
github.com/golang-sql/civil v0.0.0-20190719163853-cb61b32ac6fe // indirect
github.com/golang-sql/sqlexp v0.1.0 // indirect
@ -223,7 +234,9 @@ require (
github.com/google/go-querystring v1.1.0 // indirect
github.com/google/gofuzz v1.1.0 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.2.0 // indirect
github.com/googleapis/gnostic v0.5.5 // indirect
github.com/gorilla/websocket v1.5.0 // indirect
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 // indirect
github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c // indirect
github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
@ -232,7 +245,7 @@ require (
github.com/hashicorp/go-immutable-radix v1.3.1 // indirect
github.com/hashicorp/go-rootcerts v1.0.2 // indirect
github.com/hashicorp/go-uuid v1.0.3 // indirect
github.com/hashicorp/serf v0.9.6 // indirect
github.com/hashicorp/serf v0.9.7 // indirect
github.com/imdario/mergo v0.3.6 // indirect
github.com/imkira/go-interpol v1.1.0 // indirect
github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839 // indirect
@ -246,11 +259,8 @@ require (
github.com/jcmturner/rpc/v2 v2.0.3 // indirect
github.com/jinzhu/copier v0.3.5 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/jpillora/backoff v1.0.0 // indirect
github.com/k0kubun/pp v3.0.1+incompatible // indirect
github.com/kataras/go-errors v0.0.3 // indirect
github.com/kataras/go-serializer v0.0.4 // indirect
github.com/klauspost/compress v1.15.11 // indirect
github.com/knadh/koanf v1.4.1 // indirect
github.com/kubemq-io/protobuf v1.3.1 // indirect
@ -260,14 +270,9 @@ require (
github.com/leodido/go-urn v1.2.1 // indirect
github.com/linkedin/goavro/v2 v2.9.8 // indirect
github.com/magiconair/properties v1.8.6 // indirect
github.com/mailru/easyjson v0.7.6 // indirect
github.com/matryer/is v1.4.0 // indirect
github.com/mattn/go-colorable v0.1.12 // indirect
github.com/mattn/go-ieproxy v0.0.0-20190702010315-6dee0af9227d // indirect
github.com/mattn/go-isatty v0.0.14 // indirect
github.com/mattn/go-ieproxy v0.0.1 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 // indirect
github.com/microcosm-cc/bluemonday v1.0.21 // indirect
github.com/miekg/dns v1.1.43 // indirect
github.com/minio/highwayhash v1.0.2 // indirect
github.com/mitchellh/copystructure v1.2.0 // indirect
github.com/mitchellh/go-homedir v1.1.0 // indirect
@ -278,24 +283,24 @@ require (
github.com/moul/http2curl v1.0.0 // indirect
github.com/mschoch/smat v0.2.0 // indirect
github.com/mtibben/percent v0.2.1 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/natefinch/lumberjack v2.0.0+incompatible // indirect
github.com/nats-io/jwt/v2 v2.3.0 // indirect
github.com/nats-io/nats-streaming-server v0.25.2 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/pborman/uuid v1.2.1 // indirect
github.com/pelletier/go-toml v1.9.4 // indirect
github.com/pierrec/lz4 v2.6.0+incompatible // indirect
github.com/pierrec/lz4/v4 v4.1.17 // indirect
github.com/pkg/browser v0.0.0-20210115035449-ce105d075bb4 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/pquerna/cachecontrol v0.0.0-20171018203845-0dec1b30a021 // indirect
github.com/prometheus/client_golang v1.13.0 // indirect
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common v0.37.0 // indirect
github.com/prometheus/procfs v0.8.0 // indirect
github.com/prometheus/statsd_exporter v0.21.0 // indirect
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect
github.com/robfig/cron v1.2.0 // indirect
github.com/rs/zerolog v1.25.0 // indirect
github.com/russross/blackfriday v1.6.0 // indirect
github.com/satori/go.uuid v1.2.1-0.20181028125025-b2ce2384e17b // indirect
@ -308,10 +313,10 @@ require (
github.com/spf13/cast v1.4.1 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/stathat/consistent v1.0.0 // indirect
github.com/stretchr/objx v0.4.0 // indirect
github.com/stretchr/objx v0.5.0 // indirect
github.com/tchap/go-patricia/v2 v2.3.1 // indirect
github.com/tetratelabs/wazero v1.0.0-pre.2 // indirect
github.com/tidwall/gjson v1.9.3 // indirect
github.com/tetratelabs/wazero v1.0.0-pre.3 // indirect
github.com/tidwall/gjson v1.13.0 // indirect
github.com/tidwall/match v1.1.1 // indirect
github.com/tidwall/pretty v1.2.0 // indirect
github.com/tjfoc/gmsm v1.3.2 // indirect
@ -333,13 +338,16 @@ require (
go.opencensus.io v0.23.0 // indirect
go.uber.org/multierr v1.7.0 // indirect
go.uber.org/zap v1.21.0 // indirect
golang.org/x/sync v0.0.0-20220929204114-8fcdb60fdcc0 // indirect
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 // indirect
golang.org/x/sync v0.1.0 // indirect
golang.org/x/sys v0.1.0 // indirect
golang.org/x/term v0.1.0 // indirect
golang.org/x/text v0.4.0 // indirect
golang.org/x/time v0.0.0-20220922220347-f3bd1da661af // indirect
golang.org/x/tools v0.1.12 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20221027153422-115e99e71e1c // indirect
google.golang.org/protobuf v1.28.1 // indirect
gopkg.in/alexcesaro/quotedprintable.v3 v3.0.0-20150716171945-2caba252f4dc // indirect
gopkg.in/couchbase/gocbcore.v7 v7.1.18 // indirect
@ -347,12 +355,9 @@ require (
gopkg.in/couchbaselabs/gojcbmock.v1 v1.0.4 // indirect
gopkg.in/couchbaselabs/jsonx.v1 v1.0.1 // indirect
gopkg.in/fatih/pool.v2 v2.0.0 // indirect
gopkg.in/gorethink/gorethink.v4 v4.1.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/ini.v1 v1.66.6 // indirect
gopkg.in/kataras/go-serializer.v0 v0.0.4 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
gopkg.in/square/go-jose.v2 v2.4.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
k8s.io/klog/v2 v2.80.1 // indirect
k8s.io/kube-openapi v0.0.0-20220803162953-67bda5d908f1 // indirect

1257
go.sum

File diff suppressed because it is too large Load Diff

View File

@ -5,9 +5,10 @@ import (
"fmt"
"time"
"github.com/google/uuid"
"github.com/dapr/components-contrib/pubsub"
"github.com/dapr/kit/logger"
"github.com/google/uuid"
)
type kubeMQ struct {

View File

@ -5,9 +5,10 @@ import (
"sync"
"time"
"github.com/kubemq-io/kubemq-go"
"github.com/dapr/components-contrib/pubsub"
"github.com/dapr/kit/logger"
"github.com/kubemq-io/kubemq-go"
)
type kubemqEventsClient interface {
@ -42,6 +43,7 @@ func newkubeMQEvents(logger logger.Logger) *kubeMQEvents {
isInitialized: false,
}
}
func (k *kubeMQEvents) init() error {
k.lock.RLock()
isInit := k.isInitialized
@ -77,11 +79,13 @@ func (k *kubeMQEvents) init() error {
k.isInitialized = true
return nil
}
func (k *kubeMQEvents) Init(meta *metadata) error {
k.metadata = meta
_ = k.init()
return nil
}
func (k *kubeMQEvents) setPublishStream() error {
var err error
k.publishFunc, err = k.client.Stream(k.ctx, func(err error) {
@ -92,6 +96,7 @@ func (k *kubeMQEvents) setPublishStream() error {
})
return err
}
func (k *kubeMQEvents) Publish(req *pubsub.PublishRequest) error {
if err := k.init(); err != nil {
return err
@ -111,6 +116,7 @@ func (k *kubeMQEvents) Publish(req *pubsub.PublishRequest) error {
}
return nil
}
func (k *kubeMQEvents) Features() []pubsub.Feature {
return nil
}

View File

@ -3,12 +3,14 @@ package kubemq
import (
"context"
"fmt"
"github.com/dapr/components-contrib/pubsub"
"github.com/dapr/kit/logger"
"github.com/kubemq-io/kubemq-go"
"github.com/stretchr/testify/assert"
"testing"
"time"
"github.com/kubemq-io/kubemq-go"
"github.com/stretchr/testify/assert"
"github.com/dapr/components-contrib/pubsub"
"github.com/dapr/kit/logger"
)
type kubemqEventsMock struct {
@ -32,6 +34,7 @@ func (k *kubemqEventsMock) publish(msg *kubemq.Event) error {
return nil
}
func (k *kubemqEventsMock) Stream(ctx context.Context, onError func(err error)) (func(msg *kubemq.Event) error, error) {
go func() {
for {
@ -41,33 +44,39 @@ func (k *kubemqEventsMock) Stream(ctx context.Context, onError func(err error))
case result := <-k.resultCh:
onError(result)
}
}
}()
return k.publish, nil
}
func (k *kubemqEventsMock) Subscribe(ctx context.Context, request *kubemq.EventsSubscription, onEvent func(msg *kubemq.Event, err error)) error {
return k.subscribeErr
}
func (k *kubemqEventsMock) Close() error {
return nil
}
func (k *kubemqEventsMock) setResultError(err error) *kubemqEventsMock {
k.resultError = err
return k
}
func (k *kubemqEventsMock) setSubscribeError(err error) *kubemqEventsMock {
k.subscribeErr = err
return k
}
func (k *kubemqEventsMock) setPublishTimeout(timeout time.Duration) *kubemqEventsMock {
k.publishTimeout = timeout
return k
}
func (k *kubemqEventsMock) setPublishError(err error) *kubemqEventsMock {
k.publishError = err
return k
}
func newKubemqEventsMock() *kubemqEventsMock {
return &kubemqEventsMock{
resultError: nil,
@ -137,6 +146,7 @@ func Test_kubeMQEvents_Publish(t *testing.T) {
_ = k.Close()
}
}
func Test_kubeMQEvents_Subscribe(t *testing.T) {
tests := []struct {
name string

View File

@ -6,9 +6,10 @@ import (
"sync"
"time"
"github.com/kubemq-io/kubemq-go"
"github.com/dapr/components-contrib/pubsub"
"github.com/dapr/kit/logger"
"github.com/kubemq-io/kubemq-go"
)
// interface used to allow unit testing.
@ -44,6 +45,7 @@ func newKubeMQEventsStore(logger logger.Logger) *kubeMQEventStore {
isInitialized: false,
}
}
func (k *kubeMQEventStore) init() error {
k.lock.RLock()
isInit := k.isInitialized
@ -79,11 +81,13 @@ func (k *kubeMQEventStore) init() error {
k.isInitialized = true
return nil
}
func (k *kubeMQEventStore) Init(meta *metadata) error {
k.metadata = meta
_ = k.init()
return nil
}
func (k *kubeMQEventStore) setPublishStream() error {
var err error
k.publishFunc, err = k.client.Stream(k.ctx, func(result *kubemq.EventStoreResult, err error) {
@ -97,6 +101,7 @@ func (k *kubeMQEventStore) setPublishStream() error {
}
return nil
}
func (k *kubeMQEventStore) Publish(req *pubsub.PublishRequest) error {
if err := k.init(); err != nil {
return err
@ -124,6 +129,7 @@ func (k *kubeMQEventStore) Publish(req *pubsub.PublishRequest) error {
}
return nil
}
func (k *kubeMQEventStore) Features() []pubsub.Feature {
return nil
}

View File

@ -3,12 +3,14 @@ package kubemq
import (
"context"
"fmt"
"github.com/dapr/components-contrib/pubsub"
"github.com/dapr/kit/logger"
"github.com/kubemq-io/kubemq-go"
"github.com/stretchr/testify/assert"
"testing"
"time"
"github.com/kubemq-io/kubemq-go"
"github.com/stretchr/testify/assert"
"github.com/dapr/components-contrib/pubsub"
"github.com/dapr/kit/logger"
)
type kubemqEventsStoreMock struct {
@ -32,6 +34,7 @@ func (k *kubemqEventsStoreMock) publish(msg *kubemq.EventStore) error {
return nil
}
func (k *kubemqEventsStoreMock) Stream(ctx context.Context, onResult func(result *kubemq.EventStoreResult, err error)) (func(msg *kubemq.EventStore) error, error) {
go func() {
for {
@ -45,33 +48,39 @@ func (k *kubemqEventsStoreMock) Stream(ctx context.Context, onResult func(result
Err: result,
}, nil)
}
}
}()
return k.publish, nil
}
func (k *kubemqEventsStoreMock) Subscribe(ctx context.Context, request *kubemq.EventsStoreSubscription, onEvent func(msg *kubemq.EventStoreReceive, err error)) error {
return k.subscribeErr
}
func (k *kubemqEventsStoreMock) Close() error {
return nil
}
func (k *kubemqEventsStoreMock) setResultError(err error) *kubemqEventsStoreMock {
k.resultError = err
return k
}
func (k *kubemqEventsStoreMock) setSubscribeError(err error) *kubemqEventsStoreMock {
k.subscribeErr = err
return k
}
func (k *kubemqEventsStoreMock) setPublishTimeout(timeout time.Duration) *kubemqEventsStoreMock {
k.publishTimeout = timeout
return k
}
func (k *kubemqEventsStoreMock) setPublishError(err error) *kubemqEventsStoreMock {
k.publishError = err
return k
}
func newKubemqEventsStoreMock() *kubemqEventsStoreMock {
return &kubemqEventsStoreMock{
resultError: nil,
@ -98,7 +107,8 @@ func Test_kubeMQEventsStore_Publish(t *testing.T) {
resultError: nil,
wantErr: false,
}, {
},
{
name: "publish with error",
req: &pubsub.PublishRequest{
Data: []byte("data"),
@ -159,6 +169,7 @@ func Test_kubeMQEventsStore_Publish(t *testing.T) {
_ = k.Close()
}
}
func Test_kubeMQkubeMQEventsStore_Subscribe(t *testing.T) {
tests := []struct {
name string

View File

@ -3,11 +3,13 @@ package kubemq
import (
"context"
"fmt"
"testing"
"github.com/stretchr/testify/assert"
mdata "github.com/dapr/components-contrib/metadata"
"github.com/dapr/components-contrib/pubsub"
"github.com/dapr/kit/logger"
"github.com/stretchr/testify/assert"
"testing"
)
func getMockEventsClient() *kubeMQEvents {
@ -23,6 +25,7 @@ func getMockEventsClient() *kubeMQEvents {
isInitialized: true,
}
}
func getMockEventsStoreClient() *kubeMQEventStore {
return &kubeMQEventStore{
client: newKubemqEventsStoreMock(),
@ -36,6 +39,7 @@ func getMockEventsStoreClient() *kubeMQEventStore {
isInitialized: true,
}
}
func Test_kubeMQ_Init(t *testing.T) {
tests := []struct {
name string
@ -46,7 +50,8 @@ func Test_kubeMQ_Init(t *testing.T) {
}{
{
name: "init events store client",
meta: pubsub.Metadata{Base: mdata.Base{
meta: pubsub.Metadata{
Base: mdata.Base{
Properties: map[string]string{
"address": "localhost:50000",
"channel": "test",
@ -55,7 +60,8 @@ func Test_kubeMQ_Init(t *testing.T) {
"group": "group",
"store": "true",
"useMock": "true",
}},
},
},
},
eventsClient: nil,
eventStoreClient: getMockEventsStoreClient(),
@ -63,7 +69,8 @@ func Test_kubeMQ_Init(t *testing.T) {
},
{
name: "init events client",
meta: pubsub.Metadata{Base: mdata.Base{
meta: pubsub.Metadata{
Base: mdata.Base{
Properties: map[string]string{
"address": "localhost:50000",
"channel": "test",
@ -72,7 +79,8 @@ func Test_kubeMQ_Init(t *testing.T) {
"group": "group",
"store": "false",
"useMock": "true",
}},
},
},
},
eventsClient: getMockEventsClient(),
eventStoreClient: nil,
@ -80,10 +88,12 @@ func Test_kubeMQ_Init(t *testing.T) {
},
{
name: "init error",
meta: pubsub.Metadata{Base: mdata.Base{
meta: pubsub.Metadata{
Base: mdata.Base{
Properties: map[string]string{
"address": "badaddress",
}},
},
},
},
eventsClient: nil,
eventStoreClient: nil,

View File

@ -2,9 +2,10 @@ package kubemq
import (
"fmt"
"github.com/dapr/components-contrib/pubsub"
"strconv"
"strings"
"github.com/dapr/components-contrib/pubsub"
)
type metadata struct {

View File

@ -1,10 +1,12 @@
package kubemq
import (
"testing"
"github.com/stretchr/testify/assert"
mdata "github.com/dapr/components-contrib/metadata"
"github.com/dapr/components-contrib/pubsub"
"github.com/stretchr/testify/assert"
"testing"
)
func Test_createMetadata(t *testing.T) {
@ -16,7 +18,8 @@ func Test_createMetadata(t *testing.T) {
}{
{
name: "create valid metadata",
meta: pubsub.Metadata{Base: mdata.Base{
meta: pubsub.Metadata{
Base: mdata.Base{
Properties: map[string]string{
"address": "localhost:50000",
"channel": "test",
@ -26,7 +29,8 @@ func Test_createMetadata(t *testing.T) {
"store": "true",
"useMock": "true",
"disableReDelivery": "true",
}},
},
},
},
want: &metadata{
host: "localhost",
@ -41,13 +45,15 @@ func Test_createMetadata(t *testing.T) {
},
{
name: "create valid metadata with empty group",
meta: pubsub.Metadata{Base: mdata.Base{
meta: pubsub.Metadata{
Base: mdata.Base{
Properties: map[string]string{
"address": "localhost:50000",
"clientID": "clientID",
"authToken": "authToken",
"store": "false",
}},
},
},
},
want: &metadata{
host: "localhost",
@ -61,14 +67,16 @@ func Test_createMetadata(t *testing.T) {
},
{
name: "create valid metadata with empty authToken",
meta: pubsub.Metadata{Base: mdata.Base{
meta: pubsub.Metadata{
Base: mdata.Base{
Properties: map[string]string{
"address": "localhost:50000",
"channel": "test",
"clientID": "clientID",
"group": "group",
"store": "true",
}},
},
},
},
want: &metadata{
host: "localhost",
@ -82,56 +90,66 @@ func Test_createMetadata(t *testing.T) {
},
{
name: "create invalid metadata with bad host",
meta: pubsub.Metadata{Base: mdata.Base{
meta: pubsub.Metadata{
Base: mdata.Base{
Properties: map[string]string{
"address": ":50000",
"clientID": "clientID",
}},
},
},
},
want: nil,
wantErr: true,
},
{
name: "create invalid metadata with bad port",
meta: pubsub.Metadata{Base: mdata.Base{
meta: pubsub.Metadata{
Base: mdata.Base{
Properties: map[string]string{
"address": "localhost:badport",
"clientID": "clientID",
}},
},
},
},
want: nil,
wantErr: true,
},
{
name: "create invalid metadata with empty address",
meta: pubsub.Metadata{Base: mdata.Base{
meta: pubsub.Metadata{
Base: mdata.Base{
Properties: map[string]string{
"address": "",
"clientID": "clientID",
}},
},
},
},
want: nil,
wantErr: true,
},
{
name: "create invalid metadata with bad address format",
meta: pubsub.Metadata{Base: mdata.Base{
meta: pubsub.Metadata{
Base: mdata.Base{
Properties: map[string]string{
"address": "localhost:50000:badport",
"clientID": "clientID",
}},
},
},
},
want: nil,
wantErr: true,
},
{
name: "create invalid metadata with bad store info",
meta: pubsub.Metadata{Base: mdata.Base{
meta: pubsub.Metadata{
Base: mdata.Base{
Properties: map[string]string{
"address": "localhost:50000",
"clientID": "clientID",
"store": "bad",
}},
},
},
},
want: nil,
wantErr: true,
@ -143,7 +161,6 @@ func Test_createMetadata(t *testing.T) {
got, err := createMetadata(tt.meta)
if tt.wantErr {
assert.Error(t, err)
} else {
assert.NoError(t, err)
assert.Equal(t, tt.want, got)

View File

@ -811,7 +811,7 @@ github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B
github.com/spf13/afero v1.2.2 h1:5jhuqJyZCZf2JRofRvN/nIFgIWNzPa3/Vz8mYylgbWc=
github.com/spf13/afero v1.2.2/go.mod h1:9ZxEEn6pIJ8Rxe320qSDBk6AsU0r9pR7Q4OcevTdifk=
github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE=
github.com/spf13/cast v1.3.1 h1:nFm6S0SMdyzrzcmThSipiEubIDy8WEXKNZ0UOgiRpng=
github.com/spf13/cast v1.4.1 h1:s0hze+J0196ZfEMTs80N7UlFt0BDuQ7Q+JDnHiMWKdA=
github.com/spf13/cobra v0.0.3/go.mod h1:1l0Ry5zgKvJasoi3XT1TypsSe7PqH0Sj9dhYf7v3XqQ=
github.com/spf13/cobra v1.1.1/go.mod h1:WnodtKOvamDL/PwE2M4iKs8aMDBZ5Q5klgD3qfVJQMI=
github.com/spf13/jwalterweatherman v1.0.0 h1:XHEdyB+EcvlqZamSM4ZOMGlc93t6AcsBEu9Gc1vn7yk=

View File

@ -1,4 +1,6 @@
# Supported operation: publish, subscribe
# Supported operation: publish, subscribe, multiplehandlers, bulkpublish, bulksubscribe
# bulkpublish should only be run for components that implement pubsub.BulkPublisher interface
# bulksubscribe should only be run for components that implement pubsub.BulkSubscriber interface
# Config map:
## pubsubName : name of the pubsub
## testTopicName: name of the test topic to use
@ -10,7 +12,7 @@
componentType: pubsub
components:
- component: azure.eventhubs
allOperations: true
operations: ["publish", "subscribe", "multiplehandlers", "bulkpublish"]
config:
pubsubName: azure-eventhubs
testTopicName: eventhubs-pubsub-topic
@ -40,13 +42,13 @@ components:
testMultiTopic2Name: dapr-conf-queue-multi2
checkInOrderProcessing: false
- component: redis
allOperations: true
operations: ["publish", "subscribe", "multiplehandlers"]
config:
checkInOrderProcessing: false
- component: natsstreaming
allOperations: true
operations: ["publish", "subscribe", "multiplehandlers"]
- component: jetstream
allOperations: true
operations: ["publish", "subscribe", "multiplehandlers"]
- component: kafka
allOperations: true
- component: kafka
@ -56,27 +58,27 @@ components:
profile: confluent
allOperations: true
- component: pulsar
allOperations: true
operations: ["publish", "subscribe", "multiplehandlers"]
- component: mqtt
profile: mosquitto
allOperations: true
operations: ["publish", "subscribe", "multiplehandlers"]
- component: mqtt
profile: emqx
allOperations: true
operations: ["publish", "subscribe", "multiplehandlers"]
- component: mqtt
profile: vernemq
allOperations: true
operations: ["publish", "subscribe", "multiplehandlers"]
- component: hazelcast
allOperations: true
- component: kubemq
allOperations: true
operations: ["publish", "subscribe", "multiplehandlers"]
- component: rabbitmq
allOperations: true
operations: ["publish", "subscribe", "multiplehandlers"]
config:
checkInOrderProcessing: false
- component: in-memory
allOperations: true
operations: ["publish", "subscribe", "multiplehandlers"]
- component: aws.snssqs
allOperations: true
operations: ["publish", "subscribe", "multiplehandlers"]
config:
checkInOrderProcessing: false
- component: kubemq
allOperations: true