Simplify the Reader/Writer implementations, reducing the knowledge of spec details (#309)

* Messing up stuff

Signed-off-by: Francesco Guardiani <francescoguard@gmail.com>

* Collapse CloudEventAttributesWriter and CloudEventAttributesWriter into CloudEventContextWriter

Signed-off-by: Francesco Guardiani <francescoguard@gmail.com>

* Rebase fix

Signed-off-by: Francesco Guardiani <francescoguard@gmail.com>
This commit is contained in:
Francesco Guardiani 2020-12-01 17:27:50 +01:00 committed by GitHub
parent a14f5eabec
commit b89f45265b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
30 changed files with 759 additions and 564 deletions

View File

@ -17,22 +17,20 @@
package io.cloudevents.amqp.impl; package io.cloudevents.amqp.impl;
import java.util.HashMap;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
import org.apache.qpid.proton.amqp.messaging.Data;
import org.apache.qpid.proton.message.Message;
import io.cloudevents.CloudEventData; import io.cloudevents.CloudEventData;
import io.cloudevents.SpecVersion; import io.cloudevents.SpecVersion;
import io.cloudevents.core.format.EventFormat; import io.cloudevents.core.format.EventFormat;
import io.cloudevents.core.message.MessageWriter; import io.cloudevents.core.message.MessageWriter;
import io.cloudevents.core.v1.CloudEventV1; import io.cloudevents.core.v1.CloudEventV1;
import io.cloudevents.rw.CloudEventAttributesWriter; import io.cloudevents.rw.CloudEventContextWriter;
import io.cloudevents.rw.CloudEventExtensionsWriter;
import io.cloudevents.rw.CloudEventRWException; import io.cloudevents.rw.CloudEventRWException;
import io.cloudevents.rw.CloudEventWriter; import io.cloudevents.rw.CloudEventWriter;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
import org.apache.qpid.proton.amqp.messaging.Data;
import org.apache.qpid.proton.message.Message;
import java.util.HashMap;
/** /**
* A proton-based MessageWriter capable of writing both structured and binary CloudEvent messages to an AMQP 1.0 representation as * A proton-based MessageWriter capable of writing both structured and binary CloudEvent messages to an AMQP 1.0 representation as
@ -53,33 +51,28 @@ public final class ProtonAmqpMessageWriter<R> implements MessageWriter<CloudEven
} }
@Override @Override
public CloudEventAttributesWriter withAttribute(final String name, final String value) throws CloudEventRWException { public CloudEventContextWriter withContextAttribute(String name, String value) throws CloudEventRWException {
if (name.equals(CloudEventV1.DATACONTENTTYPE)) { if (name.equals(CloudEventV1.DATACONTENTTYPE)) {
message.setContentType(value); message.setContentType(value);
} else { } else {
// for now, extensions are mapped to application-properties
// see https://github.com/cloudevents/sdk-java/issues/30#issuecomment-723982190
if (applicationProperties == null) { if (applicationProperties == null) {
throw new IllegalStateException("This Writer is not initialized"); throw new IllegalStateException("This Writer is not initialized");
} }
applicationProperties.getValue().put(AmqpConstants.ATTRIBUTES_TO_PROPERTYNAMES.get(name), value); String propName = AmqpConstants.ATTRIBUTES_TO_PROPERTYNAMES.get(name);
if (propName == null) {
propName = name;
}
applicationProperties.getValue().put(propName, value);
} }
return null; return this;
}
@Override
public CloudEventExtensionsWriter withExtension(final String name, final String value) throws CloudEventRWException {
// for now, extensions are mapped to application-properties
// see https://github.com/cloudevents/sdk-java/issues/30#issuecomment-723982190
if (applicationProperties == null) {
throw new IllegalStateException("This Writer is not initialized");
}
applicationProperties.getValue().put(name, value);
return null;
} }
@Override @Override
public ProtonAmqpMessageWriter<R> create(final SpecVersion version) { public ProtonAmqpMessageWriter<R> create(final SpecVersion version) {
if (applicationProperties == null) { if (applicationProperties == null) {
applicationProperties = new ApplicationProperties(new HashMap<String, Object>()); applicationProperties = new ApplicationProperties(new HashMap<>());
} }
applicationProperties.getValue().put(AmqpConstants.APP_PROPERTY_SPEC_VERSION, version.toString()); applicationProperties.getValue().put(AmqpConstants.APP_PROPERTY_SPEC_VERSION, version.toString());
return this; return this;
@ -105,5 +98,4 @@ public final class ProtonAmqpMessageWriter<R> implements MessageWriter<CloudEven
message.setApplicationProperties(applicationProperties); message.setApplicationProperties(applicationProperties);
return message; return message;
} }
} }

View File

@ -1,65 +0,0 @@
/*
* Copyright 2018-Present The CloudEvents Authors
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package io.cloudevents.rw;
import io.cloudevents.types.Time;
import java.net.URI;
import java.time.OffsetDateTime;
/**
* Interface to write the attributes from a {@link io.cloudevents.rw.CloudEventReader} to a new representation.
*/
public interface CloudEventAttributesWriter {
/**
* Set attribute with type {@link String}. This setter should not be invoked for specversion, because the built Visitor already
* has the information through the {@link CloudEventWriterFactory}.
*
* @param name name of the attribute
* @param value value of the attribute
* @return self
* @throws CloudEventRWException if anything goes wrong while writing this attribute.
*/
CloudEventAttributesWriter withAttribute(String name, String value) throws CloudEventRWException;
/**
* Set attribute with type {@link URI}.
*
* @param name name of the attribute
* @param value value of the attribute
* @return self
* @throws CloudEventRWException if anything goes wrong while writing this attribute.
*/
default CloudEventAttributesWriter withAttribute(String name, URI value) throws CloudEventRWException {
return withAttribute(name, value == null ? null : value.toString());
}
/**
* Set attribute with type {@link OffsetDateTime} attribute.
*
* @param name name of the attribute
* @param value value of the attribute
* @return self
* @throws CloudEventRWException if anything goes wrong while writing this attribute.
*/
default CloudEventAttributesWriter withAttribute(String name, OffsetDateTime value) throws CloudEventRWException {
return withAttribute(name, value == null ? null : Time.writeTime(name, value));
}
}

View File

@ -28,19 +28,11 @@ import javax.annotation.ParametersAreNonnullByDefault;
public interface CloudEventContextReader { public interface CloudEventContextReader {
/** /**
* Visit self attributes using the provided writer * Read the context attributes and extensions using the provided writer
* *
* @param writer Attributes writer * @param writer context writer
* @throws CloudEventRWException if something went wrong during the visit. * @throws CloudEventRWException if something went wrong during the read.
*/ */
void readAttributes(CloudEventAttributesWriter writer) throws CloudEventRWException; void readContext(CloudEventContextWriter writer) throws CloudEventRWException;
/**
* Visit self extensions using the provided writer
*
* @param visitor Extensions writer
* @throws CloudEventRWException if something went wrong during the visit.
*/
void readExtensions(CloudEventExtensionsWriter visitor) throws CloudEventRWException;
} }

View File

@ -0,0 +1,105 @@
/*
* Copyright 2018-Present The CloudEvents Authors
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package io.cloudevents.rw;
import io.cloudevents.types.Time;
import javax.annotation.ParametersAreNonnullByDefault;
import java.net.URI;
import java.time.OffsetDateTime;
/**
* Interface to write the context attributes/extensions from a {@link io.cloudevents.rw.CloudEventContextReader} to a new representation.
*/
@ParametersAreNonnullByDefault
public interface CloudEventContextWriter {
/**
* Set attribute with type {@link String}.
* This setter should not be invoked for specversion, because the writer should
* already know the specversion or because it doesn't need it to correctly write the value.
*
* @param name name of the attribute
* @param value value of the attribute
* @return self
* @throws CloudEventRWException if anything goes wrong while writing this attribute.
* @throws IllegalArgumentException if you're trying to set the specversion attribute.
*/
CloudEventContextWriter withContextAttribute(String name, String value) throws CloudEventRWException;
/**
* Set attribute with type {@link URI}.
* This setter should not be invoked for specversion, because the writer should
* already know the specversion or because it doesn't need it to correctly write the value.
*
* @param name name of the attribute
* @param value value of the attribute
* @return self
* @throws CloudEventRWException if anything goes wrong while writing this attribute.
* @throws IllegalArgumentException if you're trying to set the specversion attribute.
*/
default CloudEventContextWriter withContextAttribute(String name, URI value) throws CloudEventRWException {
return withContextAttribute(name, value.toString());
}
/**
* Set attribute with type {@link OffsetDateTime} attribute.
* This setter should not be invoked for specversion, because the writer should
* already know the specversion or because it doesn't need it to correctly write the value.
*
* @param name name of the attribute
* @param value value of the attribute
* @return self
* @throws CloudEventRWException if anything goes wrong while writing this attribute.
* @throws IllegalArgumentException if you're trying to set the specversion attribute.
*/
default CloudEventContextWriter withContextAttribute(String name, OffsetDateTime value) throws CloudEventRWException {
return withContextAttribute(name, Time.writeTime(name, value));
}
/**
* Set attribute with type {@link URI}.
* This setter should not be invoked for specversion, because the writer should
* already know the specversion or because it doesn't need it to correctly write the value.
*
* @param name name of the attribute
* @param value value of the attribute
* @return self
* @throws CloudEventRWException if anything goes wrong while writing this extension.
* @throws IllegalArgumentException if you're trying to set the specversion attribute.
*/
default CloudEventContextWriter withContextAttribute(String name, Number value) throws CloudEventRWException {
return withContextAttribute(name, value.toString());
}
/**
* Set attribute with type {@link Boolean} attribute.
* This setter should not be invoked for specversion, because the writer should
* already know the specversion or because it doesn't need it to correctly write the value.
*
* @param name name of the attribute
* @param value value of the attribute
* @return self
* @throws CloudEventRWException if anything goes wrong while writing this extension.
* @throws IllegalArgumentException if you're trying to set the specversion attribute.
*/
default CloudEventContextWriter withContextAttribute(String name, Boolean value) throws CloudEventRWException {
return withContextAttribute(name, value.toString());
}
}

View File

@ -1,61 +0,0 @@
/*
* Copyright 2018-Present The CloudEvents Authors
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package io.cloudevents.rw;
import java.net.URI;
/**
* Interface to write the extensions from a {@link io.cloudevents.rw.CloudEventReader} to a new representation.
*/
public interface CloudEventExtensionsWriter {
/**
* Set an extension with type {@link String}.
*
* @param name name of the extension
* @param value value of the extension
* @return self
* @throws CloudEventRWException if anything goes wrong while writing this extension.
*/
CloudEventExtensionsWriter withExtension(String name, String value) throws CloudEventRWException;
/**
* Set attribute with type {@link URI}.
*
* @param name name of the extension
* @param value value of the extension
* @return self
* @throws CloudEventRWException if anything goes wrong while writing this extension.
*/
default CloudEventExtensionsWriter withExtension(String name, Number value) throws CloudEventRWException {
return withExtension(name, value == null ? null : value.toString());
}
/**
* Set attribute with type {@link Boolean} attribute.
*
* @param name name of the extension
* @param value value of the extension
* @return self
* @throws CloudEventRWException if anything goes wrong while writing this extension.
*/
default CloudEventExtensionsWriter withExtension(String name, Boolean value) throws CloudEventRWException {
return withExtension(name, value == null ? null : value.toString());
}
}

View File

@ -25,7 +25,7 @@ import io.cloudevents.CloudEventData;
* *
* @param <R> return value at the end of the write process * @param <R> return value at the end of the write process
*/ */
public interface CloudEventWriter<R> extends CloudEventAttributesWriter, CloudEventExtensionsWriter { public interface CloudEventWriter<R> extends CloudEventContextWriter {
/** /**
* End the visit with a data field * End the visit with a data field

View File

@ -17,18 +17,13 @@
package io.cloudevents.core.builder; package io.cloudevents.core.builder;
import java.net.URI; import io.cloudevents.*;
import java.time.OffsetDateTime; import io.cloudevents.rw.CloudEventWriter;
import javax.annotation.Nonnull; import javax.annotation.Nonnull;
import javax.annotation.ParametersAreNullableByDefault; import javax.annotation.ParametersAreNullableByDefault;
import java.net.URI;
import io.cloudevents.CloudEvent; import java.time.OffsetDateTime;
import io.cloudevents.CloudEventContext;
import io.cloudevents.CloudEventData;
import io.cloudevents.Extension;
import io.cloudevents.SpecVersion;
import io.cloudevents.rw.CloudEventWriter;
/** /**
* Builder interface to build a {@link CloudEvent}. * Builder interface to build a {@link CloudEvent}.
@ -153,7 +148,6 @@ public interface CloudEventBuilder extends CloudEventWriter<CloudEvent> {
* @param value value of the extension attribute * @param value value of the extension attribute
* @return self * @return self
*/ */
@Override
CloudEventBuilder withExtension(@Nonnull String key, @Nonnull String value); CloudEventBuilder withExtension(@Nonnull String key, @Nonnull String value);
/** /**
@ -163,7 +157,6 @@ public interface CloudEventBuilder extends CloudEventWriter<CloudEvent> {
* @param value value of the extension attribute * @param value value of the extension attribute
* @return self * @return self
*/ */
@Override
CloudEventBuilder withExtension(@Nonnull String key, @Nonnull Number value); CloudEventBuilder withExtension(@Nonnull String key, @Nonnull Number value);
/** /**
@ -173,9 +166,26 @@ public interface CloudEventBuilder extends CloudEventWriter<CloudEvent> {
* @param value value of the extension attribute * @param value value of the extension attribute
* @return self * @return self
*/ */
@Override
CloudEventBuilder withExtension(@Nonnull String key, @Nonnull Boolean value); CloudEventBuilder withExtension(@Nonnull String key, @Nonnull Boolean value);
/**
* Set an extension with provided key and uri value
*
* @param key key of the extension attribute
* @param value value of the extension attribute
* @return self
*/
CloudEventBuilder withExtension(@Nonnull String key, @Nonnull URI value);
/**
* Set an extension with provided key and boolean value
*
* @param key key of the extension attribute
* @param value value of the extension attribute
* @return self
*/
CloudEventBuilder withExtension(@Nonnull String key, @Nonnull OffsetDateTime value);
/** /**
* Add to the builder all the extension key/values of the provided extension * Add to the builder all the extension key/values of the provided extension
* *

View File

@ -21,6 +21,8 @@ import io.cloudevents.CloudEvent;
import io.cloudevents.CloudEventData; import io.cloudevents.CloudEventData;
import io.cloudevents.rw.*; import io.cloudevents.rw.*;
import java.net.URI;
import java.time.OffsetDateTime;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
@ -52,26 +54,29 @@ public abstract class BaseCloudEvent implements CloudEvent, CloudEventReader, Cl
@Override @Override
public <T extends CloudEventWriter<V>, V> V read(CloudEventWriterFactory<T, V> writerFactory, CloudEventDataMapper<? extends CloudEventData> mapper) throws CloudEventRWException, IllegalStateException { public <T extends CloudEventWriter<V>, V> V read(CloudEventWriterFactory<T, V> writerFactory, CloudEventDataMapper<? extends CloudEventData> mapper) throws CloudEventRWException, IllegalStateException {
CloudEventWriter<V> visitor = writerFactory.create(this.getSpecVersion()); CloudEventWriter<V> writer = writerFactory.create(this.getSpecVersion());
this.readAttributes(visitor); this.readContext(writer);
this.readExtensions(visitor);
if (this.data != null) { if (this.data != null) {
return visitor.end(mapper.map(this.data)); return writer.end(mapper.map(this.data));
} }
return visitor.end(); return writer.end();
} }
public void readExtensions(CloudEventExtensionsWriter writer) throws CloudEventRWException { protected void readExtensions(CloudEventContextWriter writer) throws CloudEventRWException {
// TODO to be improved // TODO to be improved
for (Map.Entry<String, Object> entry : this.extensions.entrySet()) { for (Map.Entry<String, Object> entry : this.extensions.entrySet()) {
if (entry.getValue() instanceof String) { if (entry.getValue() instanceof String) {
writer.withExtension(entry.getKey(), (String) entry.getValue()); writer.withContextAttribute(entry.getKey(), (String) entry.getValue());
} else if (entry.getValue() instanceof Number) { } else if (entry.getValue() instanceof Number) {
writer.withExtension(entry.getKey(), (Number) entry.getValue()); writer.withContextAttribute(entry.getKey(), (Number) entry.getValue());
} else if (entry.getValue() instanceof Boolean) { } else if (entry.getValue() instanceof Boolean) {
writer.withExtension(entry.getKey(), (Boolean) entry.getValue()); writer.withContextAttribute(entry.getKey(), (Boolean) entry.getValue());
} else if (entry.getValue() instanceof URI) {
writer.withContextAttribute(entry.getKey(), (URI) entry.getValue());
} else if (entry.getValue() instanceof OffsetDateTime) {
writer.withContextAttribute(entry.getKey(), (OffsetDateTime) entry.getValue());
} else { } else {
// This should never happen because we build that map only through our builders // This should never happen because we build that map only through our builders
throw new IllegalStateException("Illegal value inside extensions map: " + entry); throw new IllegalStateException("Illegal value inside extensions map: " + entry);

View File

@ -17,13 +17,6 @@
package io.cloudevents.core.impl; package io.cloudevents.core.impl;
import java.net.URI;
import java.time.OffsetDateTime;
import java.util.HashMap;
import java.util.Map;
import javax.annotation.Nonnull;
import io.cloudevents.CloudEvent; import io.cloudevents.CloudEvent;
import io.cloudevents.CloudEventContext; import io.cloudevents.CloudEventContext;
import io.cloudevents.CloudEventData; import io.cloudevents.CloudEventData;
@ -32,6 +25,14 @@ import io.cloudevents.core.builder.CloudEventBuilder;
import io.cloudevents.core.data.BytesCloudEventData; import io.cloudevents.core.data.BytesCloudEventData;
import io.cloudevents.rw.CloudEventRWException; import io.cloudevents.rw.CloudEventRWException;
import javax.annotation.Nonnull;
import java.net.URI;
import java.time.OffsetDateTime;
import java.util.HashMap;
import java.util.Map;
import static io.cloudevents.core.v03.CloudEventV03.SPECVERSION;
public abstract class BaseCloudEventBuilder<SELF extends BaseCloudEventBuilder<SELF, T>, T extends CloudEvent> implements CloudEventBuilder { public abstract class BaseCloudEventBuilder<SELF extends BaseCloudEventBuilder<SELF, T>, T extends CloudEvent> implements CloudEventBuilder {
// This is a little trick for enabling fluency // This is a little trick for enabling fluency
@ -121,6 +122,24 @@ public abstract class BaseCloudEventBuilder<SELF extends BaseCloudEventBuilder<S
return self; return self;
} }
@Override
public SELF withExtension(@Nonnull String key, @Nonnull URI value) {
if (!isValidExtensionName(key)) {
throw CloudEventRWException.newInvalidExtensionName(key);
}
this.extensions.put(key, value);
return self;
}
@Override
public SELF withExtension(@Nonnull String key, @Nonnull OffsetDateTime value) {
if (!isValidExtensionName(key)) {
throw CloudEventRWException.newInvalidExtensionName(key);
}
this.extensions.put(key, value);
return self;
}
@Override @Override
public SELF withoutExtension(@Nonnull String key) { public SELF withoutExtension(@Nonnull String key) {
this.extensions.remove(key); this.extensions.remove(key);
@ -172,14 +191,20 @@ public abstract class BaseCloudEventBuilder<SELF extends BaseCloudEventBuilder<S
return false; return false;
} }
char[] chars = name.toCharArray(); char[] chars = name.toCharArray();
for (char c: chars) for (char c : chars)
if (!isValidChar(c)) { if (!isValidChar(c)) {
return false; return false;
} }
return true; return true;
} }
private static boolean isValidChar(char c) { private static boolean isValidChar(char c) {
return (c >= 'a' && c <= 'z') || (c >= '0' && c <= '9'); return (c >= 'a' && c <= 'z') || (c >= '0' && c <= '9');
} }
protected void requireValidAttributeWrite(String name) {
if (name.equals(SPECVERSION)) {
throw new IllegalArgumentException("You should not set the specversion attribute through withContextAttribute methods");
}
}
} }

View File

@ -18,9 +18,12 @@
package io.cloudevents.core.impl; package io.cloudevents.core.impl;
import io.cloudevents.CloudEventContext; import io.cloudevents.CloudEventContext;
import io.cloudevents.rw.CloudEventAttributesWriter;
import io.cloudevents.rw.CloudEventContextReader; import io.cloudevents.rw.CloudEventContextReader;
import io.cloudevents.rw.CloudEventExtensionsWriter; import io.cloudevents.rw.CloudEventContextWriter;
import io.cloudevents.rw.CloudEventRWException;
import java.net.URI;
import java.time.OffsetDateTime;
public class CloudEventContextReaderAdapter implements CloudEventContextReader { public class CloudEventContextReaderAdapter implements CloudEventContextReader {
@ -30,41 +33,47 @@ public class CloudEventContextReaderAdapter implements CloudEventContextReader {
this.event = event; this.event = event;
} }
@Override public void readAttributes(CloudEventContextWriter writer) throws RuntimeException {
public void readAttributes(CloudEventAttributesWriter writer) throws RuntimeException { writer.withContextAttribute("id", event.getId());
writer.withAttribute("id", event.getId()); writer.withContextAttribute("source", event.getSource());
writer.withAttribute("source", event.getSource()); writer.withContextAttribute("type", event.getType());
writer.withAttribute("type", event.getType());
if (event.getDataContentType() != null) { if (event.getDataContentType() != null) {
writer.withAttribute("datacontenttype", event.getDataContentType()); writer.withContextAttribute("datacontenttype", event.getDataContentType());
} }
if (event.getDataSchema() != null) { if (event.getDataSchema() != null) {
writer.withAttribute("dataschema", event.getDataSchema()); writer.withContextAttribute("dataschema", event.getDataSchema());
} }
if (event.getSubject() != null) { if (event.getSubject() != null) {
writer.withAttribute("subject", event.getSubject()); writer.withContextAttribute("subject", event.getSubject());
} }
if (event.getTime() != null) { if (event.getTime() != null) {
writer.withAttribute("time", event.getTime()); writer.withContextAttribute("time", event.getTime());
} }
} }
@Override public void readExtensions(CloudEventContextWriter writer) throws RuntimeException {
public void readExtensions(CloudEventExtensionsWriter writer) throws RuntimeException {
for (String key : event.getExtensionNames()) { for (String key : event.getExtensionNames()) {
Object value = event.getExtension(key); Object value = event.getExtension(key);
if (value instanceof String) { if (value instanceof String) {
writer.withExtension(key, (String) value); writer.withContextAttribute(key, (String) value);
} else if (value instanceof Number) { } else if (value instanceof Number) {
writer.withExtension(key, (Number) value); writer.withContextAttribute(key, (Number) value);
} else if (value instanceof Boolean) { } else if (value instanceof Boolean) {
writer.withExtension(key, (Boolean) value); writer.withContextAttribute(key, (Boolean) value);
} else if (value instanceof URI) {
writer.withContextAttribute(key, (URI) value);
} else if (value instanceof OffsetDateTime) {
writer.withContextAttribute(key, (OffsetDateTime) value);
} else { } else {
// This should never happen because we build that map only through our builders // This should never happen because we build that map only through our builders
throw new IllegalStateException("Illegal value inside extensions map: " + key + " " + value); throw new IllegalStateException("Illegal value inside extensions map: " + key + " " + value);
} }
} }
;
} }
@Override
public void readContext(CloudEventContextWriter writer) throws CloudEventRWException {
this.readAttributes(writer);
this.readExtensions(writer);
}
} }

View File

@ -57,17 +57,13 @@ public abstract class BaseGenericBinaryMessageReaderImpl<HK, HV> extends BaseBin
// in order to complete the visit in one loop // in order to complete the visit in one loop
this.forEachHeader((key, value) -> { this.forEachHeader((key, value) -> {
if (isContentTypeHeader(key)) { if (isContentTypeHeader(key)) {
visitor.withAttribute(CloudEventV1.DATACONTENTTYPE, toCloudEventsValue(value)); visitor.withContextAttribute(CloudEventV1.DATACONTENTTYPE, toCloudEventsValue(value));
} else if (isCloudEventsHeader(key)) { } else if (isCloudEventsHeader(key)) {
String name = toCloudEventsKey(key); String name = toCloudEventsKey(key);
if (name.equals(CloudEventV1.SPECVERSION)) { if (name.equals(CloudEventV1.SPECVERSION)) {
return; return;
} }
if (this.version.getAllAttributes().contains(name)) { visitor.withContextAttribute(name, toCloudEventsValue(value));
visitor.withAttribute(name, toCloudEventsValue(value));
} else {
visitor.withExtension(name, toCloudEventsValue(value));
}
} }
}); });

View File

@ -16,17 +16,20 @@
*/ */
package io.cloudevents.core.v03; package io.cloudevents.core.v03;
import java.net.URI;
import java.net.URISyntaxException;
import java.time.OffsetDateTime;
import io.cloudevents.SpecVersion; import io.cloudevents.SpecVersion;
import io.cloudevents.core.CloudEventUtils; import io.cloudevents.core.CloudEventUtils;
import io.cloudevents.core.impl.BaseCloudEventBuilder; import io.cloudevents.core.impl.BaseCloudEventBuilder;
import io.cloudevents.rw.CloudEventContextReader; import io.cloudevents.rw.CloudEventContextReader;
import io.cloudevents.rw.CloudEventContextWriter;
import io.cloudevents.rw.CloudEventRWException; import io.cloudevents.rw.CloudEventRWException;
import io.cloudevents.types.Time; import io.cloudevents.types.Time;
import java.net.URI;
import java.net.URISyntaxException;
import java.time.OffsetDateTime;
import static io.cloudevents.core.v03.CloudEventV03.*;
/** /**
* CloudEvent V0.3 builder. * CloudEvent V0.3 builder.
* *
@ -59,11 +62,10 @@ public final class CloudEventBuilder extends BaseCloudEventBuilder<CloudEventBui
protected void setAttributes(io.cloudevents.CloudEventContext event) { protected void setAttributes(io.cloudevents.CloudEventContext event) {
CloudEventContextReader contextReader = CloudEventUtils.toContextReader(event); CloudEventContextReader contextReader = CloudEventUtils.toContextReader(event);
if (event.getSpecVersion() == SpecVersion.V03) { if (event.getSpecVersion() == SpecVersion.V03) {
contextReader.readAttributes(this); contextReader.readContext(this);
} else { } else {
contextReader.readAttributes(new V1ToV03AttributesConverter(this)); contextReader.readContext(new V1ToV03AttributesConverter(this));
} }
contextReader.readExtensions(this);
} }
public CloudEventBuilder withId(String id) { public CloudEventBuilder withId(String id) {
@ -139,65 +141,127 @@ public final class CloudEventBuilder extends BaseCloudEventBuilder<CloudEventBui
} }
// Message impl // Message impl
@Override @Override
public CloudEventBuilder withAttribute(String name, String value) throws CloudEventRWException { public CloudEventContextWriter withContextAttribute(String name, String value) throws CloudEventRWException {
requireValidAttributeWrite(name);
switch (name) { switch (name) {
case "id": case ID:
withId(value); withId(value);
return this; return this;
case "source": case SOURCE:
try { try {
withSource(new URI(value)); withSource(new URI(value));
} catch (URISyntaxException e) { } catch (URISyntaxException e) {
throw CloudEventRWException.newInvalidAttributeValue("source", value, e); throw CloudEventRWException.newInvalidAttributeValue(SOURCE, value, e);
} }
return this; return this;
case "type": case TYPE:
withType(value); withType(value);
return this; return this;
case "datacontenttype": case DATACONTENTTYPE:
withDataContentType(value); withDataContentType(value);
return this; return this;
case "datacontentencoding": case DATACONTENTENCODING:
// No-op, this information is not saved in the event because it's useful only for parsing // No-op, this information is not saved in the event because it's useful only for parsing
return this; return this;
case "schemaurl": case SCHEMAURL:
try { try {
withSchemaUrl(new URI(value)); withSchemaUrl(new URI(value));
} catch (URISyntaxException e) { } catch (URISyntaxException e) {
throw CloudEventRWException.newInvalidAttributeValue("schemaurl", value, e); throw CloudEventRWException.newInvalidAttributeValue(SCHEMAURL, value, e);
} }
return this; return this;
case "subject": case SUBJECT:
withSubject(value); withSubject(value);
return this; return this;
case "time": case TIME:
withTime(Time.parseTime("time", value)); withTime(Time.parseTime(TIME, value));
return this;
default:
withExtension(name, value);
return this; return this;
} }
throw CloudEventRWException.newInvalidAttributeName(name);
} }
@Override @Override
public CloudEventBuilder withAttribute(String name, URI value) throws CloudEventRWException { public CloudEventContextWriter withContextAttribute(String name, URI value) throws CloudEventRWException {
requireValidAttributeWrite(name);
switch (name) { switch (name) {
case "source": case SOURCE:
withSource(value); withSource(value);
return this; return this;
case "schemaurl": case SCHEMAURL:
withDataSchema(value); withDataSchema(value);
return this; return this;
case ID:
case TYPE:
case DATACONTENTTYPE:
case DATACONTENTENCODING:
case SUBJECT:
case TIME:
throw CloudEventRWException.newInvalidAttributeType(name, URI.class);
default:
withExtension(name, value);
return this;
} }
throw CloudEventRWException.newInvalidAttributeType(name, URI.class);
} }
@Override @Override
public CloudEventBuilder withAttribute(String name, OffsetDateTime value) throws CloudEventRWException { public CloudEventContextWriter withContextAttribute(String name, OffsetDateTime value) throws CloudEventRWException {
if ("time".equals(name)) { requireValidAttributeWrite(name);
withTime(value); switch (name) {
return this; case TIME:
withTime(value);
return this;
case SCHEMAURL:
case ID:
case TYPE:
case DATACONTENTTYPE:
case DATACONTENTENCODING:
case SUBJECT:
case SOURCE:
throw CloudEventRWException.newInvalidAttributeType(name, OffsetDateTime.class);
default:
withExtension(name, value);
return this;
}
}
@Override
public CloudEventContextWriter withContextAttribute(String name, Number value) throws CloudEventRWException {
requireValidAttributeWrite(name);
switch (name) {
case TIME:
case SCHEMAURL:
case ID:
case TYPE:
case DATACONTENTTYPE:
case DATACONTENTENCODING:
case SUBJECT:
case SOURCE:
throw CloudEventRWException.newInvalidAttributeType(name, Number.class);
default:
withExtension(name, value);
return this;
}
}
@Override
public CloudEventContextWriter withContextAttribute(String name, Boolean value) throws CloudEventRWException {
requireValidAttributeWrite(name);
switch (name) {
case TIME:
case SCHEMAURL:
case ID:
case TYPE:
case DATACONTENTTYPE:
case DATACONTENTENCODING:
case SUBJECT:
case SOURCE:
throw CloudEventRWException.newInvalidAttributeType(name, Boolean.class);
default:
withExtension(name, value);
return this;
} }
throw CloudEventRWException.newInvalidAttributeType(name, OffsetDateTime.class);
} }
} }

View File

@ -20,7 +20,7 @@ import io.cloudevents.CloudEventData;
import io.cloudevents.SpecVersion; import io.cloudevents.SpecVersion;
import io.cloudevents.core.impl.BaseCloudEvent; import io.cloudevents.core.impl.BaseCloudEvent;
import io.cloudevents.lang.Nullable; import io.cloudevents.lang.Nullable;
import io.cloudevents.rw.CloudEventAttributesWriter; import io.cloudevents.rw.CloudEventContextWriter;
import io.cloudevents.rw.CloudEventRWException; import io.cloudevents.rw.CloudEventRWException;
import java.net.URI; import java.net.URI;
@ -169,43 +169,44 @@ public final class CloudEventV03 extends BaseCloudEvent {
} }
@Override @Override
public void readAttributes(CloudEventAttributesWriter writer) throws CloudEventRWException { public void readContext(CloudEventContextWriter writer) throws CloudEventRWException {
writer.withAttribute( writer.withContextAttribute(
ID, ID,
this.id this.id
); );
writer.withAttribute( writer.withContextAttribute(
SOURCE, SOURCE,
this.source this.source
); );
writer.withAttribute( writer.withContextAttribute(
TYPE, TYPE,
this.type this.type
); );
if (this.datacontenttype != null) { if (this.datacontenttype != null) {
writer.withAttribute( writer.withContextAttribute(
DATACONTENTTYPE, DATACONTENTTYPE,
this.datacontenttype this.datacontenttype
); );
} }
if (this.schemaurl != null) { if (this.schemaurl != null) {
writer.withAttribute( writer.withContextAttribute(
SCHEMAURL, SCHEMAURL,
this.schemaurl this.schemaurl
); );
} }
if (this.subject != null) { if (this.subject != null) {
writer.withAttribute( writer.withContextAttribute(
SUBJECT, SUBJECT,
this.subject this.subject
); );
} }
if (this.time != null) { if (this.time != null) {
writer.withAttribute( writer.withContextAttribute(
TIME, TIME,
this.time this.time
); );
} }
this.readExtensions(writer);
} }
@Override @Override

View File

@ -17,7 +17,7 @@
package io.cloudevents.core.v03; package io.cloudevents.core.v03;
import io.cloudevents.rw.CloudEventAttributesWriter; import io.cloudevents.rw.CloudEventContextWriter;
import io.cloudevents.rw.CloudEventRWException; import io.cloudevents.rw.CloudEventRWException;
import io.cloudevents.types.Time; import io.cloudevents.types.Time;
@ -25,7 +25,9 @@ import java.net.URI;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.time.OffsetDateTime; import java.time.OffsetDateTime;
class V1ToV03AttributesConverter implements CloudEventAttributesWriter { import static io.cloudevents.core.v1.CloudEventV1.*;
class V1ToV03AttributesConverter implements CloudEventContextWriter {
private final CloudEventBuilder builder; private final CloudEventBuilder builder;
@ -34,60 +36,114 @@ class V1ToV03AttributesConverter implements CloudEventAttributesWriter {
} }
@Override @Override
public V1ToV03AttributesConverter withAttribute(String name, String value) throws CloudEventRWException { public CloudEventContextWriter withContextAttribute(String name, String value) throws CloudEventRWException {
switch (name) { switch (name) {
case "id": case ID:
builder.withId(value); builder.withId(value);
return this; return this;
case "source": case SOURCE:
try { try {
builder.withSource(new URI(value)); builder.withSource(new URI(value));
} catch (URISyntaxException e) { } catch (URISyntaxException e) {
throw CloudEventRWException.newInvalidAttributeValue("source", value, e); throw CloudEventRWException.newInvalidAttributeValue(SOURCE, value, e);
} }
return this; return this;
case "type": case TYPE:
builder.withType(value); builder.withType(value);
return this; return this;
case "datacontenttype": case DATACONTENTTYPE:
builder.withDataContentType(value); builder.withDataContentType(value);
return this; return this;
case "dataschema": case DATASCHEMA:
try { try {
builder.withSchemaUrl(new URI(value)); builder.withSchemaUrl(new URI(value));
} catch (URISyntaxException e) { } catch (URISyntaxException e) {
throw CloudEventRWException.newInvalidAttributeValue("dataschema", value, e); throw CloudEventRWException.newInvalidAttributeValue(DATASCHEMA, value, e);
} }
return this; return this;
case "subject": case SUBJECT:
builder.withSubject(value); builder.withSubject(value);
return this; return this;
case "time": case TIME:
builder.withTime(Time.parseTime("time", value)); builder.withTime(Time.parseTime(TIME, value));
return this;
default:
builder.withExtension(name, value);
return this; return this;
} }
throw CloudEventRWException.newInvalidAttributeName(name);
} }
@Override @Override
public V1ToV03AttributesConverter withAttribute(String name, URI value) throws CloudEventRWException { public CloudEventContextWriter withContextAttribute(String name, URI value) throws CloudEventRWException {
switch (name) { switch (name) {
case "source": case SOURCE:
builder.withSource(value); builder.withSource(value);
return this; return this;
case "dataschema": case DATASCHEMA:
builder.withSchemaUrl(value); builder.withSchemaUrl(value);
return this; return this;
case ID:
case TYPE:
case DATACONTENTTYPE:
case SUBJECT:
case TIME:
throw CloudEventRWException.newInvalidAttributeType(name, URI.class);
default:
builder.withExtension(name, value);
return this;
} }
throw CloudEventRWException.newInvalidAttributeType(name, URI.class);
} }
@Override @Override
public V1ToV03AttributesConverter withAttribute(String name, OffsetDateTime value) throws CloudEventRWException { public CloudEventContextWriter withContextAttribute(String name, OffsetDateTime value) throws CloudEventRWException {
if ("time".equals(name)) { switch (name) {
builder.withTime(value); case TIME:
return this; builder.withTime(value);
return this;
case SOURCE:
case DATASCHEMA:
case ID:
case TYPE:
case DATACONTENTTYPE:
case SUBJECT:
throw CloudEventRWException.newInvalidAttributeType(name, OffsetDateTime.class);
default:
builder.withExtension(name, value);
return this;
}
}
@Override
public CloudEventContextWriter withContextAttribute(String name, Number value) throws CloudEventRWException {
switch (name) {
case TIME:
case SOURCE:
case DATASCHEMA:
case ID:
case TYPE:
case DATACONTENTTYPE:
case SUBJECT:
throw CloudEventRWException.newInvalidAttributeType(name, Number.class);
default:
builder.withExtension(name, value);
return this;
}
}
@Override
public CloudEventContextWriter withContextAttribute(String name, Boolean value) throws CloudEventRWException {
switch (name) {
case TIME:
case SOURCE:
case DATASCHEMA:
case ID:
case TYPE:
case DATACONTENTTYPE:
case SUBJECT:
throw CloudEventRWException.newInvalidAttributeType(name, Boolean.class);
default:
builder.withExtension(name, value);
return this;
} }
throw CloudEventRWException.newInvalidAttributeType(name, OffsetDateTime.class);
} }
} }

View File

@ -22,6 +22,7 @@ import io.cloudevents.SpecVersion;
import io.cloudevents.core.CloudEventUtils; import io.cloudevents.core.CloudEventUtils;
import io.cloudevents.core.impl.BaseCloudEventBuilder; import io.cloudevents.core.impl.BaseCloudEventBuilder;
import io.cloudevents.rw.CloudEventContextReader; import io.cloudevents.rw.CloudEventContextReader;
import io.cloudevents.rw.CloudEventContextWriter;
import io.cloudevents.rw.CloudEventRWException; import io.cloudevents.rw.CloudEventRWException;
import io.cloudevents.types.Time; import io.cloudevents.types.Time;
@ -29,6 +30,8 @@ import java.net.URI;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.time.OffsetDateTime; import java.time.OffsetDateTime;
import static io.cloudevents.core.v1.CloudEventV1.*;
/** /**
* CloudEvent V1.0 builder. * CloudEvent V1.0 builder.
* *
@ -61,11 +64,10 @@ public final class CloudEventBuilder extends BaseCloudEventBuilder<CloudEventBui
protected void setAttributes(io.cloudevents.CloudEventContext event) { protected void setAttributes(io.cloudevents.CloudEventContext event) {
CloudEventContextReader contextReader = CloudEventUtils.toContextReader(event); CloudEventContextReader contextReader = CloudEventUtils.toContextReader(event);
if (event.getSpecVersion() == SpecVersion.V1) { if (event.getSpecVersion() == SpecVersion.V1) {
contextReader.readAttributes(this); contextReader.readContext(this);
} else { } else {
contextReader.readAttributes(new V03ToV1AttributesConverter(this)); contextReader.readContext(new V03ToV1AttributesConverter(this));
} }
contextReader.readExtensions(this);
} }
public CloudEventBuilder withId(String id) { public CloudEventBuilder withId(String id) {
@ -108,13 +110,13 @@ public final class CloudEventBuilder extends BaseCloudEventBuilder<CloudEventBui
@Override @Override
public CloudEvent build() { public CloudEvent build() {
if (id == null) { if (id == null) {
throw createMissingAttributeException(CloudEventV1.ID); throw createMissingAttributeException(ID);
} }
if (source == null) { if (source == null) {
throw createMissingAttributeException(CloudEventV1.SOURCE); throw createMissingAttributeException(SOURCE);
} }
if (type == null) { if (type == null) {
throw createMissingAttributeException(CloudEventV1.TYPE); throw createMissingAttributeException(TYPE);
} }
return new CloudEventV1(id, source, type, datacontenttype, dataschema, subject, time, this.data, this.extensions); return new CloudEventV1(id, source, type, datacontenttype, dataschema, subject, time, this.data, this.extensions);
@ -138,60 +140,119 @@ public final class CloudEventBuilder extends BaseCloudEventBuilder<CloudEventBui
// Message impl // Message impl
@Override @Override
public CloudEventBuilder withAttribute(String name, String value) throws CloudEventRWException { public CloudEventContextWriter withContextAttribute(String name, String value) throws CloudEventRWException {
requireValidAttributeWrite(name);
switch (name) { switch (name) {
case CloudEventV1.ID: case ID:
withId(value); withId(value);
return this; return this;
case CloudEventV1.SOURCE: case SOURCE:
try { try {
withSource(new URI(value)); withSource(new URI(value));
} catch (URISyntaxException e) { } catch (URISyntaxException e) {
throw CloudEventRWException.newInvalidAttributeValue(CloudEventV1.SOURCE, value, e); throw CloudEventRWException.newInvalidAttributeValue(SOURCE, value, e);
} }
return this; return this;
case CloudEventV1.TYPE: case TYPE:
withType(value); withType(value);
return this; return this;
case CloudEventV1.DATACONTENTTYPE: case DATACONTENTTYPE:
withDataContentType(value); withDataContentType(value);
return this; return this;
case CloudEventV1.DATASCHEMA: case DATASCHEMA:
try { try {
withDataSchema(new URI(value)); withDataSchema(new URI(value));
} catch (URISyntaxException e) { } catch (URISyntaxException e) {
throw CloudEventRWException.newInvalidAttributeValue(CloudEventV1.DATASCHEMA, value, e); throw CloudEventRWException.newInvalidAttributeValue(DATASCHEMA, value, e);
} }
return this; return this;
case CloudEventV1.SUBJECT: case SUBJECT:
withSubject(value); withSubject(value);
return this; return this;
case CloudEventV1.TIME: case TIME:
withTime(Time.parseTime(CloudEventV1.TIME, value)); withTime(Time.parseTime(TIME, value));
return this;
default:
withExtension(name, value);
return this; return this;
} }
throw CloudEventRWException.newInvalidAttributeName(name);
} }
@Override @Override
public CloudEventBuilder withAttribute(String name, URI value) throws CloudEventRWException { public CloudEventContextWriter withContextAttribute(String name, URI value) throws CloudEventRWException {
requireValidAttributeWrite(name);
switch (name) { switch (name) {
case CloudEventV1.SOURCE: case SOURCE:
withSource(value); withSource(value);
return this; return this;
case CloudEventV1.DATASCHEMA: case DATASCHEMA:
withDataSchema(value); withDataSchema(value);
return this; return this;
case ID:
case TYPE:
case DATACONTENTTYPE:
case SUBJECT:
case TIME:
throw CloudEventRWException.newInvalidAttributeType(name, URI.class);
default:
withExtension(name, value);
return this;
} }
throw CloudEventRWException.newInvalidAttributeType(name, URI.class);
} }
@Override @Override
public CloudEventBuilder withAttribute(String name, OffsetDateTime value) throws CloudEventRWException { public CloudEventContextWriter withContextAttribute(String name, OffsetDateTime value) throws CloudEventRWException {
if (CloudEventV1.TIME.equals(name)) { requireValidAttributeWrite(name);
withTime(value); switch (name) {
return this; case TIME:
withTime(value);
return this;
case DATASCHEMA:
case ID:
case TYPE:
case DATACONTENTTYPE:
case SUBJECT:
case SOURCE:
throw CloudEventRWException.newInvalidAttributeType(name, OffsetDateTime.class);
default:
withExtension(name, value);
return this;
}
}
@Override
public CloudEventContextWriter withContextAttribute(String name, Number value) throws CloudEventRWException {
requireValidAttributeWrite(name);
switch (name) {
case TIME:
case DATASCHEMA:
case ID:
case TYPE:
case DATACONTENTTYPE:
case SUBJECT:
case SOURCE:
throw CloudEventRWException.newInvalidAttributeType(name, Number.class);
default:
withExtension(name, value);
return this;
}
}
@Override
public CloudEventContextWriter withContextAttribute(String name, Boolean value) throws CloudEventRWException {
requireValidAttributeWrite(name);
switch (name) {
case TIME:
case DATASCHEMA:
case ID:
case TYPE:
case DATACONTENTTYPE:
case SUBJECT:
case SOURCE:
throw CloudEventRWException.newInvalidAttributeType(name, Boolean.class);
default:
withExtension(name, value);
return this;
} }
throw CloudEventRWException.newInvalidAttributeType(name, OffsetDateTime.class);
} }
} }

View File

@ -19,7 +19,7 @@ package io.cloudevents.core.v1;
import io.cloudevents.CloudEventData; import io.cloudevents.CloudEventData;
import io.cloudevents.SpecVersion; import io.cloudevents.SpecVersion;
import io.cloudevents.core.impl.BaseCloudEvent; import io.cloudevents.core.impl.BaseCloudEvent;
import io.cloudevents.rw.CloudEventAttributesWriter; import io.cloudevents.rw.CloudEventContextWriter;
import io.cloudevents.rw.CloudEventRWException; import io.cloudevents.rw.CloudEventRWException;
import java.net.URI; import java.net.URI;
@ -156,43 +156,44 @@ public final class CloudEventV1 extends BaseCloudEvent {
} }
@Override @Override
public void readAttributes(CloudEventAttributesWriter writer) throws CloudEventRWException { public void readContext(CloudEventContextWriter writer) throws CloudEventRWException {
writer.withAttribute( writer.withContextAttribute(
ID, ID,
this.id this.id
); );
writer.withAttribute( writer.withContextAttribute(
SOURCE, SOURCE,
this.source this.source
); );
writer.withAttribute( writer.withContextAttribute(
TYPE, TYPE,
this.type this.type
); );
if (this.datacontenttype != null) { if (this.datacontenttype != null) {
writer.withAttribute( writer.withContextAttribute(
DATACONTENTTYPE, DATACONTENTTYPE,
this.datacontenttype this.datacontenttype
); );
} }
if (this.dataschema != null) { if (this.dataschema != null) {
writer.withAttribute( writer.withContextAttribute(
DATASCHEMA, DATASCHEMA,
this.dataschema this.dataschema
); );
} }
if (this.subject != null) { if (this.subject != null) {
writer.withAttribute( writer.withContextAttribute(
SUBJECT, SUBJECT,
this.subject this.subject
); );
} }
if (this.time != null) { if (this.time != null) {
writer.withAttribute( writer.withContextAttribute(
TIME, TIME,
this.time this.time
); );
} }
this.readExtensions(writer);
} }
@Override @Override

View File

@ -17,7 +17,7 @@
package io.cloudevents.core.v1; package io.cloudevents.core.v1;
import io.cloudevents.rw.CloudEventAttributesWriter; import io.cloudevents.rw.CloudEventContextWriter;
import io.cloudevents.rw.CloudEventRWException; import io.cloudevents.rw.CloudEventRWException;
import io.cloudevents.types.Time; import io.cloudevents.types.Time;
@ -25,7 +25,9 @@ import java.net.URI;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.time.OffsetDateTime; import java.time.OffsetDateTime;
class V03ToV1AttributesConverter implements CloudEventAttributesWriter { import static io.cloudevents.core.v03.CloudEventV03.*;
class V03ToV1AttributesConverter implements CloudEventContextWriter {
private final CloudEventBuilder builder; private final CloudEventBuilder builder;
@ -34,60 +36,114 @@ class V03ToV1AttributesConverter implements CloudEventAttributesWriter {
} }
@Override @Override
public V03ToV1AttributesConverter withAttribute(String name, String value) throws CloudEventRWException { public CloudEventContextWriter withContextAttribute(String name, String value) throws CloudEventRWException {
switch (name) { switch (name) {
case "id": case ID:
builder.withId(value); builder.withId(value);
return this; return this;
case "source": case SOURCE:
try { try {
builder.withSource(new URI(value)); builder.withSource(new URI(value));
} catch (URISyntaxException e) { } catch (URISyntaxException e) {
throw CloudEventRWException.newInvalidAttributeValue("source", value, e); throw CloudEventRWException.newInvalidAttributeValue(SOURCE, value, e);
} }
return this; return this;
case "type": case TYPE:
builder.withType(value); builder.withType(value);
return this; return this;
case "datacontenttype": case DATACONTENTTYPE:
builder.withDataContentType(value); builder.withDataContentType(value);
return this; return this;
case "schemaurl": case SCHEMAURL:
try { try {
builder.withDataSchema(new URI(value)); builder.withDataSchema(new URI(value));
} catch (URISyntaxException e) { } catch (URISyntaxException e) {
throw CloudEventRWException.newInvalidAttributeValue("dataschema", value, e); throw CloudEventRWException.newInvalidAttributeValue(SCHEMAURL, value, e);
} }
return this; return this;
case "subject": case SUBJECT:
builder.withSubject(value); builder.withSubject(value);
return this; return this;
case "time": case TIME:
builder.withTime(Time.parseTime("time", value)); builder.withTime(Time.parseTime(TIME, value));
return this;
default:
builder.withExtension(name, value);
return this; return this;
} }
throw CloudEventRWException.newInvalidAttributeName(name);
} }
@Override @Override
public V03ToV1AttributesConverter withAttribute(String name, URI value) throws CloudEventRWException { public CloudEventContextWriter withContextAttribute(String name, URI value) throws CloudEventRWException {
switch (name) { switch (name) {
case "source": case SOURCE:
builder.withSource(value); builder.withSource(value);
return this; return this;
case "schemaurl": case SCHEMAURL:
builder.withDataSchema(value); builder.withDataSchema(value);
return this; return this;
case ID:
case TYPE:
case DATACONTENTTYPE:
case SUBJECT:
case TIME:
throw CloudEventRWException.newInvalidAttributeType(name, URI.class);
default:
builder.withExtension(name, value);
return this;
} }
throw CloudEventRWException.newInvalidAttributeType(name, URI.class);
} }
@Override @Override
public V03ToV1AttributesConverter withAttribute(String name, OffsetDateTime value) throws CloudEventRWException { public CloudEventContextWriter withContextAttribute(String name, OffsetDateTime value) throws CloudEventRWException {
if ("time".equals(name)) { switch (name) {
builder.withTime(value); case TIME:
return this; builder.withTime(value);
return this;
case SOURCE:
case SCHEMAURL:
case ID:
case TYPE:
case DATACONTENTTYPE:
case SUBJECT:
throw CloudEventRWException.newInvalidAttributeType(name, OffsetDateTime.class);
default:
builder.withExtension(name, value);
return this;
}
}
@Override
public CloudEventContextWriter withContextAttribute(String name, Number value) throws CloudEventRWException {
switch (name) {
case TIME:
case SOURCE:
case SCHEMAURL:
case ID:
case TYPE:
case DATACONTENTTYPE:
case SUBJECT:
throw CloudEventRWException.newInvalidAttributeType(name, Number.class);
default:
builder.withExtension(name, value);
return this;
}
}
@Override
public CloudEventContextWriter withContextAttribute(String name, Boolean value) throws CloudEventRWException {
switch (name) {
case TIME:
case SOURCE:
case SCHEMAURL:
case ID:
case TYPE:
case DATACONTENTTYPE:
case SUBJECT:
throw CloudEventRWException.newInvalidAttributeType(name, Boolean.class);
default:
builder.withExtension(name, value);
return this;
} }
throw CloudEventRWException.newInvalidAttributeType(name, OffsetDateTime.class);
} }
} }

View File

@ -34,24 +34,21 @@ import java.util.Map;
public class MockBinaryMessageWriter extends BaseBinaryMessageReader implements MessageReader, CloudEventContextReader, CloudEventWriterFactory<MockBinaryMessageWriter, MockBinaryMessageWriter>, CloudEventWriter<MockBinaryMessageWriter> { public class MockBinaryMessageWriter extends BaseBinaryMessageReader implements MessageReader, CloudEventContextReader, CloudEventWriterFactory<MockBinaryMessageWriter, MockBinaryMessageWriter>, CloudEventWriter<MockBinaryMessageWriter> {
private SpecVersion version; private SpecVersion version;
private Map<String, Object> attributes; private Map<String, Object> context;
private CloudEventData data; private CloudEventData data;
private Map<String, Object> extensions;
public MockBinaryMessageWriter(SpecVersion version, Map<String, Object> attributes, CloudEventData data, Map<String, Object> extensions) { public MockBinaryMessageWriter(SpecVersion version, Map<String, Object> context, CloudEventData data) {
this.version = version; this.version = version;
this.attributes = attributes; this.context = context;
this.data = data; this.data = data;
this.extensions = extensions;
} }
public MockBinaryMessageWriter(SpecVersion version, Map<String, Object> attributes, byte[] data, Map<String, Object> extensions) { public MockBinaryMessageWriter(SpecVersion version, Map<String, Object> context, byte[] data) {
this(version, attributes, BytesCloudEventData.wrap(data), extensions); this(version, context, BytesCloudEventData.wrap(data));
} }
public MockBinaryMessageWriter() { public MockBinaryMessageWriter() {
this.attributes = new HashMap<>(); this.context = new HashMap<>();
this.extensions = new HashMap<>();
} }
public MockBinaryMessageWriter(CloudEvent event) { public MockBinaryMessageWriter(CloudEvent event) {
@ -67,47 +64,14 @@ public class MockBinaryMessageWriter extends BaseBinaryMessageReader implements
throw new IllegalStateException("MockBinaryMessage is empty"); throw new IllegalStateException("MockBinaryMessage is empty");
} }
CloudEventWriter<V> visitor = writerFactory.create(version); CloudEventWriter<V> writer = writerFactory.create(version);
this.readAttributes(visitor); this.readContext(writer);
this.readExtensions(visitor);
if (this.data != null) { if (this.data != null) {
return visitor.end(mapper.map(this.data)); return writer.end(mapper.map(this.data));
} }
return visitor.end(); return writer.end();
}
@Override
public void readAttributes(CloudEventAttributesWriter writer) throws CloudEventRWException, IllegalStateException {
for (Map.Entry<String, Object> e : this.attributes.entrySet()) {
if (e.getValue() instanceof String) {
writer.withAttribute(e.getKey(), (String) e.getValue());
} else if (e.getValue() instanceof OffsetDateTime) {
writer.withAttribute(e.getKey(), (OffsetDateTime) e.getValue());
} else if (e.getValue() instanceof URI) {
writer.withAttribute(e.getKey(), (URI) e.getValue());
} else {
// This should never happen because we build that map only through our builders
throw new IllegalStateException("Illegal value inside attributes map: " + e);
}
}
}
@Override
public void readExtensions(CloudEventExtensionsWriter writer) throws CloudEventRWException, IllegalStateException {
for (Map.Entry<String, Object> entry : this.extensions.entrySet()) {
if (entry.getValue() instanceof String) {
writer.withExtension(entry.getKey(), (String) entry.getValue());
} else if (entry.getValue() instanceof Number) {
writer.withExtension(entry.getKey(), (Number) entry.getValue());
} else if (entry.getValue() instanceof Boolean) {
writer.withExtension(entry.getKey(), (Boolean) entry.getValue());
} else {
// This should never happen because we build that map only through our builders
throw new IllegalStateException("Illegal value inside extensions map: " + entry);
}
}
} }
@Override @Override
@ -121,46 +85,60 @@ public class MockBinaryMessageWriter extends BaseBinaryMessageReader implements
return this; return this;
} }
@Override
public MockBinaryMessageWriter withAttribute(String name, String value) throws CloudEventRWException {
this.attributes.put(name, value);
return this;
}
@Override
public MockBinaryMessageWriter withAttribute(String name, URI value) throws CloudEventRWException {
this.attributes.put(name, value);
return this;
}
@Override
public MockBinaryMessageWriter withAttribute(String name, OffsetDateTime value) throws CloudEventRWException {
this.attributes.put(name, value);
return this;
}
@Override
public MockBinaryMessageWriter withExtension(String name, String value) throws CloudEventRWException {
this.extensions.put(name, value);
return this;
}
@Override
public MockBinaryMessageWriter withExtension(String name, Number value) throws CloudEventRWException {
this.extensions.put(name, value);
return this;
}
@Override
public MockBinaryMessageWriter withExtension(String name, Boolean value) throws CloudEventRWException {
this.extensions.put(name, value);
return this;
}
@Override @Override
public MockBinaryMessageWriter create(SpecVersion version) { public MockBinaryMessageWriter create(SpecVersion version) {
this.version = version; this.version = version;
return this; return this;
} }
@Override
public void readContext(CloudEventContextWriter writer) throws CloudEventRWException {
for (Map.Entry<String, Object> entry : this.context.entrySet()) {
if (entry.getValue() instanceof String) {
writer.withContextAttribute(entry.getKey(), (String) entry.getValue());
} else if (entry.getValue() instanceof OffsetDateTime) {
writer.withContextAttribute(entry.getKey(), (OffsetDateTime) entry.getValue());
} else if (entry.getValue() instanceof URI) {
writer.withContextAttribute(entry.getKey(), (URI) entry.getValue());
} else if (entry.getValue() instanceof Number) {
writer.withContextAttribute(entry.getKey(), (Number) entry.getValue());
} else if (entry.getValue() instanceof Boolean) {
writer.withContextAttribute(entry.getKey(), (Boolean) entry.getValue());
} else {
// This should never happen because we build that map only through our builders
throw new IllegalStateException("Illegal value inside context map: " + entry);
}
}
}
@Override
public CloudEventContextWriter withContextAttribute(String name, String value) throws CloudEventRWException {
this.context.put(name, value);
return this;
}
@Override
public CloudEventContextWriter withContextAttribute(String name, URI value) throws CloudEventRWException {
this.context.put(name, value);
return this;
}
@Override
public CloudEventContextWriter withContextAttribute(String name, OffsetDateTime value) throws CloudEventRWException {
this.context.put(name, value);
return this;
}
@Override
public CloudEventContextWriter withContextAttribute(String name, Number value) throws CloudEventRWException {
this.context.put(name, value);
return this;
}
@Override
public CloudEventContextWriter withContextAttribute(String name, Boolean value) throws CloudEventRWException {
this.context.put(name, value);
return this;
}
} }

View File

@ -1,26 +1,20 @@
package io.cloudevents.examples.amqp.vertx; package io.cloudevents.examples.amqp.vertx;
import java.io.PrintWriter;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.message.Message;
import io.cloudevents.CloudEvent; import io.cloudevents.CloudEvent;
import io.cloudevents.amqp.ProtonAmqpMessageFactory; import io.cloudevents.amqp.ProtonAmqpMessageFactory;
import io.cloudevents.core.message.MessageReader; import io.cloudevents.core.message.MessageReader;
import io.cloudevents.core.v1.CloudEventBuilder; import io.cloudevents.core.v1.CloudEventBuilder;
import io.cloudevents.core.v1.CloudEventV1; import io.cloudevents.types.Time;
import io.vertx.core.Future; import io.vertx.core.Future;
import io.vertx.core.Promise; import io.vertx.core.Promise;
import io.vertx.core.Vertx; import io.vertx.core.Vertx;
import io.vertx.core.json.JsonObject; import io.vertx.core.json.JsonObject;
import io.vertx.proton.ProtonClient; import io.vertx.proton.*;
import io.vertx.proton.ProtonClientOptions; import org.apache.qpid.proton.amqp.messaging.Accepted;
import io.vertx.proton.ProtonConnection; import org.apache.qpid.proton.message.Message;
import io.vertx.proton.ProtonMessageHandler;
import io.vertx.proton.ProtonQoS; import java.io.PrintWriter;
import io.vertx.proton.ProtonReceiver; import java.net.URI;
import io.vertx.proton.ProtonSender;
/** /**
* A example vertx-based AMQP client that interacts with a remote AMQP server to send and receive CloudEvent messages. * A example vertx-based AMQP client that interacts with a remote AMQP server to send and receive CloudEvent messages.
@ -73,10 +67,10 @@ public class AmqpClient {
final JsonObject payload = new JsonObject().put("temp", 50); final JsonObject payload = new JsonObject().put("temp", 50);
final CloudEvent event = new CloudEventBuilder() final CloudEvent event = new CloudEventBuilder()
.withAttribute(CloudEventV1.ID, "client-id") .withId("client-id")
.withAttribute(CloudEventV1.SOURCE, "http://127.0.0.1/amqp-client") .withSource(URI.create("http://127.0.0.1/amqp-client"))
.withAttribute(CloudEventV1.TYPE, "com.example.sampletype1") .withType("com.example.sampletype1")
.withAttribute(CloudEventV1.TIME, "2020-11-06T21:47:12.037467+00:00") .withTime(Time.parseTime("2020-11-06T21:47:12.037467+00:00"))
.withData(payload.toString().getBytes()) .withData(payload.toString().getBytes())
.build(); .build();
@ -96,16 +90,16 @@ public class AmqpClient {
private static void receiveMessage() { private static void receiveMessage() {
connectToServer(SERVER_HOST, SERVER_PORT) connectToServer(SERVER_HOST, SERVER_PORT)
.compose(conn -> { .compose(conn -> {
connection = conn; connection = conn;
writer.println("[Client] Connected"); writer.println("[Client] Connected");
return Future.succeededFuture(); return Future.succeededFuture();
}).onSuccess(success -> }).onSuccess(success ->
openReceiverLink((delivery, message) -> { openReceiverLink((delivery, message) -> {
final MessageReader reader = ProtonAmqpMessageFactory.createReader(message); final MessageReader reader = ProtonAmqpMessageFactory.createReader(message);
final CloudEvent event = reader.toEvent(); final CloudEvent event = reader.toEvent();
writer.printf("[Client] received CloudEvent[Id=%s, Source=%s]", event.getId(), writer.printf("[Client] received CloudEvent[Id=%s, Source=%s]", event.getId(),
event.getSource().toString()); event.getSource().toString());
connection.close(); connection.close();
}) })
).onFailure(t -> { ).onFailure(t -> {

View File

@ -57,19 +57,21 @@ public class CloudEventDeserializer extends StdDeserializer<CloudEvent> {
public <T extends CloudEventWriter<V>, V> V read(CloudEventWriterFactory<T, V> writerFactory, CloudEventDataMapper<? extends CloudEventData> mapper) throws CloudEventRWException, IllegalStateException { public <T extends CloudEventWriter<V>, V> V read(CloudEventWriterFactory<T, V> writerFactory, CloudEventDataMapper<? extends CloudEventData> mapper) throws CloudEventRWException, IllegalStateException {
try { try {
SpecVersion specVersion = SpecVersion.parse(getStringNode(this.node, this.p, "specversion")); SpecVersion specVersion = SpecVersion.parse(getStringNode(this.node, this.p, "specversion"));
CloudEventWriter<V> visitor = writerFactory.create(specVersion); CloudEventWriter<V> writer = writerFactory.create(specVersion);
// TODO remove all the unnecessary code specversion aware
// Read mandatory attributes // Read mandatory attributes
for (String attr : specVersion.getMandatoryAttributes()) { for (String attr : specVersion.getMandatoryAttributes()) {
if (!"specversion".equals(attr)) { if (!"specversion".equals(attr)) {
visitor.withAttribute(attr, getStringNode(this.node, this.p, attr)); writer.withContextAttribute(attr, getStringNode(this.node, this.p, attr));
} }
} }
// Parse datacontenttype if any // Parse datacontenttype if any
String contentType = getOptionalStringNode(this.node, this.p, "datacontenttype"); String contentType = getOptionalStringNode(this.node, this.p, "datacontenttype");
if (contentType != null) { if (contentType != null) {
visitor.withAttribute("datacontenttype", contentType); writer.withContextAttribute("datacontenttype", contentType);
} }
// Read optional attributes // Read optional attributes
@ -77,7 +79,7 @@ public class CloudEventDeserializer extends StdDeserializer<CloudEvent> {
if (!"datacontentencoding".equals(attr)) { // Skip datacontentencoding, we need it later if (!"datacontentencoding".equals(attr)) { // Skip datacontentencoding, we need it later
String val = getOptionalStringNode(this.node, this.p, attr); String val = getOptionalStringNode(this.node, this.p, attr);
if (val != null) { if (val != null) {
visitor.withAttribute(attr, val); writer.withContextAttribute(attr, val);
} }
} }
} }
@ -129,24 +131,24 @@ public class CloudEventDeserializer extends StdDeserializer<CloudEvent> {
switch (extensionValue.getNodeType()) { switch (extensionValue.getNodeType()) {
case BOOLEAN: case BOOLEAN:
visitor.withExtension(extensionName, extensionValue.booleanValue()); writer.withContextAttribute(extensionName, extensionValue.booleanValue());
break; break;
case NUMBER: case NUMBER:
visitor.withExtension(extensionName, extensionValue.numberValue()); writer.withContextAttribute(extensionName, extensionValue.numberValue());
break; break;
case STRING: case STRING:
visitor.withExtension(extensionName, extensionValue.textValue()); writer.withContextAttribute(extensionName, extensionValue.textValue());
break; break;
default: default:
visitor.withExtension(extensionName, extensionValue.toString()); writer.withContextAttribute(extensionName, extensionValue.toString());
} }
}); });
if (data != null) { if (data != null) {
return visitor.end(mapper.map(data)); return writer.end(mapper.map(data));
} }
return visitor.end(); return writer.end();
} catch (IOException e) { } catch (IOException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} catch (IllegalArgumentException e) { } catch (IllegalArgumentException e) {

View File

@ -23,9 +23,8 @@ import com.fasterxml.jackson.databind.ser.std.StdSerializer;
import io.cloudevents.CloudEvent; import io.cloudevents.CloudEvent;
import io.cloudevents.CloudEventData; import io.cloudevents.CloudEventData;
import io.cloudevents.core.CloudEventUtils; import io.cloudevents.core.CloudEventUtils;
import io.cloudevents.rw.CloudEventAttributesWriter;
import io.cloudevents.rw.CloudEventContextReader; import io.cloudevents.rw.CloudEventContextReader;
import io.cloudevents.rw.CloudEventExtensionsWriter; import io.cloudevents.rw.CloudEventContextWriter;
import io.cloudevents.rw.CloudEventRWException; import io.cloudevents.rw.CloudEventRWException;
import java.io.IOException; import java.io.IOException;
@ -45,18 +44,18 @@ public class CloudEventSerializer extends StdSerializer<CloudEvent> {
this.forceStringSerialization = forceStringSerialization; this.forceStringSerialization = forceStringSerialization;
} }
private static class FieldsSerializer implements CloudEventAttributesWriter, CloudEventExtensionsWriter { private static class JsonContextWriter implements CloudEventContextWriter {
private final JsonGenerator gen; private final JsonGenerator gen;
private final SerializerProvider provider; private final SerializerProvider provider;
public FieldsSerializer(JsonGenerator gen, SerializerProvider provider) { public JsonContextWriter(JsonGenerator gen, SerializerProvider provider) {
this.gen = gen; this.gen = gen;
this.provider = provider; this.provider = provider;
} }
@Override @Override
public FieldsSerializer withAttribute(String name, String value) throws CloudEventRWException { public CloudEventContextWriter withContextAttribute(String name, String value) throws CloudEventRWException {
try { try {
gen.writeStringField(name, value); gen.writeStringField(name, value);
return this; return this;
@ -66,17 +65,7 @@ public class CloudEventSerializer extends StdSerializer<CloudEvent> {
} }
@Override @Override
public FieldsSerializer withExtension(String name, String value) throws CloudEventRWException { public CloudEventContextWriter withContextAttribute(String name, Number value) throws CloudEventRWException {
try {
gen.writeStringField(name, value);
return this;
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public FieldsSerializer withExtension(String name, Number value) throws CloudEventRWException {
try { try {
gen.writeFieldName(name); gen.writeFieldName(name);
provider.findValueSerializer(value.getClass()).serialize(value, gen, provider); provider.findValueSerializer(value.getClass()).serialize(value, gen, provider);
@ -87,7 +76,7 @@ public class CloudEventSerializer extends StdSerializer<CloudEvent> {
} }
@Override @Override
public FieldsSerializer withExtension(String name, Boolean value) throws CloudEventRWException { public CloudEventContextWriter withContextAttribute(String name, Boolean value) throws CloudEventRWException {
try { try {
gen.writeBooleanField(name, value); gen.writeBooleanField(name, value);
return this; return this;
@ -104,10 +93,9 @@ public class CloudEventSerializer extends StdSerializer<CloudEvent> {
// Serialize attributes // Serialize attributes
try { try {
CloudEventContextReader visitable = CloudEventUtils.toContextReader(value); CloudEventContextReader contextReader = CloudEventUtils.toContextReader(value);
FieldsSerializer serializer = new FieldsSerializer(gen, provider); JsonContextWriter contextWriter = new JsonContextWriter(gen, provider);
visitable.readAttributes(serializer); contextReader.readContext(contextWriter);
visitable.readExtensions(serializer);
} catch (RuntimeException e) { } catch (RuntimeException e) {
throw (IOException) e.getCause(); throw (IOException) e.getCause();
} }

View File

@ -20,6 +20,7 @@ import io.cloudevents.CloudEventData;
import io.cloudevents.SpecVersion; import io.cloudevents.SpecVersion;
import io.cloudevents.core.format.EventFormat; import io.cloudevents.core.format.EventFormat;
import io.cloudevents.core.message.MessageWriter; import io.cloudevents.core.message.MessageWriter;
import io.cloudevents.rw.CloudEventContextWriter;
import io.cloudevents.rw.CloudEventRWException; import io.cloudevents.rw.CloudEventRWException;
import io.cloudevents.rw.CloudEventWriter; import io.cloudevents.rw.CloudEventWriter;
@ -58,14 +59,12 @@ public class HttpMessageWriter implements CloudEventWriter<Void>, MessageWriter<
} }
@Override @Override
public HttpMessageWriter withAttribute(String name, String value) throws CloudEventRWException { public CloudEventContextWriter withContextAttribute(String name, String value) throws CloudEventRWException {
putHeader.accept(CloudEventsHeaders.ATTRIBUTES_TO_HEADERS.get(name), value); String headerName = CloudEventsHeaders.ATTRIBUTES_TO_HEADERS.get(name);
return this; if (headerName == null) {
} headerName = "ce-" + name;
}
@Override putHeader.accept(headerName, value);
public HttpMessageWriter withExtension(String name, String value) throws CloudEventRWException {
putHeader.accept("ce-" + name, value);
return this; return this;
} }

View File

@ -47,14 +47,12 @@ public final class RestfulWSClientMessageWriter implements CloudEventWriter<Void
} }
@Override @Override
public RestfulWSClientMessageWriter withAttribute(String name, String value) throws CloudEventRWException { public RestfulWSClientMessageWriter withContextAttribute(String name, String value) throws CloudEventRWException {
this.context.getHeaders().add(CloudEventsHeaders.ATTRIBUTES_TO_HEADERS.get(name), value); String headerName = CloudEventsHeaders.ATTRIBUTES_TO_HEADERS.get(name);
return this; if (headerName == null) {
} headerName = CloudEventsHeaders.CE_PREFIX + name;
}
@Override this.context.getHeaders().add(headerName, value);
public RestfulWSClientMessageWriter withExtension(String name, String value) throws CloudEventRWException {
this.context.getHeaders().add(CloudEventsHeaders.CE_PREFIX + name, value);
return this; return this;
} }

View File

@ -49,14 +49,12 @@ public final class RestfulWSMessageWriter implements CloudEventWriter<Void>, Mes
} }
@Override @Override
public RestfulWSMessageWriter withAttribute(String name, String value) throws CloudEventRWException { public RestfulWSMessageWriter withContextAttribute(String name, String value) throws CloudEventRWException {
this.httpHeaders.add(CloudEventsHeaders.ATTRIBUTES_TO_HEADERS.get(name), value); String headerName = CloudEventsHeaders.ATTRIBUTES_TO_HEADERS.get(name);
return this; if (headerName == null) {
} headerName = CloudEventsHeaders.CE_PREFIX + name;
}
@Override this.httpHeaders.add(headerName, value);
public RestfulWSMessageWriter withExtension(String name, String value) throws CloudEventRWException {
this.httpHeaders.add(CloudEventsHeaders.CE_PREFIX + name, value);
return this; return this;
} }

View File

@ -46,14 +46,12 @@ public class VertxHttpServerResponseMessageWriterImpl implements MessageWriter<C
// Binary visitor // Binary visitor
@Override @Override
public VertxHttpServerResponseMessageWriterImpl withAttribute(String name, String value) throws CloudEventRWException { public VertxHttpServerResponseMessageWriterImpl withContextAttribute(String name, String value) throws CloudEventRWException {
this.response.putHeader(CloudEventsHeaders.ATTRIBUTES_TO_HEADERS.get(name), value); CharSequence headerName = CloudEventsHeaders.ATTRIBUTES_TO_HEADERS.get(name);
return this; if (headerName == null) {
} headerName = "ce-" + name;
}
@Override this.response.putHeader(headerName, value);
public VertxHttpServerResponseMessageWriterImpl withExtension(String name, String value) throws CloudEventRWException {
this.response.putHeader("ce-" + name, value);
return this; return this;
} }

View File

@ -48,14 +48,12 @@ public class VertxWebClientRequestMessageWriterImpl implements MessageWriter<Clo
// Binary visitor // Binary visitor
@Override @Override
public VertxWebClientRequestMessageWriterImpl withAttribute(String name, String value) throws CloudEventRWException { public VertxWebClientRequestMessageWriterImpl withContextAttribute(String name, String value) throws CloudEventRWException {
this.request.headers().add(CloudEventsHeaders.ATTRIBUTES_TO_HEADERS.get(name), value); CharSequence headerName = CloudEventsHeaders.ATTRIBUTES_TO_HEADERS.get(name);
return this; if (headerName == null) {
} headerName = "ce-" + name;
}
@Override this.request.headers().add(headerName, value);
public VertxWebClientRequestMessageWriterImpl withExtension(String name, String value) throws CloudEventRWException {
this.request.headers().add("ce-" + name, value);
return this; return this;
} }

View File

@ -35,14 +35,12 @@ abstract class BaseKafkaMessageWriterImpl<R> implements MessageWriter<CloudEvent
} }
@Override @Override
public BaseKafkaMessageWriterImpl<R> withAttribute(String name, String value) throws CloudEventRWException { public BaseKafkaMessageWriterImpl<R> withContextAttribute(String name, String value) throws CloudEventRWException {
headers.add(new RecordHeader(KafkaHeaders.ATTRIBUTES_TO_HEADERS.get(name), value.getBytes())); String headerName = KafkaHeaders.ATTRIBUTES_TO_HEADERS.get(name);
return this; if (headerName == null) {
} headerName = KafkaHeaders.CE_PREFIX + name;
}
@Override headers.add(new RecordHeader(headerName, value.getBytes()));
public BaseKafkaMessageWriterImpl<R> withExtension(String name, String value) throws CloudEventRWException {
headers.add(new RecordHeader(KafkaHeaders.CE_PREFIX + name, value.getBytes()));
return this; return this;
} }

View File

@ -19,7 +19,6 @@ package io.cloudevents.kafka.impl;
import io.cloudevents.SpecVersion; import io.cloudevents.SpecVersion;
import io.cloudevents.core.v1.CloudEventV1; import io.cloudevents.core.v1.CloudEventV1;
import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.common.header.internals.RecordHeaders;
@ -46,7 +45,7 @@ public final class KafkaProducerMessageWriterImpl<K>
@Override @Override
public KafkaProducerMessageWriterImpl<K> create(SpecVersion version) { public KafkaProducerMessageWriterImpl<K> create(SpecVersion version) {
this.withAttribute(CloudEventV1.SPECVERSION, version.toString()); this.withContextAttribute(CloudEventV1.SPECVERSION, version.toString());
return this; return this;
} }
} }

View File

@ -18,6 +18,7 @@
package io.cloudevents.kafka.impl; package io.cloudevents.kafka.impl;
import io.cloudevents.SpecVersion; import io.cloudevents.SpecVersion;
import io.cloudevents.core.v1.CloudEventV1;
import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.header.Headers;
public final class KafkaSerializerMessageWriterImpl extends BaseKafkaMessageWriterImpl<byte[]> { public final class KafkaSerializerMessageWriterImpl extends BaseKafkaMessageWriterImpl<byte[]> {
@ -28,7 +29,7 @@ public final class KafkaSerializerMessageWriterImpl extends BaseKafkaMessageWrit
@Override @Override
public KafkaSerializerMessageWriterImpl create(SpecVersion version) { public KafkaSerializerMessageWriterImpl create(SpecVersion version) {
this.withAttribute("specversion", version.toString()); this.withContextAttribute(CloudEventV1.SPECVERSION, version.toString());
return this; return this;
} }

View File

@ -15,34 +15,34 @@
*/ */
package io.cloudevents.spring.webflux; package io.cloudevents.spring.webflux;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import io.cloudevents.CloudEvent; import io.cloudevents.CloudEvent;
import io.cloudevents.CloudEventData; import io.cloudevents.CloudEventData;
import io.cloudevents.SpecVersion; import io.cloudevents.SpecVersion;
import io.cloudevents.core.CloudEventUtils; import io.cloudevents.core.CloudEventUtils;
import io.cloudevents.core.format.EventFormat; import io.cloudevents.core.format.EventFormat;
import io.cloudevents.core.message.MessageWriter; import io.cloudevents.core.message.MessageWriter;
import io.cloudevents.rw.CloudEventContextWriter;
import io.cloudevents.rw.CloudEventRWException; import io.cloudevents.rw.CloudEventRWException;
import io.cloudevents.rw.CloudEventWriter; import io.cloudevents.rw.CloudEventWriter;
import io.cloudevents.spring.http.CloudEventsHeaders; import io.cloudevents.spring.http.CloudEventsHeaders;
import org.reactivestreams.Publisher; import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;
import org.springframework.core.ResolvableType; import org.springframework.core.ResolvableType;
import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.http.HttpHeaders; import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType; import org.springframework.http.MediaType;
import org.springframework.http.ReactiveHttpOutputMessage; import org.springframework.http.ReactiveHttpOutputMessage;
import org.springframework.http.codec.HttpMessageWriter; import org.springframework.http.codec.HttpMessageWriter;
import reactor.core.publisher.Mono;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
/** /**
* A reactive {@link HttpMessageWriter} for {@link CloudEvent CloudEvents}, converting * A reactive {@link HttpMessageWriter} for {@link CloudEvent CloudEvents}, converting
* from a cloud event to an HTTP response. Supports the use of {@link CloudEvent} as an * from a cloud event to an HTTP response. Supports the use of {@link CloudEvent} as an
* output from a reactive endpoint. * output from a reactive endpoint.
* *
* @author Dave Syer * @author Dave Syer
* *
*/ */
@ -74,37 +74,35 @@ public class CloudEventHttpMessageWriter implements HttpMessageWriter<CloudEvent
this.response = response; this.response = response;
} }
// Binary visitor factory // Binary visitor factory
@Override @Override
public CloudEventWriter<Mono<Void>> create(SpecVersion version) { public CloudEventWriter<Mono<Void>> create(SpecVersion version) {
this.response.getHeaders().set(CloudEventsHeaders.SPEC_VERSION, version.toString()); this.response.getHeaders().set(CloudEventsHeaders.SPEC_VERSION, version.toString());
return this; return this;
} }
// Binary visitor // Binary visitor
@Override @Override
public ReactiveHttpMessageWriter withAttribute(String name, String value) throws CloudEventRWException { public CloudEventContextWriter withContextAttribute(String name, String value) throws CloudEventRWException {
this.response.getHeaders().set(CloudEventsHeaders.ATTRIBUTES_TO_HEADERS.get(name), value); String headerName = CloudEventsHeaders.ATTRIBUTES_TO_HEADERS.get(name);
return this; if (headerName == null) {
} headerName = "ce-" + name;
}
this.response.getHeaders().set(headerName, value);
return this;
}
@Override @Override
public ReactiveHttpMessageWriter withExtension(String name, String value) throws CloudEventRWException { public Mono<Void> end(CloudEventData value) throws CloudEventRWException {
this.response.getHeaders().set("ce-" + name, value); return copy(value.toBytes(), this.response);
return this; }
}
@Override @Override
public Mono<Void> end(CloudEventData value) throws CloudEventRWException { public Mono<Void> end() {
return copy(value.toBytes(), this.response); return copy(new byte[0], this.response);
} }
@Override
public Mono<Void> end() {
return copy(new byte[0], this.response);
}
// Structured visitor // Structured visitor
@ -119,7 +117,6 @@ public class CloudEventHttpMessageWriter implements HttpMessageWriter<CloudEvent
message.getHeaders().setContentLength(bytes.length); message.getHeaders().setContentLength(bytes.length);
return message.writeWith(Mono.just(data)); return message.writeWith(Mono.just(data));
} }
}
} }
}