From de2b76a5ef200bd81cbe8089c10e0a156bcdc8c6 Mon Sep 17 00:00:00 2001 From: Jem Day Date: Thu, 5 Jan 2023 15:10:39 -0800 Subject: [PATCH 1/2] Initial revision for comment. --- bindings/mqtt/core/pom.xml | 31 ++++ .../core/BaseMqttBinaryMessageReader.java | 88 +++++++++++ .../io/cloudevents/mqtt/core/MqttUtils.java | 35 +++++ bindings/mqtt/hivemq/pom.xml | 80 ++++++++++ .../mqtt/hivemq/BinaryMessageReader.java | 33 ++++ .../mqtt/hivemq/MqttMessageFactory.java | 105 +++++++++++++ .../mqtt/hivemq/V3MessageWriter.java | 56 +++++++ .../mqtt/hivemq/V5MessageWriter.java | 50 ++++++ .../cloudevents/mqtt/hivemq/package-info.java | 14 ++ .../mqtt/hivemq/MqttMessageFactoryTest.java | 59 +++++++ .../mqtt/hivemq/V3MessageWriterTest.java | 55 +++++++ .../mqtt/hivemq/V3RoundTripTests.java | 78 ++++++++++ .../mqtt/hivemq/V5MessageWriterTest.java | 91 +++++++++++ .../mqtt/hivemq/V5RoundTripTests.java | 84 ++++++++++ bindings/mqtt/paho/pom.xml | 106 +++++++++++++ .../mqtt/paho/BinaryMessageReader.java | 41 +++++ .../mqtt/paho/PahoMessageUtils.java | 54 +++++++ .../mqtt/paho/V3MessageWriter.java | 85 ++++++++++ .../mqtt/paho/V3MqttMessageFactory.java | 50 ++++++ .../mqtt/paho/V5MessageWriter.java | 64 ++++++++ .../mqtt/paho/V5MqttMessageFactory.java | 57 +++++++ .../cloudevents/mqtt/paho/package-info.java | 11 ++ .../mqtt/paho/PahoMessageUtilsTest.java | 62 ++++++++ .../mqtt/paho/V3MessageFactoryTest.java | 75 +++++++++ .../mqtt/paho/V3RoundTripTests.java | 72 +++++++++ .../mqtt/paho/V5MessageFactoryTest.java | 146 ++++++++++++++++++ .../mqtt/paho/V5RoundTripTests.java | 70 +++++++++ docs/index.md | 14 +- docs/mqtt-paho.md | 41 +++++ pom.xml | 3 + 30 files changed, 1804 insertions(+), 6 deletions(-) create mode 100644 bindings/mqtt/core/pom.xml create mode 100644 bindings/mqtt/core/src/main/java/io/cloudevents/mqtt/core/BaseMqttBinaryMessageReader.java create mode 100644 bindings/mqtt/core/src/main/java/io/cloudevents/mqtt/core/MqttUtils.java create mode 100644 bindings/mqtt/hivemq/pom.xml create mode 100644 bindings/mqtt/hivemq/src/main/java/io/cloudevents/mqtt/hivemq/BinaryMessageReader.java create mode 100644 bindings/mqtt/hivemq/src/main/java/io/cloudevents/mqtt/hivemq/MqttMessageFactory.java create mode 100644 bindings/mqtt/hivemq/src/main/java/io/cloudevents/mqtt/hivemq/V3MessageWriter.java create mode 100644 bindings/mqtt/hivemq/src/main/java/io/cloudevents/mqtt/hivemq/V5MessageWriter.java create mode 100644 bindings/mqtt/hivemq/src/main/java/io/cloudevents/mqtt/hivemq/package-info.java create mode 100644 bindings/mqtt/hivemq/src/test/java/io/cloudevents/mqtt/hivemq/MqttMessageFactoryTest.java create mode 100644 bindings/mqtt/hivemq/src/test/java/io/cloudevents/mqtt/hivemq/V3MessageWriterTest.java create mode 100644 bindings/mqtt/hivemq/src/test/java/io/cloudevents/mqtt/hivemq/V3RoundTripTests.java create mode 100644 bindings/mqtt/hivemq/src/test/java/io/cloudevents/mqtt/hivemq/V5MessageWriterTest.java create mode 100644 bindings/mqtt/hivemq/src/test/java/io/cloudevents/mqtt/hivemq/V5RoundTripTests.java create mode 100644 bindings/mqtt/paho/pom.xml create mode 100644 bindings/mqtt/paho/src/main/java/io/cloudevents/mqtt/paho/BinaryMessageReader.java create mode 100644 bindings/mqtt/paho/src/main/java/io/cloudevents/mqtt/paho/PahoMessageUtils.java create mode 100644 bindings/mqtt/paho/src/main/java/io/cloudevents/mqtt/paho/V3MessageWriter.java create mode 100644 bindings/mqtt/paho/src/main/java/io/cloudevents/mqtt/paho/V3MqttMessageFactory.java create mode 100644 bindings/mqtt/paho/src/main/java/io/cloudevents/mqtt/paho/V5MessageWriter.java create mode 100644 bindings/mqtt/paho/src/main/java/io/cloudevents/mqtt/paho/V5MqttMessageFactory.java create mode 100644 bindings/mqtt/paho/src/main/java/io/cloudevents/mqtt/paho/package-info.java create mode 100644 bindings/mqtt/paho/src/test/java/io/cloudevents/mqtt/paho/PahoMessageUtilsTest.java create mode 100644 bindings/mqtt/paho/src/test/java/io/cloudevents/mqtt/paho/V3MessageFactoryTest.java create mode 100644 bindings/mqtt/paho/src/test/java/io/cloudevents/mqtt/paho/V3RoundTripTests.java create mode 100644 bindings/mqtt/paho/src/test/java/io/cloudevents/mqtt/paho/V5MessageFactoryTest.java create mode 100644 bindings/mqtt/paho/src/test/java/io/cloudevents/mqtt/paho/V5RoundTripTests.java create mode 100644 docs/mqtt-paho.md diff --git a/bindings/mqtt/core/pom.xml b/bindings/mqtt/core/pom.xml new file mode 100644 index 00000000..0fd493ac --- /dev/null +++ b/bindings/mqtt/core/pom.xml @@ -0,0 +1,31 @@ + + + + cloudevents-parent + io.cloudevents + 2.5.0-SNAPSHOT + ../../../pom.xml + + 4.0.0 + + cloudevents-mqtt-core + CloudEvents - MQTT Common + jar + + + io.cloudevents.mqtt.core + + + + + + io.cloudevents + cloudevents-core + ${project.version} + + + + + diff --git a/bindings/mqtt/core/src/main/java/io/cloudevents/mqtt/core/BaseMqttBinaryMessageReader.java b/bindings/mqtt/core/src/main/java/io/cloudevents/mqtt/core/BaseMqttBinaryMessageReader.java new file mode 100644 index 00000000..a0aadea0 --- /dev/null +++ b/bindings/mqtt/core/src/main/java/io/cloudevents/mqtt/core/BaseMqttBinaryMessageReader.java @@ -0,0 +1,88 @@ +package io.cloudevents.mqtt.core; + +import io.cloudevents.SpecVersion; +import io.cloudevents.core.data.BytesCloudEventData; +import io.cloudevents.core.message.impl.BaseGenericBinaryMessageReaderImpl; +import io.cloudevents.core.v1.CloudEventV1; + +import java.util.function.BiConsumer; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * Enable the hydration of a CloudEvent in binary mode from an MQTT message. + *

+ * This abstract class provides common behavior across different MQTT + * client implementations. + */ +public abstract class BaseMqttBinaryMessageReader extends BaseGenericBinaryMessageReaderImpl { + + /** + * CloudEvent attribute names must match this pattern. + */ + private static final Pattern CE_ATTR_NAME_REGEX = Pattern.compile("^[a-z\\d]+$"); + private final String contentType; + + /** + * Initialise the binary message reader. + * @param version The CloudEvent message version. + * @param contentType The assigned media content type. + * @param payload The raw data payload from the MQTT message. + */ + protected BaseMqttBinaryMessageReader(final SpecVersion version, final String contentType, final byte[] payload) { + super(version, payload != null && payload.length > 0 ? BytesCloudEventData.wrap(payload) : null); + this.contentType = contentType; + } + + // --- Overrides + + @Override + protected boolean isContentTypeHeader(String key) { + return false; // The content type is not defined in a user-property + } + + @Override + protected boolean isCloudEventsHeader(String key) { + + // The binding specification does not require name prefixing, + // as such any user-property is a potential CE Context Attribute. + // + // If the name complies with CE convention then we'll assume + // it's a context attribute. + // + Matcher m = CE_ATTR_NAME_REGEX.matcher(key); + return m.matches(); + } + + @Override + protected String toCloudEventsKey(String key) { + return key; // No special prefixing occurs in the MQTT binding spec. + } + + + @Override + protected void forEachHeader(BiConsumer fn) { + + // If there is a content-type then we need set it. + // Inspired by AMQP/Proton code :-) + + if (contentType != null) { + fn.accept(CloudEventV1.DATACONTENTTYPE, contentType); + } + + // Now process each MQTT User Property. + forEachUserProperty(fn); + + } + + @Override + protected String toCloudEventsValue(Object value) { + return value.toString(); + } + + /** + * Visit each MQTT user-property and invoke the supplied function. + * @param fn The function to invoke for each MQTT User property. + */ + protected abstract void forEachUserProperty(BiConsumer fn); +} diff --git a/bindings/mqtt/core/src/main/java/io/cloudevents/mqtt/core/MqttUtils.java b/bindings/mqtt/core/src/main/java/io/cloudevents/mqtt/core/MqttUtils.java new file mode 100644 index 00000000..9efb2285 --- /dev/null +++ b/bindings/mqtt/core/src/main/java/io/cloudevents/mqtt/core/MqttUtils.java @@ -0,0 +1,35 @@ +package io.cloudevents.mqtt.core; + +import io.cloudevents.core.format.EventFormat; +import io.cloudevents.core.provider.EventFormatProvider; + +/** + * General MQTT Utilities and Helpers + */ +public class MqttUtils { + + private MqttUtils() {} + + private static final String DEFAULT_FORMAT = "application/cloudevents+json"; + + /** + * Obtain the {@link EventFormat} to use when working with MQTT V3 + * messages. + * + * @return An event format. + */ + public static EventFormat getDefaultEventFormat () { + + return EventFormatProvider.getInstance().resolveFormat(DEFAULT_FORMAT); + + } + + /** + * Get the default content type to assume for MQTT messages. + * @return A Content-Type + */ + public static final String getDefaultContentType() { + return DEFAULT_FORMAT; + } + +} diff --git a/bindings/mqtt/hivemq/pom.xml b/bindings/mqtt/hivemq/pom.xml new file mode 100644 index 00000000..e3106fca --- /dev/null +++ b/bindings/mqtt/hivemq/pom.xml @@ -0,0 +1,80 @@ + + + + cloudevents-parent + io.cloudevents + 2.5.0-SNAPSHOT + ../../../pom.xml + + + 4.0.0 + + cloudevents-mqtt-hivemq + CloudEvents - MQTT HiveMQ Binding + jar + + + io.cloudevents.mqtt.hivemq + 1.3.0 + + + + + + io.cloudevents + cloudevents-core + ${project.version} + + + + io.cloudevents + cloudevents-mqtt-core + ${project.version} + + + + com.hivemq + hivemq-mqtt-client + ${hivemq.version} + provided + + + + + + io.cloudevents + cloudevents-core + tests + test-jar + ${project.version} + test + + + + + + io.cloudevents + cloudevents-json-jackson + ${project.version} + test + + + + org.assertj + assertj-core + ${assertj-core.version} + test + + + + org.junit.jupiter + junit-jupiter + ${junit-jupiter.version} + test + + + + + diff --git a/bindings/mqtt/hivemq/src/main/java/io/cloudevents/mqtt/hivemq/BinaryMessageReader.java b/bindings/mqtt/hivemq/src/main/java/io/cloudevents/mqtt/hivemq/BinaryMessageReader.java new file mode 100644 index 00000000..ac5d16c6 --- /dev/null +++ b/bindings/mqtt/hivemq/src/main/java/io/cloudevents/mqtt/hivemq/BinaryMessageReader.java @@ -0,0 +1,33 @@ +package io.cloudevents.mqtt.hivemq; + +import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish; +import io.cloudevents.SpecVersion; +import io.cloudevents.mqtt.core.BaseMqttBinaryMessageReader; + +import java.util.function.BiConsumer; + +final class BinaryMessageReader extends BaseMqttBinaryMessageReader { + + Mqtt5Publish message; + + BinaryMessageReader(final SpecVersion version, final String contentType, Mqtt5Publish message) { + super(version, contentType, message.getPayloadAsBytes()); + + this.message = message; + } + + @Override + protected void forEachUserProperty(BiConsumer fn) { + + message.getUserProperties().asList().forEach(up -> { + + final String key = up.getName().toString(); + final String val = up.getValue().toString(); + + if (key != null && val != null) { + fn.accept(key, val); + } + }); + + } +} diff --git a/bindings/mqtt/hivemq/src/main/java/io/cloudevents/mqtt/hivemq/MqttMessageFactory.java b/bindings/mqtt/hivemq/src/main/java/io/cloudevents/mqtt/hivemq/MqttMessageFactory.java new file mode 100644 index 00000000..3016c277 --- /dev/null +++ b/bindings/mqtt/hivemq/src/main/java/io/cloudevents/mqtt/hivemq/MqttMessageFactory.java @@ -0,0 +1,105 @@ +package io.cloudevents.mqtt.hivemq; + +import com.hivemq.client.mqtt.datatypes.MqttUtf8String; +import com.hivemq.client.mqtt.mqtt3.message.publish.Mqtt3Publish; +import com.hivemq.client.mqtt.mqtt3.message.publish.Mqtt3PublishBuilder; +import com.hivemq.client.mqtt.mqtt5.datatypes.Mqtt5UserProperty; +import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish; +import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PublishBuilder; +import io.cloudevents.core.message.MessageReader; +import io.cloudevents.core.message.MessageWriter; +import io.cloudevents.core.message.impl.GenericStructuredMessageReader; +import io.cloudevents.core.message.impl.MessageUtils; +import io.cloudevents.core.v1.CloudEventV1; +import io.cloudevents.mqtt.core.MqttUtils; + +import java.util.List; +import java.util.Optional; + +/** + * A factory to obtain: + * - {@link MessageReader} instances to read CloudEvents from MQTT messages. + * - {@link MessageWriter} instances to write CloudEvents into MQTT messages. + * + */ +public class MqttMessageFactory { + + // Prevent Instantiation. + private MqttMessageFactory() { + } + + /** + * Create a {@link MessageReader} for an MQTT V3 message. + *

+ * As-Per MQTT Binding specification this only supports + * a structured JSON Format message. + * + * @param message An MQTT V3 message. + * @return MessageReader. + */ + public static MessageReader createReader(Mqtt3Publish message) { + return new GenericStructuredMessageReader(MqttUtils.getDefaultEventFormat(), message.getPayloadAsBytes()); + } + + /** + * Create a {@link MessageReader} for an MQTT V5 message + * + * @param message An MQTT V5 message. + * @return A message reader. + */ + public static MessageReader createReader(Mqtt5Publish message) { + + Optional cType = message.getContentType(); + + String contentType = cType.isPresent() ? cType.get().toString() : null; + + return MessageUtils.parseStructuredOrBinaryMessage( + () -> contentType, + format -> new GenericStructuredMessageReader(format, message.getPayloadAsBytes()), + () -> getSpecVersion(message), + sv -> new BinaryMessageReader(sv, contentType, message) + ); + } + + + /** + * Create a {@link MessageWriter} for an MQTT V5 Message. + * + * @param builder {@link Mqtt5PublishBuilder.Complete} + * @return A message writer. + */ + public static MessageWriter createWriter(Mqtt5PublishBuilder.Complete builder) { + return new V5MessageWriter(builder); + } + + /** + * Create a {@link MessageWriter} for an MQTT V3 Message. + * + * Only supports structured messages. + * + * @param builder {@link Mqtt3PublishBuilder.Complete} + * @return A message writer. + */ + public static MessageWriter createWriter(Mqtt3PublishBuilder.Complete builder) { + return new V3MessageWriter(builder); + } + + + // -- Private functions + + /** + * Find the value of the CloudEvent 'specversion' in the MQTT V5 User Properties. + * @param message An MQTT message. + * @return spec version attribute content. + */ + private static String getSpecVersion(Mqtt5Publish message) { + + List props = (List) message.getUserProperties().asList(); + + Optional up = props.stream().filter(p -> p.getName().toString().equals(CloudEventV1.SPECVERSION)).findFirst(); + + return (up.isPresent()) ? up.get().getValue().toString() : null; + + } + +} diff --git a/bindings/mqtt/hivemq/src/main/java/io/cloudevents/mqtt/hivemq/V3MessageWriter.java b/bindings/mqtt/hivemq/src/main/java/io/cloudevents/mqtt/hivemq/V3MessageWriter.java new file mode 100644 index 00000000..9cbafdee --- /dev/null +++ b/bindings/mqtt/hivemq/src/main/java/io/cloudevents/mqtt/hivemq/V3MessageWriter.java @@ -0,0 +1,56 @@ +package io.cloudevents.mqtt.hivemq; + +import com.hivemq.client.mqtt.mqtt3.message.publish.Mqtt3PublishBuilder; +import io.cloudevents.CloudEvent; +import io.cloudevents.SpecVersion; +import io.cloudevents.core.format.EventFormat; +import io.cloudevents.core.message.MessageWriter; +import io.cloudevents.core.provider.EventFormatProvider; +import io.cloudevents.rw.CloudEventRWException; +import io.cloudevents.rw.CloudEventWriter; + +class V3MessageWriter implements MessageWriter, Mqtt3PublishBuilder> { + + Mqtt3PublishBuilder.Complete builder; + + V3MessageWriter(Mqtt3PublishBuilder.Complete builder) { + this.builder = builder; + } + + @Override + public CloudEventWriter create(SpecVersion version) throws CloudEventRWException { + // No-Op + return null; + } + + @Override + public Mqtt3PublishBuilder setEvent(EventFormat format, byte[] value) throws CloudEventRWException { + // No-Op + return null; + } + + @Override + public Mqtt3PublishBuilder writeStructured(CloudEvent event, String format) { + EventFormat eventFormat = EventFormatProvider.getInstance().resolveFormat(format); + + if (eventFormat != null) { + return writeStructured(event, EventFormatProvider.getInstance().resolveFormat(format)); + } else { + throw CloudEventRWException.newOther("Unsupported Format: " + format); + } + } + + @Override + public Mqtt3PublishBuilder writeStructured(CloudEvent event, EventFormat format) { + final byte[] data = format.serialize(event); + builder.payload(data); + return builder; + } + + @Override + public Mqtt3PublishBuilder writeBinary(CloudEvent event) { + + throw CloudEventRWException.newOther("MQTT V3 Does not support CloudEvent Binary mode"); + + } +} diff --git a/bindings/mqtt/hivemq/src/main/java/io/cloudevents/mqtt/hivemq/V5MessageWriter.java b/bindings/mqtt/hivemq/src/main/java/io/cloudevents/mqtt/hivemq/V5MessageWriter.java new file mode 100644 index 00000000..03fb2d81 --- /dev/null +++ b/bindings/mqtt/hivemq/src/main/java/io/cloudevents/mqtt/hivemq/V5MessageWriter.java @@ -0,0 +1,50 @@ +package io.cloudevents.mqtt.hivemq; + +import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PublishBuilder; +import io.cloudevents.CloudEventData; +import io.cloudevents.SpecVersion; +import io.cloudevents.core.format.EventFormat; +import io.cloudevents.core.message.MessageWriter; +import io.cloudevents.rw.CloudEventContextWriter; +import io.cloudevents.rw.CloudEventRWException; +import io.cloudevents.rw.CloudEventWriter; + +class V5MessageWriter implements MessageWriter, Mqtt5PublishBuilder.Complete>, CloudEventWriter { + + private final Mqtt5PublishBuilder.Complete builder; + + V5MessageWriter(Mqtt5PublishBuilder.Complete builder) { + this.builder = builder; + } + + @Override + public CloudEventContextWriter withContextAttribute(String name, String value) throws CloudEventRWException { + builder.userProperties().add(name, value).applyUserProperties(); + return this; + } + + @Override + public Mqtt5PublishBuilder.Complete end(CloudEventData data) throws CloudEventRWException { + builder.payload(data.toBytes()); + return end(); + } + + @Override + public Mqtt5PublishBuilder.Complete end() throws CloudEventRWException { + return builder; + } + + + @Override + public CloudEventWriter create(SpecVersion version) throws CloudEventRWException { + withContextAttribute("specversion", version.toString()); + return this; + } + + @Override + public Mqtt5PublishBuilder.Complete setEvent(EventFormat format, byte[] value) throws CloudEventRWException { + builder.contentType(format.serializedContentType()); + builder.payload(value); + return end(); + } +} diff --git a/bindings/mqtt/hivemq/src/main/java/io/cloudevents/mqtt/hivemq/package-info.java b/bindings/mqtt/hivemq/src/main/java/io/cloudevents/mqtt/hivemq/package-info.java new file mode 100644 index 00000000..2bbf2ebc --- /dev/null +++ b/bindings/mqtt/hivemq/src/main/java/io/cloudevents/mqtt/hivemq/package-info.java @@ -0,0 +1,14 @@ +/** + * This module implements the MQTT binding specification using the + * HiveMQ MQTT client library. + * + * Use the {@link io.cloudevents.mqtt.hivemq.MqttMessageFactory} to obtain + * CloudEvent reader and writer instances. + * + * Both V3 and V5 versions of MQTT are supported. + * + * @since 2.5.0 + * + */ + +package io.cloudevents.mqtt.hivemq; diff --git a/bindings/mqtt/hivemq/src/test/java/io/cloudevents/mqtt/hivemq/MqttMessageFactoryTest.java b/bindings/mqtt/hivemq/src/test/java/io/cloudevents/mqtt/hivemq/MqttMessageFactoryTest.java new file mode 100644 index 00000000..583248db --- /dev/null +++ b/bindings/mqtt/hivemq/src/test/java/io/cloudevents/mqtt/hivemq/MqttMessageFactoryTest.java @@ -0,0 +1,59 @@ +package io.cloudevents.mqtt.hivemq; + +import com.hivemq.client.mqtt.mqtt3.message.publish.Mqtt3Publish; +import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish; +import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PublishBuilder; +import io.cloudevents.core.format.EventFormat; +import io.cloudevents.core.mock.CSVFormat; +import io.cloudevents.core.provider.EventFormatProvider; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class MqttMessageFactoryTest { + + @Test + public void createV3Writer() { + } + + @Test + public void createV5Writer() { + Assertions.assertNotNull(MqttMessageFactory.createWriter((Mqtt5PublishBuilder.Complete) Mqtt5Publish.builder())); + } + + @Test + public void create3Reader() { + + Mqtt3Publish msg = Mqtt3Publish.builder().topic("test").build(); + Assertions.assertNotNull(MqttMessageFactory.createReader(msg)); + } + + @Test + public void createV5ReaderFromStructured() { + + // If the content-type is present then hopefully it's a + // cloudvent one. + + EventFormat ef = CSVFormat.INSTANCE; + + EventFormatProvider.getInstance().registerFormat(ef); + + Mqtt5Publish msg = Mqtt5Publish.builder() + .topic("test") + .contentType(ef.serializedContentType()) + .build(); + + Assertions.assertNotNull(MqttMessageFactory.createReader(msg)); + + } + + @Test + public void createV5ReaderFromBinary() { + + Mqtt5Publish msg = Mqtt5Publish.builder() + .topic("test") + .userProperties().add("specversion", "1.0").applyUserProperties() + .build(); + Assertions.assertNotNull(MqttMessageFactory.createReader(msg)); + + } +} diff --git a/bindings/mqtt/hivemq/src/test/java/io/cloudevents/mqtt/hivemq/V3MessageWriterTest.java b/bindings/mqtt/hivemq/src/test/java/io/cloudevents/mqtt/hivemq/V3MessageWriterTest.java new file mode 100644 index 00000000..800589f1 --- /dev/null +++ b/bindings/mqtt/hivemq/src/test/java/io/cloudevents/mqtt/hivemq/V3MessageWriterTest.java @@ -0,0 +1,55 @@ +package io.cloudevents.mqtt.hivemq; + +import com.hivemq.client.mqtt.mqtt3.message.publish.Mqtt3Publish; +import com.hivemq.client.mqtt.mqtt3.message.publish.Mqtt3PublishBuilder; +import io.cloudevents.core.format.EventFormat; +import io.cloudevents.core.mock.CSVFormat; +import io.cloudevents.core.provider.EventFormatProvider; +import io.cloudevents.core.test.Data; +import io.cloudevents.rw.CloudEventRWException; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertNotNull; + +class V3MessageWriterTest { + + Mqtt3PublishBuilder.Complete builder; + V3MessageWriter writer; + EventFormat csvFormat = CSVFormat.INSTANCE; + + + V3MessageWriterTest() { + + builder = (Mqtt3PublishBuilder.Complete) Mqtt3Publish.builder(); + writer = new V3MessageWriter(builder); + EventFormatProvider.getInstance().registerFormat(csvFormat); + } + + @Test + void create() { + } + + @Test + void setEvent() { + } + + @Test + void writeStructuredA() { + assertNotNull(writer.writeStructured(Data.V1_MIN, csvFormat.serializedContentType())); + } + + @Test + void testWriteStructuredB() { + assertNotNull(writer.writeStructured(Data.V1_MIN, csvFormat)); + } + + @Test + void writeBinary() { + + // This should fail + Assertions.assertThrows(CloudEventRWException.class, () -> { + writer.writeBinary(Data.V1_MIN); + }); + } +} diff --git a/bindings/mqtt/hivemq/src/test/java/io/cloudevents/mqtt/hivemq/V3RoundTripTests.java b/bindings/mqtt/hivemq/src/test/java/io/cloudevents/mqtt/hivemq/V3RoundTripTests.java new file mode 100644 index 00000000..828a56f4 --- /dev/null +++ b/bindings/mqtt/hivemq/src/test/java/io/cloudevents/mqtt/hivemq/V3RoundTripTests.java @@ -0,0 +1,78 @@ +package io.cloudevents.mqtt.hivemq; + +import com.hivemq.client.mqtt.mqtt3.message.publish.Mqtt3Publish; +import com.hivemq.client.mqtt.mqtt3.message.publish.Mqtt3PublishBuilder; +import io.cloudevents.CloudEvent; +import io.cloudevents.core.format.EventFormat; +import io.cloudevents.core.message.MessageReader; +import io.cloudevents.core.message.MessageWriter; +import io.cloudevents.core.test.Data; +import io.cloudevents.jackson.JsonFormat; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; + +import java.util.stream.Stream; + +/** + * Round-Trip Tests + *

+ * - serialize a CloudEvent into an MQTT Message. + * - de-serialize the message into a new CloudEvent + * - verify that the new CE matches the original CE + */ +public class V3RoundTripTests { + + + /** + * This test set is limited owing to the fact that: + * (a) We only support JSON Format + * (b) Round-tripping of events with JSON 'data' doesn't reliably work owing to the way the equality tests work on the event. + * + * @return + */ + static Stream simpleEvents() { + return Stream.of( + Data.V03_MIN, + Data.V03_WITH_TEXT_DATA, + Data.V1_MIN, + Data.V1_WITH_TEXT_DATA, + Data.V1_WITH_XML_DATA + ); + } + + @ParameterizedTest + @MethodSource("simpleEvents") + public void roundTrip(CloudEvent ce) { + + EventFormat format = new JsonFormat(); + Assertions.assertNotNull(format); + + Mqtt3Publish message = null; + Mqtt3PublishBuilder.Complete builder = (Mqtt3PublishBuilder.Complete) Mqtt3Publish.builder(); + builder.topic("test.test.test"); + + // Write the event out as a message. + MessageWriter writer = MqttMessageFactory.createWriter(builder); + Assertions.assertNotNull(writer); + + writer.writeStructured(ce, format); + message = builder.build(); + + Assertions.assertNotNull(message); + + // Read it back and verify + + // Read the message back into an event + MessageReader reader = MqttMessageFactory.createReader(message); + Assertions.assertNotNull(reader); + + CloudEvent newCE = reader.toEvent(); + Assertions.assertNotNull(newCE); + + // And now ensure we got back what we wrote + Assertions.assertEquals(ce, newCE); + + } + +} diff --git a/bindings/mqtt/hivemq/src/test/java/io/cloudevents/mqtt/hivemq/V5MessageWriterTest.java b/bindings/mqtt/hivemq/src/test/java/io/cloudevents/mqtt/hivemq/V5MessageWriterTest.java new file mode 100644 index 00000000..791b1246 --- /dev/null +++ b/bindings/mqtt/hivemq/src/test/java/io/cloudevents/mqtt/hivemq/V5MessageWriterTest.java @@ -0,0 +1,91 @@ +package io.cloudevents.mqtt.hivemq; + +import com.hivemq.client.mqtt.mqtt5.datatypes.Mqtt5UserProperty; +import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish; +import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PublishBuilder; +import io.cloudevents.SpecVersion; +import io.cloudevents.core.data.BytesCloudEventData; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.List; + +public class V5MessageWriterTest { + + private final Mqtt5PublishBuilder builder; + private final V5MessageWriter writer; + + V5MessageWriterTest() { + builder = Mqtt5Publish.builder(); + writer = new V5MessageWriter((Mqtt5PublishBuilder.Complete) builder); + builder.topic("tester"); + } + + @Test + public void testWithContextAttribute() { + + Assertions.assertNotNull(writer.withContextAttribute("test", "testing")); + + Mqtt5Publish msg = ((Mqtt5PublishBuilder.Complete) builder).build(); + + ensureProperty(msg, "test", "testing"); + } + + @Test + public void testWithContextAttributes() { + + Assertions.assertNotNull(writer.withContextAttribute("test1", "testing1")); + Assertions.assertNotNull(writer.withContextAttribute("test2", "testing2")); + + Mqtt5Publish msg = ((Mqtt5PublishBuilder.Complete) builder).build(); + + ensureProperty(msg, "test1", "testing1"); + ensureProperty(msg, "test2", "testing2"); + } + + @Test + public void testEnd() { + Assertions.assertNotNull(writer.end()); + } + + @Test + public void testEndWithData() { + final byte[] tData = {0x00, 0x02, 0x42}; + + Assertions.assertNotNull(writer.end(BytesCloudEventData.wrap(tData))); + + Mqtt5Publish msg = ((Mqtt5PublishBuilder.Complete) builder).build(); + + Assertions.assertNotNull(msg.getPayloadAsBytes()); + Assertions.assertEquals(msg.getPayloadAsBytes().length, tData.length); + + } + + @Test + public void testCreate() { + Assertions.assertNotNull(writer.create(SpecVersion.V1)); + + Mqtt5Publish msg = ((Mqtt5PublishBuilder.Complete) builder).build(); + ensureProperty(msg, "specversion", SpecVersion.V1.toString()); + + } + + private void ensureProperty(Mqtt5Publish msg, String name, String val) { + + List props = (List) msg.getUserProperties().asList(); + + Mqtt5UserProperty prop = null; + + for (Mqtt5UserProperty up : props) { + + if (up.getName().toString().equals(name)) { + prop = up; + break; + } + } + + Assertions.assertNotNull(prop); + Assertions.assertEquals(prop.getValue().toString(), val); + + } +} diff --git a/bindings/mqtt/hivemq/src/test/java/io/cloudevents/mqtt/hivemq/V5RoundTripTests.java b/bindings/mqtt/hivemq/src/test/java/io/cloudevents/mqtt/hivemq/V5RoundTripTests.java new file mode 100644 index 00000000..5dd9d170 --- /dev/null +++ b/bindings/mqtt/hivemq/src/test/java/io/cloudevents/mqtt/hivemq/V5RoundTripTests.java @@ -0,0 +1,84 @@ +package io.cloudevents.mqtt.hivemq; + +import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish; +import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PublishBuilder; +import io.cloudevents.CloudEvent; +import io.cloudevents.core.format.EventFormat; +import io.cloudevents.core.message.MessageReader; +import io.cloudevents.core.message.MessageWriter; +import io.cloudevents.core.mock.CSVFormat; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; + +/** + * Round-Trip Tests + *

+ * For both Binary and Structured modes: + * - serialize a CloudEvent into an MQTT Message. + * - de-serialize the message into a new CloudEvent + * - verify that the new CE matches the original CE + */ +public class V5RoundTripTests { + + private static void readAndVerify(CloudEvent ce, Mqtt5Publish message) { + + Assertions.assertNotNull(message); + + // Read the message back into an event + MessageReader reader = MqttMessageFactory.createReader(message); + Assertions.assertNotNull(reader); + + CloudEvent newCE = reader.toEvent(); + Assertions.assertNotNull(newCE); + + // And now ensure we got back what we wrote + Assertions.assertEquals(ce, newCE); + } + + @ParameterizedTest + @MethodSource("io.cloudevents.core.test.Data#allEventsWithoutExtensions") + public void roundTripBinary(CloudEvent ce) { + + // Write the event out as a message. + Mqtt5Publish message = null; + Mqtt5PublishBuilder.Complete builder = (Mqtt5PublishBuilder.Complete) Mqtt5Publish.builder(); + builder.topic("test.test.test"); + + + MessageWriter writer = MqttMessageFactory.createWriter(builder); + Assertions.assertNotNull(writer); + + writer.writeBinary(ce); + + message = builder.build(); + + // Read it back and verify + readAndVerify(ce, message); + } + + @ParameterizedTest + @MethodSource("io.cloudevents.core.test.Data#allEventsWithoutExtensions") + public void roundTripStructured(CloudEvent ce) { + + EventFormat format = CSVFormat.INSTANCE; + + Mqtt5Publish message = null; + Mqtt5PublishBuilder.Complete builder = (Mqtt5PublishBuilder.Complete) Mqtt5Publish.builder(); + builder.topic("test.test.test"); + + // Write the event out as a message. + MessageWriter writer = MqttMessageFactory.createWriter(builder); + Assertions.assertNotNull(writer); + + writer.writeStructured(ce, format); + + message = builder.build(); + + // Read it back and verify + readAndVerify(ce, message); + + } + + +} diff --git a/bindings/mqtt/paho/pom.xml b/bindings/mqtt/paho/pom.xml new file mode 100644 index 00000000..7324163f --- /dev/null +++ b/bindings/mqtt/paho/pom.xml @@ -0,0 +1,106 @@ + + + + + 4.0.0 + + + io.cloudevents + cloudevents-parent + 2.5.0-SNAPSHOT + ../../../pom.xml + + + cloudevents-mqtt-paho + CloudEvents - MQTT Paho Binding + jar + + + io.cloudevents.mqtt.paho + 1.2.5 + 3.12.0 + + + + + + io.cloudevents + cloudevents-core + ${project.version} + + + + io.cloudevents + cloudevents-mqtt-core + ${project.version} + + + + + + org.eclipse.paho + org.eclipse.paho.mqttv5.client + ${paho.version} + provided + + + + + org.eclipse.paho + org.eclipse.paho.client.mqttv3 + ${paho.version} + provided + + + + + io.cloudevents + cloudevents-core + tests + test-jar + ${project.version} + test + + + + org.assertj + assertj-core + ${assertj-core.version} + test + + + + org.junit.jupiter + junit-jupiter + ${junit-jupiter.version} + test + + + + + + io.cloudevents + cloudevents-json-jackson + 2.5.0-SNAPSHOT + test + + + + + diff --git a/bindings/mqtt/paho/src/main/java/io/cloudevents/mqtt/paho/BinaryMessageReader.java b/bindings/mqtt/paho/src/main/java/io/cloudevents/mqtt/paho/BinaryMessageReader.java new file mode 100644 index 00000000..e333a5af --- /dev/null +++ b/bindings/mqtt/paho/src/main/java/io/cloudevents/mqtt/paho/BinaryMessageReader.java @@ -0,0 +1,41 @@ +package io.cloudevents.mqtt.paho; + +import io.cloudevents.SpecVersion; +import io.cloudevents.mqtt.core.BaseMqttBinaryMessageReader; +import org.eclipse.paho.mqttv5.common.MqttMessage; +import org.eclipse.paho.mqttv5.common.packet.UserProperty; + +import java.util.Collections; +import java.util.List; +import java.util.function.BiConsumer; + +final class BinaryMessageReader extends BaseMqttBinaryMessageReader { + + private final List userProperties; + + BinaryMessageReader(final SpecVersion version, final String contentType, MqttMessage message) { + super(version, contentType, message.getPayload()); + + // Sanity Check + if (message.getProperties().getUserProperties() != null) { + userProperties = message.getProperties().getUserProperties(); + } else { + userProperties = Collections.emptyList(); + } + } + + @Override + protected void forEachUserProperty(BiConsumer fn) { + + userProperties.forEach(up -> { + + final String key = up.getKey(); + final String val = up.getValue(); + + if (key != null && val != null) { + fn.accept(key, val); + } + }); + + } +} diff --git a/bindings/mqtt/paho/src/main/java/io/cloudevents/mqtt/paho/PahoMessageUtils.java b/bindings/mqtt/paho/src/main/java/io/cloudevents/mqtt/paho/PahoMessageUtils.java new file mode 100644 index 00000000..1710f7cd --- /dev/null +++ b/bindings/mqtt/paho/src/main/java/io/cloudevents/mqtt/paho/PahoMessageUtils.java @@ -0,0 +1,54 @@ +package io.cloudevents.mqtt.paho; + +import org.eclipse.paho.mqttv5.common.MqttMessage; +import org.eclipse.paho.mqttv5.common.packet.MqttProperties; +import org.eclipse.paho.mqttv5.common.packet.UserProperty; + +import java.util.List; +import java.util.Optional; + +/** + * General Utility functions + */ +final class PahoMessageUtils { + + /** + * Prevent Instantiation + */ + private PahoMessageUtils() { + } + + /** + * Get the value of a specific user property from a message. + * + * @param msg The MQTT Message + * @param name The property to retrieve. + * @return property value or NULL if not set. + */ + static String getUserProperty(final MqttMessage msg, final String name) { + + final MqttProperties mProps = msg.getProperties(); + + return (mProps == null) ? null : getUserProperty(mProps.getUserProperties(), name); + + } + + /** + * Get the value of a specific user property from a message. + * + * @param props The List of MQTT Message properties + * @param name The property to retrieve. + * @return property value or NULL if not set. + */ + public static String getUserProperty(final List props, final String name) { + + if (props == null) { + return null; + } else { + + Optional up = props.stream().filter(p -> p.getKey().equals(name)).findFirst(); + + return up.map(UserProperty::getValue).orElse(null); + } + } +} diff --git a/bindings/mqtt/paho/src/main/java/io/cloudevents/mqtt/paho/V3MessageWriter.java b/bindings/mqtt/paho/src/main/java/io/cloudevents/mqtt/paho/V3MessageWriter.java new file mode 100644 index 00000000..18162c68 --- /dev/null +++ b/bindings/mqtt/paho/src/main/java/io/cloudevents/mqtt/paho/V3MessageWriter.java @@ -0,0 +1,85 @@ +package io.cloudevents.mqtt.paho; + +import io.cloudevents.CloudEvent; +import io.cloudevents.SpecVersion; +import io.cloudevents.core.format.EventFormat; +import io.cloudevents.core.format.EventSerializationException; +import io.cloudevents.core.message.MessageWriter; +import io.cloudevents.core.provider.EventFormatProvider; +import io.cloudevents.mqtt.core.MqttUtils; +import io.cloudevents.rw.CloudEventRWException; +import io.cloudevents.rw.CloudEventWriter; +import org.eclipse.paho.client.mqttv3.MqttMessage; + +/** + * A {@link MessageWriter} that writes an CloudEvent to a V3 MQTT Message. + * + * Note: This only supports Structured messages in JSON format as defined + * by the MQTT CloudEvent binding specification. + */ +class V3MessageWriter implements MessageWriter, MqttMessage> { + + private final MqttMessage message; + + V3MessageWriter() { + message = new MqttMessage(); + } + + /** + * Ensure the supplied content type is appropriate for V3 messages + * as-per binding specification. + * + * Raises exception if not valid. + * @param contentType + */ + private void ensureValidContent(String contentType) { + + if (!MqttUtils.getDefaultContentType().equals(contentType)) { + + throw CloudEventRWException.newOther("MQTT V3 Does not support contentType: " + contentType); + + } + } + + @Override + public MqttMessage writeStructured(CloudEvent event, String format) { + + final EventFormat eventFormat = EventFormatProvider.getInstance().resolveFormat(format); + + // Sanity Check + if (eventFormat == null) { + + } + + return writeStructured(event, eventFormat); + } + + @Override + public MqttMessage writeStructured(CloudEvent event, EventFormat format) { + // Ensure format is valid + ensureValidContent(format.serializedContentType()); + // Populate the structured format. + message.setPayload(format.serialize(event)); + // Done. + return message; + } + + @Override + public MqttMessage writeBinary(CloudEvent event) { + // This operation is not allowed. + // This should fail + throw CloudEventRWException.newOther("MQTT V3 Does not support CloudEvent Binary mode"); + } + + @Override + public CloudEventWriter create(SpecVersion version) throws CloudEventRWException { + return null; + } + + @Override + public MqttMessage setEvent(EventFormat format, byte[] value) throws CloudEventRWException { + ensureValidContent(format.serializedContentType()); + message.setPayload(value); + return message; + } +} diff --git a/bindings/mqtt/paho/src/main/java/io/cloudevents/mqtt/paho/V3MqttMessageFactory.java b/bindings/mqtt/paho/src/main/java/io/cloudevents/mqtt/paho/V3MqttMessageFactory.java new file mode 100644 index 00000000..b539cf5f --- /dev/null +++ b/bindings/mqtt/paho/src/main/java/io/cloudevents/mqtt/paho/V3MqttMessageFactory.java @@ -0,0 +1,50 @@ +package io.cloudevents.mqtt.paho; + +import io.cloudevents.core.format.EventFormat; +import io.cloudevents.core.message.MessageReader; +import io.cloudevents.core.message.MessageWriter; +import io.cloudevents.core.message.impl.GenericStructuredMessageReader; +import io.cloudevents.core.provider.EventFormatProvider; +import io.cloudevents.mqtt.core.MqttUtils; +import io.cloudevents.rw.CloudEventWriter; +import org.eclipse.paho.client.mqttv3.MqttMessage; + +/** + * MQTT V3 factory to : + * - Obtain a {@link MessageReader} to read CloudEvents from MQTT messages. + * - Create a {@link MessageWriter} enabling CloudEVents to be written to an MQTT message. + *

+ * NOTE: The V3 binding only supports structured messages using a JSON Format. + */ + +public final class V3MqttMessageFactory { + + /** + * Prevent instantiation. + */ + private V3MqttMessageFactory() { + + } + + /** + * Create a {@link MessageReader} to read a V3 MQTT Messages as a CloudEVents + * + * @param mqttMessage An MQTT Message. + * @return {@link MessageReader} + */ + public static MessageReader createReader(MqttMessage mqttMessage) { + return new GenericStructuredMessageReader(MqttUtils.getDefaultEventFormat(), mqttMessage.getPayload()); + } + + /** + * Creates a {@link MessageWriter} to write a CloudEvent to an MQTT {@link MqttMessage}. + *

+ * NOTE: This implementation *only* supports JSON structured format as-per the MQTT binding specification. + * + * @return A {@link MessageWriter} to write a {@link io.cloudevents.CloudEvent} to MQTT. + */ + public static MessageWriter, MqttMessage> createWriter() { + return new V3MessageWriter(); + } + +} diff --git a/bindings/mqtt/paho/src/main/java/io/cloudevents/mqtt/paho/V5MessageWriter.java b/bindings/mqtt/paho/src/main/java/io/cloudevents/mqtt/paho/V5MessageWriter.java new file mode 100644 index 00000000..e04b9f9c --- /dev/null +++ b/bindings/mqtt/paho/src/main/java/io/cloudevents/mqtt/paho/V5MessageWriter.java @@ -0,0 +1,64 @@ +package io.cloudevents.mqtt.paho; + +import io.cloudevents.CloudEventData; +import io.cloudevents.SpecVersion; +import io.cloudevents.core.format.EventFormat; +import io.cloudevents.core.message.MessageWriter; +import io.cloudevents.rw.CloudEventContextWriter; +import io.cloudevents.rw.CloudEventRWException; +import io.cloudevents.rw.CloudEventWriter; +import org.eclipse.paho.mqttv5.common.MqttMessage; +import org.eclipse.paho.mqttv5.common.packet.MqttProperties; +import org.eclipse.paho.mqttv5.common.packet.UserProperty; + +import java.util.ArrayList; +import java.util.List; + +class V5MessageWriter implements MessageWriter, MqttMessage>, CloudEventWriter { + + private final List userProperties; + private final MqttMessage message; + + V5MessageWriter() { + userProperties = new ArrayList<>(10); + message = new MqttMessage(); + message.setProperties(new MqttProperties()); + } + + // -- Implementation Overrides + + @Override + public CloudEventContextWriter withContextAttribute(String name, String value) throws CloudEventRWException { + final UserProperty up = new UserProperty(name, value); + userProperties.add(up); + + return this; + } + + @Override + public MqttMessage end(CloudEventData data) throws CloudEventRWException { + message.setPayload(data.toBytes()); + return end(); + } + + @Override + public MqttMessage end() throws CloudEventRWException { + if (userProperties.size() != 0) { + message.getProperties().setUserProperties(userProperties); + } + return message; + } + + @Override + public CloudEventWriter create(SpecVersion version) throws CloudEventRWException { + userProperties.add(new UserProperty("specversion", version.toString())); + return this; + } + + @Override + public MqttMessage setEvent(EventFormat format, byte[] value) throws CloudEventRWException { + message.getProperties().setContentType(format.serializedContentType()); + message.setPayload(value); + return end(); + } +} diff --git a/bindings/mqtt/paho/src/main/java/io/cloudevents/mqtt/paho/V5MqttMessageFactory.java b/bindings/mqtt/paho/src/main/java/io/cloudevents/mqtt/paho/V5MqttMessageFactory.java new file mode 100644 index 00000000..65444d3e --- /dev/null +++ b/bindings/mqtt/paho/src/main/java/io/cloudevents/mqtt/paho/V5MqttMessageFactory.java @@ -0,0 +1,57 @@ +package io.cloudevents.mqtt.paho; + +import io.cloudevents.core.message.MessageReader; +import io.cloudevents.core.message.MessageWriter; +import io.cloudevents.core.message.impl.GenericStructuredMessageReader; +import io.cloudevents.core.message.impl.MessageUtils; +import io.cloudevents.core.v1.CloudEventV1; +import io.cloudevents.rw.CloudEventWriter; +import org.eclipse.paho.mqttv5.common.MqttMessage; +import org.eclipse.paho.mqttv5.common.packet.UserProperty; + +import java.util.List; + +/** + * MQTT V5 factory to : + * - Obtain a {@link MessageReader} to read CloudEvents from MQTT messages. + * - Create a {@link MessageWriter} enabling CloudEVents to be written to an MQTT message. + */ + +public final class V5MqttMessageFactory { + + /** + * Prevent instantiation. + */ + private V5MqttMessageFactory() { + + } + + /** + * Create a {@link MessageReader} to read MQTT Messages as CloudEVents + * + * @param mqttMessage An MQTT Message. + * @return {@link MessageReader} + */ + public static MessageReader createReader(MqttMessage mqttMessage) { + + final String contentType = mqttMessage.getProperties().getContentType(); + + return MessageUtils.parseStructuredOrBinaryMessage( + () -> contentType, + format -> new GenericStructuredMessageReader(format, mqttMessage.getPayload()), + () -> PahoMessageUtils.getUserProperty(mqttMessage, CloudEventV1.SPECVERSION), + sv -> new BinaryMessageReader(sv, contentType, mqttMessage) + ); + } + + /** + * Creates a {@link MessageWriter} capable of translating both a structured and binary CloudEvent + * to an MQTT {@link MqttMessage} + * + * @return A {@link MessageWriter} to write a {@link io.cloudevents.CloudEvent} to MQTT using structured or binary encoding. + */ + public static MessageWriter, MqttMessage> createWriter() { + return new V5MessageWriter<>(); + } + +} diff --git a/bindings/mqtt/paho/src/main/java/io/cloudevents/mqtt/paho/package-info.java b/bindings/mqtt/paho/src/main/java/io/cloudevents/mqtt/paho/package-info.java new file mode 100644 index 00000000..8f8e594f --- /dev/null +++ b/bindings/mqtt/paho/src/main/java/io/cloudevents/mqtt/paho/package-info.java @@ -0,0 +1,11 @@ +/** + * This module implements the MQTT binding specification using the + * Paho MQTT client library. + * + * Separate factories are provided for MQTT V3 and V5. + * + * @since 2.5.0 + * + */ + +package io.cloudevents.mqtt.paho; diff --git a/bindings/mqtt/paho/src/test/java/io/cloudevents/mqtt/paho/PahoMessageUtilsTest.java b/bindings/mqtt/paho/src/test/java/io/cloudevents/mqtt/paho/PahoMessageUtilsTest.java new file mode 100644 index 00000000..abb9789f --- /dev/null +++ b/bindings/mqtt/paho/src/test/java/io/cloudevents/mqtt/paho/PahoMessageUtilsTest.java @@ -0,0 +1,62 @@ +package io.cloudevents.mqtt.paho; + +import org.eclipse.paho.mqttv5.common.MqttMessage; +import org.eclipse.paho.mqttv5.common.packet.MqttProperties; +import org.eclipse.paho.mqttv5.common.packet.UserProperty; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.List; + +public class PahoMessageUtilsTest { + + @Test + void verifyPropertyList() { + + List props = new ArrayList<>(5); + + // Ensure Works with null List + Assertions.assertNull(PahoMessageUtils.getUserProperty(props, "id")); + + // Ensure works with empty list. + Assertions.assertNull(PahoMessageUtils.getUserProperty(props, "id")); + + // Create some props + props = new ArrayList<>(5); + props.add(new UserProperty("id", "aaa-bbb-ccc")); + props.add(new UserProperty("specversion", "v1.0")); + + // Ensure Presence + Assertions.assertEquals("aaa-bbb-ccc", PahoMessageUtils.getUserProperty(props, "id")); + + // Ensure Absence + Assertions.assertNull(PahoMessageUtils.getUserProperty(props, "scoobydoo")); + + } + + @Test + void verifyMessageProperties() { + + MqttMessage msg = new MqttMessage(); + + // Verify message with no props + Assertions.assertNull(PahoMessageUtils.getUserProperty(msg, "id")); + + // Create some props + List props = null; + props = new ArrayList<>(5); + props.add(new UserProperty("id", "aaa-bbb-ccc")); + props.add(new UserProperty("specversion", "v1.0")); + + msg.setProperties(new MqttProperties()); + msg.getProperties().setUserProperties(props); + + // Ensure Presence + Assertions.assertEquals("aaa-bbb-ccc", PahoMessageUtils.getUserProperty(msg, "id")); + + // Ensure Absence + Assertions.assertNull(PahoMessageUtils.getUserProperty(msg, "scoobydoo")); + + } +} diff --git a/bindings/mqtt/paho/src/test/java/io/cloudevents/mqtt/paho/V3MessageFactoryTest.java b/bindings/mqtt/paho/src/test/java/io/cloudevents/mqtt/paho/V3MessageFactoryTest.java new file mode 100644 index 00000000..9985682b --- /dev/null +++ b/bindings/mqtt/paho/src/test/java/io/cloudevents/mqtt/paho/V3MessageFactoryTest.java @@ -0,0 +1,75 @@ +package io.cloudevents.mqtt.paho; + +import io.cloudevents.CloudEvent; +import io.cloudevents.SpecVersion; +import io.cloudevents.core.format.EventDeserializationException; +import io.cloudevents.core.format.EventSerializationException; +import io.cloudevents.core.message.Encoding; +import io.cloudevents.core.message.MessageReader; +import io.cloudevents.core.message.MessageWriter; +import io.cloudevents.core.mock.CSVFormat; +import io.cloudevents.core.test.Data; +import io.cloudevents.core.v03.CloudEventV03; +import io.cloudevents.rw.CloudEventRWException; +import io.cloudevents.types.Time; + +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.beans.EventSetDescriptor; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.assertj.core.api.Assertions.assertThat; + +public class V3MessageFactoryTest { + + @Test + public void ensureSerializationFormat() { + + MqttMessage message = null; + + // This should fail as we don't support CSV Format + + MessageWriter writer = V3MqttMessageFactory.createWriter(); + Assertions.assertNotNull(writer); + + // Expect an exception + + Assertions.assertThrows(CloudEventRWException.class, () -> { + writer.writeStructured(Data.V1_MIN, CSVFormat.INSTANCE); + }); + } + + + @ParameterizedTest() + @MethodSource("io.cloudevents.core.test.Data#allEventsWithoutExtensions") + public void ensureDeserialization(CloudEvent ce) { + + + final String contentType = CSVFormat.INSTANCE.serializedContentType() + "; charset=utf8"; + final byte[] contentPayload = CSVFormat.INSTANCE.serialize(ce); + + // Build the MQTT Message + + MqttMessage m = new MqttMessage(); + m.setPayload(contentPayload); + + // Get a reader + MessageReader reader = V3MqttMessageFactory.createReader(m); + Assertions.assertNotNull(reader); + + // This should fail + // Expect an exception + + Assertions.assertThrows(EventDeserializationException.class, () -> { + reader.toEvent(); + }); + + } +} diff --git a/bindings/mqtt/paho/src/test/java/io/cloudevents/mqtt/paho/V3RoundTripTests.java b/bindings/mqtt/paho/src/test/java/io/cloudevents/mqtt/paho/V3RoundTripTests.java new file mode 100644 index 00000000..3d13fc11 --- /dev/null +++ b/bindings/mqtt/paho/src/test/java/io/cloudevents/mqtt/paho/V3RoundTripTests.java @@ -0,0 +1,72 @@ +package io.cloudevents.mqtt.paho; + +import io.cloudevents.CloudEvent; +import io.cloudevents.core.format.EventFormat; +import io.cloudevents.core.message.MessageReader; +import io.cloudevents.core.message.MessageWriter; +import io.cloudevents.core.mock.CSVFormat; +import io.cloudevents.core.provider.EventFormatProvider; +import io.cloudevents.core.test.Data; +import io.cloudevents.jackson.JsonFormat; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; + +import java.util.stream.Stream; + +/** + * Round-Trip Tests + *

+ * - serialize a CloudEvent into an MQTT Message. + * - de-serialize the message into a new CloudEvent + * - verify that the new CE matches the original CE + */ +public class V3RoundTripTests { + + + @ParameterizedTest + @MethodSource("simpleEvents") + public void roundTrip(CloudEvent ce) { + + EventFormat format = new JsonFormat(); + Assertions.assertNotNull(format); + + // Write the event out as a message. + MessageWriter writer = V3MqttMessageFactory.createWriter(); + Assertions.assertNotNull(writer); + + MqttMessage message = (MqttMessage) writer.writeStructured(ce, format); + Assertions.assertNotNull(message); + + // Read it back and verify + + // Read the message back into an event + MessageReader reader = V3MqttMessageFactory.createReader(message); + Assertions.assertNotNull(reader); + + CloudEvent newCE = reader.toEvent(); + Assertions.assertNotNull(newCE); + + // And now ensure we got back what we wrote + Assertions.assertEquals(ce, newCE); + + } + + /** + * This test set is limited owing to the the fact that: + * (a) We only support JSON Format + * (b) Round-tripping of events with JSON 'data' doesn't reliably work owing to the way the equality tests work on the event. + * @return + */ + static Stream simpleEvents() { + return Stream.of( + Data.V03_MIN, + Data.V03_WITH_TEXT_DATA, + Data.V1_MIN, + Data.V1_WITH_TEXT_DATA, + Data.V1_WITH_XML_DATA + ); + } + +} diff --git a/bindings/mqtt/paho/src/test/java/io/cloudevents/mqtt/paho/V5MessageFactoryTest.java b/bindings/mqtt/paho/src/test/java/io/cloudevents/mqtt/paho/V5MessageFactoryTest.java new file mode 100644 index 00000000..a8d02acf --- /dev/null +++ b/bindings/mqtt/paho/src/test/java/io/cloudevents/mqtt/paho/V5MessageFactoryTest.java @@ -0,0 +1,146 @@ +package io.cloudevents.mqtt.paho; + +import io.cloudevents.CloudEvent; +import io.cloudevents.SpecVersion; +import io.cloudevents.core.message.Encoding; +import io.cloudevents.core.message.MessageReader; +import io.cloudevents.core.mock.CSVFormat; +import io.cloudevents.core.test.Data; +import io.cloudevents.core.v03.CloudEventV03; +import io.cloudevents.types.Time; +import org.eclipse.paho.mqttv5.common.MqttMessage; +import org.eclipse.paho.mqttv5.common.packet.MqttProperties; +import org.eclipse.paho.mqttv5.common.packet.UserProperty; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.assertj.core.api.Assertions.assertThat; + +public class V5MessageFactoryTest { + + private static final String DATACONTENTTYPE_NULL = null; + private static final byte[] DATAPAYLOAD_NULL = null; + + private static Stream binaryTestArguments() { + + return Stream.of( + // V03 + Arguments.of( + properties( + property(CloudEventV03.SPECVERSION, SpecVersion.V03.toString()), + property(CloudEventV03.ID, Data.ID), + property(CloudEventV03.TYPE, Data.TYPE), + property(CloudEventV03.SOURCE, Data.SOURCE.toString()) + ), + DATACONTENTTYPE_NULL, + DATAPAYLOAD_NULL, + Data.V03_MIN + ), + Arguments.of( + properties( + property(CloudEventV03.SPECVERSION, SpecVersion.V03.toString()), + property(CloudEventV03.ID, Data.ID), + property(CloudEventV03.TYPE, Data.TYPE), + property(CloudEventV03.SOURCE, Data.SOURCE.toString()), + property(CloudEventV03.SCHEMAURL, Data.DATASCHEMA.toString()), + property(CloudEventV03.SUBJECT, Data.SUBJECT), + property(CloudEventV03.TIME, Time.writeTime(Data.TIME)) + ), + Data.DATACONTENTTYPE_JSON, + Data.DATA_JSON_SERIALIZED, + Data.V03_WITH_JSON_DATA + ) + ); + } + + private static UserProperty property(String key, String val) { + return new UserProperty(key, val); + } + + private static List properties(final UserProperty... props) { + return Stream.of(props).collect(Collectors.toList()); + } + + @Test + public void testWriteBinary() { + + final MqttMessage message = V5MqttMessageFactory.createWriter().writeBinary(Data.V1_MIN); + Assertions.assertNotNull(message); + } + + // Test Data + + @Test + public void testWriteStructured() { + final MqttMessage message = V5MqttMessageFactory.createWriter().writeStructured(Data.V1_MIN, CSVFormat.INSTANCE); + Assertions.assertNotNull(message); + } + + @ParameterizedTest() + @MethodSource("binaryTestArguments") + public void testReadBinary(List userProps, String contentType, byte[] data, CloudEvent ce) { + MqttMessage msg = new MqttMessage(); + + // Populate Properties + MqttProperties props = new MqttProperties(); + props.setUserProperties(userProps); + msg.setProperties(props); + + // Populate payload & contentType + if (data != null) { + msg.setPayload(data); + } + + if (contentType != null) { + msg.getProperties().setContentType(contentType); + } + + MessageReader reader = V5MqttMessageFactory.createReader(msg); + + Assertions.assertNotNull(reader); + assertThat(reader.getEncoding()).isEqualTo(Encoding.BINARY); + + CloudEvent newCe = reader.toEvent(); + + assertThat(newCe).isEqualTo(ce); + + } + + @ParameterizedTest() + @MethodSource("io.cloudevents.core.test.Data#allEventsWithoutExtensions") + public void testReadStructured(CloudEvent ce) { + + + final String contentType = CSVFormat.INSTANCE.serializedContentType() + "; charset=utf8"; + final byte[] contentPayload = CSVFormat.INSTANCE.serialize(ce); + + // Build the MQTT Message + + MqttMessage m = new MqttMessage(); + + MqttProperties props = new MqttProperties(); + props.setContentType(contentType); + m.setProperties(props); + m.setPayload(contentPayload); + + // Get a reader + MessageReader reader = V5MqttMessageFactory.createReader(m); + Assertions.assertNotNull(reader); + assertThat(reader.getEncoding()).isEqualTo(Encoding.STRUCTURED); + + // Re-Hydrate the CloudEvent + CloudEvent newCE = reader.toEvent(); + Assertions.assertNotNull(newCE); + + // And hopefully they match + assertThat(newCE).isEqualTo(ce); + + } +} diff --git a/bindings/mqtt/paho/src/test/java/io/cloudevents/mqtt/paho/V5RoundTripTests.java b/bindings/mqtt/paho/src/test/java/io/cloudevents/mqtt/paho/V5RoundTripTests.java new file mode 100644 index 00000000..c50fb184 --- /dev/null +++ b/bindings/mqtt/paho/src/test/java/io/cloudevents/mqtt/paho/V5RoundTripTests.java @@ -0,0 +1,70 @@ +package io.cloudevents.mqtt.paho; + +import io.cloudevents.CloudEvent; +import io.cloudevents.core.format.EventFormat; +import io.cloudevents.core.message.MessageReader; +import io.cloudevents.core.message.MessageWriter; +import io.cloudevents.core.mock.CSVFormat; +import org.eclipse.paho.mqttv5.common.MqttMessage; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; + +/** + * Round-Trip Tests + *

+ * For both Binary and Structured modes: + * - serialize a CloudEvent into an MQTT Message. + * - de-serialize the message into a new CloudEvent + * - verify that the new CE matches the original CE + */ +public class V5RoundTripTests { + + private static void readAndVerify(CloudEvent ce, MqttMessage message) { + + Assertions.assertNotNull(message); + + // Read the message back into an event + MessageReader reader = V5MqttMessageFactory.createReader(message); + Assertions.assertNotNull(reader); + + CloudEvent newCE = reader.toEvent(); + Assertions.assertNotNull(newCE); + + // And now ensure we got back what we wrote + Assertions.assertEquals(ce, newCE); + } + + @ParameterizedTest + @MethodSource("io.cloudevents.core.test.Data#allEventsWithoutExtensions") + public void roundTripBinary(CloudEvent ce) { + + // Write the event out as a message. + MessageWriter writer = V5MqttMessageFactory.createWriter(); + Assertions.assertNotNull(writer); + + MqttMessage message = (MqttMessage) writer.writeBinary(ce); + + // Read it back and verify + readAndVerify(ce, message); + } + + @ParameterizedTest + @MethodSource("io.cloudevents.core.test.Data#allEventsWithoutExtensions") + public void roundTripStructured(CloudEvent ce) { + + EventFormat format = CSVFormat.INSTANCE; + + // Write the event out as a message. + MessageWriter writer = V5MqttMessageFactory.createWriter(); + Assertions.assertNotNull(writer); + + MqttMessage message = (MqttMessage) writer.writeStructured(ce, format); + + // Read it back and verify + readAndVerify(ce, message); + + } + + +} diff --git a/docs/index.md b/docs/index.md index d8ec2ebe..ef174600 100644 --- a/docs/index.md +++ b/docs/index.md @@ -27,23 +27,25 @@ Using the Java SDK you can: ## Supported features | | [v0.3](https://github.com/cloudevents/spec/tree/v0.3) | [v1.0](https://github.com/cloudevents/spec/tree/v1.0) | -| :------------------------------------------------: | :---------------------------------------------------: | :---------------------------------------------------: | +|:--------------------------------------------------:|:-----------------------------------------------------:|:-----------------------------------------------------:| | CloudEvents Core | :heavy_check_mark: | :heavy_check_mark: | | AMQP Protocol Binding | :x: | :x: | | - [Proton](amqp-proton.md) | :heavy_check_mark: | :heavy_check_mark: | + | MQTT Protocol Binding | :x: | :x: | + | - [Paho](mqtt-paho.md) | :heavy_check_mark: | :heavy_check_mark: | | AVRO Event Format | :x: | :x: | | HTTP Protocol Binding | :heavy_check_mark: | :heavy_check_mark: | | - [Vert.x](http-vertx.md) | :heavy_check_mark: | :heavy_check_mark: | | - [Jakarta Restful WS](http-jakarta-restful-ws.md) | :heavy_check_mark: | :heavy_check_mark: | | - [Basic](http-basic.md) | :heavy_check_mark: | :heavy_check_mark: | | - [Spring](spring.md) | :heavy_check_mark: | :heavy_check_mark: | -| - [http4k][http4k] | :heavy_check_mark: | :heavy_check_mark: | +| - [http4k][http4k] | :heavy_check_mark: | :heavy_check_mark: | | JSON Event Format | :heavy_check_mark: | :heavy_check_mark: | | - [Jackson](json-jackson.md) | :heavy_check_mark: | :heavy_check_mark: | -| Protobuf Event Format | :heavy_check_mark: | :heavy_check_mark: | -| - [Proto](protobuf.md) | :heavy_check_mark: | :heavy_check_mark: | -| XML Event Format | :heavy_check_mark: | :heavy_check_mark: | -| - [XML](xml.md) | :heavy_check_mark: | :heavy_check_mark: | +| Protobuf Event Format | :heavy_check_mark: | :heavy_check_mark: | +| - [Proto](protobuf.md) | :heavy_check_mark: | :heavy_check_mark: | +| XML Event Format | :heavy_check_mark: | :heavy_check_mark: | +| - [XML](xml.md) | :heavy_check_mark: | :heavy_check_mark: | | [Kafka Protocol Binding](kafka.md) | :heavy_check_mark: | :heavy_check_mark: | | MQTT Protocol Binding | :x: | :x: | | NATS Protocol Binding | :x: | :x: | diff --git a/docs/mqtt-paho.md b/docs/mqtt-paho.md new file mode 100644 index 00000000..b529196d --- /dev/null +++ b/docs/mqtt-paho.md @@ -0,0 +1,41 @@ +--- +title: CloudEvents MQTT (Paho) +nav_order: 5 +--- + +# CloudEvents MQTT Paho + +This module implements `MessageReader` and `MessageWriter` using the Paho +library. + +It currently only support MQTT Version 5. + +For Maven based projects, use the following to include the `Paho` MQTT Binding + +```xml + + io.cloudevents + cloudevents-mqtt-paho + 2.5.0 + +``` + +# Sending & Receiving CloudEvents + +The `PahoMessageFactory` provides methods to create CloudEvent `MessageReader` and `MessageWriter` +instances that are used to encode and decode CloudEvents into MQTT messages. + +```java +public final class PahoMessageFactory { + + public static MessageReader createReader(MqttMessage mqttMessage); + + public static MessageWriter createWriter(); + +} +``` + +## Reading CloudEvents + +## Sending CloudEvents + diff --git a/pom.xml b/pom.xml index b0f70600..9bfdd34a 100644 --- a/pom.xml +++ b/pom.xml @@ -71,6 +71,9 @@ formats/json-jackson formats/protobuf formats/xml + bindings/mqtt/core + bindings/mqtt/paho + bindings/mqtt/hivemq amqp http/basic http/vertx From c949073f047b5c826a9e8f4d4932e7a5e26aaa81 Mon Sep 17 00:00:00 2001 From: Jem Day Date: Mon, 13 Mar 2023 12:07:41 -0700 Subject: [PATCH 2/2] Cleanup - SStill W.I.P Signed-off-by: Jem Day --- .../core/BaseMqttBinaryMessageReader.java | 22 ++++++- .../io/cloudevents/mqtt/core/MqttUtils.java | 24 +++++++- .../mqtt/hivemq/BinaryMessageReader.java | 16 +++++ .../mqtt/hivemq/MqttMessageFactory.java | 20 ++++++- .../mqtt/hivemq/V3MessageWriter.java | 24 ++++++-- .../mqtt/hivemq/V5MessageWriter.java | 16 +++++ .../cloudevents/mqtt/hivemq/package-info.java | 5 +- .../mqtt/hivemq/MqttMessageFactoryTest.java | 16 +++++ .../mqtt/hivemq/V3MessageWriterTest.java | 16 +++++ .../mqtt/hivemq/V3RoundTripTests.java | 16 +++++ .../mqtt/hivemq/V5MessageWriterTest.java | 16 +++++ .../mqtt/hivemq/V5RoundTripTests.java | 16 +++++ .../mqtt/paho/BinaryMessageReader.java | 16 +++++ .../mqtt/paho/PahoMessageUtils.java | 16 +++++ .../mqtt/paho/V3MessageWriter.java | 22 ++++++- .../mqtt/paho/V3MqttMessageFactory.java | 18 +++++- .../mqtt/paho/V5MessageWriter.java | 16 +++++ .../mqtt/paho/V5MqttMessageFactory.java | 19 +++++- .../cloudevents/mqtt/paho/package-info.java | 3 +- .../mqtt/paho/PahoMessageUtilsTest.java | 16 +++++ .../mqtt/paho/V3MessageFactoryTest.java | 32 +++++----- .../mqtt/paho/V3RoundTripTests.java | 51 ++++++++++------ .../mqtt/paho/V5MessageFactoryTest.java | 16 +++++ .../mqtt/paho/V5RoundTripTests.java | 16 +++++ docs/index.md | 3 +- docs/mqtt-paho.md | 41 ------------- docs/mqtt.md | 60 +++++++++++++++++++ 27 files changed, 453 insertions(+), 99 deletions(-) delete mode 100644 docs/mqtt-paho.md create mode 100644 docs/mqtt.md diff --git a/bindings/mqtt/core/src/main/java/io/cloudevents/mqtt/core/BaseMqttBinaryMessageReader.java b/bindings/mqtt/core/src/main/java/io/cloudevents/mqtt/core/BaseMqttBinaryMessageReader.java index a0aadea0..86f3c590 100644 --- a/bindings/mqtt/core/src/main/java/io/cloudevents/mqtt/core/BaseMqttBinaryMessageReader.java +++ b/bindings/mqtt/core/src/main/java/io/cloudevents/mqtt/core/BaseMqttBinaryMessageReader.java @@ -1,3 +1,19 @@ +/* + * Copyright 2018-Present The CloudEvents Authors + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ package io.cloudevents.mqtt.core; import io.cloudevents.SpecVersion; @@ -25,9 +41,10 @@ public abstract class BaseMqttBinaryMessageReader extends BaseGenericBinaryMessa /** * Initialise the binary message reader. - * @param version The CloudEvent message version. + * + * @param version The CloudEvent message version. * @param contentType The assigned media content type. - * @param payload The raw data payload from the MQTT message. + * @param payload The raw data payload from the MQTT message. */ protected BaseMqttBinaryMessageReader(final SpecVersion version, final String contentType, final byte[] payload) { super(version, payload != null && payload.length > 0 ? BytesCloudEventData.wrap(payload) : null); @@ -82,6 +99,7 @@ public abstract class BaseMqttBinaryMessageReader extends BaseGenericBinaryMessa /** * Visit each MQTT user-property and invoke the supplied function. + * * @param fn The function to invoke for each MQTT User property. */ protected abstract void forEachUserProperty(BiConsumer fn); diff --git a/bindings/mqtt/core/src/main/java/io/cloudevents/mqtt/core/MqttUtils.java b/bindings/mqtt/core/src/main/java/io/cloudevents/mqtt/core/MqttUtils.java index 9efb2285..5953c0c4 100644 --- a/bindings/mqtt/core/src/main/java/io/cloudevents/mqtt/core/MqttUtils.java +++ b/bindings/mqtt/core/src/main/java/io/cloudevents/mqtt/core/MqttUtils.java @@ -1,3 +1,19 @@ +/* + * Copyright 2018-Present The CloudEvents Authors + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ package io.cloudevents.mqtt.core; import io.cloudevents.core.format.EventFormat; @@ -8,9 +24,10 @@ import io.cloudevents.core.provider.EventFormatProvider; */ public class MqttUtils { - private MqttUtils() {} + private static final String DEFAULT_FORMAT = "application/cloudevents+json"; - private static final String DEFAULT_FORMAT = "application/cloudevents+json"; + private MqttUtils() { + } /** * Obtain the {@link EventFormat} to use when working with MQTT V3 @@ -18,7 +35,7 @@ public class MqttUtils { * * @return An event format. */ - public static EventFormat getDefaultEventFormat () { + public static EventFormat getDefaultEventFormat() { return EventFormatProvider.getInstance().resolveFormat(DEFAULT_FORMAT); @@ -26,6 +43,7 @@ public class MqttUtils { /** * Get the default content type to assume for MQTT messages. + * * @return A Content-Type */ public static final String getDefaultContentType() { diff --git a/bindings/mqtt/hivemq/src/main/java/io/cloudevents/mqtt/hivemq/BinaryMessageReader.java b/bindings/mqtt/hivemq/src/main/java/io/cloudevents/mqtt/hivemq/BinaryMessageReader.java index ac5d16c6..4b689dc0 100644 --- a/bindings/mqtt/hivemq/src/main/java/io/cloudevents/mqtt/hivemq/BinaryMessageReader.java +++ b/bindings/mqtt/hivemq/src/main/java/io/cloudevents/mqtt/hivemq/BinaryMessageReader.java @@ -1,3 +1,19 @@ +/* + * Copyright 2018-Present The CloudEvents Authors + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ package io.cloudevents.mqtt.hivemq; import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish; diff --git a/bindings/mqtt/hivemq/src/main/java/io/cloudevents/mqtt/hivemq/MqttMessageFactory.java b/bindings/mqtt/hivemq/src/main/java/io/cloudevents/mqtt/hivemq/MqttMessageFactory.java index 3016c277..7a850d26 100644 --- a/bindings/mqtt/hivemq/src/main/java/io/cloudevents/mqtt/hivemq/MqttMessageFactory.java +++ b/bindings/mqtt/hivemq/src/main/java/io/cloudevents/mqtt/hivemq/MqttMessageFactory.java @@ -1,3 +1,19 @@ +/* + * Copyright 2018-Present The CloudEvents Authors + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ package io.cloudevents.mqtt.hivemq; import com.hivemq.client.mqtt.datatypes.MqttUtf8String; @@ -20,7 +36,6 @@ import java.util.Optional; * A factory to obtain: * - {@link MessageReader} instances to read CloudEvents from MQTT messages. * - {@link MessageWriter} instances to write CloudEvents into MQTT messages. - * */ public class MqttMessageFactory { @@ -74,7 +89,7 @@ public class MqttMessageFactory { /** * Create a {@link MessageWriter} for an MQTT V3 Message. - * + *

* Only supports structured messages. * * @param builder {@link Mqtt3PublishBuilder.Complete} @@ -89,6 +104,7 @@ public class MqttMessageFactory { /** * Find the value of the CloudEvent 'specversion' in the MQTT V5 User Properties. + * * @param message An MQTT message. * @return spec version attribute content. */ diff --git a/bindings/mqtt/hivemq/src/main/java/io/cloudevents/mqtt/hivemq/V3MessageWriter.java b/bindings/mqtt/hivemq/src/main/java/io/cloudevents/mqtt/hivemq/V3MessageWriter.java index 9cbafdee..62a5ecf4 100644 --- a/bindings/mqtt/hivemq/src/main/java/io/cloudevents/mqtt/hivemq/V3MessageWriter.java +++ b/bindings/mqtt/hivemq/src/main/java/io/cloudevents/mqtt/hivemq/V3MessageWriter.java @@ -1,3 +1,19 @@ +/* + * Copyright 2018-Present The CloudEvents Authors + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ package io.cloudevents.mqtt.hivemq; import com.hivemq.client.mqtt.mqtt3.message.publish.Mqtt3PublishBuilder; @@ -20,21 +36,21 @@ class V3MessageWriter implements MessageWriter create(SpecVersion version) throws CloudEventRWException { // No-Op - return null; + throw CloudEventRWException.newOther("Internal Error"); } @Override public Mqtt3PublishBuilder setEvent(EventFormat format, byte[] value) throws CloudEventRWException { // No-Op - return null; + throw CloudEventRWException.newOther("Internal Error"); } @Override public Mqtt3PublishBuilder writeStructured(CloudEvent event, String format) { - EventFormat eventFormat = EventFormatProvider.getInstance().resolveFormat(format); + final EventFormat eventFormat = EventFormatProvider.getInstance().resolveFormat(format); if (eventFormat != null) { - return writeStructured(event, EventFormatProvider.getInstance().resolveFormat(format)); + return writeStructured(event, eventFormat); } else { throw CloudEventRWException.newOther("Unsupported Format: " + format); } diff --git a/bindings/mqtt/hivemq/src/main/java/io/cloudevents/mqtt/hivemq/V5MessageWriter.java b/bindings/mqtt/hivemq/src/main/java/io/cloudevents/mqtt/hivemq/V5MessageWriter.java index 03fb2d81..0bb70369 100644 --- a/bindings/mqtt/hivemq/src/main/java/io/cloudevents/mqtt/hivemq/V5MessageWriter.java +++ b/bindings/mqtt/hivemq/src/main/java/io/cloudevents/mqtt/hivemq/V5MessageWriter.java @@ -1,3 +1,19 @@ +/* + * Copyright 2018-Present The CloudEvents Authors + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ package io.cloudevents.mqtt.hivemq; import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PublishBuilder; diff --git a/bindings/mqtt/hivemq/src/main/java/io/cloudevents/mqtt/hivemq/package-info.java b/bindings/mqtt/hivemq/src/main/java/io/cloudevents/mqtt/hivemq/package-info.java index 2bbf2ebc..f3042254 100644 --- a/bindings/mqtt/hivemq/src/main/java/io/cloudevents/mqtt/hivemq/package-info.java +++ b/bindings/mqtt/hivemq/src/main/java/io/cloudevents/mqtt/hivemq/package-info.java @@ -1,14 +1,13 @@ /** * This module implements the MQTT binding specification using the * HiveMQ MQTT client library. - * + *

* Use the {@link io.cloudevents.mqtt.hivemq.MqttMessageFactory} to obtain * CloudEvent reader and writer instances. - * + *

* Both V3 and V5 versions of MQTT are supported. * * @since 2.5.0 - * */ package io.cloudevents.mqtt.hivemq; diff --git a/bindings/mqtt/hivemq/src/test/java/io/cloudevents/mqtt/hivemq/MqttMessageFactoryTest.java b/bindings/mqtt/hivemq/src/test/java/io/cloudevents/mqtt/hivemq/MqttMessageFactoryTest.java index 583248db..377a24fe 100644 --- a/bindings/mqtt/hivemq/src/test/java/io/cloudevents/mqtt/hivemq/MqttMessageFactoryTest.java +++ b/bindings/mqtt/hivemq/src/test/java/io/cloudevents/mqtt/hivemq/MqttMessageFactoryTest.java @@ -1,3 +1,19 @@ +/* + * Copyright 2018-Present The CloudEvents Authors + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ package io.cloudevents.mqtt.hivemq; import com.hivemq.client.mqtt.mqtt3.message.publish.Mqtt3Publish; diff --git a/bindings/mqtt/hivemq/src/test/java/io/cloudevents/mqtt/hivemq/V3MessageWriterTest.java b/bindings/mqtt/hivemq/src/test/java/io/cloudevents/mqtt/hivemq/V3MessageWriterTest.java index 800589f1..79897805 100644 --- a/bindings/mqtt/hivemq/src/test/java/io/cloudevents/mqtt/hivemq/V3MessageWriterTest.java +++ b/bindings/mqtt/hivemq/src/test/java/io/cloudevents/mqtt/hivemq/V3MessageWriterTest.java @@ -1,3 +1,19 @@ +/* + * Copyright 2018-Present The CloudEvents Authors + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ package io.cloudevents.mqtt.hivemq; import com.hivemq.client.mqtt.mqtt3.message.publish.Mqtt3Publish; diff --git a/bindings/mqtt/hivemq/src/test/java/io/cloudevents/mqtt/hivemq/V3RoundTripTests.java b/bindings/mqtt/hivemq/src/test/java/io/cloudevents/mqtt/hivemq/V3RoundTripTests.java index 828a56f4..0227d473 100644 --- a/bindings/mqtt/hivemq/src/test/java/io/cloudevents/mqtt/hivemq/V3RoundTripTests.java +++ b/bindings/mqtt/hivemq/src/test/java/io/cloudevents/mqtt/hivemq/V3RoundTripTests.java @@ -1,3 +1,19 @@ +/* + * Copyright 2018-Present The CloudEvents Authors + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ package io.cloudevents.mqtt.hivemq; import com.hivemq.client.mqtt.mqtt3.message.publish.Mqtt3Publish; diff --git a/bindings/mqtt/hivemq/src/test/java/io/cloudevents/mqtt/hivemq/V5MessageWriterTest.java b/bindings/mqtt/hivemq/src/test/java/io/cloudevents/mqtt/hivemq/V5MessageWriterTest.java index 791b1246..0e9cdc49 100644 --- a/bindings/mqtt/hivemq/src/test/java/io/cloudevents/mqtt/hivemq/V5MessageWriterTest.java +++ b/bindings/mqtt/hivemq/src/test/java/io/cloudevents/mqtt/hivemq/V5MessageWriterTest.java @@ -1,3 +1,19 @@ +/* + * Copyright 2018-Present The CloudEvents Authors + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ package io.cloudevents.mqtt.hivemq; import com.hivemq.client.mqtt.mqtt5.datatypes.Mqtt5UserProperty; diff --git a/bindings/mqtt/hivemq/src/test/java/io/cloudevents/mqtt/hivemq/V5RoundTripTests.java b/bindings/mqtt/hivemq/src/test/java/io/cloudevents/mqtt/hivemq/V5RoundTripTests.java index 5dd9d170..dd78177f 100644 --- a/bindings/mqtt/hivemq/src/test/java/io/cloudevents/mqtt/hivemq/V5RoundTripTests.java +++ b/bindings/mqtt/hivemq/src/test/java/io/cloudevents/mqtt/hivemq/V5RoundTripTests.java @@ -1,3 +1,19 @@ +/* + * Copyright 2018-Present The CloudEvents Authors + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ package io.cloudevents.mqtt.hivemq; import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish; diff --git a/bindings/mqtt/paho/src/main/java/io/cloudevents/mqtt/paho/BinaryMessageReader.java b/bindings/mqtt/paho/src/main/java/io/cloudevents/mqtt/paho/BinaryMessageReader.java index e333a5af..fcbf631b 100644 --- a/bindings/mqtt/paho/src/main/java/io/cloudevents/mqtt/paho/BinaryMessageReader.java +++ b/bindings/mqtt/paho/src/main/java/io/cloudevents/mqtt/paho/BinaryMessageReader.java @@ -1,3 +1,19 @@ +/* + * Copyright 2018-Present The CloudEvents Authors + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ package io.cloudevents.mqtt.paho; import io.cloudevents.SpecVersion; diff --git a/bindings/mqtt/paho/src/main/java/io/cloudevents/mqtt/paho/PahoMessageUtils.java b/bindings/mqtt/paho/src/main/java/io/cloudevents/mqtt/paho/PahoMessageUtils.java index 1710f7cd..098bab2d 100644 --- a/bindings/mqtt/paho/src/main/java/io/cloudevents/mqtt/paho/PahoMessageUtils.java +++ b/bindings/mqtt/paho/src/main/java/io/cloudevents/mqtt/paho/PahoMessageUtils.java @@ -1,3 +1,19 @@ +/* + * Copyright 2018-Present The CloudEvents Authors + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ package io.cloudevents.mqtt.paho; import org.eclipse.paho.mqttv5.common.MqttMessage; diff --git a/bindings/mqtt/paho/src/main/java/io/cloudevents/mqtt/paho/V3MessageWriter.java b/bindings/mqtt/paho/src/main/java/io/cloudevents/mqtt/paho/V3MessageWriter.java index 18162c68..844e7ca6 100644 --- a/bindings/mqtt/paho/src/main/java/io/cloudevents/mqtt/paho/V3MessageWriter.java +++ b/bindings/mqtt/paho/src/main/java/io/cloudevents/mqtt/paho/V3MessageWriter.java @@ -1,9 +1,24 @@ +/* + * Copyright 2018-Present The CloudEvents Authors + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ package io.cloudevents.mqtt.paho; import io.cloudevents.CloudEvent; import io.cloudevents.SpecVersion; import io.cloudevents.core.format.EventFormat; -import io.cloudevents.core.format.EventSerializationException; import io.cloudevents.core.message.MessageWriter; import io.cloudevents.core.provider.EventFormatProvider; import io.cloudevents.mqtt.core.MqttUtils; @@ -13,7 +28,7 @@ import org.eclipse.paho.client.mqttv3.MqttMessage; /** * A {@link MessageWriter} that writes an CloudEvent to a V3 MQTT Message. - * + *

* Note: This only supports Structured messages in JSON format as defined * by the MQTT CloudEvent binding specification. */ @@ -28,8 +43,9 @@ class V3MessageWriter implements MessageWriter, Mq /** * Ensure the supplied content type is appropriate for V3 messages * as-per binding specification. - * + *

* Raises exception if not valid. + * * @param contentType */ private void ensureValidContent(String contentType) { diff --git a/bindings/mqtt/paho/src/main/java/io/cloudevents/mqtt/paho/V3MqttMessageFactory.java b/bindings/mqtt/paho/src/main/java/io/cloudevents/mqtt/paho/V3MqttMessageFactory.java index b539cf5f..b4474b95 100644 --- a/bindings/mqtt/paho/src/main/java/io/cloudevents/mqtt/paho/V3MqttMessageFactory.java +++ b/bindings/mqtt/paho/src/main/java/io/cloudevents/mqtt/paho/V3MqttMessageFactory.java @@ -1,10 +1,24 @@ +/* + * Copyright 2018-Present The CloudEvents Authors + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ package io.cloudevents.mqtt.paho; -import io.cloudevents.core.format.EventFormat; import io.cloudevents.core.message.MessageReader; import io.cloudevents.core.message.MessageWriter; import io.cloudevents.core.message.impl.GenericStructuredMessageReader; -import io.cloudevents.core.provider.EventFormatProvider; import io.cloudevents.mqtt.core.MqttUtils; import io.cloudevents.rw.CloudEventWriter; import org.eclipse.paho.client.mqttv3.MqttMessage; diff --git a/bindings/mqtt/paho/src/main/java/io/cloudevents/mqtt/paho/V5MessageWriter.java b/bindings/mqtt/paho/src/main/java/io/cloudevents/mqtt/paho/V5MessageWriter.java index e04b9f9c..ec10170b 100644 --- a/bindings/mqtt/paho/src/main/java/io/cloudevents/mqtt/paho/V5MessageWriter.java +++ b/bindings/mqtt/paho/src/main/java/io/cloudevents/mqtt/paho/V5MessageWriter.java @@ -1,3 +1,19 @@ +/* + * Copyright 2018-Present The CloudEvents Authors + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ package io.cloudevents.mqtt.paho; import io.cloudevents.CloudEventData; diff --git a/bindings/mqtt/paho/src/main/java/io/cloudevents/mqtt/paho/V5MqttMessageFactory.java b/bindings/mqtt/paho/src/main/java/io/cloudevents/mqtt/paho/V5MqttMessageFactory.java index 65444d3e..20d7141f 100644 --- a/bindings/mqtt/paho/src/main/java/io/cloudevents/mqtt/paho/V5MqttMessageFactory.java +++ b/bindings/mqtt/paho/src/main/java/io/cloudevents/mqtt/paho/V5MqttMessageFactory.java @@ -1,3 +1,19 @@ +/* + * Copyright 2018-Present The CloudEvents Authors + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ package io.cloudevents.mqtt.paho; import io.cloudevents.core.message.MessageReader; @@ -7,9 +23,6 @@ import io.cloudevents.core.message.impl.MessageUtils; import io.cloudevents.core.v1.CloudEventV1; import io.cloudevents.rw.CloudEventWriter; import org.eclipse.paho.mqttv5.common.MqttMessage; -import org.eclipse.paho.mqttv5.common.packet.UserProperty; - -import java.util.List; /** * MQTT V5 factory to : diff --git a/bindings/mqtt/paho/src/main/java/io/cloudevents/mqtt/paho/package-info.java b/bindings/mqtt/paho/src/main/java/io/cloudevents/mqtt/paho/package-info.java index 8f8e594f..13cfd876 100644 --- a/bindings/mqtt/paho/src/main/java/io/cloudevents/mqtt/paho/package-info.java +++ b/bindings/mqtt/paho/src/main/java/io/cloudevents/mqtt/paho/package-info.java @@ -1,11 +1,10 @@ /** * This module implements the MQTT binding specification using the * Paho MQTT client library. - * + *

* Separate factories are provided for MQTT V3 and V5. * * @since 2.5.0 - * */ package io.cloudevents.mqtt.paho; diff --git a/bindings/mqtt/paho/src/test/java/io/cloudevents/mqtt/paho/PahoMessageUtilsTest.java b/bindings/mqtt/paho/src/test/java/io/cloudevents/mqtt/paho/PahoMessageUtilsTest.java index abb9789f..a7661c30 100644 --- a/bindings/mqtt/paho/src/test/java/io/cloudevents/mqtt/paho/PahoMessageUtilsTest.java +++ b/bindings/mqtt/paho/src/test/java/io/cloudevents/mqtt/paho/PahoMessageUtilsTest.java @@ -1,3 +1,19 @@ +/* + * Copyright 2018-Present The CloudEvents Authors + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ package io.cloudevents.mqtt.paho; import org.eclipse.paho.mqttv5.common.MqttMessage; diff --git a/bindings/mqtt/paho/src/test/java/io/cloudevents/mqtt/paho/V3MessageFactoryTest.java b/bindings/mqtt/paho/src/test/java/io/cloudevents/mqtt/paho/V3MessageFactoryTest.java index 9985682b..885296dd 100644 --- a/bindings/mqtt/paho/src/test/java/io/cloudevents/mqtt/paho/V3MessageFactoryTest.java +++ b/bindings/mqtt/paho/src/test/java/io/cloudevents/mqtt/paho/V3MessageFactoryTest.java @@ -1,32 +1,34 @@ +/* + * Copyright 2018-Present The CloudEvents Authors + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ package io.cloudevents.mqtt.paho; import io.cloudevents.CloudEvent; -import io.cloudevents.SpecVersion; import io.cloudevents.core.format.EventDeserializationException; -import io.cloudevents.core.format.EventSerializationException; -import io.cloudevents.core.message.Encoding; import io.cloudevents.core.message.MessageReader; import io.cloudevents.core.message.MessageWriter; import io.cloudevents.core.mock.CSVFormat; import io.cloudevents.core.test.Data; -import io.cloudevents.core.v03.CloudEventV03; import io.cloudevents.rw.CloudEventRWException; -import io.cloudevents.types.Time; - import org.eclipse.paho.client.mqttv3.MqttMessage; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; -import java.beans.EventSetDescriptor; -import java.util.List; -import java.util.stream.Collectors; -import java.util.stream.Stream; - -import static org.assertj.core.api.Assertions.assertThat; - public class V3MessageFactoryTest { @Test @@ -64,7 +66,7 @@ public class V3MessageFactoryTest { MessageReader reader = V3MqttMessageFactory.createReader(m); Assertions.assertNotNull(reader); - // This should fail + // This should fail // Expect an exception Assertions.assertThrows(EventDeserializationException.class, () -> { diff --git a/bindings/mqtt/paho/src/test/java/io/cloudevents/mqtt/paho/V3RoundTripTests.java b/bindings/mqtt/paho/src/test/java/io/cloudevents/mqtt/paho/V3RoundTripTests.java index 3d13fc11..6be040f6 100644 --- a/bindings/mqtt/paho/src/test/java/io/cloudevents/mqtt/paho/V3RoundTripTests.java +++ b/bindings/mqtt/paho/src/test/java/io/cloudevents/mqtt/paho/V3RoundTripTests.java @@ -1,11 +1,25 @@ +/* + * Copyright 2018-Present The CloudEvents Authors + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ package io.cloudevents.mqtt.paho; import io.cloudevents.CloudEvent; import io.cloudevents.core.format.EventFormat; import io.cloudevents.core.message.MessageReader; import io.cloudevents.core.message.MessageWriter; -import io.cloudevents.core.mock.CSVFormat; -import io.cloudevents.core.provider.EventFormatProvider; import io.cloudevents.core.test.Data; import io.cloudevents.jackson.JsonFormat; import org.eclipse.paho.client.mqttv3.MqttMessage; @@ -25,6 +39,23 @@ import java.util.stream.Stream; public class V3RoundTripTests { + /** + * This test set is limited owing to the the fact that: + * (a) We only support JSON Format + * (b) Round-tripping of events with JSON 'data' doesn't reliably work owing to the way the equality tests work on the event. + * + * @return + */ + static Stream simpleEvents() { + return Stream.of( + Data.V03_MIN, + Data.V03_WITH_TEXT_DATA, + Data.V1_MIN, + Data.V1_WITH_TEXT_DATA, + Data.V1_WITH_XML_DATA + ); + } + @ParameterizedTest @MethodSource("simpleEvents") public void roundTrip(CloudEvent ce) { @@ -53,20 +84,4 @@ public class V3RoundTripTests { } - /** - * This test set is limited owing to the the fact that: - * (a) We only support JSON Format - * (b) Round-tripping of events with JSON 'data' doesn't reliably work owing to the way the equality tests work on the event. - * @return - */ - static Stream simpleEvents() { - return Stream.of( - Data.V03_MIN, - Data.V03_WITH_TEXT_DATA, - Data.V1_MIN, - Data.V1_WITH_TEXT_DATA, - Data.V1_WITH_XML_DATA - ); - } - } diff --git a/bindings/mqtt/paho/src/test/java/io/cloudevents/mqtt/paho/V5MessageFactoryTest.java b/bindings/mqtt/paho/src/test/java/io/cloudevents/mqtt/paho/V5MessageFactoryTest.java index a8d02acf..dc3402c0 100644 --- a/bindings/mqtt/paho/src/test/java/io/cloudevents/mqtt/paho/V5MessageFactoryTest.java +++ b/bindings/mqtt/paho/src/test/java/io/cloudevents/mqtt/paho/V5MessageFactoryTest.java @@ -1,3 +1,19 @@ +/* + * Copyright 2018-Present The CloudEvents Authors + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ package io.cloudevents.mqtt.paho; import io.cloudevents.CloudEvent; diff --git a/bindings/mqtt/paho/src/test/java/io/cloudevents/mqtt/paho/V5RoundTripTests.java b/bindings/mqtt/paho/src/test/java/io/cloudevents/mqtt/paho/V5RoundTripTests.java index c50fb184..f692283a 100644 --- a/bindings/mqtt/paho/src/test/java/io/cloudevents/mqtt/paho/V5RoundTripTests.java +++ b/bindings/mqtt/paho/src/test/java/io/cloudevents/mqtt/paho/V5RoundTripTests.java @@ -1,3 +1,19 @@ +/* + * Copyright 2018-Present The CloudEvents Authors + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ package io.cloudevents.mqtt.paho; import io.cloudevents.CloudEvent; diff --git a/docs/index.md b/docs/index.md index ef174600..8078c4e3 100644 --- a/docs/index.md +++ b/docs/index.md @@ -32,7 +32,8 @@ Using the Java SDK you can: | AMQP Protocol Binding | :x: | :x: | | - [Proton](amqp-proton.md) | :heavy_check_mark: | :heavy_check_mark: | | MQTT Protocol Binding | :x: | :x: | - | - [Paho](mqtt-paho.md) | :heavy_check_mark: | :heavy_check_mark: | + | - [Paho](mqtt.md) | :heavy_check_mark: | :heavy_check_mark: | + | - [HiveMQ](mqtt.md) | :heavy_check_mark: | :heavy_check_mark: | | AVRO Event Format | :x: | :x: | | HTTP Protocol Binding | :heavy_check_mark: | :heavy_check_mark: | | - [Vert.x](http-vertx.md) | :heavy_check_mark: | :heavy_check_mark: | diff --git a/docs/mqtt-paho.md b/docs/mqtt-paho.md deleted file mode 100644 index b529196d..00000000 --- a/docs/mqtt-paho.md +++ /dev/null @@ -1,41 +0,0 @@ ---- -title: CloudEvents MQTT (Paho) -nav_order: 5 ---- - -# CloudEvents MQTT Paho - -This module implements `MessageReader` and `MessageWriter` using the Paho -library. - -It currently only support MQTT Version 5. - -For Maven based projects, use the following to include the `Paho` MQTT Binding - -```xml - - io.cloudevents - cloudevents-mqtt-paho - 2.5.0 - -``` - -# Sending & Receiving CloudEvents - -The `PahoMessageFactory` provides methods to create CloudEvent `MessageReader` and `MessageWriter` -instances that are used to encode and decode CloudEvents into MQTT messages. - -```java -public final class PahoMessageFactory { - - public static MessageReader createReader(MqttMessage mqttMessage); - - public static MessageWriter createWriter(); - -} -``` - -## Reading CloudEvents - -## Sending CloudEvents - diff --git a/docs/mqtt.md b/docs/mqtt.md new file mode 100644 index 00000000..4d1aacee --- /dev/null +++ b/docs/mqtt.md @@ -0,0 +1,60 @@ +--- +title: CloudEvents MQTT +nav_order: 5 +--- + +# MQTT Support + +The SDK supports both V3 and V5 MQTT binding specifications via these Java client libraries: + + * [Paho]() + * [HiveMQ]() + +NOTE: MQTT V3 *only* supports structured mode transfer of CloudEVents. Operations related to binary mode transmission +are either not available or will throw runtime exceptions if an attempt is made to use them. + +Both client library implementations rely on a *provided* maven dependency. + +# General Usage + +There is a slight variance in usage between the two supported client libraries owing to the way those clients +have implemented support for the two versions of MQTT but the general pattern is the same as every other protocol +binding. + +## Creating a message from a CloudEvent + + 1. Obtain a `MessageWriter` from a factory. + 2. Use the writer to populate the MQTT message using structured or binary mode. + * `mqttMessage = messageWriter.writeBinary(cloudEvent);` or, + * `mqttMessage = messageWriter.writeStructured(cloudEvent, eventFormat);` + +## Creating a CloudEvent from a message. + + 1. Obtain a 'MessageReader' from a message factory for an MQTT message. + 2. Obtain a CloudEvent from the reader. + * _CloudEvent cloudEvent = reader.toEvent();_ + + +# PAHO Client Usage + +## Maven + +```xml + + io.cloudevents + cloudevents-mqtt-paho + 2.x.y + +``` + +# HiveMQ Client Usage + +## Maven + +```xml + + io.cloudevents + cloudevents-mqtt-hivemq + 2.x.y + +```