pubsub.mqtt: re-enqueue messages when they fail
Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>
This commit is contained in:
parent
be2fed6b3a
commit
0c40dccd07
|
|
@ -2,7 +2,7 @@ pid_file /mosquitto/mosquitto.pid
|
|||
|
||||
per_listener_settings false
|
||||
max_keepalive 65535
|
||||
port 1883
|
||||
listener 1883 0.0.0.0
|
||||
max_connections -1
|
||||
|
||||
allow_anonymous true
|
||||
|
|
|
|||
|
|
@ -13,13 +13,23 @@ limitations under the License.
|
|||
|
||||
package mqtt
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/dapr/components-contrib/pubsub"
|
||||
"github.com/dapr/kit/logger"
|
||||
)
|
||||
|
||||
type metadata struct {
|
||||
tlsCfg
|
||||
url string
|
||||
clientID string
|
||||
qos byte
|
||||
retain bool
|
||||
cleanSession bool
|
||||
url string
|
||||
clientID string
|
||||
qos byte
|
||||
retain bool
|
||||
cleanSession bool
|
||||
maxRetriableErrorsPerSec int
|
||||
}
|
||||
|
||||
type tlsCfg struct {
|
||||
|
|
@ -27,3 +37,104 @@ type tlsCfg struct {
|
|||
clientCert string
|
||||
clientKey string
|
||||
}
|
||||
|
||||
const (
|
||||
// Keys
|
||||
mqttURL = "url"
|
||||
mqttQOS = "qos"
|
||||
mqttRetain = "retain"
|
||||
mqttClientID = "consumerID"
|
||||
mqttCleanSession = "cleanSession"
|
||||
mqttCACert = "caCert"
|
||||
mqttClientCert = "clientCert"
|
||||
mqttClientKey = "clientKey"
|
||||
mqttMaxRetriableErrorsPerSec = "maxRetriableErrorsPerSec"
|
||||
|
||||
// Defaults
|
||||
defaultQOS = 1
|
||||
defaultRetain = false
|
||||
defaultWait = 30 * time.Second
|
||||
defaultCleanSession = false
|
||||
defaultMaxRetriableErrorsPerSec = 10
|
||||
)
|
||||
|
||||
func parseMQTTMetaData(md pubsub.Metadata, log logger.Logger) (*metadata, error) {
|
||||
m := metadata{}
|
||||
|
||||
// required configuration settings
|
||||
if val, ok := md.Properties[mqttURL]; ok && val != "" {
|
||||
m.url = val
|
||||
} else {
|
||||
return &m, fmt.Errorf("%s missing url", errorMsgPrefix)
|
||||
}
|
||||
|
||||
// optional configuration settings
|
||||
m.qos = defaultQOS
|
||||
if val, ok := md.Properties[mqttQOS]; ok && val != "" {
|
||||
qosInt, err := strconv.Atoi(val)
|
||||
if err != nil {
|
||||
return &m, fmt.Errorf("%s invalid qos %s, %s", errorMsgPrefix, val, err)
|
||||
}
|
||||
m.qos = byte(qosInt)
|
||||
}
|
||||
|
||||
m.retain = defaultRetain
|
||||
if val, ok := md.Properties[mqttRetain]; ok && val != "" {
|
||||
var err error
|
||||
m.retain, err = strconv.ParseBool(val)
|
||||
if err != nil {
|
||||
return &m, fmt.Errorf("%s invalid retain %s, %s", errorMsgPrefix, val, err)
|
||||
}
|
||||
}
|
||||
|
||||
if val, ok := md.Properties[mqttClientID]; ok && val != "" {
|
||||
m.clientID = val
|
||||
} else {
|
||||
return &m, fmt.Errorf("%s missing consumerID", errorMsgPrefix)
|
||||
}
|
||||
|
||||
m.cleanSession = defaultCleanSession
|
||||
if val, ok := md.Properties[mqttCleanSession]; ok && val != "" {
|
||||
var err error
|
||||
m.cleanSession, err = strconv.ParseBool(val)
|
||||
if err != nil {
|
||||
return &m, fmt.Errorf("%s invalid cleanSession %s, %s", errorMsgPrefix, val, err)
|
||||
}
|
||||
}
|
||||
|
||||
m.maxRetriableErrorsPerSec = defaultMaxRetriableErrorsPerSec
|
||||
if val, ok := md.Properties[mqttMaxRetriableErrorsPerSec]; ok && val != "" {
|
||||
var err error
|
||||
m.maxRetriableErrorsPerSec, err = strconv.Atoi(val)
|
||||
if err != nil {
|
||||
return &m, fmt.Errorf("%s invalid maxRetriableErrorsPerSec %s, %s", errorMsgPrefix, val, err)
|
||||
}
|
||||
}
|
||||
|
||||
if val, ok := md.Properties[mqttCACert]; ok && val != "" {
|
||||
if !isValidPEM(val) {
|
||||
return &m, fmt.Errorf("%s invalid caCert", errorMsgPrefix)
|
||||
}
|
||||
m.tlsCfg.caCert = val
|
||||
}
|
||||
if val, ok := md.Properties[mqttClientCert]; ok && val != "" {
|
||||
if !isValidPEM(val) {
|
||||
return &m, fmt.Errorf("%s invalid clientCert", errorMsgPrefix)
|
||||
}
|
||||
m.tlsCfg.clientCert = val
|
||||
}
|
||||
if val, ok := md.Properties[mqttClientKey]; ok && val != "" {
|
||||
if !isValidPEM(val) {
|
||||
return &m, fmt.Errorf("%s invalid clientKey", errorMsgPrefix)
|
||||
}
|
||||
m.tlsCfg.clientKey = val
|
||||
}
|
||||
|
||||
// Deprecated config option
|
||||
// TODO: Remove in the future
|
||||
if _, ok := md.Properties["backOffMaxRetries"]; ok {
|
||||
log.Warnf("Metadata property 'backOffMaxRetries' for component pubsub.mqtt has been deprecated and will be ignored. See: https://docs.dapr.io/reference/components-reference/supported-pubsub/setup-mqtt/")
|
||||
}
|
||||
|
||||
return &m, nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -20,47 +20,32 @@ import (
|
|||
"encoding/pem"
|
||||
"fmt"
|
||||
"net/url"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
mqtt "github.com/eclipse/paho.mqtt.golang"
|
||||
"go.uber.org/ratelimit"
|
||||
|
||||
"github.com/dapr/components-contrib/pubsub"
|
||||
"github.com/dapr/kit/logger"
|
||||
)
|
||||
|
||||
const (
|
||||
// Keys.
|
||||
mqttURL = "url"
|
||||
mqttQOS = "qos"
|
||||
mqttRetain = "retain"
|
||||
mqttClientID = "consumerID"
|
||||
mqttCleanSession = "cleanSession"
|
||||
mqttCACert = "caCert"
|
||||
mqttClientCert = "clientCert"
|
||||
mqttClientKey = "clientKey"
|
||||
|
||||
// errors.
|
||||
errorMsgPrefix = "mqtt pub sub error:"
|
||||
|
||||
// Defaults.
|
||||
defaultQOS = 1
|
||||
defaultRetain = false
|
||||
defaultWait = 30 * time.Second
|
||||
defaultCleanSession = false
|
||||
)
|
||||
|
||||
// mqttPubSub type allows sending and receiving data to/from MQTT broker.
|
||||
type mqttPubSub struct {
|
||||
producer mqtt.Client
|
||||
consumer mqtt.Client
|
||||
metadata *metadata
|
||||
logger logger.Logger
|
||||
topics map[string]pubsub.Handler
|
||||
subscribingLock sync.Mutex
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
producer mqtt.Client
|
||||
consumer mqtt.Client
|
||||
metadata *metadata
|
||||
logger logger.Logger
|
||||
topics map[string]pubsub.Handler
|
||||
retriableErrLimit ratelimit.Limiter
|
||||
subscribingLock sync.Mutex
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
}
|
||||
|
||||
// NewMQTTPubSub returns a new mqttPubSub instance.
|
||||
|
|
@ -78,86 +63,20 @@ func isValidPEM(val string) bool {
|
|||
return block != nil
|
||||
}
|
||||
|
||||
func (p *mqttPubSub) parseMQTTMetaData(md pubsub.Metadata) (*metadata, error) {
|
||||
m := metadata{}
|
||||
|
||||
// required configuration settings
|
||||
if val, ok := md.Properties[mqttURL]; ok && val != "" {
|
||||
m.url = val
|
||||
} else {
|
||||
return &m, fmt.Errorf("%s missing url", errorMsgPrefix)
|
||||
}
|
||||
|
||||
// optional configuration settings
|
||||
m.qos = defaultQOS
|
||||
if val, ok := md.Properties[mqttQOS]; ok && val != "" {
|
||||
qosInt, err := strconv.Atoi(val)
|
||||
if err != nil {
|
||||
return &m, fmt.Errorf("%s invalid qos %s, %s", errorMsgPrefix, val, err)
|
||||
}
|
||||
m.qos = byte(qosInt)
|
||||
}
|
||||
|
||||
m.retain = defaultRetain
|
||||
if val, ok := md.Properties[mqttRetain]; ok && val != "" {
|
||||
var err error
|
||||
m.retain, err = strconv.ParseBool(val)
|
||||
if err != nil {
|
||||
return &m, fmt.Errorf("%s invalid retain %s, %s", errorMsgPrefix, val, err)
|
||||
}
|
||||
}
|
||||
|
||||
if val, ok := md.Properties[mqttClientID]; ok && val != "" {
|
||||
m.clientID = val
|
||||
} else {
|
||||
return &m, fmt.Errorf("%s missing consumerID", errorMsgPrefix)
|
||||
}
|
||||
|
||||
m.cleanSession = defaultCleanSession
|
||||
if val, ok := md.Properties[mqttCleanSession]; ok && val != "" {
|
||||
var err error
|
||||
m.cleanSession, err = strconv.ParseBool(val)
|
||||
if err != nil {
|
||||
return &m, fmt.Errorf("%s invalid clean session %s, %s", errorMsgPrefix, val, err)
|
||||
}
|
||||
}
|
||||
|
||||
if val, ok := md.Properties[mqttCACert]; ok && val != "" {
|
||||
if !isValidPEM(val) {
|
||||
return &m, fmt.Errorf("%s invalid ca certificate", errorMsgPrefix)
|
||||
}
|
||||
m.tlsCfg.caCert = val
|
||||
}
|
||||
if val, ok := md.Properties[mqttClientCert]; ok && val != "" {
|
||||
if !isValidPEM(val) {
|
||||
return &m, fmt.Errorf("%s invalid client certificate", errorMsgPrefix)
|
||||
}
|
||||
m.tlsCfg.clientCert = val
|
||||
}
|
||||
if val, ok := md.Properties[mqttClientKey]; ok && val != "" {
|
||||
if !isValidPEM(val) {
|
||||
return &m, fmt.Errorf("%s invalid client certificate key", errorMsgPrefix)
|
||||
}
|
||||
m.tlsCfg.clientKey = val
|
||||
}
|
||||
|
||||
// Deprecated config option
|
||||
// TODO: Remove in the future
|
||||
if _, ok := md.Properties["backOffMaxRetries"]; ok {
|
||||
p.logger.Warnf("Metadata property 'backOffMaxRetries' for component pubsub.mqtt has been deprecated and will be ignored. See: https://docs.dapr.io/reference/components-reference/supported-pubsub/setup-mqtt/")
|
||||
}
|
||||
|
||||
return &m, nil
|
||||
}
|
||||
|
||||
// Init parses metadata and creates a new Pub Sub client.
|
||||
func (m *mqttPubSub) Init(metadata pubsub.Metadata) error {
|
||||
mqttMeta, err := m.parseMQTTMetaData(metadata)
|
||||
mqttMeta, err := parseMQTTMetaData(metadata, m.logger)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
m.metadata = mqttMeta
|
||||
|
||||
if m.metadata.maxRetriableErrorsPerSec > 0 {
|
||||
m.retriableErrLimit = ratelimit.New(m.metadata.maxRetriableErrorsPerSec)
|
||||
} else {
|
||||
m.retriableErrLimit = ratelimit.NewUnlimited()
|
||||
}
|
||||
|
||||
m.ctx, m.cancel = context.WithCancel(context.Background())
|
||||
|
||||
// mqtt broker allows only one connection at a given time from a clientID.
|
||||
|
|
@ -216,12 +135,13 @@ func (m *mqttPubSub) Subscribe(ctx context.Context, req pubsub.SubscribeRequest,
|
|||
m.subscribingLock.Lock()
|
||||
defer m.subscribingLock.Unlock()
|
||||
|
||||
// Start the subscription
|
||||
// When the connection is ready, add the topic
|
||||
// Reset subscription if active
|
||||
m.resetSubscription()
|
||||
|
||||
// Add the topic then start the subscription
|
||||
m.topics[req.Topic] = handler
|
||||
// Use the global context here to maintain the connection
|
||||
m.startSubscription(m.ctx, func() {
|
||||
m.topics[req.Topic] = handler
|
||||
})
|
||||
m.startSubscription(m.ctx)
|
||||
|
||||
// Listen for context cancelation to remove the subscription
|
||||
go func() {
|
||||
|
|
@ -234,31 +154,37 @@ func (m *mqttPubSub) Subscribe(ctx context.Context, req pubsub.SubscribeRequest,
|
|||
|
||||
// If this is the last subscription or if the global context is done, close the connection entirely
|
||||
if len(m.topics) <= 1 || m.ctx.Err() != nil {
|
||||
m.consumer.Disconnect(5)
|
||||
m.consumer = nil
|
||||
m.closeSubscription()
|
||||
delete(m.topics, req.Topic)
|
||||
return
|
||||
}
|
||||
|
||||
// Reconnect with one less topic
|
||||
m.startSubscription(m.ctx, func() {
|
||||
delete(m.topics, req.Topic)
|
||||
})
|
||||
m.resetSubscription()
|
||||
delete(m.topics, req.Topic)
|
||||
m.startSubscription(m.ctx)
|
||||
}()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *mqttPubSub) startSubscription(ctx context.Context, onConnRready func()) error {
|
||||
// reset synchronization
|
||||
func (m *mqttPubSub) closeSubscription() {
|
||||
m.consumer.Disconnect(5)
|
||||
m.consumer = nil
|
||||
}
|
||||
|
||||
// resetSubscription closes the subscription if it's currently active
|
||||
func (m *mqttPubSub) resetSubscription() {
|
||||
if m.consumer != nil && m.consumer.IsConnectionOpen() {
|
||||
m.logger.Infof("re-initializing the subscriber")
|
||||
m.consumer.Disconnect(5)
|
||||
m.consumer = nil
|
||||
m.closeSubscription()
|
||||
} else {
|
||||
m.logger.Infof("initializing the subscriber")
|
||||
}
|
||||
}
|
||||
|
||||
// startSubscription connects to the server and begins receiving messages
|
||||
func (m *mqttPubSub) startSubscription(ctx context.Context) error {
|
||||
// mqtt broker allows only one connection at a given time from a clientID.
|
||||
consumerClientID := fmt.Sprintf("%s-consumer", m.metadata.clientID)
|
||||
connCtx, connCancel := context.WithTimeout(ctx, defaultWait)
|
||||
|
|
@ -269,9 +195,6 @@ func (m *mqttPubSub) startSubscription(ctx context.Context, onConnRready func())
|
|||
}
|
||||
m.consumer = c
|
||||
|
||||
// Invoke onConnReady so changes to the topics can be made safely
|
||||
onConnRready()
|
||||
|
||||
subscribeTopics := make(map[string]byte, len(m.topics))
|
||||
for k := range m.topics {
|
||||
subscribeTopics[k] = m.metadata.qos
|
||||
|
|
@ -299,7 +222,42 @@ func (m *mqttPubSub) startSubscription(ctx context.Context, onConnRready func())
|
|||
// onMessage returns the callback to be invoked when there's a new message from a topic
|
||||
func (m *mqttPubSub) onMessage(ctx context.Context) func(client mqtt.Client, mqttMsg mqtt.Message) {
|
||||
return func(client mqtt.Client, mqttMsg mqtt.Message) {
|
||||
// Turn off auto-ACK
|
||||
mqttMsg.AutoAckOff()
|
||||
|
||||
ack := false
|
||||
defer func() {
|
||||
// Do not send N/ACKs on retained messages
|
||||
if mqttMsg.Retained() {
|
||||
return
|
||||
}
|
||||
|
||||
// MQTT does not support NACK's, so in case of error we need to re-enqueue the message and then send a positive ACK for this message
|
||||
// Note that if the connection drops before the message is explicitly ACK'd below, then it's automatically re-sent (assuming QoS is 1 or greater, which is the default). So we do not risk losing messages.
|
||||
// Problem with this approach is that if the service crashes between the time the message is re-enqueued and when the ACK is sent, the message may be delivered twice
|
||||
if !ack {
|
||||
m.logger.Debugf("Re-publishing message %s/%d", mqttMsg.Topic(), mqttMsg.MessageID())
|
||||
publishErr := m.Publish(&pubsub.PublishRequest{
|
||||
Topic: mqttMsg.Topic(),
|
||||
Data: mqttMsg.Payload(),
|
||||
})
|
||||
if publishErr != nil {
|
||||
m.logger.Errorf("Failed to re-publish message %s/%d. Error: %v", mqttMsg.Topic(), mqttMsg.MessageID(), publishErr)
|
||||
// Return so Ack() isn't invoked
|
||||
return
|
||||
}
|
||||
}
|
||||
mqttMsg.Ack()
|
||||
|
||||
// If we re-published the message, consume a retriable error token
|
||||
if !ack {
|
||||
m.logger.Debugf("Taking a retriable error token")
|
||||
before := time.Now()
|
||||
_ = m.retriableErrLimit.Take()
|
||||
m.logger.Debugf("Resumed after pausing for %v", time.Now().Sub(before))
|
||||
}
|
||||
}()
|
||||
|
||||
msg := pubsub.NewMessage{
|
||||
Topic: mqttMsg.Topic(),
|
||||
Data: mqttMsg.Payload(),
|
||||
|
|
@ -311,14 +269,15 @@ func (m *mqttPubSub) onMessage(ctx context.Context) func(client mqtt.Client, mqt
|
|||
return
|
||||
}
|
||||
|
||||
m.logger.Debugf("Processing MQTT message %s/%d", mqttMsg.Topic(), mqttMsg.MessageID())
|
||||
m.logger.Debugf("Processing MQTT message %s/%d (retained=%v)", mqttMsg.Topic(), mqttMsg.MessageID(), mqttMsg.Retained())
|
||||
err := topicHandler(ctx, &msg)
|
||||
if err != nil {
|
||||
m.logger.Errorf("Failed processing MQTT message %s/%d: %v", mqttMsg.Topic(), mqttMsg.MessageID(), err)
|
||||
return
|
||||
}
|
||||
|
||||
mqttMsg.Ack()
|
||||
m.logger.Debugf("Done processing MQTT message %s/%d; sending ACK", mqttMsg.Topic(), mqttMsg.MessageID())
|
||||
ack = true
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -329,6 +288,13 @@ func (m *mqttPubSub) connect(ctx context.Context, clientID string) (mqtt.Client,
|
|||
}
|
||||
opts := m.createClientOptions(uri, clientID)
|
||||
client := mqtt.NewClient(opts)
|
||||
|
||||
// Add all routes before we connect to catch messages that may be delivered before client.Subscribe is invoked
|
||||
// The routes will be overwritten later
|
||||
for topic := range m.topics {
|
||||
client.AddRoute(topic, m.onMessage(ctx))
|
||||
}
|
||||
|
||||
token := client.Connect()
|
||||
select {
|
||||
case <-token.Done():
|
||||
|
|
|
|||
|
|
@ -44,10 +44,7 @@ func TestParseMetadata(t *testing.T) {
|
|||
Properties: fakeProperties,
|
||||
}
|
||||
|
||||
obj := mqttPubSub{
|
||||
logger: log,
|
||||
}
|
||||
m, err := obj.parseMQTTMetaData(fakeMetaData)
|
||||
m, err := parseMQTTMetaData(fakeMetaData, log)
|
||||
|
||||
// assert
|
||||
assert.NoError(t, err)
|
||||
|
|
@ -61,10 +58,7 @@ func TestParseMetadata(t *testing.T) {
|
|||
fakeProperties := getFakeProperties()
|
||||
fakeMetaData := pubsub.Metadata{Properties: fakeProperties}
|
||||
fakeMetaData.Properties["consumerID"] = ""
|
||||
obj := mqttPubSub{
|
||||
logger: log,
|
||||
}
|
||||
_, err := obj.parseMQTTMetaData(fakeMetaData)
|
||||
_, err := parseMQTTMetaData(fakeMetaData, log)
|
||||
|
||||
// assert
|
||||
assert.Contains(t, err.Error(), "missing consumerID")
|
||||
|
|
@ -78,10 +72,7 @@ func TestParseMetadata(t *testing.T) {
|
|||
}
|
||||
fakeMetaData.Properties[mqttURL] = ""
|
||||
|
||||
obj := mqttPubSub{
|
||||
logger: log,
|
||||
}
|
||||
m, err := obj.parseMQTTMetaData(fakeMetaData)
|
||||
m, err := parseMQTTMetaData(fakeMetaData, log)
|
||||
|
||||
// assert
|
||||
assert.EqualError(t, err, errors.New("mqtt pub sub error: missing url").Error())
|
||||
|
|
@ -97,10 +88,7 @@ func TestParseMetadata(t *testing.T) {
|
|||
fakeMetaData.Properties[mqttQOS] = ""
|
||||
fakeMetaData.Properties[mqttRetain] = ""
|
||||
|
||||
obj := mqttPubSub{
|
||||
logger: log,
|
||||
}
|
||||
m, err := obj.parseMQTTMetaData(fakeMetaData)
|
||||
m, err := parseMQTTMetaData(fakeMetaData, log)
|
||||
|
||||
// assert
|
||||
assert.NoError(t, err)
|
||||
|
|
@ -117,13 +105,10 @@ func TestParseMetadata(t *testing.T) {
|
|||
}
|
||||
fakeMetaData.Properties[mqttCleanSession] = "randomString"
|
||||
|
||||
obj := mqttPubSub{
|
||||
logger: log,
|
||||
}
|
||||
m, err := obj.parseMQTTMetaData(fakeMetaData)
|
||||
m, err := parseMQTTMetaData(fakeMetaData, log)
|
||||
|
||||
// assert
|
||||
assert.Contains(t, err.Error(), "invalid clean session")
|
||||
assert.Contains(t, err.Error(), "invalid cleanSession")
|
||||
assert.Equal(t, fakeProperties[mqttURL], m.url)
|
||||
})
|
||||
|
||||
|
|
@ -131,23 +116,17 @@ func TestParseMetadata(t *testing.T) {
|
|||
fakeProperties := getFakeProperties()
|
||||
fakeMetaData := pubsub.Metadata{Properties: fakeProperties}
|
||||
fakeMetaData.Properties[mqttCACert] = "randomNonPEMBlockCA"
|
||||
obj := mqttPubSub{
|
||||
logger: log,
|
||||
}
|
||||
_, err := obj.parseMQTTMetaData(fakeMetaData)
|
||||
_, err := parseMQTTMetaData(fakeMetaData, log)
|
||||
|
||||
// assert
|
||||
assert.Contains(t, err.Error(), "invalid ca certificate")
|
||||
assert.Contains(t, err.Error(), "invalid caCert")
|
||||
})
|
||||
|
||||
t.Run("valid ca certificate", func(t *testing.T) {
|
||||
fakeProperties := getFakeProperties()
|
||||
fakeMetaData := pubsub.Metadata{Properties: fakeProperties}
|
||||
fakeMetaData.Properties[mqttCACert] = "-----BEGIN CERTIFICATE-----\nMIICyDCCAbACCQDb8BtgvbqW5jANBgkqhkiG9w0BAQsFADAmMQswCQYDVQQGEwJJ\nTjEXMBUGA1UEAwwOZGFwck1xdHRUZXN0Q0EwHhcNMjAwODEyMDY1MzU4WhcNMjUw\nODEyMDY1MzU4WjAmMQswCQYDVQQGEwJJTjEXMBUGA1UEAwwOZGFwck1xdHRUZXN0\nQ0EwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQDEXte1GBxFJaygsEnK\nHV2AxazZW6Vppv+i50AuURHcaGo0i8G5CTfHzSKrYtTFfBskUspl+2N8GPV5c8Eb\ng+PP6YFn1wiHVz+wRSk3BD35DcGOT2o4XsJw5tiAzJkbpAOYCYl7KAM+BtOf41uC\nd6TdqmawhRGtv1ND2WtyJOT6A3KcUfjhL4TFEhWoljPJVay4TQoJcZMAImD/Xcxw\n6urv6wmUJby3/RJ3I46ZNH3zxEw5vSq1TuzuXxQmfPJG0ZPKJtQZ2nkZ3PNZe4bd\nNUa83YgQap7nBhYdYMMsQyLES2qy3mPcemBVoBWRGODel4PMEcsQiOhAyloAF2d3\nhd+LAgMBAAEwDQYJKoZIhvcNAQELBQADggEBAK13X5JYBy78vHYoP0Oq9fe5XBbL\nuRM8YLnet9b/bXTGG4SnCCOGqWz99swYK7SVyR5l2h8SAoLzeNV61PtaZ6fHrbar\noxSL7BoRXOhMH6LQATadyvwlJ71uqlagqya7soaPK09TtfzeebLT0QkRCWT9b9lQ\nDBvBVCaFidynJL1ts21m5yUdIY4JSu4sGZGb4FRGFdBv/hD3wH8LAkOppsSv3C/Q\nkfkDDSQzYbdMoBuXmafvi3He7Rv+e6Tj9or1rrWdx0MIKlZPzz4DOe5Rh112uRB9\n7xPHJt16c+Ya3DKpchwwdNcki0vFchlpV96HK8sMCoY9kBzPhkEQLdiBGv4=\n-----END CERTIFICATE-----\n"
|
||||
obj := mqttPubSub{
|
||||
logger: log,
|
||||
}
|
||||
m, err := obj.parseMQTTMetaData(fakeMetaData)
|
||||
m, err := parseMQTTMetaData(fakeMetaData, log)
|
||||
|
||||
// assert
|
||||
assert.NoError(t, err)
|
||||
|
|
@ -163,23 +142,17 @@ func TestParseMetadata(t *testing.T) {
|
|||
fakeProperties := getFakeProperties()
|
||||
fakeMetaData := pubsub.Metadata{Properties: fakeProperties}
|
||||
fakeMetaData.Properties[mqttClientCert] = "randomNonPEMBlockClientCert"
|
||||
obj := mqttPubSub{
|
||||
logger: log,
|
||||
}
|
||||
_, err := obj.parseMQTTMetaData(fakeMetaData)
|
||||
_, err := parseMQTTMetaData(fakeMetaData, log)
|
||||
|
||||
// assert
|
||||
assert.Contains(t, err.Error(), "invalid client certificate")
|
||||
assert.Contains(t, err.Error(), "invalid clientCert")
|
||||
})
|
||||
|
||||
t.Run("valid client certificate", func(t *testing.T) {
|
||||
fakeProperties := getFakeProperties()
|
||||
fakeMetaData := pubsub.Metadata{Properties: fakeProperties}
|
||||
fakeMetaData.Properties[mqttClientCert] = "-----BEGIN CERTIFICATE-----\nMIICzDCCAbQCCQDBKDMS3SHsDzANBgkqhkiG9w0BAQUFADAmMQswCQYDVQQGEwJJ\nTjEXMBUGA1UEAwwOZGFwck1xdHRUZXN0Q0EwHhcNMjAwODEyMDY1NTE1WhcNMjEw\nODA3MDY1NTE1WjAqMQswCQYDVQQGEwJJTjEbMBkGA1UEAwwSZGFwck1xdHRUZXN0\nQ2xpZW50MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA5IDfsGI2pb4W\nt3CjckrKuNeTrgmla3sXxSI5wfDgLGd/XkNu++M6yi9ABaBiYChpxbylqIeAn/HT\n3r/nhcb+bldMtEkU9tODHy/QDhvN2UGFjRsMfzO9p1oMpTnRdJCHYinE+oqVced5\nHI+UEofAU+1eiIXqJGKrdfn4gvaHst4QfVPvui8WzJq9TMkEhEME+5hs3VKyKZr2\nqjIxzr7nLVod3DBf482VjxRI06Ip3fPvNuMWwzj2G+Rj8PMcBjoKeCLQL9uQh7f1\nTWHuACqNIrmFEUQWdGETnRjHWIvw0NEL40+Ur2b5+7/hoqnTzReJ3XUe1jM3l44f\nl0rOf4hu2QIDAQABMA0GCSqGSIb3DQEBBQUAA4IBAQAT9yoIeX0LTsvx7/b+8V3a\nkP+j8u97QCc8n5xnMpivcMEk5cfqXX5Llv2EUJ9kBsynrJwT7ujhTJXSA/zb2UdC\nKH8PaSrgIlLwQNZMDofbz6+zPbjStkgne/ZQkTDIxY73sGpJL8LsQVO9p2KjOpdj\nSf9KuJhLzcHolh7ry3ZrkOg+QlMSvseeDRAxNhpkJrGQ6piXoUiEeKKNa0rWTMHx\nIP1Hqj+hh7jgqoQR48NL2jNng7I64HqTl6Mv2fiNfINiw+5xmXTB0QYkGU5NvPBO\naKcCRcGlU7ND89BogQPZsl/P04tAuQqpQWffzT4sEEOyWSVGda4N2Ys3GSQGBv8e\n-----END CERTIFICATE-----\n"
|
||||
obj := mqttPubSub{
|
||||
logger: log,
|
||||
}
|
||||
m, err := obj.parseMQTTMetaData(fakeMetaData)
|
||||
m, err := parseMQTTMetaData(fakeMetaData, log)
|
||||
|
||||
// assert
|
||||
assert.NoError(t, err)
|
||||
|
|
@ -195,23 +168,17 @@ func TestParseMetadata(t *testing.T) {
|
|||
fakeProperties := getFakeProperties()
|
||||
fakeMetaData := pubsub.Metadata{Properties: fakeProperties}
|
||||
fakeMetaData.Properties[mqttClientKey] = "randomNonPEMBlockClientKey"
|
||||
obj := mqttPubSub{
|
||||
logger: log,
|
||||
}
|
||||
_, err := obj.parseMQTTMetaData(fakeMetaData)
|
||||
_, err := parseMQTTMetaData(fakeMetaData, log)
|
||||
|
||||
// assert
|
||||
assert.Contains(t, err.Error(), "invalid client certificate key")
|
||||
assert.Contains(t, err.Error(), "invalid clientKey")
|
||||
})
|
||||
|
||||
t.Run("valid client certificate key", func(t *testing.T) {
|
||||
fakeProperties := getFakeProperties()
|
||||
fakeMetaData := pubsub.Metadata{Properties: fakeProperties}
|
||||
fakeMetaData.Properties[mqttClientKey] = "-----BEGIN RSA PRIVATE KEY-----\nMIIEpAIBAAKCAQEA5IDfsGI2pb4Wt3CjckrKuNeTrgmla3sXxSI5wfDgLGd/XkNu\n++M6yi9ABaBiYChpxbylqIeAn/HT3r/nhcb+bldMtEkU9tODHy/QDhvN2UGFjRsM\nfzO9p1oMpTnRdJCHYinE+oqVced5HI+UEofAU+1eiIXqJGKrdfn4gvaHst4QfVPv\nui8WzJq9TMkEhEME+5hs3VKyKZr2qjIxzr7nLVod3DBf482VjxRI06Ip3fPvNuMW\nwzj2G+Rj8PMcBjoKeCLQL9uQh7f1TWHuACqNIrmFEUQWdGETnRjHWIvw0NEL40+U\nr2b5+7/hoqnTzReJ3XUe1jM3l44fl0rOf4hu2QIDAQABAoIBAQCVMINb4TP20P55\n9IPyqlxjhPT563hijXK+lhMJyiBDPavOOs7qjLikq2bshYPVbm1o2jt6pkXXqAeB\n5t/d20fheQQurYyPfxecNBZuL78duwbcUy28m2aXLlcVRYO4zGhoMgdW4UajoNLV\nT/UIiDONWGyhTHXMHdP+6h9UOmvs3o4b225AuLrw9n6QO5I1Se8lcfOTIqR1fy4O\nGsUWEQPdW0X3Dhgpx7kDIuBTAQzbjD31PCR1U8h2wsCeEe6hPCrsMbo/D019weol\ndi40tbWR1/oNz0+vro2d9YDPJkXN0gmpT51Z4YJoexZBdyzO5z4DMSdn5yczzt6p\nQq8LsXAFAoGBAPYXRbC4OxhtuC+xr8KRkaCCMjtjUWFbFWf6OFgUS9b5uPz9xvdY\nXo7wBP1zp2dS8yFsdIYH5Six4Z5iOuDR4sVixzjabhwedL6bmS1zV5qcCWeASKX1\nURgSkfMmC4Tg3LBgZ9YxySFcVRjikxljkS3eK7Mp7Xmj5afe7qV73TJfAoGBAO20\nTtw2RGe02xnydZmmwf+NpQHOA9S0JsehZA6NRbtPEN/C8bPJIq4VABC5zcH+tfYf\nzndbDlGhuk+qpPA590rG5RSOUjYnQFq7njdSfFyok9dXSZQTjJwFnG2oy0LmgjCe\nROYnbCzD+a+gBKV4xlo2M80OLakQ3zOwPT0xNRnHAoGATLEj/tbrU8mdxP9TDwfe\nom7wyKFDE1wXZ7gLJyfsGqrog69y+lKH5XPXmkUYvpKTQq9SARMkz3HgJkPmpXnD\nelA2Vfl8pza2m1BShF+VxZErPR41hcLV6vKemXAZ1udc33qr4YzSaZskygSSYy8s\nZ2b9p3BBmc8CGzbWmKvpW3ECgYEAn7sFLxdMWj/+5221Nr4HKPn+wrq0ek9gq884\n1Ep8bETSOvrdvolPQ5mbBKJGsLC/h5eR/0Rx18sMzpIF6eOZ2GbU8z474mX36cCf\nrd9A8Gbbid3+9IE6gHGIz2uYwujw3UjNVbdyCpbahvjJhoQlDePUZVu8tRpAUpSA\nYklZvGsCgYBuIlOFTNGMVUnwfzrcS9a/31LSvWTZa8w2QFjsRPMYFezo2l4yWs4D\nPEpeuoJm+Gp6F6ayjoeyOw9mvMBH5hAZr4WjbiU6UodzEHREAsLAzCzcRyIpnDE6\nPW1c3j60r8AHVufkWTA+8B9WoLC5MqcYTV3beMGnNGGqS2PeBom63Q==\n-----END RSA PRIVATE KEY-----\n"
|
||||
obj := mqttPubSub{
|
||||
logger: log,
|
||||
}
|
||||
m, err := obj.parseMQTTMetaData(fakeMetaData)
|
||||
m, err := parseMQTTMetaData(fakeMetaData, log)
|
||||
|
||||
// assert
|
||||
assert.NoError(t, err)
|
||||
|
|
|
|||
|
|
@ -20,6 +20,7 @@ require (
|
|||
github.com/AdhityaRamadhanus/fasthttpcors v0.0.0-20170121111917-d4c07198763a // indirect
|
||||
github.com/PuerkitoBio/purell v1.1.1 // indirect
|
||||
github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 // indirect
|
||||
github.com/andres-erbsen/clock v0.0.0-20160526145045-9e14626cd129 // indirect
|
||||
github.com/andybalholm/brotli v1.0.2 // indirect
|
||||
github.com/antlr/antlr4 v0.0.0-20200503195918-621b933c7a7f // indirect
|
||||
github.com/armon/go-metrics v0.3.10 // indirect
|
||||
|
|
@ -89,6 +90,7 @@ require (
|
|||
go.opencensus.io v0.23.0 // indirect
|
||||
go.opentelemetry.io/otel v1.7.0 // indirect
|
||||
go.uber.org/atomic v1.9.0 // indirect
|
||||
go.uber.org/ratelimit v0.2.0 // indirect
|
||||
golang.org/x/net v0.0.0-20220520000938-2e3eb7b945c2 // indirect
|
||||
golang.org/x/oauth2 v0.0.0-20220309155454-6242fa91716a // indirect
|
||||
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect
|
||||
|
|
|
|||
|
|
@ -71,6 +71,8 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuy
|
|||
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
|
||||
github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
|
||||
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho=
|
||||
github.com/andres-erbsen/clock v0.0.0-20160526145045-9e14626cd129 h1:MzBOUgng9orim59UnfUTLRjMpd09C5uEVQ6RPGeCaVI=
|
||||
github.com/andres-erbsen/clock v0.0.0-20160526145045-9e14626cd129/go.mod h1:rFgpPQZYZ8vdbc+48xibu8ALc3yeyd64IhHS+PU6Yyg=
|
||||
github.com/andybalholm/brotli v1.0.0/go.mod h1:loMXtMfwqflxFJPmdbJO0a3KNoPuLBgiu3qAvBg8x/Y=
|
||||
github.com/andybalholm/brotli v1.0.2 h1:JKnhI/XQ75uFBTiuzXpzFrUriDPiZjlOSzh6wXogP0E=
|
||||
github.com/andybalholm/brotli v1.0.2/go.mod h1:loMXtMfwqflxFJPmdbJO0a3KNoPuLBgiu3qAvBg8x/Y=
|
||||
|
|
@ -667,6 +669,8 @@ go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/
|
|||
go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU=
|
||||
go.uber.org/multierr v1.7.0 h1:zaiO/rmgFjbmCXdSYJWQcdvOCsthmdaHfr3Gm2Kx4Ec=
|
||||
go.uber.org/multierr v1.7.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95ak=
|
||||
go.uber.org/ratelimit v0.2.0 h1:UQE2Bgi7p2B85uP5dC2bbRtig0C+OeNRnNEafLjsLPA=
|
||||
go.uber.org/ratelimit v0.2.0/go.mod h1:YYBV4e4naJvhpitQrWJu1vCpgB7CboMe0qhltKt6mUg=
|
||||
go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
|
||||
go.uber.org/zap v1.17.0/go.mod h1:MXVU+bhUf/A7Xi2HNOnopQOrmycQ5Ih87HtOu4q5SSo=
|
||||
go.uber.org/zap v1.19.0/go.mod h1:xg/QME4nWcxGxrpdeYfq7UvYrLh66cuVKdrbD1XF/NI=
|
||||
|
|
|
|||
Loading…
Reference in New Issue