From 5a926820b404270b5e014de0f1f462fefa4a1c92 Mon Sep 17 00:00:00 2001 From: Sreenath Madasu Date: Thu, 26 Nov 2020 05:23:24 -0500 Subject: [PATCH] Remove UnknownEncodingMessageReader and replace with exceptions #262 (#277) * Javadoc'ed + Cleanup of the api module (#267) * Javadoc'ed more and more the api module Cleanup the CloudEventRWException More tests on the API module Signed-off-by: Francesco Guardiani * Use parseTime Signed-off-by: Francesco Guardiani * Better docs on the Extensions Signed-off-by: Francesco Guardiani Signed-off-by: Sreenath Madasu * Remove UnknownEncodingMessageReader and replace with exceptions #262 Signed-off-by: Sreenath Madasu * Remove UnknownEncodingMessageReader and replace with exceptions -- Added Unit tests #262 Signed-off-by: Sreenath Madasu * Remove UnknownEncodingMessageReader and replace with exceptions -- Fixed compile error #262 Signed-off-by: Sreenath Madasu * Remove UnknownEncodingMessageReader and replace with exceptions -- changed exception name and details #262 Signed-off-by: Sreenath Madasu Signed-off-by: Sreenath Madasu * Fixed Signed-off-by: Francesco Guardiani Co-authored-by: Francesco Guardiani Co-authored-by: Sreenath Madasu --- .../amqp/ProtonAmqpMessageFactory.java | 29 +++++----- .../cloudevents/rw/CloudEventRWException.java | 11 ++++ .../io/cloudevents/core/message/Encoding.java | 3 +- .../core/message/impl/MessageUtils.java | 7 ++- .../impl/UnknownEncodingMessageReader.java | 44 -------------- .../core/message/impl/MessageUtilsTest.java | 57 +++++++++++++++++++ .../cloudevents/http/HttpMessageFactory.java | 4 +- .../ws/impl/RestfulWSMessageFactory.java | 4 +- .../http/vertx/VertxMessageFactory.java | 4 +- .../kafka/CloudEventSerializer.java | 3 - .../kafka/KafkaMessageFactory.java | 4 +- 11 files changed, 90 insertions(+), 80 deletions(-) delete mode 100644 core/src/main/java/io/cloudevents/core/message/impl/UnknownEncodingMessageReader.java create mode 100644 core/src/test/java/io/cloudevents/core/message/impl/MessageUtilsTest.java diff --git a/amqp/src/main/java/io/cloudevents/amqp/ProtonAmqpMessageFactory.java b/amqp/src/main/java/io/cloudevents/amqp/ProtonAmqpMessageFactory.java index cdd1ad0d..3ca2577a 100644 --- a/amqp/src/main/java/io/cloudevents/amqp/ProtonAmqpMessageFactory.java +++ b/amqp/src/main/java/io/cloudevents/amqp/ProtonAmqpMessageFactory.java @@ -18,20 +18,18 @@ package io.cloudevents.amqp; -import javax.annotation.ParametersAreNonnullByDefault; - -import org.apache.qpid.proton.amqp.messaging.ApplicationProperties; -import org.apache.qpid.proton.message.Message; - +import io.cloudevents.amqp.impl.AmqpConstants; import io.cloudevents.amqp.impl.ProtonAmqpBinaryMessageReader; import io.cloudevents.amqp.impl.ProtonAmqpMessageWriter; -import io.cloudevents.amqp.impl.AmqpConstants; import io.cloudevents.core.message.MessageReader; import io.cloudevents.core.message.MessageWriter; import io.cloudevents.core.message.impl.GenericStructuredMessageReader; import io.cloudevents.core.message.impl.MessageUtils; -import io.cloudevents.core.message.impl.UnknownEncodingMessageReader; import io.cloudevents.rw.CloudEventWriter; +import org.apache.qpid.proton.amqp.messaging.ApplicationProperties; +import org.apache.qpid.proton.message.Message; + +import javax.annotation.ParametersAreNonnullByDefault; /** * A factory class providing convenience methods for creating MessageReader and MessageWriter instances based on Qpid Proton. */ @@ -46,9 +44,9 @@ public final class ProtonAmqpMessageFactory { * Creates a MessageReader to read a proton-based {@link Message}. *

* This implementation simply calls {@link #createReader(String, ApplicationProperties, byte[])}. - * + * * @param message The proton message to read from. - * + * * @return A message reader that can read the given proton message to a cloud event representation. */ public static MessageReader createReader(final Message message) { @@ -61,7 +59,7 @@ public final class ProtonAmqpMessageFactory { * Creates a MessageReader using the content-type property and payload of a proton-based message. *

* This method simply calls {@link #createReader(String, ApplicationProperties, byte[])}. - * + * * @param contentType The content-type of the message payload. * @param payload The message payload in bytes. * @return A message reader capable of representing a CloudEvent from @@ -74,7 +72,7 @@ public final class ProtonAmqpMessageFactory { /** * Creates a MessageWriter capable of translating both a structured and binary CloudEvent * to a proton-based AMQP 1.0 representation. - * + * * @return A message writer to read structured and binary cloud event from a proton-based message. */ public static MessageWriter, Message> createWriter() { @@ -84,7 +82,7 @@ public final class ProtonAmqpMessageFactory { /** * Creates a MessageReader to read using the content-type property, application-propeties and data payload * of a proton-based message. - * + * * @param contentType The content-type of the message payload. * @param props The application-properties section of the proton-message containing cloud event metadata (attributes and/or extensions). * @param payload The message payload in bytes or {@code null} if the message does not contain any payload. @@ -94,12 +92,11 @@ public final class ProtonAmqpMessageFactory { public static MessageReader createReader(final String contentType, final ApplicationProperties props, final byte[] payload) { return MessageUtils.parseStructuredOrBinaryMessage( - () -> contentType, + () -> contentType, format -> new GenericStructuredMessageReader(format, payload), () -> AmqpConstants.getApplicationProperty(props, AmqpConstants.APP_PROPERTY_SPEC_VERSION, String.class), - sv -> new ProtonAmqpBinaryMessageReader(sv, props, contentType, payload), - UnknownEncodingMessageReader::new); + sv -> new ProtonAmqpBinaryMessageReader(sv, props, contentType, payload)); } - + } diff --git a/api/src/main/java/io/cloudevents/rw/CloudEventRWException.java b/api/src/main/java/io/cloudevents/rw/CloudEventRWException.java index 4d44d284..0a88a396 100644 --- a/api/src/main/java/io/cloudevents/rw/CloudEventRWException.java +++ b/api/src/main/java/io/cloudevents/rw/CloudEventRWException.java @@ -56,6 +56,10 @@ public class CloudEventRWException extends RuntimeException { * Error while converting CloudEventData. */ DATA_CONVERSION, + /** + * Invalid content type or spec version + */ + UNKNOWN_ENCODING, /** * Other error. */ @@ -146,4 +150,11 @@ public class CloudEventRWException extends RuntimeException { cause ); } + + public static CloudEventRWException newUnknownEncodingException() { + return new CloudEventRWException( + CloudEventRWExceptionKind.UNKNOWN_ENCODING, + "Could not parse. Unknown encoding. Invalid content type or spec version" + ); + } } diff --git a/core/src/main/java/io/cloudevents/core/message/Encoding.java b/core/src/main/java/io/cloudevents/core/message/Encoding.java index 6ec7c53a..456e3abe 100644 --- a/core/src/main/java/io/cloudevents/core/message/Encoding.java +++ b/core/src/main/java/io/cloudevents/core/message/Encoding.java @@ -22,6 +22,5 @@ package io.cloudevents.core.message; */ public enum Encoding { STRUCTURED, - BINARY, - UNKNOWN + BINARY } diff --git a/core/src/main/java/io/cloudevents/core/message/impl/MessageUtils.java b/core/src/main/java/io/cloudevents/core/message/impl/MessageUtils.java index 346b7977..cdc93836 100644 --- a/core/src/main/java/io/cloudevents/core/message/impl/MessageUtils.java +++ b/core/src/main/java/io/cloudevents/core/message/impl/MessageUtils.java @@ -29,6 +29,8 @@ import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.Stream; +import static io.cloudevents.rw.CloudEventRWException.newUnknownEncodingException; + public class MessageUtils { /** @@ -38,8 +40,7 @@ public class MessageUtils { Supplier contentTypeHeaderReader, Function structuredMessageFactory, Supplier specVersionHeaderReader, - Function binaryMessageFactory, - Supplier unknownMessageFactory + Function binaryMessageFactory ) { // Let's try structured mode String ct = contentTypeHeaderReader.get(); @@ -57,7 +58,7 @@ public class MessageUtils { return binaryMessageFactory.apply(SpecVersion.parse(specVersionUnparsed)); } - return unknownMessageFactory.get(); + throw newUnknownEncodingException(); } /** diff --git a/core/src/main/java/io/cloudevents/core/message/impl/UnknownEncodingMessageReader.java b/core/src/main/java/io/cloudevents/core/message/impl/UnknownEncodingMessageReader.java deleted file mode 100644 index 1b1c41dd..00000000 --- a/core/src/main/java/io/cloudevents/core/message/impl/UnknownEncodingMessageReader.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Copyright 2018-Present The CloudEvents Authors - *

- * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package io.cloudevents.core.message.impl; - -import io.cloudevents.CloudEventData; -import io.cloudevents.core.message.Encoding; -import io.cloudevents.core.message.MessageReader; -import io.cloudevents.core.message.StructuredMessageWriter; -import io.cloudevents.rw.CloudEventDataMapper; -import io.cloudevents.rw.CloudEventRWException; -import io.cloudevents.rw.CloudEventWriter; -import io.cloudevents.rw.CloudEventWriterFactory; - -public class UnknownEncodingMessageReader implements MessageReader { - @Override - public Encoding getEncoding() { - return Encoding.UNKNOWN; - } - - @Override - public , V> V read(CloudEventWriterFactory writerFactory, CloudEventDataMapper mapper) throws CloudEventRWException, IllegalStateException { - throw new IllegalStateException("Unknown encoding"); - } - - @Override - public T read(StructuredMessageWriter visitor) throws CloudEventRWException, IllegalStateException { - throw new IllegalStateException("Unknown encoding"); - } -} diff --git a/core/src/test/java/io/cloudevents/core/message/impl/MessageUtilsTest.java b/core/src/test/java/io/cloudevents/core/message/impl/MessageUtilsTest.java new file mode 100644 index 00000000..458ee5f7 --- /dev/null +++ b/core/src/test/java/io/cloudevents/core/message/impl/MessageUtilsTest.java @@ -0,0 +1,57 @@ +package io.cloudevents.core.message.impl; + +import io.cloudevents.SpecVersion; +import io.cloudevents.core.mock.CSVFormat; +import io.cloudevents.rw.CloudEventRWException; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.util.stream.Stream; + +import static io.cloudevents.SpecVersion.V03; +import static io.cloudevents.SpecVersion.V1; +import static io.cloudevents.core.message.impl.MessageUtils.parseStructuredOrBinaryMessage; +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.*; + +class MessageUtilsTest { + + @Test + void parseStructuredOrBinaryMessage_Exception() { + final CloudEventRWException cloudEventRWException = assertThrows(CloudEventRWException.class, () -> { + parseStructuredOrBinaryMessage(() -> null, eventFormat -> null, () -> null, specVersion -> null); + }); + assertThat(cloudEventRWException.getKind()) + .isEqualTo(CloudEventRWException.CloudEventRWExceptionKind.UNKNOWN_ENCODING); + } + + @Test + void testParseStructuredOrBinaryMessage_StructuredMode() { + MessageUtils.parseStructuredOrBinaryMessage(() -> "application/cloudevents+csv;", + eventFormat -> { + assertTrue(eventFormat instanceof CSVFormat); + return null; + }, + () -> null, specVersion -> null); + } + + @ParameterizedTest + @MethodSource + void testParseStructuredOrBinaryMessage_BinaryMode(String specVersionHeader, SpecVersion expectedSpecVersion) { + MessageUtils.parseStructuredOrBinaryMessage(() -> null, eventFormat -> null, + () -> specVersionHeader, specVersion -> { + assertEquals(expectedSpecVersion, specVersion); + return null; + }); + } + + private static Stream testParseStructuredOrBinaryMessage_BinaryMode() { + return Stream.of( + Arguments.of("0.3", V03), + Arguments.of("1.0", V1) + ); + } + +} diff --git a/http/basic/src/main/java/io/cloudevents/http/HttpMessageFactory.java b/http/basic/src/main/java/io/cloudevents/http/HttpMessageFactory.java index 1ffe381a..e363385a 100644 --- a/http/basic/src/main/java/io/cloudevents/http/HttpMessageFactory.java +++ b/http/basic/src/main/java/io/cloudevents/http/HttpMessageFactory.java @@ -20,7 +20,6 @@ import io.cloudevents.core.message.MessageReader; import io.cloudevents.core.message.MessageWriter; import io.cloudevents.core.message.impl.GenericStructuredMessageReader; import io.cloudevents.core.message.impl.MessageUtils; -import io.cloudevents.core.message.impl.UnknownEncodingMessageReader; import io.cloudevents.http.impl.CloudEventsHeaders; import io.cloudevents.http.impl.HttpMessageReader; import io.cloudevents.http.impl.HttpMessageWriter; @@ -80,8 +79,7 @@ public final class HttpMessageFactory { contentType::get, format -> new GenericStructuredMessageReader(format, body), specVersion::get, - sv -> new HttpMessageReader(sv, forEachHeader, body), - UnknownEncodingMessageReader::new + sv -> new HttpMessageReader(sv, forEachHeader, body) ); } diff --git a/http/restful-ws/src/main/java/io/cloudevents/http/restful/ws/impl/RestfulWSMessageFactory.java b/http/restful-ws/src/main/java/io/cloudevents/http/restful/ws/impl/RestfulWSMessageFactory.java index 7ada61bb..73a95904 100644 --- a/http/restful-ws/src/main/java/io/cloudevents/http/restful/ws/impl/RestfulWSMessageFactory.java +++ b/http/restful-ws/src/main/java/io/cloudevents/http/restful/ws/impl/RestfulWSMessageFactory.java @@ -20,7 +20,6 @@ package io.cloudevents.http.restful.ws.impl; import io.cloudevents.core.message.MessageReader; import io.cloudevents.core.message.impl.GenericStructuredMessageReader; import io.cloudevents.core.message.impl.MessageUtils; -import io.cloudevents.core.message.impl.UnknownEncodingMessageReader; import javax.ws.rs.core.HttpHeaders; import javax.ws.rs.core.MediaType; @@ -36,8 +35,7 @@ public final class RestfulWSMessageFactory { () -> headers.getFirst(HttpHeaders.CONTENT_TYPE), format -> new GenericStructuredMessageReader(format, payload), () -> headers.getFirst(CloudEventsHeaders.SPEC_VERSION), - sv -> new BinaryRestfulWSMessageReaderImpl(sv, headers, payload), - UnknownEncodingMessageReader::new + sv -> new BinaryRestfulWSMessageReaderImpl(sv, headers, payload) ); } diff --git a/http/vertx/src/main/java/io/cloudevents/http/vertx/VertxMessageFactory.java b/http/vertx/src/main/java/io/cloudevents/http/vertx/VertxMessageFactory.java index 8d0bb05f..37221283 100644 --- a/http/vertx/src/main/java/io/cloudevents/http/vertx/VertxMessageFactory.java +++ b/http/vertx/src/main/java/io/cloudevents/http/vertx/VertxMessageFactory.java @@ -4,7 +4,6 @@ import io.cloudevents.core.message.MessageReader; import io.cloudevents.core.message.MessageWriter; import io.cloudevents.core.message.impl.GenericStructuredMessageReader; import io.cloudevents.core.message.impl.MessageUtils; -import io.cloudevents.core.message.impl.UnknownEncodingMessageReader; import io.cloudevents.http.vertx.impl.BinaryVertxMessageReaderImpl; import io.cloudevents.http.vertx.impl.CloudEventsHeaders; import io.cloudevents.http.vertx.impl.VertxWebClientRequestMessageWriterImpl; @@ -43,8 +42,7 @@ public final class VertxMessageFactory { () -> headers.get(HttpHeaders.CONTENT_TYPE), format -> new GenericStructuredMessageReader(format, body.getBytes()), () -> headers.get(CloudEventsHeaders.SPEC_VERSION), - sv -> new BinaryVertxMessageReaderImpl(sv, headers, body), - UnknownEncodingMessageReader::new + sv -> new BinaryVertxMessageReaderImpl(sv, headers, body) ); } diff --git a/kafka/src/main/java/io/cloudevents/kafka/CloudEventSerializer.java b/kafka/src/main/java/io/cloudevents/kafka/CloudEventSerializer.java index ca499dc4..b4548de5 100644 --- a/kafka/src/main/java/io/cloudevents/kafka/CloudEventSerializer.java +++ b/kafka/src/main/java/io/cloudevents/kafka/CloudEventSerializer.java @@ -56,9 +56,6 @@ public class CloudEventSerializer implements Serializer { } else if (encodingConfig != null) { throw new IllegalArgumentException(ENCODING_CONFIG + " can be of type String or " + Encoding.class.getCanonicalName()); } - if (this.encoding == Encoding.UNKNOWN) { - throw new IllegalArgumentException(ENCODING_CONFIG + " cannot be " + Encoding.UNKNOWN); - } if (this.encoding == Encoding.STRUCTURED) { Object eventFormatConfig = configs.get(EVENT_FORMAT_CONFIG); diff --git a/kafka/src/main/java/io/cloudevents/kafka/KafkaMessageFactory.java b/kafka/src/main/java/io/cloudevents/kafka/KafkaMessageFactory.java index 14045531..a82fb4f4 100644 --- a/kafka/src/main/java/io/cloudevents/kafka/KafkaMessageFactory.java +++ b/kafka/src/main/java/io/cloudevents/kafka/KafkaMessageFactory.java @@ -21,7 +21,6 @@ import io.cloudevents.core.message.MessageReader; import io.cloudevents.core.message.MessageWriter; import io.cloudevents.core.message.impl.GenericStructuredMessageReader; import io.cloudevents.core.message.impl.MessageUtils; -import io.cloudevents.core.message.impl.UnknownEncodingMessageReader; import io.cloudevents.kafka.impl.KafkaBinaryMessageReaderImpl; import io.cloudevents.kafka.impl.KafkaHeaders; import io.cloudevents.kafka.impl.KafkaProducerMessageWriterImpl; @@ -61,8 +60,7 @@ public final class KafkaMessageFactory { () -> KafkaHeaders.getParsedKafkaHeader(headers, KafkaHeaders.CONTENT_TYPE), format -> new GenericStructuredMessageReader(format, payload), () -> KafkaHeaders.getParsedKafkaHeader(headers, KafkaHeaders.SPEC_VERSION), - sv -> new KafkaBinaryMessageReaderImpl(sv, headers, payload), - UnknownEncodingMessageReader::new + sv -> new KafkaBinaryMessageReaderImpl(sv, headers, payload) ); }