MQTT Pubsub Certification Testing + AutAckOff Fix for MQTT (#1420)
* MQTT Certification Test Signed-off-by: shivam <shivamkm07@gmail.com> * Using paho.mqtt.golang fork with AutoAck fix Signed-off-by: shivam <shivamkm07@gmail.com> * Adding MQTT component in certification.yml Signed-off-by: shivam <shivamkm07@gmail.com> Co-authored-by: Artur Souza <artursouza.ms@outlook.com>
This commit is contained in:
parent
a5f1d864fd
commit
4631d6e41a
|
@ -54,6 +54,7 @@ jobs:
|
|||
PR_COMPONENTS=$(yq -I0 --tojson eval - << EOF
|
||||
- pubsub.kafka
|
||||
- pubsub.rabbitmq
|
||||
- pubsub.mqtt
|
||||
EOF
|
||||
)
|
||||
echo "::set-output name=pr-components::$PR_COMPONENTS"
|
||||
|
|
2
go.mod
2
go.mod
|
@ -305,3 +305,5 @@ require (
|
|||
require github.com/sony/gobreaker v0.4.2-0.20210216022020-dd874f9dd33b // indirect
|
||||
|
||||
replace k8s.io/client => github.com/kubernetes-client/go v0.0.0-20190928040339-c757968c4c36
|
||||
|
||||
replace github.com/eclipse/paho.mqtt.golang => github.com/shivamkm07/paho.mqtt.golang v1.3.6-0.20220106130409-e28a1db639f8
|
||||
|
|
4
go.sum
4
go.sum
|
@ -353,8 +353,6 @@ github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 h1:YEetp8
|
|||
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU=
|
||||
github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc=
|
||||
github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
|
||||
github.com/eclipse/paho.mqtt.golang v1.3.5 h1:sWtmgNxYM9P2sP+xEItMozsR3w0cqZFlqnNN1bdl41Y=
|
||||
github.com/eclipse/paho.mqtt.golang v1.3.5/go.mod h1:eTzb4gxwwyWpqBUHGQZ4ABAV7+Jgm1PklsYT/eo8Hcc=
|
||||
github.com/edsrzf/mmap-go v1.0.0/go.mod h1:YO35OhQPt3KJa3ryjFM5Bs14WD66h8eGKpfaBNrHW5M=
|
||||
github.com/elazarl/goproxy v0.0.0-20180725130230-947c36da3153/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc=
|
||||
github.com/emicklei/go-restful v0.0.0-20170410110728-ff4f55a20633/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs=
|
||||
|
@ -1088,6 +1086,8 @@ github.com/sergi/go-diff v1.2.0 h1:XU+rvMAioB0UC3q1MFrIQy4Vo5/4VsRDQQXHsEya6xQ=
|
|||
github.com/sergi/go-diff v1.2.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM=
|
||||
github.com/shirou/gopsutil/v3 v3.21.6 h1:vU7jrp1Ic/2sHB7w6UNs7MIkn7ebVtTb5D9j45o9VYE=
|
||||
github.com/shirou/gopsutil/v3 v3.21.6/go.mod h1:JfVbDpIBLVzT8oKbvMg9P3wEIMDDpVn+LwHTKj0ST88=
|
||||
github.com/shivamkm07/paho.mqtt.golang v1.3.6-0.20220106130409-e28a1db639f8 h1:BXKXQzeHuVnSrHAKjvq9ICrgPC27tJ/hXWLMQo36c5s=
|
||||
github.com/shivamkm07/paho.mqtt.golang v1.3.6-0.20220106130409-e28a1db639f8/go.mod h1:JGt0RsEwEX+Xa/agj90YJ9d9DH2b7upDZMK9HRbFvCA=
|
||||
github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24 h1:pntxY8Ary0t43dCZ5dqY4YTJCObLY1kIXl0uzMv+7DE=
|
||||
github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24/go.mod h1:M+9NzErvs504Cn4c5DxATwIqPbtswREoFCre64PpcG4=
|
||||
github.com/shurcooL/sanitized_anchor_name v1.0.0 h1:PdmoCO6wvbs+7yrJyMORt4/BmY5IYyJwS/kOiWx8mHo=
|
||||
|
|
|
@ -216,6 +216,7 @@ func (m *mqttPubSub) Subscribe(req pubsub.SubscribeRequest, handler pubsub.Handl
|
|||
token := m.consumer.SubscribeMultiple(
|
||||
m.topics,
|
||||
func(client mqtt.Client, mqttMsg mqtt.Message) {
|
||||
mqttMsg.AutoAckOff()
|
||||
msg := pubsub.NewMessage{
|
||||
Topic: mqttMsg.Topic(),
|
||||
Data: mqttMsg.Payload(),
|
||||
|
|
|
@ -0,0 +1,43 @@
|
|||
# MQTT certifcation testing
|
||||
|
||||
This project aims to test the MQTT Pub/Sub component under various conditions.
|
||||
|
||||
## Test plan
|
||||
|
||||
### Basic Test
|
||||
|
||||
* Bring up a MQTT cluster
|
||||
* Start 1 sidecar/application(App1)
|
||||
* Publishes 1000+ unique messages
|
||||
* App: Simulate periodic errors
|
||||
* Component: Retries on error
|
||||
* App: Observes successful messages
|
||||
* Test: Confirms that all expected messages were received
|
||||
|
||||
### Multiple Publishers-Subscribers
|
||||
|
||||
* Start second sidecar/application(App2)
|
||||
* Each of the publishers publish a fixed number of messages to the topic
|
||||
* Test: Confirms that both applications receive all published messages
|
||||
|
||||
### Infra Test
|
||||
|
||||
* Start a constant flow of publishing and subscribing(App1)
|
||||
* Test: Keeps count of total sent/received
|
||||
* Start another sidecar/application with persistent session(App2)
|
||||
* Test: Publishes messages in background
|
||||
* Each of the applications should receive messages
|
||||
* Stop consumer connected with persistent session(App2)
|
||||
* Test: Publishes messages in background
|
||||
* Only App1 should receive messages
|
||||
* Stop publisher as well so that none of the components are active
|
||||
* No messages are published and received
|
||||
* Restart second consumer with persistent session
|
||||
* App2 receives all lost messages
|
||||
* Restart publisher so that both components are active
|
||||
* Test: Confirms that both applications received all published messages and no messages were lost
|
||||
|
||||
### Network Test
|
||||
* Simulate network interruption
|
||||
* Test: Begins trying to reconnect & publish
|
||||
* Component: Begins trying to reconnect & re-subscribe
|
|
@ -0,0 +1,20 @@
|
|||
apiVersion: dapr.io/v1alpha1
|
||||
kind: Component
|
||||
metadata:
|
||||
name: messagebus
|
||||
spec:
|
||||
type: pubsub.mqtt
|
||||
version: v1
|
||||
metadata:
|
||||
- name: url
|
||||
value: "tcp://localhost:1884"
|
||||
- name: consumerID
|
||||
value: "testConsumer1"
|
||||
- name: retain
|
||||
value: true
|
||||
- name: qos
|
||||
value: 2
|
||||
- name: cleanSession
|
||||
value: false
|
||||
- name: backOffMaxRetries
|
||||
value: 5
|
|
@ -0,0 +1,20 @@
|
|||
apiVersion: dapr.io/v1alpha1
|
||||
kind: Component
|
||||
metadata:
|
||||
name: messagebus
|
||||
spec:
|
||||
type: pubsub.mqtt
|
||||
version: v1
|
||||
metadata:
|
||||
- name: url
|
||||
value: "tcp://localhost:1884"
|
||||
- name: consumerID
|
||||
value: "testConsumer2"
|
||||
- name: retain
|
||||
value: false
|
||||
- name: qos
|
||||
value: 2
|
||||
- name: cleanSession
|
||||
value: false
|
||||
- name: backOffMaxRetries
|
||||
value: 5
|
|
@ -0,0 +1,8 @@
|
|||
apiVersion: dapr.io/v1alpha1
|
||||
kind: Configuration
|
||||
metadata:
|
||||
name: pubsubroutingconfig
|
||||
spec:
|
||||
features:
|
||||
- name: PubSub.Routing
|
||||
enabled: true
|
|
@ -0,0 +1,8 @@
|
|||
version: '2'
|
||||
services:
|
||||
emqx:
|
||||
image: emqx/emqx:4.2.7
|
||||
hostname: emqx
|
||||
container_name: emqx
|
||||
ports:
|
||||
- "1884:1883"
|
|
@ -0,0 +1,118 @@
|
|||
module github.com/dapr/components-contrib/tests/certification/pubsub/mqtt
|
||||
|
||||
go 1.17
|
||||
|
||||
require (
|
||||
github.com/cenkalti/backoff v2.2.1+incompatible
|
||||
github.com/dapr/components-contrib v1.5.0-rc.1.0.20220105071850-a013b58d6cee
|
||||
github.com/dapr/components-contrib/tests/certification v1.4.0-rc2
|
||||
github.com/dapr/dapr v1.5.2-0.20220106203753-0e6bcbabc8ba
|
||||
github.com/dapr/go-sdk v1.3.0
|
||||
github.com/dapr/kit v0.0.2-0.20210614175626-b9074b64d233
|
||||
github.com/eclipse/paho.mqtt.golang v1.3.5
|
||||
github.com/stretchr/testify v1.7.0
|
||||
go.uber.org/multierr v1.7.0
|
||||
)
|
||||
|
||||
require (
|
||||
contrib.go.opencensus.io/exporter/prometheus v0.4.0 // indirect
|
||||
contrib.go.opencensus.io/exporter/zipkin v0.1.1 // indirect
|
||||
github.com/AdhityaRamadhanus/fasthttpcors v0.0.0-20170121111917-d4c07198763a // indirect
|
||||
github.com/PuerkitoBio/purell v1.1.1 // indirect
|
||||
github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 // indirect
|
||||
github.com/andybalholm/brotli v1.0.2 // indirect
|
||||
github.com/antlr/antlr4 v0.0.0-20200503195918-621b933c7a7f // indirect
|
||||
github.com/armon/go-metrics v0.0.0-20190430140413-ec5e00d3c878 // indirect
|
||||
github.com/beorn7/perks v1.0.1 // indirect
|
||||
github.com/cenkalti/backoff/v4 v4.1.1 // indirect
|
||||
github.com/cespare/xxhash/v2 v2.1.2 // indirect
|
||||
github.com/davecgh/go-spew v1.1.1 // indirect
|
||||
github.com/fasthttp/router v1.3.8 // indirect
|
||||
github.com/ghodss/yaml v1.0.0 // indirect
|
||||
github.com/go-kit/log v0.1.0 // indirect
|
||||
github.com/go-logfmt/logfmt v0.5.1 // indirect
|
||||
github.com/go-logr/logr v0.3.0 // indirect
|
||||
github.com/gogo/protobuf v1.3.2 // indirect
|
||||
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
|
||||
github.com/golang/protobuf v1.5.2 // indirect
|
||||
github.com/google/cel-go v0.9.0 // indirect
|
||||
github.com/google/go-cmp v0.5.6 // indirect
|
||||
github.com/google/gofuzz v1.1.0 // indirect
|
||||
github.com/google/uuid v1.3.0 // indirect
|
||||
github.com/googleapis/gnostic v0.5.1 // indirect
|
||||
github.com/gorilla/mux v1.8.0 // indirect
|
||||
github.com/gorilla/websocket v1.4.2 // indirect
|
||||
github.com/grandcat/zeroconf v0.0.0-20190424104450-85eadb44205c // indirect
|
||||
github.com/grpc-ecosystem/go-grpc-middleware v1.2.2 // indirect
|
||||
github.com/hashicorp/consul/api v1.3.0 // indirect
|
||||
github.com/hashicorp/errwrap v1.0.0 // indirect
|
||||
github.com/hashicorp/go-cleanhttp v0.5.1 // indirect
|
||||
github.com/hashicorp/go-immutable-radix v1.0.0 // indirect
|
||||
github.com/hashicorp/go-multierror v1.1.1 // indirect
|
||||
github.com/hashicorp/go-rootcerts v1.0.0 // indirect
|
||||
github.com/hashicorp/golang-lru v0.5.4 // indirect
|
||||
github.com/hashicorp/serf v0.8.2 // indirect
|
||||
github.com/imdario/mergo v0.3.10 // indirect
|
||||
github.com/json-iterator/go v1.1.11 // indirect
|
||||
github.com/klauspost/compress v1.13.4 // indirect
|
||||
github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 // indirect
|
||||
github.com/miekg/dns v1.1.35 // indirect
|
||||
github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1 // indirect
|
||||
github.com/mitchellh/go-homedir v1.1.0 // indirect
|
||||
github.com/mitchellh/mapstructure v1.4.1 // indirect
|
||||
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
|
||||
github.com/modern-go/reflect2 v1.0.1 // indirect
|
||||
github.com/openzipkin/zipkin-go v0.2.2 // indirect
|
||||
github.com/pkg/errors v0.9.1 // indirect
|
||||
github.com/pmezard/go-difflib v1.0.0 // indirect
|
||||
github.com/prometheus/client_golang v1.11.0 // indirect
|
||||
github.com/prometheus/client_model v0.2.0 // indirect
|
||||
github.com/prometheus/common v0.32.1 // indirect
|
||||
github.com/prometheus/procfs v0.7.3 // indirect
|
||||
github.com/prometheus/statsd_exporter v0.22.3 // indirect
|
||||
github.com/savsgio/gotils v0.0.0-20210217112953-d4a072536008 // indirect
|
||||
github.com/sirupsen/logrus v1.8.1 // indirect
|
||||
github.com/spf13/pflag v1.0.5 // indirect
|
||||
github.com/stoewer/go-strcase v1.2.0 // indirect
|
||||
github.com/tylertreat/comcast v1.0.1 // indirect
|
||||
github.com/valyala/bytebufferpool v1.0.0 // indirect
|
||||
github.com/valyala/fasthttp v1.31.1-0.20211216042702-258a4c17b4f4 // indirect
|
||||
go.opencensus.io v0.23.0 // indirect
|
||||
go.opentelemetry.io/otel v0.19.0 // indirect
|
||||
go.uber.org/atomic v1.9.0 // indirect
|
||||
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519 // indirect
|
||||
golang.org/x/net v0.0.0-20210825183410-e898025ed96a // indirect
|
||||
golang.org/x/oauth2 v0.0.0-20210514164344-f6687ab2804c // indirect
|
||||
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect
|
||||
golang.org/x/sys v0.0.0-20211007075335-d3039528d8ac // indirect
|
||||
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1 // indirect
|
||||
golang.org/x/text v0.3.7 // indirect
|
||||
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba // indirect
|
||||
google.golang.org/appengine v1.6.6 // indirect
|
||||
google.golang.org/genproto v0.0.0-20210831024726-fe130286e0e2 // indirect
|
||||
google.golang.org/grpc v1.40.0 // indirect
|
||||
google.golang.org/protobuf v1.27.1 // indirect
|
||||
gopkg.in/inf.v0 v0.9.1 // indirect
|
||||
gopkg.in/yaml.v2 v2.4.0 // indirect
|
||||
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
|
||||
k8s.io/api v0.20.0 // indirect
|
||||
k8s.io/apiextensions-apiserver v0.20.0 // indirect
|
||||
k8s.io/apimachinery v0.20.0 // indirect
|
||||
k8s.io/client-go v0.20.0 // indirect
|
||||
k8s.io/klog/v2 v2.4.0 // indirect
|
||||
k8s.io/utils v0.0.0-20201110183641-67b214c5f920 // indirect
|
||||
sigs.k8s.io/controller-runtime v0.7.0 // indirect
|
||||
sigs.k8s.io/structured-merge-diff/v4 v4.0.2 // indirect
|
||||
sigs.k8s.io/yaml v1.2.0 // indirect
|
||||
)
|
||||
|
||||
replace github.com/dapr/components-contrib/tests/certification => ../../
|
||||
|
||||
replace github.com/dapr/components-contrib => ../../../../
|
||||
|
||||
replace github.com/eclipse/paho.mqtt.golang => github.com/shivamkm07/paho.mqtt.golang v1.3.6-0.20220106130409-e28a1db639f8
|
||||
|
||||
// Uncomment for local development for testing with changes
|
||||
// in the Dapr runtime. Don't commit with this uncommented!
|
||||
//
|
||||
// replace github.com/dapr/dapr => ../../../../../dapr
|
File diff suppressed because it is too large
Load Diff
|
@ -0,0 +1,339 @@
|
|||
/*
|
||||
Copyright 2021 The Dapr Authors
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package mqtt_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/cenkalti/backoff"
|
||||
mqtt "github.com/eclipse/paho.mqtt.golang"
|
||||
// "github.com/google/uuid"
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.uber.org/multierr"
|
||||
|
||||
// Pub/Sub.
|
||||
|
||||
"github.com/dapr/components-contrib/pubsub"
|
||||
pubsub_mqtt "github.com/dapr/components-contrib/pubsub/mqtt"
|
||||
pubsub_loader "github.com/dapr/dapr/pkg/components/pubsub"
|
||||
|
||||
// Dapr runtime and Go-SDK
|
||||
"github.com/dapr/dapr/pkg/runtime"
|
||||
"github.com/dapr/go-sdk/service/common"
|
||||
"github.com/dapr/kit/logger"
|
||||
|
||||
kit_retry "github.com/dapr/kit/retry"
|
||||
|
||||
// Certification testing runnables
|
||||
"github.com/dapr/components-contrib/tests/certification/embedded"
|
||||
"github.com/dapr/components-contrib/tests/certification/flow"
|
||||
"github.com/dapr/components-contrib/tests/certification/flow/app"
|
||||
"github.com/dapr/components-contrib/tests/certification/flow/dockercompose"
|
||||
"github.com/dapr/components-contrib/tests/certification/flow/network"
|
||||
"github.com/dapr/components-contrib/tests/certification/flow/retry"
|
||||
"github.com/dapr/components-contrib/tests/certification/flow/sidecar"
|
||||
"github.com/dapr/components-contrib/tests/certification/flow/simulate"
|
||||
"github.com/dapr/components-contrib/tests/certification/flow/watcher"
|
||||
)
|
||||
|
||||
const (
|
||||
sidecarName1 = "dapr-1"
|
||||
sidecarName2 = "dapr-2"
|
||||
appID1 = "app-1"
|
||||
appID2 = "app-2"
|
||||
clusterName = "mqttcertification"
|
||||
dockerComposeYAML = "docker-compose.yml"
|
||||
numMessages = 1000
|
||||
appPort = 8000
|
||||
portOffset = 2
|
||||
messageKey = "partitionKey"
|
||||
mqttURL = "tcp://localhost:1884"
|
||||
|
||||
pubsubName = "messagebus"
|
||||
topicName = "neworder"
|
||||
)
|
||||
|
||||
var brokers = []string{"localhost:1884"}
|
||||
|
||||
func mqttReady(url string) flow.Runnable {
|
||||
return func(ctx flow.Context) error {
|
||||
const defaultWait = 3 * time.Second
|
||||
opts := mqtt.NewClientOptions()
|
||||
opts.SetClientID("test")
|
||||
opts.AddBroker(url)
|
||||
|
||||
client := mqtt.NewClient(opts)
|
||||
token := client.Connect()
|
||||
for !token.WaitTimeout(defaultWait) {
|
||||
}
|
||||
if err := token.Error(); err != nil {
|
||||
return err
|
||||
}
|
||||
client.Disconnect(0)
|
||||
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func TestMQTT(t *testing.T) {
|
||||
log := logger.NewLogger("dapr.components")
|
||||
component := pubsub_loader.New("mqtt", func() pubsub.PubSub {
|
||||
return pubsub_mqtt.NewMQTTPubSub(log)
|
||||
})
|
||||
|
||||
//In-order processing not guaranteed
|
||||
consumerGroup1 := watcher.NewUnordered()
|
||||
consumerGroup2 := watcher.NewUnordered()
|
||||
|
||||
// Application logic that tracks messages from a topic.
|
||||
application := func(messages *watcher.Watcher, appID string) app.SetupFn {
|
||||
return func(ctx flow.Context, s common.Service) error {
|
||||
// Simulate periodic errors.
|
||||
sim := simulate.PeriodicError(ctx, 100)
|
||||
// Setup the /orders event handler.
|
||||
return multierr.Combine(
|
||||
s.AddTopicEventHandler(&common.Subscription{
|
||||
PubsubName: pubsubName,
|
||||
Topic: topicName,
|
||||
Route: "/orders",
|
||||
}, func(_ context.Context, e *common.TopicEvent) (retry bool, err error) {
|
||||
if err := sim(); err != nil {
|
||||
return true, err
|
||||
}
|
||||
|
||||
// Track/Observe the data of the event.
|
||||
messages.Observe(e.Data)
|
||||
ctx.Logf("%s Event - pubsub: %s, topic: %s, id: %s, data: %s", appID,
|
||||
e.PubsubName, e.Topic, e.ID, e.Data)
|
||||
return false, nil
|
||||
}),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
// Test logic that sends messages to a topic and
|
||||
// verifies the application has received them.
|
||||
test := func(messages ...*watcher.Watcher) flow.Runnable {
|
||||
return func(ctx flow.Context) error {
|
||||
|
||||
client := sidecar.GetClient(ctx, sidecarName1)
|
||||
|
||||
// Declare what is expected BEFORE performing any steps
|
||||
// that will satisfy the test.
|
||||
msgs := make([]string, numMessages)
|
||||
for i := range msgs {
|
||||
msgs[i] = fmt.Sprintf("Hello, Messages %03d", i)
|
||||
}
|
||||
for _, m := range messages {
|
||||
m.ExpectStrings(msgs...)
|
||||
}
|
||||
|
||||
// Send events that the application above will observe.
|
||||
ctx.Log("Sending messages!")
|
||||
for _, msg := range msgs {
|
||||
ctx.Logf("Sending: %q", msg)
|
||||
err := client.PublishEvent(
|
||||
ctx, pubsubName, topicName, msg)
|
||||
require.NoError(ctx, err, "error publishing message")
|
||||
}
|
||||
|
||||
// Do the messages we observed match what we expect?
|
||||
for _, m := range messages {
|
||||
m.Assert(ctx, time.Minute)
|
||||
}
|
||||
|
||||
return nil
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
multiple_test := func(messages ...*watcher.Watcher) flow.Runnable {
|
||||
return func(ctx flow.Context) error {
|
||||
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(2)
|
||||
publish_msgs := func(sidecarName string) {
|
||||
defer wg.Done()
|
||||
client := sidecar.GetClient(ctx, sidecarName)
|
||||
msgs := make([]string, numMessages/2)
|
||||
for i := range msgs {
|
||||
msgs[i] = fmt.Sprintf("%s: Hello, Messages %03d", sidecarName, i)
|
||||
}
|
||||
for _, m := range messages {
|
||||
m.ExpectStrings(msgs...)
|
||||
}
|
||||
ctx.Log("Sending messages!")
|
||||
for _, msg := range msgs {
|
||||
ctx.Logf("Sending: %q", msg)
|
||||
err := client.PublishEvent(
|
||||
ctx, pubsubName, topicName, msg)
|
||||
require.NoError(ctx, err, "error publishing message")
|
||||
}
|
||||
}
|
||||
go publish_msgs(sidecarName1)
|
||||
go publish_msgs(sidecarName2)
|
||||
|
||||
wg.Wait()
|
||||
// Do the messages we observed match what we expect?
|
||||
for _, m := range messages {
|
||||
m.Assert(ctx, time.Minute)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
}
|
||||
// sendMessagesInBackground and assertMessages are
|
||||
// Runnables for testing publishing and consuming
|
||||
// messages reliably when infrastructure and network
|
||||
// interruptions occur.
|
||||
var task flow.AsyncTask
|
||||
sendMessagesInBackground := func(messages ...*watcher.Watcher) flow.Runnable {
|
||||
return func(ctx flow.Context) error {
|
||||
client := sidecar.GetClient(ctx, sidecarName1)
|
||||
for _, m := range messages {
|
||||
m.Reset()
|
||||
}
|
||||
|
||||
t := time.NewTicker(200 * time.Millisecond)
|
||||
defer t.Stop()
|
||||
|
||||
counter := 1
|
||||
for {
|
||||
select {
|
||||
case <-task.Done():
|
||||
return nil
|
||||
case <-t.C:
|
||||
msg := fmt.Sprintf("Background message - %03d", counter)
|
||||
for _, m := range messages {
|
||||
m.Prepare(msg) // Track for observation
|
||||
}
|
||||
|
||||
// Publish with retries.
|
||||
bo := backoff.WithContext(backoff.NewConstantBackOff(time.Second), task)
|
||||
if err := kit_retry.NotifyRecover(func() error {
|
||||
return client.PublishEvent(
|
||||
// Using ctx instead of task here is deliberate.
|
||||
// We don't want cancelation to prevent adding
|
||||
// the message, only to interrupt between tries.
|
||||
ctx, pubsubName, topicName, msg)
|
||||
}, bo, func(err error, t time.Duration) {
|
||||
ctx.Logf("Error publishing message, retrying in %s", t)
|
||||
}, func() {}); err == nil {
|
||||
for _, m := range messages {
|
||||
m.Add(msg) // Success
|
||||
}
|
||||
counter++
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
assertMessages := func(messages ...*watcher.Watcher) flow.Runnable {
|
||||
return func(ctx flow.Context) error {
|
||||
// Signal sendMessagesInBackground to stop and wait for it to complete.
|
||||
task.CancelAndWait()
|
||||
for _, m := range messages {
|
||||
m.Assert(ctx, 1*time.Minute)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
flow.New(t, "mqtt certification").
|
||||
// Run MQTT using Docker Compose.
|
||||
Step(dockercompose.Run(clusterName, dockerComposeYAML)).
|
||||
Step("wait for broker sockets",
|
||||
network.WaitForAddresses(5*time.Minute, brokers...)).
|
||||
Step("wait for MQTT readiness",
|
||||
retry.Do(time.Second, 30, mqttReady(mqttURL))).
|
||||
//
|
||||
// Run the application logic above(App1)
|
||||
Step(app.Run(appID1, fmt.Sprintf(":%d", appPort),
|
||||
application(consumerGroup1, appID1))).
|
||||
// Run the Dapr sidecar with the MQTTPubSub component.
|
||||
Step(sidecar.Run(sidecarName1,
|
||||
embedded.WithComponentsPath("./components/consumer1"),
|
||||
embedded.WithAppProtocol(runtime.HTTPProtocol, appPort),
|
||||
embedded.WithDaprGRPCPort(runtime.DefaultDaprAPIGRPCPort),
|
||||
embedded.WithDaprHTTPPort(runtime.DefaultDaprHTTPPort),
|
||||
runtime.WithPubSubs(component))).
|
||||
//
|
||||
// Send messages and test
|
||||
Step("send and wait", test(consumerGroup1)).
|
||||
Step("reset", flow.Reset(consumerGroup1)).
|
||||
//
|
||||
//Run Second application App2
|
||||
Step(app.Run(appID2, fmt.Sprintf(":%d", appPort+portOffset),
|
||||
application(consumerGroup2, appID2))).
|
||||
// Run the Dapr sidecar with the MQTTPubSub component.
|
||||
Step(sidecar.Run(sidecarName2,
|
||||
embedded.WithComponentsPath("./components/consumer2"),
|
||||
embedded.WithAppProtocol(runtime.HTTPProtocol, appPort+portOffset),
|
||||
embedded.WithDaprGRPCPort(runtime.DefaultDaprAPIGRPCPort+portOffset),
|
||||
embedded.WithDaprHTTPPort(runtime.DefaultDaprHTTPPort+portOffset),
|
||||
embedded.WithProfilePort(runtime.DefaultProfilePort+portOffset),
|
||||
runtime.WithPubSubs(component))).
|
||||
//
|
||||
// Send messages and test
|
||||
Step("multiple send and wait", multiple_test(consumerGroup1, consumerGroup2)).
|
||||
Step("reset", flow.Reset(consumerGroup1, consumerGroup2)).
|
||||
//
|
||||
// Infra test
|
||||
StepAsync("steady flow of messages to publish", &task,
|
||||
sendMessagesInBackground(consumerGroup1, consumerGroup2)).
|
||||
Step("wait", flow.Sleep(5*time.Second)).
|
||||
Step("stop sidecar 2", sidecar.Stop(sidecarName2)).
|
||||
Step("wait", flow.Sleep(5*time.Second)).
|
||||
Step("stop sidecar 1", sidecar.Stop(sidecarName1)).
|
||||
Step("wait", flow.Sleep(5*time.Second)).
|
||||
Step(sidecar.Run(sidecarName2,
|
||||
embedded.WithComponentsPath("./components/consumer2"),
|
||||
embedded.WithAppProtocol(runtime.HTTPProtocol, appPort+portOffset),
|
||||
embedded.WithDaprGRPCPort(runtime.DefaultDaprAPIGRPCPort+portOffset),
|
||||
embedded.WithDaprHTTPPort(runtime.DefaultDaprHTTPPort+portOffset),
|
||||
embedded.WithProfilePort(runtime.DefaultProfilePort+portOffset),
|
||||
runtime.WithPubSubs(component))).
|
||||
Step("wait", flow.Sleep(5*time.Second)).
|
||||
Step(sidecar.Run(sidecarName1,
|
||||
embedded.WithComponentsPath("./components/consumer1"),
|
||||
embedded.WithAppProtocol(runtime.HTTPProtocol, appPort),
|
||||
embedded.WithDaprGRPCPort(runtime.DefaultDaprAPIGRPCPort),
|
||||
embedded.WithDaprHTTPPort(runtime.DefaultDaprHTTPPort),
|
||||
runtime.WithPubSubs(component))).
|
||||
Step("wait", flow.Sleep(5*time.Second)).
|
||||
Step("assert messages", assertMessages(consumerGroup1, consumerGroup2)).
|
||||
Step("reset", flow.Reset(consumerGroup1, consumerGroup2)).
|
||||
//
|
||||
// Simulate a network interruption.
|
||||
// This tests the components ability to handle reconnections
|
||||
// when Dapr is disconnected abnormally.
|
||||
StepAsync("steady flow of messages to publish", &task,
|
||||
sendMessagesInBackground(consumerGroup1, consumerGroup2)).
|
||||
Step("wait", flow.Sleep(5*time.Second)).
|
||||
//
|
||||
// Errors will occurring here.
|
||||
Step("interrupt network",
|
||||
network.InterruptNetwork(5*time.Second, nil, nil, "18084")).
|
||||
//
|
||||
// Component should recover at this point.
|
||||
Step("wait", flow.Sleep(5*time.Second)).
|
||||
Step("assert messages", assertMessages(consumerGroup1, consumerGroup2)).
|
||||
Run()
|
||||
}
|
Loading…
Reference in New Issue