Merge pull request #14 from matzew/http_vertx

WIP: vertx http transport
This commit is contained in:
Matthias Wessendorf 2018-11-29 10:22:13 +01:00 committed by GitHub
commit ce597a93fa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 501 additions and 0 deletions

View File

@ -25,6 +25,19 @@ import java.util.Optional;
* *
*/ */
public interface CloudEvent<T> { public interface CloudEvent<T> {
// required
String EVENT_TYPE_KEY = "ce-eventType";
String CLOUD_EVENTS_VERSION_KEY = "ce-cloudEventsVersion";
String SOURCE_KEY = "ce-source";
String EVENT_ID_KEY = "ce-eventID";
// none-required
String EVENT_TYPE_VERSION_KEY = "ce-eventTypeVersion";
String EVENT_TIME_KEY = "ce-eventTime";
String SCHEMA_URL_KEY = "ce-schemaURL";
String HEADER_PREFIX = "ce-x-";
/** /**
* Type of occurrence which has happened. Often this property is used for routing, observability, policy enforcement, etc. * Type of occurrence which has happened. Often this property is used for routing, observability, policy enforcement, etc.
*/ */

View File

@ -154,4 +154,20 @@ public class DefaultCloudEventImpl<T> implements CloudEvent<T>, Serializable {
void setData(T data) { void setData(T data) {
this.data = data; this.data = data;
} }
@Override
public String toString() {
return "DefaultCloudEventImpl{" +
"cloudEventsVersion='" + cloudEventsVersion + '\'' +
", extensions=" + extensions +
", eventType='" + eventType + '\'' +
", source=" + source +
", eventID='" + eventID + '\'' +
", eventTypeVersion='" + eventTypeVersion + '\'' +
", eventTime=" + eventTime +
", schemaURL=" + schemaURL +
", contentType='" + contentType + '\'' +
", data=" + data +
'}';
}
} }

44
http/vertx/README.md Normal file
View File

@ -0,0 +1,44 @@
# HTTP Transport Util for Eclipse Vert.x
## Receiving CloudEvents
Below is a sample on how to read CloudEvents from an HttpRequest:
```java
import io.vertx.core.AbstractVerticle;
public class Server extends AbstractVerticle {
public void start() {
vertx.createHttpServer().requestHandler(req -> {
CeVertx.readFromRequest(req, reply -> {
if (reply.succeeded()) {
final CloudEvent<?> receivedEvent = reply.result();
// access the attributes:
System.out.println(receivedEvent.getEventID());
...
});
req.response()
.putHeader("content-type", "text/plain")
.end("Got a CloudEvent!");
}).listen(8080);
}
}
```
## Sending CloudEvents
Below is a sample on how to use the client to send a CloudEvent:
```java
final HttpClientRequest request = vertx.createHttpClient().post(7890, "localhost", "/");
CeVertx.writeToHttpClientRequest(cloudEvent, request);
request.handler(resp -> {
context.assertEquals(resp.statusCode(), 200);
});
request.end();
```

77
http/vertx/pom.xml Normal file
View File

@ -0,0 +1,77 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Copyright 2018 The CloudEvents Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>io.cloudevents</groupId>
<artifactId>cloudevents-parent</artifactId>
<version>0.0.3-SNAPSHOT</version>
</parent>
<groupId>io.cloudevents</groupId>
<artifactId>http-vertx</artifactId>
<name>CloudEvents - Vertx-transport-http</name>
<version>0.0.3-SNAPSHOT</version>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>io.cloudevents</groupId>
<artifactId>cloudevents-api</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-core</artifactId>
<version>${vert.version}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<version>3.10.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-unit</artifactId>
<version>${vert.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
<properties>
<vert.version>3.6.0.CR2</vert.version>
<jackson.version>2.9.6</jackson.version>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
</project>

View File

@ -0,0 +1,144 @@
/**
* Copyright 2018 The CloudEvents Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.cloudevents.http.vertx;
import io.cloudevents.CloudEvent;
import io.cloudevents.CloudEventBuilder;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.MultiMap;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpClientRequest;
import io.vertx.core.http.HttpHeaders;
import io.vertx.core.http.HttpServerRequest;
import java.net.URI;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Map;
import java.util.stream.Collectors;
import static io.cloudevents.CloudEvent.CLOUD_EVENTS_VERSION_KEY;
import static io.cloudevents.CloudEvent.EVENT_ID_KEY;
import static io.cloudevents.CloudEvent.EVENT_TIME_KEY;
import static io.cloudevents.CloudEvent.EVENT_TYPE_KEY;
import static io.cloudevents.CloudEvent.EVENT_TYPE_VERSION_KEY;
import static io.cloudevents.CloudEvent.HEADER_PREFIX;
import static io.cloudevents.CloudEvent.SCHEMA_URL_KEY;
import static io.cloudevents.CloudEvent.SOURCE_KEY;
public final class CeVertx {
private CeVertx() {
// no-op
}
public static void writeToHttpClientRequest(final CloudEvent<?> ce, final HttpClientRequest request) {
// setting the right content-length:
if (ce.getData().isPresent()) {
request.putHeader(HttpHeaders.CONTENT_LENGTH, HttpHeaders.createOptimized(String.valueOf(ce.getData().get().toString().length())));
} else {
request.putHeader(HttpHeaders.CONTENT_LENGTH, HttpHeaders.createOptimized("0"));
}
// read required headers
request
.putHeader(HttpHeaders.CONTENT_TYPE, HttpHeaders.createOptimized("application/json"))
.putHeader(HttpHeaders.createOptimized(CLOUD_EVENTS_VERSION_KEY), HttpHeaders.createOptimized(ce.getCloudEventsVersion()))
.putHeader(HttpHeaders.createOptimized(EVENT_TYPE_KEY), HttpHeaders.createOptimized(ce.getEventType()))
.putHeader(HttpHeaders.createOptimized(SOURCE_KEY), HttpHeaders.createOptimized(ce.getSource().toString()))
.putHeader(HttpHeaders.createOptimized(EVENT_ID_KEY), HttpHeaders.createOptimized(ce.getEventID()));
// read optional headers
ce.getEventTypeVersion().ifPresent(eventTypeVersion -> {
request.putHeader(HttpHeaders.createOptimized(EVENT_TYPE_VERSION_KEY), HttpHeaders.createOptimized(eventTypeVersion));
});
ce.getEventTime().ifPresent(eventTime -> {
request.putHeader(HttpHeaders.createOptimized(EVENT_TIME_KEY), HttpHeaders.createOptimized(eventTime.format(DateTimeFormatter.ISO_OFFSET_DATE_TIME)));
});
ce.getSchemaURL().ifPresent(schemaUrl -> {
request.putHeader(HttpHeaders.createOptimized(SCHEMA_URL_KEY), HttpHeaders.createOptimized(schemaUrl.toString()));
});
ce.getData().ifPresent(data -> {
request.write(data.toString());
});
}
public static void readFromRequest(final HttpServerRequest request, final Handler<AsyncResult<CloudEvent>> resultHandler) {
final MultiMap headers = request.headers();
final CloudEventBuilder builder = new CloudEventBuilder();
try {
// just check, no need to set the version
readRequiredHeaderValue(headers, CLOUD_EVENTS_VERSION_KEY);
builder
// set required values
.eventType(readRequiredHeaderValue(headers, EVENT_TYPE_KEY))
.source(URI.create(readRequiredHeaderValue(headers ,SOURCE_KEY)))
.eventID(readRequiredHeaderValue(headers, EVENT_ID_KEY))
// set optional values
.eventTypeVersion(headers.get(EVENT_TYPE_VERSION_KEY))
.contentType(headers.get(HttpHeaders.CONTENT_TYPE));
final String eventTime = headers.get(EVENT_TIME_KEY);
if (eventTime != null) {
builder.eventTime(ZonedDateTime.parse(eventTime, DateTimeFormatter.ISO_OFFSET_DATE_TIME));
}
final String schemaURL = headers.get(SCHEMA_URL_KEY);
if (schemaURL != null) {
builder.schemaURL(URI.create(schemaURL));
}
// get the extensions
final Map<String, String> extensions =
headers.entries().stream()
.filter(header -> header.getKey().startsWith(HEADER_PREFIX))
.collect(Collectors.toMap(h -> h.getKey(), h -> h.getValue()));
builder.extensions(extensions);
request.bodyHandler((Buffer buff) -> {
if (buff.length()>0) {
builder.data(buff.toJsonObject().toString());
}
resultHandler.handle(Future.succeededFuture(builder.build()));
});
} catch (Exception e) {
resultHandler.handle(Future.failedFuture(e));
}
}
private static String readRequiredHeaderValue(final MultiMap headers, final String headerName) {
return requireNonNull(headers.get(headerName));
}
private static String requireNonNull(final String val) {
if (val == null) {
throw new IllegalArgumentException();
} else {
return val;
}
}
}

View File

@ -0,0 +1,206 @@
/**
* Copyright 2018 The CloudEvents Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.cloudevents.http.vertx;
import io.cloudevents.CloudEvent;
import io.cloudevents.CloudEventBuilder;
import io.vertx.core.Vertx;
import io.vertx.core.http.HttpClientRequest;
import io.vertx.core.http.HttpHeaders;
import io.vertx.core.http.HttpServer;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import io.vertx.ext.unit.junit.VertxUnitRunner;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.URI;
import java.util.logging.Logger;
import static io.cloudevents.CloudEvent.CLOUD_EVENTS_VERSION_KEY;
import static io.cloudevents.CloudEvent.EVENT_TYPE_KEY;
@RunWith(VertxUnitRunner.class)
public class CloudEventsVertxTest {
private final static Logger logger = Logger.getLogger(CloudEventsVertxTest.class.getName());
private HttpServer server;
private Vertx vertx;
private int port;
@Before
public void setUp(TestContext context) throws IOException {
vertx = Vertx.vertx();
ServerSocket socket = new ServerSocket(0);
port = socket.getLocalPort();
socket.close();
server = vertx.createHttpServer();
}
@After
public void tearDown(TestContext context) {
vertx.close(context.asyncAssertSuccess());
}
@Test
public void cloudEventWithPayload(TestContext context) {
final Async async = context.async();
// Create the actuak CloudEvents object;
final CloudEvent<String> cloudEvent = new CloudEventBuilder<String>()
.source(URI.create("http://knative-eventing.com"))
.eventID("foo-bar")
.eventType("pushevent")
.data("{\"foo\":\"bar\"}}")
.build();
// set up the server and add a handler to check the values
server.requestHandler(req -> {
CeVertx.readFromRequest(req, reply -> {
if (reply.succeeded()) {
final CloudEvent<?> receivedEvent = reply.result();
context.assertEquals(receivedEvent.getEventID(), cloudEvent.getEventID());
context.assertEquals(receivedEvent.getSource().toString(), cloudEvent.getSource().toString());
context.assertEquals(receivedEvent.getEventType(), cloudEvent.getEventType());
context.assertEquals(receivedEvent.getData().isPresent(), Boolean.TRUE);
}
});
req.response().end();
}).listen(port, ar -> {
if (ar.failed()) {
context.fail("could not start server");
} else {
// sending it to the test-server
final HttpClientRequest request = vertx.createHttpClient().post(port, "localhost", "/");
request.handler(response -> {
context.assertEquals(response.statusCode(), 200);
async.complete();
});
CeVertx.writeToHttpClientRequest(cloudEvent, request);
request.end();
}
});
logger.info("running on port: " + port);
async.awaitSuccess(1000);
}
@Test
public void cloudEventWithoutPayload(TestContext context) {
final Async async = context.async();
// Create the actuak CloudEvents object;
final CloudEvent<String> cloudEvent = new CloudEventBuilder<String>()
.source(URI.create("http://knative-eventing.com"))
.eventID("foo-bar")
.eventType("pushevent")
.build();
// set up the server and add a handler to check the values
server.requestHandler(req -> {
CeVertx.readFromRequest(req, reply -> {
if (reply.succeeded()) {
final CloudEvent<?> receivedEvent = reply.result();
context.assertEquals(receivedEvent.getEventID(), cloudEvent.getEventID());
context.assertEquals(receivedEvent.getSource().toString(), cloudEvent.getSource().toString());
context.assertEquals(receivedEvent.getEventType(), cloudEvent.getEventType());
context.assertEquals(receivedEvent.getData().isPresent(), Boolean.FALSE);
}
});
req.response().end();
}).listen(port, ar -> {
if (ar.failed()) {
context.fail("could not start server");
} else {
// sending it to the test-server
final HttpClientRequest request = vertx.createHttpClient().post(port, "localhost", "/");
request.handler(resp -> {
context.assertEquals(resp.statusCode(), 200);
async.complete();
});
CeVertx.writeToHttpClientRequest(cloudEvent, request);
request.end();
}
});
logger.info("running on port: " + port);
async.awaitSuccess(1000);
}
@Test
public void incompleteCloudEvent(TestContext context) {
final Async async = context.async();
// set up the server and add a handler to check the values
server.requestHandler(req -> {
CeVertx.readFromRequest(req, reply -> {
if (reply.succeeded()) {
context.fail("request was not complete");
} else {
context.assertEquals(reply.failed(), Boolean.TRUE);
}
});
req.response().end();
}).listen(port, ar -> {
if (ar.failed()) {
context.fail("could not start server");
} else {
// fire the request
// sending it to the test-server
final HttpClientRequest request = vertx.createHttpClient().post(port, "localhost", "/");
// create incomplete CloudEvent request
request.putHeader(HttpHeaders.createOptimized(CLOUD_EVENTS_VERSION_KEY), HttpHeaders.createOptimized("0.1"));
request.putHeader(HttpHeaders.createOptimized(EVENT_TYPE_KEY), HttpHeaders.createOptimized("pushevent"));
request.putHeader(HttpHeaders.CONTENT_LENGTH, HttpHeaders.createOptimized("0"));
request.handler(resp -> {
context.assertEquals(resp.statusCode(), 200);
async.complete();
});
request.end();
}
});
logger.info("running on port: " + port);
async.awaitSuccess(1000);
}
}

View File

@ -53,6 +53,7 @@
<modules> <modules>
<module>api</module> <module>api</module>
<module>http/vertx</module>
</modules> </modules>
</project> </project>