715 lines
22 KiB
Go
715 lines
22 KiB
Go
/*
Copyright 2023 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
    http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
|
|
|
|
package mqtt
|
|
|
|
import (
|
|
"context"
|
|
"crypto/x509"
|
|
"encoding/pem"
|
|
"fmt"
|
|
"math"
|
|
"math/rand"
|
|
"reflect"
|
|
"regexp"
|
|
"sync"
|
|
"testing"
|
|
"time"
|
|
|
|
mqtt "github.com/eclipse/paho.mqtt.golang"
|
|
|
|
"github.com/stretchr/testify/assert"
|
|
"github.com/stretchr/testify/require"
|
|
|
|
mdata "github.com/dapr/components-contrib/metadata"
|
|
"github.com/dapr/components-contrib/pubsub"
|
|
"github.com/dapr/kit/logger"
|
|
)
|
|
|
|
// mqttMessage is an in-memory message used by the tests in this file.
// Each field backs the accessor method of the same meaning defined on
// the type (Payload, Retained, Topic, Qos).
type mqttMessage struct {
	// data is the raw payload returned by Payload.
	data []byte
	// retained is the value reported by Retained.
	retained bool
	// topic is the value reported by Topic.
	topic string
	// qos is the value reported by Qos.
	qos byte
}
|
|
|
|
// Compile-time assertion that *mqttMessage implements mqtt.Message.
var _ mqtt.Message = (*mqttMessage)(nil)
|
|
|
|
func (m mqttMessage) Duplicate() bool {
|
|
return false
|
|
}
|
|
|
|
func (m mqttMessage) Qos() byte {
|
|
return m.qos
|
|
}
|
|
|
|
func (m mqttMessage) Retained() bool {
|
|
return m.retained
|
|
}
|
|
|
|
func (m mqttMessage) Topic() string {
|
|
return m.topic
|
|
}
|
|
|
|
func (m mqttMessage) MessageID() uint16 {
|
|
return uint16(rand.Intn(math.MaxUint16 + 1)) //nolint:gosec
|
|
}
|
|
|
|
func (m mqttMessage) Payload() []byte {
|
|
return m.data
|
|
}
|
|
|
|
func (m mqttMessage) Ack() {
|
|
// nop
|
|
}
|
|
|
|
// mockedMQTTToken is a test token whose completion is signalled by
// closing the complete channel.
type mockedMQTTToken struct {
	// m guards reads of err (see Error).
	m sync.RWMutex
	// complete is closed by flowComplete to mark the token as finished.
	complete chan struct{}
	// err is the error reported by Error.
	err error
}
|
|
|
|
// Compile-time assertion that *mockedMQTTToken implements mqtt.Token.
var _ mqtt.Token = (*mockedMQTTToken)(nil)
|
|
|
|
func (t *mockedMQTTToken) Wait() bool {
|
|
<-t.complete
|
|
return true
|
|
}
|
|
|
|
func (t *mockedMQTTToken) WaitTimeout(d time.Duration) bool {
|
|
timer := time.NewTimer(d)
|
|
select {
|
|
case <-t.complete:
|
|
if !timer.Stop() {
|
|
<-timer.C
|
|
}
|
|
return true
|
|
case <-timer.C:
|
|
}
|
|
|
|
return false
|
|
}
|
|
|
|
func (t *mockedMQTTToken) Done() <-chan struct{} {
|
|
return t.complete
|
|
}
|
|
|
|
func (t *mockedMQTTToken) flowComplete() {
|
|
select {
|
|
case <-t.complete:
|
|
default:
|
|
close(t.complete)
|
|
}
|
|
}
|
|
|
|
func (t *mockedMQTTToken) Error() error {
|
|
t.m.RLock()
|
|
defer t.m.RUnlock()
|
|
return t.err
|
|
}
|
|
|
|
type mockedMQTTClient struct {
|
|
msgCh chan mqttMessage
|
|
}
|
|
|
|
// Compile-time assertion that *mockedMQTTClient implements mqtt.Client.
var _ mqtt.Client = (*mockedMQTTClient)(nil)
|
|
|
|
func newMockedMQTTClient(ch chan mqttMessage) *mockedMQTTClient {
|
|
return &mockedMQTTClient{
|
|
msgCh: ch,
|
|
}
|
|
}
|
|
|
|
func (m mockedMQTTClient) IsConnected() bool {
|
|
return true
|
|
}
|
|
|
|
func (m mockedMQTTClient) IsConnectionOpen() bool {
|
|
return true
|
|
}
|
|
|
|
func (m mockedMQTTClient) Connect() mqtt.Token {
|
|
token := &mockedMQTTToken{complete: make(chan struct{})}
|
|
token.flowComplete()
|
|
|
|
return token
|
|
}
|
|
|
|
func (m mockedMQTTClient) Disconnect(quiesce uint) {
|
|
// nop
|
|
}
|
|
|
|
func (m mockedMQTTClient) Publish(topic string, qos byte, retained bool, payload interface{}) mqtt.Token {
|
|
token := &mockedMQTTToken{complete: make(chan struct{})}
|
|
|
|
msg := mqttMessage{
|
|
data: payload.([]byte),
|
|
retained: retained,
|
|
topic: topic,
|
|
qos: qos,
|
|
}
|
|
m.msgCh <- msg
|
|
|
|
token.flowComplete()
|
|
|
|
return token
|
|
}
|
|
|
|
func (m mockedMQTTClient) Subscribe(topic string, qos byte, callback mqtt.MessageHandler) mqtt.Token {
|
|
token := &mockedMQTTToken{complete: make(chan struct{})}
|
|
token.flowComplete()
|
|
|
|
go func() {
|
|
for msg := range m.msgCh {
|
|
callback(m, msg)
|
|
}
|
|
}()
|
|
|
|
return token
|
|
}
|
|
|
|
func (m mockedMQTTClient) SubscribeMultiple(filters map[string]byte, callback mqtt.MessageHandler) mqtt.Token {
|
|
token := &mockedMQTTToken{complete: make(chan struct{})}
|
|
token.flowComplete()
|
|
|
|
go func() {
|
|
for msg := range m.msgCh {
|
|
callback(m, msg)
|
|
}
|
|
}()
|
|
|
|
return token
|
|
}
|
|
|
|
func (m mockedMQTTClient) Unsubscribe(topics ...string) mqtt.Token {
|
|
token := &mockedMQTTToken{complete: make(chan struct{})}
|
|
token.flowComplete()
|
|
|
|
return token
|
|
}
|
|
|
|
func (m mockedMQTTClient) AddRoute(topic string, callback mqtt.MessageHandler) {
|
|
// nop
|
|
}
|
|
|
|
func (m mockedMQTTClient) OptionsReader() mqtt.ClientOptionsReader {
|
|
return mqtt.ClientOptionsReader{}
|
|
}
|
|
|
|
func getFakeProperties() map[string]string {
|
|
return map[string]string{
|
|
"consumerID": "client",
|
|
mqttURL: "tcp://fakeUser:fakePassword@fake.mqtt.host:1883",
|
|
mqttQOS: "1",
|
|
mqttRetain: "true",
|
|
mqttCleanSession: "false",
|
|
}
|
|
}
|
|
|
|
func TestParseMetadata(t *testing.T) {
|
|
log := logger.NewLogger("test")
|
|
t.Run("metadata is correct", func(t *testing.T) {
|
|
fakeProperties := getFakeProperties()
|
|
|
|
fakeMetaData := pubsub.Metadata{Base: mdata.Base{Properties: fakeProperties}}
|
|
|
|
m, err := parseMQTTMetaData(fakeMetaData, log)
|
|
|
|
// assert
|
|
require.NoError(t, err)
|
|
assert.Equal(t, fakeProperties[mqttURL], m.URL)
|
|
assert.Equal(t, byte(1), m.Qos)
|
|
assert.True(t, m.Retain)
|
|
assert.False(t, m.CleanSession)
|
|
})
|
|
|
|
t.Run("missing consumerID", func(t *testing.T) {
|
|
fakeProperties := getFakeProperties()
|
|
fakeMetaData := pubsub.Metadata{Base: mdata.Base{Properties: fakeProperties}}
|
|
fakeMetaData.Properties["consumerID"] = ""
|
|
_, err := parseMQTTMetaData(fakeMetaData, log)
|
|
|
|
// assert
|
|
assert.Contains(t, err.Error(), "missing consumerID")
|
|
})
|
|
|
|
t.Run("url is not given", func(t *testing.T) {
|
|
fakeProperties := getFakeProperties()
|
|
|
|
fakeMetaData := pubsub.Metadata{
|
|
Base: mdata.Base{Properties: fakeProperties},
|
|
}
|
|
fakeMetaData.Properties[mqttURL] = ""
|
|
|
|
m, err := parseMQTTMetaData(fakeMetaData, log)
|
|
|
|
// assert
|
|
require.ErrorContains(t, err, "missing url")
|
|
assert.Equal(t, fakeProperties[mqttURL], m.URL)
|
|
})
|
|
|
|
t.Run("qos and retain is not given", func(t *testing.T) {
|
|
fakeProperties := getFakeProperties()
|
|
|
|
fakeMetaData := pubsub.Metadata{
|
|
Base: mdata.Base{Properties: fakeProperties},
|
|
}
|
|
delete(fakeMetaData.Properties, mqttQOS)
|
|
delete(fakeMetaData.Properties, mqttRetain)
|
|
|
|
m, err := parseMQTTMetaData(fakeMetaData, log)
|
|
|
|
// assert
|
|
require.NoError(t, err)
|
|
assert.Equal(t, fakeProperties[mqttURL], m.URL)
|
|
assert.Equal(t, byte(1), m.Qos)
|
|
assert.False(t, m.Retain)
|
|
})
|
|
|
|
t.Run("invalid ca certificate", func(t *testing.T) {
|
|
fakeProperties := getFakeProperties()
|
|
fakeMetaData := pubsub.Metadata{Base: mdata.Base{Properties: fakeProperties}}
|
|
fakeMetaData.Properties[pubsub.CACert] = "randomNonPEMBlockCA"
|
|
_, err := parseMQTTMetaData(fakeMetaData, log)
|
|
|
|
// assert
|
|
assert.Contains(t, err.Error(), "invalid caCert")
|
|
})
|
|
|
|
t.Run("valid ca certificate", func(t *testing.T) {
|
|
fakeProperties := getFakeProperties()
|
|
fakeMetaData := pubsub.Metadata{Base: mdata.Base{Properties: fakeProperties}}
|
|
fakeMetaData.Properties[pubsub.CACert] = "-----BEGIN CERTIFICATE-----\nMIICyDCCAbACCQDb8BtgvbqW5jANBgkqhkiG9w0BAQsFADAmMQswCQYDVQQGEwJJ\nTjEXMBUGA1UEAwwOZGFwck1xdHRUZXN0Q0EwHhcNMjAwODEyMDY1MzU4WhcNMjUw\nODEyMDY1MzU4WjAmMQswCQYDVQQGEwJJTjEXMBUGA1UEAwwOZGFwck1xdHRUZXN0\nQ0EwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQDEXte1GBxFJaygsEnK\nHV2AxazZW6Vppv+i50AuURHcaGo0i8G5CTfHzSKrYtTFfBskUspl+2N8GPV5c8Eb\ng+PP6YFn1wiHVz+wRSk3BD35DcGOT2o4XsJw5tiAzJkbpAOYCYl7KAM+BtOf41uC\nd6TdqmawhRGtv1ND2WtyJOT6A3KcUfjhL4TFEhWoljPJVay4TQoJcZMAImD/Xcxw\n6urv6wmUJby3/RJ3I46ZNH3zxEw5vSq1TuzuXxQmfPJG0ZPKJtQZ2nkZ3PNZe4bd\nNUa83YgQap7nBhYdYMMsQyLES2qy3mPcemBVoBWRGODel4PMEcsQiOhAyloAF2d3\nhd+LAgMBAAEwDQYJKoZIhvcNAQELBQADggEBAK13X5JYBy78vHYoP0Oq9fe5XBbL\nuRM8YLnet9b/bXTGG4SnCCOGqWz99swYK7SVyR5l2h8SAoLzeNV61PtaZ6fHrbar\noxSL7BoRXOhMH6LQATadyvwlJ71uqlagqya7soaPK09TtfzeebLT0QkRCWT9b9lQ\nDBvBVCaFidynJL1ts21m5yUdIY4JSu4sGZGb4FRGFdBv/hD3wH8LAkOppsSv3C/Q\nkfkDDSQzYbdMoBuXmafvi3He7Rv+e6Tj9or1rrWdx0MIKlZPzz4DOe5Rh112uRB9\n7xPHJt16c+Ya3DKpchwwdNcki0vFchlpV96HK8sMCoY9kBzPhkEQLdiBGv4=\n-----END CERTIFICATE-----\n"
|
|
m, err := parseMQTTMetaData(fakeMetaData, log)
|
|
|
|
// assert
|
|
require.NoError(t, err)
|
|
block, _ := pem.Decode([]byte(m.TLSProperties.CACert))
|
|
cert, err := x509.ParseCertificate(block.Bytes)
|
|
if err != nil {
|
|
t.Errorf("failed to parse ca certificate from metadata. %v", err)
|
|
}
|
|
assert.Equal(t, "daprMqttTestCA", cert.Subject.CommonName)
|
|
})
|
|
|
|
t.Run("invalid client certificate", func(t *testing.T) {
|
|
fakeProperties := getFakeProperties()
|
|
fakeMetaData := pubsub.Metadata{Base: mdata.Base{Properties: fakeProperties}}
|
|
fakeMetaData.Properties[pubsub.ClientCert] = "randomNonPEMBlockClientCert"
|
|
_, err := parseMQTTMetaData(fakeMetaData, log)
|
|
|
|
// assert
|
|
assert.Contains(t, err.Error(), "invalid clientCert")
|
|
})
|
|
|
|
t.Run("valid client certificate", func(t *testing.T) {
|
|
fakeProperties := getFakeProperties()
|
|
fakeMetaData := pubsub.Metadata{Base: mdata.Base{Properties: fakeProperties}}
|
|
fakeMetaData.Properties[pubsub.ClientCert] = "-----BEGIN CERTIFICATE-----\nMIICzDCCAbQCCQDBKDMS3SHsDzANBgkqhkiG9w0BAQUFADAmMQswCQYDVQQGEwJJ\nTjEXMBUGA1UEAwwOZGFwck1xdHRUZXN0Q0EwHhcNMjAwODEyMDY1NTE1WhcNMjEw\nODA3MDY1NTE1WjAqMQswCQYDVQQGEwJJTjEbMBkGA1UEAwwSZGFwck1xdHRUZXN0\nQ2xpZW50MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA5IDfsGI2pb4W\nt3CjckrKuNeTrgmla3sXxSI5wfDgLGd/XkNu++M6yi9ABaBiYChpxbylqIeAn/HT\n3r/nhcb+bldMtEkU9tODHy/QDhvN2UGFjRsMfzO9p1oMpTnRdJCHYinE+oqVced5\nHI+UEofAU+1eiIXqJGKrdfn4gvaHst4QfVPvui8WzJq9TMkEhEME+5hs3VKyKZr2\nqjIxzr7nLVod3DBf482VjxRI06Ip3fPvNuMWwzj2G+Rj8PMcBjoKeCLQL9uQh7f1\nTWHuACqNIrmFEUQWdGETnRjHWIvw0NEL40+Ur2b5+7/hoqnTzReJ3XUe1jM3l44f\nl0rOf4hu2QIDAQABMA0GCSqGSIb3DQEBBQUAA4IBAQAT9yoIeX0LTsvx7/b+8V3a\nkP+j8u97QCc8n5xnMpivcMEk5cfqXX5Llv2EUJ9kBsynrJwT7ujhTJXSA/zb2UdC\nKH8PaSrgIlLwQNZMDofbz6+zPbjStkgne/ZQkTDIxY73sGpJL8LsQVO9p2KjOpdj\nSf9KuJhLzcHolh7ry3ZrkOg+QlMSvseeDRAxNhpkJrGQ6piXoUiEeKKNa0rWTMHx\nIP1Hqj+hh7jgqoQR48NL2jNng7I64HqTl6Mv2fiNfINiw+5xmXTB0QYkGU5NvPBO\naKcCRcGlU7ND89BogQPZsl/P04tAuQqpQWffzT4sEEOyWSVGda4N2Ys3GSQGBv8e\n-----END CERTIFICATE-----\n"
|
|
m, err := parseMQTTMetaData(fakeMetaData, log)
|
|
|
|
// assert
|
|
require.NoError(t, err)
|
|
block, _ := pem.Decode([]byte(m.TLSProperties.ClientCert))
|
|
cert, err := x509.ParseCertificate(block.Bytes)
|
|
if err != nil {
|
|
t.Errorf("failed to parse client certificate from metadata. %v", err)
|
|
}
|
|
assert.Equal(t, "daprMqttTestClient", cert.Subject.CommonName)
|
|
})
|
|
|
|
t.Run("invalid client certificate key", func(t *testing.T) {
|
|
fakeProperties := getFakeProperties()
|
|
fakeMetaData := pubsub.Metadata{Base: mdata.Base{Properties: fakeProperties}}
|
|
fakeMetaData.Properties[pubsub.ClientKey] = "randomNonPEMBlockClientKey"
|
|
_, err := parseMQTTMetaData(fakeMetaData, log)
|
|
|
|
// assert
|
|
assert.Contains(t, err.Error(), "invalid clientKey")
|
|
})
|
|
|
|
t.Run("valid client certificate key", func(t *testing.T) {
|
|
fakeProperties := getFakeProperties()
|
|
fakeMetaData := pubsub.Metadata{Base: mdata.Base{Properties: fakeProperties}}
|
|
fakeMetaData.Properties[pubsub.ClientKey] = "-----BEGIN RSA PRIVATE KEY-----\nMIIEpAIBAAKCAQEA5IDfsGI2pb4Wt3CjckrKuNeTrgmla3sXxSI5wfDgLGd/XkNu\n++M6yi9ABaBiYChpxbylqIeAn/HT3r/nhcb+bldMtEkU9tODHy/QDhvN2UGFjRsM\nfzO9p1oMpTnRdJCHYinE+oqVced5HI+UEofAU+1eiIXqJGKrdfn4gvaHst4QfVPv\nui8WzJq9TMkEhEME+5hs3VKyKZr2qjIxzr7nLVod3DBf482VjxRI06Ip3fPvNuMW\nwzj2G+Rj8PMcBjoKeCLQL9uQh7f1TWHuACqNIrmFEUQWdGETnRjHWIvw0NEL40+U\nr2b5+7/hoqnTzReJ3XUe1jM3l44fl0rOf4hu2QIDAQABAoIBAQCVMINb4TP20P55\n9IPyqlxjhPT563hijXK+lhMJyiBDPavOOs7qjLikq2bshYPVbm1o2jt6pkXXqAeB\n5t/d20fheQQurYyPfxecNBZuL78duwbcUy28m2aXLlcVRYO4zGhoMgdW4UajoNLV\nT/UIiDONWGyhTHXMHdP+6h9UOmvs3o4b225AuLrw9n6QO5I1Se8lcfOTIqR1fy4O\nGsUWEQPdW0X3Dhgpx7kDIuBTAQzbjD31PCR1U8h2wsCeEe6hPCrsMbo/D019weol\ndi40tbWR1/oNz0+vro2d9YDPJkXN0gmpT51Z4YJoexZBdyzO5z4DMSdn5yczzt6p\nQq8LsXAFAoGBAPYXRbC4OxhtuC+xr8KRkaCCMjtjUWFbFWf6OFgUS9b5uPz9xvdY\nXo7wBP1zp2dS8yFsdIYH5Six4Z5iOuDR4sVixzjabhwedL6bmS1zV5qcCWeASKX1\nURgSkfMmC4Tg3LBgZ9YxySFcVRjikxljkS3eK7Mp7Xmj5afe7qV73TJfAoGBAO20\nTtw2RGe02xnydZmmwf+NpQHOA9S0JsehZA6NRbtPEN/C8bPJIq4VABC5zcH+tfYf\nzndbDlGhuk+qpPA590rG5RSOUjYnQFq7njdSfFyok9dXSZQTjJwFnG2oy0LmgjCe\nROYnbCzD+a+gBKV4xlo2M80OLakQ3zOwPT0xNRnHAoGATLEj/tbrU8mdxP9TDwfe\nom7wyKFDE1wXZ7gLJyfsGqrog69y+lKH5XPXmkUYvpKTQq9SARMkz3HgJkPmpXnD\nelA2Vfl8pza2m1BShF+VxZErPR41hcLV6vKemXAZ1udc33qr4YzSaZskygSSYy8s\nZ2b9p3BBmc8CGzbWmKvpW3ECgYEAn7sFLxdMWj/+5221Nr4HKPn+wrq0ek9gq884\n1Ep8bETSOvrdvolPQ5mbBKJGsLC/h5eR/0Rx18sMzpIF6eOZ2GbU8z474mX36cCf\nrd9A8Gbbid3+9IE6gHGIz2uYwujw3UjNVbdyCpbahvjJhoQlDePUZVu8tRpAUpSA\nYklZvGsCgYBuIlOFTNGMVUnwfzrcS9a/31LSvWTZa8w2QFjsRPMYFezo2l4yWs4D\nPEpeuoJm+Gp6F6ayjoeyOw9mvMBH5hAZr4WjbiU6UodzEHREAsLAzCzcRyIpnDE6\nPW1c3j60r8AHVufkWTA+8B9WoLC5MqcYTV3beMGnNGGqS2PeBom63Q==\n-----END RSA PRIVATE KEY-----\n"
|
|
m, err := parseMQTTMetaData(fakeMetaData, log)
|
|
|
|
// assert
|
|
require.NoError(t, err)
|
|
assert.NotNil(t, m.TLSProperties.ClientKey, "failed to parse valid client certificate key")
|
|
})
|
|
}
|
|
|
|
func Test_buildRegexForTopic(t *testing.T) {
|
|
type args struct {
|
|
topicName string
|
|
}
|
|
tests := []struct {
|
|
name string
|
|
args args
|
|
regex string
|
|
tryMatches map[string]bool
|
|
}{
|
|
{
|
|
name: "no wildcard",
|
|
args: args{topicName: "hello world"},
|
|
regex: "",
|
|
},
|
|
{
|
|
name: "#",
|
|
args: args{topicName: "#"},
|
|
regex: "^(.*)$",
|
|
tryMatches: map[string]bool{
|
|
"helloworld": true,
|
|
"helloworld/": true,
|
|
"helloworld/22": true,
|
|
"/helloworld": true,
|
|
"/helloworld/": true,
|
|
"/helloworld/22": true,
|
|
"Ei fu. Siccome immobile, dato il mortal sospiro, stette la spoglia immemore.": true,
|
|
"🐶": true,
|
|
"🐶/foo": true,
|
|
"🐶/foo/bar": true,
|
|
},
|
|
},
|
|
{
|
|
// This should be forbidden by the specs, but apparently it works in brokers
|
|
name: "#/foo",
|
|
args: args{topicName: "#/foo"},
|
|
regex: "^(.*)/foo$",
|
|
tryMatches: map[string]bool{
|
|
"helloworld": false,
|
|
"helloworld/": false,
|
|
"helloworld/22": false,
|
|
"helloworld/foo": true,
|
|
"hello/world/foo": true,
|
|
"helloworld/foo/bar": false,
|
|
"/helloworld": false,
|
|
"/helloworld/": false,
|
|
"/helloworld/22": false,
|
|
"/helloworld/foo": true,
|
|
"/hello/world/foo": true,
|
|
"/helloworld/foo/bar": false,
|
|
"🐶": false,
|
|
"🐶/foo": true,
|
|
"🐶/😄/foo": true,
|
|
"🐶/foo/bar": false,
|
|
"🐶/😄": false,
|
|
},
|
|
},
|
|
{
|
|
name: "+",
|
|
args: args{topicName: "+"},
|
|
regex: `^([^\/]*)$`,
|
|
tryMatches: map[string]bool{
|
|
"helloworld": true,
|
|
"helloworld/": false,
|
|
"helloworld/22": false,
|
|
"/helloworld": false,
|
|
"/helloworld/": false,
|
|
"/helloworld/22": false,
|
|
"Ei fu. Siccome immobile, dato il mortal sospiro, stette la spoglia immemore.": true,
|
|
"🐶": true,
|
|
"🐶/foo": false,
|
|
"🐶/foo/bar": false,
|
|
},
|
|
},
|
|
{
|
|
name: "+/foo",
|
|
args: args{topicName: "+/foo"},
|
|
regex: `^([^\/]*)/foo$`,
|
|
tryMatches: map[string]bool{
|
|
"helloworld": false,
|
|
"helloworld/": false,
|
|
"helloworld/22": false,
|
|
"helloworld/foo": true,
|
|
"hello/world/foo": false,
|
|
"helloworld/foo/bar": false,
|
|
"/helloworld": false,
|
|
"/helloworld/": false,
|
|
"/helloworld/22": false,
|
|
"/helloworld/foo": false,
|
|
"/hello/world/foo": false,
|
|
"/helloworld/foo/bar": false,
|
|
"🐶": false,
|
|
"🐶/foo": true,
|
|
"🐶/😄/foo": false,
|
|
"🐶/foo/bar": false,
|
|
"🐶/😄": false,
|
|
},
|
|
},
|
|
{
|
|
name: "foo# (invalid)",
|
|
args: args{topicName: "foo#"},
|
|
regex: "",
|
|
},
|
|
{
|
|
name: "foo+ (invalid)",
|
|
args: args{topicName: "foo+"},
|
|
regex: "",
|
|
},
|
|
{
|
|
name: "foo/#",
|
|
args: args{topicName: "foo/#"},
|
|
regex: "^foo(.*)$",
|
|
tryMatches: map[string]bool{
|
|
"helloworld": false,
|
|
"foo": true,
|
|
"foo/": true,
|
|
"foo/bar": true,
|
|
"/helloworld": false,
|
|
"foo/helloworld": true,
|
|
"foo/hello/world": true,
|
|
"hello/world": false,
|
|
"🐶": false,
|
|
"foo/🐶": true,
|
|
"🐶/foo/bar": false,
|
|
"foo/🐶/bar": true,
|
|
},
|
|
},
|
|
{
|
|
// This should be forbidden by the specs, but apparently it works in brokers
|
|
name: "foo/#/bar",
|
|
args: args{topicName: "foo/#/bar"},
|
|
regex: "^foo/(.*)/bar$",
|
|
tryMatches: map[string]bool{
|
|
"helloworld": false,
|
|
"foo/": false,
|
|
"foo/bar": false,
|
|
"foo/hi/bar": true,
|
|
"foo/hi/hi/hi/bar": true,
|
|
"foo/hi/world": false,
|
|
},
|
|
},
|
|
{
|
|
name: "foo/+",
|
|
args: args{topicName: "foo/+"},
|
|
regex: `^foo((\/|)[^\/]*)$`,
|
|
tryMatches: map[string]bool{
|
|
"helloworld": false,
|
|
"foo": true,
|
|
"foo/": true,
|
|
"foo/bar": true,
|
|
"/helloworld": false,
|
|
"foo/helloworld": true,
|
|
"foo/hello/world": false,
|
|
"hello/world": false,
|
|
"🐶": false,
|
|
"foo/🐶": true,
|
|
"🐶/foo/bar": false,
|
|
"foo/🐶/bar": false,
|
|
},
|
|
},
|
|
{
|
|
name: "foo/+/bar",
|
|
args: args{topicName: "foo/+/bar"},
|
|
regex: `^foo/([^\/]*)/bar$`,
|
|
tryMatches: map[string]bool{
|
|
"helloworld": false,
|
|
"foo/": false,
|
|
"foo/bar": false,
|
|
"foo/hi/bar": true,
|
|
"foo/hi/hi/hi/bar": false,
|
|
"foo/hi/world": false,
|
|
},
|
|
},
|
|
{
|
|
// https://github.com/dapr/components-contrib/issues/1881#issuecomment-1191571216
|
|
name: "event/data/+/+/+/1/1",
|
|
args: args{topicName: "event/data/+/+/+/1/1"},
|
|
regex: `^event/data/([^\/]*)/([^\/]*)/([^\/]*)/1/1$`,
|
|
tryMatches: map[string]bool{
|
|
"helloworld": false,
|
|
"event/data": false,
|
|
"event/data/a/b/c/1/1": true,
|
|
"event/data/a/b/c/1/2": false,
|
|
"event/data/a/b/1/1": false,
|
|
"event/data/a/bbb/ccc/1/1": true,
|
|
},
|
|
},
|
|
{
|
|
name: "+/+/+/1/1",
|
|
args: args{topicName: "+/+/+/1/1"},
|
|
regex: `^([^\/]*)/([^\/]*)/([^\/]*)/1/1$`,
|
|
tryMatches: map[string]bool{
|
|
"helloworld": false,
|
|
"a/b/c/": false,
|
|
"a/b/c/1": false,
|
|
"a/b/c/1/1": true,
|
|
"a/b/c/1/2": false,
|
|
"a/b/1/1": false,
|
|
"a/bbb/ccc/1/1": true,
|
|
},
|
|
},
|
|
{
|
|
name: "+/#/1/1",
|
|
args: args{topicName: "+/#/1/1"},
|
|
regex: `^([^\/]*)/(.*)/1/1$`,
|
|
tryMatches: map[string]bool{
|
|
"helloworld": false,
|
|
"a/b/c/": false,
|
|
"a/b/c/1": false,
|
|
"a/b/c/1/1": true,
|
|
"a/b/c/1/2": false,
|
|
"a/b/1/1": true,
|
|
"a/bbb/ccc/1/1": true,
|
|
"aa/bbb/ccc/ddd/1/1": true,
|
|
},
|
|
},
|
|
{
|
|
name: "foo/+/bar/+",
|
|
args: args{topicName: "foo/+/bar/+"},
|
|
regex: `^foo/([^\/]*)/bar((\/|)[^\/]*)$`,
|
|
tryMatches: map[string]bool{
|
|
"helloworld": false,
|
|
"foo/": false,
|
|
"foo/bar": false,
|
|
"foo/hi/bar": true,
|
|
"foo/hi/bar/foo": true,
|
|
"foo/hi/bar/foo/hi": false,
|
|
"foo/hi/bar/foo/bar": false,
|
|
"foo/hi/hi/hi/bar": false,
|
|
"foo/hi/world": false,
|
|
},
|
|
},
|
|
{
|
|
name: "foo/#/bar/+",
|
|
args: args{topicName: "foo/#/bar/+"},
|
|
regex: `^foo/(.*)/bar((\/|)[^\/]*)$`,
|
|
tryMatches: map[string]bool{
|
|
"helloworld": false,
|
|
"foo/": false,
|
|
"foo/bar": false,
|
|
"foo/hi/bar": true,
|
|
"foo/hi/bar/foo": true,
|
|
"foo/h/i/bar/foo": true,
|
|
"foo/h/i/0/bar/foo": true,
|
|
"foo/hi/bar/foo/hi": false,
|
|
"foo/hi/bar/foo/bar": true,
|
|
"foo/h/i/bar/foo/bar": true,
|
|
"foo/hi/hi/hi/bar": true,
|
|
"foo/hi/world": false,
|
|
},
|
|
},
|
|
}
|
|
for _, tt := range tests {
|
|
t.Run(tt.name, func(t *testing.T) {
|
|
got := buildRegexForTopic(tt.args.topicName)
|
|
if got != tt.regex {
|
|
t.Errorf("buildRegexForTopic(%v) = %v, want %v", tt.args.topicName, got, tt.regex)
|
|
return
|
|
}
|
|
if len(tt.tryMatches) > 0 {
|
|
re := regexp.MustCompile(got)
|
|
for topic, match := range tt.tryMatches {
|
|
if matched := re.MatchString(topic); matched != match {
|
|
t.Errorf("buildRegexForTopic(%v) - match(%v) returned %v but expected %v", tt.args.topicName, topic, matched, match)
|
|
}
|
|
}
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func Test_mqttPubSub_Publish(t *testing.T) {
|
|
type fields struct {
|
|
logger logger.Logger
|
|
metadata *mqttMetadata
|
|
ctx context.Context
|
|
}
|
|
type args struct {
|
|
req *pubsub.PublishRequest
|
|
}
|
|
tests := []struct {
|
|
name string
|
|
fields fields
|
|
args args
|
|
wantErr assert.ErrorAssertionFunc
|
|
wantedMsg mqttMessage
|
|
}{
|
|
{
|
|
name: "publish request does not contain retain metadata",
|
|
fields: fields{
|
|
logger: logger.NewLogger("mqtt-test"),
|
|
ctx: t.Context(),
|
|
metadata: &mqttMetadata{
|
|
Retain: true,
|
|
},
|
|
},
|
|
args: args{
|
|
req: &pubsub.PublishRequest{
|
|
Data: []byte("test"),
|
|
PubsubName: "mqtt",
|
|
Metadata: map[string]string{},
|
|
Topic: "test",
|
|
ContentType: nil,
|
|
},
|
|
},
|
|
wantErr: assert.NoError,
|
|
wantedMsg: mqttMessage{
|
|
data: []byte("test"),
|
|
retained: true,
|
|
topic: "test",
|
|
qos: 0,
|
|
},
|
|
},
|
|
{
|
|
name: "publish request contains retain metadata",
|
|
fields: fields{
|
|
logger: logger.NewLogger("mqtt-test"),
|
|
ctx: t.Context(),
|
|
metadata: &mqttMetadata{
|
|
Retain: true,
|
|
},
|
|
},
|
|
args: args{
|
|
req: &pubsub.PublishRequest{
|
|
Data: []byte("test"),
|
|
PubsubName: "mqtt",
|
|
Metadata: map[string]string{"retain": "false"},
|
|
Topic: "test",
|
|
ContentType: nil,
|
|
},
|
|
},
|
|
wantErr: assert.NoError,
|
|
wantedMsg: mqttMessage{
|
|
data: []byte("test"),
|
|
retained: false,
|
|
topic: "test",
|
|
qos: 0,
|
|
},
|
|
},
|
|
}
|
|
for _, tt := range tests {
|
|
t.Run(tt.name, func(t *testing.T) {
|
|
msgCh := make(chan mqttMessage, 1)
|
|
|
|
m := &mqttPubSub{
|
|
conn: newMockedMQTTClient(msgCh),
|
|
logger: tt.fields.logger,
|
|
metadata: tt.fields.metadata,
|
|
}
|
|
|
|
ctx := t.Context()
|
|
tt.wantErr(t, m.Publish(ctx, tt.args.req), fmt.Sprintf("Publish(%v, %v)", ctx, tt.args.req))
|
|
close(msgCh)
|
|
|
|
for msg := range msgCh {
|
|
if !reflect.DeepEqual(msg, tt.wantedMsg) {
|
|
t.Errorf("received different message than expected, got = %v, want %v", m, tt.wantedMsg)
|
|
}
|
|
}
|
|
})
|
|
}
|
|
}
|