Reworked Vert.x Integration (#129)

* Started http-vertx module reworking

Signed-off-by: Francesco Guardiani <francescoguard@gmail.com>

* Vertx integration done

Signed-off-by: Francesco Guardiani <francescoguard@gmail.com>

* Adjusted the READMEs

Signed-off-by: Francesco Guardiani <francescoguard@gmail.com>

* Case sensitivity fix

Signed-off-by: Francesco Guardiani <francescoguard@gmail.com>
This commit is contained in:
Francesco Guardiani 2020-04-24 12:17:49 +02:00 committed by GitHub
parent 4e38fc805d
commit 2b5e72c50a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
37 changed files with 1396 additions and 613 deletions

View File

@ -17,10 +17,21 @@ If you want to know more about v1 of this SDK, check out the [v1 readme](./READM
Stay tuned!
This SDK current supports the following versions of CloudEvents:
Supported features of the specification:
- 0.3
- 1.0
| | [v0.3](https://github.com/cloudevents/spec/tree/v0.3) | [v1.0](https://github.com/cloudevents/spec/tree/v1.0) |
| :---------------------------: | :----------------------------------------------------------------------------: | :---------------------------------------------------------------------------------: |
| CloudEvents Core | :heavy_check_mark: | :heavy_check_mark: |
| AMQP Protocol Binding | :x: | :x: |
| AVRO Event Format | :x: | :x: |
| HTTP Protocol Binding | :heavy_check_mark: | :heavy_check_mark: |
| - [Vert.x](http/vertx) | :heavy_check_mark: | :heavy_check_mark: |
| JSON Event Format | :heavy_check_mark: | :heavy_check_mark: |
| - [Jackson](formats/json-jackson) | :heavy_check_mark: | :heavy_check_mark: |
| Kafka Protocol Binding | :x: | :x: |
| MQTT Protocol Binding | :x: | :x: |
| NATS Protocol Binding | :x: | :x: |
| Web hook | :x: | :x: |
## Motivation

View File

@ -34,14 +34,14 @@
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<version>5.4.2</version>
<version>${junit-jupiter.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<version>3.10.0</version>
<version>${assertj-core.version}</version>
<scope>test</scope>
</dependency>
@ -52,7 +52,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>3.0.0-M4</version>
<version>${maven-surefire-plugin.version}</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>

View File

@ -4,6 +4,8 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public enum SpecVersion {
V03(
@ -20,11 +22,15 @@ public enum SpecVersion {
private final String stringValue;
private final Set<String> mandatoryAttributes;
private final Set<String> optionalAttributes;
private final Set<String> allAttributes;
SpecVersion(String stringValue, Set<String> mandatoryAttributes, Set<String> optionalAttributes) {
this.stringValue = stringValue;
this.mandatoryAttributes = mandatoryAttributes;
this.optionalAttributes = optionalAttributes;
this.allAttributes = Collections.unmodifiableSet(
Stream.concat(mandatoryAttributes.stream(), optionalAttributes.stream()).collect(Collectors.toSet())
);
}
@Override
@ -43,11 +49,24 @@ public enum SpecVersion {
}
}
/**
* @return mandatory attributes of the spec version
*/
public Set<String> getMandatoryAttributes() {
return mandatoryAttributes;
}
/**
* @return optional attributes of the spec version
*/
public Set<String> getOptionalAttributes() {
return optionalAttributes;
}
/**
* @return all attributes for this spec
*/
public Set<String> getAllAttributes() {
return allAttributes;
}
}

View File

@ -2,6 +2,7 @@ package io.cloudevents.format;
import io.cloudevents.CloudEvent;
import java.util.Collections;
import java.util.Set;
public interface EventFormat {
@ -10,6 +11,10 @@ public interface EventFormat {
CloudEvent deserialize(byte[] event) throws EventDeserializationException;
Set<String> supportedContentTypes();
default Set<String> deserializableContentTypes() {
return Collections.singleton(serializedContentType());
}
String serializedContentType();
}

View File

@ -26,12 +26,16 @@ public final class EventFormatProvider {
}
public void registerFormat(EventFormat format) {
for (String k: format.supportedContentTypes()) {
for (String k : format.deserializableContentTypes()) {
this.formats.put(k, format);
}
}
public EventFormat resolveFormat(String key) {
int i = key.indexOf(';');
if (i != -1) {
key = key.substring(0, i);
}
return this.formats.get(key);
}

View File

@ -120,7 +120,7 @@ public final class CloudEventImpl implements CloudEvent, BinaryMessage, BinaryMe
public String toString() {
return "CloudEvent{" +
"attributes=" + attributes +
", data=" + new String(this.data, StandardCharsets.UTF_8) +
((this.data != null) ? ", data=" + new String(this.data, StandardCharsets.UTF_8) : "") +
", extensions=" + extensions +
'}';
}

View File

@ -6,12 +6,11 @@ import io.cloudevents.CloudEvent;
public interface BinaryMessage {
/**
*
* @param visitor
* @param visitorFactory
* @throws MessageVisitException
* @throws IllegalStateException If the message is not a valid binary message
*/
<T extends BinaryMessageVisitor<V>, V> V visit(BinaryMessageVisitorFactory<T, V> visitor) throws MessageVisitException, IllegalStateException;
<V extends BinaryMessageVisitor<R>, R> R visit(BinaryMessageVisitorFactory<V, R> visitorFactory) throws MessageVisitException, IllegalStateException;
default CloudEvent toEvent() throws MessageVisitException, IllegalStateException {
return this.visit(specVersion -> {

View File

@ -8,12 +8,34 @@ import java.time.ZonedDateTime;
@FunctionalInterface
public interface BinaryMessageAttributesVisitor {
/**
* Set attribute with type {@link String}. This setter should not be invoked for specversion, because the built Visitor already
* has the information through the {@link BinaryMessageVisitorFactory}
*
* @param name
* @param value
* @throws MessageVisitException
*/
void setAttribute(String name, String value) throws MessageVisitException;
/**
* Set attribute with type {@link URI}.
*
* @param name
* @param value
* @throws MessageVisitException
*/
default void setAttribute(String name, URI value) throws MessageVisitException {
setAttribute(name, value.toString());
}
/**
* Set attribute with type {@link ZonedDateTime} attribute.
*
* @param name
* @param value
* @throws MessageVisitException
*/
default void setAttribute(String name, ZonedDateTime value) throws MessageVisitException {
setAttribute(name, value.format(Time.RFC3339_DATE_FORMAT));
}

View File

@ -3,6 +3,6 @@ package io.cloudevents.message;
import io.cloudevents.SpecVersion;
@FunctionalInterface
public interface BinaryMessageVisitorFactory<T extends BinaryMessageVisitor<V>, V> {
T createBinaryMessageVisitor(SpecVersion version);
public interface BinaryMessageVisitorFactory<V extends BinaryMessageVisitor<R>, R> {
V createBinaryMessageVisitor(SpecVersion version);
}

View File

@ -8,10 +8,16 @@ public class MessageVisitException extends RuntimeException {
INVALID_ATTRIBUTE_TYPE,
INVALID_ATTRIBUTE_VALUE,
INVALID_EXTENSION_TYPE,
OTHER
}
private MessageVisitExceptionKind kind;
public MessageVisitException(MessageVisitExceptionKind kind, Throwable cause) {
super(cause);
this.kind = kind;
}
public MessageVisitException(MessageVisitExceptionKind kind, String message) {
super(message);
this.kind = kind;
@ -61,4 +67,11 @@ public class MessageVisitException extends RuntimeException {
"Invalid extension type for \"" + extensionName + "\": " + clazz.getCanonicalName()
);
}
public static MessageVisitException newOther(Throwable cause) {
return new MessageVisitException(
MessageVisitExceptionKind.OTHER,
cause
);
}
}

View File

@ -0,0 +1,20 @@
package io.cloudevents.message.impl;
import io.cloudevents.message.*;
public class UnknownEncodingMessage implements Message {
@Override
public Encoding getEncoding() {
return Encoding.UNKNOWN;
}
@Override
public <T extends BinaryMessageVisitor<V>, V> V visit(BinaryMessageVisitorFactory<T, V> visitorFactory) throws MessageVisitException, IllegalStateException {
throw Encoding.WRONG_ENCODING_EXCEPTION;
}
@Override
public <T> T visit(StructuredMessageVisitor<T> visitor) throws MessageVisitException, IllegalStateException {
throw Encoding.WRONG_ENCODING_EXCEPTION;
}
}

View File

@ -9,7 +9,13 @@ public class EventFormatProviderTest {
@Test
void resolveCSV() {
assertThat(EventFormatProvider.getInstance().resolveFormat("text/csv"))
assertThat(EventFormatProvider.getInstance().resolveFormat(CSVFormat.INSTANCE.serializedContentType()))
.isInstanceOf(CSVFormat.class);
}
@Test
void resolveCSVWithParams() {
assertThat(EventFormatProvider.getInstance().resolveFormat(CSVFormat.INSTANCE.serializedContentType() + "; charset=utf8"))
.isInstanceOf(CSVFormat.class);
}

View File

@ -69,14 +69,21 @@ public class CSVFormat implements EventFormat {
builder.withData(data);
}
switch (sv) {
case V03: return builder.build().toV03();
case V1: return builder.build().toV1();
case V03:
return builder.build().toV03();
case V1:
return builder.build().toV1();
}
return null;
}
@Override
public Set<String> supportedContentTypes() {
return Collections.singleton("text/csv");
public Set<String> deserializableContentTypes() {
return Collections.singleton(serializedContentType());
}
@Override
public String serializedContentType() {
return "application/cloudevents+csv";
}
}

View File

@ -14,7 +14,7 @@ public class MockStructuredMessage implements Message, StructuredMessageVisitor<
}
@Override
public <T extends BinaryMessageVisitor<V>, V> V visit(BinaryMessageVisitorFactory<T, V> visitor) throws MessageVisitException, IllegalStateException {
public <T extends BinaryMessageVisitor<V>, V> V visit(BinaryMessageVisitorFactory<T, V> visitorFactory) throws MessageVisitException, IllegalStateException {
throw Encoding.WRONG_ENCODING_EXCEPTION;
}

View File

@ -50,6 +50,18 @@ public class Data {
.withExtension("anumber", 10)
.build();
public static final CloudEvent V1_WITH_JSON_DATA_WITH_EXT_STRING = CloudEvent.buildV1()
.withId(ID)
.withType(TYPE)
.withSource(SOURCE)
.withData(DATACONTENTTYPE_JSON, DATASCHEMA, DATA_JSON_SERIALIZED)
.withSubject(SUBJECT)
.withTime(TIME)
.withExtension("astring", "aaa")
.withExtension("aboolean", "true")
.withExtension("anumber", "10")
.build();
public static final CloudEvent V1_WITH_XML_DATA = CloudEvent.buildV1()
.withId(ID)
.withType(TYPE)
@ -71,6 +83,7 @@ public class Data {
public static final CloudEvent V03_MIN = V1_MIN.toV03();
public static final CloudEvent V03_WITH_JSON_DATA = V1_WITH_JSON_DATA.toV03();
public static final CloudEvent V03_WITH_JSON_DATA_WITH_EXT = V1_WITH_JSON_DATA_WITH_EXT.toV03();
public static final CloudEvent V03_WITH_JSON_DATA_WITH_EXT_STRING = V1_WITH_JSON_DATA_WITH_EXT_STRING.toV03();
public static final CloudEvent V03_WITH_XML_DATA = V1_WITH_XML_DATA.toV03();
public static final CloudEvent V03_WITH_TEXT_DATA = V1_WITH_TEXT_DATA.toV03();
@ -82,6 +95,10 @@ public class Data {
return Stream.concat(v1Events(), v03Events()).filter(e -> e.getExtensions().isEmpty());
}
public static Stream<CloudEvent> allEventsWithStringExtensions() {
return Stream.concat(v1EventsWithStringExt(), v03EventsWithStringExt());
}
public static Stream<CloudEvent> v1Events() {
return Stream.of(
Data.V1_MIN,
@ -102,4 +119,20 @@ public class Data {
);
}
public static Stream<CloudEvent> v1EventsWithStringExt() {
return v1Events().map(ce -> {
io.cloudevents.v1.CloudEventBuilder builder = CloudEvent.buildV1(ce);
ce.getExtensions().forEach((k, v) -> builder.withExtension(k, v.toString()));
return builder.build();
});
}
public static Stream<CloudEvent> v03EventsWithStringExt() {
return v03Events().map(ce -> {
io.cloudevents.v03.CloudEventBuilder builder = CloudEvent.buildV03(ce);
ce.getExtensions().forEach((k, v) -> builder.withExtension(k, v.toString()));
return builder.build();
});
}
}

View File

@ -0,0 +1,30 @@
# Json EventFormat implementation with Jackson
Implementation of [`EventFormat`](../../api/src/main/java/io/cloudevents/format/EventFormat.java) using Jackson.
For Maven:
```xml
<dependency>
<groupId>io.cloudevents</groupId>
<artifactId>json-jackson</artifactId>
<version>2.0.0-SNAPSHOT </version>
</dependency>
```
## Usage
You don't need to perform any operation to configure the module, more than adding the dependency to your project:
```java
CloudEvent event = CloudEvent.buildV1()
.withId("hello")
.withType("example.vertx")
.withSource(URI.create("http://localhost"))
.build();
byte[] serialized = EventFormatProvider
.getInstance()
.resolveFormat("application/json")
.serialize(event);
```

View File

@ -68,14 +68,14 @@
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<version>5.4.2</version>
<version>${junit-jupiter.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<version>3.10.0</version>
<version>${assertj-core.version}</version>
<scope>test</scope>
</dependency>
@ -95,7 +95,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>3.0.0-M4</version>
<version>${maven-surefire-plugin.version}</version>
</plugin>
</plugins>
</build>

View File

@ -24,8 +24,6 @@ import io.cloudevents.format.EventFormat;
import io.cloudevents.format.EventSerializationException;
import java.io.IOException;
import java.util.Collections;
import java.util.Set;
public final class JsonFormat implements EventFormat {
@ -79,8 +77,8 @@ public final class JsonFormat implements EventFormat {
}
@Override
public Set<String> supportedContentTypes() {
return Collections.singleton(CONTENT_TYPE);
public String serializedContentType() {
return CONTENT_TYPE;
}
/**

View File

@ -89,7 +89,7 @@ class JsonFormatTest {
assertThat(output).isEqualTo(input);
}
private static Stream<Arguments> serializeTestArgumentsDefault() {
public static Stream<Arguments> serializeTestArgumentsDefault() {
return Stream.of(
Arguments.of(V03_MIN, "v03/min.json"),
Arguments.of(V03_WITH_JSON_DATA, "v03/json_data.json"),
@ -104,7 +104,7 @@ class JsonFormatTest {
);
}
private static Stream<Arguments> serializeTestArgumentsString() {
public static Stream<Arguments> serializeTestArgumentsString() {
return Stream.of(
Arguments.of(V03_WITH_JSON_DATA, "v03/json_data.json"),
Arguments.of(V03_WITH_JSON_DATA_WITH_EXT, "v03/json_data_with_ext.json"),
@ -117,7 +117,7 @@ class JsonFormatTest {
);
}
private static Stream<Arguments> serializeTestArgumentsBase64() {
public static Stream<Arguments> serializeTestArgumentsBase64() {
return Stream.of(
Arguments.of(V03_WITH_JSON_DATA, "v03/base64_json_data.json"),
Arguments.of(V03_WITH_JSON_DATA_WITH_EXT, "v03/base64_json_data_with_ext.json"),
@ -130,7 +130,7 @@ class JsonFormatTest {
);
}
private static Stream<Arguments> deserializeTestArguments() {
public static Stream<Arguments> deserializeTestArguments() {
return Stream.of(
Arguments.of("v03/min.json", V03_MIN),
Arguments.of("v03/json_data.json", V03_WITH_JSON_DATA),
@ -153,7 +153,7 @@ class JsonFormatTest {
);
}
private static Stream<String> roundTripTestArguments() {
public static Stream<String> roundTripTestArguments() {
return Stream.of(
"v03/min.json",
"v03/json_data.json",
@ -167,7 +167,7 @@ class JsonFormatTest {
);
}
private static byte[] loadFile(String input) {
public static byte[] loadFile(String input) {
try {
return String.join(
"",

View File

@ -1,6 +1,4 @@
# HTTP Transport Util for Eclipse Vert.x
## Receiving CloudEvents
# HTTP Transport for Eclipse Vert.x
For Maven based projects, use the following to configure the CloudEvents Vertx HTTP Transport:
@ -8,54 +6,91 @@ For Maven based projects, use the following to configure the CloudEvents Vertx H
<dependency>
<groupId>io.cloudevents</groupId>
<artifactId>http-vertx</artifactId>
<version>1.3.0</version>
<version>2.0.0-SNAPSHOT </version>
</dependency>
```
Below is a sample on how to use [Vert.x API for RxJava 2](https://vertx.io/docs/vertx-rx/java2/) for reading CloudEvents from an HttpServerRequest:
## Receiving CloudEvents
Below is a sample on how to read and write CloudEvents:
```java
import io.cloudevents.http.reactivex.vertx.VertxCloudEvents;
import io.cloudevents.http.vertx.VertxHttpServerResponseMessageVisitor;
import io.cloudevents.http.vertx.VertxMessage;
import io.cloudevents.CloudEvent;
import io.vertx.core.http.HttpHeaders;
import io.vertx.reactivex.core.AbstractVerticle;
import io.vertx.core.AbstractVerticle;
public class CloudEventVerticle extends AbstractVerticle {
public class CloudEventServerVerticle extends AbstractVerticle {
public void start() {
vertx.createHttpServer()
.requestHandler(req -> {
VertxCloudEvents.create().rxReadFromRequest(req)
.subscribe((receivedEvent, throwable) -> {
if (receivedEvent != null) {
// I got a CloudEvent object:
System.out.println("The event type: " + receivedEvent.getEventType());
.requestHandler(req -> {
VertxMessage.fromHttpServerRequest(req)
.onComplete(result -> {
// If decoding succeeded, we should write the event back
if (result.succeeded()) {
CloudEvent event = result.result().toEvent();
// Echo the message, as binary mode
event
.asBinaryMessage()
.visit(VertxHttpServerResponseMessageVisitor.create(req.response()));
}
req.response().setStatusCode(500).end();
});
req.response().end();
})
.listen(8080, serverResult -> {
if (serverResult.succeeded()) {
System.out.println("Server started on port " + serverResult.result().actualPort());
} else {
System.out.println("Error starting the server");
serverResult.cause().printStackTrace();
}
)
.rxListen(8080)
.subscribe(server -> {
System.out.println("Server running!");
});
});
}
}
```
## Sending CloudEvents
Below is a sample on how to use the client to send a CloudEvent:
Below is a sample on how to use the client to send and receive a CloudEvent:
```java
final HttpClientRequest request = vertx.createHttpClient().post(8080, "localhost", "/");
import io.cloudevents.http.vertx.VertxHttpClientRequestMessageVisitor;import io.cloudevents.http.vertx.VertxHttpServerResponseMessageVisitor;
import io.cloudevents.http.vertx.VertxMessage;
import io.cloudevents.CloudEvent;
import io.vertx.core.http.HttpClientRequest;
import io.vertx.core.http.HttpHeaders;
import io.vertx.core.http.HttpClient;
import io.vertx.core.AbstractVerticle;
import java.net.URI;
// add a client response handler
request.handler(resp -> {
// react on the server response
});
public class CloudEventClientVerticle extends AbstractVerticle {
// write the CloudEvent to the given HTTP Post request object
VertxCloudEvents.create().writeToHttpClientRequest(cloudEvent, request);
request.end();
public void start() {
HttpClient client = vertx.createHttpClient();
HttpClientRequest request = client.postAbs("http://localhost:8080")
.handler(httpClientResponse -> {
VertxMessage
.fromHttpClientResponse(httpClientResponse)
.onComplete(result -> {
if (result.succeeded()) {
CloudEvent event = result.result().toEvent();
}
});
});
CloudEvent event = CloudEvent.buildV1()
.withId("hello")
.withType("example.vertx")
.withSource(URI.create("http://localhost"))
.build();
// Write request as binary
event
.asBinaryMessage()
.visit(VertxHttpClientRequestMessageVisitor.create(request));
}
}
```

View File

@ -20,45 +20,45 @@
<parent>
<groupId>io.cloudevents</groupId>
<artifactId>cloudevents-parent</artifactId>
<version>1.3.0</version>
<relativePath>../..</relativePath>
<version>2.0.0-SNAPSHOT</version>
<relativePath>../../</relativePath>
</parent>
<artifactId>http-vertx</artifactId>
<name>CloudEvents - vertx-transport-http</name>
<name>CloudEvents - Vert.x Http Integration</name>
<packaging>jar</packaging>
<dependencies>
<properties>
<vertx.version>3.9.0</vertx.version>
<assertj-core.version>3.11.0</assertj-core.version>
</properties>
<dependencies>
<dependency>
<groupId>io.cloudevents</groupId>
<artifactId>cloudevents-api</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-core</artifactId>
<version>${vertx.version}</version>
</dependency>
<!-- Test deps -->
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-codegen</artifactId>
<version>${vertx.version}</version>
<optional>true</optional>
<groupId>io.cloudevents</groupId>
<artifactId>cloudevents-api</artifactId>
<classifier>tests</classifier>
<type>test-jar</type>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-rx-java2</artifactId>
<version>${vertx.version}</version>
<optional>true</optional>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-junit5</artifactId>
<version>${vertx.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
@ -66,54 +66,22 @@
<version>${assertj-core.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<artifactId>junit-jupiter</artifactId>
<version>${junit-jupiter.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
<version>${junit-jupiter.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<pluginManagement>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<executions>
<execution>
<id>default-compile</id>
<configuration>
<annotationProcessors>
<annotationProcessor>io.vertx.codegen.CodeGenProcessor</annotationProcessor>
</annotationProcessors>
<compilerArgs>
<arg>-AoutputDirectory=${project.basedir}/src/main</arg>
</compilerArgs>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<version>${maven-surefire-plugin.version}</version>
</plugin>
</plugins>
</pluginManagement>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>${maven-surefire-plugin.version}</version>
</plugin>
</plugins>
</build>
<properties>
<vertx.version>3.6.0</vertx.version>
<maven-surefire-plugin.version>2.22.0</maven-surefire-plugin.version>
<assertj-core.version>3.11.0</assertj-core.version>
<junit-jupiter.version>5.3.2</junit-jupiter.version>
</properties>
</project>

View File

@ -1,46 +0,0 @@
/**
* Copyright 2018 The CloudEvents Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.cloudevents.http.vertx;
import io.cloudevents.CloudEvent;
import io.cloudevents.http.vertx.impl.VertxCloudEventsImpl;
import io.cloudevents.v02.AttributesImpl;
import io.vertx.codegen.annotations.GenIgnore;
import io.vertx.codegen.annotations.VertxGen;
import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import io.vertx.core.http.HttpClientRequest;
import io.vertx.core.http.HttpServerRequest;
@VertxGen
public interface VertxCloudEvents {
static VertxCloudEvents create() {
return new VertxCloudEventsImpl();
}
@GenIgnore(GenIgnore.PERMITTED_TYPE)
void readFromRequest(HttpServerRequest request, Handler<AsyncResult<CloudEvent<AttributesImpl, String>>> resultHandler);
@GenIgnore(GenIgnore.PERMITTED_TYPE)
void readFromRequest(HttpServerRequest request, Class[] extensions, Handler<AsyncResult<CloudEvent<AttributesImpl, String>>> resultHandler);
@GenIgnore(GenIgnore.PERMITTED_TYPE)
void writeToHttpClientRequest(CloudEvent<AttributesImpl, String> ce, HttpClientRequest request);
@GenIgnore(GenIgnore.PERMITTED_TYPE)
void writeToHttpClientRequest(CloudEvent<AttributesImpl, String> ce, boolean binary, HttpClientRequest request);
}

View File

@ -0,0 +1,18 @@
package io.cloudevents.http.vertx;
import io.cloudevents.http.vertx.impl.VertxHttpClientRequestMessageVisitorImpl;
import io.cloudevents.message.BinaryMessageVisitor;
import io.cloudevents.message.MessageVisitor;
import io.vertx.core.http.HttpClientRequest;
/**
* Visitor for {@link io.cloudevents.message.Message} that can write both structured and binary messages to a {@link HttpClientRequest}.
* When the visit ends, the request is ended with {@link HttpClientRequest#end(io.vertx.core.buffer.Buffer)}
*/
public interface VertxHttpClientRequestMessageVisitor extends MessageVisitor<VertxHttpClientRequestMessageVisitor, HttpClientRequest>, BinaryMessageVisitor<HttpClientRequest> {
static VertxHttpClientRequestMessageVisitor create(HttpClientRequest req) {
return new VertxHttpClientRequestMessageVisitorImpl(req);
}
}

View File

@ -0,0 +1,18 @@
package io.cloudevents.http.vertx;
import io.cloudevents.http.vertx.impl.VertxHttpServerResponseMessageVisitorImpl;
import io.cloudevents.message.BinaryMessageVisitor;
import io.cloudevents.message.MessageVisitor;
import io.vertx.core.http.HttpServerResponse;
/**
* Visitor for {@link io.cloudevents.message.Message} that can write both structured and binary messages to a {@link HttpServerResponse}.
* When the visit ends, the request is ended with {@link HttpServerResponse#end(io.vertx.core.buffer.Buffer)}
*/
public interface VertxHttpServerResponseMessageVisitor extends MessageVisitor<VertxHttpServerResponseMessageVisitor, HttpServerResponse>, BinaryMessageVisitor<HttpServerResponse> {
static VertxHttpServerResponseMessageVisitor create(HttpServerResponse res) {
return new VertxHttpServerResponseMessageVisitorImpl(res);
}
}

View File

@ -0,0 +1,109 @@
package io.cloudevents.http.vertx;
import io.cloudevents.SpecVersion;
import io.cloudevents.format.EventFormat;
import io.cloudevents.format.EventFormatProvider;
import io.cloudevents.http.vertx.impl.BinaryVertxMessageImpl;
import io.cloudevents.http.vertx.impl.StructuredVertxMessageImpl;
import io.cloudevents.message.Message;
import io.cloudevents.message.impl.UnknownEncodingMessage;
import io.vertx.core.*;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpClientResponse;
import io.vertx.core.http.HttpHeaders;
import io.vertx.core.http.HttpServerRequest;
/**
* Interface representing a Message implementation using Vert.x types
*/
public interface VertxMessage extends Message {
/**
* Create a new Message using Vert.x headers and body.
*
* @param headers Http headers
* @param body nullable buffer of the body
* @return a Message implementation with potentially an unknown encoding
* @throws IllegalArgumentException If, in case of binary mode, the spec version is invalid
*/
static Message create(MultiMap headers, Buffer body) throws IllegalArgumentException {
// Let's try structured mode
String ct = headers.get(HttpHeaders.CONTENT_TYPE);
if (ct != null) {
EventFormat format = EventFormatProvider.getInstance().resolveFormat(ct);
if (format != null) {
return new StructuredVertxMessageImpl(format, body);
}
}
// Let's try binary mode
String specVersionUnparsed = headers.get(BinaryVertxMessageImpl.CE_SPEC_VERSION_HEADER);
if (specVersionUnparsed != null) {
return new BinaryVertxMessageImpl(SpecVersion.parse(specVersionUnparsed), headers, body);
}
return new UnknownEncodingMessage();
}
/**
* Build a message starting from an {@link HttpServerRequest}
*
* @param request
* @return
*/
static Future<Message> fromHttpServerRequest(HttpServerRequest request) {
Promise<Message> prom = Promise.promise();
request.exceptionHandler(prom::tryFail);
request.bodyHandler(b -> {
try {
prom.complete(create(request.headers(), b));
} catch (IllegalArgumentException e) {
prom.fail(e);
}
});
return prom.future();
}
/**
* Like {@link VertxMessage#fromHttpServerRequest(HttpServerRequest)}
*
* @param request
* @param handler
*/
static void fromHttpServerRequest(HttpServerRequest request, Handler<AsyncResult<Message>> handler) {
fromHttpServerRequest(request).onComplete(handler);
}
/**
* Build a message starting from an {@link HttpClientResponse}
*
* @param request
* @return
*/
static Future<Message> fromHttpClientResponse(HttpClientResponse request) {
Promise<Message> prom = Promise.promise();
request.exceptionHandler(prom::tryFail);
request.bodyHandler(b -> {
try {
prom.complete(create(request.headers(), b));
} catch (IllegalArgumentException e) {
prom.fail(e);
}
});
return prom.future();
}
/**
* Like {@link VertxMessage#fromHttpClientResponse(HttpClientResponse)}
*
* @param response
* @param handler
*/
static void fromHttpClientResponse(HttpClientResponse response, Handler<AsyncResult<Message>> handler) {
fromHttpClientResponse(response).onComplete(handler);
}
}

View File

@ -0,0 +1,88 @@
package io.cloudevents.http.vertx.impl;
import io.cloudevents.SpecVersion;
import io.cloudevents.http.vertx.VertxMessage;
import io.cloudevents.message.*;
import io.vertx.core.MultiMap;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpHeaders;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public class BinaryVertxMessageImpl implements VertxMessage {
public static final Map<String, CharSequence> OPTIMIZED_ATTRIBUTES_TO_HEADERS = Stream.concat(
Stream.concat(SpecVersion.V1.getMandatoryAttributes().stream(), SpecVersion.V1.getOptionalAttributes().stream()),
Stream.concat(SpecVersion.V03.getMandatoryAttributes().stream(), SpecVersion.V03.getOptionalAttributes().stream())
)
.distinct()
.collect(Collectors.toMap(Function.identity(), v -> HttpHeaders.createOptimized("ce-" + v)));
public static final Map<CharSequence, String> OPTIMIZED_HEADERS_TO_ATTRIBUTES = OPTIMIZED_ATTRIBUTES_TO_HEADERS
.entrySet()
.stream()
.collect(Collectors.toMap(Map.Entry::getValue, Map.Entry::getKey));
public static final String CE_PREFIX = "ce-";
public static final CharSequence CE_SPEC_VERSION_HEADER = HttpHeaders.createOptimized("ce-specversion");
private final SpecVersion version;
private final MultiMap headers;
private final Buffer body;
public BinaryVertxMessageImpl(SpecVersion version, MultiMap headers, Buffer body) {
this.version = version;
this.headers = headers;
this.body = body;
}
@Override
public Encoding getEncoding() {
return Encoding.BINARY;
}
@Override
public <T extends BinaryMessageVisitor<V>, V> V visit(BinaryMessageVisitorFactory<T, V> visitorFactory) throws MessageVisitException, IllegalStateException {
BinaryMessageVisitor<V> visitor = visitorFactory.createBinaryMessageVisitor(this.version);
// Grab from headers the attributes and extensions
this.headers.forEach(e -> {
try {
if (e.getKey().substring(0, 3).equalsIgnoreCase(CE_PREFIX)) {
String name = e.getKey().substring(3).toLowerCase();
if (name.equals("specversion")) {
return;
}
if (this.version.getAllAttributes().contains(name)) {
visitor.setAttribute(name, e.getValue());
} else {
visitor.setExtension(name, e.getValue());
}
}
} catch (StringIndexOutOfBoundsException ex) {
// String is smaller than 3 characters and it's not equal for sure to CE_PREFIX
}
});
String ct = this.headers.get(HttpHeaders.CONTENT_TYPE);
if (ct != null) {
visitor.setAttribute("datacontenttype", ct);
}
// Set the payload
if (this.body != null && this.body.length() != 0) {
visitor.setBody(this.body.getBytes());
}
return visitor.end();
}
@Override
public <T> T visit(StructuredMessageVisitor<T> visitor) throws MessageVisitException, IllegalStateException {
throw Encoding.WRONG_ENCODING_EXCEPTION;
}
}

View File

@ -0,0 +1,27 @@
package io.cloudevents.http.vertx.impl;
import io.cloudevents.SpecVersion;
import io.vertx.core.http.HttpHeaders;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public class CloudEventsHeaders {
public static final Map<String, CharSequence> ATTRIBUTES_TO_HEADERS = Stream.concat(
Stream.concat(SpecVersion.V1.getMandatoryAttributes().stream(), SpecVersion.V1.getOptionalAttributes().stream()),
Stream.concat(SpecVersion.V03.getMandatoryAttributes().stream(), SpecVersion.V03.getOptionalAttributes().stream())
)
.distinct()
.collect(Collectors.toMap(Function.identity(), v -> {
if (v.equals("datacontenttype")) {
return HttpHeaders.CONTENT_TYPE;
}
return HttpHeaders.createOptimized("ce-" + v);
}));
public static final CharSequence SPEC_VERSION = ATTRIBUTES_TO_HEADERS.get("specversion");
}

View File

@ -0,0 +1,32 @@
package io.cloudevents.http.vertx.impl;
import io.cloudevents.format.EventFormat;
import io.cloudevents.http.vertx.VertxMessage;
import io.cloudevents.message.*;
import io.vertx.core.buffer.Buffer;
public class StructuredVertxMessageImpl implements VertxMessage {
private final EventFormat format;
private final Buffer buffer;
public StructuredVertxMessageImpl(EventFormat format, Buffer buffer) {
this.format = format;
this.buffer = buffer;
}
@Override
public Encoding getEncoding() {
return Encoding.STRUCTURED;
}
@Override
public <T extends BinaryMessageVisitor<V>, V> V visit(BinaryMessageVisitorFactory<T, V> visitorFactory) throws MessageVisitException, IllegalStateException {
throw Encoding.WRONG_ENCODING_EXCEPTION;
}
@Override
public <T> T visit(StructuredMessageVisitor<T> visitor) throws MessageVisitException, IllegalStateException {
return visitor.setEvent(this.format, this.buffer.getBytes());
}
}

View File

@ -1,140 +0,0 @@
/**
* Copyright 2019 The CloudEvents Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.cloudevents.http.vertx.impl;
import static io.vertx.core.http.HttpHeaders.createOptimized;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import com.fasterxml.jackson.core.type.TypeReference;
import io.cloudevents.CloudEvent;
import io.cloudevents.format.Wire;
import io.cloudevents.http.vertx.VertxCloudEvents;
import io.cloudevents.json.Json;
import io.cloudevents.v02.AttributesImpl;
import io.cloudevents.v02.CloudEventImpl;
import io.cloudevents.v02.http.Marshallers;
import io.cloudevents.v02.http.Unmarshallers;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.MultiMap;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpClientRequest;
import io.vertx.core.http.HttpHeaders;
import io.vertx.core.http.HttpServerRequest;
public final class VertxCloudEventsImpl implements VertxCloudEvents {
private final static CharSequence BINARY_TYPE = HttpHeaders.createOptimized("application/json");
private final static CharSequence STRUCTURED_TYPE = HttpHeaders.createOptimized("application/cloudevents+json");
@Override
public void readFromRequest(HttpServerRequest request, Handler<AsyncResult<CloudEvent<AttributesImpl, String>>> resultHandler) {
this.readFromRequest(request, null, resultHandler);
}
@Override
public void readFromRequest(HttpServerRequest request, Class[] extensions, Handler<AsyncResult<CloudEvent<AttributesImpl, String>>> resultHandler) {
final MultiMap headers = request.headers();
// binary mode
if (headers.get(HttpHeaders.CONTENT_TYPE).equalsIgnoreCase(BINARY_TYPE.toString())) {
request.bodyHandler((Buffer buff) -> {
CloudEvent<AttributesImpl, String> event =
Unmarshallers.binary(String.class)
.withHeaders(() -> {
final Map<String, Object> result = new HashMap<>();
headers.iterator()
.forEachRemaining(header -> {
result.put(header.getKey(), header.getValue());
});
return Collections.unmodifiableMap(result);
})
.withPayload(() -> {
return buff.toString();
})
.unmarshal();
resultHandler.handle(Future.succeededFuture(event));
});
} else if (headers.get(HttpHeaders.CONTENT_TYPE).equalsIgnoreCase(STRUCTURED_TYPE.toString())) {
// structured read of the body
request.bodyHandler((Buffer buff) -> {
if (buff.length()>0) {
resultHandler.handle(Future.succeededFuture(Json.decodeValue(buff.toString(),
new TypeReference<CloudEventImpl<String>>() {})));
} else {
throw new IllegalArgumentException("no cloudevent body");
}
});
} else {
throw new IllegalArgumentException("no cloudevent type identified");
}
}
@Override
public void writeToHttpClientRequest(CloudEvent<AttributesImpl, String> cloudEvent, HttpClientRequest request) {
writeToHttpClientRequest(cloudEvent, Boolean.TRUE, request);
}
@Override
public void writeToHttpClientRequest(CloudEvent<AttributesImpl, String> cloudEvent, boolean binary, HttpClientRequest request) {
if (binary) {
Wire<String, String, String> wire =
Marshallers.<String>binary()
.withEvent(() -> cloudEvent)
.marshal();
// setting the right content-length:
request.putHeader(HttpHeaders.CONTENT_LENGTH, createOptimized("0"));
wire.getPayload().ifPresent((payload) -> {
request.putHeader(HttpHeaders.CONTENT_LENGTH,
createOptimized(String.valueOf(payload.length())));
});
// read required headers
wire.getHeaders().entrySet()
.stream()
.forEach(header -> {
request.putHeader(createOptimized(header.getKey()),
createOptimized(header.getValue()));
});
wire.getPayload().ifPresent((payload) -> {
request.write(payload);
});
} else {
// read required headers
request.putHeader(HttpHeaders.CONTENT_TYPE, STRUCTURED_TYPE);
final String json = Json.encode(cloudEvent);
request.putHeader(HttpHeaders.CONTENT_LENGTH,
createOptimized(String.valueOf(json.length())));
// this the body
request.write(json);
}
}
}

View File

@ -0,0 +1,66 @@
package io.cloudevents.http.vertx.impl;
import io.cloudevents.SpecVersion;
import io.cloudevents.format.EventFormat;
import io.cloudevents.http.vertx.VertxHttpClientRequestMessageVisitor;
import io.cloudevents.message.MessageVisitException;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpClientRequest;
import io.vertx.core.http.HttpHeaders;
public class VertxHttpClientRequestMessageVisitorImpl implements VertxHttpClientRequestMessageVisitor {
private final HttpClientRequest request;
private boolean ended;
public VertxHttpClientRequestMessageVisitorImpl(HttpClientRequest request) {
this.request = request;
this.ended = false;
}
// Binary visitor factory
@Override
public VertxHttpClientRequestMessageVisitor createBinaryMessageVisitor(SpecVersion version) {
this.request.putHeader(CloudEventsHeaders.SPEC_VERSION, version.toString());
return this;
}
// Binary visitor
@Override
public void setAttribute(String name, String value) throws MessageVisitException {
this.request.putHeader(CloudEventsHeaders.ATTRIBUTES_TO_HEADERS.get(name), value);
}
@Override
public void setExtension(String name, String value) throws MessageVisitException {
this.request.putHeader("ce-" + name, value);
}
@Override
public void setBody(byte[] value) throws MessageVisitException {
if (ended) {
throw MessageVisitException.newOther(new IllegalStateException("Cannot set the body because the request is already ended"));
}
this.request.end(Buffer.buffer(value));
this.ended = true;
}
@Override
public HttpClientRequest end() {
if (!ended) {
this.request.end();
}
return this.request;
}
// Structured visitor
@Override
public HttpClientRequest setEvent(EventFormat format, byte[] value) throws MessageVisitException {
this.request.putHeader(HttpHeaders.CONTENT_TYPE, format.serializedContentType());
this.request.end(Buffer.buffer(value));
return this.request;
}
}

View File

@ -0,0 +1,63 @@
package io.cloudevents.http.vertx.impl;
import io.cloudevents.SpecVersion;
import io.cloudevents.format.EventFormat;
import io.cloudevents.http.vertx.VertxHttpServerResponseMessageVisitor;
import io.cloudevents.message.MessageVisitException;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpHeaders;
import io.vertx.core.http.HttpServerResponse;
public class VertxHttpServerResponseMessageVisitorImpl implements VertxHttpServerResponseMessageVisitor {
private final HttpServerResponse response;
public VertxHttpServerResponseMessageVisitorImpl(HttpServerResponse response) {
this.response = response;
}
// Binary visitor factory
@Override
public VertxHttpServerResponseMessageVisitor createBinaryMessageVisitor(SpecVersion version) {
this.response.putHeader(CloudEventsHeaders.SPEC_VERSION, version.toString());
return this;
}
// Binary visitor
@Override
public void setAttribute(String name, String value) throws MessageVisitException {
this.response.putHeader(CloudEventsHeaders.ATTRIBUTES_TO_HEADERS.get(name), value);
}
@Override
public void setExtension(String name, String value) throws MessageVisitException {
this.response.putHeader("ce-" + name, value);
}
@Override
public void setBody(byte[] value) throws MessageVisitException {
if (this.response.ended()) {
throw MessageVisitException.newOther(new IllegalStateException("Cannot set the body because the response is already ended"));
}
this.response.end(Buffer.buffer(value));
}
@Override
public HttpServerResponse end() {
if (!this.response.ended()) {
this.response.end();
}
return this.response;
}
// Structured visitor
@Override
public HttpServerResponse setEvent(EventFormat format, byte[] value) throws MessageVisitException {
this.response.putHeader(HttpHeaders.CONTENT_TYPE, format.serializedContentType());
this.response.end(Buffer.buffer(value));
return this.response;
}
}

View File

@ -1,19 +0,0 @@
/**
* Copyright 2018 The CloudEvents Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
@ModuleGen(groupPackage = "io.cloudevents.http", name = "vertx")
package io.cloudevents.http.vertx;
import io.vertx.codegen.annotations.ModuleGen;

View File

@ -1,288 +0,0 @@
/**
* Copyright 2018 The CloudEvents Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.cloudevents.http.vertx;
import io.cloudevents.CloudEvent;
import io.cloudevents.extensions.DistributedTracingExtension;
import io.cloudevents.extensions.ExtensionFormat;
import io.cloudevents.http.reactivex.vertx.VertxCloudEvents;
import io.cloudevents.v02.AttributesImpl;
import io.cloudevents.v02.CloudEventBuilder;
import io.vertx.core.http.HttpHeaders;
import io.vertx.junit5.Checkpoint;
import io.vertx.junit5.VertxExtension;
import io.vertx.junit5.VertxTestContext;
import io.vertx.reactivex.core.Vertx;
import io.vertx.reactivex.core.http.HttpClientRequest;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.URI;
import static org.assertj.core.api.Assertions.assertThat;
@ExtendWith(VertxExtension.class)
class VertxCloudEventsTests {
private int port;
@BeforeEach
void setUp(Vertx vertx, VertxTestContext testContext) throws IOException {
ServerSocket socket = new ServerSocket(0);
port = socket.getLocalPort();
socket.close();
testContext.completeNow();
}
@Test
@DisplayName("Post a 0.2 CloudEvents object with a payload")
void cloudEventWithPayload(Vertx vertx, VertxTestContext testContext) {
Checkpoint serverCheckpoint = testContext.checkpoint();
Checkpoint clientCheckpoint = testContext.checkpoint();
// given
final CloudEvent<AttributesImpl, String> cloudEvent =
CloudEventBuilder.<String>builder()
.withSource(URI.create("http://knative-eventing.com"))
.withId("foo-bar")
.withType("pushevent")
.withData("{\"foo\":\"bar\"}}")
.withContenttype("application/json")
.build();
vertx.createHttpServer()
.requestHandler(req -> VertxCloudEvents
.create()
// read the object from the server request
.rxReadFromRequest(req)
.doOnError(testContext::failNow)
.subscribe(event -> testContext.verify(() -> {
// test
assertThat(event.getAttributes().getId()).isEqualTo(cloudEvent.getAttributes().getId());
assertThat(event.getAttributes().getSource().toString()).isEqualTo(cloudEvent.getAttributes().getSource().toString());
assertThat(event.getAttributes().getType()).isEqualTo(cloudEvent.getAttributes().getType());
assertThat(event.getData()).isPresent();
// write the response back to the caller
req.response().end();
serverCheckpoint.flag();
})))
.rxListen(port)
.doOnError(testContext::failNow)
.subscribe(server -> {
// create client to POST a CloudEvent to the server
final HttpClientRequest req = vertx.createHttpClient().post(server.actualPort(), "localhost", "/");
req.handler(resp -> testContext.verify(() -> {
assertThat(resp.statusCode()).isEqualTo(200);
clientCheckpoint.flag();
}));
VertxCloudEvents.create().writeToHttpClientRequest(cloudEvent, req);
req.end();
});
}
@Test
@DisplayName("Post a 0.2 CloudEvents object without a payload")
void cloudEventWithoutPayload(Vertx vertx, VertxTestContext testContext) {
Checkpoint serverCheckpoint = testContext.checkpoint();
Checkpoint clientCheckpoint = testContext.checkpoint();
// given
final CloudEvent<AttributesImpl, String> cloudEvent =
CloudEventBuilder.<String>builder()
.withSource(URI.create("http://knative-eventing.com"))
.withId("foo-bar")
.withType("pushevent")
.withContenttype("application/json")
.build();
vertx.createHttpServer()
.requestHandler(req -> VertxCloudEvents
.create()
// read the object from the server request
.rxReadFromRequest(req)
.doOnError(testContext::failNow)
.subscribe(event -> testContext.verify(() -> {
// check headers
assertThat(req.headers().get("ce-specversion")).isEqualTo("0.2");
assertThat(req.headers().get("cloudEventsVersion")).isNull();
assertThat(req.headers().get("ce-id")).isEqualTo("foo-bar");
assertThat(req.headers().get("ce-eventID")).isNull();
// check parsed object
assertThat(event.getAttributes().getId()).isEqualTo(cloudEvent.getAttributes().getId());
assertThat(event.getAttributes().getSource().toString()).isEqualTo(cloudEvent.getAttributes().getSource().toString());
assertThat(event.getAttributes().getType()).isEqualTo(cloudEvent.getAttributes().getType());
assertThat(event.getData()).isNotPresent();
// write the response back to the caller
req.response().end();
serverCheckpoint.flag();
})))
.rxListen(port)
.doOnError(testContext::failNow)
.subscribe(server -> {
// create client to POST a CloudEvent to the server
final HttpClientRequest req = vertx.createHttpClient().post(server.actualPort(), "localhost", "/");
req.handler(resp -> testContext.verify(() -> {
assertThat(resp.statusCode()).isEqualTo(200);
clientCheckpoint.flag();
}));
VertxCloudEvents.create().writeToHttpClientRequest(cloudEvent, req);
req.end();
});
}
@Test
@DisplayName("Post an incomplete cloud event")
void incompleteCloudEvent(Vertx vertx, VertxTestContext testContext) {
Checkpoint serverCheckpoint = testContext.checkpoint();
Checkpoint clientCheckpoint = testContext.checkpoint();
vertx.createHttpServer()
.requestHandler(req -> VertxCloudEvents
.create()
// read the object from the server request
.rxReadFromRequest(req)
.subscribe((e, t) -> {
if (e != null) {
testContext.failNow(new AssertionError("request was not complete, but got: " + e));
} else {
req.response().end();
serverCheckpoint.flag();
}
}))
.rxListen(port)
.doOnError(testContext::failNow)
.subscribe(server -> {
// create client to POST a CloudEvent to the server
final HttpClientRequest req = vertx.createHttpClient().post(server.actualPort(), "localhost", "/");
// create incomplete CloudEvent request
req.putHeader(HttpHeaders.createOptimized("ce-specversion"), HttpHeaders.createOptimized("0.2"));
req.putHeader(HttpHeaders.createOptimized("ce-type"), HttpHeaders.createOptimized("pushevent"));
req.putHeader(HttpHeaders.createOptimized("foo"), HttpHeaders.createOptimized("bar"));
req.putHeader(HttpHeaders.CONTENT_LENGTH, HttpHeaders.createOptimized("0"));
req.handler(resp -> testContext.verify(() -> {
assertThat(resp.statusCode()).isEqualTo(200);
clientCheckpoint.flag();
}));
req.end();
});
}
@Test
@DisplayName("Post a 0.2 CloudEvents object with a payload")
void cloudEventWithExtension(Vertx vertx, VertxTestContext testContext) {
Checkpoint serverCheckpoint = testContext.checkpoint();
Checkpoint clientCheckpoint = testContext.checkpoint();
final DistributedTracingExtension dte = new DistributedTracingExtension();
dte.setTraceparent("00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01");
dte.setTracestate("congo=BleGNlZWRzIHRohbCBwbGVhc3VyZS4");
final ExtensionFormat tracing = new DistributedTracingExtension.Format(dte);
final CloudEvent<AttributesImpl, String> cloudEvent =
CloudEventBuilder.<String>builder()
.withSource(URI.create("http://knative-eventing.com"))
.withId("foo-bar")
.withExtension(tracing)
.withType("pushevent")
.withContenttype("application/json")
.build();
final Class[] extensions = {DistributedTracingExtension.class};
vertx.createHttpServer()
.requestHandler(req -> VertxCloudEvents
.create()
// read from request with expected Extension types
.rxReadFromRequest(req, extensions)
.doOnError(testContext::failNow)
.subscribe(event -> testContext.verify(() -> {
assertThat(event.getAttributes().getId()).isEqualTo(cloudEvent.getAttributes().getId());
// test
System.out.println(">>>>>>>>>>>>" + req.headers());
assertThat(req.headers().get("traceparent")).isEqualTo("00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01");
assertThat(req.headers().get("tracestate")).isEqualTo("congo=BleGNlZWRzIHRohbCBwbGVhc3VyZS4");
assertThat(event.getExtensions().get("distributedTracing")).isNotNull();
assertThat(event.getExtensions().get("distributedTracing")).extracting("traceparent", "tracestate")
.contains("00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01", "congo=BleGNlZWRzIHRohbCBwbGVhc3VyZS4");
// write the response back to the caller
req.response().end();
serverCheckpoint.flag();
})))
.rxListen(port)
.doOnError(testContext::failNow)
.subscribe(server -> {
// create client to POST a CloudEvent to the server
final HttpClientRequest req = vertx.createHttpClient().post(server.actualPort(), "localhost", "/");
req.handler(resp -> testContext.verify(() -> {
assertThat(resp.statusCode()).isEqualTo(200);
clientCheckpoint.flag();
}));
VertxCloudEvents.create().writeToHttpClientRequest(cloudEvent, req);
req.end();
});
}
@Test
@DisplayName("Post a 0.2 CloudEvents object without a payload")
void structuredCloudEvent(Vertx vertx, VertxTestContext testContext) {
Checkpoint serverCheckpoint = testContext.checkpoint();
Checkpoint clientCheckpoint = testContext.checkpoint();
// given
final CloudEvent<AttributesImpl, String> cloudEvent =
CloudEventBuilder.<String>builder()
.withSource(URI.create("http://knative-eventing.com"))
.withId("foo-bar")
.withType("pushevent")
.build();
vertx.createHttpServer()
.requestHandler(req -> VertxCloudEvents
.create()
// read the object from the server request
.rxReadFromRequest(req)
.doOnError(testContext::failNow)
.subscribe(event -> testContext.verify(() -> {
// write the response back to the caller
req.response().end();
serverCheckpoint.flag();
})))
.rxListen(port)
.doOnError(testContext::failNow)
.subscribe(server -> {
// create client to POST a CloudEvent to the server
final HttpClientRequest req = vertx.createHttpClient().post(server.actualPort(), "localhost", "/");
req.handler(resp -> testContext.verify(() -> {
assertThat(resp.statusCode()).isEqualTo(200);
clientCheckpoint.flag();
}));
VertxCloudEvents.create().writeToHttpClientRequest(cloudEvent, Boolean.FALSE,req);
req.end();
});
}
}

View File

@ -0,0 +1,242 @@
package io.cloudevents.http.vertx;
import io.cloudevents.CloudEvent;
import io.cloudevents.SpecVersion;
import io.cloudevents.mock.CSVFormat;
import io.cloudevents.types.Time;
import io.vertx.core.MultiMap;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpClient;
import io.vertx.core.http.HttpClientRequest;
import io.vertx.junit5.Checkpoint;
import io.vertx.junit5.VertxExtension;
import io.vertx.junit5.VertxTestContext;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import java.util.stream.Stream;
import static io.cloudevents.test.Data.*;
import static org.assertj.core.api.Assertions.assertThat;
@ExtendWith(VertxExtension.class)
public class VertxHttpClientRequestMessageVisitorTest {
@ParameterizedTest
@MethodSource("io.cloudevents.test.Data#allEventsWithoutExtensions")
void testRequestWithStructured(CloudEvent event, Vertx vertx, VertxTestContext testContext) {
String expectedContentType = CSVFormat.INSTANCE.serializedContentType();
byte[] expectedBuffer = CSVFormat.INSTANCE.serialize(event);
Checkpoint checkpoint = testContext.checkpoint(3);
vertx
.createHttpServer()
.requestHandler(httpServerRequest -> {
httpServerRequest.bodyHandler(buf -> {
testContext.verify(() -> {
assertThat(httpServerRequest.getHeader("content-type"))
.isEqualTo(expectedContentType);
assertThat(buf.getBytes())
.isEqualTo(expectedBuffer);
});
checkpoint.flag();
});
httpServerRequest.response().end();
})
.listen(9000, testContext.succeeding(server -> {
HttpClient client = vertx.createHttpClient();
HttpClientRequest req = client.get(server.actualPort(), "localhost", "/", httpClientResponse -> {
testContext.verify(() -> {
assertThat(httpClientResponse.statusCode())
.isEqualTo(200);
});
checkpoint.flag();
});
try {
event.asStructuredMessage(CSVFormat.INSTANCE)
.visit(VertxHttpClientRequestMessageVisitor.create(req));
} catch (Throwable e) {
testContext.failNow(e);
}
checkpoint.flag();
}));
}
@ParameterizedTest
@MethodSource("binaryTestArguments")
void testRequestWithBinary(CloudEvent event, MultiMap headers, Buffer body, Vertx vertx, VertxTestContext testContext) {
Checkpoint checkpoint = testContext.checkpoint(3);
vertx
.createHttpServer()
.requestHandler(httpServerRequest -> {
httpServerRequest.bodyHandler(buf -> {
testContext.verify(() -> {
headers.forEach(e -> {
assertThat(httpServerRequest.getHeader(e.getKey()))
.isEqualTo(e.getValue());
});
if (body != null) {
assertThat(buf.getBytes())
.isEqualTo(body.getBytes());
}
});
checkpoint.flag();
});
httpServerRequest.response().end();
})
.listen(9000, testContext.succeeding(server -> {
HttpClient client = vertx.createHttpClient();
HttpClientRequest req = client.get(server.actualPort(), "localhost", "/", httpClientResponse -> {
testContext.verify(() -> {
assertThat(httpClientResponse.statusCode())
.isEqualTo(200);
});
checkpoint.flag();
});
try {
event.asBinaryMessage()
.visit(VertxHttpClientRequestMessageVisitor.create(req));
} catch (Throwable e) {
testContext.failNow(e);
}
checkpoint.flag();
}));
}
public static Stream<Arguments> binaryTestArguments() {
return Stream.of(
// V03
Arguments.of(
V03_MIN,
MultiMap.caseInsensitiveMultiMap()
.add("ce-specversion", SpecVersion.V03.toString())
.add("ce-id", ID)
.add("ce-type", TYPE)
.add("ce-source", SOURCE.toString()),
null
),
Arguments.of(
V03_WITH_JSON_DATA,
MultiMap.caseInsensitiveMultiMap()
.add("ce-specversion", SpecVersion.V03.toString())
.add("ce-id", ID)
.add("ce-type", TYPE)
.add("ce-source", SOURCE.toString())
.add("ce-schemaurl", DATASCHEMA.toString())
.add("content-type", DATACONTENTTYPE_JSON)
.add("ce-subject", SUBJECT)
.add("ce-time", Time.RFC3339_DATE_FORMAT.format(TIME)),
Buffer.buffer(DATA_JSON_SERIALIZED)
),
Arguments.of(
V03_WITH_JSON_DATA_WITH_EXT_STRING,
MultiMap.caseInsensitiveMultiMap()
.add("ce-specversion", SpecVersion.V03.toString())
.add("ce-id", ID)
.add("ce-type", TYPE)
.add("ce-source", SOURCE.toString())
.add("ce-schemaurl", DATASCHEMA.toString())
.add("content-type", DATACONTENTTYPE_JSON)
.add("ce-subject", SUBJECT)
.add("ce-time", Time.RFC3339_DATE_FORMAT.format(TIME))
.add("ce-astring", "aaa")
.add("ce-aboolean", "true")
.add("ce-anumber", "10"),
Buffer.buffer(DATA_JSON_SERIALIZED)
),
Arguments.of(
V03_WITH_XML_DATA,
MultiMap.caseInsensitiveMultiMap()
.add("ce-specversion", SpecVersion.V03.toString())
.add("ce-id", ID)
.add("ce-type", TYPE)
.add("ce-source", SOURCE.toString())
.add("content-type", DATACONTENTTYPE_XML)
.add("ce-subject", SUBJECT)
.add("ce-time", Time.RFC3339_DATE_FORMAT.format(TIME)),
Buffer.buffer(DATA_XML_SERIALIZED)
),
Arguments.of(
V03_WITH_TEXT_DATA,
MultiMap.caseInsensitiveMultiMap()
.add("ce-specversion", SpecVersion.V03.toString())
.add("ce-id", ID)
.add("ce-type", TYPE)
.add("ce-source", SOURCE.toString())
.add("content-type", DATACONTENTTYPE_TEXT)
.add("ce-subject", SUBJECT)
.add("ce-time", Time.RFC3339_DATE_FORMAT.format(TIME)),
Buffer.buffer(DATA_TEXT_SERIALIZED)
),
// V1
Arguments.of(
V1_MIN,
MultiMap.caseInsensitiveMultiMap()
.add("ce-specversion", SpecVersion.V1.toString())
.add("ce-id", ID)
.add("ce-type", TYPE)
.add("ce-source", SOURCE.toString()),
null
),
Arguments.of(
V1_WITH_JSON_DATA,
MultiMap.caseInsensitiveMultiMap()
.add("ce-specversion", SpecVersion.V1.toString())
.add("ce-id", ID)
.add("ce-type", TYPE)
.add("ce-source", SOURCE.toString())
.add("ce-dataschema", DATASCHEMA.toString())
.add("content-type", DATACONTENTTYPE_JSON)
.add("ce-subject", SUBJECT)
.add("ce-time", Time.RFC3339_DATE_FORMAT.format(TIME)),
Buffer.buffer(DATA_JSON_SERIALIZED)
),
Arguments.of(
V1_WITH_JSON_DATA_WITH_EXT_STRING,
MultiMap.caseInsensitiveMultiMap()
.add("ce-specversion", SpecVersion.V1.toString())
.add("ce-id", ID)
.add("ce-type", TYPE)
.add("ce-source", SOURCE.toString())
.add("ce-dataschema", DATASCHEMA.toString())
.add("content-type", DATACONTENTTYPE_JSON)
.add("ce-subject", SUBJECT)
.add("ce-time", Time.RFC3339_DATE_FORMAT.format(TIME))
.add("ce-astring", "aaa")
.add("ce-aboolean", "true")
.add("ce-anumber", "10"),
Buffer.buffer(DATA_JSON_SERIALIZED)
),
Arguments.of(
V1_WITH_XML_DATA,
MultiMap.caseInsensitiveMultiMap()
.add("ce-specversion", SpecVersion.V1.toString())
.add("ce-id", ID)
.add("ce-type", TYPE)
.add("ce-source", SOURCE.toString())
.add("content-type", DATACONTENTTYPE_XML)
.add("ce-subject", SUBJECT)
.add("ce-time", Time.RFC3339_DATE_FORMAT.format(TIME)),
Buffer.buffer(DATA_XML_SERIALIZED)
),
Arguments.of(
V1_WITH_TEXT_DATA,
MultiMap.caseInsensitiveMultiMap()
.add("ce-specversion", SpecVersion.V1.toString())
.add("ce-id", ID)
.add("ce-type", TYPE)
.add("ce-source", SOURCE.toString())
.add("content-type", DATACONTENTTYPE_TEXT)
.add("ce-subject", SUBJECT)
.add("ce-time", Time.RFC3339_DATE_FORMAT.format(TIME)),
Buffer.buffer(DATA_TEXT_SERIALIZED)
)
);
}
}

View File

@ -0,0 +1,238 @@
package io.cloudevents.http.vertx;
import io.cloudevents.CloudEvent;
import io.cloudevents.SpecVersion;
import io.cloudevents.mock.CSVFormat;
import io.cloudevents.types.Time;
import io.vertx.core.MultiMap;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpClient;
import io.vertx.junit5.Checkpoint;
import io.vertx.junit5.VertxExtension;
import io.vertx.junit5.VertxTestContext;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import java.util.stream.Stream;
import static io.cloudevents.test.Data.*;
import static org.assertj.core.api.Assertions.assertThat;
@ExtendWith(VertxExtension.class)
public class VertxHttpServerResponseMessageVisitorTest {
@ParameterizedTest
@MethodSource("io.cloudevents.test.Data#allEventsWithoutExtensions")
void testReplyWithStructured(CloudEvent event, Vertx vertx, VertxTestContext testContext) {
Checkpoint checkpoint = testContext.checkpoint(2);
vertx
.createHttpServer()
.requestHandler(httpServerRequest -> {
try {
event.asStructuredMessage(CSVFormat.INSTANCE).visit(
VertxHttpServerResponseMessageVisitor.create(httpServerRequest.response())
);
checkpoint.flag();
} catch (Throwable e) {
testContext.failNow(e);
}
})
.listen(9000, testContext.succeeding(server -> {
HttpClient client = vertx.createHttpClient();
client
.get(server.actualPort(), "localhost", "/")
.handler(res -> {
res.bodyHandler(buf -> {
testContext.verify(() -> {
assertThat(res.statusCode())
.isEqualTo(200);
assertThat(res.getHeader("content-type"))
.isEqualTo(CSVFormat.INSTANCE.serializedContentType());
assertThat(buf.getBytes())
.isEqualTo(CSVFormat.INSTANCE.serialize(event));
checkpoint.flag();
});
});
})
.end();
}));
}
@ParameterizedTest
@MethodSource("binaryTestArguments")
void testReplyWithBinary(CloudEvent event, MultiMap headers, Buffer body, Vertx vertx, VertxTestContext testContext) {
Checkpoint checkpoint = testContext.checkpoint(2);
vertx
.createHttpServer()
.requestHandler(httpServerRequest -> {
try {
event.asBinaryMessage().visit(
VertxHttpServerResponseMessageVisitor.create(httpServerRequest.response())
);
checkpoint.flag();
} catch (Throwable e) {
testContext.failNow(e);
}
})
.listen(9000, testContext.succeeding(server -> {
HttpClient client = vertx.createHttpClient();
client
.get(server.actualPort(), "localhost", "/")
.handler(res -> {
res.bodyHandler(buf -> {
testContext.verify(() -> {
assertThat(res.statusCode())
.isEqualTo(200);
headers.forEach(e -> {
assertThat(res.getHeader(e.getKey())).isEqualTo(e.getValue());
});
if (body != null) {
assertThat(buf.getBytes())
.isEqualTo(body.getBytes());
}
});
checkpoint.flag();
});
})
.end();
}));
}
public static Stream<Arguments> binaryTestArguments() {
return Stream.of(
// V03
Arguments.of(
V03_MIN,
MultiMap.caseInsensitiveMultiMap()
.add("ce-specversion", SpecVersion.V03.toString())
.add("ce-id", ID)
.add("ce-type", TYPE)
.add("ce-source", SOURCE.toString()),
null
),
Arguments.of(
V03_WITH_JSON_DATA,
MultiMap.caseInsensitiveMultiMap()
.add("ce-specversion", SpecVersion.V03.toString())
.add("ce-id", ID)
.add("ce-type", TYPE)
.add("ce-source", SOURCE.toString())
.add("ce-schemaurl", DATASCHEMA.toString())
.add("content-type", DATACONTENTTYPE_JSON)
.add("ce-subject", SUBJECT)
.add("ce-time", Time.RFC3339_DATE_FORMAT.format(TIME)),
Buffer.buffer(DATA_JSON_SERIALIZED)
),
Arguments.of(
V03_WITH_JSON_DATA_WITH_EXT_STRING,
MultiMap.caseInsensitiveMultiMap()
.add("ce-specversion", SpecVersion.V03.toString())
.add("ce-id", ID)
.add("ce-type", TYPE)
.add("ce-source", SOURCE.toString())
.add("ce-schemaurl", DATASCHEMA.toString())
.add("content-type", DATACONTENTTYPE_JSON)
.add("ce-subject", SUBJECT)
.add("ce-time", Time.RFC3339_DATE_FORMAT.format(TIME))
.add("ce-astring", "aaa")
.add("ce-aboolean", "true")
.add("ce-anumber", "10"),
Buffer.buffer(DATA_JSON_SERIALIZED)
),
Arguments.of(
V03_WITH_XML_DATA,
MultiMap.caseInsensitiveMultiMap()
.add("ce-specversion", SpecVersion.V03.toString())
.add("ce-id", ID)
.add("ce-type", TYPE)
.add("ce-source", SOURCE.toString())
.add("content-type", DATACONTENTTYPE_XML)
.add("ce-subject", SUBJECT)
.add("ce-time", Time.RFC3339_DATE_FORMAT.format(TIME)),
Buffer.buffer(DATA_XML_SERIALIZED)
),
Arguments.of(
V03_WITH_TEXT_DATA,
MultiMap.caseInsensitiveMultiMap()
.add("ce-specversion", SpecVersion.V03.toString())
.add("ce-id", ID)
.add("ce-type", TYPE)
.add("ce-source", SOURCE.toString())
.add("content-type", DATACONTENTTYPE_TEXT)
.add("ce-subject", SUBJECT)
.add("ce-time", Time.RFC3339_DATE_FORMAT.format(TIME)),
Buffer.buffer(DATA_TEXT_SERIALIZED)
),
// V1
Arguments.of(
V1_MIN,
MultiMap.caseInsensitiveMultiMap()
.add("ce-specversion", SpecVersion.V1.toString())
.add("ce-id", ID)
.add("ce-type", TYPE)
.add("ce-source", SOURCE.toString()),
null
),
Arguments.of(
V1_WITH_JSON_DATA,
MultiMap.caseInsensitiveMultiMap()
.add("ce-specversion", SpecVersion.V1.toString())
.add("ce-id", ID)
.add("ce-type", TYPE)
.add("ce-source", SOURCE.toString())
.add("ce-dataschema", DATASCHEMA.toString())
.add("content-type", DATACONTENTTYPE_JSON)
.add("ce-subject", SUBJECT)
.add("ce-time", Time.RFC3339_DATE_FORMAT.format(TIME)),
Buffer.buffer(DATA_JSON_SERIALIZED)
),
Arguments.of(
V1_WITH_JSON_DATA_WITH_EXT_STRING,
MultiMap.caseInsensitiveMultiMap()
.add("ce-specversion", SpecVersion.V1.toString())
.add("ce-id", ID)
.add("ce-type", TYPE)
.add("ce-source", SOURCE.toString())
.add("ce-dataschema", DATASCHEMA.toString())
.add("content-type", DATACONTENTTYPE_JSON)
.add("ce-subject", SUBJECT)
.add("ce-time", Time.RFC3339_DATE_FORMAT.format(TIME))
.add("ce-astring", "aaa")
.add("ce-aboolean", "true")
.add("ce-anumber", "10"),
Buffer.buffer(DATA_JSON_SERIALIZED)
),
Arguments.of(
V1_WITH_XML_DATA,
MultiMap.caseInsensitiveMultiMap()
.add("ce-specversion", SpecVersion.V1.toString())
.add("ce-id", ID)
.add("ce-type", TYPE)
.add("ce-source", SOURCE.toString())
.add("content-type", DATACONTENTTYPE_XML)
.add("ce-subject", SUBJECT)
.add("ce-time", Time.RFC3339_DATE_FORMAT.format(TIME)),
Buffer.buffer(DATA_XML_SERIALIZED)
),
Arguments.of(
V1_WITH_TEXT_DATA,
MultiMap.caseInsensitiveMultiMap()
.add("ce-specversion", SpecVersion.V1.toString())
.add("ce-id", ID)
.add("ce-type", TYPE)
.add("ce-source", SOURCE.toString())
.add("content-type", DATACONTENTTYPE_TEXT)
.add("ce-subject", SUBJECT)
.add("ce-time", Time.RFC3339_DATE_FORMAT.format(TIME)),
Buffer.buffer(DATA_TEXT_SERIALIZED)
)
);
}
}

View File

@ -0,0 +1,202 @@
package io.cloudevents.http.vertx;
import io.cloudevents.CloudEvent;
import io.cloudevents.SpecVersion;
import io.cloudevents.message.Encoding;
import io.cloudevents.message.Message;
import io.cloudevents.mock.CSVFormat;
import io.cloudevents.types.Time;
import io.vertx.core.MultiMap;
import io.vertx.core.buffer.Buffer;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import java.util.stream.Stream;
import static io.cloudevents.test.Data.*;
import static org.assertj.core.api.Assertions.assertThat;
public class VertxMessageTest {
@ParameterizedTest()
@MethodSource("binaryTestArguments")
public void readBinary(MultiMap headers, Buffer body, CloudEvent event) {
Message message = VertxMessage.create(headers, body);
assertThat(message.getEncoding())
.isEqualTo(Encoding.BINARY);
assertThat(message.toEvent())
.isEqualTo(event);
}
@ParameterizedTest()
@MethodSource("io.cloudevents.test.Data#allEventsWithoutExtensions")
public void readStructured(CloudEvent event) {
byte[] serializedEvent = CSVFormat.INSTANCE.serialize(event);
Message message = VertxMessage.create(
MultiMap.caseInsensitiveMultiMap().add("content-type", CSVFormat.INSTANCE.serializedContentType() + "; charset=utf8"),
Buffer.buffer(serializedEvent)
);
assertThat(message.getEncoding())
.isEqualTo(Encoding.STRUCTURED);
assertThat(message.toEvent())
.isEqualTo(event);
}
public static Stream<Arguments> binaryTestArguments() {
return Stream.of(
// V03
Arguments.of(
MultiMap.caseInsensitiveMultiMap()
.add("ce-specversion", SpecVersion.V03.toString())
.add("ce-id", ID)
.add("ce-type", TYPE)
.add("ce-source", SOURCE.toString())
.add("ignored", "ignored"),
null,
V03_MIN
),
Arguments.of(
MultiMap.caseInsensitiveMultiMap()
.add("ce-specversion", SpecVersion.V03.toString())
.add("ce-id", ID)
.add("ce-type", TYPE)
.add("ce-source", SOURCE.toString())
.add("ce-schemaurl", DATASCHEMA.toString())
.add("content-type", DATACONTENTTYPE_JSON)
.add("ce-subject", SUBJECT)
.add("ce-time", Time.RFC3339_DATE_FORMAT.format(TIME))
.add("ignored", "ignored"),
Buffer.buffer(DATA_JSON_SERIALIZED),
V03_WITH_JSON_DATA
),
Arguments.of(
MultiMap.caseInsensitiveMultiMap()
.add("ce-specversion", SpecVersion.V03.toString())
.add("ce-id", ID)
.add("ce-type", TYPE)
.add("ce-source", SOURCE.toString())
.add("ce-schemaurl", DATASCHEMA.toString())
.add("content-type", DATACONTENTTYPE_JSON)
.add("ce-subject", SUBJECT)
.add("ce-time", Time.RFC3339_DATE_FORMAT.format(TIME))
.add("ce-astring", "aaa")
.add("ce-aboolean", "true")
.add("ce-anumber", "10")
.add("ignored", "ignored"),
Buffer.buffer(DATA_JSON_SERIALIZED),
V03_WITH_JSON_DATA_WITH_EXT_STRING
),
Arguments.of(
MultiMap.caseInsensitiveMultiMap()
.add("ce-specversion", SpecVersion.V03.toString())
.add("ce-id", ID)
.add("ce-type", TYPE)
.add("ce-source", SOURCE.toString())
.add("content-type", DATACONTENTTYPE_XML)
.add("ce-subject", SUBJECT)
.add("ce-time", Time.RFC3339_DATE_FORMAT.format(TIME))
.add("ignored", "ignored"),
Buffer.buffer(DATA_XML_SERIALIZED),
V03_WITH_XML_DATA
),
Arguments.of(
MultiMap.caseInsensitiveMultiMap()
.add("ce-specversion", SpecVersion.V03.toString())
.add("ce-id", ID)
.add("ce-type", TYPE)
.add("ce-source", SOURCE.toString())
.add("content-type", DATACONTENTTYPE_TEXT)
.add("ce-subject", SUBJECT)
.add("ce-time", Time.RFC3339_DATE_FORMAT.format(TIME))
.add("ignored", "ignored"),
Buffer.buffer(DATA_TEXT_SERIALIZED),
V03_WITH_TEXT_DATA
),
// V1
Arguments.of(
MultiMap.caseInsensitiveMultiMap()
.add("ce-specversion", SpecVersion.V1.toString())
.add("ce-id", ID)
.add("ce-type", TYPE)
.add("ce-source", SOURCE.toString())
.add("ignored", "ignored"),
null,
V1_MIN
),
Arguments.of(
MultiMap.caseInsensitiveMultiMap()
.add("ce-specversion", SpecVersion.V1.toString())
.add("ce-id", ID)
.add("ce-type", TYPE)
.add("ce-source", SOURCE.toString())
.add("ce-dataschema", DATASCHEMA.toString())
.add("content-type", DATACONTENTTYPE_JSON)
.add("ce-subject", SUBJECT)
.add("ce-time", Time.RFC3339_DATE_FORMAT.format(TIME))
.add("ignored", "ignored"),
Buffer.buffer(DATA_JSON_SERIALIZED),
V1_WITH_JSON_DATA
),
Arguments.of(
MultiMap.caseInsensitiveMultiMap()
.add("ce-specversion", SpecVersion.V1.toString())
.add("ce-id", ID)
.add("ce-type", TYPE)
.add("ce-source", SOURCE.toString())
.add("ce-dataschema", DATASCHEMA.toString())
.add("content-type", DATACONTENTTYPE_JSON)
.add("ce-subject", SUBJECT)
.add("ce-time", Time.RFC3339_DATE_FORMAT.format(TIME))
.add("ce-astring", "aaa")
.add("ce-aboolean", "true")
.add("ce-anumber", "10")
.add("ignored", "ignored"),
Buffer.buffer(DATA_JSON_SERIALIZED),
V1_WITH_JSON_DATA_WITH_EXT_STRING
),
Arguments.of(
MultiMap.caseInsensitiveMultiMap()
.add("ce-specversion", SpecVersion.V1.toString())
.add("ce-id", ID)
.add("ce-type", TYPE)
.add("ce-source", SOURCE.toString())
.add("content-type", DATACONTENTTYPE_XML)
.add("ce-subject", SUBJECT)
.add("ce-time", Time.RFC3339_DATE_FORMAT.format(TIME))
.add("ignored", "ignored"),
Buffer.buffer(DATA_XML_SERIALIZED),
V1_WITH_XML_DATA
),
Arguments.of(
MultiMap.caseInsensitiveMultiMap()
.add("ce-specversion", SpecVersion.V1.toString())
.add("ce-id", ID)
.add("ce-type", TYPE)
.add("ce-source", SOURCE.toString())
.add("content-type", DATACONTENTTYPE_TEXT)
.add("ce-subject", SUBJECT)
.add("ce-time", Time.RFC3339_DATE_FORMAT.format(TIME))
.add("ignored", "ignored"),
Buffer.buffer(DATA_TEXT_SERIALIZED),
V1_WITH_TEXT_DATA
),
// Headers case insensitivity
Arguments.of(
MultiMap.caseInsensitiveMultiMap()
.add("Ce-sPecversion", SpecVersion.V03.toString())
.add("cE-id", ID)
.add("CE-Type", TYPE)
.add("ce-source", SOURCE.toString())
.add("ignored", "ignored")
.add("ab", "should-not-break-anything"),
null,
V03_MIN
)
);
}
}

View File

@ -63,9 +63,9 @@
<modules>
<module>api</module>
<module>formats/json-jackson</module>
<module>http/vertx</module>
<!--
<module>cdi</module>
<module>http/vertx</module>
<module>kafka</module>
-->
</modules>
@ -74,6 +74,9 @@
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<assertj-core.version>3.11.0</assertj-core.version>
<junit-jupiter.version>5.5.2</junit-jupiter.version>
<maven-surefire-plugin.version>3.0.0-M4</maven-surefire-plugin.version>
</properties>
<distributionManagement>