diff --git a/README.md b/README.md
index 6b0ea8b6..0932cf77 100644
--- a/README.md
+++ b/README.md
@@ -17,10 +17,21 @@ If you want to know more about v1 of this SDK, check out the [v1 readme](./READM
Stay tuned!
-This SDK current supports the following versions of CloudEvents:
+Supported features of the specification:
-- 0.3
-- 1.0
+| | [v0.3](https://github.com/cloudevents/spec/tree/v0.3) | [v1.0](https://github.com/cloudevents/spec/tree/v1.0) |
+| :---------------------------: | :----------------------------------------------------------------------------: | :---------------------------------------------------------------------------------: |
+| CloudEvents Core | :heavy_check_mark: | :heavy_check_mark: |
+| AMQP Protocol Binding | :x: | :x: |
+| AVRO Event Format | :x: | :x: |
+| HTTP Protocol Binding | :heavy_check_mark: | :heavy_check_mark: |
+| - [Vert.x](http/vertx) | :heavy_check_mark: | :heavy_check_mark: |
+| JSON Event Format | :heavy_check_mark: | :heavy_check_mark: |
+| - [Jackson](formats/json-jackson) | :heavy_check_mark: | :heavy_check_mark: |
+| Kafka Protocol Binding | :x: | :x: |
+| MQTT Protocol Binding | :x: | :x: |
+| NATS Protocol Binding | :x: | :x: |
+| Web hook | :x: | :x: |
## Motivation
diff --git a/api/pom.xml b/api/pom.xml
index 8d425e1d..e4d5831e 100644
--- a/api/pom.xml
+++ b/api/pom.xml
@@ -34,14 +34,14 @@
org.junit.jupiter
junit-jupiter
- 5.4.2
+ ${junit-jupiter.version}
test
org.assertj
assertj-core
- 3.10.0
+ ${assertj-core.version}
test
@@ -52,7 +52,7 @@
org.apache.maven.plugins
maven-surefire-plugin
- 3.0.0-M4
+ ${maven-surefire-plugin.version}
org.apache.maven.plugins
diff --git a/api/src/main/java/io/cloudevents/SpecVersion.java b/api/src/main/java/io/cloudevents/SpecVersion.java
index 411de8ef..78ce2997 100644
--- a/api/src/main/java/io/cloudevents/SpecVersion.java
+++ b/api/src/main/java/io/cloudevents/SpecVersion.java
@@ -4,6 +4,8 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
public enum SpecVersion {
V03(
@@ -20,11 +22,15 @@ public enum SpecVersion {
private final String stringValue;
private final Set mandatoryAttributes;
private final Set optionalAttributes;
+ private final Set allAttributes;
SpecVersion(String stringValue, Set mandatoryAttributes, Set optionalAttributes) {
this.stringValue = stringValue;
this.mandatoryAttributes = mandatoryAttributes;
this.optionalAttributes = optionalAttributes;
+ this.allAttributes = Collections.unmodifiableSet(
+ Stream.concat(mandatoryAttributes.stream(), optionalAttributes.stream()).collect(Collectors.toSet())
+ );
}
@Override
@@ -43,11 +49,24 @@ public enum SpecVersion {
}
}
+ /**
+ * @return mandatory attributes of the spec version
+ */
public Set getMandatoryAttributes() {
return mandatoryAttributes;
}
+ /**
+ * @return optional attributes of the spec version
+ */
public Set getOptionalAttributes() {
return optionalAttributes;
}
+
+ /**
+ * @return all attributes for this spec
+ */
+ public Set getAllAttributes() {
+ return allAttributes;
+ }
}
diff --git a/api/src/main/java/io/cloudevents/format/EventFormat.java b/api/src/main/java/io/cloudevents/format/EventFormat.java
index 9e118d23..e86c6439 100644
--- a/api/src/main/java/io/cloudevents/format/EventFormat.java
+++ b/api/src/main/java/io/cloudevents/format/EventFormat.java
@@ -2,6 +2,7 @@ package io.cloudevents.format;
import io.cloudevents.CloudEvent;
+import java.util.Collections;
import java.util.Set;
public interface EventFormat {
@@ -10,6 +11,10 @@ public interface EventFormat {
CloudEvent deserialize(byte[] event) throws EventDeserializationException;
- Set supportedContentTypes();
+ default Set deserializableContentTypes() {
+ return Collections.singleton(serializedContentType());
+ }
+
+ String serializedContentType();
}
diff --git a/api/src/main/java/io/cloudevents/format/EventFormatProvider.java b/api/src/main/java/io/cloudevents/format/EventFormatProvider.java
index 0c9f9bcb..cff2876a 100644
--- a/api/src/main/java/io/cloudevents/format/EventFormatProvider.java
+++ b/api/src/main/java/io/cloudevents/format/EventFormatProvider.java
@@ -26,12 +26,16 @@ public final class EventFormatProvider {
}
public void registerFormat(EventFormat format) {
- for (String k: format.supportedContentTypes()) {
+ for (String k : format.deserializableContentTypes()) {
this.formats.put(k, format);
}
}
public EventFormat resolveFormat(String key) {
+ int i = key.indexOf(';');
+ if (i != -1) {
+ key = key.substring(0, i);
+ }
return this.formats.get(key);
}
diff --git a/api/src/main/java/io/cloudevents/impl/CloudEventImpl.java b/api/src/main/java/io/cloudevents/impl/CloudEventImpl.java
index 5d29d987..8fb442db 100644
--- a/api/src/main/java/io/cloudevents/impl/CloudEventImpl.java
+++ b/api/src/main/java/io/cloudevents/impl/CloudEventImpl.java
@@ -120,7 +120,7 @@ public final class CloudEventImpl implements CloudEvent, BinaryMessage, BinaryMe
public String toString() {
return "CloudEvent{" +
"attributes=" + attributes +
- ", data=" + new String(this.data, StandardCharsets.UTF_8) +
+ ((this.data != null) ? ", data=" + new String(this.data, StandardCharsets.UTF_8) : "") +
", extensions=" + extensions +
'}';
}
diff --git a/api/src/main/java/io/cloudevents/message/BinaryMessage.java b/api/src/main/java/io/cloudevents/message/BinaryMessage.java
index ff0eef1c..3a4f0ece 100644
--- a/api/src/main/java/io/cloudevents/message/BinaryMessage.java
+++ b/api/src/main/java/io/cloudevents/message/BinaryMessage.java
@@ -6,12 +6,11 @@ import io.cloudevents.CloudEvent;
public interface BinaryMessage {
/**
- *
- * @param visitor
+ * @param visitorFactory
* @throws MessageVisitException
* @throws IllegalStateException If the message is not a valid binary message
*/
- , V> V visit(BinaryMessageVisitorFactory visitor) throws MessageVisitException, IllegalStateException;
+ , R> R visit(BinaryMessageVisitorFactory visitorFactory) throws MessageVisitException, IllegalStateException;
default CloudEvent toEvent() throws MessageVisitException, IllegalStateException {
return this.visit(specVersion -> {
diff --git a/api/src/main/java/io/cloudevents/message/BinaryMessageAttributesVisitor.java b/api/src/main/java/io/cloudevents/message/BinaryMessageAttributesVisitor.java
index eb958e86..9fb508b9 100644
--- a/api/src/main/java/io/cloudevents/message/BinaryMessageAttributesVisitor.java
+++ b/api/src/main/java/io/cloudevents/message/BinaryMessageAttributesVisitor.java
@@ -8,12 +8,34 @@ import java.time.ZonedDateTime;
@FunctionalInterface
public interface BinaryMessageAttributesVisitor {
+ /**
+ * Set attribute with type {@link String}. This setter should not be invoked for specversion, because the built Visitor already
+ * has the information through the {@link BinaryMessageVisitorFactory}
+ *
+ * @param name
+ * @param value
+ * @throws MessageVisitException
+ */
void setAttribute(String name, String value) throws MessageVisitException;
+ /**
+ * Set attribute with type {@link URI}.
+ *
+ * @param name
+ * @param value
+ * @throws MessageVisitException
+ */
default void setAttribute(String name, URI value) throws MessageVisitException {
setAttribute(name, value.toString());
}
+ /**
+ * Set attribute with type {@link ZonedDateTime} attribute.
+ *
+ * @param name
+ * @param value
+ * @throws MessageVisitException
+ */
default void setAttribute(String name, ZonedDateTime value) throws MessageVisitException {
setAttribute(name, value.format(Time.RFC3339_DATE_FORMAT));
}
diff --git a/api/src/main/java/io/cloudevents/message/BinaryMessageVisitorFactory.java b/api/src/main/java/io/cloudevents/message/BinaryMessageVisitorFactory.java
index f4fb81ea..13047c1d 100644
--- a/api/src/main/java/io/cloudevents/message/BinaryMessageVisitorFactory.java
+++ b/api/src/main/java/io/cloudevents/message/BinaryMessageVisitorFactory.java
@@ -3,6 +3,6 @@ package io.cloudevents.message;
import io.cloudevents.SpecVersion;
@FunctionalInterface
-public interface BinaryMessageVisitorFactory, V> {
- T createBinaryMessageVisitor(SpecVersion version);
+public interface BinaryMessageVisitorFactory, R> {
+ V createBinaryMessageVisitor(SpecVersion version);
}
diff --git a/api/src/main/java/io/cloudevents/message/MessageVisitException.java b/api/src/main/java/io/cloudevents/message/MessageVisitException.java
index cb32bc0f..402b7ef1 100644
--- a/api/src/main/java/io/cloudevents/message/MessageVisitException.java
+++ b/api/src/main/java/io/cloudevents/message/MessageVisitException.java
@@ -8,10 +8,16 @@ public class MessageVisitException extends RuntimeException {
INVALID_ATTRIBUTE_TYPE,
INVALID_ATTRIBUTE_VALUE,
INVALID_EXTENSION_TYPE,
+ OTHER
}
private MessageVisitExceptionKind kind;
+ public MessageVisitException(MessageVisitExceptionKind kind, Throwable cause) {
+ super(cause);
+ this.kind = kind;
+ }
+
public MessageVisitException(MessageVisitExceptionKind kind, String message) {
super(message);
this.kind = kind;
@@ -61,4 +67,11 @@ public class MessageVisitException extends RuntimeException {
"Invalid extension type for \"" + extensionName + "\": " + clazz.getCanonicalName()
);
}
+
+ public static MessageVisitException newOther(Throwable cause) {
+ return new MessageVisitException(
+ MessageVisitExceptionKind.OTHER,
+ cause
+ );
+ }
}
diff --git a/api/src/main/java/io/cloudevents/message/impl/UnknownEncodingMessage.java b/api/src/main/java/io/cloudevents/message/impl/UnknownEncodingMessage.java
new file mode 100644
index 00000000..9d01891d
--- /dev/null
+++ b/api/src/main/java/io/cloudevents/message/impl/UnknownEncodingMessage.java
@@ -0,0 +1,20 @@
+package io.cloudevents.message.impl;
+
+import io.cloudevents.message.*;
+
+public class UnknownEncodingMessage implements Message {
+ @Override
+ public Encoding getEncoding() {
+ return Encoding.UNKNOWN;
+ }
+
+ @Override
+ public , V> V visit(BinaryMessageVisitorFactory visitorFactory) throws MessageVisitException, IllegalStateException {
+ throw Encoding.WRONG_ENCODING_EXCEPTION;
+ }
+
+ @Override
+ public T visit(StructuredMessageVisitor visitor) throws MessageVisitException, IllegalStateException {
+ throw Encoding.WRONG_ENCODING_EXCEPTION;
+ }
+}
diff --git a/api/src/test/java/io/cloudevents/format/EventFormatProviderTest.java b/api/src/test/java/io/cloudevents/format/EventFormatProviderTest.java
index af6f648d..a8e64c21 100644
--- a/api/src/test/java/io/cloudevents/format/EventFormatProviderTest.java
+++ b/api/src/test/java/io/cloudevents/format/EventFormatProviderTest.java
@@ -9,7 +9,13 @@ public class EventFormatProviderTest {
@Test
void resolveCSV() {
- assertThat(EventFormatProvider.getInstance().resolveFormat("text/csv"))
+ assertThat(EventFormatProvider.getInstance().resolveFormat(CSVFormat.INSTANCE.serializedContentType()))
+ .isInstanceOf(CSVFormat.class);
+ }
+
+ @Test
+ void resolveCSVWithParams() {
+ assertThat(EventFormatProvider.getInstance().resolveFormat(CSVFormat.INSTANCE.serializedContentType() + "; charset=utf8"))
.isInstanceOf(CSVFormat.class);
}
diff --git a/api/src/test/java/io/cloudevents/mock/CSVFormat.java b/api/src/test/java/io/cloudevents/mock/CSVFormat.java
index dc176b26..924a6eaf 100644
--- a/api/src/test/java/io/cloudevents/mock/CSVFormat.java
+++ b/api/src/test/java/io/cloudevents/mock/CSVFormat.java
@@ -69,14 +69,21 @@ public class CSVFormat implements EventFormat {
builder.withData(data);
}
switch (sv) {
- case V03: return builder.build().toV03();
- case V1: return builder.build().toV1();
+ case V03:
+ return builder.build().toV03();
+ case V1:
+ return builder.build().toV1();
}
return null;
}
@Override
- public Set supportedContentTypes() {
- return Collections.singleton("text/csv");
+ public Set deserializableContentTypes() {
+ return Collections.singleton(serializedContentType());
+ }
+
+ @Override
+ public String serializedContentType() {
+ return "application/cloudevents+csv";
}
}
diff --git a/api/src/test/java/io/cloudevents/mock/MockStructuredMessage.java b/api/src/test/java/io/cloudevents/mock/MockStructuredMessage.java
index ce7e6bd0..8562d4c3 100644
--- a/api/src/test/java/io/cloudevents/mock/MockStructuredMessage.java
+++ b/api/src/test/java/io/cloudevents/mock/MockStructuredMessage.java
@@ -14,7 +14,7 @@ public class MockStructuredMessage implements Message, StructuredMessageVisitor<
}
@Override
- public , V> V visit(BinaryMessageVisitorFactory visitor) throws MessageVisitException, IllegalStateException {
+ public , V> V visit(BinaryMessageVisitorFactory visitorFactory) throws MessageVisitException, IllegalStateException {
throw Encoding.WRONG_ENCODING_EXCEPTION;
}
diff --git a/api/src/test/java/io/cloudevents/test/Data.java b/api/src/test/java/io/cloudevents/test/Data.java
index f1258723..5cf1b3d0 100644
--- a/api/src/test/java/io/cloudevents/test/Data.java
+++ b/api/src/test/java/io/cloudevents/test/Data.java
@@ -50,6 +50,18 @@ public class Data {
.withExtension("anumber", 10)
.build();
+ public static final CloudEvent V1_WITH_JSON_DATA_WITH_EXT_STRING = CloudEvent.buildV1()
+ .withId(ID)
+ .withType(TYPE)
+ .withSource(SOURCE)
+ .withData(DATACONTENTTYPE_JSON, DATASCHEMA, DATA_JSON_SERIALIZED)
+ .withSubject(SUBJECT)
+ .withTime(TIME)
+ .withExtension("astring", "aaa")
+ .withExtension("aboolean", "true")
+ .withExtension("anumber", "10")
+ .build();
+
public static final CloudEvent V1_WITH_XML_DATA = CloudEvent.buildV1()
.withId(ID)
.withType(TYPE)
@@ -71,6 +83,7 @@ public class Data {
public static final CloudEvent V03_MIN = V1_MIN.toV03();
public static final CloudEvent V03_WITH_JSON_DATA = V1_WITH_JSON_DATA.toV03();
public static final CloudEvent V03_WITH_JSON_DATA_WITH_EXT = V1_WITH_JSON_DATA_WITH_EXT.toV03();
+ public static final CloudEvent V03_WITH_JSON_DATA_WITH_EXT_STRING = V1_WITH_JSON_DATA_WITH_EXT_STRING.toV03();
public static final CloudEvent V03_WITH_XML_DATA = V1_WITH_XML_DATA.toV03();
public static final CloudEvent V03_WITH_TEXT_DATA = V1_WITH_TEXT_DATA.toV03();
@@ -82,6 +95,10 @@ public class Data {
return Stream.concat(v1Events(), v03Events()).filter(e -> e.getExtensions().isEmpty());
}
+ public static Stream allEventsWithStringExtensions() {
+ return Stream.concat(v1EventsWithStringExt(), v03EventsWithStringExt());
+ }
+
public static Stream v1Events() {
return Stream.of(
Data.V1_MIN,
@@ -102,4 +119,20 @@ public class Data {
);
}
+ public static Stream v1EventsWithStringExt() {
+ return v1Events().map(ce -> {
+ io.cloudevents.v1.CloudEventBuilder builder = CloudEvent.buildV1(ce);
+ ce.getExtensions().forEach((k, v) -> builder.withExtension(k, v.toString()));
+ return builder.build();
+ });
+ }
+
+ public static Stream v03EventsWithStringExt() {
+ return v03Events().map(ce -> {
+ io.cloudevents.v03.CloudEventBuilder builder = CloudEvent.buildV03(ce);
+ ce.getExtensions().forEach((k, v) -> builder.withExtension(k, v.toString()));
+ return builder.build();
+ });
+ }
+
}
diff --git a/formats/json-jackson/README.md b/formats/json-jackson/README.md
new file mode 100644
index 00000000..045b6cfb
--- /dev/null
+++ b/formats/json-jackson/README.md
@@ -0,0 +1,30 @@
+# Json EventFormat implementation with Jackson
+
+Implementation of [`EventFormat`](../../api/src/main/java/io/cloudevents/format/EventFormat.java) using Jackson.
+
+For Maven:
+
+```xml
+
+ io.cloudevents
+ json-jackson
+ 2.0.0-SNAPSHOT
+
+```
+
+## Usage
+
+You don't need to perform any operation to configure the module, more than adding the dependency to your project:
+
+```java
+CloudEvent event = CloudEvent.buildV1()
+ .withId("hello")
+ .withType("example.vertx")
+ .withSource(URI.create("http://localhost"))
+ .build();
+
+byte[] serialized = EventFormatProvider
+ .getInstance()
+ .resolveFormat("application/json")
+ .serialize(event);
+```
diff --git a/formats/json-jackson/pom.xml b/formats/json-jackson/pom.xml
index 896eae86..772c9939 100644
--- a/formats/json-jackson/pom.xml
+++ b/formats/json-jackson/pom.xml
@@ -68,14 +68,14 @@
org.junit.jupiter
junit-jupiter
- 5.4.2
+ ${junit-jupiter.version}
test
org.assertj
assertj-core
- 3.10.0
+ ${assertj-core.version}
test
@@ -95,7 +95,7 @@
org.apache.maven.plugins
maven-surefire-plugin
- 3.0.0-M4
+ ${maven-surefire-plugin.version}
diff --git a/formats/json-jackson/src/main/java/io/cloudevents/format/json/JsonFormat.java b/formats/json-jackson/src/main/java/io/cloudevents/format/json/JsonFormat.java
index 6322b31e..e6601c0a 100644
--- a/formats/json-jackson/src/main/java/io/cloudevents/format/json/JsonFormat.java
+++ b/formats/json-jackson/src/main/java/io/cloudevents/format/json/JsonFormat.java
@@ -24,8 +24,6 @@ import io.cloudevents.format.EventFormat;
import io.cloudevents.format.EventSerializationException;
import java.io.IOException;
-import java.util.Collections;
-import java.util.Set;
public final class JsonFormat implements EventFormat {
@@ -79,8 +77,8 @@ public final class JsonFormat implements EventFormat {
}
@Override
- public Set supportedContentTypes() {
- return Collections.singleton(CONTENT_TYPE);
+ public String serializedContentType() {
+ return CONTENT_TYPE;
}
/**
diff --git a/formats/json-jackson/src/test/java/io/cloudevents/format/json/JsonFormatTest.java b/formats/json-jackson/src/test/java/io/cloudevents/format/json/JsonFormatTest.java
index 6da34a85..0a4d65b0 100644
--- a/formats/json-jackson/src/test/java/io/cloudevents/format/json/JsonFormatTest.java
+++ b/formats/json-jackson/src/test/java/io/cloudevents/format/json/JsonFormatTest.java
@@ -89,7 +89,7 @@ class JsonFormatTest {
assertThat(output).isEqualTo(input);
}
- private static Stream serializeTestArgumentsDefault() {
+ public static Stream serializeTestArgumentsDefault() {
return Stream.of(
Arguments.of(V03_MIN, "v03/min.json"),
Arguments.of(V03_WITH_JSON_DATA, "v03/json_data.json"),
@@ -104,7 +104,7 @@ class JsonFormatTest {
);
}
- private static Stream serializeTestArgumentsString() {
+ public static Stream serializeTestArgumentsString() {
return Stream.of(
Arguments.of(V03_WITH_JSON_DATA, "v03/json_data.json"),
Arguments.of(V03_WITH_JSON_DATA_WITH_EXT, "v03/json_data_with_ext.json"),
@@ -117,7 +117,7 @@ class JsonFormatTest {
);
}
- private static Stream serializeTestArgumentsBase64() {
+ public static Stream serializeTestArgumentsBase64() {
return Stream.of(
Arguments.of(V03_WITH_JSON_DATA, "v03/base64_json_data.json"),
Arguments.of(V03_WITH_JSON_DATA_WITH_EXT, "v03/base64_json_data_with_ext.json"),
@@ -130,7 +130,7 @@ class JsonFormatTest {
);
}
- private static Stream deserializeTestArguments() {
+ public static Stream deserializeTestArguments() {
return Stream.of(
Arguments.of("v03/min.json", V03_MIN),
Arguments.of("v03/json_data.json", V03_WITH_JSON_DATA),
@@ -153,7 +153,7 @@ class JsonFormatTest {
);
}
- private static Stream roundTripTestArguments() {
+ public static Stream roundTripTestArguments() {
return Stream.of(
"v03/min.json",
"v03/json_data.json",
@@ -167,7 +167,7 @@ class JsonFormatTest {
);
}
- private static byte[] loadFile(String input) {
+ public static byte[] loadFile(String input) {
try {
return String.join(
"",
diff --git a/http/vertx/README.md b/http/vertx/README.md
index 4fff21df..82210196 100644
--- a/http/vertx/README.md
+++ b/http/vertx/README.md
@@ -1,6 +1,4 @@
-# HTTP Transport Util for Eclipse Vert.x
-
-## Receiving CloudEvents
+# HTTP Transport for Eclipse Vert.x
For Maven based projects, use the following to configure the CloudEvents Vertx HTTP Transport:
@@ -8,54 +6,91 @@ For Maven based projects, use the following to configure the CloudEvents Vertx H
io.cloudevents
http-vertx
- 1.3.0
+ 2.0.0-SNAPSHOT
```
-Below is a sample on how to use [Vert.x API for RxJava 2](https://vertx.io/docs/vertx-rx/java2/) for reading CloudEvents from an HttpServerRequest:
+## Receiving CloudEvents
+
+Below is a sample on how to read and write CloudEvents:
```java
-import io.cloudevents.http.reactivex.vertx.VertxCloudEvents;
+import io.cloudevents.http.vertx.VertxHttpServerResponseMessageVisitor;
+import io.cloudevents.http.vertx.VertxMessage;
+import io.cloudevents.CloudEvent;
import io.vertx.core.http.HttpHeaders;
-import io.vertx.reactivex.core.AbstractVerticle;
+import io.vertx.core.AbstractVerticle;
-public class CloudEventVerticle extends AbstractVerticle {
+public class CloudEventServerVerticle extends AbstractVerticle {
public void start() {
-
vertx.createHttpServer()
- .requestHandler(req -> {
- VertxCloudEvents.create().rxReadFromRequest(req)
- .subscribe((receivedEvent, throwable) -> {
- if (receivedEvent != null) {
- // I got a CloudEvent object:
- System.out.println("The event type: " + receivedEvent.getEventType());
+ .requestHandler(req -> {
+ VertxMessage.fromHttpServerRequest(req)
+ .onComplete(result -> {
+ // If decoding succeeded, we should write the event back
+ if (result.succeeded()) {
+ CloudEvent event = result.result().toEvent();
+ // Echo the message, as binary mode
+ event
+ .asBinaryMessage()
+ .visit(VertxHttpServerResponseMessageVisitor.create(req.response()));
}
+ req.response().setStatusCode(500).end();
});
- req.response().end();
+ })
+ .listen(8080, serverResult -> {
+ if (serverResult.succeeded()) {
+ System.out.println("Server started on port " + serverResult.result().actualPort());
+ } else {
+ System.out.println("Error starting the server");
+ serverResult.cause().printStackTrace();
}
- )
- .rxListen(8080)
- .subscribe(server -> {
- System.out.println("Server running!");
- });
+ });
}
}
```
## Sending CloudEvents
-Below is a sample on how to use the client to send a CloudEvent:
+Below is a sample on how to use the client to send and receive a CloudEvent:
```java
-final HttpClientRequest request = vertx.createHttpClient().post(8080, "localhost", "/");
+import io.cloudevents.http.vertx.VertxHttpClientRequestMessageVisitor;import io.cloudevents.http.vertx.VertxHttpServerResponseMessageVisitor;
+import io.cloudevents.http.vertx.VertxMessage;
+import io.cloudevents.CloudEvent;
+import io.vertx.core.http.HttpClientRequest;
+import io.vertx.core.http.HttpHeaders;
+import io.vertx.core.http.HttpClient;
+import io.vertx.core.AbstractVerticle;
+import java.net.URI;
-// add a client response handler
-request.handler(resp -> {
- // react on the server response
-});
+public class CloudEventClientVerticle extends AbstractVerticle {
-// write the CloudEvent to the given HTTP Post request object
-VertxCloudEvents.create().writeToHttpClientRequest(cloudEvent, request);
-request.end();
+ public void start() {
+ HttpClient client = vertx.createHttpClient();
+
+ HttpClientRequest request = client.postAbs("http://localhost:8080")
+ .handler(httpClientResponse -> {
+ VertxMessage
+ .fromHttpClientResponse(httpClientResponse)
+ .onComplete(result -> {
+ if (result.succeeded()) {
+ CloudEvent event = result.result().toEvent();
+ }
+ });
+ });
+
+ CloudEvent event = CloudEvent.buildV1()
+ .withId("hello")
+ .withType("example.vertx")
+ .withSource(URI.create("http://localhost"))
+ .build();
+
+ // Write request as binary
+ event
+ .asBinaryMessage()
+ .visit(VertxHttpClientRequestMessageVisitor.create(request));
+ }
+}
```
diff --git a/http/vertx/pom.xml b/http/vertx/pom.xml
index 3d84cbfc..6987f5d5 100644
--- a/http/vertx/pom.xml
+++ b/http/vertx/pom.xml
@@ -20,45 +20,45 @@
io.cloudevents
cloudevents-parent
- 1.3.0
- ../..
+ 2.0.0-SNAPSHOT
+ ../../
http-vertx
- CloudEvents - vertx-transport-http
+ CloudEvents - Vert.x Http Integration
jar
-
+
+ 3.9.0
+ 3.11.0
+
+
io.cloudevents
cloudevents-api
${project.version}
-
io.vertx
vertx-core
${vertx.version}
+
- io.vertx
- vertx-codegen
- ${vertx.version}
- true
+ io.cloudevents
+ cloudevents-api
+ tests
+ test-jar
+ ${project.version}
+ test
-
- io.vertx
- vertx-rx-java2
- ${vertx.version}
- true
-
-
io.vertx
vertx-junit5
${vertx.version}
+ test
org.assertj
@@ -66,54 +66,22 @@
${assertj-core.version}
test
-
org.junit.jupiter
- junit-jupiter-api
+ junit-jupiter
${junit-jupiter.version}
test
-
- org.junit.jupiter
- junit-jupiter-engine
- ${junit-jupiter.version}
- test
-
-
-
-
-
- maven-compiler-plugin
-
-
- default-compile
-
-
- io.vertx.codegen.CodeGenProcessor
-
-
- -AoutputDirectory=${project.basedir}/src/main
-
-
-
-
-
-
- maven-surefire-plugin
- ${maven-surefire-plugin.version}
-
-
-
+
+
+ org.apache.maven.plugins
+ maven-surefire-plugin
+ ${maven-surefire-plugin.version}
+
+
-
- 3.6.0
- 2.22.0
- 3.11.0
- 5.3.2
-
-
diff --git a/http/vertx/src/main/java/io/cloudevents/http/vertx/VertxCloudEvents.java b/http/vertx/src/main/java/io/cloudevents/http/vertx/VertxCloudEvents.java
deleted file mode 100644
index 4de6938e..00000000
--- a/http/vertx/src/main/java/io/cloudevents/http/vertx/VertxCloudEvents.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
- * Copyright 2018 The CloudEvents Authors
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.cloudevents.http.vertx;
-
-import io.cloudevents.CloudEvent;
-import io.cloudevents.http.vertx.impl.VertxCloudEventsImpl;
-import io.cloudevents.v02.AttributesImpl;
-import io.vertx.codegen.annotations.GenIgnore;
-import io.vertx.codegen.annotations.VertxGen;
-import io.vertx.core.AsyncResult;
-import io.vertx.core.Handler;
-import io.vertx.core.http.HttpClientRequest;
-import io.vertx.core.http.HttpServerRequest;
-
-@VertxGen
-public interface VertxCloudEvents {
-
- static VertxCloudEvents create() {
- return new VertxCloudEventsImpl();
- }
-
- @GenIgnore(GenIgnore.PERMITTED_TYPE)
- void readFromRequest(HttpServerRequest request, Handler>> resultHandler);
-
- @GenIgnore(GenIgnore.PERMITTED_TYPE)
- void readFromRequest(HttpServerRequest request, Class[] extensions, Handler>> resultHandler);
-
- @GenIgnore(GenIgnore.PERMITTED_TYPE)
- void writeToHttpClientRequest(CloudEvent ce, HttpClientRequest request);
-
- @GenIgnore(GenIgnore.PERMITTED_TYPE)
- void writeToHttpClientRequest(CloudEvent ce, boolean binary, HttpClientRequest request);
-}
diff --git a/http/vertx/src/main/java/io/cloudevents/http/vertx/VertxHttpClientRequestMessageVisitor.java b/http/vertx/src/main/java/io/cloudevents/http/vertx/VertxHttpClientRequestMessageVisitor.java
new file mode 100644
index 00000000..b7559238
--- /dev/null
+++ b/http/vertx/src/main/java/io/cloudevents/http/vertx/VertxHttpClientRequestMessageVisitor.java
@@ -0,0 +1,18 @@
+package io.cloudevents.http.vertx;
+
+import io.cloudevents.http.vertx.impl.VertxHttpClientRequestMessageVisitorImpl;
+import io.cloudevents.message.BinaryMessageVisitor;
+import io.cloudevents.message.MessageVisitor;
+import io.vertx.core.http.HttpClientRequest;
+
+/**
+ * Visitor for {@link io.cloudevents.message.Message} that can write both structured and binary messages to a {@link HttpClientRequest}.
+ * When the visit ends, the request is ended with {@link HttpClientRequest#end(io.vertx.core.buffer.Buffer)}
+ */
+public interface VertxHttpClientRequestMessageVisitor extends MessageVisitor, BinaryMessageVisitor {
+
+ static VertxHttpClientRequestMessageVisitor create(HttpClientRequest req) {
+ return new VertxHttpClientRequestMessageVisitorImpl(req);
+ }
+
+}
diff --git a/http/vertx/src/main/java/io/cloudevents/http/vertx/VertxHttpServerResponseMessageVisitor.java b/http/vertx/src/main/java/io/cloudevents/http/vertx/VertxHttpServerResponseMessageVisitor.java
new file mode 100644
index 00000000..67d2e2f1
--- /dev/null
+++ b/http/vertx/src/main/java/io/cloudevents/http/vertx/VertxHttpServerResponseMessageVisitor.java
@@ -0,0 +1,18 @@
+package io.cloudevents.http.vertx;
+
+import io.cloudevents.http.vertx.impl.VertxHttpServerResponseMessageVisitorImpl;
+import io.cloudevents.message.BinaryMessageVisitor;
+import io.cloudevents.message.MessageVisitor;
+import io.vertx.core.http.HttpServerResponse;
+
+/**
+ * Visitor for {@link io.cloudevents.message.Message} that can write both structured and binary messages to a {@link HttpServerResponse}.
+ * When the visit ends, the request is ended with {@link HttpServerResponse#end(io.vertx.core.buffer.Buffer)}
+ */
+public interface VertxHttpServerResponseMessageVisitor extends MessageVisitor, BinaryMessageVisitor {
+
+ static VertxHttpServerResponseMessageVisitor create(HttpServerResponse res) {
+ return new VertxHttpServerResponseMessageVisitorImpl(res);
+ }
+
+}
diff --git a/http/vertx/src/main/java/io/cloudevents/http/vertx/VertxMessage.java b/http/vertx/src/main/java/io/cloudevents/http/vertx/VertxMessage.java
new file mode 100644
index 00000000..bddc28df
--- /dev/null
+++ b/http/vertx/src/main/java/io/cloudevents/http/vertx/VertxMessage.java
@@ -0,0 +1,109 @@
+package io.cloudevents.http.vertx;
+
+import io.cloudevents.SpecVersion;
+import io.cloudevents.format.EventFormat;
+import io.cloudevents.format.EventFormatProvider;
+import io.cloudevents.http.vertx.impl.BinaryVertxMessageImpl;
+import io.cloudevents.http.vertx.impl.StructuredVertxMessageImpl;
+import io.cloudevents.message.Message;
+import io.cloudevents.message.impl.UnknownEncodingMessage;
+import io.vertx.core.*;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.core.http.HttpClientResponse;
+import io.vertx.core.http.HttpHeaders;
+import io.vertx.core.http.HttpServerRequest;
+
+/**
+ * Interface representing a Message implementation using Vert.x types
+ */
+public interface VertxMessage extends Message {
+
+ /**
+ * Create a new Message using Vert.x headers and body.
+ *
+ * @param headers Http headers
+ * @param body nullable buffer of the body
+ * @return a Message implementation with potentially an unknown encoding
+ * @throws IllegalArgumentException If, in case of binary mode, the spec version is invalid
+ */
+ static Message create(MultiMap headers, Buffer body) throws IllegalArgumentException {
+ // Let's try structured mode
+ String ct = headers.get(HttpHeaders.CONTENT_TYPE);
+ if (ct != null) {
+ EventFormat format = EventFormatProvider.getInstance().resolveFormat(ct);
+ if (format != null) {
+ return new StructuredVertxMessageImpl(format, body);
+ }
+
+ }
+
+ // Let's try binary mode
+ String specVersionUnparsed = headers.get(BinaryVertxMessageImpl.CE_SPEC_VERSION_HEADER);
+ if (specVersionUnparsed != null) {
+ return new BinaryVertxMessageImpl(SpecVersion.parse(specVersionUnparsed), headers, body);
+ }
+
+ return new UnknownEncodingMessage();
+ }
+
+ /**
+ * Build a message starting from an {@link HttpServerRequest}
+ *
+ * @param request
+ * @return
+ */
+ static Future fromHttpServerRequest(HttpServerRequest request) {
+ Promise prom = Promise.promise();
+
+ request.exceptionHandler(prom::tryFail);
+ request.bodyHandler(b -> {
+ try {
+ prom.complete(create(request.headers(), b));
+ } catch (IllegalArgumentException e) {
+ prom.fail(e);
+ }
+ });
+ return prom.future();
+ }
+
+ /**
+ * Like {@link VertxMessage#fromHttpServerRequest(HttpServerRequest)}
+ *
+ * @param request
+ * @param handler
+ */
+ static void fromHttpServerRequest(HttpServerRequest request, Handler> handler) {
+ fromHttpServerRequest(request).onComplete(handler);
+ }
+
+ /**
+ * Build a message starting from an {@link HttpClientResponse}
+ *
+ * @param request
+ * @return
+ */
+ static Future fromHttpClientResponse(HttpClientResponse request) {
+ Promise prom = Promise.promise();
+
+ request.exceptionHandler(prom::tryFail);
+ request.bodyHandler(b -> {
+ try {
+ prom.complete(create(request.headers(), b));
+ } catch (IllegalArgumentException e) {
+ prom.fail(e);
+ }
+ });
+ return prom.future();
+ }
+
+ /**
+ * Like {@link VertxMessage#fromHttpClientResponse(HttpClientResponse)}
+ *
+ * @param response
+ * @param handler
+ */
+ static void fromHttpClientResponse(HttpClientResponse response, Handler> handler) {
+ fromHttpClientResponse(response).onComplete(handler);
+ }
+
+}
diff --git a/http/vertx/src/main/java/io/cloudevents/http/vertx/impl/BinaryVertxMessageImpl.java b/http/vertx/src/main/java/io/cloudevents/http/vertx/impl/BinaryVertxMessageImpl.java
new file mode 100644
index 00000000..cf083625
--- /dev/null
+++ b/http/vertx/src/main/java/io/cloudevents/http/vertx/impl/BinaryVertxMessageImpl.java
@@ -0,0 +1,88 @@
+package io.cloudevents.http.vertx.impl;
+
+import io.cloudevents.SpecVersion;
+import io.cloudevents.http.vertx.VertxMessage;
+import io.cloudevents.message.*;
+import io.vertx.core.MultiMap;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.core.http.HttpHeaders;
+
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class BinaryVertxMessageImpl implements VertxMessage {
+
+ public static final Map OPTIMIZED_ATTRIBUTES_TO_HEADERS = Stream.concat(
+ Stream.concat(SpecVersion.V1.getMandatoryAttributes().stream(), SpecVersion.V1.getOptionalAttributes().stream()),
+ Stream.concat(SpecVersion.V03.getMandatoryAttributes().stream(), SpecVersion.V03.getOptionalAttributes().stream())
+ )
+ .distinct()
+ .collect(Collectors.toMap(Function.identity(), v -> HttpHeaders.createOptimized("ce-" + v)));
+
+ public static final Map OPTIMIZED_HEADERS_TO_ATTRIBUTES = OPTIMIZED_ATTRIBUTES_TO_HEADERS
+ .entrySet()
+ .stream()
+ .collect(Collectors.toMap(Map.Entry::getValue, Map.Entry::getKey));
+
+ public static final String CE_PREFIX = "ce-";
+
+ public static final CharSequence CE_SPEC_VERSION_HEADER = HttpHeaders.createOptimized("ce-specversion");
+
+ private final SpecVersion version;
+ private final MultiMap headers;
+ private final Buffer body;
+
+ public BinaryVertxMessageImpl(SpecVersion version, MultiMap headers, Buffer body) {
+ this.version = version;
+ this.headers = headers;
+ this.body = body;
+ }
+
+ @Override
+ public Encoding getEncoding() {
+ return Encoding.BINARY;
+ }
+
+ @Override
+ public , V> V visit(BinaryMessageVisitorFactory visitorFactory) throws MessageVisitException, IllegalStateException {
+ BinaryMessageVisitor visitor = visitorFactory.createBinaryMessageVisitor(this.version);
+
+ // Grab from headers the attributes and extensions
+ this.headers.forEach(e -> {
+ try {
+ if (e.getKey().substring(0, 3).equalsIgnoreCase(CE_PREFIX)) {
+ String name = e.getKey().substring(3).toLowerCase();
+ if (name.equals("specversion")) {
+ return;
+ }
+ if (this.version.getAllAttributes().contains(name)) {
+ visitor.setAttribute(name, e.getValue());
+ } else {
+ visitor.setExtension(name, e.getValue());
+ }
+ }
+ } catch (StringIndexOutOfBoundsException ex) {
+ // String is smaller than 3 characters and it's not equal for sure to CE_PREFIX
+ }
+ });
+
+ String ct = this.headers.get(HttpHeaders.CONTENT_TYPE);
+ if (ct != null) {
+ visitor.setAttribute("datacontenttype", ct);
+ }
+
+ // Set the payload
+ if (this.body != null && this.body.length() != 0) {
+ visitor.setBody(this.body.getBytes());
+ }
+
+ return visitor.end();
+ }
+
+ @Override
+ public T visit(StructuredMessageVisitor visitor) throws MessageVisitException, IllegalStateException {
+ throw Encoding.WRONG_ENCODING_EXCEPTION;
+ }
+}
diff --git a/http/vertx/src/main/java/io/cloudevents/http/vertx/impl/CloudEventsHeaders.java b/http/vertx/src/main/java/io/cloudevents/http/vertx/impl/CloudEventsHeaders.java
new file mode 100644
index 00000000..d3371dcd
--- /dev/null
+++ b/http/vertx/src/main/java/io/cloudevents/http/vertx/impl/CloudEventsHeaders.java
@@ -0,0 +1,27 @@
+package io.cloudevents.http.vertx.impl;
+
+import io.cloudevents.SpecVersion;
+import io.vertx.core.http.HttpHeaders;
+
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class CloudEventsHeaders {
+
+ public static final Map ATTRIBUTES_TO_HEADERS = Stream.concat(
+ Stream.concat(SpecVersion.V1.getMandatoryAttributes().stream(), SpecVersion.V1.getOptionalAttributes().stream()),
+ Stream.concat(SpecVersion.V03.getMandatoryAttributes().stream(), SpecVersion.V03.getOptionalAttributes().stream())
+ )
+ .distinct()
+ .collect(Collectors.toMap(Function.identity(), v -> {
+ if (v.equals("datacontenttype")) {
+ return HttpHeaders.CONTENT_TYPE;
+ }
+ return HttpHeaders.createOptimized("ce-" + v);
+ }));
+
+ public static final CharSequence SPEC_VERSION = ATTRIBUTES_TO_HEADERS.get("specversion");
+
+}
diff --git a/http/vertx/src/main/java/io/cloudevents/http/vertx/impl/StructuredVertxMessageImpl.java b/http/vertx/src/main/java/io/cloudevents/http/vertx/impl/StructuredVertxMessageImpl.java
new file mode 100644
index 00000000..b7432113
--- /dev/null
+++ b/http/vertx/src/main/java/io/cloudevents/http/vertx/impl/StructuredVertxMessageImpl.java
@@ -0,0 +1,32 @@
+package io.cloudevents.http.vertx.impl;
+
+import io.cloudevents.format.EventFormat;
+import io.cloudevents.http.vertx.VertxMessage;
+import io.cloudevents.message.*;
+import io.vertx.core.buffer.Buffer;
+
+public class StructuredVertxMessageImpl implements VertxMessage {
+
+ private final EventFormat format;
+ private final Buffer buffer;
+
+ public StructuredVertxMessageImpl(EventFormat format, Buffer buffer) {
+ this.format = format;
+ this.buffer = buffer;
+ }
+
+ @Override
+ public Encoding getEncoding() {
+ return Encoding.STRUCTURED;
+ }
+
+ @Override
+ public , V> V visit(BinaryMessageVisitorFactory visitorFactory) throws MessageVisitException, IllegalStateException {
+ throw Encoding.WRONG_ENCODING_EXCEPTION;
+ }
+
+ @Override
+ public T visit(StructuredMessageVisitor visitor) throws MessageVisitException, IllegalStateException {
+ return visitor.setEvent(this.format, this.buffer.getBytes());
+ }
+}
diff --git a/http/vertx/src/main/java/io/cloudevents/http/vertx/impl/VertxCloudEventsImpl.java b/http/vertx/src/main/java/io/cloudevents/http/vertx/impl/VertxCloudEventsImpl.java
deleted file mode 100644
index 89f49501..00000000
--- a/http/vertx/src/main/java/io/cloudevents/http/vertx/impl/VertxCloudEventsImpl.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/**
- * Copyright 2019 The CloudEvents Authors
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.cloudevents.http.vertx.impl;
-
-import static io.vertx.core.http.HttpHeaders.createOptimized;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
-import com.fasterxml.jackson.core.type.TypeReference;
-
-import io.cloudevents.CloudEvent;
-import io.cloudevents.format.Wire;
-import io.cloudevents.http.vertx.VertxCloudEvents;
-import io.cloudevents.json.Json;
-import io.cloudevents.v02.AttributesImpl;
-import io.cloudevents.v02.CloudEventImpl;
-import io.cloudevents.v02.http.Marshallers;
-import io.cloudevents.v02.http.Unmarshallers;
-import io.vertx.core.AsyncResult;
-import io.vertx.core.Future;
-import io.vertx.core.Handler;
-import io.vertx.core.MultiMap;
-import io.vertx.core.buffer.Buffer;
-import io.vertx.core.http.HttpClientRequest;
-import io.vertx.core.http.HttpHeaders;
-import io.vertx.core.http.HttpServerRequest;
-
-public final class VertxCloudEventsImpl implements VertxCloudEvents {
-
- private final static CharSequence BINARY_TYPE = HttpHeaders.createOptimized("application/json");
- private final static CharSequence STRUCTURED_TYPE = HttpHeaders.createOptimized("application/cloudevents+json");
-
- @Override
- public void readFromRequest(HttpServerRequest request, Handler>> resultHandler) {
- this.readFromRequest(request, null, resultHandler);
-
- }
-
- @Override
- public void readFromRequest(HttpServerRequest request, Class[] extensions, Handler>> resultHandler) {
-
- final MultiMap headers = request.headers();
-
- // binary mode
- if (headers.get(HttpHeaders.CONTENT_TYPE).equalsIgnoreCase(BINARY_TYPE.toString())) {
- request.bodyHandler((Buffer buff) -> {
- CloudEvent event =
- Unmarshallers.binary(String.class)
- .withHeaders(() -> {
- final Map result = new HashMap<>();
-
- headers.iterator()
- .forEachRemaining(header -> {
- result.put(header.getKey(), header.getValue());
- });
-
- return Collections.unmodifiableMap(result);
- })
- .withPayload(() -> {
- return buff.toString();
- })
- .unmarshal();
-
- resultHandler.handle(Future.succeededFuture(event));
- });
-
- } else if (headers.get(HttpHeaders.CONTENT_TYPE).equalsIgnoreCase(STRUCTURED_TYPE.toString())) {
- // structured read of the body
- request.bodyHandler((Buffer buff) -> {
-
- if (buff.length()>0) {
- resultHandler.handle(Future.succeededFuture(Json.decodeValue(buff.toString(),
- new TypeReference>() {})));
- } else {
- throw new IllegalArgumentException("no cloudevent body");
- }
- });
- } else {
- throw new IllegalArgumentException("no cloudevent type identified");
- }
- }
-
- @Override
- public void writeToHttpClientRequest(CloudEvent cloudEvent, HttpClientRequest request) {
- writeToHttpClientRequest(cloudEvent, Boolean.TRUE, request);
- }
-
- @Override
- public void writeToHttpClientRequest(CloudEvent cloudEvent, boolean binary, HttpClientRequest request) {
-
- if (binary) {
- Wire wire =
- Marshallers.binary()
- .withEvent(() -> cloudEvent)
- .marshal();
-
- // setting the right content-length:
- request.putHeader(HttpHeaders.CONTENT_LENGTH, createOptimized("0"));
- wire.getPayload().ifPresent((payload) -> {
- request.putHeader(HttpHeaders.CONTENT_LENGTH,
- createOptimized(String.valueOf(payload.length())));
- });
-
- // read required headers
- wire.getHeaders().entrySet()
- .stream()
- .forEach(header -> {
- request.putHeader(createOptimized(header.getKey()),
- createOptimized(header.getValue()));
- });
-
- wire.getPayload().ifPresent((payload) -> {
- request.write(payload);
- });
- } else {
- // read required headers
- request.putHeader(HttpHeaders.CONTENT_TYPE, STRUCTURED_TYPE);
- final String json = Json.encode(cloudEvent);
- request.putHeader(HttpHeaders.CONTENT_LENGTH,
- createOptimized(String.valueOf(json.length())));
- // this the body
- request.write(json);
- }
- }
-}
diff --git a/http/vertx/src/main/java/io/cloudevents/http/vertx/impl/VertxHttpClientRequestMessageVisitorImpl.java b/http/vertx/src/main/java/io/cloudevents/http/vertx/impl/VertxHttpClientRequestMessageVisitorImpl.java
new file mode 100644
index 00000000..edc58fdd
--- /dev/null
+++ b/http/vertx/src/main/java/io/cloudevents/http/vertx/impl/VertxHttpClientRequestMessageVisitorImpl.java
@@ -0,0 +1,66 @@
+package io.cloudevents.http.vertx.impl;
+
+import io.cloudevents.SpecVersion;
+import io.cloudevents.format.EventFormat;
+import io.cloudevents.http.vertx.VertxHttpClientRequestMessageVisitor;
+import io.cloudevents.message.MessageVisitException;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.core.http.HttpClientRequest;
+import io.vertx.core.http.HttpHeaders;
+
+public class VertxHttpClientRequestMessageVisitorImpl implements VertxHttpClientRequestMessageVisitor {
+
+ private final HttpClientRequest request;
+ private boolean ended;
+
+ public VertxHttpClientRequestMessageVisitorImpl(HttpClientRequest request) {
+ this.request = request;
+ this.ended = false;
+ }
+
+ // Binary visitor factory
+
+ @Override
+ public VertxHttpClientRequestMessageVisitor createBinaryMessageVisitor(SpecVersion version) {
+ this.request.putHeader(CloudEventsHeaders.SPEC_VERSION, version.toString());
+ return this;
+ }
+
+ // Binary visitor
+
+ @Override
+ public void setAttribute(String name, String value) throws MessageVisitException {
+ this.request.putHeader(CloudEventsHeaders.ATTRIBUTES_TO_HEADERS.get(name), value);
+ }
+
+ @Override
+ public void setExtension(String name, String value) throws MessageVisitException {
+ this.request.putHeader("ce-" + name, value);
+ }
+
+ @Override
+ public void setBody(byte[] value) throws MessageVisitException {
+ if (ended) {
+ throw MessageVisitException.newOther(new IllegalStateException("Cannot set the body because the request is already ended"));
+ }
+ this.request.end(Buffer.buffer(value));
+ this.ended = true;
+ }
+
+ @Override
+ public HttpClientRequest end() {
+ if (!ended) {
+ this.request.end();
+ }
+ return this.request;
+ }
+
+ // Structured visitor
+
+ @Override
+ public HttpClientRequest setEvent(EventFormat format, byte[] value) throws MessageVisitException {
+ this.request.putHeader(HttpHeaders.CONTENT_TYPE, format.serializedContentType());
+ this.request.end(Buffer.buffer(value));
+ return this.request;
+ }
+}
diff --git a/http/vertx/src/main/java/io/cloudevents/http/vertx/impl/VertxHttpServerResponseMessageVisitorImpl.java b/http/vertx/src/main/java/io/cloudevents/http/vertx/impl/VertxHttpServerResponseMessageVisitorImpl.java
new file mode 100644
index 00000000..bb937fee
--- /dev/null
+++ b/http/vertx/src/main/java/io/cloudevents/http/vertx/impl/VertxHttpServerResponseMessageVisitorImpl.java
@@ -0,0 +1,63 @@
+package io.cloudevents.http.vertx.impl;
+
+import io.cloudevents.SpecVersion;
+import io.cloudevents.format.EventFormat;
+import io.cloudevents.http.vertx.VertxHttpServerResponseMessageVisitor;
+import io.cloudevents.message.MessageVisitException;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.core.http.HttpHeaders;
+import io.vertx.core.http.HttpServerResponse;
+
+public class VertxHttpServerResponseMessageVisitorImpl implements VertxHttpServerResponseMessageVisitor {
+
+ private final HttpServerResponse response;
+
+ public VertxHttpServerResponseMessageVisitorImpl(HttpServerResponse response) {
+ this.response = response;
+ }
+
+ // Binary visitor factory
+
+ @Override
+ public VertxHttpServerResponseMessageVisitor createBinaryMessageVisitor(SpecVersion version) {
+ this.response.putHeader(CloudEventsHeaders.SPEC_VERSION, version.toString());
+ return this;
+ }
+
+ // Binary visitor
+
+ @Override
+ public void setAttribute(String name, String value) throws MessageVisitException {
+ this.response.putHeader(CloudEventsHeaders.ATTRIBUTES_TO_HEADERS.get(name), value);
+ }
+
+ @Override
+ public void setExtension(String name, String value) throws MessageVisitException {
+ this.response.putHeader("ce-" + name, value);
+ }
+
+ @Override
+ public void setBody(byte[] value) throws MessageVisitException {
+ if (this.response.ended()) {
+ throw MessageVisitException.newOther(new IllegalStateException("Cannot set the body because the response is already ended"));
+ }
+ this.response.end(Buffer.buffer(value));
+ }
+
+ @Override
+ public HttpServerResponse end() {
+ if (!this.response.ended()) {
+ this.response.end();
+ }
+ return this.response;
+ }
+
+ // Structured visitor
+
+ @Override
+ public HttpServerResponse setEvent(EventFormat format, byte[] value) throws MessageVisitException {
+ this.response.putHeader(HttpHeaders.CONTENT_TYPE, format.serializedContentType());
+ this.response.end(Buffer.buffer(value));
+ return this.response;
+ }
+}
diff --git a/http/vertx/src/main/java/io/cloudevents/http/vertx/package-info.java b/http/vertx/src/main/java/io/cloudevents/http/vertx/package-info.java
deleted file mode 100644
index 61025c9a..00000000
--- a/http/vertx/src/main/java/io/cloudevents/http/vertx/package-info.java
+++ /dev/null
@@ -1,19 +0,0 @@
-/**
- * Copyright 2018 The CloudEvents Authors
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-@ModuleGen(groupPackage = "io.cloudevents.http", name = "vertx")
-package io.cloudevents.http.vertx;
-
-import io.vertx.codegen.annotations.ModuleGen;
\ No newline at end of file
diff --git a/http/vertx/src/test/java/io/cloudevents/http/vertx/VertxCloudEventsTests.java b/http/vertx/src/test/java/io/cloudevents/http/vertx/VertxCloudEventsTests.java
deleted file mode 100644
index 1fe8729a..00000000
--- a/http/vertx/src/test/java/io/cloudevents/http/vertx/VertxCloudEventsTests.java
+++ /dev/null
@@ -1,288 +0,0 @@
-/**
- * Copyright 2018 The CloudEvents Authors
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.cloudevents.http.vertx;
-
-import io.cloudevents.CloudEvent;
-import io.cloudevents.extensions.DistributedTracingExtension;
-import io.cloudevents.extensions.ExtensionFormat;
-import io.cloudevents.http.reactivex.vertx.VertxCloudEvents;
-import io.cloudevents.v02.AttributesImpl;
-import io.cloudevents.v02.CloudEventBuilder;
-import io.vertx.core.http.HttpHeaders;
-import io.vertx.junit5.Checkpoint;
-import io.vertx.junit5.VertxExtension;
-import io.vertx.junit5.VertxTestContext;
-import io.vertx.reactivex.core.Vertx;
-import io.vertx.reactivex.core.http.HttpClientRequest;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.DisplayName;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
-
-import java.io.IOException;
-import java.net.ServerSocket;
-import java.net.URI;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-@ExtendWith(VertxExtension.class)
-class VertxCloudEventsTests {
-
- private int port;
- @BeforeEach
- void setUp(Vertx vertx, VertxTestContext testContext) throws IOException {
- ServerSocket socket = new ServerSocket(0);
- port = socket.getLocalPort();
- socket.close();
- testContext.completeNow();
- }
-
- @Test
- @DisplayName("Post a 0.2 CloudEvents object with a payload")
- void cloudEventWithPayload(Vertx vertx, VertxTestContext testContext) {
- Checkpoint serverCheckpoint = testContext.checkpoint();
- Checkpoint clientCheckpoint = testContext.checkpoint();
-
- // given
- final CloudEvent cloudEvent =
- CloudEventBuilder.builder()
- .withSource(URI.create("http://knative-eventing.com"))
- .withId("foo-bar")
- .withType("pushevent")
- .withData("{\"foo\":\"bar\"}}")
- .withContenttype("application/json")
- .build();
-
- vertx.createHttpServer()
- .requestHandler(req -> VertxCloudEvents
- .create()
- // read the object from the server request
- .rxReadFromRequest(req)
- .doOnError(testContext::failNow)
- .subscribe(event -> testContext.verify(() -> {
-
- // test
- assertThat(event.getAttributes().getId()).isEqualTo(cloudEvent.getAttributes().getId());
- assertThat(event.getAttributes().getSource().toString()).isEqualTo(cloudEvent.getAttributes().getSource().toString());
- assertThat(event.getAttributes().getType()).isEqualTo(cloudEvent.getAttributes().getType());
- assertThat(event.getData()).isPresent();
-
- // write the response back to the caller
- req.response().end();
- serverCheckpoint.flag();
- })))
- .rxListen(port)
- .doOnError(testContext::failNow)
- .subscribe(server -> {
- // create client to POST a CloudEvent to the server
- final HttpClientRequest req = vertx.createHttpClient().post(server.actualPort(), "localhost", "/");
- req.handler(resp -> testContext.verify(() -> {
- assertThat(resp.statusCode()).isEqualTo(200);
- clientCheckpoint.flag();
- }));
- VertxCloudEvents.create().writeToHttpClientRequest(cloudEvent, req);
- req.end();
- });
- }
-
- @Test
- @DisplayName("Post a 0.2 CloudEvents object without a payload")
- void cloudEventWithoutPayload(Vertx vertx, VertxTestContext testContext) {
- Checkpoint serverCheckpoint = testContext.checkpoint();
- Checkpoint clientCheckpoint = testContext.checkpoint();
-
- // given
- final CloudEvent cloudEvent =
- CloudEventBuilder.builder()
- .withSource(URI.create("http://knative-eventing.com"))
- .withId("foo-bar")
- .withType("pushevent")
- .withContenttype("application/json")
- .build();
-
- vertx.createHttpServer()
- .requestHandler(req -> VertxCloudEvents
- .create()
- // read the object from the server request
- .rxReadFromRequest(req)
- .doOnError(testContext::failNow)
- .subscribe(event -> testContext.verify(() -> {
-
- // check headers
- assertThat(req.headers().get("ce-specversion")).isEqualTo("0.2");
- assertThat(req.headers().get("cloudEventsVersion")).isNull();
- assertThat(req.headers().get("ce-id")).isEqualTo("foo-bar");
- assertThat(req.headers().get("ce-eventID")).isNull();
-
- // check parsed object
- assertThat(event.getAttributes().getId()).isEqualTo(cloudEvent.getAttributes().getId());
- assertThat(event.getAttributes().getSource().toString()).isEqualTo(cloudEvent.getAttributes().getSource().toString());
- assertThat(event.getAttributes().getType()).isEqualTo(cloudEvent.getAttributes().getType());
- assertThat(event.getData()).isNotPresent();
-
- // write the response back to the caller
- req.response().end();
- serverCheckpoint.flag();
- })))
- .rxListen(port)
- .doOnError(testContext::failNow)
- .subscribe(server -> {
- // create client to POST a CloudEvent to the server
- final HttpClientRequest req = vertx.createHttpClient().post(server.actualPort(), "localhost", "/");
- req.handler(resp -> testContext.verify(() -> {
- assertThat(resp.statusCode()).isEqualTo(200);
- clientCheckpoint.flag();
- }));
- VertxCloudEvents.create().writeToHttpClientRequest(cloudEvent, req);
- req.end();
- });
- }
-
- @Test
- @DisplayName("Post an incomplete cloud event")
- void incompleteCloudEvent(Vertx vertx, VertxTestContext testContext) {
- Checkpoint serverCheckpoint = testContext.checkpoint();
- Checkpoint clientCheckpoint = testContext.checkpoint();
-
- vertx.createHttpServer()
- .requestHandler(req -> VertxCloudEvents
- .create()
- // read the object from the server request
- .rxReadFromRequest(req)
- .subscribe((e, t) -> {
- if (e != null) {
- testContext.failNow(new AssertionError("request was not complete, but got: " + e));
- } else {
- req.response().end();
- serverCheckpoint.flag();
- }
- }))
- .rxListen(port)
- .doOnError(testContext::failNow)
- .subscribe(server -> {
-
- // create client to POST a CloudEvent to the server
- final HttpClientRequest req = vertx.createHttpClient().post(server.actualPort(), "localhost", "/");
- // create incomplete CloudEvent request
- req.putHeader(HttpHeaders.createOptimized("ce-specversion"), HttpHeaders.createOptimized("0.2"));
- req.putHeader(HttpHeaders.createOptimized("ce-type"), HttpHeaders.createOptimized("pushevent"));
- req.putHeader(HttpHeaders.createOptimized("foo"), HttpHeaders.createOptimized("bar"));
- req.putHeader(HttpHeaders.CONTENT_LENGTH, HttpHeaders.createOptimized("0"));
- req.handler(resp -> testContext.verify(() -> {
- assertThat(resp.statusCode()).isEqualTo(200);
- clientCheckpoint.flag();
- }));
- req.end();
- });
- }
-
- @Test
- @DisplayName("Post a 0.2 CloudEvents object with a payload")
- void cloudEventWithExtension(Vertx vertx, VertxTestContext testContext) {
- Checkpoint serverCheckpoint = testContext.checkpoint();
- Checkpoint clientCheckpoint = testContext.checkpoint();
-
- final DistributedTracingExtension dte = new DistributedTracingExtension();
- dte.setTraceparent("00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01");
- dte.setTracestate("congo=BleGNlZWRzIHRohbCBwbGVhc3VyZS4");
-
- final ExtensionFormat tracing = new DistributedTracingExtension.Format(dte);
-
- final CloudEvent cloudEvent =
- CloudEventBuilder.builder()
- .withSource(URI.create("http://knative-eventing.com"))
- .withId("foo-bar")
- .withExtension(tracing)
- .withType("pushevent")
- .withContenttype("application/json")
- .build();
-
- final Class[] extensions = {DistributedTracingExtension.class};
-
- vertx.createHttpServer()
- .requestHandler(req -> VertxCloudEvents
- .create()
- // read from request with expected Extension types
- .rxReadFromRequest(req, extensions)
- .doOnError(testContext::failNow)
- .subscribe(event -> testContext.verify(() -> {
- assertThat(event.getAttributes().getId()).isEqualTo(cloudEvent.getAttributes().getId());
-
- // test
- System.out.println(">>>>>>>>>>>>" + req.headers());
- assertThat(req.headers().get("traceparent")).isEqualTo("00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01");
- assertThat(req.headers().get("tracestate")).isEqualTo("congo=BleGNlZWRzIHRohbCBwbGVhc3VyZS4");
- assertThat(event.getExtensions().get("distributedTracing")).isNotNull();
- assertThat(event.getExtensions().get("distributedTracing")).extracting("traceparent", "tracestate")
- .contains("00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01", "congo=BleGNlZWRzIHRohbCBwbGVhc3VyZS4");
-
- // write the response back to the caller
- req.response().end();
- serverCheckpoint.flag();
- })))
- .rxListen(port)
- .doOnError(testContext::failNow)
- .subscribe(server -> {
- // create client to POST a CloudEvent to the server
- final HttpClientRequest req = vertx.createHttpClient().post(server.actualPort(), "localhost", "/");
- req.handler(resp -> testContext.verify(() -> {
- assertThat(resp.statusCode()).isEqualTo(200);
- clientCheckpoint.flag();
- }));
- VertxCloudEvents.create().writeToHttpClientRequest(cloudEvent, req);
- req.end();
- });
- }
-
- @Test
- @DisplayName("Post a 0.2 CloudEvents object without a payload")
- void structuredCloudEvent(Vertx vertx, VertxTestContext testContext) {
- Checkpoint serverCheckpoint = testContext.checkpoint();
- Checkpoint clientCheckpoint = testContext.checkpoint();
-
- // given
- final CloudEvent cloudEvent =
- CloudEventBuilder.builder()
- .withSource(URI.create("http://knative-eventing.com"))
- .withId("foo-bar")
- .withType("pushevent")
- .build();
-
- vertx.createHttpServer()
- .requestHandler(req -> VertxCloudEvents
- .create()
- // read the object from the server request
- .rxReadFromRequest(req)
- .doOnError(testContext::failNow)
- .subscribe(event -> testContext.verify(() -> {
- // write the response back to the caller
- req.response().end();
- serverCheckpoint.flag();
- })))
- .rxListen(port)
- .doOnError(testContext::failNow)
- .subscribe(server -> {
- // create client to POST a CloudEvent to the server
- final HttpClientRequest req = vertx.createHttpClient().post(server.actualPort(), "localhost", "/");
- req.handler(resp -> testContext.verify(() -> {
- assertThat(resp.statusCode()).isEqualTo(200);
- clientCheckpoint.flag();
- }));
- VertxCloudEvents.create().writeToHttpClientRequest(cloudEvent, Boolean.FALSE,req);
- req.end();
- });
- }
-}
diff --git a/http/vertx/src/test/java/io/cloudevents/http/vertx/VertxHttpClientRequestMessageVisitorTest.java b/http/vertx/src/test/java/io/cloudevents/http/vertx/VertxHttpClientRequestMessageVisitorTest.java
new file mode 100644
index 00000000..a8f7c4b7
--- /dev/null
+++ b/http/vertx/src/test/java/io/cloudevents/http/vertx/VertxHttpClientRequestMessageVisitorTest.java
@@ -0,0 +1,242 @@
+package io.cloudevents.http.vertx;
+
+import io.cloudevents.CloudEvent;
+import io.cloudevents.SpecVersion;
+import io.cloudevents.mock.CSVFormat;
+import io.cloudevents.types.Time;
+import io.vertx.core.MultiMap;
+import io.vertx.core.Vertx;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.core.http.HttpClient;
+import io.vertx.core.http.HttpClientRequest;
+import io.vertx.junit5.Checkpoint;
+import io.vertx.junit5.VertxExtension;
+import io.vertx.junit5.VertxTestContext;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.stream.Stream;
+
+import static io.cloudevents.test.Data.*;
+import static org.assertj.core.api.Assertions.assertThat;
+
+@ExtendWith(VertxExtension.class)
+public class VertxHttpClientRequestMessageVisitorTest {
+
+ @ParameterizedTest
+ @MethodSource("io.cloudevents.test.Data#allEventsWithoutExtensions")
+ void testRequestWithStructured(CloudEvent event, Vertx vertx, VertxTestContext testContext) {
+ String expectedContentType = CSVFormat.INSTANCE.serializedContentType();
+ byte[] expectedBuffer = CSVFormat.INSTANCE.serialize(event);
+
+ Checkpoint checkpoint = testContext.checkpoint(3);
+
+ vertx
+ .createHttpServer()
+ .requestHandler(httpServerRequest -> {
+ httpServerRequest.bodyHandler(buf -> {
+ testContext.verify(() -> {
+ assertThat(httpServerRequest.getHeader("content-type"))
+ .isEqualTo(expectedContentType);
+ assertThat(buf.getBytes())
+ .isEqualTo(expectedBuffer);
+ });
+ checkpoint.flag();
+ });
+ httpServerRequest.response().end();
+ })
+ .listen(9000, testContext.succeeding(server -> {
+ HttpClient client = vertx.createHttpClient();
+ HttpClientRequest req = client.get(server.actualPort(), "localhost", "/", httpClientResponse -> {
+ testContext.verify(() -> {
+ assertThat(httpClientResponse.statusCode())
+ .isEqualTo(200);
+ });
+ checkpoint.flag();
+ });
+ try {
+ event.asStructuredMessage(CSVFormat.INSTANCE)
+ .visit(VertxHttpClientRequestMessageVisitor.create(req));
+ } catch (Throwable e) {
+ testContext.failNow(e);
+ }
+ checkpoint.flag();
+ }));
+ }
+
+ @ParameterizedTest
+ @MethodSource("binaryTestArguments")
+ void testRequestWithBinary(CloudEvent event, MultiMap headers, Buffer body, Vertx vertx, VertxTestContext testContext) {
+ Checkpoint checkpoint = testContext.checkpoint(3);
+
+ vertx
+ .createHttpServer()
+ .requestHandler(httpServerRequest -> {
+ httpServerRequest.bodyHandler(buf -> {
+ testContext.verify(() -> {
+ headers.forEach(e -> {
+ assertThat(httpServerRequest.getHeader(e.getKey()))
+ .isEqualTo(e.getValue());
+ });
+ if (body != null) {
+ assertThat(buf.getBytes())
+ .isEqualTo(body.getBytes());
+ }
+ });
+ checkpoint.flag();
+ });
+ httpServerRequest.response().end();
+ })
+ .listen(9000, testContext.succeeding(server -> {
+ HttpClient client = vertx.createHttpClient();
+ HttpClientRequest req = client.get(server.actualPort(), "localhost", "/", httpClientResponse -> {
+ testContext.verify(() -> {
+ assertThat(httpClientResponse.statusCode())
+ .isEqualTo(200);
+ });
+ checkpoint.flag();
+ });
+ try {
+ event.asBinaryMessage()
+ .visit(VertxHttpClientRequestMessageVisitor.create(req));
+ } catch (Throwable e) {
+ testContext.failNow(e);
+ }
+ checkpoint.flag();
+ }));
+ }
+
+ public static Stream binaryTestArguments() {
+ return Stream.of(
+ // V03
+ Arguments.of(
+ V03_MIN,
+ MultiMap.caseInsensitiveMultiMap()
+ .add("ce-specversion", SpecVersion.V03.toString())
+ .add("ce-id", ID)
+ .add("ce-type", TYPE)
+ .add("ce-source", SOURCE.toString()),
+ null
+ ),
+ Arguments.of(
+ V03_WITH_JSON_DATA,
+ MultiMap.caseInsensitiveMultiMap()
+ .add("ce-specversion", SpecVersion.V03.toString())
+ .add("ce-id", ID)
+ .add("ce-type", TYPE)
+ .add("ce-source", SOURCE.toString())
+ .add("ce-schemaurl", DATASCHEMA.toString())
+ .add("content-type", DATACONTENTTYPE_JSON)
+ .add("ce-subject", SUBJECT)
+ .add("ce-time", Time.RFC3339_DATE_FORMAT.format(TIME)),
+ Buffer.buffer(DATA_JSON_SERIALIZED)
+ ),
+ Arguments.of(
+ V03_WITH_JSON_DATA_WITH_EXT_STRING,
+ MultiMap.caseInsensitiveMultiMap()
+ .add("ce-specversion", SpecVersion.V03.toString())
+ .add("ce-id", ID)
+ .add("ce-type", TYPE)
+ .add("ce-source", SOURCE.toString())
+ .add("ce-schemaurl", DATASCHEMA.toString())
+ .add("content-type", DATACONTENTTYPE_JSON)
+ .add("ce-subject", SUBJECT)
+ .add("ce-time", Time.RFC3339_DATE_FORMAT.format(TIME))
+ .add("ce-astring", "aaa")
+ .add("ce-aboolean", "true")
+ .add("ce-anumber", "10"),
+ Buffer.buffer(DATA_JSON_SERIALIZED)
+ ),
+ Arguments.of(
+ V03_WITH_XML_DATA,
+ MultiMap.caseInsensitiveMultiMap()
+ .add("ce-specversion", SpecVersion.V03.toString())
+ .add("ce-id", ID)
+ .add("ce-type", TYPE)
+ .add("ce-source", SOURCE.toString())
+ .add("content-type", DATACONTENTTYPE_XML)
+ .add("ce-subject", SUBJECT)
+ .add("ce-time", Time.RFC3339_DATE_FORMAT.format(TIME)),
+ Buffer.buffer(DATA_XML_SERIALIZED)
+ ),
+ Arguments.of(
+ V03_WITH_TEXT_DATA,
+ MultiMap.caseInsensitiveMultiMap()
+ .add("ce-specversion", SpecVersion.V03.toString())
+ .add("ce-id", ID)
+ .add("ce-type", TYPE)
+ .add("ce-source", SOURCE.toString())
+ .add("content-type", DATACONTENTTYPE_TEXT)
+ .add("ce-subject", SUBJECT)
+ .add("ce-time", Time.RFC3339_DATE_FORMAT.format(TIME)),
+ Buffer.buffer(DATA_TEXT_SERIALIZED)
+ ),
+ // V1
+ Arguments.of(
+ V1_MIN,
+ MultiMap.caseInsensitiveMultiMap()
+ .add("ce-specversion", SpecVersion.V1.toString())
+ .add("ce-id", ID)
+ .add("ce-type", TYPE)
+ .add("ce-source", SOURCE.toString()),
+ null
+ ),
+ Arguments.of(
+ V1_WITH_JSON_DATA,
+ MultiMap.caseInsensitiveMultiMap()
+ .add("ce-specversion", SpecVersion.V1.toString())
+ .add("ce-id", ID)
+ .add("ce-type", TYPE)
+ .add("ce-source", SOURCE.toString())
+ .add("ce-dataschema", DATASCHEMA.toString())
+ .add("content-type", DATACONTENTTYPE_JSON)
+ .add("ce-subject", SUBJECT)
+ .add("ce-time", Time.RFC3339_DATE_FORMAT.format(TIME)),
+ Buffer.buffer(DATA_JSON_SERIALIZED)
+ ),
+ Arguments.of(
+ V1_WITH_JSON_DATA_WITH_EXT_STRING,
+ MultiMap.caseInsensitiveMultiMap()
+ .add("ce-specversion", SpecVersion.V1.toString())
+ .add("ce-id", ID)
+ .add("ce-type", TYPE)
+ .add("ce-source", SOURCE.toString())
+ .add("ce-dataschema", DATASCHEMA.toString())
+ .add("content-type", DATACONTENTTYPE_JSON)
+ .add("ce-subject", SUBJECT)
+ .add("ce-time", Time.RFC3339_DATE_FORMAT.format(TIME))
+ .add("ce-astring", "aaa")
+ .add("ce-aboolean", "true")
+ .add("ce-anumber", "10"),
+ Buffer.buffer(DATA_JSON_SERIALIZED)
+ ),
+ Arguments.of(
+ V1_WITH_XML_DATA,
+ MultiMap.caseInsensitiveMultiMap()
+ .add("ce-specversion", SpecVersion.V1.toString())
+ .add("ce-id", ID)
+ .add("ce-type", TYPE)
+ .add("ce-source", SOURCE.toString())
+ .add("content-type", DATACONTENTTYPE_XML)
+ .add("ce-subject", SUBJECT)
+ .add("ce-time", Time.RFC3339_DATE_FORMAT.format(TIME)),
+ Buffer.buffer(DATA_XML_SERIALIZED)
+ ),
+ Arguments.of(
+ V1_WITH_TEXT_DATA,
+ MultiMap.caseInsensitiveMultiMap()
+ .add("ce-specversion", SpecVersion.V1.toString())
+ .add("ce-id", ID)
+ .add("ce-type", TYPE)
+ .add("ce-source", SOURCE.toString())
+ .add("content-type", DATACONTENTTYPE_TEXT)
+ .add("ce-subject", SUBJECT)
+ .add("ce-time", Time.RFC3339_DATE_FORMAT.format(TIME)),
+ Buffer.buffer(DATA_TEXT_SERIALIZED)
+ )
+ );
+ }
+
+}
diff --git a/http/vertx/src/test/java/io/cloudevents/http/vertx/VertxHttpServerResponseMessageVisitorTest.java b/http/vertx/src/test/java/io/cloudevents/http/vertx/VertxHttpServerResponseMessageVisitorTest.java
new file mode 100644
index 00000000..60bd6030
--- /dev/null
+++ b/http/vertx/src/test/java/io/cloudevents/http/vertx/VertxHttpServerResponseMessageVisitorTest.java
@@ -0,0 +1,238 @@
+package io.cloudevents.http.vertx;
+
+import io.cloudevents.CloudEvent;
+import io.cloudevents.SpecVersion;
+import io.cloudevents.mock.CSVFormat;
+import io.cloudevents.types.Time;
+import io.vertx.core.MultiMap;
+import io.vertx.core.Vertx;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.core.http.HttpClient;
+import io.vertx.junit5.Checkpoint;
+import io.vertx.junit5.VertxExtension;
+import io.vertx.junit5.VertxTestContext;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.stream.Stream;
+
+import static io.cloudevents.test.Data.*;
+import static org.assertj.core.api.Assertions.assertThat;
+
+@ExtendWith(VertxExtension.class)
+public class VertxHttpServerResponseMessageVisitorTest {
+
+ @ParameterizedTest
+ @MethodSource("io.cloudevents.test.Data#allEventsWithoutExtensions")
+ void testReplyWithStructured(CloudEvent event, Vertx vertx, VertxTestContext testContext) {
+ Checkpoint checkpoint = testContext.checkpoint(2);
+
+ vertx
+ .createHttpServer()
+ .requestHandler(httpServerRequest -> {
+ try {
+ event.asStructuredMessage(CSVFormat.INSTANCE).visit(
+ VertxHttpServerResponseMessageVisitor.create(httpServerRequest.response())
+ );
+ checkpoint.flag();
+ } catch (Throwable e) {
+ testContext.failNow(e);
+ }
+ })
+ .listen(9000, testContext.succeeding(server -> {
+ HttpClient client = vertx.createHttpClient();
+ client
+ .get(server.actualPort(), "localhost", "/")
+ .handler(res -> {
+ res.bodyHandler(buf -> {
+ testContext.verify(() -> {
+ assertThat(res.statusCode())
+ .isEqualTo(200);
+ assertThat(res.getHeader("content-type"))
+ .isEqualTo(CSVFormat.INSTANCE.serializedContentType());
+ assertThat(buf.getBytes())
+ .isEqualTo(CSVFormat.INSTANCE.serialize(event));
+
+ checkpoint.flag();
+ });
+ });
+ })
+ .end();
+ }));
+ }
+
+ @ParameterizedTest
+ @MethodSource("binaryTestArguments")
+ void testReplyWithBinary(CloudEvent event, MultiMap headers, Buffer body, Vertx vertx, VertxTestContext testContext) {
+ Checkpoint checkpoint = testContext.checkpoint(2);
+
+ vertx
+ .createHttpServer()
+ .requestHandler(httpServerRequest -> {
+ try {
+ event.asBinaryMessage().visit(
+ VertxHttpServerResponseMessageVisitor.create(httpServerRequest.response())
+ );
+ checkpoint.flag();
+ } catch (Throwable e) {
+ testContext.failNow(e);
+ }
+ })
+ .listen(9000, testContext.succeeding(server -> {
+ HttpClient client = vertx.createHttpClient();
+ client
+ .get(server.actualPort(), "localhost", "/")
+ .handler(res -> {
+ res.bodyHandler(buf -> {
+ testContext.verify(() -> {
+ assertThat(res.statusCode())
+ .isEqualTo(200);
+ headers.forEach(e -> {
+ assertThat(res.getHeader(e.getKey())).isEqualTo(e.getValue());
+ });
+ if (body != null) {
+ assertThat(buf.getBytes())
+ .isEqualTo(body.getBytes());
+ }
+ });
+ checkpoint.flag();
+ });
+ })
+ .end();
+ }));
+ }
+
+ public static Stream binaryTestArguments() {
+ return Stream.of(
+ // V03
+ Arguments.of(
+ V03_MIN,
+ MultiMap.caseInsensitiveMultiMap()
+ .add("ce-specversion", SpecVersion.V03.toString())
+ .add("ce-id", ID)
+ .add("ce-type", TYPE)
+ .add("ce-source", SOURCE.toString()),
+ null
+ ),
+ Arguments.of(
+ V03_WITH_JSON_DATA,
+ MultiMap.caseInsensitiveMultiMap()
+ .add("ce-specversion", SpecVersion.V03.toString())
+ .add("ce-id", ID)
+ .add("ce-type", TYPE)
+ .add("ce-source", SOURCE.toString())
+ .add("ce-schemaurl", DATASCHEMA.toString())
+ .add("content-type", DATACONTENTTYPE_JSON)
+ .add("ce-subject", SUBJECT)
+ .add("ce-time", Time.RFC3339_DATE_FORMAT.format(TIME)),
+ Buffer.buffer(DATA_JSON_SERIALIZED)
+ ),
+ Arguments.of(
+ V03_WITH_JSON_DATA_WITH_EXT_STRING,
+ MultiMap.caseInsensitiveMultiMap()
+ .add("ce-specversion", SpecVersion.V03.toString())
+ .add("ce-id", ID)
+ .add("ce-type", TYPE)
+ .add("ce-source", SOURCE.toString())
+ .add("ce-schemaurl", DATASCHEMA.toString())
+ .add("content-type", DATACONTENTTYPE_JSON)
+ .add("ce-subject", SUBJECT)
+ .add("ce-time", Time.RFC3339_DATE_FORMAT.format(TIME))
+ .add("ce-astring", "aaa")
+ .add("ce-aboolean", "true")
+ .add("ce-anumber", "10"),
+ Buffer.buffer(DATA_JSON_SERIALIZED)
+ ),
+ Arguments.of(
+ V03_WITH_XML_DATA,
+ MultiMap.caseInsensitiveMultiMap()
+ .add("ce-specversion", SpecVersion.V03.toString())
+ .add("ce-id", ID)
+ .add("ce-type", TYPE)
+ .add("ce-source", SOURCE.toString())
+ .add("content-type", DATACONTENTTYPE_XML)
+ .add("ce-subject", SUBJECT)
+ .add("ce-time", Time.RFC3339_DATE_FORMAT.format(TIME)),
+ Buffer.buffer(DATA_XML_SERIALIZED)
+ ),
+ Arguments.of(
+ V03_WITH_TEXT_DATA,
+ MultiMap.caseInsensitiveMultiMap()
+ .add("ce-specversion", SpecVersion.V03.toString())
+ .add("ce-id", ID)
+ .add("ce-type", TYPE)
+ .add("ce-source", SOURCE.toString())
+ .add("content-type", DATACONTENTTYPE_TEXT)
+ .add("ce-subject", SUBJECT)
+ .add("ce-time", Time.RFC3339_DATE_FORMAT.format(TIME)),
+ Buffer.buffer(DATA_TEXT_SERIALIZED)
+ ),
+ // V1
+ Arguments.of(
+ V1_MIN,
+ MultiMap.caseInsensitiveMultiMap()
+ .add("ce-specversion", SpecVersion.V1.toString())
+ .add("ce-id", ID)
+ .add("ce-type", TYPE)
+ .add("ce-source", SOURCE.toString()),
+ null
+ ),
+ Arguments.of(
+ V1_WITH_JSON_DATA,
+ MultiMap.caseInsensitiveMultiMap()
+ .add("ce-specversion", SpecVersion.V1.toString())
+ .add("ce-id", ID)
+ .add("ce-type", TYPE)
+ .add("ce-source", SOURCE.toString())
+ .add("ce-dataschema", DATASCHEMA.toString())
+ .add("content-type", DATACONTENTTYPE_JSON)
+ .add("ce-subject", SUBJECT)
+ .add("ce-time", Time.RFC3339_DATE_FORMAT.format(TIME)),
+ Buffer.buffer(DATA_JSON_SERIALIZED)
+ ),
+ Arguments.of(
+ V1_WITH_JSON_DATA_WITH_EXT_STRING,
+ MultiMap.caseInsensitiveMultiMap()
+ .add("ce-specversion", SpecVersion.V1.toString())
+ .add("ce-id", ID)
+ .add("ce-type", TYPE)
+ .add("ce-source", SOURCE.toString())
+ .add("ce-dataschema", DATASCHEMA.toString())
+ .add("content-type", DATACONTENTTYPE_JSON)
+ .add("ce-subject", SUBJECT)
+ .add("ce-time", Time.RFC3339_DATE_FORMAT.format(TIME))
+ .add("ce-astring", "aaa")
+ .add("ce-aboolean", "true")
+ .add("ce-anumber", "10"),
+ Buffer.buffer(DATA_JSON_SERIALIZED)
+ ),
+ Arguments.of(
+ V1_WITH_XML_DATA,
+ MultiMap.caseInsensitiveMultiMap()
+ .add("ce-specversion", SpecVersion.V1.toString())
+ .add("ce-id", ID)
+ .add("ce-type", TYPE)
+ .add("ce-source", SOURCE.toString())
+ .add("content-type", DATACONTENTTYPE_XML)
+ .add("ce-subject", SUBJECT)
+ .add("ce-time", Time.RFC3339_DATE_FORMAT.format(TIME)),
+ Buffer.buffer(DATA_XML_SERIALIZED)
+ ),
+ Arguments.of(
+ V1_WITH_TEXT_DATA,
+ MultiMap.caseInsensitiveMultiMap()
+ .add("ce-specversion", SpecVersion.V1.toString())
+ .add("ce-id", ID)
+ .add("ce-type", TYPE)
+ .add("ce-source", SOURCE.toString())
+ .add("content-type", DATACONTENTTYPE_TEXT)
+ .add("ce-subject", SUBJECT)
+ .add("ce-time", Time.RFC3339_DATE_FORMAT.format(TIME)),
+ Buffer.buffer(DATA_TEXT_SERIALIZED)
+ )
+ );
+ }
+
+}
diff --git a/http/vertx/src/test/java/io/cloudevents/http/vertx/VertxMessageTest.java b/http/vertx/src/test/java/io/cloudevents/http/vertx/VertxMessageTest.java
new file mode 100644
index 00000000..7d1c9716
--- /dev/null
+++ b/http/vertx/src/test/java/io/cloudevents/http/vertx/VertxMessageTest.java
@@ -0,0 +1,202 @@
+package io.cloudevents.http.vertx;
+
+import io.cloudevents.CloudEvent;
+import io.cloudevents.SpecVersion;
+import io.cloudevents.message.Encoding;
+import io.cloudevents.message.Message;
+import io.cloudevents.mock.CSVFormat;
+import io.cloudevents.types.Time;
+import io.vertx.core.MultiMap;
+import io.vertx.core.buffer.Buffer;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.stream.Stream;
+
+import static io.cloudevents.test.Data.*;
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class VertxMessageTest {
+
+ @ParameterizedTest()
+ @MethodSource("binaryTestArguments")
+ public void readBinary(MultiMap headers, Buffer body, CloudEvent event) {
+ Message message = VertxMessage.create(headers, body);
+
+ assertThat(message.getEncoding())
+ .isEqualTo(Encoding.BINARY);
+ assertThat(message.toEvent())
+ .isEqualTo(event);
+ }
+
+ @ParameterizedTest()
+ @MethodSource("io.cloudevents.test.Data#allEventsWithoutExtensions")
+ public void readStructured(CloudEvent event) {
+ byte[] serializedEvent = CSVFormat.INSTANCE.serialize(event);
+
+ Message message = VertxMessage.create(
+ MultiMap.caseInsensitiveMultiMap().add("content-type", CSVFormat.INSTANCE.serializedContentType() + "; charset=utf8"),
+ Buffer.buffer(serializedEvent)
+ );
+
+ assertThat(message.getEncoding())
+ .isEqualTo(Encoding.STRUCTURED);
+ assertThat(message.toEvent())
+ .isEqualTo(event);
+ }
+
+ public static Stream binaryTestArguments() {
+ return Stream.of(
+ // V03
+ Arguments.of(
+ MultiMap.caseInsensitiveMultiMap()
+ .add("ce-specversion", SpecVersion.V03.toString())
+ .add("ce-id", ID)
+ .add("ce-type", TYPE)
+ .add("ce-source", SOURCE.toString())
+ .add("ignored", "ignored"),
+ null,
+ V03_MIN
+ ),
+ Arguments.of(
+ MultiMap.caseInsensitiveMultiMap()
+ .add("ce-specversion", SpecVersion.V03.toString())
+ .add("ce-id", ID)
+ .add("ce-type", TYPE)
+ .add("ce-source", SOURCE.toString())
+ .add("ce-schemaurl", DATASCHEMA.toString())
+ .add("content-type", DATACONTENTTYPE_JSON)
+ .add("ce-subject", SUBJECT)
+ .add("ce-time", Time.RFC3339_DATE_FORMAT.format(TIME))
+ .add("ignored", "ignored"),
+ Buffer.buffer(DATA_JSON_SERIALIZED),
+ V03_WITH_JSON_DATA
+ ),
+ Arguments.of(
+ MultiMap.caseInsensitiveMultiMap()
+ .add("ce-specversion", SpecVersion.V03.toString())
+ .add("ce-id", ID)
+ .add("ce-type", TYPE)
+ .add("ce-source", SOURCE.toString())
+ .add("ce-schemaurl", DATASCHEMA.toString())
+ .add("content-type", DATACONTENTTYPE_JSON)
+ .add("ce-subject", SUBJECT)
+ .add("ce-time", Time.RFC3339_DATE_FORMAT.format(TIME))
+ .add("ce-astring", "aaa")
+ .add("ce-aboolean", "true")
+ .add("ce-anumber", "10")
+ .add("ignored", "ignored"),
+ Buffer.buffer(DATA_JSON_SERIALIZED),
+ V03_WITH_JSON_DATA_WITH_EXT_STRING
+ ),
+ Arguments.of(
+ MultiMap.caseInsensitiveMultiMap()
+ .add("ce-specversion", SpecVersion.V03.toString())
+ .add("ce-id", ID)
+ .add("ce-type", TYPE)
+ .add("ce-source", SOURCE.toString())
+ .add("content-type", DATACONTENTTYPE_XML)
+ .add("ce-subject", SUBJECT)
+ .add("ce-time", Time.RFC3339_DATE_FORMAT.format(TIME))
+ .add("ignored", "ignored"),
+ Buffer.buffer(DATA_XML_SERIALIZED),
+ V03_WITH_XML_DATA
+ ),
+ Arguments.of(
+ MultiMap.caseInsensitiveMultiMap()
+ .add("ce-specversion", SpecVersion.V03.toString())
+ .add("ce-id", ID)
+ .add("ce-type", TYPE)
+ .add("ce-source", SOURCE.toString())
+ .add("content-type", DATACONTENTTYPE_TEXT)
+ .add("ce-subject", SUBJECT)
+ .add("ce-time", Time.RFC3339_DATE_FORMAT.format(TIME))
+ .add("ignored", "ignored"),
+ Buffer.buffer(DATA_TEXT_SERIALIZED),
+ V03_WITH_TEXT_DATA
+ ),
+ // V1
+ Arguments.of(
+ MultiMap.caseInsensitiveMultiMap()
+ .add("ce-specversion", SpecVersion.V1.toString())
+ .add("ce-id", ID)
+ .add("ce-type", TYPE)
+ .add("ce-source", SOURCE.toString())
+ .add("ignored", "ignored"),
+ null,
+ V1_MIN
+ ),
+ Arguments.of(
+ MultiMap.caseInsensitiveMultiMap()
+ .add("ce-specversion", SpecVersion.V1.toString())
+ .add("ce-id", ID)
+ .add("ce-type", TYPE)
+ .add("ce-source", SOURCE.toString())
+ .add("ce-dataschema", DATASCHEMA.toString())
+ .add("content-type", DATACONTENTTYPE_JSON)
+ .add("ce-subject", SUBJECT)
+ .add("ce-time", Time.RFC3339_DATE_FORMAT.format(TIME))
+ .add("ignored", "ignored"),
+ Buffer.buffer(DATA_JSON_SERIALIZED),
+ V1_WITH_JSON_DATA
+ ),
+ Arguments.of(
+ MultiMap.caseInsensitiveMultiMap()
+ .add("ce-specversion", SpecVersion.V1.toString())
+ .add("ce-id", ID)
+ .add("ce-type", TYPE)
+ .add("ce-source", SOURCE.toString())
+ .add("ce-dataschema", DATASCHEMA.toString())
+ .add("content-type", DATACONTENTTYPE_JSON)
+ .add("ce-subject", SUBJECT)
+ .add("ce-time", Time.RFC3339_DATE_FORMAT.format(TIME))
+ .add("ce-astring", "aaa")
+ .add("ce-aboolean", "true")
+ .add("ce-anumber", "10")
+ .add("ignored", "ignored"),
+ Buffer.buffer(DATA_JSON_SERIALIZED),
+ V1_WITH_JSON_DATA_WITH_EXT_STRING
+ ),
+ Arguments.of(
+ MultiMap.caseInsensitiveMultiMap()
+ .add("ce-specversion", SpecVersion.V1.toString())
+ .add("ce-id", ID)
+ .add("ce-type", TYPE)
+ .add("ce-source", SOURCE.toString())
+ .add("content-type", DATACONTENTTYPE_XML)
+ .add("ce-subject", SUBJECT)
+ .add("ce-time", Time.RFC3339_DATE_FORMAT.format(TIME))
+ .add("ignored", "ignored"),
+ Buffer.buffer(DATA_XML_SERIALIZED),
+ V1_WITH_XML_DATA
+ ),
+ Arguments.of(
+ MultiMap.caseInsensitiveMultiMap()
+ .add("ce-specversion", SpecVersion.V1.toString())
+ .add("ce-id", ID)
+ .add("ce-type", TYPE)
+ .add("ce-source", SOURCE.toString())
+ .add("content-type", DATACONTENTTYPE_TEXT)
+ .add("ce-subject", SUBJECT)
+ .add("ce-time", Time.RFC3339_DATE_FORMAT.format(TIME))
+ .add("ignored", "ignored"),
+ Buffer.buffer(DATA_TEXT_SERIALIZED),
+ V1_WITH_TEXT_DATA
+ ),
+ // Headers case insensitivity
+ Arguments.of(
+ MultiMap.caseInsensitiveMultiMap()
+ .add("Ce-sPecversion", SpecVersion.V03.toString())
+ .add("cE-id", ID)
+ .add("CE-Type", TYPE)
+ .add("ce-source", SOURCE.toString())
+ .add("ignored", "ignored")
+ .add("ab", "should-not-break-anything"),
+ null,
+ V03_MIN
+ )
+ );
+ }
+
+}
diff --git a/pom.xml b/pom.xml
index 45bffaf3..c170e70c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -63,9 +63,9 @@
api
formats/json-jackson
+ http/vertx
@@ -74,6 +74,9 @@
1.8
1.8
UTF-8
+ 3.11.0
+ 5.5.2
+ 3.0.0-M4