diff --git a/pubsub/mqtt/mqtt.go b/pubsub/mqtt/mqtt.go index 30c4d760b..243eefc64 100644 --- a/pubsub/mqtt/mqtt.go +++ b/pubsub/mqtt/mqtt.go @@ -190,7 +190,7 @@ func (m *mqttPubSub) Subscribe(req pubsub.SubscribeRequest, handler pubsub.Handl token := m.consumer.SubscribeMultiple( m.topics, func(client mqtt.Client, mqttMsg mqtt.Message) { - handler(&pubsub.NewMessage{Topic: req.Topic, Data: mqttMsg.Payload()}) + handler(ctx, &pubsub.NewMessage{Topic: req.Topic, Data: mqttMsg.Payload()}) }) if err := token.Error(); err != nil { m.logger.Errorf("mqtt error from subscribe: %v", err) diff --git a/pubsub/pubsub.go b/pubsub/pubsub.go index dfc64866a..3a51bd40f 100644 --- a/pubsub/pubsub.go +++ b/pubsub/pubsub.go @@ -11,7 +11,7 @@ import "context" type PubSub interface { Init(metadata Metadata) error Publish(req *PublishRequest) error - Subscribe(req SubscribeRequest, handler pubsub.Handler) error + Subscribe(req SubscribeRequest, handler Handler) error Close() error }