mqtt protocol binding (#910)

* add receiver and opener for mqtt broker

Signed-off-by: myan <myan@redhat.com>

add protocol and sample files

Signed-off-by: myan <myan@redhat.com>

rollback samples to go1.17

Signed-off-by: myan <myan@redhat.com>

move the protocol to go1.17

Signed-off-by: myan <myan@redhat.com>

add message test

Signed-off-by: myan <myan@redhat.com>

trigger to run the integration test

Signed-off-by: myan <myan@redhat.com>

fixed go mod issue

Signed-off-by: myan <myan@redhat.com>

resolve the review issue

Signed-off-by: myan <myan@redhat.com>

remove the useless comment

Signed-off-by: myan <myan@redhat.com>

add ut for writeMessage

Signed-off-by: myan <myan@redhat.com>

go mod tidy

Signed-off-by: myan <myan@redhat.com>

add intergration test

Signed-off-by: myan <myan@redhat.com>

solve the uncheck error

Signed-off-by: myan <myan@redhat.com>

fix the integration error

Signed-off-by: myan <myan@redhat.com>

fix the integration error

Signed-off-by: myan <myan@redhat.com>

reply the review

Signed-off-by: myan <myan@redhat.com>

simpler tests

Signed-off-by: myan <myan@redhat.com>

remove the nesting

Signed-off-by: myan <myan@redhat.com>

refactor the recevier logic

Signed-off-by: myan <myan@redhat.com>

add a timer for assert loop

Signed-off-by: myan <myan@redhat.com>

* add copyright

Signed-off-by: myan <myan@redhat.com>

---------

Signed-off-by: myan <myan@redhat.com>
This commit is contained in:
Meng Yan 2023-07-13 21:08:08 +08:00 committed by GitHub
parent f681ac6b51
commit fdcb2d226a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 1093 additions and 0 deletions

View File

@ -54,6 +54,11 @@ jobs:
}
ports:
- 5672:5672
mqtt:
image: eclipse-mosquitto:1.6
ports:
- 1883:1883
steps:
- name: Checkout code

1
.gitignore vendored
View File

@ -35,3 +35,4 @@ test/benchmark/e2e/http/results/
tmp/
test/coverage.tmp
vendor/

View File

@ -0,0 +1,25 @@
module github.com/cloudevents/sdk-go/protocol/mqtt_paho/v2
go 1.17
replace github.com/cloudevents/sdk-go/v2 => ../../../v2
require (
github.com/cloudevents/sdk-go/v2 v2.5.0
github.com/eclipse/paho.golang v0.11.0
github.com/stretchr/testify v1.8.0
)
require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/google/go-cmp v0.5.5 // indirect
github.com/json-iterator/go v1.1.10 // indirect
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 // indirect
github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
go.uber.org/atomic v1.4.0 // indirect
go.uber.org/multierr v1.1.0 // indirect
go.uber.org/zap v1.10.0 // indirect
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

View File

@ -0,0 +1,55 @@
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/eclipse/paho.golang v0.11.0 h1:6Avu5dkkCfcB61/y1vx+XrPQ0oAl4TPYtY0uw3HbQdM=
github.com/eclipse/paho.golang v0.11.0/go.mod h1:rhrV37IEwauUyx8FHrvmXOKo+QRKng5ncoN1vJiJMcs=
github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/json-iterator/go v1.1.10 h1:Kz6Cvnvv2wGdaG/V8yMvfkmNiXq9Ya2KUv4rouJJr68=
github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 h1:ZqeYNhU3OHLH3mGKHDcjJRFFRrJa6eAM5H+CtDdOsPc=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742 h1:Esafd1046DLDQ0W1YjYsBW+p8U2u7vzgW2SQVmlNazg=
github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
go.uber.org/atomic v1.4.0 h1:cxzIVoETapQEqDhQu3QfnvXAV4AlzcvUCxkVUFw3+EU=
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
go.uber.org/zap v1.10.0 h1:ORx85nbTijNz8ljznvCMR1ZBIPKFn3jQrag10X2AsuM=
go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f h1:BLraFXnmrev5lT+xlilqcH8XK9/i0At2xKjWk4p6zsU=
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

View File

@ -0,0 +1,119 @@
/*
Copyright 2023 The CloudEvents Authors
SPDX-License-Identifier: Apache-2.0
*/
package mqtt_paho
import (
"bytes"
"context"
"strings"
"github.com/cloudevents/sdk-go/v2/binding"
"github.com/cloudevents/sdk-go/v2/binding/format"
"github.com/cloudevents/sdk-go/v2/binding/spec"
"github.com/eclipse/paho.golang/paho"
)
const (
prefix = "ce-"
contentType = "Content-Type"
)
var specs = spec.WithPrefix(prefix)
// Message represents a MQTT message.
// This message *can* be read several times safely
type Message struct {
internal *paho.Publish
version spec.Version
format format.Format
}
// Check if Message implements binding.Message
var (
_ binding.Message = (*Message)(nil)
_ binding.MessageMetadataReader = (*Message)(nil)
)
func NewMessage(msg *paho.Publish) *Message {
var f format.Format
var v spec.Version
if msg.Properties != nil {
// Use properties.User["Content-type"] to determine if message is structured
if s := msg.Properties.User.Get(contentType); format.IsFormat(s) {
f = format.Lookup(s)
} else if s := msg.Properties.User.Get(specs.PrefixedSpecVersionName()); s != "" {
v = specs.Version(s)
}
}
return &Message{
internal: msg,
version: v,
format: f,
}
}
func (m *Message) ReadEncoding() binding.Encoding {
if m.version != nil {
return binding.EncodingBinary
}
if m.format != nil {
return binding.EncodingStructured
}
return binding.EncodingUnknown
}
func (m *Message) ReadStructured(ctx context.Context, encoder binding.StructuredWriter) error {
if m.version != nil {
return binding.ErrNotStructured
}
if m.format == nil {
return binding.ErrNotStructured
}
return encoder.SetStructuredEvent(ctx, m.format, bytes.NewReader(m.internal.Payload))
}
func (m *Message) ReadBinary(ctx context.Context, encoder binding.BinaryWriter) (err error) {
if m.format != nil {
return binding.ErrNotBinary
}
for _, userProperty := range m.internal.Properties.User {
if strings.HasPrefix(userProperty.Key, prefix) {
attr := m.version.Attribute(userProperty.Key)
if attr != nil {
err = encoder.SetAttribute(attr, userProperty.Value)
} else {
err = encoder.SetExtension(strings.TrimPrefix(userProperty.Key, prefix), userProperty.Value)
}
} else if userProperty.Key == contentType {
err = encoder.SetAttribute(m.version.AttributeFromKind(spec.DataContentType), string(userProperty.Value))
}
if err != nil {
return
}
}
if m.internal.Payload != nil {
return encoder.SetData(bytes.NewBuffer(m.internal.Payload))
}
return nil
}
func (m *Message) Finish(error) error {
return nil
}
func (m *Message) GetAttribute(k spec.Kind) (spec.Attribute, interface{}) {
attr := m.version.AttributeFromKind(k)
if attr != nil {
return attr, m.internal.Properties.User.Get(prefix + attr.Name())
}
return nil, nil
}
func (m *Message) GetExtension(name string) interface{} {
return m.internal.Properties.User.Get(prefix + name)
}

View File

@ -0,0 +1,69 @@
/*
Copyright 2023 The CloudEvents Authors
SPDX-License-Identifier: Apache-2.0
*/
package mqtt_paho
import (
"context"
"testing"
"github.com/cloudevents/sdk-go/v2/binding"
"github.com/cloudevents/sdk-go/v2/event"
"github.com/eclipse/paho.golang/paho"
)
func TestReadStructured(t *testing.T) {
tests := []struct {
name string
msg *paho.Publish
wantErr error
}{
{
name: "nil format",
msg: &paho.Publish{
Payload: []byte(""),
},
wantErr: binding.ErrNotStructured,
},
{
name: "json format",
msg: &paho.Publish{
Payload: []byte(""),
Properties: &paho.PublishProperties{
User: []paho.UserProperty{{Key: contentType, Value: event.ApplicationCloudEventsJSON}},
},
},
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
msg := NewMessage(tc.msg)
err := msg.ReadStructured(context.Background(), (*pubMessageWriter)(tc.msg))
if err != tc.wantErr {
t.Errorf("Error unexpected. got: %v, want: %v", err, tc.wantErr)
}
})
}
}
func TestReadBinary(t *testing.T) {
msg := &paho.Publish{
Payload: []byte("{hello:world}"),
Properties: &paho.PublishProperties{
User: []paho.UserProperty{
{Key: "ce-specversion", Value: "1.0"},
{Key: "ce-type", Value: "binary.test"},
{Key: "ce-source", Value: "test-source"},
{Key: "ce-id", Value: "ABC-123"},
},
},
}
message := NewMessage(msg)
err := message.ReadBinary(context.Background(), (*pubMessageWriter)(msg))
if err != nil {
t.Errorf("Error unexpected. got: %v", err)
}
}

View File

@ -0,0 +1,147 @@
/*
Copyright 2023 The CloudEvents Authors
SPDX-License-Identifier: Apache-2.0
*/
package mqtt_paho
import (
"context"
"fmt"
"io"
"sync"
"github.com/cloudevents/sdk-go/v2/binding"
"github.com/cloudevents/sdk-go/v2/protocol"
"github.com/eclipse/paho.golang/paho"
cecontext "github.com/cloudevents/sdk-go/v2/context"
)
type Protocol struct {
client *paho.Client
connConfig *paho.Connect
senderTopic string
receiverTopics []string
qos byte
retained bool
// receiver
incoming chan *paho.Publish
// inOpen
openerMutex sync.Mutex
closeChan chan struct{}
}
var (
_ protocol.Sender = (*Protocol)(nil)
_ protocol.Opener = (*Protocol)(nil)
_ protocol.Receiver = (*Protocol)(nil)
_ protocol.Closer = (*Protocol)(nil)
)
func New(ctx context.Context, clientConfig *paho.ClientConfig, connConfig *paho.Connect, SenderTopic string,
ReceiverTopics []string, qos byte, retained bool,
) (*Protocol, error) {
client := paho.NewClient(*clientConfig)
ca, err := client.Connect(ctx, connConfig)
if err != nil {
return nil, err
}
if ca.ReasonCode != 0 {
return nil, fmt.Errorf("failed to connect to %s : %d - %s", client.Conn.RemoteAddr(), ca.ReasonCode,
ca.Properties.ReasonString)
}
return &Protocol{
client: client,
connConfig: connConfig,
senderTopic: SenderTopic,
receiverTopics: ReceiverTopics,
qos: qos,
retained: retained,
incoming: make(chan *paho.Publish),
openerMutex: sync.Mutex{},
closeChan: make(chan struct{}),
}, nil
}
func (t *Protocol) Send(ctx context.Context, m binding.Message, transformers ...binding.Transformer) error {
var err error
defer m.Finish(err)
topic := cecontext.TopicFrom(ctx)
if topic == "" {
topic = t.senderTopic
}
msg := &paho.Publish{
QoS: t.qos,
Retain: t.retained,
Topic: topic,
}
err = WritePubMessage(ctx, m, msg, transformers...)
if err != nil {
return err
}
_, err = t.client.Publish(ctx, msg)
if err != nil {
return err
}
return err
}
func (t *Protocol) OpenInbound(ctx context.Context) error {
t.openerMutex.Lock()
defer t.openerMutex.Unlock()
logger := cecontext.LoggerFrom(ctx)
t.client.Router = paho.NewSingleHandlerRouter(func(m *paho.Publish) {
t.incoming <- m
})
subs := make(map[string]paho.SubscribeOptions)
for _, topic := range t.receiverTopics {
subs[topic] = paho.SubscribeOptions{
QoS: t.qos,
RetainAsPublished: t.retained,
}
}
logger.Infof("subscribe to topics: %v", t.receiverTopics)
_, err := t.client.Subscribe(ctx, &paho.Subscribe{
Subscriptions: subs,
})
if err != nil {
return err
}
// Wait until external or internal context done
select {
case <-ctx.Done():
case <-t.closeChan:
}
return t.client.Disconnect(&paho.Disconnect{ReasonCode: 0})
}
// Receive implements Receiver.Receive
func (t *Protocol) Receive(ctx context.Context) (binding.Message, error) {
select {
case m, ok := <-t.incoming:
if !ok {
return nil, io.EOF
}
msg := NewMessage(m)
return msg, nil
case <-ctx.Done():
return nil, io.EOF
}
}
func (p *Protocol) Close(ctx context.Context) error {
close(p.closeChan)
return nil
}

View File

@ -0,0 +1,133 @@
/*
Copyright 2023 The CloudEvents Authors
SPDX-License-Identifier: Apache-2.0
*/
package mqtt_paho
import (
"bytes"
"context"
"io"
"github.com/cloudevents/sdk-go/v2/binding"
"github.com/cloudevents/sdk-go/v2/binding/format"
"github.com/cloudevents/sdk-go/v2/binding/spec"
"github.com/cloudevents/sdk-go/v2/types"
"github.com/eclipse/paho.golang/paho"
)
// WritePubMessage fills the provided pubMessage with the message m.
// Using context you can tweak the encoding processing (more details on binding.Write documentation).
func WritePubMessage(ctx context.Context, m binding.Message, pubMessage *paho.Publish, transformers ...binding.Transformer) error {
structuredWriter := (*pubMessageWriter)(pubMessage)
binaryWriter := (*pubMessageWriter)(pubMessage)
_, err := binding.Write(
ctx,
m,
structuredWriter,
binaryWriter,
transformers...,
)
return err
}
type pubMessageWriter paho.Publish
var (
_ binding.StructuredWriter = (*pubMessageWriter)(nil)
_ binding.BinaryWriter = (*pubMessageWriter)(nil)
)
func (b *pubMessageWriter) SetStructuredEvent(ctx context.Context, f format.Format, event io.Reader) error {
if b.Properties == nil {
b.Properties = &paho.PublishProperties{
User: make([]paho.UserProperty, 0),
}
}
b.Properties.User.Add(contentType, f.MediaType())
var buf bytes.Buffer
_, err := io.Copy(&buf, event)
if err != nil {
return err
}
b.Payload = buf.Bytes()
return nil
}
func (b *pubMessageWriter) Start(ctx context.Context) error {
if b.Properties == nil {
b.Properties = &paho.PublishProperties{
User: make([]paho.UserProperty, 0),
}
}
return nil
}
func (b *pubMessageWriter) End(ctx context.Context) error {
return nil
}
func (b *pubMessageWriter) SetData(reader io.Reader) error {
buf, ok := reader.(*bytes.Buffer)
if !ok {
buf = new(bytes.Buffer)
_, err := io.Copy(buf, reader)
if err != nil {
return err
}
}
b.Payload = buf.Bytes()
return nil
}
func (b *pubMessageWriter) SetAttribute(attribute spec.Attribute, value interface{}) error {
if attribute.Kind() == spec.DataContentType {
if value == nil {
b.removeProperty(contentType)
}
s, err := types.Format(value)
if err != nil {
return err
}
if err := b.addProperty(contentType, s); err != nil {
return err
}
} else {
if value == nil {
b.removeProperty(prefix + attribute.Name())
}
return b.addProperty(prefix+attribute.Name(), value)
}
return nil
}
func (b *pubMessageWriter) SetExtension(name string, value interface{}) error {
if value == nil {
b.removeProperty(prefix + name)
}
return b.addProperty(prefix+name, value)
}
func (b *pubMessageWriter) removeProperty(key string) {
for i, v := range b.Properties.User {
if v.Key == key {
b.Properties.User = append(b.Properties.User[:i], b.Properties.User[i+1:]...)
break
}
}
}
func (b *pubMessageWriter) addProperty(key string, value interface{}) error {
s, err := types.Format(value)
if err != nil {
return err
}
b.Properties.User = append(b.Properties.User, paho.UserProperty{
Key: key,
Value: s,
})
return nil
}

View File

@ -0,0 +1,66 @@
/*
Copyright 2023 The CloudEvents Authors
SPDX-License-Identifier: Apache-2.0
*/
package mqtt_paho
import (
"context"
"testing"
"github.com/cloudevents/sdk-go/v2/binding"
. "github.com/cloudevents/sdk-go/v2/binding/test"
"github.com/cloudevents/sdk-go/v2/event"
. "github.com/cloudevents/sdk-go/v2/test"
"github.com/eclipse/paho.golang/paho"
"github.com/stretchr/testify/require"
)
func TestEncodeMQTTPubMessage(t *testing.T) {
ctx := context.Background()
tests := []struct {
name string
messageFactory func(e event.Event) binding.Message
expectedEncoding binding.Encoding
}{
{
name: "Structured to Structured",
messageFactory: func(e event.Event) binding.Message {
return MustCreateMockStructuredMessage(t, e)
},
expectedEncoding: binding.EncodingStructured,
},
{
name: "Binary to Binary",
messageFactory: MustCreateMockBinaryMessage,
expectedEncoding: binding.EncodingBinary,
},
}
EachEvent(t, Events(), func(t *testing.T, e event.Event) {
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
eventIn := ConvertEventExtensionsToString(t, e.Clone())
// convert the event to binding.Message with specific encoding
messageIn := tc.messageFactory(eventIn)
// load the binding.Message into a pubMessage
pubMessage := &paho.Publish{}
err := WritePubMessage(ctx, messageIn, pubMessage)
require.NoError(t, err)
// convert the pubMessage back to binding.Message
messageOut := NewMessage(pubMessage)
require.Equal(t, tc.expectedEncoding, messageOut.ReadEncoding())
// convert the binding.Message back to event.Event
eventOut, err := binding.ToEvent(ctx, messageOut)
require.NoError(t, err)
// check if the event is the same
AssertEventEquals(t, eventIn, *eventOut)
})
}
})
}

9
samples/mqtt/README.md Normal file
View File

@ -0,0 +1,9 @@
MQTT samples
To run the samples, you need a running MQTT broker.
To run a sample MQTT broker using docker:
```bash
docker run -it --rm --name mosquitto -p 1883:1883 eclipse-mosquitto:2.0 mosquitto -c /mosquitto-no-auth.conf
```

26
samples/mqtt/go.mod Normal file
View File

@ -0,0 +1,26 @@
module github.com/cloudevents/sdk-go/samples/mqtt
go 1.17
require (
github.com/cloudevents/sdk-go/protocol/mqtt_paho/v2 v2.0.0-00010101000000-000000000000
github.com/cloudevents/sdk-go/v2 v2.14.0
github.com/eclipse/paho.golang v0.11.0
github.com/google/uuid v1.3.0
)
require (
github.com/google/go-cmp v0.5.8 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/stretchr/testify v1.8.1 // indirect
go.uber.org/atomic v1.4.0 // indirect
go.uber.org/multierr v1.1.0 // indirect
go.uber.org/zap v1.10.0 // indirect
golang.org/x/sync v0.1.0 // indirect
)
replace github.com/cloudevents/sdk-go/v2 => ../../v2
replace github.com/cloudevents/sdk-go/protocol/mqtt_paho/v2 => ../../protocol/mqtt_paho/v2

62
samples/mqtt/go.sum Normal file
View File

@ -0,0 +1,62 @@
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/eclipse/paho.golang v0.11.0 h1:6Avu5dkkCfcB61/y1vx+XrPQ0oAl4TPYtY0uw3HbQdM=
github.com/eclipse/paho.golang v0.11.0/go.mod h1:rhrV37IEwauUyx8FHrvmXOKo+QRKng5ncoN1vJiJMcs=
github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg=
github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M=
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
go.uber.org/atomic v1.4.0 h1:cxzIVoETapQEqDhQu3QfnvXAV4AlzcvUCxkVUFw3+EU=
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
go.uber.org/zap v1.10.0 h1:ORx85nbTijNz8ljznvCMR1ZBIPKFn3jQrag10X2AsuM=
go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac h1:7zkz7BUtwNFFqcowJ+RIgu2MaV/MapERkDIy+mwPyjs=
golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

View File

@ -0,0 +1,56 @@
/*
Copyright 2023 The CloudEvents Authors
SPDX-License-Identifier: Apache-2.0
*/
package main
import (
"context"
"fmt"
"log"
"net"
cemqtt "github.com/cloudevents/sdk-go/protocol/mqtt_paho/v2"
cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/eclipse/paho.golang/paho"
)
func main() {
ctx := context.Background()
conn, err := net.Dial("tcp", "127.0.0.1:1883")
if err != nil {
log.Fatalf("failed to connect to mqtt broker: %s", err.Error())
}
clientConfig := &paho.ClientConfig{
ClientID: "receiver-client-id",
Conn: conn,
}
cp := &paho.Connect{
KeepAlive: 30,
CleanStart: true,
}
p, err := cemqtt.New(ctx, clientConfig, cp, "", []string{"test-topic"}, 0, false)
if err != nil {
log.Fatalf("failed to create protocol: %s", err.Error())
}
defer p.Close(ctx)
c, err := cloudevents.NewClient(p)
if err != nil {
log.Fatalf("failed to create client, %v", err)
}
log.Printf("receiver start consuming messages from test-topic\n")
err = c.StartReceiver(ctx, receive)
if err != nil {
log.Fatalf("failed to start receiver: %s", err)
} else {
log.Printf("receiver stopped\n")
}
}
func receive(ctx context.Context, event cloudevents.Event) {
fmt.Printf("%s", event)
}

View File

@ -0,0 +1,74 @@
/*
Copyright 2023 The CloudEvents Authors
SPDX-License-Identifier: Apache-2.0
*/
package main
import (
"context"
"log"
"net"
"time"
cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/eclipse/paho.golang/paho"
"github.com/google/uuid"
cemqtt "github.com/cloudevents/sdk-go/protocol/mqtt_paho/v2"
cecontext "github.com/cloudevents/sdk-go/v2/context"
)
const (
count = 10
)
func main() {
ctx := context.Background()
conn, err := net.Dial("tcp", "127.0.0.1:1883")
if err != nil {
log.Fatalf("failed to connect to mqtt broker: %s", err.Error())
}
clientConfig := &paho.ClientConfig{
ClientID: "sender-client-id",
Conn: conn,
}
cp := &paho.Connect{
KeepAlive: 30,
CleanStart: true,
}
// set a default topic with test-topic1
p, err := cemqtt.New(ctx, clientConfig, cp, "test-topic1", nil, 0, false)
if err != nil {
log.Fatalf("failed to create protocol: %s", err.Error())
}
defer p.Close(ctx)
c, err := cloudevents.NewClient(p, cloudevents.WithTimeNow(), cloudevents.WithUUIDs())
if err != nil {
log.Fatalf("failed to create client, %v", err)
}
for i := 0; i < count; i++ {
e := cloudevents.NewEvent()
e.SetID(uuid.New().String())
e.SetType("com.cloudevents.sample.sent")
e.SetSource("https://github.com/cloudevents/sdk-go/samples/mqtt/sender")
err = e.SetData(cloudevents.ApplicationJSON, map[string]interface{}{
"id": i,
"message": "Hello, World!",
})
if err != nil {
log.Printf("failed to set data: %v", err)
}
if result := c.Send(
cecontext.WithTopic(ctx, "test-topic"),
e,
); cloudevents.IsUndelivered(result) {
log.Printf("failed to send: %v", result)
} else {
log.Printf("sent: %d, accepted: %t", i, cloudevents.IsACK(result))
}
time.Sleep(1 * time.Second)
}
}

View File

@ -14,14 +14,18 @@ replace github.com/cloudevents/sdk-go/protocol/nats/v2 => ../../protocol/nats/v2
replace github.com/cloudevents/sdk-go/protocol/kafka_sarama/v2 => ../../protocol/kafka_sarama/v2
replace github.com/cloudevents/sdk-go/protocol/mqtt_paho/v2 => ../../protocol/mqtt_paho/v2
require (
github.com/Azure/go-amqp v0.17.0
github.com/Shopify/sarama v1.38.1
github.com/cloudevents/sdk-go/protocol/amqp/v2 v2.5.0
github.com/cloudevents/sdk-go/protocol/kafka_sarama/v2 v2.5.0
github.com/cloudevents/sdk-go/protocol/mqtt_paho/v2 v2.0.0-00010101000000-000000000000
github.com/cloudevents/sdk-go/protocol/nats/v2 v2.5.0
github.com/cloudevents/sdk-go/protocol/stan/v2 v2.5.0
github.com/cloudevents/sdk-go/v2 v2.5.0
github.com/eclipse/paho.golang v0.11.0
github.com/google/go-cmp v0.5.8
github.com/google/uuid v1.1.1
github.com/nats-io/nats.go v1.13.1-0.20220308171302-2f2f6968e98d

View File

@ -73,6 +73,8 @@ github.com/eapache/go-xerial-snappy v0.0.0-20230111030713-bf00bc1b83b6 h1:8yY/I9
github.com/eapache/go-xerial-snappy v0.0.0-20230111030713-bf00bc1b83b6/go.mod h1:YvSRo5mw33fLEx1+DlK6L2VV43tJt5Eyel9n9XBcR+0=
github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc=
github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
github.com/eclipse/paho.golang v0.11.0 h1:6Avu5dkkCfcB61/y1vx+XrPQ0oAl4TPYtY0uw3HbQdM=
github.com/eclipse/paho.golang v0.11.0/go.mod h1:rhrV37IEwauUyx8FHrvmXOKo+QRKng5ncoN1vJiJMcs=
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
@ -159,6 +161,7 @@ github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5m
github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4=
github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/go-cleanhttp v0.5.0/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80=
@ -313,6 +316,7 @@ github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpE
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
@ -445,6 +449,7 @@ golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=

View File

@ -0,0 +1,99 @@
/*
Copyright 2023 The CloudEvents Authors
SPDX-License-Identifier: Apache-2.0
*/
package mqtt_paho
import (
"context"
"net"
"testing"
"time"
"github.com/eclipse/paho.golang/paho"
"github.com/google/uuid"
"github.com/stretchr/testify/require"
mqtt_paho "github.com/cloudevents/sdk-go/protocol/mqtt_paho/v2"
cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/cloudevents/sdk-go/v2/event"
"github.com/cloudevents/sdk-go/v2/test"
)
type receiveEvent struct {
event cloudevents.Event
err error
}
func TestSendEvent(t *testing.T) {
test.EachEvent(t, test.Events(), func(t *testing.T, inEvent event.Event) {
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()
topicName := "test-ce-client-" + uuid.New().String()
inEvent = test.ConvertEventExtensionsToString(t, inEvent)
// start a cloudevents receiver client go to receive the event
eventChan := make(chan receiveEvent)
defer close(eventChan)
go func() {
client, err := cloudevents.NewClient(protocolFactory(t, topicName))
if err != nil {
eventChan <- receiveEvent{err: err}
return
}
err = client.StartReceiver(ctx, func(event cloudevents.Event) {
eventChan <- receiveEvent{event: event}
})
if err != nil {
eventChan <- receiveEvent{err: err}
return
}
}()
// start a cloudevents sender client go to send the event
client, err := cloudevents.NewClient(protocolFactory(t, topicName))
require.NoError(t, err)
timer := time.NewTimer(5 * time.Millisecond)
defer timer.Stop()
for {
select {
case <-ctx.Done():
require.Fail(t, "timeout waiting for event")
return
case eventOut := <-eventChan:
require.NoError(t, eventOut.err)
test.AssertEventEquals(t, inEvent, test.ConvertEventExtensionsToString(t, eventOut.event))
return
case <-timer.C:
result := client.Send(ctx, inEvent)
require.NoError(t, result)
// the receiver mightn't be ready before the sender send the message, so wait and we retry
continue
}
}
})
}
// To start a local environment for testing:
// docker run -it --rm --name mosquitto -p 1883:1883 eclipse-mosquitto:2.0 mosquitto -c /mosquitto-no-auth.conf
// the protocolFactory will generate a unique connection clientId when it be invoked
func protocolFactory(t testing.TB, topicName string) *mqtt_paho.Protocol {
ctx := context.Background()
broker := "127.0.0.1:1883"
conn, err := net.Dial("tcp", broker)
require.NoError(t, err)
clientConfig := &paho.ClientConfig{
Conn: conn,
}
cp := &paho.Connect{
KeepAlive: 30,
CleanStart: true,
}
p, err := mqtt_paho.New(ctx, clientConfig, cp, topicName, []string{topicName}, 0, false)
require.NoError(t, err)
return p
}

View File

@ -0,0 +1,138 @@
/*
Copyright 2023 The CloudEvents Authors
SPDX-License-Identifier: Apache-2.0
*/
package mqtt_paho_binding
import (
"context"
"net"
"testing"
"time"
"github.com/eclipse/paho.golang/paho"
"github.com/google/uuid"
"github.com/stretchr/testify/require"
"github.com/cloudevents/sdk-go/protocol/mqtt_paho/v2"
"github.com/cloudevents/sdk-go/v2/binding"
"github.com/cloudevents/sdk-go/v2/event"
"github.com/stretchr/testify/assert"
. "github.com/cloudevents/sdk-go/v2/binding/test"
. "github.com/cloudevents/sdk-go/v2/test"
)
type receiveMessage struct {
msg binding.Message
err error
}
func TestEncodingBinding(t *testing.T) {
testCases := []struct {
name string
inEncoding binding.Encoding
inEventEncoder func(tt *testing.T, inEvent event.Event) binding.Message
expectedEncoding binding.Encoding
}{
{
name: "Structured message with Structured encoding",
inEncoding: binding.EncodingStructured,
inEventEncoder: func(tt *testing.T, inEvent event.Event) binding.Message {
return MustCreateMockStructuredMessage(tt, inEvent)
},
expectedEncoding: binding.EncodingStructured,
},
{
name: "Binary message with Binary encoding",
inEncoding: binding.EncodingBinary,
inEventEncoder: func(tt *testing.T, inEvent event.Event) binding.Message {
return MustCreateMockBinaryMessage(inEvent)
},
expectedEncoding: binding.EncodingBinary,
},
}
for _, tt := range testCases {
t.Run(tt.name, func(t *testing.T) {
EachEvent(t, Events(), func(t *testing.T, inEvent event.Event) {
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()
topicName := "test-ce-client-" + uuid.New().String()
// start a cloudevents receiver client go to receive the event
receiveChan := make(chan receiveMessage)
defer close(receiveChan)
startReceiver(ctx, topicName, receiveChan)
// start a cloudevents sender client go to send the event
sender, err := getProtocol(ctx, topicName)
require.NoError(t, err)
defer sender.Close(ctx)
inEvent = ConvertEventExtensionsToString(t, inEvent)
inMessage := tt.inEventEncoder(t, inEvent)
ctx = binding.WithPreferredEventEncoding(ctx, tt.inEncoding)
timer := time.NewTimer(5 * time.Millisecond)
defer timer.Stop()
for {
select {
case <-ctx.Done():
require.Fail(t, "timeout waiting for event")
return
case receiveMsg := <-receiveChan:
require.NoError(t, receiveMsg.err)
outMessage := receiveMsg.msg
assert.Equal(t, tt.expectedEncoding, outMessage.ReadEncoding())
outEvent := MustToEvent(t, ctx, outMessage)
AssertEventEquals(t, inEvent, ConvertEventExtensionsToString(t, outEvent))
return
case <-timer.C:
result := sender.Send(ctx, inMessage)
require.NoError(t, result)
// the receiver mightn't be ready before the sender send the message, so we retry
continue
}
}
})
})
}
}
func startReceiver(ctx context.Context, topicName string, messageChan chan receiveMessage) {
receiver, err := getProtocol(ctx, topicName)
if err != nil {
messageChan <- receiveMessage{err: err}
}
go func() {
err := receiver.OpenInbound(ctx)
if err != nil {
messageChan <- receiveMessage{err: err}
}
receiver.Close(ctx)
}()
go func() {
msg, result := receiver.Receive(ctx)
messageChan <- receiveMessage{msg, result}
}()
}
func getProtocol(ctx context.Context, topic string) (*mqtt_paho.Protocol, error) {
broker := "127.0.0.1:1883"
conn, err := net.Dial("tcp", broker)
if err != nil {
return nil, err
}
cp := &paho.Connect{
KeepAlive: 30,
CleanStart: true,
}
p, err := mqtt_paho.New(ctx, &paho.ClientConfig{
Conn: conn,
}, cp, topic, []string{topic}, 0, false)
return p, err
}