/* Copyright 2021 The Dapr Authors Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package mqtt_test import ( "context" "crypto/rand" "encoding/hex" "fmt" "io" "strconv" "strings" "sync" "sync/atomic" "testing" "time" "github.com/cenkalti/backoff/v4" mqtt "github.com/eclipse/paho.mqtt.golang" "github.com/stretchr/testify/require" "go.uber.org/multierr" // Pub/Sub. pubsub_mqtt "github.com/dapr/components-contrib/pubsub/mqtt3" pubsub_loader "github.com/dapr/dapr/pkg/components/pubsub" "github.com/dapr/dapr/pkg/config/protocol" // Dapr runtime and Go-SDK "github.com/dapr/dapr/pkg/runtime" "github.com/dapr/go-sdk/service/common" "github.com/dapr/kit/logger" // Certification testing runnables "github.com/dapr/components-contrib/tests/certification/embedded" "github.com/dapr/components-contrib/tests/certification/flow" "github.com/dapr/components-contrib/tests/certification/flow/app" "github.com/dapr/components-contrib/tests/certification/flow/dockercompose" "github.com/dapr/components-contrib/tests/certification/flow/network" "github.com/dapr/components-contrib/tests/certification/flow/retry" "github.com/dapr/components-contrib/tests/certification/flow/sidecar" "github.com/dapr/components-contrib/tests/certification/flow/simulate" "github.com/dapr/components-contrib/tests/certification/flow/watcher" ) const ( sidecarName1 = "dapr-1" sidecarName2 = "dapr-2" sidecarName3 = "dapr-3" appID1 = "app-1" appID2 = "app-2" appID3 = "app-3" clusterName = "mqttcertification" dockerComposeYAML = "docker-compose.yml" numMessages = 1000 appPort = 8000 portOffset = 2 messageKey = "partitionKey" mqttURL = "tcp://localhost:1884" pubsubName = "messagebus" topicName = "neworder" wildcardTopicSubscribe = "orders/#" wildcardTopicPublish = "orders/%s" sharedTopicSubscribe = "$share/mygroup/mytopic/+/hello" sharedTopicPublish = "mytopic/%s/hello" ) var brokers = []string{"localhost:1884"} func mqttReady(url string) flow.Runnable { return func(ctx flow.Context) error { const defaultWait = 3 * time.Second opts := mqtt.NewClientOptions() opts.SetClientID("test") opts.AddBroker(url) client := mqtt.NewClient(opts) token := client.Connect() for !token.WaitTimeout(defaultWait) { } if err := token.Error(); err != nil { return err } client.Disconnect(0) return nil } } func TestMQTT(t *testing.T) { logger.ApplyOptionsToLoggers(&logger.Options{ OutputLevel: "debug", }) // In-order processing not guaranteed consumerGroup1 := watcher.NewUnordered() consumerGroup2 := watcher.NewUnordered() consumerGroupMultiWildcard := watcher.NewUnordered() consumerGroupMultiShared := watcher.NewUnordered() // Application logic that tracks messages from a topic. application := func(messages *watcher.Watcher, appID string, topicName string) app.SetupFn { return func(ctx flow.Context, s common.Service) error { // Simulate periodic errors. sim := simulate.PeriodicError(ctx, 100) // Setup the /orders event handler. return multierr.Combine( s.AddTopicEventHandler(&common.Subscription{ PubsubName: pubsubName, Topic: topicName, Route: "/orders", }, func(_ context.Context, e *common.TopicEvent) (retry bool, err error) { if err := sim(); err != nil { return true, err } // Track/Observe the data of the event. messages.Observe(e.Data) ctx.Logf("%s Event - pubsub: %s, topic: %s, id: %s, data: %s", appID, e.PubsubName, e.Topic, e.ID, e.Data) return false, nil }), ) } } // Application logic that subscribes to multiple topics applicationMultiTopic := func(appID string, subs ...topicSubscription) app.SetupFn { return func(ctx flow.Context, s common.Service) (err error) { handlerGen := func(name string, messages *watcher.Watcher) func(_ context.Context, e *common.TopicEvent) (retry bool, err error) { return func(_ context.Context, e *common.TopicEvent) (retry bool, err error) { messages.Observe(e.Data) ctx.Logf("%s/%s Event - pubsub: %s, topic: %s, id: %s, data: %s", appID, name, e.PubsubName, e.Topic, e.ID, e.Data) return false, nil } } for _, sub := range subs { err = s.AddTopicEventHandler( &common.Subscription{ PubsubName: pubsubName, Topic: sub.name, Route: sub.route, }, handlerGen(sub.name, sub.messages), ) if err != nil { return err } } return nil } } // Test logic that sends messages to a topic and // verifies the application has received them. test := func(topicName string, messages ...*watcher.Watcher) flow.Runnable { return func(ctx flow.Context) error { client := sidecar.GetClient(ctx, sidecarName1) // Declare what is expected BEFORE performing any steps // that will satisfy the test. msgs := make([]string, numMessages) for i := range msgs { msgs[i] = fmt.Sprintf("Hello, Messages %s#%03d", topicName, i) } for _, m := range messages { m.ExpectStrings(msgs...) } // Send events that the application above will observe. ctx.Log("Sending messages!") for _, msg := range msgs { // If topicName has a %s, this will add some randomness (if not, it won't be changed) tn := topicName if strings.Contains(tn, "%s") { tn = fmt.Sprintf(tn, randomStr()) } ctx.Logf("Sending '%q' to topic '%s'", msg, tn) err := client.PublishEvent(ctx, pubsubName, tn, msg) require.NoError(ctx, err, "error publishing message") } // Do the messages we observed match what we expect? for _, m := range messages { m.Assert(ctx, time.Minute) } return nil } } multipleTest := func(messages ...*watcher.Watcher) flow.Runnable { return func(ctx flow.Context) error { var wg sync.WaitGroup wg.Add(2) publishMsgs := func(sidecarName string) { defer wg.Done() client := sidecar.GetClient(ctx, sidecarName) msgs := make([]string, numMessages/2) for i := range msgs { msgs[i] = fmt.Sprintf("%s: Hello, Messages %03d", sidecarName, i) } for _, m := range messages { m.ExpectStrings(msgs...) } ctx.Log("Sending messages!") for _, msg := range msgs { ctx.Logf("Sending: %q", msg) err := client.PublishEvent(ctx, pubsubName, topicName, msg) require.NoError(ctx, err, "error publishing message") } } go publishMsgs(sidecarName1) go publishMsgs(sidecarName2) wg.Wait() // Do the messages we observed match what we expect? for _, m := range messages { m.Assert(ctx, time.Minute) } return nil } } // sendMessagesInBackground and assertMessages are // Runnables for testing publishing and consuming // messages reliably when infrastructure and network // interruptions occur. var task flow.AsyncTask counter := &atomic.Int64{} counter.Store(1) sendMessagesInBackground := func(messages ...*watcher.Watcher) flow.Runnable { return func(ctx flow.Context) error { client := sidecar.GetClient(ctx, sidecarName1) for _, m := range messages { m.Reset() } t := time.NewTicker(200 * time.Millisecond) defer t.Stop() for { select { case <-task.Done(): return nil case <-t.C: msg := fmt.Sprintf("Background message - %03d", counter.Load()) for _, m := range messages { m.Prepare(msg) // Track for observation } // Publish with retries. err := backoff.RetryNotify( func() error { ctx.Logf("Sending '%q' to topic '%s'", msg, topicName) // Using ctx instead of task here is deliberate. // We don't want cancelation to prevent adding // the message, only to interrupt between tries. return client.PublishEvent(ctx, pubsubName, topicName, msg) }, backoff.WithContext(backoff.NewConstantBackOff(time.Second), task), func(err error, t time.Duration) { ctx.Logf("Error publishing message '%s', retrying in %s", msg, t) }, ) if err == nil { for _, m := range messages { m.Add(msg) // Success } counter.Add(1) } else { for _, m := range messages { m.Remove(msg) // Remove from Tracking } } } } } } assertMessages := func(messages ...*watcher.Watcher) flow.Runnable { return func(ctx flow.Context) error { // Signal sendMessagesInBackground to stop and wait for it to complete. task.CancelAndWait() for _, m := range messages { m.Assert(ctx, 1*time.Minute) } return nil } } flow.New(t, "mqtt certification"). // Run MQTT using Docker Compose. Step(dockercompose.Run(clusterName, dockerComposeYAML)). Step("wait for broker sockets", network.WaitForAddresses(5*time.Minute, brokers...)). Step("wait for MQTT readiness", retry.Do(time.Second, 30, mqttReady(mqttURL))). // // Run the application logic above(App1) Step(app.Run(appID1, fmt.Sprintf(":%d", appPort), application(consumerGroup1, appID1, topicName))). // Run the Dapr sidecar with the MQTTPubSub component. Step(sidecar.Run(sidecarName1, append(componentRuntimeOptions(), embedded.WithComponentsPath("./components/consumer1"), embedded.WithAppProtocol(protocol.HTTPProtocol, strconv.Itoa(appPort)), embedded.WithDaprGRPCPort(strconv.Itoa(runtime.DefaultDaprAPIGRPCPort)), embedded.WithDaprHTTPPort(strconv.Itoa(runtime.DefaultDaprHTTPPort)), embedded.WithGracefulShutdownDuration(0), )..., )). // // Send messages and test Step("send and wait 1", test(topicName, consumerGroup1)). Step("reset 1", flow.Reset(consumerGroup1)). // //Run Second application App2 Step(app.Run(appID2, fmt.Sprintf(":%d", appPort+portOffset), application(consumerGroup2, appID2, topicName))). // Run the Dapr sidecar with the MQTTPubSub component. Step(sidecar.Run(sidecarName2, append(componentRuntimeOptions(), embedded.WithComponentsPath("./components/consumer2"), embedded.WithAppProtocol(protocol.HTTPProtocol, strconv.Itoa(appPort+portOffset)), embedded.WithDaprGRPCPort(strconv.Itoa(runtime.DefaultDaprAPIGRPCPort+portOffset)), embedded.WithDaprHTTPPort(strconv.Itoa(runtime.DefaultDaprHTTPPort+portOffset)), embedded.WithProfilePort(strconv.Itoa(runtime.DefaultProfilePort+portOffset)), embedded.WithGracefulShutdownDuration(0), )..., )). // // Send messages and test Step("multiple send and wait", multipleTest(consumerGroup1, consumerGroup2)). Step("reset 2", flow.Reset(consumerGroup1, consumerGroup2)). // // Test multiple topics and wildcards Step( app.Run( appID3, fmt.Sprintf(":%d", appPort+(portOffset*3)), applicationMultiTopic( appID3, topicSubscription{messages: consumerGroupMultiWildcard, name: wildcardTopicSubscribe, route: "/wildcard"}, topicSubscription{messages: consumerGroupMultiShared, name: sharedTopicSubscribe, route: "/shared"}, ), ), ). Step(sidecar.Run(sidecarName3, append(componentRuntimeOptions(), embedded.WithComponentsPath("./components/consumer3"), embedded.WithAppProtocol(protocol.HTTPProtocol, strconv.Itoa(appPort+(portOffset*3))), embedded.WithDaprGRPCPort(strconv.Itoa(runtime.DefaultDaprAPIGRPCPort+(portOffset*3))), embedded.WithDaprHTTPPort(strconv.Itoa(runtime.DefaultDaprHTTPPort+(portOffset*3))), embedded.WithProfilePort(strconv.Itoa(runtime.DefaultProfilePort+(portOffset*3))), embedded.WithGracefulShutdownDuration(0), )..., )). Step("send and wait wildcard", test(wildcardTopicPublish, consumerGroupMultiWildcard)). Step("send and wait shared", test(sharedTopicPublish, consumerGroupMultiShared)). // // Infra test StepAsync("steady flow of messages to publish", &task, sendMessagesInBackground(consumerGroup1, consumerGroup2)). Step("wait before stopping sidecar 2", flow.Sleep(5*time.Second)). Step("stop sidecar 2", sidecar.Stop(sidecarName2)). Step("wait before stopping sidecar 1", flow.Sleep(5*time.Second)). Step("stop sidecar 1", sidecar.Stop(sidecarName1)). Step("wait 1", flow.Sleep(5*time.Second)). Step(sidecar.Run(sidecarName2, append(componentRuntimeOptions(), embedded.WithComponentsPath("./components/consumer2"), embedded.WithAppProtocol(protocol.HTTPProtocol, strconv.Itoa(appPort+portOffset)), embedded.WithDaprGRPCPort(strconv.Itoa(runtime.DefaultDaprAPIGRPCPort+portOffset)), embedded.WithDaprHTTPPort(strconv.Itoa(runtime.DefaultDaprHTTPPort+portOffset)), embedded.WithProfilePort(strconv.Itoa(runtime.DefaultProfilePort+portOffset)), embedded.WithGracefulShutdownDuration(0), )..., )). Step("wait 2", flow.Sleep(5*time.Second)). Step(sidecar.Run(sidecarName1, append(componentRuntimeOptions(), embedded.WithComponentsPath("./components/consumer1"), embedded.WithAppProtocol(protocol.HTTPProtocol, strconv.Itoa(appPort)), embedded.WithDaprGRPCPort(strconv.Itoa(runtime.DefaultDaprAPIGRPCPort)), embedded.WithDaprHTTPPort(strconv.Itoa(runtime.DefaultDaprHTTPPort)), embedded.WithGracefulShutdownDuration(0), )..., )). Step("wait 3", flow.Sleep(5*time.Second)). Step("assert messages 1", assertMessages(consumerGroup1, consumerGroup2)). Step("reset 3", flow.Reset(consumerGroup1, consumerGroup2)). // // Simulate a network interruption. // This tests the components ability to handle reconnections // when Dapr is disconnected abnormally. StepAsync("steady flow of messages to publish", &task, sendMessagesInBackground(consumerGroup1, consumerGroup2)). Step("wait 4", flow.Sleep(5*time.Second)). // // Errors will occurring here. Step("interrupt network", network.InterruptNetwork(5*time.Second, nil, nil, "18084")). // // Component should recover at this point. Step("wait 5", flow.Sleep(5*time.Second)). Step("assert messages 2", assertMessages(consumerGroup1, consumerGroup2)). Run() } type topicSubscription struct { messages *watcher.Watcher name string route string } func componentRuntimeOptions() []embedded.Option { log := logger.NewLogger("dapr.components") pubsubRegistry := pubsub_loader.NewRegistry() pubsubRegistry.Logger = log pubsubRegistry.RegisterComponent(pubsub_mqtt.NewMQTTPubSub, "mqtt3") return []embedded.Option{ embedded.WithPubSubs(pubsubRegistry), } } func randomStr() string { buf := make([]byte, 4) _, _ = io.ReadFull(rand.Reader, buf) return hex.EncodeToString(buf) }