Better handling of concurrency in MQTT (#1748)
This helps avoiding issues when multiple subscribers are added in a short period of time Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>
This commit is contained in:
parent
d55ce1dfdb
commit
3a2a27a253
|
|
@ -202,9 +202,9 @@ func (m *mqttPubSub) Subscribe(req pubsub.SubscribeRequest, handler pubsub.Handl
|
|||
defer m.topicsLock.Unlock()
|
||||
|
||||
// reset synchronization
|
||||
if m.consumer != nil {
|
||||
if m.consumer != nil && m.consumer.IsConnectionOpen() {
|
||||
m.logger.Infof("re-initializing the subscriber to add topic %s", req.Topic)
|
||||
m.consumer.Disconnect(0)
|
||||
m.consumer.Disconnect(5)
|
||||
m.consumer = nil
|
||||
} else {
|
||||
m.logger.Infof("initializing the subscriber with topic %s", req.Topic)
|
||||
|
|
@ -227,53 +227,59 @@ func (m *mqttPubSub) Subscribe(req pubsub.SubscribeRequest, handler pubsub.Handl
|
|||
subscribeTopics[k] = m.metadata.qos
|
||||
}
|
||||
|
||||
go func() {
|
||||
token := m.consumer.SubscribeMultiple(
|
||||
subscribeTopics,
|
||||
func(client mqtt.Client, mqttMsg mqtt.Message) {
|
||||
mqttMsg.AutoAckOff()
|
||||
msg := pubsub.NewMessage{
|
||||
Topic: mqttMsg.Topic(),
|
||||
Data: mqttMsg.Payload(),
|
||||
token := m.consumer.SubscribeMultiple(
|
||||
subscribeTopics,
|
||||
func(client mqtt.Client, mqttMsg mqtt.Message) {
|
||||
mqttMsg.AutoAckOff()
|
||||
msg := pubsub.NewMessage{
|
||||
Topic: mqttMsg.Topic(),
|
||||
Data: mqttMsg.Payload(),
|
||||
}
|
||||
|
||||
m.topicsLock.RLock()
|
||||
topicHandler, ok := m.topics[msg.Topic]
|
||||
m.topicsLock.RUnlock()
|
||||
if !ok || topicHandler == nil {
|
||||
m.logger.Errorf("no handler defined for topic %s", msg.Topic)
|
||||
return
|
||||
}
|
||||
|
||||
// TODO: Make the backoff configurable for constant or exponential
|
||||
var b backoff.BackOff = backoff.NewConstantBackOff(5 * time.Second)
|
||||
b = backoff.WithContext(b, m.ctx)
|
||||
if m.metadata.backOffMaxRetries >= 0 {
|
||||
b = backoff.WithMaxRetries(b, uint64(m.metadata.backOffMaxRetries))
|
||||
}
|
||||
if err := retry.NotifyRecover(func() error {
|
||||
m.logger.Debugf("Processing MQTT message %s/%d", mqttMsg.Topic(), mqttMsg.MessageID())
|
||||
|
||||
if err := topicHandler(m.ctx, &msg); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
m.topicsLock.RLock()
|
||||
topicHandler, ok := m.topics[msg.Topic]
|
||||
m.topicsLock.RUnlock()
|
||||
if !ok || topicHandler == nil {
|
||||
m.logger.Errorf("no handler defined for topic %s", msg.Topic)
|
||||
return
|
||||
}
|
||||
mqttMsg.Ack()
|
||||
|
||||
// TODO: Make the backoff configurable for constant or exponential
|
||||
var b backoff.BackOff = backoff.NewConstantBackOff(5 * time.Second)
|
||||
b = backoff.WithContext(b, m.ctx)
|
||||
if m.metadata.backOffMaxRetries >= 0 {
|
||||
b = backoff.WithMaxRetries(b, uint64(m.metadata.backOffMaxRetries))
|
||||
}
|
||||
if err := retry.NotifyRecover(func() error {
|
||||
m.logger.Debugf("Processing MQTT message %s/%d", mqttMsg.Topic(), mqttMsg.MessageID())
|
||||
|
||||
if err := topicHandler(m.ctx, &msg); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
mqttMsg.Ack()
|
||||
|
||||
return nil
|
||||
}, b, func(err error, d time.Duration) {
|
||||
m.logger.Errorf("Error processing MQTT message: %s/%d. Retrying...", mqttMsg.Topic(), mqttMsg.MessageID())
|
||||
}, func() {
|
||||
m.logger.Infof("Successfully processed MQTT message after it previously failed: %s/%d", mqttMsg.Topic(), mqttMsg.MessageID())
|
||||
}); err != nil {
|
||||
m.logger.Errorf("Failed processing MQTT message: %s/%d: %v", mqttMsg.Topic(), mqttMsg.MessageID(), err)
|
||||
}
|
||||
},
|
||||
)
|
||||
if err := token.Error(); err != nil {
|
||||
m.logger.Errorf("mqtt error from subscribe: %v", err)
|
||||
}
|
||||
}()
|
||||
return nil
|
||||
}, b, func(err error, d time.Duration) {
|
||||
m.logger.Errorf("Error processing MQTT message: %s/%d. Retrying...", mqttMsg.Topic(), mqttMsg.MessageID())
|
||||
}, func() {
|
||||
m.logger.Infof("Successfully processed MQTT message after it previously failed: %s/%d", mqttMsg.Topic(), mqttMsg.MessageID())
|
||||
}); err != nil {
|
||||
m.logger.Errorf("Failed processing MQTT message: %s/%d: %v", mqttMsg.Topic(), mqttMsg.MessageID(), err)
|
||||
}
|
||||
},
|
||||
)
|
||||
subscribeCtx, subscribeCancel := context.WithTimeout(m.ctx, defaultWait)
|
||||
defer subscribeCancel()
|
||||
select {
|
||||
case <-token.Done():
|
||||
// Subscription went through
|
||||
case <-subscribeCtx.Done():
|
||||
return subscribeCtx.Err()
|
||||
}
|
||||
if err := token.Error(); err != nil {
|
||||
return fmt.Errorf("mqtt error from subscribe: %v", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
@ -348,9 +354,9 @@ func (m *mqttPubSub) Close() error {
|
|||
m.cancel()
|
||||
|
||||
if m.consumer != nil {
|
||||
m.consumer.Disconnect(0)
|
||||
m.consumer.Disconnect(5)
|
||||
}
|
||||
m.producer.Disconnect(0)
|
||||
m.producer.Disconnect(5)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue