Add topic metadata for mqtt input binding and support user defined topic for mqtt output binding (#1674)
* feat(bindings/mqtt): add data incoming topic to metadata Signed-off-by: lotuc <lotu.c@outlook.com> * feat(bindings/mqtt): support user defined topic on create action Signed-off-by: lotuc <lotu.c@outlook.com> * chore(bindings/mqtt): add integration test and topic response check test Signed-off-by: lotuc <lotu.c@outlook.com> * fix(bindings/mqtt): ignore misspell linting error for word mosquitto Signed-off-by: lotuc <lotu.c@outlook.com> Co-authored-by: Yaron Schneider <schneider.yaron@live.com>
This commit is contained in:
parent
8b5554dc71
commit
5e4bd89afd
|
@ -204,8 +204,13 @@ func (m *MQTT) Invoke(ctx context.Context, req *bindings.InvokeRequest) (*bindin
|
||||||
bo = backoff.WithContext(bo, ctx)
|
bo = backoff.WithContext(bo, ctx)
|
||||||
|
|
||||||
return nil, retry.NotifyRecover(func() error {
|
return nil, retry.NotifyRecover(func() error {
|
||||||
m.logger.Debugf("mqtt publishing topic %s with data: %v", m.metadata.topic, req.Data)
|
topic, ok := req.Metadata[mqttTopic]
|
||||||
token := m.producer.Publish(m.metadata.topic, m.metadata.qos, m.metadata.retain, req.Data)
|
if !ok || topic == "" {
|
||||||
|
// If user does not specify a topic, publish via the component's default topic.
|
||||||
|
topic = m.metadata.topic
|
||||||
|
}
|
||||||
|
m.logger.Debugf("mqtt publishing topic %s with data: %v", topic, req.Data)
|
||||||
|
token := m.producer.Publish(topic, m.metadata.qos, m.metadata.retain, req.Data)
|
||||||
if !token.WaitTimeout(defaultWait) || token.Error() != nil {
|
if !token.WaitTimeout(defaultWait) || token.Error() != nil {
|
||||||
return fmt.Errorf("mqtt error from publish: %v", token.Error())
|
return fmt.Errorf("mqtt error from publish: %v", token.Error())
|
||||||
}
|
}
|
||||||
|
@ -218,7 +223,10 @@ func (m *MQTT) Invoke(ctx context.Context, req *bindings.InvokeRequest) (*bindin
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *MQTT) handleMessage(handler func(context.Context, *bindings.ReadResponse) ([]byte, error), mqttMsg mqtt.Message) error {
|
func (m *MQTT) handleMessage(handler func(context.Context, *bindings.ReadResponse) ([]byte, error), mqttMsg mqtt.Message) error {
|
||||||
msg := bindings.ReadResponse{Data: mqttMsg.Payload()}
|
msg := bindings.ReadResponse{
|
||||||
|
Data: mqttMsg.Payload(),
|
||||||
|
Metadata: map[string]string{mqttTopic: mqttMsg.Topic()},
|
||||||
|
}
|
||||||
|
|
||||||
// paho.mqtt.golang requires that handlers never block or it can deadlock on client.Disconnect.
|
// paho.mqtt.golang requires that handlers never block or it can deadlock on client.Disconnect.
|
||||||
// To ensure that the Dapr runtime does not hang on teardown on of the component, run the app's
|
// To ensure that the Dapr runtime does not hang on teardown on of the component, run the app's
|
||||||
|
|
|
@ -0,0 +1,114 @@
|
||||||
|
package mqtt
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"os"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
mqtt "github.com/eclipse/paho.mqtt.golang"
|
||||||
|
"github.com/google/uuid"
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
|
||||||
|
"github.com/dapr/components-contrib/bindings"
|
||||||
|
"github.com/dapr/kit/logger"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
// Environment variable containing the host name for MQTT integration tests
|
||||||
|
// To run using docker:
|
||||||
|
// Create mosquitto.conf with content:
|
||||||
|
// listener 1883
|
||||||
|
// allow_anonymous true
|
||||||
|
// And run:
|
||||||
|
// nolint:misspell
|
||||||
|
// docker run -d -v mosquitto.conf:/mosquitto/config/mosquitto.conf --name test-mqtt -p 1883:1883 eclipse-mosquitto:2
|
||||||
|
// In that case the connection string will be: tcp://127.0.0.1:1883
|
||||||
|
testMQTTConnectionStringEnvKey = "DAPR_TEST_MQTT_URL"
|
||||||
|
)
|
||||||
|
|
||||||
|
func getConnectionString() string {
|
||||||
|
return os.Getenv(testMQTTConnectionStringEnvKey)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestInvokeWithTopic(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
url := getConnectionString()
|
||||||
|
if url == "" {
|
||||||
|
t.Skipf("MQTT connection string configuration must be set in environment variable '%s' (example 'tcp://localhost:1883')", testMQTTConnectionStringEnvKey)
|
||||||
|
}
|
||||||
|
|
||||||
|
topicDefault := "/app/default"
|
||||||
|
const msgDefault = "hello from default"
|
||||||
|
dataDefault := []byte(msgDefault)
|
||||||
|
|
||||||
|
topicCustomized := "/app/customized"
|
||||||
|
const msgCustomized = "hello from customized"
|
||||||
|
dataCustomized := []byte(msgCustomized)
|
||||||
|
|
||||||
|
metadata := bindings.Metadata{
|
||||||
|
Name: "testQueue",
|
||||||
|
Properties: map[string]string{
|
||||||
|
"consumerID": uuid.NewString(),
|
||||||
|
"url": url,
|
||||||
|
"topic": topicDefault,
|
||||||
|
"qos": "1",
|
||||||
|
"retain": "false",
|
||||||
|
"cleanSession": "true",
|
||||||
|
"backOffMaxRetries": "0",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
logger := logger.NewLogger("test")
|
||||||
|
|
||||||
|
r := NewMQTT(logger)
|
||||||
|
err := r.Init(metadata)
|
||||||
|
assert.Nil(t, err)
|
||||||
|
|
||||||
|
conn, err := r.connect(uuid.NewString())
|
||||||
|
assert.Nil(t, err)
|
||||||
|
defer conn.Disconnect(1)
|
||||||
|
|
||||||
|
msgCh := make(chan interface{})
|
||||||
|
defer close(msgCh)
|
||||||
|
|
||||||
|
token := conn.Subscribe("/app/#", 1, func(client mqtt.Client, mqttMsg mqtt.Message) {
|
||||||
|
msgCh <- mqttMsg
|
||||||
|
})
|
||||||
|
ok := token.WaitTimeout(2 * time.Second)
|
||||||
|
assert.True(t, ok, "subscribe to /app/# timeout")
|
||||||
|
err = token.Error()
|
||||||
|
assert.Nil(t, err, "error subscribe to test topic")
|
||||||
|
|
||||||
|
// Timeout in case message transfer error.
|
||||||
|
go func() {
|
||||||
|
time.Sleep(5 * time.Second)
|
||||||
|
msgCh <- "timeout"
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Test invoke with default topic configured for component.
|
||||||
|
_, err = r.Invoke(context.TODO(), &bindings.InvokeRequest{Data: dataDefault})
|
||||||
|
assert.Nil(t, err)
|
||||||
|
|
||||||
|
m := <-msgCh
|
||||||
|
mqttMessage, ok := m.(mqtt.Message)
|
||||||
|
assert.True(t, ok)
|
||||||
|
assert.Equal(t, dataDefault, mqttMessage.Payload())
|
||||||
|
assert.Equal(t, topicDefault, mqttMessage.Topic())
|
||||||
|
|
||||||
|
// Test invoke with customized topic.
|
||||||
|
_, err = r.Invoke(context.TODO(), &bindings.InvokeRequest{
|
||||||
|
Data: dataCustomized,
|
||||||
|
Metadata: map[string]string{
|
||||||
|
mqttTopic: topicCustomized,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
assert.Nil(t, err)
|
||||||
|
|
||||||
|
m = <-msgCh
|
||||||
|
mqttMessage, ok = m.(mqtt.Message)
|
||||||
|
assert.True(t, ok)
|
||||||
|
assert.Equal(t, dataCustomized, mqttMessage.Payload())
|
||||||
|
assert.Equal(t, topicCustomized, mqttMessage.Topic())
|
||||||
|
}
|
|
@ -14,6 +14,7 @@ limitations under the License.
|
||||||
package mqtt
|
package mqtt
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"crypto/x509"
|
"crypto/x509"
|
||||||
"encoding/pem"
|
"encoding/pem"
|
||||||
"errors"
|
"errors"
|
||||||
|
@ -22,6 +23,7 @@ import (
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
|
|
||||||
"github.com/dapr/components-contrib/bindings"
|
"github.com/dapr/components-contrib/bindings"
|
||||||
|
"github.com/dapr/kit/logger"
|
||||||
)
|
)
|
||||||
|
|
||||||
func getFakeProperties() map[string]string {
|
func getFakeProperties() map[string]string {
|
||||||
|
@ -187,4 +189,74 @@ func TestParseMetadata(t *testing.T) {
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
assert.NotNil(t, m.tlsCfg.clientKey, "failed to parse valid client certificate key")
|
assert.NotNil(t, m.tlsCfg.clientKey, "failed to parse valid client certificate key")
|
||||||
})
|
})
|
||||||
|
|
||||||
|
t.Run("Response returns the topic that the subscribed data is from.", func(t *testing.T) {
|
||||||
|
const msg = "hello world"
|
||||||
|
|
||||||
|
payload := []byte(msg)
|
||||||
|
topic := "/topic/where/the/data/is/from"
|
||||||
|
|
||||||
|
logger := logger.NewLogger("test")
|
||||||
|
m := NewMQTT(logger)
|
||||||
|
m.ctx, m.cancel = context.WithCancel(context.Background())
|
||||||
|
|
||||||
|
m.handleMessage(func(ctx context.Context, r *bindings.ReadResponse) ([]byte, error) {
|
||||||
|
assert.Equal(t, payload, r.Data)
|
||||||
|
metadata := r.Metadata
|
||||||
|
responseTopic, ok := metadata[mqttTopic]
|
||||||
|
assert.True(t, ok)
|
||||||
|
assert.Equal(t, topic, responseTopic)
|
||||||
|
return r.Data, nil
|
||||||
|
}, &mqttMockMessage{
|
||||||
|
topic: topic,
|
||||||
|
payload: payload,
|
||||||
|
})
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
type mqttMockMessage struct {
|
||||||
|
duplicate bool
|
||||||
|
qos byte
|
||||||
|
retained bool
|
||||||
|
topic string
|
||||||
|
mqttMockMessageID uint16
|
||||||
|
payload []byte
|
||||||
|
ackCalled bool
|
||||||
|
noautoack bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *mqttMockMessage) Duplicate() bool {
|
||||||
|
return m.duplicate
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *mqttMockMessage) Qos() byte {
|
||||||
|
return m.qos
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *mqttMockMessage) Retained() bool {
|
||||||
|
return m.retained
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *mqttMockMessage) Topic() string {
|
||||||
|
return m.topic
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *mqttMockMessage) MessageID() uint16 {
|
||||||
|
return m.mqttMockMessageID
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *mqttMockMessage) Payload() []byte {
|
||||||
|
return m.payload
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *mqttMockMessage) Ack() {
|
||||||
|
m.ackCalled = true
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *mqttMockMessage) NoAutoAck() bool {
|
||||||
|
return m.noautoack
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *mqttMockMessage) AutoAckOff() {
|
||||||
|
m.noautoack = true
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue