Improve mqtt pubsub for multiple topics (#443)
- Listen on multiple topics when those are provided for subscription. - Improve the current logic to use dedicated function from paho client.
This commit is contained in:
parent
5fdcbe9fe5
commit
7f9850a04f
|
|
@ -6,6 +6,7 @@
|
|||
package mqtt
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"crypto/x509"
|
||||
"encoding/pem"
|
||||
|
|
@ -44,9 +45,12 @@ const (
|
|||
|
||||
// mqttPubSub type allows sending and receiving data to/from MQTT broker.
|
||||
type mqttPubSub struct {
|
||||
client mqtt.Client
|
||||
producer mqtt.Client
|
||||
consumer mqtt.Client
|
||||
metadata *metadata
|
||||
logger logger.Logger
|
||||
topics map[string]byte
|
||||
cancel context.CancelFunc
|
||||
}
|
||||
|
||||
// NewMQTTPubSub returns a new mqttPubSub instance.
|
||||
|
|
@ -133,16 +137,15 @@ func (m *mqttPubSub) Init(metadata pubsub.Metadata) error {
|
|||
}
|
||||
m.metadata = mqttMeta
|
||||
|
||||
uri, err := url.Parse(m.metadata.url)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
client, err := m.connect(uri)
|
||||
// mqtt broker allows only one connection at a given time from a clientID.
|
||||
producerClientID := fmt.Sprintf("%s-producer", m.metadata.clientID)
|
||||
p, err := m.connect(producerClientID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
m.client = client
|
||||
m.producer = p
|
||||
m.topics = make(map[string]byte)
|
||||
|
||||
m.logger.Debug("mqtt message bus initialization complete")
|
||||
return nil
|
||||
|
|
@ -152,7 +155,7 @@ func (m *mqttPubSub) Init(metadata pubsub.Metadata) error {
|
|||
func (m *mqttPubSub) Publish(req *pubsub.PublishRequest) error {
|
||||
m.logger.Debugf("mqtt publishing topic %s with data: %v", req.Topic, req.Data)
|
||||
|
||||
token := m.client.Publish(req.Topic, m.metadata.qos, m.metadata.retain, req.Data)
|
||||
token := m.producer.Publish(req.Topic, m.metadata.qos, m.metadata.retain, req.Data)
|
||||
if !token.WaitTimeout(defaultWait) || token.Error() != nil {
|
||||
return fmt.Errorf("mqtt error from publish: %v", token.Error())
|
||||
}
|
||||
|
|
@ -161,20 +164,50 @@ func (m *mqttPubSub) Publish(req *pubsub.PublishRequest) error {
|
|||
|
||||
// Subscribe to the mqtt pub sub topic.
|
||||
func (m *mqttPubSub) Subscribe(req pubsub.SubscribeRequest, handler func(msg *pubsub.NewMessage) error) error {
|
||||
token := m.client.Subscribe(
|
||||
req.Topic,
|
||||
m.metadata.qos,
|
||||
func(client mqtt.Client, mqttMsg mqtt.Message) {
|
||||
handler(&pubsub.NewMessage{Topic: req.Topic, Data: mqttMsg.Payload()})
|
||||
})
|
||||
if err := token.Error(); err != nil {
|
||||
return fmt.Errorf("mqtt error from subscribe: %v", err)
|
||||
m.topics[req.Topic] = m.metadata.qos
|
||||
|
||||
// reset synchronization
|
||||
if m.consumer != nil {
|
||||
m.logger.Warnf("re-initializing the subscriber")
|
||||
m.cancel()
|
||||
m.consumer.Disconnect(0)
|
||||
}
|
||||
|
||||
// mqtt broker allows only one connection at a given time from a clientID.
|
||||
consumerClientID := fmt.Sprintf("%s-consumer", m.metadata.clientID)
|
||||
c, err := m.connect(consumerClientID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
m.consumer = c
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
m.cancel = cancel
|
||||
|
||||
go func() {
|
||||
token := m.consumer.SubscribeMultiple(
|
||||
m.topics,
|
||||
func(client mqtt.Client, mqttMsg mqtt.Message) {
|
||||
handler(&pubsub.NewMessage{Topic: req.Topic, Data: mqttMsg.Payload()})
|
||||
})
|
||||
if err := token.Error(); err != nil {
|
||||
m.logger.Errorf("mqtt error from subscribe: %v", err)
|
||||
}
|
||||
|
||||
if ctx.Err() != nil {
|
||||
return
|
||||
}
|
||||
}()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *mqttPubSub) connect(uri *url.URL) (mqtt.Client, error) {
|
||||
opts := m.createClientOptions(uri)
|
||||
func (m *mqttPubSub) connect(clientID string) (mqtt.Client, error) {
|
||||
uri, err := url.Parse(m.metadata.url)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
opts := m.createClientOptions(uri, clientID)
|
||||
client := mqtt.NewClient(opts)
|
||||
token := client.Connect()
|
||||
for !token.WaitTimeout(defaultWait) {
|
||||
|
|
@ -185,7 +218,7 @@ func (m *mqttPubSub) connect(uri *url.URL) (mqtt.Client, error) {
|
|||
return client, nil
|
||||
}
|
||||
|
||||
func (m *mqttPubSub) NewTLSConfig() *tls.Config {
|
||||
func (m *mqttPubSub) newTLSConfig() *tls.Config {
|
||||
tlsConfig := new(tls.Config)
|
||||
|
||||
if m.metadata.clientCert != "" && m.metadata.clientKey != "" {
|
||||
|
|
@ -207,15 +240,15 @@ func (m *mqttPubSub) NewTLSConfig() *tls.Config {
|
|||
return tlsConfig
|
||||
}
|
||||
|
||||
func (m *mqttPubSub) createClientOptions(uri *url.URL) *mqtt.ClientOptions {
|
||||
func (m *mqttPubSub) createClientOptions(uri *url.URL, clientID string) *mqtt.ClientOptions {
|
||||
opts := mqtt.NewClientOptions()
|
||||
opts.SetClientID(m.metadata.clientID)
|
||||
opts.SetClientID(clientID)
|
||||
opts.SetCleanSession(m.metadata.cleanSession)
|
||||
opts.AddBroker(uri.Scheme + "://" + uri.Host)
|
||||
opts.SetUsername(uri.User.Username())
|
||||
password, _ := uri.User.Password()
|
||||
opts.SetPassword(password)
|
||||
// tls config
|
||||
opts.SetTLSConfig(m.NewTLSConfig())
|
||||
opts.SetTLSConfig(m.newTLSConfig())
|
||||
return opts
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue