Fixed unit tests

Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>
This commit is contained in:
ItalyPaleAle 2023-01-26 00:34:35 +00:00
parent 00e7d1d19a
commit 7880c77d69
4 changed files with 67 additions and 46 deletions

View File

@ -1,5 +1,5 @@
/*
Copyright 2021 The Dapr Authors
Copyright 2023 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

View File

@ -1,5 +1,5 @@
/*
Copyright 2021 The Dapr Authors
Copyright 2023 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
@ -290,54 +290,56 @@ func (m *MQTT) createClientOptions(uri *url.URL, clientID string) *mqtt.ClientOp
return opts
}
func (m *MQTT) handleMessage(client mqtt.Client, mqttMsg mqtt.Message) {
// We're using m.ctx as context in this method because we don't have access to the Read context
// Canceling the Read context makes Read invoke "Disconnect" anyways
ctx := m.ctx
var bo backoff.BackOff = backoff.WithContext(m.backOff, ctx)
if m.metadata.backOffMaxRetries >= 0 {
bo = backoff.WithMaxRetries(bo, uint64(m.metadata.backOffMaxRetries))
}
err := retry.NotifyRecover(
func() error {
m.logger.Debugf("Processing MQTT message %s/%d", mqttMsg.Topic(), mqttMsg.MessageID())
_, err := m.readHandler(ctx, &bindings.ReadResponse{
Data: mqttMsg.Payload(),
Metadata: map[string]string{
mqttTopic: mqttMsg.Topic(),
},
})
if err != nil {
return err
}
// Ack the message on success
mqttMsg.Ack()
return nil
},
bo,
func(err error, d time.Duration) {
m.logger.Errorf("Error processing MQTT message: %s/%d. Retrying…", mqttMsg.Topic(), mqttMsg.MessageID())
},
func() {
m.logger.Infof("Successfully processed MQTT message after it previously failed: %s/%d", mqttMsg.Topic(), mqttMsg.MessageID())
},
)
if err != nil {
m.logger.Errorf("Failed processing MQTT message: %s/%d: %v", mqttMsg.Topic(), mqttMsg.MessageID(), err)
}
}
// Extends createClientOptions with options for subscribers only
func (m *MQTT) createSubscriberClientOptions(uri *url.URL, clientID string) *mqtt.ClientOptions {
opts := m.createClientOptions(uri, clientID)
// On (re-)connection, add the topic subscription
opts.OnConnect = func(c mqtt.Client) {
// We're using m.ctx as context in this method because we don't have access to the Read context
// Canceling the Read context makes Read invoke "Disconnect" anyways
ctx := m.ctx
token := c.Subscribe(m.metadata.topic, m.metadata.qos, func(client mqtt.Client, mqttMsg mqtt.Message) {
var bo backoff.BackOff = backoff.WithContext(m.backOff, ctx)
if m.metadata.backOffMaxRetries >= 0 {
bo = backoff.WithMaxRetries(bo, uint64(m.metadata.backOffMaxRetries))
}
err := retry.NotifyRecover(
func() error {
m.logger.Debugf("Processing MQTT message %s/%d", mqttMsg.Topic(), mqttMsg.MessageID())
_, err := m.readHandler(ctx, &bindings.ReadResponse{
Data: mqttMsg.Payload(),
Metadata: map[string]string{
mqttTopic: mqttMsg.Topic(),
},
})
if err != nil {
return err
}
// Ack the message on success
mqttMsg.Ack()
return nil
},
bo,
func(err error, d time.Duration) {
m.logger.Errorf("Error processing MQTT message: %s/%d. Retrying…", mqttMsg.Topic(), mqttMsg.MessageID())
},
func() {
m.logger.Infof("Successfully processed MQTT message after it previously failed: %s/%d", mqttMsg.Topic(), mqttMsg.MessageID())
},
)
if err != nil {
m.logger.Errorf("Failed processing MQTT message: %s/%d: %v", mqttMsg.Topic(), mqttMsg.MessageID(), err)
}
})
token := c.Subscribe(m.metadata.topic, m.metadata.qos, m.handleMessage)
var err error
subscribeCtx, subscribeCancel := context.WithTimeout(ctx, defaultWait)
subscribeCtx, subscribeCancel := context.WithTimeout(m.ctx, defaultWait)
defer subscribeCancel()
select {
case <-token.Done():

View File

@ -1,3 +1,18 @@
/*
Copyright 2023 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package mqtt
import (

View File

@ -1,5 +1,5 @@
/*
Copyright 2021 The Dapr Authors
Copyright 2023 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
@ -19,7 +19,9 @@ import (
"encoding/pem"
"errors"
"testing"
"time"
"github.com/cenkalti/backoff/v4"
"github.com/stretchr/testify/assert"
"github.com/dapr/components-contrib/bindings"
@ -202,16 +204,18 @@ func TestParseMetadata(t *testing.T) {
logger := logger.NewLogger("test")
m := NewMQTT(logger).(*MQTT)
m.backOff = backoff.NewConstantBackOff(5 * time.Second)
m.ctx, m.cancel = context.WithCancel(context.Background())
m.handleMessage(context.Background(), func(ctx context.Context, r *bindings.ReadResponse) ([]byte, error) {
m.readHandler = func(ctx context.Context, r *bindings.ReadResponse) ([]byte, error) {
assert.Equal(t, payload, r.Data)
metadata := r.Metadata
responseTopic, ok := metadata[mqttTopic]
assert.True(t, ok)
assert.Equal(t, topic, responseTopic)
return r.Data, nil
}, &mqttMockMessage{
}
m.handleMessage(nil, &mqttMockMessage{
topic: topic,
payload: payload,
})