diff --git a/api/src/main/java/io/cloudevents/rw/CloudEventDataMapper.java b/api/src/main/java/io/cloudevents/rw/CloudEventDataMapper.java index e97b323b..f427f76e 100644 --- a/api/src/main/java/io/cloudevents/rw/CloudEventDataMapper.java +++ b/api/src/main/java/io/cloudevents/rw/CloudEventDataMapper.java @@ -37,4 +37,10 @@ public interface CloudEventDataMapper { */ R map(CloudEventData data) throws CloudEventRWException; + /** + * No-op identity mapper which can be used as default when no mapper is provided. + */ + static CloudEventDataMapper identity() { + return d -> d; + } } diff --git a/api/src/main/java/io/cloudevents/rw/CloudEventReader.java b/api/src/main/java/io/cloudevents/rw/CloudEventReader.java index 3ab47863..53d22916 100644 --- a/api/src/main/java/io/cloudevents/rw/CloudEventReader.java +++ b/api/src/main/java/io/cloudevents/rw/CloudEventReader.java @@ -17,7 +17,7 @@ package io.cloudevents.rw; -import io.cloudevents.lang.Nullable; +import io.cloudevents.CloudEventData; import javax.annotation.ParametersAreNonnullByDefault; @@ -36,12 +36,12 @@ public interface CloudEventReader { * @throws CloudEventRWException if something went wrong during the read. */ default , R> R read(CloudEventWriterFactory writerFactory) throws CloudEventRWException { - return read(writerFactory, null); + return read(writerFactory, CloudEventDataMapper.identity()); } /** * Like {@link CloudEventReader#read(CloudEventWriterFactory)}, but providing a mapper for {@link io.cloudevents.CloudEventData} to be invoked when the data field is available. */ - , R> R read(CloudEventWriterFactory writerFactory, @Nullable CloudEventDataMapper mapper) throws CloudEventRWException; + , R> R read(CloudEventWriterFactory writerFactory, CloudEventDataMapper mapper) throws CloudEventRWException; } diff --git a/core/src/main/java/io/cloudevents/core/CloudEventUtils.java b/core/src/main/java/io/cloudevents/core/CloudEventUtils.java index 578ec766..ebd23c5b 100644 --- a/core/src/main/java/io/cloudevents/core/CloudEventUtils.java +++ b/core/src/main/java/io/cloudevents/core/CloudEventUtils.java @@ -76,7 +76,7 @@ public final class CloudEventUtils { * @return the reader implementation */ public static CloudEvent toEvent(CloudEventReader reader) throws CloudEventRWException { - return toEvent(reader, null); + return toEvent(reader, CloudEventDataMapper.identity()); } /** @@ -86,7 +86,7 @@ public final class CloudEventUtils { * @param mapper the mapper to use when reading the data * @return the reader implementation */ - public static CloudEvent toEvent(CloudEventReader reader, @Nullable CloudEventDataMapper mapper) throws CloudEventRWException { + public static CloudEvent toEvent(CloudEventReader reader, CloudEventDataMapper mapper) throws CloudEventRWException { return reader.read(CloudEventBuilder::fromSpecVersion, mapper); } diff --git a/core/src/main/java/io/cloudevents/core/format/EventFormat.java b/core/src/main/java/io/cloudevents/core/format/EventFormat.java index 3445b949..f99915aa 100644 --- a/core/src/main/java/io/cloudevents/core/format/EventFormat.java +++ b/core/src/main/java/io/cloudevents/core/format/EventFormat.java @@ -18,7 +18,7 @@ package io.cloudevents.core.format; import io.cloudevents.CloudEvent; -import io.cloudevents.lang.Nullable; +import io.cloudevents.CloudEventData; import io.cloudevents.rw.CloudEventDataMapper; import javax.annotation.ParametersAreNonnullByDefault; @@ -61,7 +61,7 @@ public interface EventFormat { /** * Like {@link EventFormat#deserialize(byte[])}, but allows a mapper that maps the parsed {@link io.cloudevents.CloudEventData} to another one. */ - CloudEvent deserialize(byte[] bytes, @Nullable CloudEventDataMapper mapper) throws EventDeserializationException; + CloudEvent deserialize(byte[] bytes, CloudEventDataMapper mapper) throws EventDeserializationException; /** * @return the set of content types this event format can deserialize. These content types are used diff --git a/core/src/main/java/io/cloudevents/core/impl/BaseCloudEvent.java b/core/src/main/java/io/cloudevents/core/impl/BaseCloudEvent.java index a03db9a2..24448ceb 100644 --- a/core/src/main/java/io/cloudevents/core/impl/BaseCloudEvent.java +++ b/core/src/main/java/io/cloudevents/core/impl/BaseCloudEvent.java @@ -50,13 +50,13 @@ public abstract class BaseCloudEvent implements CloudEvent, CloudEventReader, Cl return this.extensions.keySet(); } - public , V> V read(CloudEventWriterFactory writerFactory, CloudEventDataMapper mapper) throws CloudEventRWException, IllegalStateException { + public , V> V read(CloudEventWriterFactory writerFactory, CloudEventDataMapper mapper) throws CloudEventRWException, IllegalStateException { CloudEventWriter visitor = writerFactory.create(this.getSpecVersion()); this.readAttributes(visitor); this.readExtensions(visitor); if (this.data != null) { - return visitor.end(mapper != null ? mapper.map(this.data) : this.data); + return visitor.end(mapper.map(this.data)); } return visitor.end(); diff --git a/core/src/main/java/io/cloudevents/core/impl/CloudEventReaderAdapter.java b/core/src/main/java/io/cloudevents/core/impl/CloudEventReaderAdapter.java index 3cefd58b..9228ea6c 100644 --- a/core/src/main/java/io/cloudevents/core/impl/CloudEventReaderAdapter.java +++ b/core/src/main/java/io/cloudevents/core/impl/CloudEventReaderAdapter.java @@ -18,6 +18,7 @@ package io.cloudevents.core.impl; import io.cloudevents.CloudEvent; +import io.cloudevents.CloudEventData; import io.cloudevents.rw.*; public class CloudEventReaderAdapter implements CloudEventReader, CloudEventContextReader { @@ -29,13 +30,13 @@ public class CloudEventReaderAdapter implements CloudEventReader, CloudEventCont } @Override - public , R> R read(CloudEventWriterFactory writerFactory, CloudEventDataMapper mapper) throws RuntimeException { + public , R> R read(CloudEventWriterFactory writerFactory, CloudEventDataMapper mapper) throws RuntimeException { CloudEventWriter visitor = writerFactory.create(event.getSpecVersion()); this.readAttributes(visitor); this.readExtensions(visitor); if (event.getData() != null) { - return visitor.end(mapper != null ? mapper.map(event.getData()) : event.getData()); + return visitor.end(mapper.map(event.getData())); } return visitor.end(); diff --git a/core/src/main/java/io/cloudevents/core/message/MessageReader.java b/core/src/main/java/io/cloudevents/core/message/MessageReader.java index 6099abe5..85f2bc24 100644 --- a/core/src/main/java/io/cloudevents/core/message/MessageReader.java +++ b/core/src/main/java/io/cloudevents/core/message/MessageReader.java @@ -20,7 +20,6 @@ package io.cloudevents.core.message; import io.cloudevents.CloudEvent; import io.cloudevents.CloudEventData; import io.cloudevents.core.CloudEventUtils; -import io.cloudevents.lang.Nullable; import io.cloudevents.rw.*; import javax.annotation.ParametersAreNonnullByDefault; @@ -39,13 +38,13 @@ public interface MessageReader extends StructuredMessageReader, CloudEventReader * @throws IllegalStateException if the message is not in binary encoding. */ default , R> R read(CloudEventWriterFactory writerFactory) throws CloudEventRWException, IllegalStateException { - return read(writerFactory, null); + return read(writerFactory, CloudEventDataMapper.identity()); } /** * Like {@link MessageReader#read(CloudEventWriterFactory)}, but providing a mapper for {@link io.cloudevents.CloudEventData} to be invoked when the data field is available. */ - , R> R read(CloudEventWriterFactory writerFactory, @Nullable CloudEventDataMapper mapper) throws CloudEventRWException, IllegalStateException; + , R> R read(CloudEventWriterFactory writerFactory, CloudEventDataMapper mapper) throws CloudEventRWException, IllegalStateException; /** * Visit the message as structured encoded event using the provided visitor @@ -89,7 +88,7 @@ public interface MessageReader extends StructuredMessageReader, CloudEventReader * @throws IllegalStateException if the message has an unknown encoding. */ default CloudEvent toEvent() throws CloudEventRWException, IllegalStateException { - return toEvent(null); + return toEvent(CloudEventDataMapper.identity()); } /** @@ -99,7 +98,7 @@ public interface MessageReader extends StructuredMessageReader, CloudEventReader * @throws CloudEventRWException if something went wrong during the visit. * @throws IllegalStateException if the message has an unknown encoding. */ - default CloudEvent toEvent(@Nullable CloudEventDataMapper mapper) throws CloudEventRWException, IllegalStateException { + default CloudEvent toEvent(CloudEventDataMapper mapper) throws CloudEventRWException, IllegalStateException { switch (getEncoding()) { case BINARY: return CloudEventUtils.toEvent(this, mapper); diff --git a/core/src/main/java/io/cloudevents/core/message/StructuredMessageReader.java b/core/src/main/java/io/cloudevents/core/message/StructuredMessageReader.java index 327f6687..c81e71f5 100644 --- a/core/src/main/java/io/cloudevents/core/message/StructuredMessageReader.java +++ b/core/src/main/java/io/cloudevents/core/message/StructuredMessageReader.java @@ -21,7 +21,6 @@ import io.cloudevents.CloudEvent; import io.cloudevents.CloudEventData; import io.cloudevents.core.format.EventFormat; import io.cloudevents.core.message.impl.GenericStructuredMessageReader; -import io.cloudevents.lang.Nullable; import io.cloudevents.rw.CloudEventDataMapper; import io.cloudevents.rw.CloudEventRWException; @@ -45,7 +44,7 @@ public interface StructuredMessageReader { return this.read(EventFormat::deserialize); } - default CloudEvent toEvent(@Nullable CloudEventDataMapper mapper) throws CloudEventRWException, IllegalStateException { + default CloudEvent toEvent(CloudEventDataMapper mapper) throws CloudEventRWException, IllegalStateException { return this.read((format, value) -> format.deserialize(value, mapper)); } diff --git a/core/src/main/java/io/cloudevents/core/message/impl/BaseGenericBinaryMessageReaderImpl.java b/core/src/main/java/io/cloudevents/core/message/impl/BaseGenericBinaryMessageReaderImpl.java index 2d564659..9038c747 100644 --- a/core/src/main/java/io/cloudevents/core/message/impl/BaseGenericBinaryMessageReaderImpl.java +++ b/core/src/main/java/io/cloudevents/core/message/impl/BaseGenericBinaryMessageReaderImpl.java @@ -48,7 +48,7 @@ public abstract class BaseGenericBinaryMessageReaderImpl extends BaseBin } @Override - public , V> V read(CloudEventWriterFactory writerFactory, CloudEventDataMapper mapper) throws CloudEventRWException, IllegalStateException { + public , V> V read(CloudEventWriterFactory writerFactory, CloudEventDataMapper mapper) throws CloudEventRWException, IllegalStateException { CloudEventWriter visitor = writerFactory.create(this.version); // Grab from headers the attributes and extensions @@ -72,7 +72,7 @@ public abstract class BaseGenericBinaryMessageReaderImpl extends BaseBin // Set the payload if (this.body != null) { - return visitor.end(mapper != null ? mapper.map(this.body) : this.body); + return visitor.end(mapper.map(this.body)); } return visitor.end(); diff --git a/core/src/main/java/io/cloudevents/core/message/impl/BaseStructuredMessageReader.java b/core/src/main/java/io/cloudevents/core/message/impl/BaseStructuredMessageReader.java index 4639d50e..68f99569 100644 --- a/core/src/main/java/io/cloudevents/core/message/impl/BaseStructuredMessageReader.java +++ b/core/src/main/java/io/cloudevents/core/message/impl/BaseStructuredMessageReader.java @@ -17,6 +17,7 @@ package io.cloudevents.core.message.impl; +import io.cloudevents.CloudEventData; import io.cloudevents.core.message.Encoding; import io.cloudevents.core.message.MessageReader; import io.cloudevents.rw.CloudEventDataMapper; @@ -31,7 +32,7 @@ public abstract class BaseStructuredMessageReader implements MessageReader { } @Override - public , R> R read(CloudEventWriterFactory writerFactory, CloudEventDataMapper mapper) { + public , R> R read(CloudEventWriterFactory writerFactory, CloudEventDataMapper mapper) { throw MessageUtils.generateWrongEncoding(Encoding.BINARY, Encoding.STRUCTURED); } } diff --git a/core/src/main/java/io/cloudevents/core/message/impl/UnknownEncodingMessageReader.java b/core/src/main/java/io/cloudevents/core/message/impl/UnknownEncodingMessageReader.java index 94c78d3a..1b1c41dd 100644 --- a/core/src/main/java/io/cloudevents/core/message/impl/UnknownEncodingMessageReader.java +++ b/core/src/main/java/io/cloudevents/core/message/impl/UnknownEncodingMessageReader.java @@ -17,6 +17,7 @@ package io.cloudevents.core.message.impl; +import io.cloudevents.CloudEventData; import io.cloudevents.core.message.Encoding; import io.cloudevents.core.message.MessageReader; import io.cloudevents.core.message.StructuredMessageWriter; @@ -32,7 +33,7 @@ public class UnknownEncodingMessageReader implements MessageReader { } @Override - public , V> V read(CloudEventWriterFactory writerFactory, CloudEventDataMapper mapper) throws CloudEventRWException, IllegalStateException { + public , V> V read(CloudEventWriterFactory writerFactory, CloudEventDataMapper mapper) throws CloudEventRWException, IllegalStateException { throw new IllegalStateException("Unknown encoding"); } diff --git a/core/src/test/java/io/cloudevents/core/mock/CSVFormat.java b/core/src/test/java/io/cloudevents/core/mock/CSVFormat.java index b0b656f7..3b4f330f 100644 --- a/core/src/test/java/io/cloudevents/core/mock/CSVFormat.java +++ b/core/src/test/java/io/cloudevents/core/mock/CSVFormat.java @@ -18,6 +18,7 @@ package io.cloudevents.core.mock; import io.cloudevents.CloudEvent; +import io.cloudevents.CloudEventData; import io.cloudevents.SpecVersion; import io.cloudevents.core.builder.CloudEventBuilder; import io.cloudevents.core.data.BytesCloudEventData; @@ -59,7 +60,7 @@ public class CSVFormat implements EventFormat { } @Override - public CloudEvent deserialize(byte[] bytes, CloudEventDataMapper mapper) { + public CloudEvent deserialize(byte[] bytes, CloudEventDataMapper mapper) { String[] splitted = new String(bytes, StandardCharsets.UTF_8).split(Pattern.quote(",")); SpecVersion sv = SpecVersion.parse(splitted[0]); @@ -90,11 +91,7 @@ public class CSVFormat implements EventFormat { builder.withTime(time); } if (data != null) { - if (mapper != null) { - builder.withData(mapper.map(new BytesCloudEventData(data))); - } else { - builder.withData(data); - } + builder.withData(mapper.map(new BytesCloudEventData(data))); } return builder.build(); } diff --git a/core/src/test/java/io/cloudevents/core/mock/MockBinaryMessageWriter.java b/core/src/test/java/io/cloudevents/core/mock/MockBinaryMessageWriter.java index 8add48b9..2144e19e 100644 --- a/core/src/test/java/io/cloudevents/core/mock/MockBinaryMessageWriter.java +++ b/core/src/test/java/io/cloudevents/core/mock/MockBinaryMessageWriter.java @@ -62,7 +62,7 @@ public class MockBinaryMessageWriter extends BaseBinaryMessageReader implements } @Override - public , V> V read(CloudEventWriterFactory writerFactory, CloudEventDataMapper mapper) throws CloudEventRWException, IllegalStateException { + public , V> V read(CloudEventWriterFactory writerFactory, CloudEventDataMapper mapper) throws CloudEventRWException, IllegalStateException { if (version == null) { throw new IllegalStateException("MockBinaryMessage is empty"); } @@ -72,7 +72,7 @@ public class MockBinaryMessageWriter extends BaseBinaryMessageReader implements this.readExtensions(visitor); if (this.data != null) { - return visitor.end(mapper != null ? mapper.map(this.data) : this.data); + return visitor.end(mapper.map(this.data)); } return visitor.end(); diff --git a/formats/json-jackson/src/main/java/io/cloudevents/jackson/CloudEventDeserializer.java b/formats/json-jackson/src/main/java/io/cloudevents/jackson/CloudEventDeserializer.java index 62aa8353..4697da67 100644 --- a/formats/json-jackson/src/main/java/io/cloudevents/jackson/CloudEventDeserializer.java +++ b/formats/json-jackson/src/main/java/io/cloudevents/jackson/CloudEventDeserializer.java @@ -54,7 +54,7 @@ public class CloudEventDeserializer extends StdDeserializer { } @Override - public , V> V read(CloudEventWriterFactory writerFactory, CloudEventDataMapper mapper) throws CloudEventRWException, IllegalStateException { + public , V> V read(CloudEventWriterFactory writerFactory, CloudEventDataMapper mapper) throws CloudEventRWException, IllegalStateException { try { SpecVersion specVersion = SpecVersion.parse(getStringNode(this.node, this.p, "specversion")); CloudEventWriter visitor = writerFactory.create(specVersion); @@ -144,7 +144,7 @@ public class CloudEventDeserializer extends StdDeserializer { }); if (data != null) { - return visitor.end(mapper != null ? mapper.map(data) : data); + return visitor.end(mapper.map(data)); } return visitor.end(); } catch (IOException e) { diff --git a/formats/json-jackson/src/main/java/io/cloudevents/jackson/JsonFormat.java b/formats/json-jackson/src/main/java/io/cloudevents/jackson/JsonFormat.java index 06d86d04..8d0017fc 100644 --- a/formats/json-jackson/src/main/java/io/cloudevents/jackson/JsonFormat.java +++ b/formats/json-jackson/src/main/java/io/cloudevents/jackson/JsonFormat.java @@ -20,6 +20,7 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.module.SimpleModule; import io.cloudevents.CloudEvent; +import io.cloudevents.CloudEventData; import io.cloudevents.core.builder.CloudEventBuilder; import io.cloudevents.core.format.EventDeserializationException; import io.cloudevents.core.format.EventFormat; @@ -85,15 +86,15 @@ public final class JsonFormat implements EventFormat { } @Override - public CloudEvent deserialize(byte[] bytes, CloudEventDataMapper mapper) throws EventDeserializationException { + public CloudEvent deserialize(byte[] bytes, CloudEventDataMapper mapper) throws EventDeserializationException { CloudEvent deserialized = this.deserialize(bytes); if (deserialized.getData() == null) { return deserialized; } try { - return CloudEventBuilder.from(deserialized).withData( - mapper != null ? mapper.map(deserialized.getData()) : deserialized.getData() - ).build(); + return CloudEventBuilder.from(deserialized) + .withData(mapper.map(deserialized.getData())) + .build(); } catch (CloudEventRWException e) { throw new EventDeserializationException(e); } diff --git a/kafka/src/test/java/io/cloudevents/kafka/CloudEventDeserializerTest.java b/kafka/src/test/java/io/cloudevents/kafka/CloudEventDeserializerTest.java index 09f1d6f4..ca92b425 100644 --- a/kafka/src/test/java/io/cloudevents/kafka/CloudEventDeserializerTest.java +++ b/kafka/src/test/java/io/cloudevents/kafka/CloudEventDeserializerTest.java @@ -42,7 +42,7 @@ public class CloudEventDeserializerTest { @Test public void deserializerWithMapper() { - CloudEventDataMapper mapper = data -> MyCloudEventData.fromStringBytes(data.toBytes()); + CloudEventDataMapper mapper = data -> MyCloudEventData.fromStringBytes(data.toBytes()); CloudEventDeserializer deserializer = new CloudEventDeserializer(); HashMap config = new HashMap<>();