From 5e4bd89afd7de99d0890078ebab6b25525d0f492 Mon Sep 17 00:00:00 2001 From: lotuc Date: Thu, 12 May 2022 03:11:51 +0800 Subject: [PATCH] Add topic metadata for mqtt input binding and support user defined topic for mqtt output binding (#1674) * feat(bindings/mqtt): add data incoming topic to metadata Signed-off-by: lotuc * feat(bindings/mqtt): support user defined topic on create action Signed-off-by: lotuc * chore(bindings/mqtt): add integration test and topic response check test Signed-off-by: lotuc * fix(bindings/mqtt): ignore misspell linting error for word mosquitto Signed-off-by: lotuc Co-authored-by: Yaron Schneider --- bindings/mqtt/mqtt.go | 14 ++- bindings/mqtt/mqtt_integration_test.go | 114 +++++++++++++++++++++++++ bindings/mqtt/mqtt_test.go | 72 ++++++++++++++++ 3 files changed, 197 insertions(+), 3 deletions(-) create mode 100644 bindings/mqtt/mqtt_integration_test.go diff --git a/bindings/mqtt/mqtt.go b/bindings/mqtt/mqtt.go index 3355b45a4..84ec94e99 100644 --- a/bindings/mqtt/mqtt.go +++ b/bindings/mqtt/mqtt.go @@ -204,8 +204,13 @@ func (m *MQTT) Invoke(ctx context.Context, req *bindings.InvokeRequest) (*bindin bo = backoff.WithContext(bo, ctx) return nil, retry.NotifyRecover(func() error { - m.logger.Debugf("mqtt publishing topic %s with data: %v", m.metadata.topic, req.Data) - token := m.producer.Publish(m.metadata.topic, m.metadata.qos, m.metadata.retain, req.Data) + topic, ok := req.Metadata[mqttTopic] + if !ok || topic == "" { + // If user does not specify a topic, publish via the component's default topic. + topic = m.metadata.topic + } + m.logger.Debugf("mqtt publishing topic %s with data: %v", topic, req.Data) + token := m.producer.Publish(topic, m.metadata.qos, m.metadata.retain, req.Data) if !token.WaitTimeout(defaultWait) || token.Error() != nil { return fmt.Errorf("mqtt error from publish: %v", token.Error()) } @@ -218,7 +223,10 @@ func (m *MQTT) Invoke(ctx context.Context, req *bindings.InvokeRequest) (*bindin } func (m *MQTT) handleMessage(handler func(context.Context, *bindings.ReadResponse) ([]byte, error), mqttMsg mqtt.Message) error { - msg := bindings.ReadResponse{Data: mqttMsg.Payload()} + msg := bindings.ReadResponse{ + Data: mqttMsg.Payload(), + Metadata: map[string]string{mqttTopic: mqttMsg.Topic()}, + } // paho.mqtt.golang requires that handlers never block or it can deadlock on client.Disconnect. // To ensure that the Dapr runtime does not hang on teardown on of the component, run the app's diff --git a/bindings/mqtt/mqtt_integration_test.go b/bindings/mqtt/mqtt_integration_test.go new file mode 100644 index 000000000..5bf007285 --- /dev/null +++ b/bindings/mqtt/mqtt_integration_test.go @@ -0,0 +1,114 @@ +package mqtt + +import ( + "context" + "os" + "testing" + "time" + + mqtt "github.com/eclipse/paho.mqtt.golang" + "github.com/google/uuid" + "github.com/stretchr/testify/assert" + + "github.com/dapr/components-contrib/bindings" + "github.com/dapr/kit/logger" +) + +const ( + // Environment variable containing the host name for MQTT integration tests + // To run using docker: + // Create mosquitto.conf with content: + // listener 1883 + // allow_anonymous true + // And run: + // nolint:misspell + // docker run -d -v mosquitto.conf:/mosquitto/config/mosquitto.conf --name test-mqtt -p 1883:1883 eclipse-mosquitto:2 + // In that case the connection string will be: tcp://127.0.0.1:1883 + testMQTTConnectionStringEnvKey = "DAPR_TEST_MQTT_URL" +) + +func getConnectionString() string { + return os.Getenv(testMQTTConnectionStringEnvKey) +} + +func TestInvokeWithTopic(t *testing.T) { + t.Parallel() + + url := getConnectionString() + if url == "" { + t.Skipf("MQTT connection string configuration must be set in environment variable '%s' (example 'tcp://localhost:1883')", testMQTTConnectionStringEnvKey) + } + + topicDefault := "/app/default" + const msgDefault = "hello from default" + dataDefault := []byte(msgDefault) + + topicCustomized := "/app/customized" + const msgCustomized = "hello from customized" + dataCustomized := []byte(msgCustomized) + + metadata := bindings.Metadata{ + Name: "testQueue", + Properties: map[string]string{ + "consumerID": uuid.NewString(), + "url": url, + "topic": topicDefault, + "qos": "1", + "retain": "false", + "cleanSession": "true", + "backOffMaxRetries": "0", + }, + } + + logger := logger.NewLogger("test") + + r := NewMQTT(logger) + err := r.Init(metadata) + assert.Nil(t, err) + + conn, err := r.connect(uuid.NewString()) + assert.Nil(t, err) + defer conn.Disconnect(1) + + msgCh := make(chan interface{}) + defer close(msgCh) + + token := conn.Subscribe("/app/#", 1, func(client mqtt.Client, mqttMsg mqtt.Message) { + msgCh <- mqttMsg + }) + ok := token.WaitTimeout(2 * time.Second) + assert.True(t, ok, "subscribe to /app/# timeout") + err = token.Error() + assert.Nil(t, err, "error subscribe to test topic") + + // Timeout in case message transfer error. + go func() { + time.Sleep(5 * time.Second) + msgCh <- "timeout" + }() + + // Test invoke with default topic configured for component. + _, err = r.Invoke(context.TODO(), &bindings.InvokeRequest{Data: dataDefault}) + assert.Nil(t, err) + + m := <-msgCh + mqttMessage, ok := m.(mqtt.Message) + assert.True(t, ok) + assert.Equal(t, dataDefault, mqttMessage.Payload()) + assert.Equal(t, topicDefault, mqttMessage.Topic()) + + // Test invoke with customized topic. + _, err = r.Invoke(context.TODO(), &bindings.InvokeRequest{ + Data: dataCustomized, + Metadata: map[string]string{ + mqttTopic: topicCustomized, + }, + }) + assert.Nil(t, err) + + m = <-msgCh + mqttMessage, ok = m.(mqtt.Message) + assert.True(t, ok) + assert.Equal(t, dataCustomized, mqttMessage.Payload()) + assert.Equal(t, topicCustomized, mqttMessage.Topic()) +} diff --git a/bindings/mqtt/mqtt_test.go b/bindings/mqtt/mqtt_test.go index 5f545a046..1c48c63a5 100644 --- a/bindings/mqtt/mqtt_test.go +++ b/bindings/mqtt/mqtt_test.go @@ -14,6 +14,7 @@ limitations under the License. package mqtt import ( + "context" "crypto/x509" "encoding/pem" "errors" @@ -22,6 +23,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/dapr/components-contrib/bindings" + "github.com/dapr/kit/logger" ) func getFakeProperties() map[string]string { @@ -187,4 +189,74 @@ func TestParseMetadata(t *testing.T) { assert.NoError(t, err) assert.NotNil(t, m.tlsCfg.clientKey, "failed to parse valid client certificate key") }) + + t.Run("Response returns the topic that the subscribed data is from.", func(t *testing.T) { + const msg = "hello world" + + payload := []byte(msg) + topic := "/topic/where/the/data/is/from" + + logger := logger.NewLogger("test") + m := NewMQTT(logger) + m.ctx, m.cancel = context.WithCancel(context.Background()) + + m.handleMessage(func(ctx context.Context, r *bindings.ReadResponse) ([]byte, error) { + assert.Equal(t, payload, r.Data) + metadata := r.Metadata + responseTopic, ok := metadata[mqttTopic] + assert.True(t, ok) + assert.Equal(t, topic, responseTopic) + return r.Data, nil + }, &mqttMockMessage{ + topic: topic, + payload: payload, + }) + }) +} + +type mqttMockMessage struct { + duplicate bool + qos byte + retained bool + topic string + mqttMockMessageID uint16 + payload []byte + ackCalled bool + noautoack bool +} + +func (m *mqttMockMessage) Duplicate() bool { + return m.duplicate +} + +func (m *mqttMockMessage) Qos() byte { + return m.qos +} + +func (m *mqttMockMessage) Retained() bool { + return m.retained +} + +func (m *mqttMockMessage) Topic() string { + return m.topic +} + +func (m *mqttMockMessage) MessageID() uint16 { + return m.mqttMockMessageID +} + +func (m *mqttMockMessage) Payload() []byte { + return m.payload +} + +func (m *mqttMockMessage) Ack() { + m.ackCalled = true +} + +func (m *mqttMockMessage) NoAutoAck() bool { + return m.noautoack +} + +func (m *mqttMockMessage) AutoAckOff() { + m.noautoack = true }