Improvements to CloudEventReader (#263)
* Extracted readAttributes and readExtensions from CloudEventReader Added CloudEventUtils#toContextReader to create a context reader starting from a CloudEvent Improved documentation of *Reader interfaces Renamed MessageReader#visit to the proper name MessageReader#read Signed-off-by: Francesco Guardiani <francescoguard@gmail.com> * Typo Signed-off-by: Francesco Guardiani <francescoguard@gmail.com>
This commit is contained in:
parent
db745fd309
commit
42a732623b
|
@ -0,0 +1,46 @@
|
|||
/*
|
||||
* Copyright 2018-Present The CloudEvents Authors
|
||||
* <p>
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*
|
||||
*/
|
||||
|
||||
package io.cloudevents.rw;
|
||||
|
||||
import javax.annotation.ParametersAreNonnullByDefault;
|
||||
|
||||
/**
|
||||
* Represents an object that can be read as CloudEvent context attributes and extensions.
|
||||
* <p>
|
||||
* An object (in particular, buffered objects) can implement both this interface and {@link CloudEventReader}.
|
||||
*/
|
||||
@ParametersAreNonnullByDefault
|
||||
public interface CloudEventContextReader {
|
||||
|
||||
/**
|
||||
* Visit self attributes using the provided writer
|
||||
*
|
||||
* @param writer Attributes writer
|
||||
* @throws CloudEventRWException if something went wrong during the visit.
|
||||
*/
|
||||
void readAttributes(CloudEventAttributesWriter writer) throws CloudEventRWException;
|
||||
|
||||
/**
|
||||
* Visit self extensions using the provided writer
|
||||
*
|
||||
* @param visitor Extensions writer
|
||||
* @throws CloudEventRWException if something went wrong during the visit.
|
||||
*/
|
||||
void readExtensions(CloudEventExtensionsWriter visitor) throws CloudEventRWException;
|
||||
|
||||
}
|
|
@ -22,7 +22,9 @@ import io.cloudevents.lang.Nullable;
|
|||
import javax.annotation.ParametersAreNonnullByDefault;
|
||||
|
||||
/**
|
||||
* Represents an object that can be read as CloudEvent
|
||||
* Represents an object that can be read as CloudEvent.
|
||||
* <p>
|
||||
* The read may consume this object, hence it's not safe to invoke it multiple times, unless it's explicitly allowed by the implementer.
|
||||
*/
|
||||
@ParametersAreNonnullByDefault
|
||||
public interface CloudEventReader {
|
||||
|
@ -42,20 +44,4 @@ public interface CloudEventReader {
|
|||
*/
|
||||
<V extends CloudEventWriter<R>, R> R read(CloudEventWriterFactory<V, R> writerFactory, @Nullable CloudEventDataMapper mapper) throws CloudEventRWException;
|
||||
|
||||
/**
|
||||
* Visit self attributes using the provided writer
|
||||
*
|
||||
* @param writer Attributes writer
|
||||
* @throws CloudEventRWException if something went wrong during the visit.
|
||||
*/
|
||||
void readAttributes(CloudEventAttributesWriter writer) throws CloudEventRWException;
|
||||
|
||||
/**
|
||||
* Visit self extensions using the provided writer
|
||||
*
|
||||
* @param visitor Extensions writer
|
||||
* @throws CloudEventRWException if something went wrong during the visit.
|
||||
*/
|
||||
void readExtensions(CloudEventExtensionsWriter visitor) throws CloudEventRWException;
|
||||
|
||||
}
|
||||
|
|
|
@ -25,7 +25,7 @@ import java.util.HashMap;
|
|||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
public abstract class BaseCloudEvent implements CloudEvent, CloudEventReader {
|
||||
public abstract class BaseCloudEvent implements CloudEvent, CloudEventReader, CloudEventContextReader {
|
||||
|
||||
private final CloudEventData data;
|
||||
protected final Map<String, Object> extensions;
|
||||
|
|
|
@ -20,9 +20,9 @@ package io.cloudevents.core.impl;
|
|||
import io.cloudevents.CloudEvent;
|
||||
import io.cloudevents.rw.*;
|
||||
|
||||
public class CloudEventReaderAdapter implements CloudEventReader {
|
||||
public class CloudEventReaderAdapter implements CloudEventReader, CloudEventContextReader {
|
||||
|
||||
private CloudEvent event;
|
||||
private final CloudEvent event;
|
||||
|
||||
CloudEventReaderAdapter(CloudEvent event) {
|
||||
this.event = event;
|
||||
|
|
|
@ -20,6 +20,7 @@ package io.cloudevents.core.impl;
|
|||
import io.cloudevents.CloudEvent;
|
||||
import io.cloudevents.CloudEventData;
|
||||
import io.cloudevents.lang.Nullable;
|
||||
import io.cloudevents.rw.CloudEventContextReader;
|
||||
import io.cloudevents.rw.CloudEventDataMapper;
|
||||
import io.cloudevents.rw.CloudEventReader;
|
||||
|
||||
|
@ -29,7 +30,9 @@ public final class CloudEventUtils {
|
|||
|
||||
/**
|
||||
* Convert a {@link CloudEvent} to a {@link CloudEventReader}. This method provides a default implementation
|
||||
* for CloudEvent that doesn't implement CloudEventVisitable
|
||||
* for CloudEvent that doesn't implement CloudEventVisitable.
|
||||
* <p>
|
||||
* It's safe to use the returned {@link CloudEventReader} multiple times.
|
||||
*
|
||||
* @param event the event to convert
|
||||
* @return the visitable implementation
|
||||
|
@ -42,6 +45,23 @@ public final class CloudEventUtils {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert a {@link CloudEvent} to a {@link CloudEventContextReader}. This method provides a default implementation
|
||||
* for {@link CloudEvent} that doesn't implement {@link CloudEventContextReader}.
|
||||
* <p>
|
||||
* It's safe to use the returned {@link CloudEventReader} multiple times.
|
||||
*
|
||||
* @param event the event to convert
|
||||
* @return the context reader implementation
|
||||
*/
|
||||
public static CloudEventContextReader toContextReader(CloudEvent event) {
|
||||
if (event instanceof CloudEventContextReader) {
|
||||
return (CloudEventContextReader) event;
|
||||
} else {
|
||||
return new CloudEventReaderAdapter(event);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the data contained in {@code event} and map it using the provided mapper.
|
||||
*/
|
||||
|
|
|
@ -47,24 +47,6 @@ public interface MessageReader extends StructuredMessageReader, CloudEventReader
|
|||
*/
|
||||
<V extends CloudEventWriter<R>, R> R read(CloudEventWriterFactory<V, R> writerFactory, @Nullable CloudEventDataMapper mapper) throws CloudEventRWException, IllegalStateException;
|
||||
|
||||
/**
|
||||
* Visit the message attributes as binary encoded event using the provided visitor.
|
||||
*
|
||||
* @param writer Attributes visitor
|
||||
* @throws CloudEventRWException if something went wrong during the visit.
|
||||
* @throws IllegalStateException if the message is not in binary encoding.
|
||||
*/
|
||||
void readAttributes(CloudEventAttributesWriter writer) throws CloudEventRWException, IllegalStateException;
|
||||
|
||||
/**
|
||||
* Visit the message extensions as binary encoded event using the provided visitor.
|
||||
*
|
||||
* @param visitor Extensions visitor
|
||||
* @throws CloudEventRWException if something went wrong during the visit.
|
||||
* @throws IllegalStateException if the message is not in binary encoding.
|
||||
*/
|
||||
void readExtensions(CloudEventExtensionsWriter visitor) throws CloudEventRWException, IllegalStateException;
|
||||
|
||||
/**
|
||||
* Visit the message as structured encoded event using the provided visitor
|
||||
*
|
||||
|
@ -80,15 +62,15 @@ public interface MessageReader extends StructuredMessageReader, CloudEventReader
|
|||
Encoding getEncoding();
|
||||
|
||||
/**
|
||||
* Visit the event using a {@link MessageWriter}. This method allows to transcode an event from one transport to another without
|
||||
* Read the content of this object using a {@link MessageWriter}. This method allows to transcode an event from one transport to another without
|
||||
* converting it to {@link CloudEvent}. The resulting encoding will be the same as the original encoding.
|
||||
*
|
||||
* @param visitor the MessageVisitor accepting this Message
|
||||
* @return The return value of the MessageVisitor
|
||||
* @throws CloudEventRWException if something went wrong during the visit.
|
||||
* @throws IllegalStateException if the message has an unknown encoding.
|
||||
* @throws IllegalStateException if the message has an unknown encoding.
|
||||
*/
|
||||
default <BV extends CloudEventWriter<R>, R> R visit(MessageWriter<BV, R> visitor) throws CloudEventRWException, IllegalStateException {
|
||||
default <BV extends CloudEventWriter<R>, R> R read(MessageWriter<BV, R> visitor) throws CloudEventRWException, IllegalStateException {
|
||||
switch (getEncoding()) {
|
||||
case BINARY:
|
||||
return this.read((CloudEventWriterFactory<BV, R>) visitor);
|
||||
|
|
|
@ -19,14 +19,18 @@ package io.cloudevents.core.message.impl;
|
|||
|
||||
import io.cloudevents.CloudEventData;
|
||||
import io.cloudevents.SpecVersion;
|
||||
import io.cloudevents.rw.*;
|
||||
import io.cloudevents.rw.CloudEventDataMapper;
|
||||
import io.cloudevents.rw.CloudEventRWException;
|
||||
import io.cloudevents.rw.CloudEventWriter;
|
||||
import io.cloudevents.rw.CloudEventWriterFactory;
|
||||
|
||||
import java.util.Objects;
|
||||
import java.util.function.BiConsumer;
|
||||
|
||||
/**
|
||||
* This class implements a Binary {@link io.cloudevents.core.message.MessageReader}, providing common logic to most protocol bindings
|
||||
* which supports both Binary and Structured mode.
|
||||
* This class implements a Binary {@link io.cloudevents.core.message.MessageReader},
|
||||
* providing common logic to most protocol bindings which supports both Binary and Structured mode.
|
||||
* <p>
|
||||
* Content-type is handled separately using a key not prefixed with CloudEvents header prefix.
|
||||
*
|
||||
* @param <HK> Header key type
|
||||
|
@ -74,36 +78,6 @@ public abstract class BaseGenericBinaryMessageReaderImpl<HK, HV> extends BaseBin
|
|||
return visitor.end();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readAttributes(CloudEventAttributesWriter writer) throws RuntimeException {
|
||||
this.forEachHeader((key, value) -> {
|
||||
if (isContentTypeHeader(key)) {
|
||||
writer.withAttribute("datacontenttype", toCloudEventsValue(value));
|
||||
} else if (isCloudEventsHeader(key)) {
|
||||
String name = toCloudEventsKey(key);
|
||||
if (name.equals("specversion")) {
|
||||
return;
|
||||
}
|
||||
if (this.version.getAllAttributes().contains(name)) {
|
||||
writer.withAttribute(name, toCloudEventsValue(value));
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readExtensions(CloudEventExtensionsWriter visitor) throws RuntimeException {
|
||||
// Grab from headers the attributes and extensions
|
||||
this.forEachHeader((key, value) -> {
|
||||
if (isCloudEventsHeader(key)) {
|
||||
String name = toCloudEventsKey(key);
|
||||
if (!this.version.getAllAttributes().contains(name)) {
|
||||
visitor.withExtension(name, toCloudEventsValue(value));
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
protected abstract boolean isContentTypeHeader(HK key);
|
||||
|
||||
protected abstract boolean isCloudEventsHeader(HK key);
|
||||
|
|
|
@ -19,7 +19,9 @@ package io.cloudevents.core.message.impl;
|
|||
|
||||
import io.cloudevents.core.message.Encoding;
|
||||
import io.cloudevents.core.message.MessageReader;
|
||||
import io.cloudevents.rw.*;
|
||||
import io.cloudevents.rw.CloudEventDataMapper;
|
||||
import io.cloudevents.rw.CloudEventWriter;
|
||||
import io.cloudevents.rw.CloudEventWriterFactory;
|
||||
|
||||
public abstract class BaseStructuredMessageReader implements MessageReader {
|
||||
|
||||
|
@ -32,14 +34,4 @@ public abstract class BaseStructuredMessageReader implements MessageReader {
|
|||
public <V extends CloudEventWriter<R>, R> R read(CloudEventWriterFactory<V, R> writerFactory, CloudEventDataMapper mapper) {
|
||||
throw MessageUtils.generateWrongEncoding(Encoding.BINARY, Encoding.STRUCTURED);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readAttributes(CloudEventAttributesWriter writer) throws RuntimeException {
|
||||
throw MessageUtils.generateWrongEncoding(Encoding.BINARY, Encoding.STRUCTURED);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readExtensions(CloudEventExtensionsWriter visitor) throws RuntimeException {
|
||||
throw MessageUtils.generateWrongEncoding(Encoding.BINARY, Encoding.STRUCTURED);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,7 +20,10 @@ package io.cloudevents.core.message.impl;
|
|||
import io.cloudevents.core.message.Encoding;
|
||||
import io.cloudevents.core.message.MessageReader;
|
||||
import io.cloudevents.core.message.StructuredMessageWriter;
|
||||
import io.cloudevents.rw.*;
|
||||
import io.cloudevents.rw.CloudEventDataMapper;
|
||||
import io.cloudevents.rw.CloudEventRWException;
|
||||
import io.cloudevents.rw.CloudEventWriter;
|
||||
import io.cloudevents.rw.CloudEventWriterFactory;
|
||||
|
||||
public class UnknownEncodingMessageReader implements MessageReader {
|
||||
@Override
|
||||
|
@ -33,16 +36,6 @@ public class UnknownEncodingMessageReader implements MessageReader {
|
|||
throw new IllegalStateException("Unknown encoding");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readAttributes(CloudEventAttributesWriter writer) throws CloudEventRWException {
|
||||
throw new IllegalStateException("Unknown encoding");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readExtensions(CloudEventExtensionsWriter visitor) throws CloudEventRWException {
|
||||
throw new IllegalStateException("Unknown encoding");
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> T read(StructuredMessageWriter<T> visitor) throws CloudEventRWException, IllegalStateException {
|
||||
throw new IllegalStateException("Unknown encoding");
|
||||
|
|
|
@ -54,9 +54,9 @@ public final class CloudEventBuilder extends BaseCloudEventBuilder<CloudEventBui
|
|||
@Override
|
||||
protected void setAttributes(io.cloudevents.CloudEvent event) {
|
||||
if (event.getSpecVersion() == SpecVersion.V03) {
|
||||
CloudEventUtils.toVisitable(event).readAttributes(this);
|
||||
CloudEventUtils.toContextReader(event).readAttributes(this);
|
||||
} else {
|
||||
CloudEventUtils.toVisitable(event).readAttributes(new V1ToV03AttributesConverter(this));
|
||||
CloudEventUtils.toContextReader(event).readAttributes(new V1ToV03AttributesConverter(this));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -56,9 +56,9 @@ public final class CloudEventBuilder extends BaseCloudEventBuilder<CloudEventBui
|
|||
@Override
|
||||
protected void setAttributes(io.cloudevents.CloudEvent event) {
|
||||
if (event.getSpecVersion() == SpecVersion.V1) {
|
||||
CloudEventUtils.toVisitable(event).readAttributes(this);
|
||||
CloudEventUtils.toContextReader(event).readAttributes(this);
|
||||
} else {
|
||||
CloudEventUtils.toVisitable(event).readAttributes(new V03ToV1AttributesConverter(this));
|
||||
CloudEventUtils.toContextReader(event).readAttributes(new V03ToV1AttributesConverter(this));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -31,7 +31,7 @@ import java.time.OffsetDateTime;
|
|||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
public class MockBinaryMessageWriter extends BaseBinaryMessageReader implements MessageReader, CloudEventWriterFactory<MockBinaryMessageWriter, MockBinaryMessageWriter>, CloudEventWriter<MockBinaryMessageWriter> {
|
||||
public class MockBinaryMessageWriter extends BaseBinaryMessageReader implements MessageReader, CloudEventContextReader, CloudEventWriterFactory<MockBinaryMessageWriter, MockBinaryMessageWriter>, CloudEventWriter<MockBinaryMessageWriter> {
|
||||
|
||||
private SpecVersion version;
|
||||
private Map<String, Object> attributes;
|
||||
|
|
|
@ -154,16 +154,6 @@ public class CloudEventDeserializer extends StdDeserializer<CloudEvent> {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readAttributes(CloudEventAttributesWriter writer) throws CloudEventRWException {
|
||||
// no-op no need for that
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readExtensions(CloudEventExtensionsWriter visitor) throws CloudEventRWException {
|
||||
// no-op no need for that
|
||||
}
|
||||
|
||||
private String getStringNode(ObjectNode objNode, JsonParser p, String attributeName) throws JsonProcessingException {
|
||||
String val = getOptionalStringNode(objNode, p, attributeName);
|
||||
if (val == null) {
|
||||
|
|
|
@ -24,9 +24,9 @@ import io.cloudevents.CloudEvent;
|
|||
import io.cloudevents.CloudEventData;
|
||||
import io.cloudevents.core.impl.CloudEventUtils;
|
||||
import io.cloudevents.rw.CloudEventAttributesWriter;
|
||||
import io.cloudevents.rw.CloudEventContextReader;
|
||||
import io.cloudevents.rw.CloudEventExtensionsWriter;
|
||||
import io.cloudevents.rw.CloudEventRWException;
|
||||
import io.cloudevents.rw.CloudEventReader;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
|
@ -104,7 +104,7 @@ public class CloudEventSerializer extends StdSerializer<CloudEvent> {
|
|||
|
||||
// Serialize attributes
|
||||
try {
|
||||
CloudEventReader visitable = CloudEventUtils.toVisitable(value);
|
||||
CloudEventContextReader visitable = CloudEventUtils.toContextReader(value);
|
||||
FieldsSerializer serializer = new FieldsSerializer(gen, provider);
|
||||
visitable.readAttributes(serializer);
|
||||
visitable.readExtensions(serializer);
|
||||
|
|
|
@ -145,7 +145,7 @@ public final class HttpMessageFactory {
|
|||
* @param sendBody a function that sends body (e.g. sets HTTP status code, content-length and writes the bytes into output stream).
|
||||
* @return a message writer
|
||||
*/
|
||||
public static MessageWriter createWriter(BiConsumer<String, String> putHeader, Consumer<byte[]> sendBody) {
|
||||
public static HttpMessageWriter createWriter(BiConsumer<String, String> putHeader, Consumer<byte[]> sendBody) {
|
||||
return new HttpMessageWriter(putHeader, sendBody);
|
||||
}
|
||||
|
||||
|
|
|
@ -22,7 +22,6 @@ import io.cloudevents.core.message.Encoding;
|
|||
import io.cloudevents.core.message.MessageReader;
|
||||
import io.cloudevents.core.message.impl.GenericStructuredMessageReader;
|
||||
import io.cloudevents.core.mock.CSVFormat;
|
||||
import io.cloudevents.http.HttpMessageFactory;
|
||||
import io.cloudevents.http.impl.HttpMessageWriter;
|
||||
import io.cloudevents.types.Time;
|
||||
import org.junit.jupiter.params.ParameterizedTest;
|
||||
|
@ -46,14 +45,13 @@ public class HttpMessageReaderWriterTest {
|
|||
|
||||
final AtomicReference<byte[]> body = new AtomicReference<>();
|
||||
final Map<String, String> headers = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
|
||||
GenericStructuredMessageReader.from(event, CSVFormat.INSTANCE).visit(
|
||||
GenericStructuredMessageReader.from(event, CSVFormat.INSTANCE).read(
|
||||
HttpMessageFactory.createWriter(headers::put, body::set)
|
||||
);
|
||||
assertThat(headers.get("content-type"))
|
||||
.isEqualTo(CSVFormat.INSTANCE.serializedContentType());
|
||||
assertThat(body.get())
|
||||
.isEqualTo(CSVFormat.INSTANCE.serialize(event));
|
||||
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
|
|
|
@ -43,6 +43,6 @@ public class CloudEventMessageSerializer implements Serializer<MessageReader> {
|
|||
|
||||
@Override
|
||||
public byte[] serialize(String topic, Headers headers, MessageReader data) {
|
||||
return data.visit(new KafkaSerializerMessageWriterImpl(headers));
|
||||
return data.read(new KafkaSerializerMessageWriterImpl(headers));
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue