From 42a732623b1eaed72286012d0cc0dde11589aef7 Mon Sep 17 00:00:00 2001 From: Francesco Guardiani Date: Fri, 13 Nov 2020 09:54:07 +0100 Subject: [PATCH] Improvements to CloudEventReader (#263) * Extracted readAttributes and readExtensions from CloudEventReader Added CloudEventUtils#toContextReader to create a context reader starting from a CloudEvent Improved documentation of *Reader interfaces Renamed MessageReader#visit to the proper name MessageReader#read Signed-off-by: Francesco Guardiani * Typo Signed-off-by: Francesco Guardiani --- .../rw/CloudEventContextReader.java | 46 +++++++++++++++++++ .../io/cloudevents/rw/CloudEventReader.java | 20 ++------ .../cloudevents/core/impl/BaseCloudEvent.java | 2 +- .../core/impl/CloudEventReaderAdapter.java | 4 +- .../core/impl/CloudEventUtils.java | 22 ++++++++- .../core/message/MessageReader.java | 24 ++-------- .../BaseGenericBinaryMessageReaderImpl.java | 40 +++------------- .../impl/BaseStructuredMessageReader.java | 14 ++---- .../impl/UnknownEncodingMessageReader.java | 15 ++---- .../core/v03/CloudEventBuilder.java | 4 +- .../core/v1/CloudEventBuilder.java | 4 +- .../core/mock/MockBinaryMessageWriter.java | 2 +- .../jackson/CloudEventDeserializer.java | 10 ---- .../jackson/CloudEventSerializer.java | 4 +- .../cloudevents/http/HttpMessageFactory.java | 2 +- .../http/HttpMessageReaderWriterTest.java | 4 +- .../kafka/CloudEventMessageSerializer.java | 2 +- 17 files changed, 100 insertions(+), 119 deletions(-) create mode 100644 api/src/main/java/io/cloudevents/rw/CloudEventContextReader.java diff --git a/api/src/main/java/io/cloudevents/rw/CloudEventContextReader.java b/api/src/main/java/io/cloudevents/rw/CloudEventContextReader.java new file mode 100644 index 00000000..4fc0cfff --- /dev/null +++ b/api/src/main/java/io/cloudevents/rw/CloudEventContextReader.java @@ -0,0 +1,46 @@ +/* + * Copyright 2018-Present The CloudEvents Authors + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package io.cloudevents.rw; + +import javax.annotation.ParametersAreNonnullByDefault; + +/** + * Represents an object that can be read as CloudEvent context attributes and extensions. + *

+ * An object (in particular, buffered objects) can implement both this interface and {@link CloudEventReader}. + */ +@ParametersAreNonnullByDefault +public interface CloudEventContextReader { + + /** + * Visit self attributes using the provided writer + * + * @param writer Attributes writer + * @throws CloudEventRWException if something went wrong during the visit. + */ + void readAttributes(CloudEventAttributesWriter writer) throws CloudEventRWException; + + /** + * Visit self extensions using the provided writer + * + * @param visitor Extensions writer + * @throws CloudEventRWException if something went wrong during the visit. + */ + void readExtensions(CloudEventExtensionsWriter visitor) throws CloudEventRWException; + +} diff --git a/api/src/main/java/io/cloudevents/rw/CloudEventReader.java b/api/src/main/java/io/cloudevents/rw/CloudEventReader.java index da4c9b78..69e23197 100644 --- a/api/src/main/java/io/cloudevents/rw/CloudEventReader.java +++ b/api/src/main/java/io/cloudevents/rw/CloudEventReader.java @@ -22,7 +22,9 @@ import io.cloudevents.lang.Nullable; import javax.annotation.ParametersAreNonnullByDefault; /** - * Represents an object that can be read as CloudEvent + * Represents an object that can be read as CloudEvent. + *

+ * The read may consume this object, hence it's not safe to invoke it multiple times, unless it's explicitly allowed by the implementer. */ @ParametersAreNonnullByDefault public interface CloudEventReader { @@ -42,20 +44,4 @@ public interface CloudEventReader { */ , R> R read(CloudEventWriterFactory writerFactory, @Nullable CloudEventDataMapper mapper) throws CloudEventRWException; - /** - * Visit self attributes using the provided writer - * - * @param writer Attributes writer - * @throws CloudEventRWException if something went wrong during the visit. - */ - void readAttributes(CloudEventAttributesWriter writer) throws CloudEventRWException; - - /** - * Visit self extensions using the provided writer - * - * @param visitor Extensions writer - * @throws CloudEventRWException if something went wrong during the visit. - */ - void readExtensions(CloudEventExtensionsWriter visitor) throws CloudEventRWException; - } diff --git a/core/src/main/java/io/cloudevents/core/impl/BaseCloudEvent.java b/core/src/main/java/io/cloudevents/core/impl/BaseCloudEvent.java index 90539b85..88212fdb 100644 --- a/core/src/main/java/io/cloudevents/core/impl/BaseCloudEvent.java +++ b/core/src/main/java/io/cloudevents/core/impl/BaseCloudEvent.java @@ -25,7 +25,7 @@ import java.util.HashMap; import java.util.Map; import java.util.Set; -public abstract class BaseCloudEvent implements CloudEvent, CloudEventReader { +public abstract class BaseCloudEvent implements CloudEvent, CloudEventReader, CloudEventContextReader { private final CloudEventData data; protected final Map extensions; diff --git a/core/src/main/java/io/cloudevents/core/impl/CloudEventReaderAdapter.java b/core/src/main/java/io/cloudevents/core/impl/CloudEventReaderAdapter.java index 7ecf55d8..4d598240 100644 --- a/core/src/main/java/io/cloudevents/core/impl/CloudEventReaderAdapter.java +++ b/core/src/main/java/io/cloudevents/core/impl/CloudEventReaderAdapter.java @@ -20,9 +20,9 @@ package io.cloudevents.core.impl; import io.cloudevents.CloudEvent; import io.cloudevents.rw.*; -public class CloudEventReaderAdapter implements CloudEventReader { +public class CloudEventReaderAdapter implements CloudEventReader, CloudEventContextReader { - private CloudEvent event; + private final CloudEvent event; CloudEventReaderAdapter(CloudEvent event) { this.event = event; diff --git a/core/src/main/java/io/cloudevents/core/impl/CloudEventUtils.java b/core/src/main/java/io/cloudevents/core/impl/CloudEventUtils.java index 98109708..fe5df47f 100644 --- a/core/src/main/java/io/cloudevents/core/impl/CloudEventUtils.java +++ b/core/src/main/java/io/cloudevents/core/impl/CloudEventUtils.java @@ -20,6 +20,7 @@ package io.cloudevents.core.impl; import io.cloudevents.CloudEvent; import io.cloudevents.CloudEventData; import io.cloudevents.lang.Nullable; +import io.cloudevents.rw.CloudEventContextReader; import io.cloudevents.rw.CloudEventDataMapper; import io.cloudevents.rw.CloudEventReader; @@ -29,7 +30,9 @@ public final class CloudEventUtils { /** * Convert a {@link CloudEvent} to a {@link CloudEventReader}. This method provides a default implementation - * for CloudEvent that doesn't implement CloudEventVisitable + * for CloudEvent that doesn't implement CloudEventVisitable. + *

+ * It's safe to use the returned {@link CloudEventReader} multiple times. * * @param event the event to convert * @return the visitable implementation @@ -42,6 +45,23 @@ public final class CloudEventUtils { } } + /** + * Convert a {@link CloudEvent} to a {@link CloudEventContextReader}. This method provides a default implementation + * for {@link CloudEvent} that doesn't implement {@link CloudEventContextReader}. + *

+ * It's safe to use the returned {@link CloudEventReader} multiple times. + * + * @param event the event to convert + * @return the context reader implementation + */ + public static CloudEventContextReader toContextReader(CloudEvent event) { + if (event instanceof CloudEventContextReader) { + return (CloudEventContextReader) event; + } else { + return new CloudEventReaderAdapter(event); + } + } + /** * Get the data contained in {@code event} and map it using the provided mapper. */ diff --git a/core/src/main/java/io/cloudevents/core/message/MessageReader.java b/core/src/main/java/io/cloudevents/core/message/MessageReader.java index b2c0f978..ac01c1f8 100644 --- a/core/src/main/java/io/cloudevents/core/message/MessageReader.java +++ b/core/src/main/java/io/cloudevents/core/message/MessageReader.java @@ -47,24 +47,6 @@ public interface MessageReader extends StructuredMessageReader, CloudEventReader */ , R> R read(CloudEventWriterFactory writerFactory, @Nullable CloudEventDataMapper mapper) throws CloudEventRWException, IllegalStateException; - /** - * Visit the message attributes as binary encoded event using the provided visitor. - * - * @param writer Attributes visitor - * @throws CloudEventRWException if something went wrong during the visit. - * @throws IllegalStateException if the message is not in binary encoding. - */ - void readAttributes(CloudEventAttributesWriter writer) throws CloudEventRWException, IllegalStateException; - - /** - * Visit the message extensions as binary encoded event using the provided visitor. - * - * @param visitor Extensions visitor - * @throws CloudEventRWException if something went wrong during the visit. - * @throws IllegalStateException if the message is not in binary encoding. - */ - void readExtensions(CloudEventExtensionsWriter visitor) throws CloudEventRWException, IllegalStateException; - /** * Visit the message as structured encoded event using the provided visitor * @@ -80,15 +62,15 @@ public interface MessageReader extends StructuredMessageReader, CloudEventReader Encoding getEncoding(); /** - * Visit the event using a {@link MessageWriter}. This method allows to transcode an event from one transport to another without + * Read the content of this object using a {@link MessageWriter}. This method allows to transcode an event from one transport to another without * converting it to {@link CloudEvent}. The resulting encoding will be the same as the original encoding. * * @param visitor the MessageVisitor accepting this Message * @return The return value of the MessageVisitor * @throws CloudEventRWException if something went wrong during the visit. - * @throws IllegalStateException if the message has an unknown encoding. + * @throws IllegalStateException if the message has an unknown encoding. */ - default , R> R visit(MessageWriter visitor) throws CloudEventRWException, IllegalStateException { + default , R> R read(MessageWriter visitor) throws CloudEventRWException, IllegalStateException { switch (getEncoding()) { case BINARY: return this.read((CloudEventWriterFactory) visitor); diff --git a/core/src/main/java/io/cloudevents/core/message/impl/BaseGenericBinaryMessageReaderImpl.java b/core/src/main/java/io/cloudevents/core/message/impl/BaseGenericBinaryMessageReaderImpl.java index 9033cf5e..2d564659 100644 --- a/core/src/main/java/io/cloudevents/core/message/impl/BaseGenericBinaryMessageReaderImpl.java +++ b/core/src/main/java/io/cloudevents/core/message/impl/BaseGenericBinaryMessageReaderImpl.java @@ -19,14 +19,18 @@ package io.cloudevents.core.message.impl; import io.cloudevents.CloudEventData; import io.cloudevents.SpecVersion; -import io.cloudevents.rw.*; +import io.cloudevents.rw.CloudEventDataMapper; +import io.cloudevents.rw.CloudEventRWException; +import io.cloudevents.rw.CloudEventWriter; +import io.cloudevents.rw.CloudEventWriterFactory; import java.util.Objects; import java.util.function.BiConsumer; /** - * This class implements a Binary {@link io.cloudevents.core.message.MessageReader}, providing common logic to most protocol bindings - * which supports both Binary and Structured mode. + * This class implements a Binary {@link io.cloudevents.core.message.MessageReader}, + * providing common logic to most protocol bindings which supports both Binary and Structured mode. + *

* Content-type is handled separately using a key not prefixed with CloudEvents header prefix. * * @param Header key type @@ -74,36 +78,6 @@ public abstract class BaseGenericBinaryMessageReaderImpl extends BaseBin return visitor.end(); } - @Override - public void readAttributes(CloudEventAttributesWriter writer) throws RuntimeException { - this.forEachHeader((key, value) -> { - if (isContentTypeHeader(key)) { - writer.withAttribute("datacontenttype", toCloudEventsValue(value)); - } else if (isCloudEventsHeader(key)) { - String name = toCloudEventsKey(key); - if (name.equals("specversion")) { - return; - } - if (this.version.getAllAttributes().contains(name)) { - writer.withAttribute(name, toCloudEventsValue(value)); - } - } - }); - } - - @Override - public void readExtensions(CloudEventExtensionsWriter visitor) throws RuntimeException { - // Grab from headers the attributes and extensions - this.forEachHeader((key, value) -> { - if (isCloudEventsHeader(key)) { - String name = toCloudEventsKey(key); - if (!this.version.getAllAttributes().contains(name)) { - visitor.withExtension(name, toCloudEventsValue(value)); - } - } - }); - } - protected abstract boolean isContentTypeHeader(HK key); protected abstract boolean isCloudEventsHeader(HK key); diff --git a/core/src/main/java/io/cloudevents/core/message/impl/BaseStructuredMessageReader.java b/core/src/main/java/io/cloudevents/core/message/impl/BaseStructuredMessageReader.java index 04a031ec..4639d50e 100644 --- a/core/src/main/java/io/cloudevents/core/message/impl/BaseStructuredMessageReader.java +++ b/core/src/main/java/io/cloudevents/core/message/impl/BaseStructuredMessageReader.java @@ -19,7 +19,9 @@ package io.cloudevents.core.message.impl; import io.cloudevents.core.message.Encoding; import io.cloudevents.core.message.MessageReader; -import io.cloudevents.rw.*; +import io.cloudevents.rw.CloudEventDataMapper; +import io.cloudevents.rw.CloudEventWriter; +import io.cloudevents.rw.CloudEventWriterFactory; public abstract class BaseStructuredMessageReader implements MessageReader { @@ -32,14 +34,4 @@ public abstract class BaseStructuredMessageReader implements MessageReader { public , R> R read(CloudEventWriterFactory writerFactory, CloudEventDataMapper mapper) { throw MessageUtils.generateWrongEncoding(Encoding.BINARY, Encoding.STRUCTURED); } - - @Override - public void readAttributes(CloudEventAttributesWriter writer) throws RuntimeException { - throw MessageUtils.generateWrongEncoding(Encoding.BINARY, Encoding.STRUCTURED); - } - - @Override - public void readExtensions(CloudEventExtensionsWriter visitor) throws RuntimeException { - throw MessageUtils.generateWrongEncoding(Encoding.BINARY, Encoding.STRUCTURED); - } } diff --git a/core/src/main/java/io/cloudevents/core/message/impl/UnknownEncodingMessageReader.java b/core/src/main/java/io/cloudevents/core/message/impl/UnknownEncodingMessageReader.java index ac6d4f71..94c78d3a 100644 --- a/core/src/main/java/io/cloudevents/core/message/impl/UnknownEncodingMessageReader.java +++ b/core/src/main/java/io/cloudevents/core/message/impl/UnknownEncodingMessageReader.java @@ -20,7 +20,10 @@ package io.cloudevents.core.message.impl; import io.cloudevents.core.message.Encoding; import io.cloudevents.core.message.MessageReader; import io.cloudevents.core.message.StructuredMessageWriter; -import io.cloudevents.rw.*; +import io.cloudevents.rw.CloudEventDataMapper; +import io.cloudevents.rw.CloudEventRWException; +import io.cloudevents.rw.CloudEventWriter; +import io.cloudevents.rw.CloudEventWriterFactory; public class UnknownEncodingMessageReader implements MessageReader { @Override @@ -33,16 +36,6 @@ public class UnknownEncodingMessageReader implements MessageReader { throw new IllegalStateException("Unknown encoding"); } - @Override - public void readAttributes(CloudEventAttributesWriter writer) throws CloudEventRWException { - throw new IllegalStateException("Unknown encoding"); - } - - @Override - public void readExtensions(CloudEventExtensionsWriter visitor) throws CloudEventRWException { - throw new IllegalStateException("Unknown encoding"); - } - @Override public T read(StructuredMessageWriter visitor) throws CloudEventRWException, IllegalStateException { throw new IllegalStateException("Unknown encoding"); diff --git a/core/src/main/java/io/cloudevents/core/v03/CloudEventBuilder.java b/core/src/main/java/io/cloudevents/core/v03/CloudEventBuilder.java index 3ba1ec33..deabdbc6 100644 --- a/core/src/main/java/io/cloudevents/core/v03/CloudEventBuilder.java +++ b/core/src/main/java/io/cloudevents/core/v03/CloudEventBuilder.java @@ -54,9 +54,9 @@ public final class CloudEventBuilder extends BaseCloudEventBuilder, CloudEventWriter { +public class MockBinaryMessageWriter extends BaseBinaryMessageReader implements MessageReader, CloudEventContextReader, CloudEventWriterFactory, CloudEventWriter { private SpecVersion version; private Map attributes; diff --git a/formats/json-jackson/src/main/java/io/cloudevents/jackson/CloudEventDeserializer.java b/formats/json-jackson/src/main/java/io/cloudevents/jackson/CloudEventDeserializer.java index dbac1981..62aa8353 100644 --- a/formats/json-jackson/src/main/java/io/cloudevents/jackson/CloudEventDeserializer.java +++ b/formats/json-jackson/src/main/java/io/cloudevents/jackson/CloudEventDeserializer.java @@ -154,16 +154,6 @@ public class CloudEventDeserializer extends StdDeserializer { } } - @Override - public void readAttributes(CloudEventAttributesWriter writer) throws CloudEventRWException { - // no-op no need for that - } - - @Override - public void readExtensions(CloudEventExtensionsWriter visitor) throws CloudEventRWException { - // no-op no need for that - } - private String getStringNode(ObjectNode objNode, JsonParser p, String attributeName) throws JsonProcessingException { String val = getOptionalStringNode(objNode, p, attributeName); if (val == null) { diff --git a/formats/json-jackson/src/main/java/io/cloudevents/jackson/CloudEventSerializer.java b/formats/json-jackson/src/main/java/io/cloudevents/jackson/CloudEventSerializer.java index 3d0f9150..1892fa19 100644 --- a/formats/json-jackson/src/main/java/io/cloudevents/jackson/CloudEventSerializer.java +++ b/formats/json-jackson/src/main/java/io/cloudevents/jackson/CloudEventSerializer.java @@ -24,9 +24,9 @@ import io.cloudevents.CloudEvent; import io.cloudevents.CloudEventData; import io.cloudevents.core.impl.CloudEventUtils; import io.cloudevents.rw.CloudEventAttributesWriter; +import io.cloudevents.rw.CloudEventContextReader; import io.cloudevents.rw.CloudEventExtensionsWriter; import io.cloudevents.rw.CloudEventRWException; -import io.cloudevents.rw.CloudEventReader; import java.io.IOException; import java.nio.charset.StandardCharsets; @@ -104,7 +104,7 @@ public class CloudEventSerializer extends StdSerializer { // Serialize attributes try { - CloudEventReader visitable = CloudEventUtils.toVisitable(value); + CloudEventContextReader visitable = CloudEventUtils.toContextReader(value); FieldsSerializer serializer = new FieldsSerializer(gen, provider); visitable.readAttributes(serializer); visitable.readExtensions(serializer); diff --git a/http/basic/src/main/java/io/cloudevents/http/HttpMessageFactory.java b/http/basic/src/main/java/io/cloudevents/http/HttpMessageFactory.java index d6d02f5e..1ffe381a 100644 --- a/http/basic/src/main/java/io/cloudevents/http/HttpMessageFactory.java +++ b/http/basic/src/main/java/io/cloudevents/http/HttpMessageFactory.java @@ -145,7 +145,7 @@ public final class HttpMessageFactory { * @param sendBody a function that sends body (e.g. sets HTTP status code, content-length and writes the bytes into output stream). * @return a message writer */ - public static MessageWriter createWriter(BiConsumer putHeader, Consumer sendBody) { + public static HttpMessageWriter createWriter(BiConsumer putHeader, Consumer sendBody) { return new HttpMessageWriter(putHeader, sendBody); } diff --git a/http/basic/src/test/java/io/cloudevents/http/HttpMessageReaderWriterTest.java b/http/basic/src/test/java/io/cloudevents/http/HttpMessageReaderWriterTest.java index edfa33d0..4148783c 100644 --- a/http/basic/src/test/java/io/cloudevents/http/HttpMessageReaderWriterTest.java +++ b/http/basic/src/test/java/io/cloudevents/http/HttpMessageReaderWriterTest.java @@ -22,7 +22,6 @@ import io.cloudevents.core.message.Encoding; import io.cloudevents.core.message.MessageReader; import io.cloudevents.core.message.impl.GenericStructuredMessageReader; import io.cloudevents.core.mock.CSVFormat; -import io.cloudevents.http.HttpMessageFactory; import io.cloudevents.http.impl.HttpMessageWriter; import io.cloudevents.types.Time; import org.junit.jupiter.params.ParameterizedTest; @@ -46,14 +45,13 @@ public class HttpMessageReaderWriterTest { final AtomicReference body = new AtomicReference<>(); final Map headers = new TreeMap<>(String.CASE_INSENSITIVE_ORDER); - GenericStructuredMessageReader.from(event, CSVFormat.INSTANCE).visit( + GenericStructuredMessageReader.from(event, CSVFormat.INSTANCE).read( HttpMessageFactory.createWriter(headers::put, body::set) ); assertThat(headers.get("content-type")) .isEqualTo(CSVFormat.INSTANCE.serializedContentType()); assertThat(body.get()) .isEqualTo(CSVFormat.INSTANCE.serialize(event)); - } @ParameterizedTest diff --git a/kafka/src/main/java/io/cloudevents/kafka/CloudEventMessageSerializer.java b/kafka/src/main/java/io/cloudevents/kafka/CloudEventMessageSerializer.java index 6569cb99..e9d21005 100644 --- a/kafka/src/main/java/io/cloudevents/kafka/CloudEventMessageSerializer.java +++ b/kafka/src/main/java/io/cloudevents/kafka/CloudEventMessageSerializer.java @@ -43,6 +43,6 @@ public class CloudEventMessageSerializer implements Serializer { @Override public byte[] serialize(String topic, Headers headers, MessageReader data) { - return data.visit(new KafkaSerializerMessageWriterImpl(headers)); + return data.read(new KafkaSerializerMessageWriterImpl(headers)); } }