Proposal for reworking the unstructured read/write of events (#146)

* Proposal for handling unstructured read/write of events

Signed-off-by: Francesco Guardiani <francescoguard@gmail.com>

* cloudevents-json-jackson fixed

Signed-off-by: Francesco Guardiani <francescoguard@gmail.com>

* Everything builds

Signed-off-by: Francesco Guardiani <francescoguard@gmail.com>

* Added two utility methods to Message

Signed-off-by: Francesco Guardiani <francescoguard@gmail.com>

* Signature was wrong

Signed-off-by: Francesco Guardiani <francescoguard@gmail.com>
This commit is contained in:
Francesco Guardiani 2020-05-15 11:41:41 +02:00 committed by GitHub
parent a485edffdb
commit cd5777791c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
65 changed files with 670 additions and 594 deletions

View File

@ -69,8 +69,10 @@ public interface Attributes {
@Nullable
ZonedDateTime getTime();
//TODO to be moved
Attributes toV03();
//TODO to be moved
Attributes toV1();
}

View File

@ -16,10 +16,7 @@
*/
package io.cloudevents;
import io.cloudevents.format.EventFormat;
import io.cloudevents.lang.Nullable;
import io.cloudevents.message.BinaryMessage;
import io.cloudevents.message.StructuredMessage;
import javax.annotation.ParametersAreNonnullByDefault;
import java.util.Map;
@ -31,7 +28,7 @@ import java.util.Map;
* @author slinkydeveloper
*/
@ParametersAreNonnullByDefault
public interface CloudEvent {
public interface CloudEvent extends CloudEventVisitable {
/**
* The event context attributes
@ -51,27 +48,62 @@ public interface CloudEvent {
*/
Map<String, Object> getExtensions();
//TODO to be moved
CloudEvent toV03();
//TODO to be moved
CloudEvent toV1();
BinaryMessage asBinaryMessage();
// --- Default implementations for CloudEventVisitable ---
// Be aware that this implementation assumes the event is SpecVersion v1.
// If you need to handle other versions, please implement this method by yourself
StructuredMessage asStructuredMessage(EventFormat format);
@Override
default <V extends CloudEventVisitor<R>, R> R visit(CloudEventVisitorFactory<V, R> visitorFactory) throws RuntimeException {
CloudEventVisitor<R> visitor = visitorFactory.create(this.getAttributes().getSpecVersion());
this.visitAttributes(visitor);
this.visitExtensions(visitor);
static io.cloudevents.v1.CloudEventBuilder buildV1() {
return new io.cloudevents.v1.CloudEventBuilder();
if (this.getData() != null) {
visitor.setBody(this.getData());
}
return visitor.end();
}
static io.cloudevents.v1.CloudEventBuilder buildV1(CloudEvent event) {
return new io.cloudevents.v1.CloudEventBuilder(event);
@Override
default void visitAttributes(CloudEventAttributesVisitor visitor) throws RuntimeException {
visitor.setAttribute("id", this.getAttributes().getId());
visitor.setAttribute("source", this.getAttributes().getSource());
visitor.setAttribute("type", this.getAttributes().getType());
if (this.getAttributes().getDataContentType() != null) {
visitor.setAttribute("datacontenttype", this.getAttributes().getDataContentType());
}
if (this.getAttributes().getDataSchema() != null) {
visitor.setAttribute("dataschema", this.getAttributes().getDataSchema());
}
if (this.getAttributes().getSubject() != null) {
visitor.setAttribute("subject", this.getAttributes().getSubject());
}
if (this.getAttributes().getTime() != null) {
visitor.setAttribute("time", this.getAttributes().getTime());
}
}
static io.cloudevents.v03.CloudEventBuilder buildV03() {
return new io.cloudevents.v03.CloudEventBuilder();
}
static io.cloudevents.v03.CloudEventBuilder buildV03(CloudEvent event) {
return new io.cloudevents.v03.CloudEventBuilder(event);
@Override
default void visitExtensions(CloudEventExtensionsVisitor visitor) throws RuntimeException {
for (Map.Entry<String, Object> entry : this.getExtensions().entrySet()) {
if (entry.getValue() instanceof String) {
visitor.setExtension(entry.getKey(), (String) entry.getValue());
} else if (entry.getValue() instanceof Number) {
visitor.setExtension(entry.getKey(), (Number) entry.getValue());
} else if (entry.getValue() instanceof Boolean) {
visitor.setExtension(entry.getKey(), (Boolean) entry.getValue());
} else {
// This should never happen because we build that map only through our builders
throw new IllegalStateException("Illegal value inside extensions map: " + entry);
}
}
;
}
}

View File

@ -15,7 +15,7 @@
*
*/
package io.cloudevents.message;
package io.cloudevents;
import io.cloudevents.types.Time;
@ -23,26 +23,26 @@ import java.net.URI;
import java.time.ZonedDateTime;
@FunctionalInterface
public interface BinaryMessageAttributesVisitor {
public interface CloudEventAttributesVisitor {
/**
* Set attribute with type {@link String}. This setter should not be invoked for specversion, because the built Visitor already
* has the information through the {@link BinaryMessageVisitorFactory}
* has the information through the {@link CloudEventVisitorFactory}.
*
* @param name
* @param value
* @throws MessageVisitException
* @throws CloudEventVisitException
*/
void setAttribute(String name, String value) throws MessageVisitException;
void setAttribute(String name, String value) throws CloudEventVisitException;
/**
* Set attribute with type {@link URI}.
*
* @param name
* @param value
* @throws MessageVisitException
* @throws CloudEventVisitException
*/
default void setAttribute(String name, URI value) throws MessageVisitException {
default void setAttribute(String name, URI value) throws CloudEventVisitException {
setAttribute(name, value.toString());
}
@ -51,9 +51,9 @@ public interface BinaryMessageAttributesVisitor {
*
* @param name
* @param value
* @throws MessageVisitException
* @throws CloudEventVisitException
*/
default void setAttribute(String name, ZonedDateTime value) throws MessageVisitException {
default void setAttribute(String name, ZonedDateTime value) throws CloudEventVisitException {
setAttribute(name, value.format(Time.RFC3339_DATE_FORMAT));
}

View File

@ -0,0 +1,50 @@
/*
* Copyright 2018-Present The CloudEvents Authors
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package io.cloudevents;
public interface CloudEventBuilder extends CloudEventVisitor<CloudEvent> {
CloudEvent build();
static io.cloudevents.v1.CloudEventBuilder v1() {
return new io.cloudevents.v1.CloudEventBuilder();
}
static io.cloudevents.v1.CloudEventBuilder v1(CloudEvent event) {
return new io.cloudevents.v1.CloudEventBuilder(event);
}
static io.cloudevents.v03.CloudEventBuilder v03() {
return new io.cloudevents.v03.CloudEventBuilder();
}
static io.cloudevents.v03.CloudEventBuilder v03(CloudEvent event) {
return new io.cloudevents.v03.CloudEventBuilder(event);
}
static CloudEventBuilder fromSpecVersion(SpecVersion version) {
switch (version) {
case V1:
return CloudEventBuilder.v1();
case V03:
return CloudEventBuilder.v03();
}
return null;
}
}

View File

@ -15,18 +15,18 @@
*
*/
package io.cloudevents.message;
package io.cloudevents;
@FunctionalInterface
public interface BinaryMessageExtensionsVisitor {
public interface CloudEventExtensionsVisitor {
void setExtension(String name, String value) throws MessageVisitException;
void setExtension(String name, String value) throws CloudEventVisitException;
default void setExtension(String name, Number value) throws MessageVisitException {
default void setExtension(String name, Number value) throws CloudEventVisitException {
setExtension(name, value.toString());
}
default void setExtension(String name, Boolean value) throws MessageVisitException {
default void setExtension(String name, Boolean value) throws CloudEventVisitException {
setExtension(name, value.toString());
}

View File

@ -15,9 +15,9 @@
*
*/
package io.cloudevents.message;
package io.cloudevents;
public class MessageVisitException extends RuntimeException {
public class CloudEventVisitException extends RuntimeException {
public enum MessageVisitExceptionKind {
INVALID_SPEC_VERSION,
@ -30,17 +30,17 @@ public class MessageVisitException extends RuntimeException {
private MessageVisitExceptionKind kind;
public MessageVisitException(MessageVisitExceptionKind kind, Throwable cause) {
public CloudEventVisitException(MessageVisitExceptionKind kind, Throwable cause) {
super(cause);
this.kind = kind;
}
public MessageVisitException(MessageVisitExceptionKind kind, String message) {
public CloudEventVisitException(MessageVisitExceptionKind kind, String message) {
super(message);
this.kind = kind;
}
public MessageVisitException(MessageVisitExceptionKind kind, String message, Throwable cause) {
public CloudEventVisitException(MessageVisitExceptionKind kind, String message, Throwable cause) {
super(message, cause);
this.kind = kind;
}
@ -49,44 +49,44 @@ public class MessageVisitException extends RuntimeException {
return kind;
}
public static MessageVisitException newInvalidSpecVersion(String specVersion) {
return new MessageVisitException(
public static CloudEventVisitException newInvalidSpecVersion(String specVersion) {
return new CloudEventVisitException(
MessageVisitExceptionKind.INVALID_ATTRIBUTE_TYPE,
"Invalid specversion: " + specVersion
);
}
public static MessageVisitException newInvalidAttributeName(String attributeName) {
return new MessageVisitException(
public static CloudEventVisitException newInvalidAttributeName(String attributeName) {
return new CloudEventVisitException(
MessageVisitExceptionKind.INVALID_ATTRIBUTE_NAME,
"Invalid attribute: " + attributeName
);
}
public static MessageVisitException newInvalidAttributeType(String attributeName, Class<?> clazz) {
return new MessageVisitException(
public static CloudEventVisitException newInvalidAttributeType(String attributeName, Class<?> clazz) {
return new CloudEventVisitException(
MessageVisitExceptionKind.INVALID_ATTRIBUTE_TYPE,
"Invalid attribute type for \"" + attributeName + "\": " + clazz.getCanonicalName()
);
}
public static MessageVisitException newInvalidAttributeValue(String attributeName, Object value, Throwable cause) {
return new MessageVisitException(
public static CloudEventVisitException newInvalidAttributeValue(String attributeName, Object value, Throwable cause) {
return new CloudEventVisitException(
MessageVisitExceptionKind.INVALID_ATTRIBUTE_VALUE,
"Invalid attribute value for \"" + attributeName + "\": " + value,
cause
);
}
public static MessageVisitException newInvalidExtensionType(String extensionName, Class<?> clazz) {
return new MessageVisitException(
public static CloudEventVisitException newInvalidExtensionType(String extensionName, Class<?> clazz) {
return new CloudEventVisitException(
MessageVisitExceptionKind.INVALID_EXTENSION_TYPE,
"Invalid extension type for \"" + extensionName + "\": " + clazz.getCanonicalName()
);
}
public static MessageVisitException newOther(Throwable cause) {
return new MessageVisitException(
public static CloudEventVisitException newOther(Throwable cause) {
return new CloudEventVisitException(
MessageVisitExceptionKind.OTHER,
cause
);

View File

@ -0,0 +1,49 @@
/*
* Copyright 2018-Present The CloudEvents Authors
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package io.cloudevents;
/**
* Represents an object that can be visited as CloudEvent
*/
public interface CloudEventVisitable {
/**
* Visit self using the provided visitor factory
*
* @param visitorFactory a factory that generates a visitor starting from the SpecVersion of the event
* @throws CloudEventVisitException if something went wrong during the visit.
*/
<V extends CloudEventVisitor<R>, R> R visit(CloudEventVisitorFactory<V, R> visitorFactory) throws CloudEventVisitException;
/**
* Visit self attributes using the provided visitor
*
* @param visitor Attributes visitor
* @throws CloudEventVisitException if something went wrong during the visit.
*/
void visitAttributes(CloudEventAttributesVisitor visitor) throws CloudEventVisitException;
/**
* Visit self extensions using the provided visitor
*
* @param visitor Extensions visitor
* @throws CloudEventVisitException if something went wrong during the visit.
*/
void visitExtensions(CloudEventExtensionsVisitor visitor) throws CloudEventVisitException;
}

View File

@ -15,14 +15,18 @@
*
*/
package io.cloudevents.message;
package io.cloudevents;
public interface BinaryMessageAttributes {
public interface CloudEventVisitor<V> extends CloudEventAttributesVisitor, CloudEventExtensionsVisitor {
// TODO maybe, one day, we'll convert this to some byte stream
void setBody(byte[] value) throws CloudEventVisitException;
/**
* @param visitor
* @throws MessageVisitException
* End the visit
*
* @return an eventual return value
*/
void visitAttributes(BinaryMessageAttributesVisitor visitor) throws MessageVisitException;
V end();
}

View File

@ -15,11 +15,16 @@
*
*/
package io.cloudevents.message;
import io.cloudevents.SpecVersion;
package io.cloudevents;
@FunctionalInterface
public interface BinaryMessageVisitorFactory<V extends BinaryMessageVisitor<R>, R> {
V createBinaryMessageVisitor(SpecVersion version);
public interface CloudEventVisitorFactory<V extends CloudEventVisitor<R>, R> {
/**
* Create a {@link CloudEventVisitor} starting from the provided {@link SpecVersion}
*
* @param version
* @return
*/
V create(SpecVersion version);
}

View File

@ -18,7 +18,10 @@
package io.cloudevents.impl;
import io.cloudevents.Attributes;
import io.cloudevents.message.BinaryMessageAttributes;
import io.cloudevents.CloudEventAttributesVisitor;
import io.cloudevents.CloudEventVisitException;
public interface AttributesInternal extends Attributes, BinaryMessageAttributes {
public interface AttributesInternal extends Attributes {
void visitAttributes(CloudEventAttributesVisitor visitor) throws CloudEventVisitException;
}

View File

@ -17,33 +17,29 @@
package io.cloudevents.impl;
import io.cloudevents.Attributes;
import io.cloudevents.CloudEvent;
import io.cloudevents.Extension;
import io.cloudevents.message.BinaryMessageVisitor;
import io.cloudevents.message.MessageVisitException;
import io.cloudevents.*;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
public abstract class BaseCloudEventBuilder<B extends BaseCloudEventBuilder<B, T>, T extends Attributes> implements BinaryMessageVisitor<CloudEvent> {
public abstract class BaseCloudEventBuilder<SELF extends BaseCloudEventBuilder<SELF, T>, T extends Attributes> implements CloudEventBuilder {
// This is a little trick for enabling fluency
private B self;
private SELF self;
private byte[] data;
private Map<String, Object> extensions;
@SuppressWarnings("unchecked")
public BaseCloudEventBuilder() {
this.self = (B) this;
this.self = (SELF) this;
this.extensions = new HashMap<>();
}
@SuppressWarnings("unchecked")
public BaseCloudEventBuilder(CloudEvent event) {
this.self = (B) this;
this.self = (SELF) this;
CloudEventImpl ev = (CloudEventImpl) event;
this.setAttributes(ev.getAttributes());
@ -53,74 +49,75 @@ public abstract class BaseCloudEventBuilder<B extends BaseCloudEventBuilder<B, T
protected abstract void setAttributes(Attributes attributes);
protected abstract B withDataContentType(String contentType);
protected abstract SELF withDataContentType(String contentType);
protected abstract B withDataSchema(URI dataSchema);
protected abstract SELF withDataSchema(URI dataSchema);
protected abstract T buildAttributes();
//TODO builder should accept data as Object and use data codecs (that we need to implement)
// to encode data
public B withData(byte[] data) {
public SELF withData(byte[] data) {
this.data = data;
return this.self;
}
public B withData(String contentType, byte[] data) {
public SELF withData(String contentType, byte[] data) {
withDataContentType(contentType);
withData(data);
return this.self;
}
public B withData(String contentType, URI dataSchema, byte[] data) {
public SELF withData(String contentType, URI dataSchema, byte[] data) {
withDataContentType(contentType);
withDataSchema(dataSchema);
withData(data);
return this.self;
}
public B withExtension(String key, String value) {
public SELF withExtension(String key, String value) {
this.extensions.put(key, value);
return self;
}
public B withExtension(String key, Number value) {
public SELF withExtension(String key, Number value) {
this.extensions.put(key, value);
return self;
}
public B withExtension(String key, boolean value) {
public SELF withExtension(String key, boolean value) {
this.extensions.put(key, value);
return self;
}
public B withExtension(Extension extension) {
public SELF withExtension(Extension extension) {
this.extensions.putAll(extension.asMap());
return self;
}
@Override
public CloudEvent build() {
return new CloudEventImpl(this.buildAttributes(), data, extensions);
}
@Override
public void setExtension(String name, String value) throws MessageVisitException {
public void setExtension(String name, String value) throws CloudEventVisitException {
this.withExtension(name, value);
}
@Override
public void setExtension(String name, Number value) throws MessageVisitException {
public void setExtension(String name, Number value) throws CloudEventVisitException {
this.withExtension(name, value);
}
@Override
public void setExtension(String name, Boolean value) throws MessageVisitException {
public void setExtension(String name, Boolean value) throws CloudEventVisitException {
this.withExtension(name, value);
}
@Override
public void setBody(byte[] value) throws MessageVisitException {
public void setBody(byte[] value) throws CloudEventVisitException {
this.data = value;
}

View File

@ -17,15 +17,12 @@
package io.cloudevents.impl;
import io.cloudevents.Attributes;
import io.cloudevents.CloudEvent;
import io.cloudevents.format.EventFormat;
import io.cloudevents.message.*;
import io.cloudevents.*;
import java.nio.charset.StandardCharsets;
import java.util.*;
public final class CloudEventImpl implements CloudEvent, BinaryMessage, BinaryMessageExtensions {
public final class CloudEventImpl implements CloudEvent {
private final AttributesInternal attributes;
private final byte[] data;
@ -71,25 +68,26 @@ public final class CloudEventImpl implements CloudEvent, BinaryMessage, BinaryMe
);
}
// Message impl
@Override
public <T extends CloudEventVisitor<V>, V> V visit(CloudEventVisitorFactory<T, V> visitorFactory) throws CloudEventVisitException, IllegalStateException {
CloudEventVisitor<V> visitor = visitorFactory.create(this.attributes.getSpecVersion());
this.attributes.visitAttributes(visitor);
this.visitExtensions(visitor);
public BinaryMessage asBinaryMessage() {
return this;
}
if (this.data != null) {
visitor.setBody(this.data);
}
public StructuredMessage asStructuredMessage(EventFormat format) {
CloudEvent ev = this;
// TODO This sucks, will improve later
return new StructuredMessage() {
@Override
public <T> T visit(StructuredMessageVisitor<T> visitor) throws MessageVisitException, IllegalStateException {
return visitor.setEvent(format, format.serialize(ev));
}
};
return visitor.end();
}
@Override
public void visitExtensions(BinaryMessageExtensionsVisitor visitor) throws MessageVisitException {
public void visitAttributes(CloudEventAttributesVisitor visitor) throws CloudEventVisitException {
this.attributes.visitAttributes(visitor);
}
@Override
public void visitExtensions(CloudEventExtensionsVisitor visitor) throws CloudEventVisitException {
// TODO to be improved
for (Map.Entry<String, Object> entry : this.extensions.entrySet()) {
if (entry.getValue() instanceof String) {
@ -105,19 +103,6 @@ public final class CloudEventImpl implements CloudEvent, BinaryMessage, BinaryMe
}
}
@Override
public <T extends BinaryMessageVisitor<V>, V> V visit(BinaryMessageVisitorFactory<T, V> visitorFactory) throws MessageVisitException, IllegalStateException {
BinaryMessageVisitor<V> visitor = visitorFactory.createBinaryMessageVisitor(this.attributes.getSpecVersion());
this.attributes.visitAttributes(visitor);
this.visitExtensions(visitor);
if (this.data != null) {
visitor.setBody(this.data);
}
return visitor.end();
}
@Override
public boolean equals(Object o) {
if (this == o) return true;

View File

@ -1,46 +0,0 @@
/*
* Copyright 2018-Present The CloudEvents Authors
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package io.cloudevents.message;
import io.cloudevents.CloudEvent;
@FunctionalInterface
public interface BinaryMessage {
/**
* @param visitorFactory
* @throws MessageVisitException
* @throws IllegalStateException If the message is not a valid binary message
*/
<V extends BinaryMessageVisitor<R>, R> R visit(BinaryMessageVisitorFactory<V, R> visitorFactory) throws MessageVisitException, IllegalStateException;
default CloudEvent toEvent() throws MessageVisitException, IllegalStateException {
return this.visit(specVersion -> {
switch (specVersion) {
case V1:
return CloudEvent.buildV1();
case V03:
return CloudEvent.buildV03();
}
return null; // This can never happen
});
}
;
}

View File

@ -1,28 +0,0 @@
/*
* Copyright 2018-Present The CloudEvents Authors
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package io.cloudevents.message;
public interface BinaryMessageExtensions {
/**
* @param visitor
* @throws MessageVisitException
*/
void visitExtensions(BinaryMessageExtensionsVisitor visitor) throws MessageVisitException;
}

View File

@ -1,28 +0,0 @@
/*
* Copyright 2018-Present The CloudEvents Authors
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package io.cloudevents.message;
public interface BinaryMessageVisitor<V> extends BinaryMessageAttributesVisitor, BinaryMessageExtensionsVisitor {
// TODO one day we'll convert this to some byte stream
void setBody(byte[] value) throws MessageVisitException;
// Returns an eventual output value
V end();
}

View File

@ -20,9 +20,5 @@ package io.cloudevents.message;
public enum Encoding {
STRUCTURED,
BINARY,
UNKNOWN;
public static IllegalStateException UNKNOWN_ENCODING_EXCEPTION = new IllegalStateException("Unknown encoding");
public static IllegalStateException WRONG_ENCODING_EXCEPTION = new IllegalStateException("Wrong encoding");
UNKNOWN
}

View File

@ -17,43 +17,88 @@
package io.cloudevents.message;
import io.cloudevents.CloudEvent;
import io.cloudevents.*;
import io.cloudevents.format.EventFormat;
import io.cloudevents.message.impl.GenericStructuredMessage;
public interface Message extends StructuredMessage, BinaryMessage {
public interface Message extends StructuredMessage, CloudEventVisitable {
/**
* Visit the message as binary encoded event using the provided visitor factory
*
* @param visitorFactory a factory that generates a visitor starting from the SpecVersion of the event
* @throws CloudEventVisitException if something went wrong during the visit.
* @throws IllegalStateException if the message is not in binary encoding
*/
<V extends CloudEventVisitor<R>, R> R visit(CloudEventVisitorFactory<V, R> visitorFactory) throws CloudEventVisitException, IllegalStateException;
/**
* Visit the message attributes as binary encoded event using the provided visitor
*
* @param visitor Attributes visitor
* @throws CloudEventVisitException if something went wrong during the visit.
* @throws IllegalStateException if the message is not in binary encoding
*/
void visitAttributes(CloudEventAttributesVisitor visitor) throws CloudEventVisitException, IllegalStateException;
/**
* Visit the message extensions as binary encoded event using the provided visitor
*
* @param visitor Extensions visitor
* @throws CloudEventVisitException if something went wrong during the visit.
* @throws IllegalStateException if the message is not in binary encoding
*/
void visitExtensions(CloudEventExtensionsVisitor visitor) throws CloudEventVisitException, IllegalStateException;
/**
* Visit the message as structured encoded event using the provided visitor
*
* @param visitor Structured Message visitor
* @throws CloudEventVisitException if something went wrong during the visit.
* @throws IllegalStateException if the message is not in structured encoding
*/
<T> T visit(StructuredMessageVisitor<T> visitor) throws CloudEventVisitException, IllegalStateException;
Encoding getEncoding();
default <BV extends BinaryMessageVisitor<R>, R> R visit(MessageVisitor<BV, R> visitor) throws MessageVisitException, IllegalStateException {
default <BV extends CloudEventVisitor<R>, R> R visit(MessageVisitor<BV, R> visitor) throws CloudEventVisitException, IllegalStateException {
switch (getEncoding()) {
case BINARY:
return this.visit((BinaryMessageVisitorFactory<BV, R>) visitor);
return this.visit((CloudEventVisitorFactory<BV, R>) visitor);
case STRUCTURED:
return this.visit((StructuredMessageVisitor<R>) visitor);
default:
throw Encoding.UNKNOWN_ENCODING_EXCEPTION;
throw new IllegalStateException("Unknown encoding");
}
}
default CloudEvent toEvent() throws MessageVisitException, IllegalStateException {
default CloudEvent toEvent() throws CloudEventVisitException, IllegalStateException {
switch (getEncoding()) {
case BINARY:
return this.visit(specVersion -> {
switch (specVersion) {
case V1:
return CloudEvent.buildV1();
case V03:
return CloudEvent.buildV03();
}
return null; // This can never happen
});
return this.visit(CloudEventBuilder::fromSpecVersion);
case STRUCTURED:
return this.visit(EventFormat::deserialize);
default:
throw Encoding.UNKNOWN_ENCODING_EXCEPTION;
throw new IllegalStateException("Unknown encoding");
}
}
;
static <R> R writeStructuredEvent(CloudEvent event, String format, StructuredMessageVisitor<R> visitor) {
GenericStructuredMessage message = GenericStructuredMessage.fromEvent(format, event);
if (message == null) {
throw new IllegalArgumentException("Format " + format + " not found");
}
return message.visit(visitor);
}
static <R> R writeStructuredEvent(CloudEvent event, EventFormat format, StructuredMessageVisitor<R> visitor) {
return GenericStructuredMessage.fromEvent(format, event).visit(visitor);
}
static <V extends CloudEventVisitor<R>, R> R writeBinaryEvent(CloudEvent event, CloudEventVisitorFactory<V, R> visitor) {
return event.visit(visitor);
}
}

View File

@ -17,5 +17,9 @@
package io.cloudevents.message;
public interface MessageVisitor<BV extends BinaryMessageVisitor<R>, R> extends BinaryMessageVisitorFactory<BV, R>, StructuredMessageVisitor<R> {
import io.cloudevents.CloudEventVisitor;
import io.cloudevents.CloudEventVisitorFactory;
//TODO javadoc
public interface MessageVisitor<CEV extends CloudEventVisitor<R>, R> extends CloudEventVisitorFactory<CEV, R>, StructuredMessageVisitor<R> {
}

View File

@ -18,22 +18,44 @@
package io.cloudevents.message;
import io.cloudevents.CloudEvent;
import io.cloudevents.CloudEventVisitException;
import io.cloudevents.format.EventFormat;
import io.cloudevents.message.impl.GenericStructuredMessage;
@FunctionalInterface
public interface StructuredMessage {
/**
* @param visitor
* @throws MessageVisitException
* @throws IllegalStateException If the message is not a valid structured message
* @throws CloudEventVisitException
* @throws IllegalStateException If the message is not a valid structured message
*/
<T> T visit(StructuredMessageVisitor<T> visitor) throws MessageVisitException, IllegalStateException;
<T> T visit(StructuredMessageVisitor<T> visitor) throws CloudEventVisitException, IllegalStateException;
default CloudEvent toEvent() throws MessageVisitException, IllegalStateException {
default CloudEvent toEvent() throws CloudEventVisitException, IllegalStateException {
return this.visit(EventFormat::deserialize);
}
;
/**
* Create a generic structured message from a {@link CloudEvent}
*
* @param contentType content type to use to resolve the {@link EventFormat}
* @param event
* @return null if format was not found, otherwise returns the built message
*/
static StructuredMessage fromEvent(String contentType, CloudEvent event) {
return GenericStructuredMessage.fromEvent(contentType, event);
}
/**
* Create a generic structured message from a {@link CloudEvent}
*
* @param format
* @param event
* @return null if format was not found, otherwise returns the built message
*/
static StructuredMessage fromEvent(EventFormat format, CloudEvent event) {
return GenericStructuredMessage.fromEvent(format, event);
}
}

View File

@ -17,12 +17,14 @@
package io.cloudevents.message;
import io.cloudevents.CloudEventVisitException;
import io.cloudevents.format.EventFormat;
//TODO javadoc
@FunctionalInterface
public interface StructuredMessageVisitor<T> {
// TODO one day we'll convert this to some byte stream
T setEvent(EventFormat format, byte[] value) throws MessageVisitException;
T setEvent(EventFormat format, byte[] value) throws CloudEventVisitException;
}

View File

@ -17,9 +17,9 @@
package io.cloudevents.message.impl;
import io.cloudevents.CloudEventVisitException;
import io.cloudevents.message.Encoding;
import io.cloudevents.message.Message;
import io.cloudevents.message.MessageVisitException;
import io.cloudevents.message.StructuredMessageVisitor;
public abstract class BaseBinaryMessage implements Message {
@ -30,7 +30,7 @@ public abstract class BaseBinaryMessage implements Message {
}
@Override
public <T> T visit(StructuredMessageVisitor<T> visitor) throws MessageVisitException, IllegalStateException {
throw Encoding.UNKNOWN_ENCODING_EXCEPTION;
public <T> T visit(StructuredMessageVisitor<T> visitor) throws CloudEventVisitException, IllegalStateException {
throw MessageUtils.generateWrongEncoding(Encoding.STRUCTURED, Encoding.BINARY);
}
}

View File

@ -17,10 +17,7 @@
package io.cloudevents.message.impl;
import io.cloudevents.SpecVersion;
import io.cloudevents.message.BinaryMessageVisitor;
import io.cloudevents.message.BinaryMessageVisitorFactory;
import io.cloudevents.message.MessageVisitException;
import io.cloudevents.*;
import java.util.Objects;
import java.util.function.BiConsumer;
@ -45,10 +42,12 @@ public abstract class BaseGenericBinaryMessageImpl<HK, HV> extends BaseBinaryMes
}
@Override
public <T extends BinaryMessageVisitor<V>, V> V visit(BinaryMessageVisitorFactory<T, V> visitorFactory) throws MessageVisitException, IllegalStateException {
BinaryMessageVisitor<V> visitor = visitorFactory.createBinaryMessageVisitor(this.version);
public <T extends CloudEventVisitor<V>, V> V visit(CloudEventVisitorFactory<T, V> visitorFactory) throws CloudEventVisitException, IllegalStateException {
CloudEventVisitor<V> visitor = visitorFactory.create(this.version);
// Grab from headers the attributes and extensions
// This implementation avoids to use visitAttributes and visitExtensions
// in order to complete the visit in one loop
this.forEachHeader((key, value) -> {
if (isContentTypeHeader(key)) {
visitor.setAttribute("datacontenttype", toCloudEventsValue(value));
@ -73,6 +72,36 @@ public abstract class BaseGenericBinaryMessageImpl<HK, HV> extends BaseBinaryMes
return visitor.end();
}
@Override
public void visitAttributes(CloudEventAttributesVisitor visitor) throws RuntimeException {
this.forEachHeader((key, value) -> {
if (isContentTypeHeader(key)) {
visitor.setAttribute("datacontenttype", toCloudEventsValue(value));
} else if (isCloudEventsHeader(key)) {
String name = toCloudEventsKey(key);
if (name.equals("specversion")) {
return;
}
if (this.version.getAllAttributes().contains(name)) {
visitor.setAttribute(name, toCloudEventsValue(value));
}
}
});
}
@Override
public void visitExtensions(CloudEventExtensionsVisitor visitor) throws RuntimeException {
// Grab from headers the attributes and extensions
this.forEachHeader((key, value) -> {
if (isCloudEventsHeader(key)) {
String name = toCloudEventsKey(key);
if (!this.version.getAllAttributes().contains(name)) {
visitor.setExtension(name, toCloudEventsValue(value));
}
}
});
}
protected abstract boolean isContentTypeHeader(HK key);
protected abstract boolean isCloudEventsHeader(HK key);

View File

@ -17,7 +17,12 @@
package io.cloudevents.message.impl;
import io.cloudevents.message.*;
import io.cloudevents.CloudEventAttributesVisitor;
import io.cloudevents.CloudEventExtensionsVisitor;
import io.cloudevents.CloudEventVisitor;
import io.cloudevents.CloudEventVisitorFactory;
import io.cloudevents.message.Encoding;
import io.cloudevents.message.Message;
public abstract class BaseStructuredMessage implements Message {
@ -27,7 +32,17 @@ public abstract class BaseStructuredMessage implements Message {
}
@Override
public <V extends BinaryMessageVisitor<R>, R> R visit(BinaryMessageVisitorFactory<V, R> visitorFactory) throws MessageVisitException, IllegalStateException {
throw Encoding.UNKNOWN_ENCODING_EXCEPTION;
public <V extends CloudEventVisitor<R>, R> R visit(CloudEventVisitorFactory<V, R> visitorFactory) {
throw MessageUtils.generateWrongEncoding(Encoding.BINARY, Encoding.STRUCTURED);
}
@Override
public void visitAttributes(CloudEventAttributesVisitor visitor) throws RuntimeException {
throw MessageUtils.generateWrongEncoding(Encoding.BINARY, Encoding.STRUCTURED);
}
@Override
public void visitExtensions(CloudEventExtensionsVisitor visitor) throws RuntimeException {
throw MessageUtils.generateWrongEncoding(Encoding.BINARY, Encoding.STRUCTURED);
}
}

View File

@ -17,9 +17,11 @@
package io.cloudevents.message.impl;
import io.cloudevents.CloudEvent;
import io.cloudevents.CloudEventVisitException;
import io.cloudevents.format.EventFormat;
import io.cloudevents.format.EventFormatProvider;
import io.cloudevents.message.MessageVisitException;
import io.cloudevents.lang.Nullable;
import io.cloudevents.message.StructuredMessageVisitor;
public class GenericStructuredMessage extends BaseStructuredMessage {
@ -33,15 +35,15 @@ public class GenericStructuredMessage extends BaseStructuredMessage {
}
@Override
public <T> T visit(StructuredMessageVisitor<T> visitor) throws MessageVisitException, IllegalStateException {
public <T> T visit(StructuredMessageVisitor<T> visitor) throws CloudEventVisitException, IllegalStateException {
return visitor.setEvent(format, payload);
}
/**
* TODO
* Create a generic structured message from a payload
*
* @param contentType
* @param payload
* @param contentType content type to use to resolve the {@link EventFormat}
* @param payload serialized event
* @return null if format was not found, otherwise returns the built message
*/
public static GenericStructuredMessage fromContentType(String contentType, byte[] payload) {
@ -53,4 +55,31 @@ public class GenericStructuredMessage extends BaseStructuredMessage {
return new GenericStructuredMessage(format, payload);
}
/**
* Create a generic structured message from a {@link CloudEvent}
*
* @param contentType content type to use to resolve the {@link EventFormat}
* @param event
* @return null if format was not found, otherwise returns the built message
*/
@Nullable
public static GenericStructuredMessage fromEvent(String contentType, CloudEvent event) {
EventFormat format = EventFormatProvider.getInstance().resolveFormat(contentType);
if (format == null) {
return null;
}
return fromEvent(format, event);
}
/**
* Create a generic structured message from a {@link CloudEvent}
*
* @param format
* @param event
* @return returns the built message
*/
public static GenericStructuredMessage fromEvent(EventFormat format, CloudEvent event) {
return new GenericStructuredMessage(format, format.serialize(event));
}
}

View File

@ -20,6 +20,7 @@ package io.cloudevents.message.impl;
import io.cloudevents.SpecVersion;
import io.cloudevents.format.EventFormat;
import io.cloudevents.format.EventFormatProvider;
import io.cloudevents.message.Encoding;
import io.cloudevents.message.Message;
import java.util.Map;
@ -82,4 +83,8 @@ public class MessageUtils {
.collect(Collectors.toMap(Function.identity(), headerNameMapping));
}
public static IllegalStateException generateWrongEncoding(Encoding expected, Encoding actual) {
return new IllegalStateException("Cannot visit message as " + expected + " because the actual encoding is " + actual);
}
}

View File

@ -17,7 +17,10 @@
package io.cloudevents.message.impl;
import io.cloudevents.message.*;
import io.cloudevents.*;
import io.cloudevents.message.Encoding;
import io.cloudevents.message.Message;
import io.cloudevents.message.StructuredMessageVisitor;
public class UnknownEncodingMessage implements Message {
@Override
@ -26,12 +29,22 @@ public class UnknownEncodingMessage implements Message {
}
@Override
public <T extends BinaryMessageVisitor<V>, V> V visit(BinaryMessageVisitorFactory<T, V> visitorFactory) throws MessageVisitException, IllegalStateException {
throw Encoding.WRONG_ENCODING_EXCEPTION;
public <T extends CloudEventVisitor<V>, V> V visit(CloudEventVisitorFactory<T, V> visitorFactory) throws CloudEventVisitException, IllegalStateException {
throw new IllegalStateException("Unknown encoding");
}
@Override
public <T> T visit(StructuredMessageVisitor<T> visitor) throws MessageVisitException, IllegalStateException {
throw Encoding.WRONG_ENCODING_EXCEPTION;
public void visitAttributes(CloudEventAttributesVisitor visitor) throws CloudEventVisitException {
throw new IllegalStateException("Unknown encoding");
}
@Override
public void visitExtensions(CloudEventExtensionsVisitor visitor) throws CloudEventVisitException {
throw new IllegalStateException("Unknown encoding");
}
@Override
public <T> T visit(StructuredMessageVisitor<T> visitor) throws CloudEventVisitException, IllegalStateException {
throw new IllegalStateException("Unknown encoding");
}
}

View File

@ -17,11 +17,11 @@
package io.cloudevents.v03;
import io.cloudevents.Attributes;
import io.cloudevents.CloudEventAttributesVisitor;
import io.cloudevents.CloudEventVisitException;
import io.cloudevents.SpecVersion;
import io.cloudevents.impl.AttributesInternal;
import io.cloudevents.lang.Nullable;
import io.cloudevents.message.BinaryMessageAttributesVisitor;
import io.cloudevents.message.MessageVisitException;
import java.net.URI;
import java.time.ZonedDateTime;
@ -94,7 +94,7 @@ public final class AttributesImpl implements AttributesInternal {
}
@Override
public void visitAttributes(BinaryMessageAttributesVisitor visitor) throws MessageVisitException {
public void visitAttributes(CloudEventAttributesVisitor visitor) throws CloudEventVisitException {
visitor.setAttribute(
ContextAttributes.ID.name().toLowerCase(),
this.id

View File

@ -18,8 +18,8 @@ package io.cloudevents.v03;
import io.cloudevents.Attributes;
import io.cloudevents.CloudEvent;
import io.cloudevents.CloudEventVisitException;
import io.cloudevents.impl.BaseCloudEventBuilder;
import io.cloudevents.message.MessageVisitException;
import io.cloudevents.types.Time;
import java.net.URI;
@ -114,7 +114,7 @@ public final class CloudEventBuilder extends BaseCloudEventBuilder<CloudEventBui
// Message impl
@Override
public void setAttribute(String name, String value) throws MessageVisitException {
public void setAttribute(String name, String value) throws CloudEventVisitException {
switch (name) {
case "id":
withId(value);
@ -123,7 +123,7 @@ public final class CloudEventBuilder extends BaseCloudEventBuilder<CloudEventBui
try {
withSource(new URI(value));
} catch (URISyntaxException e) {
throw MessageVisitException.newInvalidAttributeValue("source", value, e);
throw CloudEventVisitException.newInvalidAttributeValue("source", value, e);
}
return;
case "type":
@ -139,7 +139,7 @@ public final class CloudEventBuilder extends BaseCloudEventBuilder<CloudEventBui
try {
withSchemaUrl(new URI(value));
} catch (URISyntaxException e) {
throw MessageVisitException.newInvalidAttributeValue("schemaurl", value, e);
throw CloudEventVisitException.newInvalidAttributeValue("schemaurl", value, e);
}
return;
case "subject":
@ -149,15 +149,15 @@ public final class CloudEventBuilder extends BaseCloudEventBuilder<CloudEventBui
try {
withTime(Time.parseTime(value));
} catch (DateTimeParseException e) {
throw MessageVisitException.newInvalidAttributeValue("time", value, e);
throw CloudEventVisitException.newInvalidAttributeValue("time", value, e);
}
return;
}
throw MessageVisitException.newInvalidAttributeName(name);
throw CloudEventVisitException.newInvalidAttributeName(name);
}
@Override
public void setAttribute(String name, URI value) throws MessageVisitException {
public void setAttribute(String name, URI value) throws CloudEventVisitException {
switch (name) {
case "source":
withSource(value);
@ -166,15 +166,15 @@ public final class CloudEventBuilder extends BaseCloudEventBuilder<CloudEventBui
withDataSchema(value);
return;
}
throw MessageVisitException.newInvalidAttributeType(name, URI.class);
throw CloudEventVisitException.newInvalidAttributeType(name, URI.class);
}
@Override
public void setAttribute(String name, ZonedDateTime value) throws MessageVisitException {
public void setAttribute(String name, ZonedDateTime value) throws CloudEventVisitException {
if ("time".equals(name)) {
withTime(value);
return;
}
throw MessageVisitException.newInvalidAttributeType(name, ZonedDateTime.class);
throw CloudEventVisitException.newInvalidAttributeType(name, ZonedDateTime.class);
}
}

View File

@ -17,10 +17,10 @@
package io.cloudevents.v1;
import io.cloudevents.Attributes;
import io.cloudevents.CloudEventAttributesVisitor;
import io.cloudevents.CloudEventVisitException;
import io.cloudevents.SpecVersion;
import io.cloudevents.impl.AttributesInternal;
import io.cloudevents.message.BinaryMessageAttributesVisitor;
import io.cloudevents.message.MessageVisitException;
import java.net.URI;
import java.time.ZonedDateTime;
@ -107,7 +107,7 @@ public final class AttributesImpl implements AttributesInternal {
}
@Override
public void visitAttributes(BinaryMessageAttributesVisitor visitor) throws MessageVisitException {
public void visitAttributes(CloudEventAttributesVisitor visitor) throws CloudEventVisitException {
visitor.setAttribute(
ContextAttributes.ID.name().toLowerCase(),
this.id

View File

@ -19,8 +19,8 @@ package io.cloudevents.v1;
import io.cloudevents.Attributes;
import io.cloudevents.CloudEvent;
import io.cloudevents.CloudEventVisitException;
import io.cloudevents.impl.BaseCloudEventBuilder;
import io.cloudevents.message.MessageVisitException;
import io.cloudevents.types.Time;
import java.net.URI;
@ -108,7 +108,7 @@ public final class CloudEventBuilder extends BaseCloudEventBuilder<CloudEventBui
// Message impl
@Override
public void setAttribute(String name, String value) throws MessageVisitException {
public void setAttribute(String name, String value) throws CloudEventVisitException {
switch (name) {
case "id":
withId(value);
@ -117,7 +117,7 @@ public final class CloudEventBuilder extends BaseCloudEventBuilder<CloudEventBui
try {
withSource(new URI(value));
} catch (URISyntaxException e) {
throw MessageVisitException.newInvalidAttributeValue("source", value, e);
throw CloudEventVisitException.newInvalidAttributeValue("source", value, e);
}
return;
case "type":
@ -130,7 +130,7 @@ public final class CloudEventBuilder extends BaseCloudEventBuilder<CloudEventBui
try {
withDataSchema(new URI(value));
} catch (URISyntaxException e) {
throw MessageVisitException.newInvalidAttributeValue("dataschema", value, e);
throw CloudEventVisitException.newInvalidAttributeValue("dataschema", value, e);
}
return;
case "subject":
@ -140,15 +140,15 @@ public final class CloudEventBuilder extends BaseCloudEventBuilder<CloudEventBui
try {
withTime(Time.parseTime(value));
} catch (DateTimeParseException e) {
throw MessageVisitException.newInvalidAttributeValue("time", value, e);
throw CloudEventVisitException.newInvalidAttributeValue("time", value, e);
}
return;
}
throw MessageVisitException.newInvalidAttributeName(name);
throw CloudEventVisitException.newInvalidAttributeName(name);
}
@Override
public void setAttribute(String name, URI value) throws MessageVisitException {
public void setAttribute(String name, URI value) throws CloudEventVisitException {
switch (name) {
case "source":
withSource(value);
@ -157,15 +157,15 @@ public final class CloudEventBuilder extends BaseCloudEventBuilder<CloudEventBui
withDataSchema(value);
return;
}
throw MessageVisitException.newInvalidAttributeType(name, URI.class);
throw CloudEventVisitException.newInvalidAttributeType(name, URI.class);
}
@Override
public void setAttribute(String name, ZonedDateTime value) throws MessageVisitException {
public void setAttribute(String name, ZonedDateTime value) throws CloudEventVisitException {
if ("time".equals(name)) {
withTime(value);
return;
}
throw MessageVisitException.newInvalidAttributeType(name, ZonedDateTime.class);
throw CloudEventVisitException.newInvalidAttributeType(name, ZonedDateTime.class);
}
}

View File

@ -17,6 +17,7 @@
package io.cloudevents.extensions;
import io.cloudevents.CloudEvent;
import io.cloudevents.CloudEventBuilder;
import org.junit.jupiter.api.Test;
import static org.assertj.core.api.Assertions.assertThat;
@ -32,7 +33,7 @@ public class DistributedTracingExtensionTest {
tracing.setTraceparent("parent");
tracing.setTracestate("state");
CloudEvent event = CloudEvent.buildV1().withExtension(tracing).build();
CloudEvent event = CloudEventBuilder.v1().withExtension(tracing).build();
assertThat(event.getExtensions())
.containsEntry(DistributedTracingExtension.TRACEPARENT, "parent")
@ -41,7 +42,7 @@ public class DistributedTracingExtensionTest {
@Test
public void parseExtension() {
CloudEvent event = CloudEvent.buildV1()
CloudEvent event = CloudEventBuilder.v1()
.withExtension(DistributedTracingExtension.TRACEPARENT, "parent")
.withExtension(DistributedTracingExtension.TRACESTATE, "state")
.build();

View File

@ -18,6 +18,7 @@
package io.cloudevents.impl;
import io.cloudevents.CloudEvent;
import io.cloudevents.CloudEventBuilder;
import org.junit.jupiter.api.Test;
import static io.cloudevents.test.Data.*;
@ -27,7 +28,7 @@ public class CloudEventImplTest {
@Test
public void testEqualityV03() {
CloudEvent event1 = CloudEvent.buildV03()
CloudEvent event1 = CloudEventBuilder.v03()
.withId(ID)
.withType(TYPE)
.withSource(SOURCE)
@ -36,7 +37,7 @@ public class CloudEventImplTest {
.withTime(TIME)
.build();
CloudEvent event2 = CloudEvent.buildV03()
CloudEvent event2 = CloudEventBuilder.v03()
.withId(ID)
.withType(TYPE)
.withSource(SOURCE)
@ -50,7 +51,7 @@ public class CloudEventImplTest {
@Test
public void testEqualityV1() {
CloudEvent event1 = CloudEvent.buildV1()
CloudEvent event1 = CloudEventBuilder.v1()
.withId(ID)
.withType(TYPE)
.withSource(SOURCE)
@ -59,7 +60,7 @@ public class CloudEventImplTest {
.withTime(TIME)
.build();
CloudEvent event2 = CloudEvent.buildV1()
CloudEvent event2 = CloudEventBuilder.v1()
.withId(ID)
.withType(TYPE)
.withSource(SOURCE)

View File

@ -18,6 +18,7 @@
package io.cloudevents.message;
import io.cloudevents.CloudEvent;
import io.cloudevents.message.impl.GenericStructuredMessage;
import io.cloudevents.mock.CSVFormat;
import io.cloudevents.mock.MockBinaryMessage;
import io.cloudevents.mock.MockStructuredMessage;
@ -36,7 +37,7 @@ public class EventMessageRoundtripTest {
@ParameterizedTest()
@MethodSource("io.cloudevents.test.Data#allEventsWithoutExtensions")
void structuredToEvent(CloudEvent input) {
assertThat(input.asStructuredMessage(CSVFormat.INSTANCE).toEvent())
assertThat(GenericStructuredMessage.fromEvent(CSVFormat.INSTANCE, input).toEvent())
.isEqualTo(input);
}
@ -48,21 +49,14 @@ public class EventMessageRoundtripTest {
@ParameterizedTest()
@MethodSource("io.cloudevents.test.Data#allEventsWithoutExtensions")
void structuredToMockStructuredMessageToEvent(CloudEvent input) {
assertThat(input.asStructuredMessage(CSVFormat.INSTANCE).visit(new MockStructuredMessage()).toEvent())
.isEqualTo(input);
}
@ParameterizedTest()
@MethodSource("io.cloudevents.test.Data#allEvents")
void binaryToEvent(CloudEvent input) {
assertThat(input.asBinaryMessage().toEvent())
assertThat(GenericStructuredMessage.fromEvent(CSVFormat.INSTANCE, input).visit(new MockStructuredMessage()).toEvent())
.isEqualTo(input);
}
@ParameterizedTest()
@MethodSource("io.cloudevents.test.Data#allEvents")
void binaryToMockBinaryMessageToEvent(CloudEvent input) {
assertThat(input.asBinaryMessage().visit(new MockBinaryMessage()).toEvent())
assertThat(input.visit(new MockBinaryMessage()).toEvent())
.isEqualTo(input);
}

View File

@ -70,7 +70,7 @@ public class CSVFormat implements EventFormat {
ZonedDateTime time = splitted[7].equals("null") ? null : Time.parseTime(splitted[7]);
byte[] data = splitted[8].equals("null") ? null : Base64.getDecoder().decode(splitted[8].getBytes());
CloudEventBuilder builder = CloudEvent.buildV1()
CloudEventBuilder builder = io.cloudevents.CloudEventBuilder.v1()
.withId(id)
.withType(type)
.withSource(source);

View File

@ -17,15 +17,16 @@
package io.cloudevents.mock;
import io.cloudevents.SpecVersion;
import io.cloudevents.message.*;
import io.cloudevents.*;
import io.cloudevents.message.Message;
import io.cloudevents.message.impl.BaseBinaryMessage;
import java.net.URI;
import java.time.ZonedDateTime;
import java.util.HashMap;
import java.util.Map;
public class MockBinaryMessage implements Message, BinaryMessageVisitorFactory<MockBinaryMessage, MockBinaryMessage>, BinaryMessageVisitor<MockBinaryMessage> {
public class MockBinaryMessage extends BaseBinaryMessage implements Message, CloudEventVisitorFactory<MockBinaryMessage, MockBinaryMessage>, CloudEventVisitor<MockBinaryMessage> {
private SpecVersion version;
private Map<String, Object> attributes;
@ -45,17 +46,22 @@ public class MockBinaryMessage implements Message, BinaryMessageVisitorFactory<M
}
@Override
public Encoding getEncoding() {
return Encoding.BINARY;
}
@Override
public <T extends BinaryMessageVisitor<V>, V> V visit(BinaryMessageVisitorFactory<T, V> visitorFactory) throws MessageVisitException, IllegalStateException {
public <T extends CloudEventVisitor<V>, V> V visit(CloudEventVisitorFactory<T, V> visitorFactory) throws CloudEventVisitException, IllegalStateException {
if (version == null) {
throw new IllegalStateException("MockBinaryMessage is empty");
}
BinaryMessageVisitor<V> visitor = visitorFactory.createBinaryMessageVisitor(version);
CloudEventVisitor<V> visitor = visitorFactory.create(version);
this.visitAttributes(visitor);
this.visitExtensions(visitor);
visitor.setBody(this.data);
return visitor.end();
}
@Override
public void visitAttributes(CloudEventAttributesVisitor visitor) throws CloudEventVisitException, IllegalStateException {
for (Map.Entry<String, Object> e : this.attributes.entrySet()) {
if (e.getValue() instanceof String) {
visitor.setAttribute(e.getKey(), (String) e.getValue());
@ -68,7 +74,10 @@ public class MockBinaryMessage implements Message, BinaryMessageVisitorFactory<M
throw new IllegalStateException("Illegal value inside attributes map: " + e);
}
}
}
@Override
public void visitExtensions(CloudEventExtensionsVisitor visitor) throws CloudEventVisitException, IllegalStateException {
for (Map.Entry<String, Object> entry : this.extensions.entrySet()) {
if (entry.getValue() instanceof String) {
visitor.setExtension(entry.getKey(), (String) entry.getValue());
@ -81,19 +90,10 @@ public class MockBinaryMessage implements Message, BinaryMessageVisitorFactory<M
throw new IllegalStateException("Illegal value inside extensions map: " + entry);
}
}
visitor.setBody(this.data);
return visitor.end();
}
@Override
public <T> T visit(StructuredMessageVisitor<T> visitor) throws MessageVisitException, IllegalStateException {
throw Encoding.WRONG_ENCODING_EXCEPTION;
}
@Override
public void setBody(byte[] value) throws MessageVisitException {
public void setBody(byte[] value) throws CloudEventVisitException {
this.data = value;
}
@ -103,37 +103,37 @@ public class MockBinaryMessage implements Message, BinaryMessageVisitorFactory<M
}
@Override
public void setAttribute(String name, String value) throws MessageVisitException {
public void setAttribute(String name, String value) throws CloudEventVisitException {
this.attributes.put(name, value);
}
@Override
public void setAttribute(String name, URI value) throws MessageVisitException {
public void setAttribute(String name, URI value) throws CloudEventVisitException {
this.attributes.put(name, value);
}
@Override
public void setAttribute(String name, ZonedDateTime value) throws MessageVisitException {
public void setAttribute(String name, ZonedDateTime value) throws CloudEventVisitException {
this.attributes.put(name, value);
}
@Override
public void setExtension(String name, String value) throws MessageVisitException {
public void setExtension(String name, String value) throws CloudEventVisitException {
this.extensions.put(name, value);
}
@Override
public void setExtension(String name, Number value) throws MessageVisitException {
public void setExtension(String name, Number value) throws CloudEventVisitException {
this.extensions.put(name, value);
}
@Override
public void setExtension(String name, Boolean value) throws MessageVisitException {
public void setExtension(String name, Boolean value) throws CloudEventVisitException {
this.extensions.put(name, value);
}
@Override
public MockBinaryMessage createBinaryMessageVisitor(SpecVersion version) {
public MockBinaryMessage create(SpecVersion version) {
this.version = version;
return this;

View File

@ -17,26 +17,19 @@
package io.cloudevents.mock;
import io.cloudevents.CloudEventVisitException;
import io.cloudevents.format.EventFormat;
import io.cloudevents.message.*;
import io.cloudevents.message.Message;
import io.cloudevents.message.StructuredMessageVisitor;
import io.cloudevents.message.impl.BaseStructuredMessage;
public class MockStructuredMessage implements Message, StructuredMessageVisitor<MockStructuredMessage> {
public class MockStructuredMessage extends BaseStructuredMessage implements Message, StructuredMessageVisitor<MockStructuredMessage> {
private EventFormat format;
private byte[] payload;
@Override
public Encoding getEncoding() {
return Encoding.STRUCTURED;
}
@Override
public <T extends BinaryMessageVisitor<V>, V> V visit(BinaryMessageVisitorFactory<T, V> visitorFactory) throws MessageVisitException, IllegalStateException {
throw Encoding.WRONG_ENCODING_EXCEPTION;
}
@Override
public <T> T visit(StructuredMessageVisitor<T> visitor) throws MessageVisitException, IllegalStateException {
public <T> T visit(StructuredMessageVisitor<T> visitor) throws CloudEventVisitException, IllegalStateException {
if (this.format == null) {
throw new IllegalStateException("MockStructuredMessage is empty");
}
@ -45,7 +38,7 @@ public class MockStructuredMessage implements Message, StructuredMessageVisitor<
}
@Override
public MockStructuredMessage setEvent(EventFormat format, byte[] value) throws MessageVisitException {
public MockStructuredMessage setEvent(EventFormat format, byte[] value) throws CloudEventVisitException {
this.format = format;
this.payload = value;

View File

@ -18,6 +18,7 @@
package io.cloudevents.test;
import io.cloudevents.CloudEvent;
import io.cloudevents.CloudEventBuilder;
import io.cloudevents.types.Time;
import java.net.URI;
@ -40,13 +41,13 @@ public class Data {
public static byte[] DATA_XML_SERIALIZED = "<stuff></stuff>".getBytes();
public static byte[] DATA_TEXT_SERIALIZED = "Hello World Lorena!".getBytes();
public static final CloudEvent V1_MIN = CloudEvent.buildV1()
public static final CloudEvent V1_MIN = CloudEventBuilder.v1()
.withId(ID)
.withType(TYPE)
.withSource(SOURCE)
.build();
public static final CloudEvent V1_WITH_JSON_DATA = CloudEvent.buildV1()
public static final CloudEvent V1_WITH_JSON_DATA = CloudEventBuilder.v1()
.withId(ID)
.withType(TYPE)
.withSource(SOURCE)
@ -55,7 +56,7 @@ public class Data {
.withTime(TIME)
.build();
public static final CloudEvent V1_WITH_JSON_DATA_WITH_EXT = CloudEvent.buildV1()
public static final CloudEvent V1_WITH_JSON_DATA_WITH_EXT = CloudEventBuilder.v1()
.withId(ID)
.withType(TYPE)
.withSource(SOURCE)
@ -67,7 +68,7 @@ public class Data {
.withExtension("anumber", 10)
.build();
public static final CloudEvent V1_WITH_JSON_DATA_WITH_EXT_STRING = CloudEvent.buildV1()
public static final CloudEvent V1_WITH_JSON_DATA_WITH_EXT_STRING = CloudEventBuilder.v1()
.withId(ID)
.withType(TYPE)
.withSource(SOURCE)
@ -79,7 +80,7 @@ public class Data {
.withExtension("anumber", "10")
.build();
public static final CloudEvent V1_WITH_XML_DATA = CloudEvent.buildV1()
public static final CloudEvent V1_WITH_XML_DATA = CloudEventBuilder.v1()
.withId(ID)
.withType(TYPE)
.withSource(SOURCE)
@ -88,7 +89,7 @@ public class Data {
.withTime(TIME)
.build();
public static final CloudEvent V1_WITH_TEXT_DATA = CloudEvent.buildV1()
public static final CloudEvent V1_WITH_TEXT_DATA = CloudEventBuilder.v1()
.withId(ID)
.withType(TYPE)
.withSource(SOURCE)
@ -138,7 +139,7 @@ public class Data {
public static Stream<CloudEvent> v1EventsWithStringExt() {
return v1Events().map(ce -> {
io.cloudevents.v1.CloudEventBuilder builder = CloudEvent.buildV1(ce);
io.cloudevents.v1.CloudEventBuilder builder = CloudEventBuilder.v1(ce);
ce.getExtensions().forEach((k, v) -> builder.withExtension(k, v.toString()));
return builder.build();
});
@ -146,7 +147,7 @@ public class Data {
public static Stream<CloudEvent> v03EventsWithStringExt() {
return v03Events().map(ce -> {
io.cloudevents.v03.CloudEventBuilder builder = CloudEvent.buildV03(ce);
io.cloudevents.v03.CloudEventBuilder builder = CloudEventBuilder.v03(ce);
ce.getExtensions().forEach((k, v) -> builder.withExtension(k, v.toString()));
return builder.build();
});

View File

@ -17,6 +17,7 @@
package io.cloudevents.v03;
import io.cloudevents.CloudEvent;
import io.cloudevents.CloudEventBuilder;
import io.cloudevents.SpecVersion;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
@ -31,13 +32,13 @@ public class CloudEventBuilderTest {
@ParameterizedTest()
@MethodSource("io.cloudevents.test.Data#v03Events")
void testCopyWithBuilder(CloudEvent event) {
assertThat(CloudEvent.buildV03(event).build()).isEqualTo(event);
assertThat(CloudEventBuilder.v03(event).build()).isEqualTo(event);
}
@ParameterizedTest()
@MethodSource("io.cloudevents.test.Data#v03Events")
void testToV1(CloudEvent event) {
CloudEvent eventV1 = CloudEvent.buildV1(event).build();
CloudEvent eventV1 = CloudEventBuilder.v1(event).build();
assertThat(eventV1.getAttributes().getSpecVersion())
.isEqualTo(SpecVersion.V1);

View File

@ -17,6 +17,7 @@
package io.cloudevents.v1;
import io.cloudevents.CloudEvent;
import io.cloudevents.CloudEventBuilder;
import io.cloudevents.SpecVersion;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
@ -32,13 +33,13 @@ public class CloudEventBuilderTest {
@ParameterizedTest()
@MethodSource("io.cloudevents.test.Data#v1Events")
void testCopyWithBuilder(CloudEvent event) {
assertThat(CloudEvent.buildV1(event).build()).isEqualTo(event);
assertThat(CloudEventBuilder.v1(event).build()).isEqualTo(event);
}
@ParameterizedTest()
@MethodSource("io.cloudevents.test.Data#v1Events")
void testToV03(CloudEvent event) {
CloudEvent eventV03 = CloudEvent.buildV03(event).build();
CloudEvent eventV03 = CloudEventBuilder.v03(event).build();
assertThat(eventV03.getAttributes().getSpecVersion())
.isEqualTo(SpecVersion.V03);

View File

@ -17,7 +17,7 @@ For Maven:
You don't need to perform any operation to configure the module, more than adding the dependency to your project:
```java
CloudEvent event = CloudEvent.buildV1()
CloudEvent event = CloudEventBuilder.v1()
.withId("hello")
.withType("example.vertx")
.withSource(URI.create("http://localhost"))

View File

@ -25,12 +25,7 @@ import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
import com.fasterxml.jackson.databind.exc.MismatchedInputException;
import com.fasterxml.jackson.databind.node.JsonNodeType;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.cloudevents.CloudEvent;
import io.cloudevents.SpecVersion;
import io.cloudevents.message.BinaryMessage;
import io.cloudevents.message.BinaryMessageVisitor;
import io.cloudevents.message.BinaryMessageVisitorFactory;
import io.cloudevents.message.MessageVisitException;
import io.cloudevents.*;
import java.io.IOException;
@ -40,7 +35,7 @@ public class CloudEventDeserializer extends StdDeserializer<CloudEvent> {
super(CloudEvent.class);
}
private static class JsonMessage implements BinaryMessage {
private static class JsonMessage implements CloudEventVisitable {
private final JsonParser p;
private final ObjectNode node;
@ -51,10 +46,10 @@ public class CloudEventDeserializer extends StdDeserializer<CloudEvent> {
}
@Override
public <T extends BinaryMessageVisitor<V>, V> V visit(BinaryMessageVisitorFactory<T, V> visitorFactory) throws MessageVisitException, IllegalStateException {
public <T extends CloudEventVisitor<V>, V> V visit(CloudEventVisitorFactory<T, V> visitorFactory) throws CloudEventVisitException, IllegalStateException {
try {
SpecVersion specVersion = SpecVersion.parse(getStringNode(this.node, this.p, "specversion"));
BinaryMessageVisitor<V> visitor = visitorFactory.createBinaryMessageVisitor(specVersion);
CloudEventVisitor<V> visitor = visitorFactory.create(specVersion);
// Read mandatory attributes
for (String attr : specVersion.getMandatoryAttributes()) {
@ -146,6 +141,16 @@ public class CloudEventDeserializer extends StdDeserializer<CloudEvent> {
}
}
@Override
public void visitAttributes(CloudEventAttributesVisitor visitor) throws CloudEventVisitException {
// no-op no need for that
}
@Override
public void visitExtensions(CloudEventExtensionsVisitor visitor) throws CloudEventVisitException {
// no-op no need for that
}
private String getStringNode(ObjectNode objNode, JsonParser p, String attributeName) throws JsonProcessingException {
String val = getOptionalStringNode(objNode, p, attributeName);
if (val == null) {
@ -181,7 +186,7 @@ public class CloudEventDeserializer extends StdDeserializer<CloudEvent> {
ObjectNode node = ctxt.readValue(p, ObjectNode.class);
try {
return new JsonMessage(p, node).toEvent();
return new JsonMessage(p, node).visit(CloudEventBuilder::fromSpecVersion);
} catch (RuntimeException e) {
// Yeah this is bad but it's needed to support checked exceptions...
if (e.getCause() instanceof IOException) {

View File

@ -21,11 +21,9 @@ import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.ser.std.StdSerializer;
import io.cloudevents.CloudEvent;
import io.cloudevents.impl.AttributesInternal;
import io.cloudevents.impl.CloudEventImpl;
import io.cloudevents.message.BinaryMessageAttributesVisitor;
import io.cloudevents.message.BinaryMessageExtensionsVisitor;
import io.cloudevents.message.MessageVisitException;
import io.cloudevents.CloudEventAttributesVisitor;
import io.cloudevents.CloudEventExtensionsVisitor;
import io.cloudevents.CloudEventVisitException;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
@ -41,36 +39,18 @@ public class CloudEventSerializer extends StdSerializer<CloudEvent> {
this.forceStringSerialization = forceStringSerialization;
}
private static class AttributesSerializer implements BinaryMessageAttributesVisitor {
private static class FieldsSerializer implements CloudEventAttributesVisitor, CloudEventExtensionsVisitor {
private JsonGenerator gen;
private final JsonGenerator gen;
private final SerializerProvider provider;
public AttributesSerializer(JsonGenerator gen) {
this.gen = gen;
}
@Override
public void setAttribute(String name, String value) throws MessageVisitException {
try {
gen.writeStringField(name, value);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
private static class ExtensionsSerializer implements BinaryMessageExtensionsVisitor {
private JsonGenerator gen;
private SerializerProvider provider;
public ExtensionsSerializer(JsonGenerator gen, SerializerProvider provider) {
public FieldsSerializer(JsonGenerator gen, SerializerProvider provider) {
this.gen = gen;
this.provider = provider;
}
@Override
public void setExtension(String name, String value) throws MessageVisitException {
public void setAttribute(String name, String value) throws CloudEventVisitException {
try {
gen.writeStringField(name, value);
} catch (IOException e) {
@ -79,7 +59,16 @@ public class CloudEventSerializer extends StdSerializer<CloudEvent> {
}
@Override
public void setExtension(String name, Number value) throws MessageVisitException {
public void setExtension(String name, String value) throws CloudEventVisitException {
try {
gen.writeStringField(name, value);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public void setExtension(String name, Number value) throws CloudEventVisitException {
try {
gen.writeFieldName(name);
provider.findValueSerializer(value.getClass()).serialize(value, gen, provider);
@ -89,7 +78,7 @@ public class CloudEventSerializer extends StdSerializer<CloudEvent> {
}
@Override
public void setExtension(String name, Boolean value) throws MessageVisitException {
public void setExtension(String name, Boolean value) throws CloudEventVisitException {
try {
gen.writeBooleanField(name, value);
} catch (IOException e) {
@ -104,26 +93,20 @@ public class CloudEventSerializer extends StdSerializer<CloudEvent> {
gen.writeStringField("specversion", value.getAttributes().getSpecVersion().toString());
// Serialize attributes
AttributesInternal attributesInternal = (AttributesInternal) value.getAttributes();
try {
attributesInternal.visitAttributes(new AttributesSerializer(gen));
} catch (RuntimeException e) {
throw (IOException) e.getCause();
}
// Serialize extensions
try {
((CloudEventImpl) value).visitExtensions(new ExtensionsSerializer(gen, provider));
FieldsSerializer serializer = new FieldsSerializer(gen, provider);
value.visitAttributes(serializer);
value.visitExtensions(serializer);
} catch (RuntimeException e) {
throw (IOException) e.getCause();
}
// Serialize data
byte[] data = value.getData();
String contentType = attributesInternal.getDataContentType();
String contentType = value.getAttributes().getDataContentType();
if (data != null) {
if (shouldSerializeBase64(contentType)) {
switch (attributesInternal.getSpecVersion()) {
switch (value.getAttributes().getSpecVersion()) {
case V03:
gen.writeStringField("datacontentencoding", "base64");
gen.writeFieldName("data");

View File

@ -1,47 +0,0 @@
/*
* Copyright 2018-Present The CloudEvents Authors
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package io.cloudevents.format.json;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
import io.cloudevents.types.Time;
import java.io.IOException;
import java.time.DateTimeException;
import java.time.ZonedDateTime;
public class ZonedDateTimeDeserializer extends StdDeserializer<ZonedDateTime> {
private static final long serialVersionUID = 1L;
protected ZonedDateTimeDeserializer() {
this(null);
}
protected ZonedDateTimeDeserializer(Class<?> vc) {
super(vc);
}
@Override
public ZonedDateTime deserialize(JsonParser jsonparser, DeserializationContext ctxt) throws IOException {
try {
return ZonedDateTime.parse(jsonparser.getText(), Time.RFC3339_DATE_FORMAT);
} catch (DateTimeException e) {
throw new IllegalArgumentException("could not parse");
}
}
}

View File

@ -1,45 +0,0 @@
/*
* Copyright 2018-Present The CloudEvents Authors
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package io.cloudevents.format.json;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.ser.std.StdSerializer;
import io.cloudevents.types.Time;
import java.io.IOException;
import java.time.ZonedDateTime;
public class ZonedDateTimeSerializer extends StdSerializer<ZonedDateTime> {
private static final long serialVersionUID = 6245182835980474796L;
protected ZonedDateTimeSerializer() {
this(null, false);
}
protected ZonedDateTimeSerializer(Class<?> t, boolean dummy) {
super(t, dummy);
}
@Override
public void serialize(ZonedDateTime time, JsonGenerator generator,
SerializerProvider provider) throws IOException {
generator.writeString(time.format(Time.RFC3339_DATE_FORMAT));
}
}

View File

@ -17,7 +17,7 @@ You need to configure the `CloudEventsProvider` to enable marshalling/unmarshall
Below is a sample on how to read and write CloudEvents:
```java
import io.cloudevents.CloudEvent;
import io.cloudevents.CloudEvent;import io.cloudevents.CloudEventBuilder;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
@ -30,7 +30,7 @@ public class EventReceiverResource {
@GET
@Path("getMinEvent")
public CloudEvent getMinEvent() {
return CloudEvent.buildV1()
return CloudEventBuilder.v1()
.withId("hello")
.withType("example.vertx")
.withSource(URI.create("http://localhost"))
@ -42,7 +42,7 @@ public class EventReceiverResource {
@Path("getStructuredEvent")
@StructuredEncoding("application/cloudevents+csv")
public CloudEvent getStructuredEvent() {
return CloudEvent.buildV1()
return CloudEventBuilder.v1()
.withId("hello")
.withType("example.vertx")
.withSource(URI.create("http://localhost"))

View File

@ -18,13 +18,14 @@
package io.cloudevents.http.restful.ws;
import io.cloudevents.CloudEvent;
import io.cloudevents.CloudEventVisitor;
import io.cloudevents.format.EventFormat;
import io.cloudevents.format.EventFormatProvider;
import io.cloudevents.http.restful.ws.impl.RestfulWSClientMessageVisitor;
import io.cloudevents.http.restful.ws.impl.RestfulWSMessageFactory;
import io.cloudevents.http.restful.ws.impl.RestfulWSMessageVisitor;
import io.cloudevents.http.restful.ws.impl.Utils;
import io.cloudevents.message.BinaryMessageVisitor;
import io.cloudevents.message.Message;
import io.cloudevents.message.MessageVisitor;
import javax.ws.rs.Consumes;
@ -107,26 +108,22 @@ public class CloudEventsProvider implements MessageBodyReader<CloudEvent>, Messa
}
}
private <V extends MessageVisitor<V, Void> & BinaryMessageVisitor<Void>> void writeBinary(CloudEvent input, V visitor) {
input.asBinaryMessage().visit(visitor);
private <V extends MessageVisitor<V, Void> & CloudEventVisitor<Void>> void writeBinary(CloudEvent input, V visitor) {
Message.writeBinaryEvent(input, visitor);
}
private <V extends MessageVisitor<V, Void> & BinaryMessageVisitor<Void>> void writeStructured(CloudEvent input, EventFormat format, V visitor) {
input
.asStructuredMessage(format)
.visit(visitor);
private <V extends MessageVisitor<V, Void> & CloudEventVisitor<Void>> void writeStructured(CloudEvent input, EventFormat format, V visitor) {
Message.writeStructuredEvent(input, format, visitor);
}
private <V extends MessageVisitor<V, Void> & BinaryMessageVisitor<Void>> void writeStructured(CloudEvent input, String formatString, V visitor) {
private <V extends MessageVisitor<V, Void> & CloudEventVisitor<Void>> void writeStructured(CloudEvent input, String formatString, V visitor) {
EventFormat format = EventFormatProvider.getInstance().resolveFormat(formatString);
if (format == null) {
throw new IllegalArgumentException("Cannot resolve format " + formatString);
}
input
.asStructuredMessage(format)
.visit(visitor);
writeStructured(input, format, visitor);
}
@Override

View File

@ -17,17 +17,17 @@
package io.cloudevents.http.restful.ws.impl;
import io.cloudevents.CloudEventVisitException;
import io.cloudevents.CloudEventVisitor;
import io.cloudevents.SpecVersion;
import io.cloudevents.format.EventFormat;
import io.cloudevents.message.BinaryMessageVisitor;
import io.cloudevents.message.MessageVisitException;
import io.cloudevents.message.MessageVisitor;
import javax.ws.rs.client.ClientRequestContext;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MediaType;
public final class RestfulWSClientMessageVisitor implements BinaryMessageVisitor<Void>, MessageVisitor<RestfulWSClientMessageVisitor, Void> {
public final class RestfulWSClientMessageVisitor implements CloudEventVisitor<Void>, MessageVisitor<RestfulWSClientMessageVisitor, Void> {
private final ClientRequestContext context;
@ -40,23 +40,23 @@ public final class RestfulWSClientMessageVisitor implements BinaryMessageVisitor
}
@Override
public RestfulWSClientMessageVisitor createBinaryMessageVisitor(SpecVersion version) {
public RestfulWSClientMessageVisitor create(SpecVersion version) {
this.context.getHeaders().add(CloudEventsHeaders.SPEC_VERSION, version.toString());
return this;
}
@Override
public void setAttribute(String name, String value) throws MessageVisitException {
public void setAttribute(String name, String value) throws CloudEventVisitException {
this.context.getHeaders().add(CloudEventsHeaders.ATTRIBUTES_TO_HEADERS.get(name), value);
}
@Override
public void setExtension(String name, String value) throws MessageVisitException {
public void setExtension(String name, String value) throws CloudEventVisitException {
this.context.getHeaders().add(CloudEventsHeaders.CE_PREFIX + name, value);
}
@Override
public void setBody(byte[] value) throws MessageVisitException {
public void setBody(byte[] value) throws CloudEventVisitException {
this.context.setEntity(value);
}
@ -66,7 +66,7 @@ public final class RestfulWSClientMessageVisitor implements BinaryMessageVisitor
}
@Override
public Void setEvent(EventFormat format, byte[] value) throws MessageVisitException {
public Void setEvent(EventFormat format, byte[] value) throws CloudEventVisitException {
this.context.setEntity(value, null, MediaType.valueOf(format.serializedContentType()));
return null;
}

View File

@ -17,10 +17,10 @@
package io.cloudevents.http.restful.ws.impl;
import io.cloudevents.CloudEventVisitException;
import io.cloudevents.CloudEventVisitor;
import io.cloudevents.SpecVersion;
import io.cloudevents.format.EventFormat;
import io.cloudevents.message.BinaryMessageVisitor;
import io.cloudevents.message.MessageVisitException;
import io.cloudevents.message.MessageVisitor;
import javax.ws.rs.core.HttpHeaders;
@ -28,7 +28,7 @@ import javax.ws.rs.core.MultivaluedMap;
import java.io.IOException;
import java.io.OutputStream;
public final class RestfulWSMessageVisitor implements BinaryMessageVisitor<Void>, MessageVisitor<RestfulWSMessageVisitor, Void> {
public final class RestfulWSMessageVisitor implements CloudEventVisitor<Void>, MessageVisitor<RestfulWSMessageVisitor, Void> {
private final MultivaluedMap<String, Object> httpHeaders;
private final OutputStream entityStream;
@ -42,27 +42,27 @@ public final class RestfulWSMessageVisitor implements BinaryMessageVisitor<Void>
}
@Override
public RestfulWSMessageVisitor createBinaryMessageVisitor(SpecVersion version) {
public RestfulWSMessageVisitor create(SpecVersion version) {
this.httpHeaders.add(CloudEventsHeaders.SPEC_VERSION, version.toString());
return this;
}
@Override
public void setAttribute(String name, String value) throws MessageVisitException {
public void setAttribute(String name, String value) throws CloudEventVisitException {
this.httpHeaders.add(CloudEventsHeaders.ATTRIBUTES_TO_HEADERS.get(name), value);
}
@Override
public void setExtension(String name, String value) throws MessageVisitException {
public void setExtension(String name, String value) throws CloudEventVisitException {
this.httpHeaders.add(CloudEventsHeaders.CE_PREFIX + name, value);
}
@Override
public void setBody(byte[] value) throws MessageVisitException {
public void setBody(byte[] value) throws CloudEventVisitException {
try {
this.entityStream.write(value);
} catch (IOException e) {
throw MessageVisitException.newOther(e);
throw CloudEventVisitException.newOther(e);
}
}
@ -71,13 +71,13 @@ public final class RestfulWSMessageVisitor implements BinaryMessageVisitor<Void>
try {
this.entityStream.flush();
} catch (IOException e) {
throw MessageVisitException.newOther(e);
throw CloudEventVisitException.newOther(e);
}
return null;
}
@Override
public Void setEvent(EventFormat format, byte[] value) throws MessageVisitException {
public Void setEvent(EventFormat format, byte[] value) throws CloudEventVisitException {
this.httpHeaders.add(HttpHeaders.CONTENT_TYPE, format.serializedContentType());
this.setBody(value);
return null;

View File

@ -17,6 +17,7 @@ Below is a sample on how to read and write CloudEvents:
```java
import io.cloudevents.http.vertx.VertxHttpServerResponseMessageVisitor;
import io.cloudevents.http.vertx.VertxMessageFactory;
import io.cloudevents.message.StructuredMessage;
import io.cloudevents.CloudEvent;
import io.vertx.core.AbstractVerticle;
@ -30,9 +31,8 @@ public class CloudEventServerVerticle extends AbstractVerticle {
// If decoding succeeded, we should write the event back
if (result.succeeded()) {
CloudEvent event = result.result().toEvent();
// Echo the message, as binary mode
event
.asBinaryMessage()
// Echo the message, as structured mode
StructuredMessage.fromEvent(CSVFormat.INSTANCE, event)
.visit(VertxHttpServerResponseMessageVisitor.create(req.response()));
}
req.response().setStatusCode(500).end();
@ -55,9 +55,11 @@ public class CloudEventServerVerticle extends AbstractVerticle {
Below is a sample on how to use the client to send and receive a CloudEvent:
```java
import io.cloudevents.CloudEventBuilder;
import io.cloudevents.http.vertx.VertxHttpClientRequestMessageVisitor;
import io.cloudevents.http.vertx.VertxMessageFactory;
import io.cloudevents.CloudEvent;
import io.cloudevents.message.Message;
import io.vertx.core.http.HttpClientRequest;
import io.vertx.core.http.HttpClient;
import io.vertx.core.AbstractVerticle;
@ -79,16 +81,15 @@ public class CloudEventClientVerticle extends AbstractVerticle {
});
});
CloudEvent event = CloudEvent.buildV1()
CloudEvent event = CloudEventBuilder.v1()
.withId("hello")
.withType("example.vertx")
.withSource(URI.create("http://localhost"))
.build();
// Write request as binary
event
.asBinaryMessage()
.visit(VertxHttpClientRequestMessageVisitor.create(request));
Message
.writeBinaryEvent(event, VertxHttpClientRequestMessageVisitor.create(request));
}
}
```

View File

@ -17,8 +17,8 @@
package io.cloudevents.http.vertx;
import io.cloudevents.CloudEventVisitor;
import io.cloudevents.http.vertx.impl.VertxHttpClientRequestMessageVisitorImpl;
import io.cloudevents.message.BinaryMessageVisitor;
import io.cloudevents.message.MessageVisitor;
import io.vertx.core.http.HttpClientRequest;
@ -26,7 +26,7 @@ import io.vertx.core.http.HttpClientRequest;
* Visitor for {@link io.cloudevents.message.Message} that can write both structured and binary messages to a {@link HttpClientRequest}.
* When the visit ends, the request is ended with {@link HttpClientRequest#end(io.vertx.core.buffer.Buffer)}
*/
public interface VertxHttpClientRequestMessageVisitor extends MessageVisitor<VertxHttpClientRequestMessageVisitor, HttpClientRequest>, BinaryMessageVisitor<HttpClientRequest> {
public interface VertxHttpClientRequestMessageVisitor extends MessageVisitor<VertxHttpClientRequestMessageVisitor, HttpClientRequest>, CloudEventVisitor<HttpClientRequest> {
static VertxHttpClientRequestMessageVisitor create(HttpClientRequest req) {
return new VertxHttpClientRequestMessageVisitorImpl(req);

View File

@ -17,8 +17,8 @@
package io.cloudevents.http.vertx;
import io.cloudevents.CloudEventVisitor;
import io.cloudevents.http.vertx.impl.VertxHttpServerResponseMessageVisitorImpl;
import io.cloudevents.message.BinaryMessageVisitor;
import io.cloudevents.message.MessageVisitor;
import io.vertx.core.http.HttpServerResponse;
@ -26,7 +26,7 @@ import io.vertx.core.http.HttpServerResponse;
* Visitor for {@link io.cloudevents.message.Message} that can write both structured and binary messages to a {@link HttpServerResponse}.
* When the visit ends, the request is ended with {@link HttpServerResponse#end(io.vertx.core.buffer.Buffer)}
*/
public interface VertxHttpServerResponseMessageVisitor extends MessageVisitor<VertxHttpServerResponseMessageVisitor, HttpServerResponse>, BinaryMessageVisitor<HttpServerResponse> {
public interface VertxHttpServerResponseMessageVisitor extends MessageVisitor<VertxHttpServerResponseMessageVisitor, HttpServerResponse>, CloudEventVisitor<HttpServerResponse> {
static VertxHttpServerResponseMessageVisitor create(HttpServerResponse res) {
return new VertxHttpServerResponseMessageVisitorImpl(res);

View File

@ -17,10 +17,10 @@
package io.cloudevents.http.vertx.impl;
import io.cloudevents.CloudEventVisitException;
import io.cloudevents.SpecVersion;
import io.cloudevents.format.EventFormat;
import io.cloudevents.http.vertx.VertxHttpClientRequestMessageVisitor;
import io.cloudevents.message.MessageVisitException;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpClientRequest;
import io.vertx.core.http.HttpHeaders;
@ -38,7 +38,7 @@ public class VertxHttpClientRequestMessageVisitorImpl implements VertxHttpClient
// Binary visitor factory
@Override
public VertxHttpClientRequestMessageVisitor createBinaryMessageVisitor(SpecVersion version) {
public VertxHttpClientRequestMessageVisitor create(SpecVersion version) {
this.request.putHeader(CloudEventsHeaders.SPEC_VERSION, version.toString());
return this;
}
@ -46,19 +46,19 @@ public class VertxHttpClientRequestMessageVisitorImpl implements VertxHttpClient
// Binary visitor
@Override
public void setAttribute(String name, String value) throws MessageVisitException {
public void setAttribute(String name, String value) throws CloudEventVisitException {
this.request.putHeader(CloudEventsHeaders.ATTRIBUTES_TO_HEADERS.get(name), value);
}
@Override
public void setExtension(String name, String value) throws MessageVisitException {
public void setExtension(String name, String value) throws CloudEventVisitException {
this.request.putHeader("ce-" + name, value);
}
@Override
public void setBody(byte[] value) throws MessageVisitException {
public void setBody(byte[] value) throws CloudEventVisitException {
if (ended) {
throw MessageVisitException.newOther(new IllegalStateException("Cannot set the body because the request is already ended"));
throw CloudEventVisitException.newOther(new IllegalStateException("Cannot set the body because the request is already ended"));
}
this.request.end(Buffer.buffer(value));
this.ended = true;
@ -75,7 +75,7 @@ public class VertxHttpClientRequestMessageVisitorImpl implements VertxHttpClient
// Structured visitor
@Override
public HttpClientRequest setEvent(EventFormat format, byte[] value) throws MessageVisitException {
public HttpClientRequest setEvent(EventFormat format, byte[] value) throws CloudEventVisitException {
this.request.putHeader(HttpHeaders.CONTENT_TYPE, format.serializedContentType());
this.request.end(Buffer.buffer(value));
return this.request;

View File

@ -17,10 +17,10 @@
package io.cloudevents.http.vertx.impl;
import io.cloudevents.CloudEventVisitException;
import io.cloudevents.SpecVersion;
import io.cloudevents.format.EventFormat;
import io.cloudevents.http.vertx.VertxHttpServerResponseMessageVisitor;
import io.cloudevents.message.MessageVisitException;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpHeaders;
import io.vertx.core.http.HttpServerResponse;
@ -36,7 +36,7 @@ public class VertxHttpServerResponseMessageVisitorImpl implements VertxHttpServe
// Binary visitor factory
@Override
public VertxHttpServerResponseMessageVisitor createBinaryMessageVisitor(SpecVersion version) {
public VertxHttpServerResponseMessageVisitor create(SpecVersion version) {
this.response.putHeader(CloudEventsHeaders.SPEC_VERSION, version.toString());
return this;
}
@ -44,19 +44,19 @@ public class VertxHttpServerResponseMessageVisitorImpl implements VertxHttpServe
// Binary visitor
@Override
public void setAttribute(String name, String value) throws MessageVisitException {
public void setAttribute(String name, String value) throws CloudEventVisitException {
this.response.putHeader(CloudEventsHeaders.ATTRIBUTES_TO_HEADERS.get(name), value);
}
@Override
public void setExtension(String name, String value) throws MessageVisitException {
public void setExtension(String name, String value) throws CloudEventVisitException {
this.response.putHeader("ce-" + name, value);
}
@Override
public void setBody(byte[] value) throws MessageVisitException {
public void setBody(byte[] value) throws CloudEventVisitException {
if (this.response.ended()) {
throw MessageVisitException.newOther(new IllegalStateException("Cannot set the body because the response is already ended"));
throw CloudEventVisitException.newOther(new IllegalStateException("Cannot set the body because the response is already ended"));
}
this.response.end(Buffer.buffer(value));
}
@ -72,7 +72,7 @@ public class VertxHttpServerResponseMessageVisitorImpl implements VertxHttpServe
// Structured visitor
@Override
public HttpServerResponse setEvent(EventFormat format, byte[] value) throws MessageVisitException {
public HttpServerResponse setEvent(EventFormat format, byte[] value) throws CloudEventVisitException {
this.response.putHeader(HttpHeaders.CONTENT_TYPE, format.serializedContentType());
this.response.end(Buffer.buffer(value));
return this.response;

View File

@ -19,6 +19,7 @@ package io.cloudevents.http.vertx;
import io.cloudevents.CloudEvent;
import io.cloudevents.SpecVersion;
import io.cloudevents.message.Message;
import io.cloudevents.mock.CSVFormat;
import io.cloudevents.types.Time;
import io.vertx.core.MultiMap;
@ -74,8 +75,8 @@ public class VertxHttpClientRequestMessageVisitorTest {
checkpoint.flag();
});
try {
event.asStructuredMessage(CSVFormat.INSTANCE)
.visit(VertxHttpClientRequestMessageVisitor.create(req));
Message
.writeStructuredEvent(event, CSVFormat.INSTANCE, VertxHttpClientRequestMessageVisitor.create(req));
} catch (Throwable e) {
testContext.failNow(e);
}
@ -116,8 +117,8 @@ public class VertxHttpClientRequestMessageVisitorTest {
checkpoint.flag();
});
try {
event.asBinaryMessage()
.visit(VertxHttpClientRequestMessageVisitor.create(req));
Message
.writeBinaryEvent(event, VertxHttpClientRequestMessageVisitor.create(req));
} catch (Throwable e) {
testContext.failNow(e);
}

View File

@ -19,6 +19,8 @@ package io.cloudevents.http.vertx;
import io.cloudevents.CloudEvent;
import io.cloudevents.SpecVersion;
import io.cloudevents.message.Message;
import io.cloudevents.message.impl.GenericStructuredMessage;
import io.cloudevents.mock.CSVFormat;
import io.cloudevents.types.Time;
import io.vertx.core.MultiMap;
@ -50,7 +52,7 @@ public class VertxHttpServerResponseMessageVisitorTest {
.createHttpServer()
.requestHandler(httpServerRequest -> {
try {
event.asStructuredMessage(CSVFormat.INSTANCE).visit(
GenericStructuredMessage.fromEvent(CSVFormat.INSTANCE, event).visit(
VertxHttpServerResponseMessageVisitor.create(httpServerRequest.response())
);
checkpoint.flag();
@ -89,9 +91,8 @@ public class VertxHttpServerResponseMessageVisitorTest {
.createHttpServer()
.requestHandler(httpServerRequest -> {
try {
event.asBinaryMessage().visit(
VertxHttpServerResponseMessageVisitor.create(httpServerRequest.response())
);
Message
.writeBinaryEvent(event, VertxHttpServerResponseMessageVisitor.create(httpServerRequest.response()));
checkpoint.flag();
} catch (Throwable e) {
testContext.failNow(e);

View File

@ -18,7 +18,7 @@ To produce CloudEvents in Kafka, configure the KafkaProducer to use the provided
import java.util.Properties;
import io.cloudevents.CloudEvent;
import io.cloudevents.kafka.CloudEventSerializer;
import io.cloudevents.CloudEventBuilder;import io.cloudevents.kafka.CloudEventSerializer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
@ -36,7 +36,7 @@ public class CloudEventProducer {
try(KafkaProducer<String, CloudEvent> producer = new KafkaProducer<>(props)){
// Build an event
CloudEvent event = CloudEvent.buildV1()
CloudEvent event = CloudEventBuilder.v1()
.withId("hello")
.withType("example.kafka")
.withSource(URI.create("http://localhost"))

View File

@ -22,6 +22,7 @@ import io.cloudevents.format.EventFormat;
import io.cloudevents.format.EventFormatProvider;
import io.cloudevents.kafka.impl.KafkaSerializerMessageVisitorImpl;
import io.cloudevents.message.Encoding;
import io.cloudevents.message.Message;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Serializer;
@ -83,9 +84,9 @@ public class CloudEventSerializer implements Serializer<CloudEvent> {
@Override
public byte[] serialize(String topic, Headers headers, CloudEvent data) {
if (encoding == Encoding.STRUCTURED) {
return data.asStructuredMessage(this.format).visit(new KafkaSerializerMessageVisitorImpl(headers));
return Message.writeStructuredEvent(data, this.format, new KafkaSerializerMessageVisitorImpl(headers));
} else {
return data.asBinaryMessage().visit(new KafkaSerializerMessageVisitorImpl(headers));
return Message.writeBinaryEvent(data, new KafkaSerializerMessageVisitorImpl(headers));
}
}
}

View File

@ -17,12 +17,12 @@
package io.cloudevents.kafka;
import io.cloudevents.CloudEventVisitor;
import io.cloudevents.kafka.impl.KafkaProducerMessageVisitorImpl;
import io.cloudevents.message.BinaryMessageVisitor;
import io.cloudevents.message.MessageVisitor;
import org.apache.kafka.clients.producer.ProducerRecord;
public interface KafkaProducerMessageVisitor<K> extends MessageVisitor<KafkaProducerMessageVisitor<K>, ProducerRecord<K, byte[]>>, BinaryMessageVisitor<ProducerRecord<K, byte[]>> {
public interface KafkaProducerMessageVisitor<K> extends MessageVisitor<KafkaProducerMessageVisitor<K>, ProducerRecord<K, byte[]>>, CloudEventVisitor<ProducerRecord<K, byte[]>> {
static <V> KafkaProducerMessageVisitor<V> create(String topic, Integer partition, Long timestamp, V key) {
return new KafkaProducerMessageVisitorImpl<V>(topic, partition, timestamp, key);

View File

@ -17,14 +17,14 @@
package io.cloudevents.kafka.impl;
import io.cloudevents.CloudEventVisitException;
import io.cloudevents.CloudEventVisitor;
import io.cloudevents.format.EventFormat;
import io.cloudevents.message.BinaryMessageVisitor;
import io.cloudevents.message.MessageVisitException;
import io.cloudevents.message.MessageVisitor;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
abstract class BaseKafkaMessageVisitorImpl<S extends MessageVisitor<S, R> & BinaryMessageVisitor<R>, R> implements MessageVisitor<S, R>, BinaryMessageVisitor<R> {
abstract class BaseKafkaMessageVisitorImpl<S extends MessageVisitor<S, R> & CloudEventVisitor<R>, R> implements MessageVisitor<S, R>, CloudEventVisitor<R> {
byte[] value;
final Headers headers;
@ -34,22 +34,22 @@ abstract class BaseKafkaMessageVisitorImpl<S extends MessageVisitor<S, R> & Bina
}
@Override
public void setAttribute(String name, String value) throws MessageVisitException {
public void setAttribute(String name, String value) throws CloudEventVisitException {
headers.add(new RecordHeader(KafkaHeaders.ATTRIBUTES_TO_HEADERS.get(name), value.getBytes()));
}
@Override
public void setExtension(String name, String value) throws MessageVisitException {
public void setExtension(String name, String value) throws CloudEventVisitException {
headers.add(new RecordHeader(KafkaHeaders.CE_PREFIX + name, value.getBytes()));
}
@Override
public void setBody(byte[] value) throws MessageVisitException {
public void setBody(byte[] value) throws CloudEventVisitException {
this.value = value;
}
@Override
public R setEvent(EventFormat format, byte[] value) throws MessageVisitException {
public R setEvent(EventFormat format, byte[] value) throws CloudEventVisitException {
this.headers.add(new RecordHeader(KafkaHeaders.CONTENT_TYPE, format.serializedContentType().getBytes()));
this.value = value;
return this.end();

View File

@ -45,7 +45,7 @@ public final class KafkaProducerMessageVisitorImpl<K> extends
}
@Override
public KafkaProducerMessageVisitor<K> createBinaryMessageVisitor(SpecVersion version) {
public KafkaProducerMessageVisitor<K> create(SpecVersion version) {
this.setAttribute("specversion", version.toString());
return this;
}

View File

@ -27,7 +27,7 @@ public final class KafkaSerializerMessageVisitorImpl extends BaseKafkaMessageVis
}
@Override
public KafkaSerializerMessageVisitorImpl createBinaryMessageVisitor(SpecVersion version) {
public KafkaSerializerMessageVisitorImpl create(SpecVersion version) {
this.setAttribute("specversion", version.toString());
return this;
}

View File

@ -18,6 +18,7 @@
package io.cloudevents.kafka;
import io.cloudevents.CloudEvent;
import io.cloudevents.message.Message;
import io.cloudevents.test.Data;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.jupiter.api.Test;
@ -34,7 +35,7 @@ public class CloudEventDeserializerTest {
CloudEventDeserializer deserializer = new CloudEventDeserializer();
// Serialize the event first
ProducerRecord<Void, byte[]> inRecord = inEvent.asBinaryMessage().visit(KafkaProducerMessageVisitor.create(topic));
ProducerRecord<Void, byte[]> inRecord = Message.writeBinaryEvent(inEvent, KafkaProducerMessageVisitor.create(topic));
CloudEvent outEvent = deserializer.deserialize(topic, inRecord.headers(), inRecord.value());
assertThat(outEvent)

View File

@ -36,7 +36,7 @@ public class CloudEventMessageDeserializerTest {
CloudEventMessageDeserializer deserializer = new CloudEventMessageDeserializer();
// Serialize the event first
ProducerRecord<Void, byte[]> inRecord = inEvent.asBinaryMessage().visit(KafkaProducerMessageVisitor.create(topic));
ProducerRecord<Void, byte[]> inRecord = Message.writeBinaryEvent(inEvent, KafkaProducerMessageVisitor.create(topic));
Message outMessage = deserializer.deserialize(topic, inRecord.headers(), inRecord.value());
assertThat(outMessage.getEncoding())

View File

@ -40,7 +40,7 @@ public class CloudEventMessageSerializerTest {
Headers headers = new RecordHeaders();
MockBinaryMessage inMessage = new MockBinaryMessage();
event.asBinaryMessage().visit(inMessage);
event.visit(inMessage);
byte[] payload = serializer.serialize(topic, headers, inMessage);

View File

@ -20,6 +20,8 @@ package io.cloudevents.kafka;
import io.cloudevents.CloudEvent;
import io.cloudevents.SpecVersion;
import io.cloudevents.kafka.impl.KafkaHeaders;
import io.cloudevents.message.Message;
import io.cloudevents.message.StructuredMessage;
import io.cloudevents.mock.CSVFormat;
import io.cloudevents.types.Time;
import org.apache.kafka.clients.producer.ProducerRecord;
@ -49,8 +51,8 @@ public class KafkaProducerMessageVisitorTest {
Long timestamp = System.currentTimeMillis();
String key = "aaa";
ProducerRecord<String, byte[]> producerRecord = event
.asStructuredMessage(CSVFormat.INSTANCE)
ProducerRecord<String, byte[]> producerRecord = StructuredMessage
.fromEvent(CSVFormat.INSTANCE, event)
.visit(KafkaProducerMessageVisitor.create(topic, partition, timestamp, key));
assertThat(producerRecord.topic())
@ -75,9 +77,8 @@ public class KafkaProducerMessageVisitorTest {
Long timestamp = System.currentTimeMillis();
String key = "aaa";
ProducerRecord<String, byte[]> producerRecord = event
.asBinaryMessage()
.visit(KafkaProducerMessageVisitor.create(topic, partition, timestamp, key));
ProducerRecord<String, byte[]> producerRecord = Message
.writeBinaryEvent(event, KafkaProducerMessageVisitor.create(topic, partition, timestamp, key));
assertThat(producerRecord.topic())
.isEqualTo(topic);