This commit is contained in:
Jem Day 2025-04-14 11:38:32 +00:00 committed by GitHub
commit 3ad573f167
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
30 changed files with 2158 additions and 6 deletions

View File

@ -0,0 +1,31 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>cloudevents-parent</artifactId>
<groupId>io.cloudevents</groupId>
<version>2.5.0-SNAPSHOT</version>
<relativePath>../../../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>cloudevents-mqtt-core</artifactId>
<name>CloudEvents - MQTT Common</name>
<packaging>jar</packaging>
<properties>
<module-name>io.cloudevents.mqtt.core</module-name>
</properties>
<dependencies>
<dependency>
<groupId>io.cloudevents</groupId>
<artifactId>cloudevents-core</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,106 @@
/*
* Copyright 2018-Present The CloudEvents Authors
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package io.cloudevents.mqtt.core;
import io.cloudevents.SpecVersion;
import io.cloudevents.core.data.BytesCloudEventData;
import io.cloudevents.core.message.impl.BaseGenericBinaryMessageReaderImpl;
import io.cloudevents.core.v1.CloudEventV1;
import java.util.function.BiConsumer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* Enable the hydration of a CloudEvent in binary mode from an MQTT message.
* <p>
* This abstract class provides common behavior across different MQTT
* client implementations.
*/
public abstract class BaseMqttBinaryMessageReader extends BaseGenericBinaryMessageReaderImpl<String, Object> {
/**
* CloudEvent attribute names must match this pattern.
*/
private static final Pattern CE_ATTR_NAME_REGEX = Pattern.compile("^[a-z\\d]+$");
private final String contentType;
/**
* Initialise the binary message reader.
*
* @param version The CloudEvent message version.
* @param contentType The assigned media content type.
* @param payload The raw data payload from the MQTT message.
*/
protected BaseMqttBinaryMessageReader(final SpecVersion version, final String contentType, final byte[] payload) {
super(version, payload != null && payload.length > 0 ? BytesCloudEventData.wrap(payload) : null);
this.contentType = contentType;
}
// --- Overrides
@Override
protected boolean isContentTypeHeader(String key) {
return false; // The content type is not defined in a user-property
}
@Override
protected boolean isCloudEventsHeader(String key) {
// The binding specification does not require name prefixing,
// as such any user-property is a potential CE Context Attribute.
//
// If the name complies with CE convention then we'll assume
// it's a context attribute.
//
Matcher m = CE_ATTR_NAME_REGEX.matcher(key);
return m.matches();
}
@Override
protected String toCloudEventsKey(String key) {
return key; // No special prefixing occurs in the MQTT binding spec.
}
@Override
protected void forEachHeader(BiConsumer<String, Object> fn) {
// If there is a content-type then we need set it.
// Inspired by AMQP/Proton code :-)
if (contentType != null) {
fn.accept(CloudEventV1.DATACONTENTTYPE, contentType);
}
// Now process each MQTT User Property.
forEachUserProperty(fn);
}
@Override
protected String toCloudEventsValue(Object value) {
return value.toString();
}
/**
* Visit each MQTT user-property and invoke the supplied function.
*
* @param fn The function to invoke for each MQTT User property.
*/
protected abstract void forEachUserProperty(BiConsumer<String, Object> fn);
}

View File

@ -0,0 +1,53 @@
/*
* Copyright 2018-Present The CloudEvents Authors
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package io.cloudevents.mqtt.core;
import io.cloudevents.core.format.EventFormat;
import io.cloudevents.core.provider.EventFormatProvider;
/**
* General MQTT Utilities and Helpers
*/
public class MqttUtils {
private static final String DEFAULT_FORMAT = "application/cloudevents+json";
private MqttUtils() {
}
/**
* Obtain the {@link EventFormat} to use when working with MQTT V3
* messages.
*
* @return An event format.
*/
public static EventFormat getDefaultEventFormat() {
return EventFormatProvider.getInstance().resolveFormat(DEFAULT_FORMAT);
}
/**
* Get the default content type to assume for MQTT messages.
*
* @return A Content-Type
*/
public static final String getDefaultContentType() {
return DEFAULT_FORMAT;
}
}

View File

@ -0,0 +1,80 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>cloudevents-parent</artifactId>
<groupId>io.cloudevents</groupId>
<version>2.5.0-SNAPSHOT</version>
<relativePath>../../../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>cloudevents-mqtt-hivemq</artifactId>
<name>CloudEvents - MQTT HiveMQ Binding</name>
<packaging>jar</packaging>
<properties>
<module-name>io.cloudevents.mqtt.hivemq</module-name>
<hivemq.version>1.3.0</hivemq.version>
</properties>
<dependencies>
<dependency>
<groupId>io.cloudevents</groupId>
<artifactId>cloudevents-core</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.cloudevents</groupId>
<artifactId>cloudevents-mqtt-core</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.hivemq</groupId>
<artifactId>hivemq-mqtt-client</artifactId>
<version>${hivemq.version}</version>
<scope>provided</scope>
</dependency>
<!-- Testing Dependencies -->
<dependency>
<groupId>io.cloudevents</groupId>
<artifactId>cloudevents-core</artifactId>
<classifier>tests</classifier>
<type>test-jar</type>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<!-- We need a JSON Format for V3 compliance checking -->
<dependency>
<groupId>io.cloudevents</groupId>
<artifactId>cloudevents-json-jackson</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<version>${assertj-core.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<version>${junit-jupiter.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,49 @@
/*
* Copyright 2018-Present The CloudEvents Authors
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package io.cloudevents.mqtt.hivemq;
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish;
import io.cloudevents.SpecVersion;
import io.cloudevents.mqtt.core.BaseMqttBinaryMessageReader;
import java.util.function.BiConsumer;
final class BinaryMessageReader extends BaseMqttBinaryMessageReader {
Mqtt5Publish message;
BinaryMessageReader(final SpecVersion version, final String contentType, Mqtt5Publish message) {
super(version, contentType, message.getPayloadAsBytes());
this.message = message;
}
@Override
protected void forEachUserProperty(BiConsumer<String, Object> fn) {
message.getUserProperties().asList().forEach(up -> {
final String key = up.getName().toString();
final String val = up.getValue().toString();
if (key != null && val != null) {
fn.accept(key, val);
}
});
}
}

View File

@ -0,0 +1,121 @@
/*
* Copyright 2018-Present The CloudEvents Authors
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package io.cloudevents.mqtt.hivemq;
import com.hivemq.client.mqtt.datatypes.MqttUtf8String;
import com.hivemq.client.mqtt.mqtt3.message.publish.Mqtt3Publish;
import com.hivemq.client.mqtt.mqtt3.message.publish.Mqtt3PublishBuilder;
import com.hivemq.client.mqtt.mqtt5.datatypes.Mqtt5UserProperty;
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish;
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PublishBuilder;
import io.cloudevents.core.message.MessageReader;
import io.cloudevents.core.message.MessageWriter;
import io.cloudevents.core.message.impl.GenericStructuredMessageReader;
import io.cloudevents.core.message.impl.MessageUtils;
import io.cloudevents.core.v1.CloudEventV1;
import io.cloudevents.mqtt.core.MqttUtils;
import java.util.List;
import java.util.Optional;
/**
* A factory to obtain:
* - {@link MessageReader} instances to read CloudEvents from MQTT messages.
* - {@link MessageWriter} instances to write CloudEvents into MQTT messages.
*/
public class MqttMessageFactory {
// Prevent Instantiation.
private MqttMessageFactory() {
}
/**
* Create a {@link MessageReader} for an MQTT V3 message.
* <p>
* As-Per MQTT Binding specification this only supports
* a structured JSON Format message.
*
* @param message An MQTT V3 message.
* @return MessageReader.
*/
public static MessageReader createReader(Mqtt3Publish message) {
return new GenericStructuredMessageReader(MqttUtils.getDefaultEventFormat(), message.getPayloadAsBytes());
}
/**
* Create a {@link MessageReader} for an MQTT V5 message
*
* @param message An MQTT V5 message.
* @return A message reader.
*/
public static MessageReader createReader(Mqtt5Publish message) {
Optional<MqttUtf8String> cType = message.getContentType();
String contentType = cType.isPresent() ? cType.get().toString() : null;
return MessageUtils.parseStructuredOrBinaryMessage(
() -> contentType,
format -> new GenericStructuredMessageReader(format, message.getPayloadAsBytes()),
() -> getSpecVersion(message),
sv -> new BinaryMessageReader(sv, contentType, message)
);
}
/**
* Create a {@link MessageWriter} for an MQTT V5 Message.
*
* @param builder {@link Mqtt5PublishBuilder.Complete}
* @return A message writer.
*/
public static MessageWriter createWriter(Mqtt5PublishBuilder.Complete builder) {
return new V5MessageWriter(builder);
}
/**
* Create a {@link MessageWriter} for an MQTT V3 Message.
* <p>
* Only supports structured messages.
*
* @param builder {@link Mqtt3PublishBuilder.Complete}
* @return A message writer.
*/
public static MessageWriter createWriter(Mqtt3PublishBuilder.Complete builder) {
return new V3MessageWriter(builder);
}
// -- Private functions
/**
* Find the value of the CloudEvent 'specversion' in the MQTT V5 User Properties.
*
* @param message An MQTT message.
* @return spec version attribute content.
*/
private static String getSpecVersion(Mqtt5Publish message) {
List<Mqtt5UserProperty> props = (List<Mqtt5UserProperty>) message.getUserProperties().asList();
Optional<Mqtt5UserProperty> up = props.stream().filter(p -> p.getName().toString().equals(CloudEventV1.SPECVERSION)).findFirst();
return (up.isPresent()) ? up.get().getValue().toString() : null;
}
}

View File

@ -0,0 +1,72 @@
/*
* Copyright 2018-Present The CloudEvents Authors
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package io.cloudevents.mqtt.hivemq;
import com.hivemq.client.mqtt.mqtt3.message.publish.Mqtt3PublishBuilder;
import io.cloudevents.CloudEvent;
import io.cloudevents.SpecVersion;
import io.cloudevents.core.format.EventFormat;
import io.cloudevents.core.message.MessageWriter;
import io.cloudevents.core.provider.EventFormatProvider;
import io.cloudevents.rw.CloudEventRWException;
import io.cloudevents.rw.CloudEventWriter;
class V3MessageWriter implements MessageWriter<CloudEventWriter<Mqtt3PublishBuilder>, Mqtt3PublishBuilder> {
Mqtt3PublishBuilder.Complete builder;
V3MessageWriter(Mqtt3PublishBuilder.Complete builder) {
this.builder = builder;
}
@Override
public CloudEventWriter<Mqtt3PublishBuilder> create(SpecVersion version) throws CloudEventRWException {
// No-Op
throw CloudEventRWException.newOther("Internal Error");
}
@Override
public Mqtt3PublishBuilder setEvent(EventFormat format, byte[] value) throws CloudEventRWException {
// No-Op
throw CloudEventRWException.newOther("Internal Error");
}
@Override
public Mqtt3PublishBuilder writeStructured(CloudEvent event, String format) {
final EventFormat eventFormat = EventFormatProvider.getInstance().resolveFormat(format);
if (eventFormat != null) {
return writeStructured(event, eventFormat);
} else {
throw CloudEventRWException.newOther("Unsupported Format: " + format);
}
}
@Override
public Mqtt3PublishBuilder writeStructured(CloudEvent event, EventFormat format) {
final byte[] data = format.serialize(event);
builder.payload(data);
return builder;
}
@Override
public Mqtt3PublishBuilder writeBinary(CloudEvent event) {
throw CloudEventRWException.newOther("MQTT V3 Does not support CloudEvent Binary mode");
}
}

View File

@ -0,0 +1,66 @@
/*
* Copyright 2018-Present The CloudEvents Authors
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package io.cloudevents.mqtt.hivemq;
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PublishBuilder;
import io.cloudevents.CloudEventData;
import io.cloudevents.SpecVersion;
import io.cloudevents.core.format.EventFormat;
import io.cloudevents.core.message.MessageWriter;
import io.cloudevents.rw.CloudEventContextWriter;
import io.cloudevents.rw.CloudEventRWException;
import io.cloudevents.rw.CloudEventWriter;
class V5MessageWriter implements MessageWriter<CloudEventWriter<Mqtt5PublishBuilder.Complete>, Mqtt5PublishBuilder.Complete>, CloudEventWriter<Mqtt5PublishBuilder.Complete> {
private final Mqtt5PublishBuilder.Complete builder;
V5MessageWriter(Mqtt5PublishBuilder.Complete builder) {
this.builder = builder;
}
@Override
public CloudEventContextWriter withContextAttribute(String name, String value) throws CloudEventRWException {
builder.userProperties().add(name, value).applyUserProperties();
return this;
}
@Override
public Mqtt5PublishBuilder.Complete end(CloudEventData data) throws CloudEventRWException {
builder.payload(data.toBytes());
return end();
}
@Override
public Mqtt5PublishBuilder.Complete end() throws CloudEventRWException {
return builder;
}
@Override
public CloudEventWriter<Mqtt5PublishBuilder.Complete> create(SpecVersion version) throws CloudEventRWException {
withContextAttribute("specversion", version.toString());
return this;
}
@Override
public Mqtt5PublishBuilder.Complete setEvent(EventFormat format, byte[] value) throws CloudEventRWException {
builder.contentType(format.serializedContentType());
builder.payload(value);
return end();
}
}

View File

@ -0,0 +1,13 @@
/**
* This module implements the MQTT binding specification using the
* HiveMQ MQTT client library.
* <p>
* Use the {@link io.cloudevents.mqtt.hivemq.MqttMessageFactory} to obtain
* CloudEvent reader and writer instances.
* <p>
* Both V3 and V5 versions of MQTT are supported.
*
* @since 2.5.0
*/
package io.cloudevents.mqtt.hivemq;

View File

@ -0,0 +1,75 @@
/*
* Copyright 2018-Present The CloudEvents Authors
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package io.cloudevents.mqtt.hivemq;
import com.hivemq.client.mqtt.mqtt3.message.publish.Mqtt3Publish;
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish;
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PublishBuilder;
import io.cloudevents.core.format.EventFormat;
import io.cloudevents.core.mock.CSVFormat;
import io.cloudevents.core.provider.EventFormatProvider;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
public class MqttMessageFactoryTest {
@Test
public void createV3Writer() {
}
@Test
public void createV5Writer() {
Assertions.assertNotNull(MqttMessageFactory.createWriter((Mqtt5PublishBuilder.Complete) Mqtt5Publish.builder()));
}
@Test
public void create3Reader() {
Mqtt3Publish msg = Mqtt3Publish.builder().topic("test").build();
Assertions.assertNotNull(MqttMessageFactory.createReader(msg));
}
@Test
public void createV5ReaderFromStructured() {
// If the content-type is present then hopefully it's a
// cloudvent one.
EventFormat ef = CSVFormat.INSTANCE;
EventFormatProvider.getInstance().registerFormat(ef);
Mqtt5Publish msg = Mqtt5Publish.builder()
.topic("test")
.contentType(ef.serializedContentType())
.build();
Assertions.assertNotNull(MqttMessageFactory.createReader(msg));
}
@Test
public void createV5ReaderFromBinary() {
Mqtt5Publish msg = Mqtt5Publish.builder()
.topic("test")
.userProperties().add("specversion", "1.0").applyUserProperties()
.build();
Assertions.assertNotNull(MqttMessageFactory.createReader(msg));
}
}

View File

@ -0,0 +1,71 @@
/*
* Copyright 2018-Present The CloudEvents Authors
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package io.cloudevents.mqtt.hivemq;
import com.hivemq.client.mqtt.mqtt3.message.publish.Mqtt3Publish;
import com.hivemq.client.mqtt.mqtt3.message.publish.Mqtt3PublishBuilder;
import io.cloudevents.core.format.EventFormat;
import io.cloudevents.core.mock.CSVFormat;
import io.cloudevents.core.provider.EventFormatProvider;
import io.cloudevents.core.test.Data;
import io.cloudevents.rw.CloudEventRWException;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertNotNull;
class V3MessageWriterTest {
Mqtt3PublishBuilder.Complete builder;
V3MessageWriter writer;
EventFormat csvFormat = CSVFormat.INSTANCE;
V3MessageWriterTest() {
builder = (Mqtt3PublishBuilder.Complete) Mqtt3Publish.builder();
writer = new V3MessageWriter(builder);
EventFormatProvider.getInstance().registerFormat(csvFormat);
}
@Test
void create() {
}
@Test
void setEvent() {
}
@Test
void writeStructuredA() {
assertNotNull(writer.writeStructured(Data.V1_MIN, csvFormat.serializedContentType()));
}
@Test
void testWriteStructuredB() {
assertNotNull(writer.writeStructured(Data.V1_MIN, csvFormat));
}
@Test
void writeBinary() {
// This should fail
Assertions.assertThrows(CloudEventRWException.class, () -> {
writer.writeBinary(Data.V1_MIN);
});
}
}

View File

@ -0,0 +1,94 @@
/*
* Copyright 2018-Present The CloudEvents Authors
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package io.cloudevents.mqtt.hivemq;
import com.hivemq.client.mqtt.mqtt3.message.publish.Mqtt3Publish;
import com.hivemq.client.mqtt.mqtt3.message.publish.Mqtt3PublishBuilder;
import io.cloudevents.CloudEvent;
import io.cloudevents.core.format.EventFormat;
import io.cloudevents.core.message.MessageReader;
import io.cloudevents.core.message.MessageWriter;
import io.cloudevents.core.test.Data;
import io.cloudevents.jackson.JsonFormat;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import java.util.stream.Stream;
/**
* Round-Trip Tests
* <p>
* - serialize a CloudEvent into an MQTT Message.
* - de-serialize the message into a new CloudEvent
* - verify that the new CE matches the original CE
*/
public class V3RoundTripTests {
/**
* This test set is limited owing to the fact that:
* (a) We only support JSON Format
* (b) Round-tripping of events with JSON 'data' doesn't reliably work owing to the way the equality tests work on the event.
*
* @return
*/
static Stream<CloudEvent> simpleEvents() {
return Stream.of(
Data.V03_MIN,
Data.V03_WITH_TEXT_DATA,
Data.V1_MIN,
Data.V1_WITH_TEXT_DATA,
Data.V1_WITH_XML_DATA
);
}
@ParameterizedTest
@MethodSource("simpleEvents")
public void roundTrip(CloudEvent ce) {
EventFormat format = new JsonFormat();
Assertions.assertNotNull(format);
Mqtt3Publish message = null;
Mqtt3PublishBuilder.Complete builder = (Mqtt3PublishBuilder.Complete) Mqtt3Publish.builder();
builder.topic("test.test.test");
// Write the event out as a message.
MessageWriter writer = MqttMessageFactory.createWriter(builder);
Assertions.assertNotNull(writer);
writer.writeStructured(ce, format);
message = builder.build();
Assertions.assertNotNull(message);
// Read it back and verify
// Read the message back into an event
MessageReader reader = MqttMessageFactory.createReader(message);
Assertions.assertNotNull(reader);
CloudEvent newCE = reader.toEvent();
Assertions.assertNotNull(newCE);
// And now ensure we got back what we wrote
Assertions.assertEquals(ce, newCE);
}
}

View File

@ -0,0 +1,107 @@
/*
* Copyright 2018-Present The CloudEvents Authors
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package io.cloudevents.mqtt.hivemq;
import com.hivemq.client.mqtt.mqtt5.datatypes.Mqtt5UserProperty;
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish;
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PublishBuilder;
import io.cloudevents.SpecVersion;
import io.cloudevents.core.data.BytesCloudEventData;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.util.List;
public class V5MessageWriterTest {
private final Mqtt5PublishBuilder builder;
private final V5MessageWriter writer;
V5MessageWriterTest() {
builder = Mqtt5Publish.builder();
writer = new V5MessageWriter((Mqtt5PublishBuilder.Complete) builder);
builder.topic("tester");
}
@Test
public void testWithContextAttribute() {
Assertions.assertNotNull(writer.withContextAttribute("test", "testing"));
Mqtt5Publish msg = ((Mqtt5PublishBuilder.Complete) builder).build();
ensureProperty(msg, "test", "testing");
}
@Test
public void testWithContextAttributes() {
Assertions.assertNotNull(writer.withContextAttribute("test1", "testing1"));
Assertions.assertNotNull(writer.withContextAttribute("test2", "testing2"));
Mqtt5Publish msg = ((Mqtt5PublishBuilder.Complete) builder).build();
ensureProperty(msg, "test1", "testing1");
ensureProperty(msg, "test2", "testing2");
}
@Test
public void testEnd() {
Assertions.assertNotNull(writer.end());
}
@Test
public void testEndWithData() {
final byte[] tData = {0x00, 0x02, 0x42};
Assertions.assertNotNull(writer.end(BytesCloudEventData.wrap(tData)));
Mqtt5Publish msg = ((Mqtt5PublishBuilder.Complete) builder).build();
Assertions.assertNotNull(msg.getPayloadAsBytes());
Assertions.assertEquals(msg.getPayloadAsBytes().length, tData.length);
}
@Test
public void testCreate() {
Assertions.assertNotNull(writer.create(SpecVersion.V1));
Mqtt5Publish msg = ((Mqtt5PublishBuilder.Complete) builder).build();
ensureProperty(msg, "specversion", SpecVersion.V1.toString());
}
private void ensureProperty(Mqtt5Publish msg, String name, String val) {
List<Mqtt5UserProperty> props = (List<Mqtt5UserProperty>) msg.getUserProperties().asList();
Mqtt5UserProperty prop = null;
for (Mqtt5UserProperty up : props) {
if (up.getName().toString().equals(name)) {
prop = up;
break;
}
}
Assertions.assertNotNull(prop);
Assertions.assertEquals(prop.getValue().toString(), val);
}
}

View File

@ -0,0 +1,100 @@
/*
* Copyright 2018-Present The CloudEvents Authors
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package io.cloudevents.mqtt.hivemq;
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish;
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PublishBuilder;
import io.cloudevents.CloudEvent;
import io.cloudevents.core.format.EventFormat;
import io.cloudevents.core.message.MessageReader;
import io.cloudevents.core.message.MessageWriter;
import io.cloudevents.core.mock.CSVFormat;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
/**
* Round-Trip Tests
* <p>
* For both Binary and Structured modes:
* - serialize a CloudEvent into an MQTT Message.
* - de-serialize the message into a new CloudEvent
* - verify that the new CE matches the original CE
*/
public class V5RoundTripTests {
private static void readAndVerify(CloudEvent ce, Mqtt5Publish message) {
Assertions.assertNotNull(message);
// Read the message back into an event
MessageReader reader = MqttMessageFactory.createReader(message);
Assertions.assertNotNull(reader);
CloudEvent newCE = reader.toEvent();
Assertions.assertNotNull(newCE);
// And now ensure we got back what we wrote
Assertions.assertEquals(ce, newCE);
}
@ParameterizedTest
@MethodSource("io.cloudevents.core.test.Data#allEventsWithoutExtensions")
public void roundTripBinary(CloudEvent ce) {
// Write the event out as a message.
Mqtt5Publish message = null;
Mqtt5PublishBuilder.Complete builder = (Mqtt5PublishBuilder.Complete) Mqtt5Publish.builder();
builder.topic("test.test.test");
MessageWriter writer = MqttMessageFactory.createWriter(builder);
Assertions.assertNotNull(writer);
writer.writeBinary(ce);
message = builder.build();
// Read it back and verify
readAndVerify(ce, message);
}
@ParameterizedTest
@MethodSource("io.cloudevents.core.test.Data#allEventsWithoutExtensions")
public void roundTripStructured(CloudEvent ce) {
EventFormat format = CSVFormat.INSTANCE;
Mqtt5Publish message = null;
Mqtt5PublishBuilder.Complete builder = (Mqtt5PublishBuilder.Complete) Mqtt5Publish.builder();
builder.topic("test.test.test");
// Write the event out as a message.
MessageWriter writer = MqttMessageFactory.createWriter(builder);
Assertions.assertNotNull(writer);
writer.writeStructured(ce, format);
message = builder.build();
// Read it back and verify
readAndVerify(ce, message);
}
}

106
bindings/mqtt/paho/pom.xml Normal file
View File

@ -0,0 +1,106 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Copyright 2018-Present The CloudEvents Authors
~ <p>
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~ <p>
~ http://www.apache.org/licenses/LICENSE-2.0
~ <p>
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>io.cloudevents</groupId>
<artifactId>cloudevents-parent</artifactId>
<version>2.5.0-SNAPSHOT</version>
<relativePath>../../../pom.xml</relativePath>
</parent>
<artifactId>cloudevents-mqtt-paho</artifactId>
<name>CloudEvents - MQTT Paho Binding</name>
<packaging>jar</packaging>
<properties>
<module-name>io.cloudevents.mqtt.paho</module-name>
<paho.version>1.2.5</paho.version>
<apache.commons.version>3.12.0</apache.commons.version>
</properties>
<dependencies>
<dependency>
<groupId>io.cloudevents</groupId>
<artifactId>cloudevents-core</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.cloudevents</groupId>
<artifactId>cloudevents-mqtt-core</artifactId>
<version>${project.version}</version>
</dependency>
<!-- MQTT V5 Support -->
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.mqttv5.client</artifactId>
<version>${paho.version}</version>
<scope>provided</scope>
</dependency>
<!-- MQTT V3 Support -->
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>${paho.version}</version>
<scope>provided</scope>
</dependency>
<!-- Test deps -->
<dependency>
<groupId>io.cloudevents</groupId>
<artifactId>cloudevents-core</artifactId>
<classifier>tests</classifier>
<type>test-jar</type>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<version>${assertj-core.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<version>${junit-jupiter.version}</version>
<scope>test</scope>
</dependency>
<!-- We need a JSON Format for V3 compliance checking -->
<dependency>
<groupId>io.cloudevents</groupId>
<artifactId>cloudevents-json-jackson</artifactId>
<version>2.5.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,57 @@
/*
* Copyright 2018-Present The CloudEvents Authors
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package io.cloudevents.mqtt.paho;
import io.cloudevents.SpecVersion;
import io.cloudevents.mqtt.core.BaseMqttBinaryMessageReader;
import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.eclipse.paho.mqttv5.common.packet.UserProperty;
import java.util.Collections;
import java.util.List;
import java.util.function.BiConsumer;
final class BinaryMessageReader extends BaseMqttBinaryMessageReader {
private final List<UserProperty> userProperties;
BinaryMessageReader(final SpecVersion version, final String contentType, MqttMessage message) {
super(version, contentType, message.getPayload());
// Sanity Check
if (message.getProperties().getUserProperties() != null) {
userProperties = message.getProperties().getUserProperties();
} else {
userProperties = Collections.emptyList();
}
}
@Override
protected void forEachUserProperty(BiConsumer<String, Object> fn) {
userProperties.forEach(up -> {
final String key = up.getKey();
final String val = up.getValue();
if (key != null && val != null) {
fn.accept(key, val);
}
});
}
}

View File

@ -0,0 +1,70 @@
/*
* Copyright 2018-Present The CloudEvents Authors
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package io.cloudevents.mqtt.paho;
import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.eclipse.paho.mqttv5.common.packet.MqttProperties;
import org.eclipse.paho.mqttv5.common.packet.UserProperty;
import java.util.List;
import java.util.Optional;
/**
* General Utility functions
*/
final class PahoMessageUtils {
/**
* Prevent Instantiation
*/
private PahoMessageUtils() {
}
/**
* Get the value of a specific user property from a message.
*
* @param msg The MQTT Message
* @param name The property to retrieve.
* @return property value or NULL if not set.
*/
static String getUserProperty(final MqttMessage msg, final String name) {
final MqttProperties mProps = msg.getProperties();
return (mProps == null) ? null : getUserProperty(mProps.getUserProperties(), name);
}
/**
* Get the value of a specific user property from a message.
*
* @param props The List of MQTT Message properties
* @param name The property to retrieve.
* @return property value or NULL if not set.
*/
public static String getUserProperty(final List<UserProperty> props, final String name) {
if (props == null) {
return null;
} else {
Optional<UserProperty> up = props.stream().filter(p -> p.getKey().equals(name)).findFirst();
return up.map(UserProperty::getValue).orElse(null);
}
}
}

View File

@ -0,0 +1,101 @@
/*
* Copyright 2018-Present The CloudEvents Authors
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package io.cloudevents.mqtt.paho;
import io.cloudevents.CloudEvent;
import io.cloudevents.SpecVersion;
import io.cloudevents.core.format.EventFormat;
import io.cloudevents.core.message.MessageWriter;
import io.cloudevents.core.provider.EventFormatProvider;
import io.cloudevents.mqtt.core.MqttUtils;
import io.cloudevents.rw.CloudEventRWException;
import io.cloudevents.rw.CloudEventWriter;
import org.eclipse.paho.client.mqttv3.MqttMessage;
/**
* A {@link MessageWriter} that writes an CloudEvent to a V3 MQTT Message.
* <p>
* Note: This only supports Structured messages in JSON format as defined
* by the MQTT CloudEvent binding specification.
*/
class V3MessageWriter implements MessageWriter<CloudEventWriter<MqttMessage>, MqttMessage> {
private final MqttMessage message;
V3MessageWriter() {
message = new MqttMessage();
}
/**
* Ensure the supplied content type is appropriate for V3 messages
* as-per binding specification.
* <p>
* Raises exception if not valid.
*
* @param contentType
*/
private void ensureValidContent(String contentType) {
if (!MqttUtils.getDefaultContentType().equals(contentType)) {
throw CloudEventRWException.newOther("MQTT V3 Does not support contentType: " + contentType);
}
}
@Override
public MqttMessage writeStructured(CloudEvent event, String format) {
final EventFormat eventFormat = EventFormatProvider.getInstance().resolveFormat(format);
// Sanity Check
if (eventFormat == null) {
}
return writeStructured(event, eventFormat);
}
@Override
public MqttMessage writeStructured(CloudEvent event, EventFormat format) {
// Ensure format is valid
ensureValidContent(format.serializedContentType());
// Populate the structured format.
message.setPayload(format.serialize(event));
// Done.
return message;
}
@Override
public MqttMessage writeBinary(CloudEvent event) {
// This operation is not allowed.
// This should fail
throw CloudEventRWException.newOther("MQTT V3 Does not support CloudEvent Binary mode");
}
@Override
public CloudEventWriter<MqttMessage> create(SpecVersion version) throws CloudEventRWException {
return null;
}
@Override
public MqttMessage setEvent(EventFormat format, byte[] value) throws CloudEventRWException {
ensureValidContent(format.serializedContentType());
message.setPayload(value);
return message;
}
}

View File

@ -0,0 +1,64 @@
/*
* Copyright 2018-Present The CloudEvents Authors
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package io.cloudevents.mqtt.paho;
import io.cloudevents.core.message.MessageReader;
import io.cloudevents.core.message.MessageWriter;
import io.cloudevents.core.message.impl.GenericStructuredMessageReader;
import io.cloudevents.mqtt.core.MqttUtils;
import io.cloudevents.rw.CloudEventWriter;
import org.eclipse.paho.client.mqttv3.MqttMessage;
/**
* MQTT V3 factory to :
* - Obtain a {@link MessageReader} to read CloudEvents from MQTT messages.
* - Create a {@link MessageWriter} enabling CloudEVents to be written to an MQTT message.
* <p>
* NOTE: The V3 binding only supports structured messages using a JSON Format.
*/
public final class V3MqttMessageFactory {
/**
* Prevent instantiation.
*/
private V3MqttMessageFactory() {
}
/**
* Create a {@link MessageReader} to read a V3 MQTT Messages as a CloudEVents
*
* @param mqttMessage An MQTT Message.
* @return {@link MessageReader}
*/
public static MessageReader createReader(MqttMessage mqttMessage) {
return new GenericStructuredMessageReader(MqttUtils.getDefaultEventFormat(), mqttMessage.getPayload());
}
/**
* Creates a {@link MessageWriter} to write a CloudEvent to an MQTT {@link MqttMessage}.
* <p>
* NOTE: This implementation *only* supports JSON structured format as-per the MQTT binding specification.
*
* @return A {@link MessageWriter} to write a {@link io.cloudevents.CloudEvent} to MQTT.
*/
public static MessageWriter<CloudEventWriter<MqttMessage>, MqttMessage> createWriter() {
return new V3MessageWriter();
}
}

View File

@ -0,0 +1,80 @@
/*
* Copyright 2018-Present The CloudEvents Authors
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package io.cloudevents.mqtt.paho;
import io.cloudevents.CloudEventData;
import io.cloudevents.SpecVersion;
import io.cloudevents.core.format.EventFormat;
import io.cloudevents.core.message.MessageWriter;
import io.cloudevents.rw.CloudEventContextWriter;
import io.cloudevents.rw.CloudEventRWException;
import io.cloudevents.rw.CloudEventWriter;
import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.eclipse.paho.mqttv5.common.packet.MqttProperties;
import org.eclipse.paho.mqttv5.common.packet.UserProperty;
import java.util.ArrayList;
import java.util.List;
class V5MessageWriter<R> implements MessageWriter<CloudEventWriter<MqttMessage>, MqttMessage>, CloudEventWriter<MqttMessage> {
private final List<UserProperty> userProperties;
private final MqttMessage message;
V5MessageWriter() {
userProperties = new ArrayList<>(10);
message = new MqttMessage();
message.setProperties(new MqttProperties());
}
// -- Implementation Overrides
@Override
public CloudEventContextWriter withContextAttribute(String name, String value) throws CloudEventRWException {
final UserProperty up = new UserProperty(name, value);
userProperties.add(up);
return this;
}
@Override
public MqttMessage end(CloudEventData data) throws CloudEventRWException {
message.setPayload(data.toBytes());
return end();
}
@Override
public MqttMessage end() throws CloudEventRWException {
if (userProperties.size() != 0) {
message.getProperties().setUserProperties(userProperties);
}
return message;
}
@Override
public CloudEventWriter<MqttMessage> create(SpecVersion version) throws CloudEventRWException {
userProperties.add(new UserProperty("specversion", version.toString()));
return this;
}
@Override
public MqttMessage setEvent(EventFormat format, byte[] value) throws CloudEventRWException {
message.getProperties().setContentType(format.serializedContentType());
message.setPayload(value);
return end();
}
}

View File

@ -0,0 +1,70 @@
/*
* Copyright 2018-Present The CloudEvents Authors
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package io.cloudevents.mqtt.paho;
import io.cloudevents.core.message.MessageReader;
import io.cloudevents.core.message.MessageWriter;
import io.cloudevents.core.message.impl.GenericStructuredMessageReader;
import io.cloudevents.core.message.impl.MessageUtils;
import io.cloudevents.core.v1.CloudEventV1;
import io.cloudevents.rw.CloudEventWriter;
import org.eclipse.paho.mqttv5.common.MqttMessage;
/**
* MQTT V5 factory to :
* - Obtain a {@link MessageReader} to read CloudEvents from MQTT messages.
* - Create a {@link MessageWriter} enabling CloudEVents to be written to an MQTT message.
*/
public final class V5MqttMessageFactory {
/**
* Prevent instantiation.
*/
private V5MqttMessageFactory() {
}
/**
* Create a {@link MessageReader} to read MQTT Messages as CloudEVents
*
* @param mqttMessage An MQTT Message.
* @return {@link MessageReader}
*/
public static MessageReader createReader(MqttMessage mqttMessage) {
final String contentType = mqttMessage.getProperties().getContentType();
return MessageUtils.parseStructuredOrBinaryMessage(
() -> contentType,
format -> new GenericStructuredMessageReader(format, mqttMessage.getPayload()),
() -> PahoMessageUtils.getUserProperty(mqttMessage, CloudEventV1.SPECVERSION),
sv -> new BinaryMessageReader(sv, contentType, mqttMessage)
);
}
/**
* Creates a {@link MessageWriter} capable of translating both a structured and binary CloudEvent
* to an MQTT {@link MqttMessage}
*
* @return A {@link MessageWriter} to write a {@link io.cloudevents.CloudEvent} to MQTT using structured or binary encoding.
*/
public static MessageWriter<CloudEventWriter<MqttMessage>, MqttMessage> createWriter() {
return new V5MessageWriter<>();
}
}

View File

@ -0,0 +1,10 @@
/**
* This module implements the MQTT binding specification using the
* Paho MQTT client library.
* <p>
* Separate factories are provided for MQTT V3 and V5.
*
* @since 2.5.0
*/
package io.cloudevents.mqtt.paho;

View File

@ -0,0 +1,78 @@
/*
* Copyright 2018-Present The CloudEvents Authors
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package io.cloudevents.mqtt.paho;
import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.eclipse.paho.mqttv5.common.packet.MqttProperties;
import org.eclipse.paho.mqttv5.common.packet.UserProperty;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.util.ArrayList;
import java.util.List;
public class PahoMessageUtilsTest {
@Test
void verifyPropertyList() {
List<UserProperty> props = new ArrayList<>(5);
// Ensure Works with null List
Assertions.assertNull(PahoMessageUtils.getUserProperty(props, "id"));
// Ensure works with empty list.
Assertions.assertNull(PahoMessageUtils.getUserProperty(props, "id"));
// Create some props
props = new ArrayList<>(5);
props.add(new UserProperty("id", "aaa-bbb-ccc"));
props.add(new UserProperty("specversion", "v1.0"));
// Ensure Presence
Assertions.assertEquals("aaa-bbb-ccc", PahoMessageUtils.getUserProperty(props, "id"));
// Ensure Absence
Assertions.assertNull(PahoMessageUtils.getUserProperty(props, "scoobydoo"));
}
@Test
void verifyMessageProperties() {
MqttMessage msg = new MqttMessage();
// Verify message with no props
Assertions.assertNull(PahoMessageUtils.getUserProperty(msg, "id"));
// Create some props
List<UserProperty> props = null;
props = new ArrayList<>(5);
props.add(new UserProperty("id", "aaa-bbb-ccc"));
props.add(new UserProperty("specversion", "v1.0"));
msg.setProperties(new MqttProperties());
msg.getProperties().setUserProperties(props);
// Ensure Presence
Assertions.assertEquals("aaa-bbb-ccc", PahoMessageUtils.getUserProperty(msg, "id"));
// Ensure Absence
Assertions.assertNull(PahoMessageUtils.getUserProperty(msg, "scoobydoo"));
}
}

View File

@ -0,0 +1,77 @@
/*
* Copyright 2018-Present The CloudEvents Authors
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package io.cloudevents.mqtt.paho;
import io.cloudevents.CloudEvent;
import io.cloudevents.core.format.EventDeserializationException;
import io.cloudevents.core.message.MessageReader;
import io.cloudevents.core.message.MessageWriter;
import io.cloudevents.core.mock.CSVFormat;
import io.cloudevents.core.test.Data;
import io.cloudevents.rw.CloudEventRWException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
public class V3MessageFactoryTest {
@Test
public void ensureSerializationFormat() {
MqttMessage message = null;
// This should fail as we don't support CSV Format
MessageWriter writer = V3MqttMessageFactory.createWriter();
Assertions.assertNotNull(writer);
// Expect an exception
Assertions.assertThrows(CloudEventRWException.class, () -> {
writer.writeStructured(Data.V1_MIN, CSVFormat.INSTANCE);
});
}
@ParameterizedTest()
@MethodSource("io.cloudevents.core.test.Data#allEventsWithoutExtensions")
public void ensureDeserialization(CloudEvent ce) {
final String contentType = CSVFormat.INSTANCE.serializedContentType() + "; charset=utf8";
final byte[] contentPayload = CSVFormat.INSTANCE.serialize(ce);
// Build the MQTT Message
MqttMessage m = new MqttMessage();
m.setPayload(contentPayload);
// Get a reader
MessageReader reader = V3MqttMessageFactory.createReader(m);
Assertions.assertNotNull(reader);
// This should fail
// Expect an exception
Assertions.assertThrows(EventDeserializationException.class, () -> {
reader.toEvent();
});
}
}

View File

@ -0,0 +1,87 @@
/*
* Copyright 2018-Present The CloudEvents Authors
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package io.cloudevents.mqtt.paho;
import io.cloudevents.CloudEvent;
import io.cloudevents.core.format.EventFormat;
import io.cloudevents.core.message.MessageReader;
import io.cloudevents.core.message.MessageWriter;
import io.cloudevents.core.test.Data;
import io.cloudevents.jackson.JsonFormat;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import java.util.stream.Stream;
/**
* Round-Trip Tests
* <p>
* - serialize a CloudEvent into an MQTT Message.
* - de-serialize the message into a new CloudEvent
* - verify that the new CE matches the original CE
*/
public class V3RoundTripTests {
/**
* This test set is limited owing to the the fact that:
* (a) We only support JSON Format
* (b) Round-tripping of events with JSON 'data' doesn't reliably work owing to the way the equality tests work on the event.
*
* @return
*/
static Stream<CloudEvent> simpleEvents() {
return Stream.of(
Data.V03_MIN,
Data.V03_WITH_TEXT_DATA,
Data.V1_MIN,
Data.V1_WITH_TEXT_DATA,
Data.V1_WITH_XML_DATA
);
}
@ParameterizedTest
@MethodSource("simpleEvents")
public void roundTrip(CloudEvent ce) {
EventFormat format = new JsonFormat();
Assertions.assertNotNull(format);
// Write the event out as a message.
MessageWriter writer = V3MqttMessageFactory.createWriter();
Assertions.assertNotNull(writer);
MqttMessage message = (MqttMessage) writer.writeStructured(ce, format);
Assertions.assertNotNull(message);
// Read it back and verify
// Read the message back into an event
MessageReader reader = V3MqttMessageFactory.createReader(message);
Assertions.assertNotNull(reader);
CloudEvent newCE = reader.toEvent();
Assertions.assertNotNull(newCE);
// And now ensure we got back what we wrote
Assertions.assertEquals(ce, newCE);
}
}

View File

@ -0,0 +1,162 @@
/*
* Copyright 2018-Present The CloudEvents Authors
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package io.cloudevents.mqtt.paho;
import io.cloudevents.CloudEvent;
import io.cloudevents.SpecVersion;
import io.cloudevents.core.message.Encoding;
import io.cloudevents.core.message.MessageReader;
import io.cloudevents.core.mock.CSVFormat;
import io.cloudevents.core.test.Data;
import io.cloudevents.core.v03.CloudEventV03;
import io.cloudevents.types.Time;
import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.eclipse.paho.mqttv5.common.packet.MqttProperties;
import org.eclipse.paho.mqttv5.common.packet.UserProperty;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.assertj.core.api.Assertions.assertThat;
public class V5MessageFactoryTest {
private static final String DATACONTENTTYPE_NULL = null;
private static final byte[] DATAPAYLOAD_NULL = null;
private static Stream<Arguments> binaryTestArguments() {
return Stream.of(
// V03
Arguments.of(
properties(
property(CloudEventV03.SPECVERSION, SpecVersion.V03.toString()),
property(CloudEventV03.ID, Data.ID),
property(CloudEventV03.TYPE, Data.TYPE),
property(CloudEventV03.SOURCE, Data.SOURCE.toString())
),
DATACONTENTTYPE_NULL,
DATAPAYLOAD_NULL,
Data.V03_MIN
),
Arguments.of(
properties(
property(CloudEventV03.SPECVERSION, SpecVersion.V03.toString()),
property(CloudEventV03.ID, Data.ID),
property(CloudEventV03.TYPE, Data.TYPE),
property(CloudEventV03.SOURCE, Data.SOURCE.toString()),
property(CloudEventV03.SCHEMAURL, Data.DATASCHEMA.toString()),
property(CloudEventV03.SUBJECT, Data.SUBJECT),
property(CloudEventV03.TIME, Time.writeTime(Data.TIME))
),
Data.DATACONTENTTYPE_JSON,
Data.DATA_JSON_SERIALIZED,
Data.V03_WITH_JSON_DATA
)
);
}
private static UserProperty property(String key, String val) {
return new UserProperty(key, val);
}
private static List<UserProperty> properties(final UserProperty... props) {
return Stream.of(props).collect(Collectors.toList());
}
@Test
public void testWriteBinary() {
final MqttMessage message = V5MqttMessageFactory.createWriter().writeBinary(Data.V1_MIN);
Assertions.assertNotNull(message);
}
// Test Data
@Test
public void testWriteStructured() {
final MqttMessage message = V5MqttMessageFactory.createWriter().writeStructured(Data.V1_MIN, CSVFormat.INSTANCE);
Assertions.assertNotNull(message);
}
@ParameterizedTest()
@MethodSource("binaryTestArguments")
public void testReadBinary(List<UserProperty> userProps, String contentType, byte[] data, CloudEvent ce) {
MqttMessage msg = new MqttMessage();
// Populate Properties
MqttProperties props = new MqttProperties();
props.setUserProperties(userProps);
msg.setProperties(props);
// Populate payload & contentType
if (data != null) {
msg.setPayload(data);
}
if (contentType != null) {
msg.getProperties().setContentType(contentType);
}
MessageReader reader = V5MqttMessageFactory.createReader(msg);
Assertions.assertNotNull(reader);
assertThat(reader.getEncoding()).isEqualTo(Encoding.BINARY);
CloudEvent newCe = reader.toEvent();
assertThat(newCe).isEqualTo(ce);
}
@ParameterizedTest()
@MethodSource("io.cloudevents.core.test.Data#allEventsWithoutExtensions")
public void testReadStructured(CloudEvent ce) {
final String contentType = CSVFormat.INSTANCE.serializedContentType() + "; charset=utf8";
final byte[] contentPayload = CSVFormat.INSTANCE.serialize(ce);
// Build the MQTT Message
MqttMessage m = new MqttMessage();
MqttProperties props = new MqttProperties();
props.setContentType(contentType);
m.setProperties(props);
m.setPayload(contentPayload);
// Get a reader
MessageReader reader = V5MqttMessageFactory.createReader(m);
Assertions.assertNotNull(reader);
assertThat(reader.getEncoding()).isEqualTo(Encoding.STRUCTURED);
// Re-Hydrate the CloudEvent
CloudEvent newCE = reader.toEvent();
Assertions.assertNotNull(newCE);
// And hopefully they match
assertThat(newCE).isEqualTo(ce);
}
}

View File

@ -0,0 +1,86 @@
/*
* Copyright 2018-Present The CloudEvents Authors
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package io.cloudevents.mqtt.paho;
import io.cloudevents.CloudEvent;
import io.cloudevents.core.format.EventFormat;
import io.cloudevents.core.message.MessageReader;
import io.cloudevents.core.message.MessageWriter;
import io.cloudevents.core.mock.CSVFormat;
import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
/**
* Round-Trip Tests
* <p>
* For both Binary and Structured modes:
* - serialize a CloudEvent into an MQTT Message.
* - de-serialize the message into a new CloudEvent
* - verify that the new CE matches the original CE
*/
public class V5RoundTripTests {
private static void readAndVerify(CloudEvent ce, MqttMessage message) {
Assertions.assertNotNull(message);
// Read the message back into an event
MessageReader reader = V5MqttMessageFactory.createReader(message);
Assertions.assertNotNull(reader);
CloudEvent newCE = reader.toEvent();
Assertions.assertNotNull(newCE);
// And now ensure we got back what we wrote
Assertions.assertEquals(ce, newCE);
}
@ParameterizedTest
@MethodSource("io.cloudevents.core.test.Data#allEventsWithoutExtensions")
public void roundTripBinary(CloudEvent ce) {
// Write the event out as a message.
MessageWriter writer = V5MqttMessageFactory.createWriter();
Assertions.assertNotNull(writer);
MqttMessage message = (MqttMessage) writer.writeBinary(ce);
// Read it back and verify
readAndVerify(ce, message);
}
@ParameterizedTest
@MethodSource("io.cloudevents.core.test.Data#allEventsWithoutExtensions")
public void roundTripStructured(CloudEvent ce) {
EventFormat format = CSVFormat.INSTANCE;
// Write the event out as a message.
MessageWriter writer = V5MqttMessageFactory.createWriter();
Assertions.assertNotNull(writer);
MqttMessage message = (MqttMessage) writer.writeStructured(ce, format);
// Read it back and verify
readAndVerify(ce, message);
}
}

View File

@ -27,23 +27,26 @@ Using the Java SDK you can:
## Supported features
| | [v0.3](https://github.com/cloudevents/spec/tree/v0.3) | [v1.0](https://github.com/cloudevents/spec/tree/v1.0) |
| :------------------------------------------------: | :---------------------------------------------------: | :---------------------------------------------------: |
|:--------------------------------------------------:|:-----------------------------------------------------:|:-----------------------------------------------------:|
| CloudEvents Core | :heavy_check_mark: | :heavy_check_mark: |
| AMQP Protocol Binding | :x: | :x: |
| - [Proton](amqp-proton.md) | :heavy_check_mark: | :heavy_check_mark: |
| MQTT Protocol Binding | :x: | :x: |
| - [Paho](mqtt.md) | :heavy_check_mark: | :heavy_check_mark: |
| - [HiveMQ](mqtt.md) | :heavy_check_mark: | :heavy_check_mark: |
| AVRO Event Format | :x: | :x: |
| HTTP Protocol Binding | :heavy_check_mark: | :heavy_check_mark: |
| - [Vert.x](http-vertx.md) | :heavy_check_mark: | :heavy_check_mark: |
| - [Jakarta Restful WS](http-jakarta-restful-ws.md) | :heavy_check_mark: | :heavy_check_mark: |
| - [Basic](http-basic.md) | :heavy_check_mark: | :heavy_check_mark: |
| - [Spring](spring.md) | :heavy_check_mark: | :heavy_check_mark: |
| - [http4k][http4k]<sup></sup> | :heavy_check_mark: | :heavy_check_mark: |
| - [http4k][http4k]<sup></sup> | :heavy_check_mark: | :heavy_check_mark: |
| JSON Event Format | :heavy_check_mark: | :heavy_check_mark: |
| - [Jackson](json-jackson.md) | :heavy_check_mark: | :heavy_check_mark: |
| Protobuf Event Format | :heavy_check_mark: | :heavy_check_mark: |
| - [Proto](protobuf.md) | :heavy_check_mark: | :heavy_check_mark: |
| XML Event Format | :heavy_check_mark: | :heavy_check_mark: |
| - [XML](xml.md) | :heavy_check_mark: | :heavy_check_mark: |
| Protobuf Event Format | :heavy_check_mark: | :heavy_check_mark: |
| - [Proto](protobuf.md) | :heavy_check_mark: | :heavy_check_mark: |
| XML Event Format | :heavy_check_mark: | :heavy_check_mark: |
| - [XML](xml.md) | :heavy_check_mark: | :heavy_check_mark: |
| [Kafka Protocol Binding](kafka.md) | :heavy_check_mark: | :heavy_check_mark: |
| MQTT Protocol Binding | :x: | :x: |
| NATS Protocol Binding | :x: | :x: |

60
docs/mqtt.md Normal file
View File

@ -0,0 +1,60 @@
---
title: CloudEvents MQTT
nav_order: 5
---
# MQTT Support
The SDK supports both V3 and V5 MQTT binding specifications via these Java client libraries:
* [Paho]()
* [HiveMQ]()
NOTE: MQTT V3 *only* supports structured mode transfer of CloudEVents. Operations related to binary mode transmission
are either not available or will throw runtime exceptions if an attempt is made to use them.
Both client library implementations rely on a *provided* maven dependency.
# General Usage
There is a slight variance in usage between the two supported client libraries owing to the way those clients
have implemented support for the two versions of MQTT but the general pattern is the same as every other protocol
binding.
## Creating a message from a CloudEvent
1. Obtain a `MessageWriter` from a factory.
2. Use the writer to populate the MQTT message using structured or binary mode.
* `mqttMessage = messageWriter.writeBinary(cloudEvent);` or,
* `mqttMessage = messageWriter.writeStructured(cloudEvent, eventFormat);`
## Creating a CloudEvent from a message.
1. Obtain a 'MessageReader' from a message factory for an MQTT message.
2. Obtain a CloudEvent from the reader.
* _CloudEvent cloudEvent = reader.toEvent();_
# PAHO Client Usage
## Maven
```xml
<dependency>
<groupId>io.cloudevents</groupId>
<artifactId>cloudevents-mqtt-paho</artifactId>
<version>2.x.y</version>
</dependency>
```
# HiveMQ Client Usage
## Maven
```xml
<dependency>
<groupId>io.cloudevents</groupId>
<artifactId>cloudevents-mqtt-hivemq</artifactId>
<version>2.x.y</version>
</dependency>
```

View File

@ -72,6 +72,9 @@
<module>formats/json-jackson</module>
<module>formats/protobuf</module>
<module>formats/xml</module>
<module>bindings/mqtt/core</module>
<module>bindings/mqtt/paho</module>
<module>bindings/mqtt/hivemq</module>
<module>amqp</module>
<module>http/basic</module>
<module>http/vertx</module>