* Javadoc'ed + Cleanup of the api module (#267) * Javadoc'ed more and more the api module Cleanup the CloudEventRWException More tests on the API module Signed-off-by: Francesco Guardiani <francescoguard@gmail.com> * Use parseTime Signed-off-by: Francesco Guardiani <francescoguard@gmail.com> * Better docs on the Extensions Signed-off-by: Francesco Guardiani <francescoguard@gmail.com> Signed-off-by: Sreenath Madasu <MADASUSX@legal.regn.net> * Remove UnknownEncodingMessageReader and replace with exceptions #262 Signed-off-by: Sreenath Madasu <Sreenath.Madasu@gmail.com> * Remove UnknownEncodingMessageReader and replace with exceptions -- Added Unit tests #262 Signed-off-by: Sreenath Madasu <Sreenath.Madasu@gmail.com> * Remove UnknownEncodingMessageReader and replace with exceptions -- Fixed compile error #262 Signed-off-by: Sreenath Madasu <Sreenath.Madasu@gmail.com> * Remove UnknownEncodingMessageReader and replace with exceptions -- changed exception name and details #262 Signed-off-by: Sreenath Madasu <Sreenath.Madasu@gmail.com> Signed-off-by: Sreenath Madasu <MADASUSX@legal.regn.net> * Fixed Signed-off-by: Francesco Guardiani <francescoguard@gmail.com> Co-authored-by: Francesco Guardiani <francescoguard@gmail.com> Co-authored-by: Sreenath Madasu <MADASUSX@legal.regn.net>
This commit is contained in:
parent
7c0b1e3c49
commit
5a926820b4
|
@ -18,20 +18,18 @@
|
|||
package io.cloudevents.amqp;
|
||||
|
||||
|
||||
import javax.annotation.ParametersAreNonnullByDefault;
|
||||
|
||||
import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
|
||||
import org.apache.qpid.proton.message.Message;
|
||||
|
||||
import io.cloudevents.amqp.impl.AmqpConstants;
|
||||
import io.cloudevents.amqp.impl.ProtonAmqpBinaryMessageReader;
|
||||
import io.cloudevents.amqp.impl.ProtonAmqpMessageWriter;
|
||||
import io.cloudevents.amqp.impl.AmqpConstants;
|
||||
import io.cloudevents.core.message.MessageReader;
|
||||
import io.cloudevents.core.message.MessageWriter;
|
||||
import io.cloudevents.core.message.impl.GenericStructuredMessageReader;
|
||||
import io.cloudevents.core.message.impl.MessageUtils;
|
||||
import io.cloudevents.core.message.impl.UnknownEncodingMessageReader;
|
||||
import io.cloudevents.rw.CloudEventWriter;
|
||||
import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
|
||||
import org.apache.qpid.proton.message.Message;
|
||||
|
||||
import javax.annotation.ParametersAreNonnullByDefault;
|
||||
/**
|
||||
* A factory class providing convenience methods for creating MessageReader and MessageWriter instances based on Qpid Proton.
|
||||
*/
|
||||
|
@ -46,9 +44,9 @@ public final class ProtonAmqpMessageFactory {
|
|||
* Creates a MessageReader to read a proton-based {@link Message}.
|
||||
* <p>
|
||||
* This implementation simply calls {@link #createReader(String, ApplicationProperties, byte[])}.
|
||||
*
|
||||
*
|
||||
* @param message The proton message to read from.
|
||||
*
|
||||
*
|
||||
* @return A message reader that can read the given proton message to a cloud event representation.
|
||||
*/
|
||||
public static MessageReader createReader(final Message message) {
|
||||
|
@ -61,7 +59,7 @@ public final class ProtonAmqpMessageFactory {
|
|||
* Creates a MessageReader using the content-type property and payload of a proton-based message.
|
||||
* <p>
|
||||
* This method simply calls {@link #createReader(String, ApplicationProperties, byte[])}.
|
||||
*
|
||||
*
|
||||
* @param contentType The content-type of the message payload.
|
||||
* @param payload The message payload in bytes.
|
||||
* @return A message reader capable of representing a CloudEvent from
|
||||
|
@ -74,7 +72,7 @@ public final class ProtonAmqpMessageFactory {
|
|||
/**
|
||||
* Creates a MessageWriter capable of translating both a structured and binary CloudEvent
|
||||
* to a proton-based AMQP 1.0 representation.
|
||||
*
|
||||
*
|
||||
* @return A message writer to read structured and binary cloud event from a proton-based message.
|
||||
*/
|
||||
public static MessageWriter<CloudEventWriter<Message>, Message> createWriter() {
|
||||
|
@ -84,7 +82,7 @@ public final class ProtonAmqpMessageFactory {
|
|||
/**
|
||||
* Creates a MessageReader to read using the content-type property, application-propeties and data payload
|
||||
* of a proton-based message.
|
||||
*
|
||||
*
|
||||
* @param contentType The content-type of the message payload.
|
||||
* @param props The application-properties section of the proton-message containing cloud event metadata (attributes and/or extensions).
|
||||
* @param payload The message payload in bytes or {@code null} if the message does not contain any payload.
|
||||
|
@ -94,12 +92,11 @@ public final class ProtonAmqpMessageFactory {
|
|||
public static MessageReader createReader(final String contentType, final ApplicationProperties props, final byte[] payload) {
|
||||
|
||||
return MessageUtils.parseStructuredOrBinaryMessage(
|
||||
() -> contentType,
|
||||
() -> contentType,
|
||||
format -> new GenericStructuredMessageReader(format, payload),
|
||||
() -> AmqpConstants.getApplicationProperty(props, AmqpConstants.APP_PROPERTY_SPEC_VERSION, String.class),
|
||||
sv -> new ProtonAmqpBinaryMessageReader(sv, props, contentType, payload),
|
||||
UnknownEncodingMessageReader::new);
|
||||
sv -> new ProtonAmqpBinaryMessageReader(sv, props, contentType, payload));
|
||||
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
|
|
@ -56,6 +56,10 @@ public class CloudEventRWException extends RuntimeException {
|
|||
* Error while converting CloudEventData.
|
||||
*/
|
||||
DATA_CONVERSION,
|
||||
/**
|
||||
* Invalid content type or spec version
|
||||
*/
|
||||
UNKNOWN_ENCODING,
|
||||
/**
|
||||
* Other error.
|
||||
*/
|
||||
|
@ -146,4 +150,11 @@ public class CloudEventRWException extends RuntimeException {
|
|||
cause
|
||||
);
|
||||
}
|
||||
|
||||
public static CloudEventRWException newUnknownEncodingException() {
|
||||
return new CloudEventRWException(
|
||||
CloudEventRWExceptionKind.UNKNOWN_ENCODING,
|
||||
"Could not parse. Unknown encoding. Invalid content type or spec version"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -22,6 +22,5 @@ package io.cloudevents.core.message;
|
|||
*/
|
||||
public enum Encoding {
|
||||
STRUCTURED,
|
||||
BINARY,
|
||||
UNKNOWN
|
||||
BINARY
|
||||
}
|
||||
|
|
|
@ -29,6 +29,8 @@ import java.util.function.Supplier;
|
|||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import static io.cloudevents.rw.CloudEventRWException.newUnknownEncodingException;
|
||||
|
||||
public class MessageUtils {
|
||||
|
||||
/**
|
||||
|
@ -38,8 +40,7 @@ public class MessageUtils {
|
|||
Supplier<String> contentTypeHeaderReader,
|
||||
Function<EventFormat, MessageReader> structuredMessageFactory,
|
||||
Supplier<String> specVersionHeaderReader,
|
||||
Function<SpecVersion, MessageReader> binaryMessageFactory,
|
||||
Supplier<MessageReader> unknownMessageFactory
|
||||
Function<SpecVersion, MessageReader> binaryMessageFactory
|
||||
) {
|
||||
// Let's try structured mode
|
||||
String ct = contentTypeHeaderReader.get();
|
||||
|
@ -57,7 +58,7 @@ public class MessageUtils {
|
|||
return binaryMessageFactory.apply(SpecVersion.parse(specVersionUnparsed));
|
||||
}
|
||||
|
||||
return unknownMessageFactory.get();
|
||||
throw newUnknownEncodingException();
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -1,44 +0,0 @@
|
|||
/*
|
||||
* Copyright 2018-Present The CloudEvents Authors
|
||||
* <p>
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*
|
||||
*/
|
||||
|
||||
package io.cloudevents.core.message.impl;
|
||||
|
||||
import io.cloudevents.CloudEventData;
|
||||
import io.cloudevents.core.message.Encoding;
|
||||
import io.cloudevents.core.message.MessageReader;
|
||||
import io.cloudevents.core.message.StructuredMessageWriter;
|
||||
import io.cloudevents.rw.CloudEventDataMapper;
|
||||
import io.cloudevents.rw.CloudEventRWException;
|
||||
import io.cloudevents.rw.CloudEventWriter;
|
||||
import io.cloudevents.rw.CloudEventWriterFactory;
|
||||
|
||||
public class UnknownEncodingMessageReader implements MessageReader {
|
||||
@Override
|
||||
public Encoding getEncoding() {
|
||||
return Encoding.UNKNOWN;
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T extends CloudEventWriter<V>, V> V read(CloudEventWriterFactory<T, V> writerFactory, CloudEventDataMapper<? extends CloudEventData> mapper) throws CloudEventRWException, IllegalStateException {
|
||||
throw new IllegalStateException("Unknown encoding");
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> T read(StructuredMessageWriter<T> visitor) throws CloudEventRWException, IllegalStateException {
|
||||
throw new IllegalStateException("Unknown encoding");
|
||||
}
|
||||
}
|
|
@ -0,0 +1,57 @@
|
|||
package io.cloudevents.core.message.impl;
|
||||
|
||||
import io.cloudevents.SpecVersion;
|
||||
import io.cloudevents.core.mock.CSVFormat;
|
||||
import io.cloudevents.rw.CloudEventRWException;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.params.ParameterizedTest;
|
||||
import org.junit.jupiter.params.provider.Arguments;
|
||||
import org.junit.jupiter.params.provider.MethodSource;
|
||||
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import static io.cloudevents.SpecVersion.V03;
|
||||
import static io.cloudevents.SpecVersion.V1;
|
||||
import static io.cloudevents.core.message.impl.MessageUtils.parseStructuredOrBinaryMessage;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.junit.jupiter.api.Assertions.*;
|
||||
|
||||
class MessageUtilsTest {
|
||||
|
||||
@Test
|
||||
void parseStructuredOrBinaryMessage_Exception() {
|
||||
final CloudEventRWException cloudEventRWException = assertThrows(CloudEventRWException.class, () -> {
|
||||
parseStructuredOrBinaryMessage(() -> null, eventFormat -> null, () -> null, specVersion -> null);
|
||||
});
|
||||
assertThat(cloudEventRWException.getKind())
|
||||
.isEqualTo(CloudEventRWException.CloudEventRWExceptionKind.UNKNOWN_ENCODING);
|
||||
}
|
||||
|
||||
@Test
|
||||
void testParseStructuredOrBinaryMessage_StructuredMode() {
|
||||
MessageUtils.parseStructuredOrBinaryMessage(() -> "application/cloudevents+csv;",
|
||||
eventFormat -> {
|
||||
assertTrue(eventFormat instanceof CSVFormat);
|
||||
return null;
|
||||
},
|
||||
() -> null, specVersion -> null);
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource
|
||||
void testParseStructuredOrBinaryMessage_BinaryMode(String specVersionHeader, SpecVersion expectedSpecVersion) {
|
||||
MessageUtils.parseStructuredOrBinaryMessage(() -> null, eventFormat -> null,
|
||||
() -> specVersionHeader, specVersion -> {
|
||||
assertEquals(expectedSpecVersion, specVersion);
|
||||
return null;
|
||||
});
|
||||
}
|
||||
|
||||
private static Stream<Arguments> testParseStructuredOrBinaryMessage_BinaryMode() {
|
||||
return Stream.of(
|
||||
Arguments.of("0.3", V03),
|
||||
Arguments.of("1.0", V1)
|
||||
);
|
||||
}
|
||||
|
||||
}
|
|
@ -20,7 +20,6 @@ import io.cloudevents.core.message.MessageReader;
|
|||
import io.cloudevents.core.message.MessageWriter;
|
||||
import io.cloudevents.core.message.impl.GenericStructuredMessageReader;
|
||||
import io.cloudevents.core.message.impl.MessageUtils;
|
||||
import io.cloudevents.core.message.impl.UnknownEncodingMessageReader;
|
||||
import io.cloudevents.http.impl.CloudEventsHeaders;
|
||||
import io.cloudevents.http.impl.HttpMessageReader;
|
||||
import io.cloudevents.http.impl.HttpMessageWriter;
|
||||
|
@ -80,8 +79,7 @@ public final class HttpMessageFactory {
|
|||
contentType::get,
|
||||
format -> new GenericStructuredMessageReader(format, body),
|
||||
specVersion::get,
|
||||
sv -> new HttpMessageReader(sv, forEachHeader, body),
|
||||
UnknownEncodingMessageReader::new
|
||||
sv -> new HttpMessageReader(sv, forEachHeader, body)
|
||||
);
|
||||
}
|
||||
|
||||
|
|
|
@ -20,7 +20,6 @@ package io.cloudevents.http.restful.ws.impl;
|
|||
import io.cloudevents.core.message.MessageReader;
|
||||
import io.cloudevents.core.message.impl.GenericStructuredMessageReader;
|
||||
import io.cloudevents.core.message.impl.MessageUtils;
|
||||
import io.cloudevents.core.message.impl.UnknownEncodingMessageReader;
|
||||
|
||||
import javax.ws.rs.core.HttpHeaders;
|
||||
import javax.ws.rs.core.MediaType;
|
||||
|
@ -36,8 +35,7 @@ public final class RestfulWSMessageFactory {
|
|||
() -> headers.getFirst(HttpHeaders.CONTENT_TYPE),
|
||||
format -> new GenericStructuredMessageReader(format, payload),
|
||||
() -> headers.getFirst(CloudEventsHeaders.SPEC_VERSION),
|
||||
sv -> new BinaryRestfulWSMessageReaderImpl(sv, headers, payload),
|
||||
UnknownEncodingMessageReader::new
|
||||
sv -> new BinaryRestfulWSMessageReaderImpl(sv, headers, payload)
|
||||
);
|
||||
}
|
||||
|
||||
|
|
|
@ -4,7 +4,6 @@ import io.cloudevents.core.message.MessageReader;
|
|||
import io.cloudevents.core.message.MessageWriter;
|
||||
import io.cloudevents.core.message.impl.GenericStructuredMessageReader;
|
||||
import io.cloudevents.core.message.impl.MessageUtils;
|
||||
import io.cloudevents.core.message.impl.UnknownEncodingMessageReader;
|
||||
import io.cloudevents.http.vertx.impl.BinaryVertxMessageReaderImpl;
|
||||
import io.cloudevents.http.vertx.impl.CloudEventsHeaders;
|
||||
import io.cloudevents.http.vertx.impl.VertxWebClientRequestMessageWriterImpl;
|
||||
|
@ -43,8 +42,7 @@ public final class VertxMessageFactory {
|
|||
() -> headers.get(HttpHeaders.CONTENT_TYPE),
|
||||
format -> new GenericStructuredMessageReader(format, body.getBytes()),
|
||||
() -> headers.get(CloudEventsHeaders.SPEC_VERSION),
|
||||
sv -> new BinaryVertxMessageReaderImpl(sv, headers, body),
|
||||
UnknownEncodingMessageReader::new
|
||||
sv -> new BinaryVertxMessageReaderImpl(sv, headers, body)
|
||||
);
|
||||
}
|
||||
|
||||
|
|
|
@ -56,9 +56,6 @@ public class CloudEventSerializer implements Serializer<CloudEvent> {
|
|||
} else if (encodingConfig != null) {
|
||||
throw new IllegalArgumentException(ENCODING_CONFIG + " can be of type String or " + Encoding.class.getCanonicalName());
|
||||
}
|
||||
if (this.encoding == Encoding.UNKNOWN) {
|
||||
throw new IllegalArgumentException(ENCODING_CONFIG + " cannot be " + Encoding.UNKNOWN);
|
||||
}
|
||||
|
||||
if (this.encoding == Encoding.STRUCTURED) {
|
||||
Object eventFormatConfig = configs.get(EVENT_FORMAT_CONFIG);
|
||||
|
|
|
@ -21,7 +21,6 @@ import io.cloudevents.core.message.MessageReader;
|
|||
import io.cloudevents.core.message.MessageWriter;
|
||||
import io.cloudevents.core.message.impl.GenericStructuredMessageReader;
|
||||
import io.cloudevents.core.message.impl.MessageUtils;
|
||||
import io.cloudevents.core.message.impl.UnknownEncodingMessageReader;
|
||||
import io.cloudevents.kafka.impl.KafkaBinaryMessageReaderImpl;
|
||||
import io.cloudevents.kafka.impl.KafkaHeaders;
|
||||
import io.cloudevents.kafka.impl.KafkaProducerMessageWriterImpl;
|
||||
|
@ -61,8 +60,7 @@ public final class KafkaMessageFactory {
|
|||
() -> KafkaHeaders.getParsedKafkaHeader(headers, KafkaHeaders.CONTENT_TYPE),
|
||||
format -> new GenericStructuredMessageReader(format, payload),
|
||||
() -> KafkaHeaders.getParsedKafkaHeader(headers, KafkaHeaders.SPEC_VERSION),
|
||||
sv -> new KafkaBinaryMessageReaderImpl(sv, headers, payload),
|
||||
UnknownEncodingMessageReader::new
|
||||
sv -> new KafkaBinaryMessageReaderImpl(sv, headers, payload)
|
||||
);
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue