diff --git a/api/src/main/java/io/cloudevents/CloudEvent.java b/api/src/main/java/io/cloudevents/CloudEvent.java index 3cfef81d..49c70867 100644 --- a/api/src/main/java/io/cloudevents/CloudEvent.java +++ b/api/src/main/java/io/cloudevents/CloudEvent.java @@ -25,6 +25,19 @@ import java.util.Optional; * */ public interface CloudEvent { + + // required + String EVENT_TYPE_KEY = "ce-eventType"; + String CLOUD_EVENTS_VERSION_KEY = "ce-cloudEventsVersion"; + String SOURCE_KEY = "ce-source"; + String EVENT_ID_KEY = "ce-eventID"; + + // none-required + String EVENT_TYPE_VERSION_KEY = "ce-eventTypeVersion"; + String EVENT_TIME_KEY = "ce-eventTime"; + String SCHEMA_URL_KEY = "ce-schemaURL"; + String HEADER_PREFIX = "ce-x-"; + /** * Type of occurrence which has happened. Often this property is used for routing, observability, policy enforcement, etc. */ diff --git a/api/src/main/java/io/cloudevents/impl/DefaultCloudEventImpl.java b/api/src/main/java/io/cloudevents/impl/DefaultCloudEventImpl.java index ade8b62b..c36d8ec1 100644 --- a/api/src/main/java/io/cloudevents/impl/DefaultCloudEventImpl.java +++ b/api/src/main/java/io/cloudevents/impl/DefaultCloudEventImpl.java @@ -154,4 +154,20 @@ public class DefaultCloudEventImpl implements CloudEvent, Serializable { void setData(T data) { this.data = data; } + + @Override + public String toString() { + return "DefaultCloudEventImpl{" + + "cloudEventsVersion='" + cloudEventsVersion + '\'' + + ", extensions=" + extensions + + ", eventType='" + eventType + '\'' + + ", source=" + source + + ", eventID='" + eventID + '\'' + + ", eventTypeVersion='" + eventTypeVersion + '\'' + + ", eventTime=" + eventTime + + ", schemaURL=" + schemaURL + + ", contentType='" + contentType + '\'' + + ", data=" + data + + '}'; + } } diff --git a/http/vertx/README.md b/http/vertx/README.md new file mode 100644 index 00000000..a15f9a53 --- /dev/null +++ b/http/vertx/README.md @@ -0,0 +1,44 @@ +# HTTP Transport Util for Eclipse Vert.x + +## Receiving CloudEvents + +Below is a sample on how to read CloudEvents from an HttpRequest: + +```java +import io.vertx.core.AbstractVerticle; +public class Server extends AbstractVerticle { + public void start() { + vertx.createHttpServer().requestHandler(req -> { + + CeVertx.readFromRequest(req, reply -> { + + if (reply.succeeded()) { + + final CloudEvent receivedEvent = reply.result(); + // access the attributes: + System.out.println(receivedEvent.getEventID()); + ... + }); + + req.response() + .putHeader("content-type", "text/plain") + .end("Got a CloudEvent!"); + }).listen(8080); + } +} +``` + +## Sending CloudEvents + +Below is a sample on how to use the client to send a CloudEvent: + +```java +final HttpClientRequest request = vertx.createHttpClient().post(7890, "localhost", "/"); + +CeVertx.writeToHttpClientRequest(cloudEvent, request); + request.handler(resp -> { + context.assertEquals(resp.statusCode(), 200); + }); +request.end(); +``` + diff --git a/http/vertx/pom.xml b/http/vertx/pom.xml new file mode 100644 index 00000000..340f17f3 --- /dev/null +++ b/http/vertx/pom.xml @@ -0,0 +1,77 @@ + + + + 4.0.0 + + + io.cloudevents + cloudevents-parent + 0.0.3-SNAPSHOT + + + io.cloudevents + http-vertx + CloudEvents - Vertx-transport-http + 0.0.3-SNAPSHOT + jar + + + + + io.cloudevents + cloudevents-api + ${project.version} + + + + io.vertx + vertx-core + ${vert.version} + + + + junit + junit + 4.12 + test + + + + org.assertj + assertj-core + 3.10.0 + test + + + + io.vertx + vertx-unit + ${vert.version} + test + + + + + 3.6.0.CR2 + 2.9.6 + 1.8 + 1.8 + UTF-8 + + + + diff --git a/http/vertx/src/main/java/io/cloudevents/http/vertx/CeVertx.java b/http/vertx/src/main/java/io/cloudevents/http/vertx/CeVertx.java new file mode 100644 index 00000000..94d670f7 --- /dev/null +++ b/http/vertx/src/main/java/io/cloudevents/http/vertx/CeVertx.java @@ -0,0 +1,144 @@ +/** + * Copyright 2018 The CloudEvents Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.cloudevents.http.vertx; + +import io.cloudevents.CloudEvent; +import io.cloudevents.CloudEventBuilder; +import io.vertx.core.AsyncResult; +import io.vertx.core.Future; +import io.vertx.core.Handler; +import io.vertx.core.MultiMap; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.http.HttpClientRequest; +import io.vertx.core.http.HttpHeaders; +import io.vertx.core.http.HttpServerRequest; + +import java.net.URI; +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; +import java.util.Map; +import java.util.stream.Collectors; + +import static io.cloudevents.CloudEvent.CLOUD_EVENTS_VERSION_KEY; +import static io.cloudevents.CloudEvent.EVENT_ID_KEY; +import static io.cloudevents.CloudEvent.EVENT_TIME_KEY; +import static io.cloudevents.CloudEvent.EVENT_TYPE_KEY; +import static io.cloudevents.CloudEvent.EVENT_TYPE_VERSION_KEY; +import static io.cloudevents.CloudEvent.HEADER_PREFIX; +import static io.cloudevents.CloudEvent.SCHEMA_URL_KEY; +import static io.cloudevents.CloudEvent.SOURCE_KEY; + +public final class CeVertx { + + private CeVertx() { + // no-op + } + + public static void writeToHttpClientRequest(final CloudEvent ce, final HttpClientRequest request) { + + // setting the right content-length: + if (ce.getData().isPresent()) { + request.putHeader(HttpHeaders.CONTENT_LENGTH, HttpHeaders.createOptimized(String.valueOf(ce.getData().get().toString().length()))); + } else { + request.putHeader(HttpHeaders.CONTENT_LENGTH, HttpHeaders.createOptimized("0")); + } + + // read required headers + request + .putHeader(HttpHeaders.CONTENT_TYPE, HttpHeaders.createOptimized("application/json")) + .putHeader(HttpHeaders.createOptimized(CLOUD_EVENTS_VERSION_KEY), HttpHeaders.createOptimized(ce.getCloudEventsVersion())) + .putHeader(HttpHeaders.createOptimized(EVENT_TYPE_KEY), HttpHeaders.createOptimized(ce.getEventType())) + .putHeader(HttpHeaders.createOptimized(SOURCE_KEY), HttpHeaders.createOptimized(ce.getSource().toString())) + .putHeader(HttpHeaders.createOptimized(EVENT_ID_KEY), HttpHeaders.createOptimized(ce.getEventID())); + + // read optional headers + ce.getEventTypeVersion().ifPresent(eventTypeVersion -> { + request.putHeader(HttpHeaders.createOptimized(EVENT_TYPE_VERSION_KEY), HttpHeaders.createOptimized(eventTypeVersion)); + }); + + ce.getEventTime().ifPresent(eventTime -> { + request.putHeader(HttpHeaders.createOptimized(EVENT_TIME_KEY), HttpHeaders.createOptimized(eventTime.format(DateTimeFormatter.ISO_OFFSET_DATE_TIME))); + }); + + ce.getSchemaURL().ifPresent(schemaUrl -> { + request.putHeader(HttpHeaders.createOptimized(SCHEMA_URL_KEY), HttpHeaders.createOptimized(schemaUrl.toString())); + }); + + ce.getData().ifPresent(data -> { + request.write(data.toString()); + }); + } + + public static void readFromRequest(final HttpServerRequest request, final Handler> resultHandler) { + + final MultiMap headers = request.headers(); + final CloudEventBuilder builder = new CloudEventBuilder(); + + try { + // just check, no need to set the version + readRequiredHeaderValue(headers, CLOUD_EVENTS_VERSION_KEY); + + builder + // set required values + .eventType(readRequiredHeaderValue(headers, EVENT_TYPE_KEY)) + .source(URI.create(readRequiredHeaderValue(headers ,SOURCE_KEY))) + .eventID(readRequiredHeaderValue(headers, EVENT_ID_KEY)) + + // set optional values + .eventTypeVersion(headers.get(EVENT_TYPE_VERSION_KEY)) + .contentType(headers.get(HttpHeaders.CONTENT_TYPE)); + + final String eventTime = headers.get(EVENT_TIME_KEY); + if (eventTime != null) { + builder.eventTime(ZonedDateTime.parse(eventTime, DateTimeFormatter.ISO_OFFSET_DATE_TIME)); + } + + final String schemaURL = headers.get(SCHEMA_URL_KEY); + if (schemaURL != null) { + builder.schemaURL(URI.create(schemaURL)); + } + + // get the extensions + final Map extensions = + headers.entries().stream() + .filter(header -> header.getKey().startsWith(HEADER_PREFIX)) + .collect(Collectors.toMap(h -> h.getKey(), h -> h.getValue())); + + builder.extensions(extensions); + request.bodyHandler((Buffer buff) -> { + + if (buff.length()>0) { + builder.data(buff.toJsonObject().toString()); + } + resultHandler.handle(Future.succeededFuture(builder.build())); + }); + } catch (Exception e) { + resultHandler.handle(Future.failedFuture(e)); + } + } + + private static String readRequiredHeaderValue(final MultiMap headers, final String headerName) { + return requireNonNull(headers.get(headerName)); + } + + private static String requireNonNull(final String val) { + if (val == null) { + throw new IllegalArgumentException(); + } else { + return val; + } + } +} diff --git a/http/vertx/src/test/java/io/cloudevents/http/vertx/CloudEventsVertxTest.java b/http/vertx/src/test/java/io/cloudevents/http/vertx/CloudEventsVertxTest.java new file mode 100644 index 00000000..f4d60fe8 --- /dev/null +++ b/http/vertx/src/test/java/io/cloudevents/http/vertx/CloudEventsVertxTest.java @@ -0,0 +1,206 @@ +/** + * Copyright 2018 The CloudEvents Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.cloudevents.http.vertx; + +import io.cloudevents.CloudEvent; +import io.cloudevents.CloudEventBuilder; +import io.vertx.core.Vertx; +import io.vertx.core.http.HttpClientRequest; +import io.vertx.core.http.HttpHeaders; +import io.vertx.core.http.HttpServer; +import io.vertx.ext.unit.Async; +import io.vertx.ext.unit.TestContext; +import io.vertx.ext.unit.junit.VertxUnitRunner; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; + +import java.io.IOException; +import java.net.ServerSocket; +import java.net.URI; +import java.util.logging.Logger; + +import static io.cloudevents.CloudEvent.CLOUD_EVENTS_VERSION_KEY; +import static io.cloudevents.CloudEvent.EVENT_TYPE_KEY; + +@RunWith(VertxUnitRunner.class) +public class CloudEventsVertxTest { + + private final static Logger logger = Logger.getLogger(CloudEventsVertxTest.class.getName()); + + private HttpServer server; + private Vertx vertx; + private int port; + + @Before + public void setUp(TestContext context) throws IOException { + vertx = Vertx.vertx(); + ServerSocket socket = new ServerSocket(0); + port = socket.getLocalPort(); + socket.close(); + server = vertx.createHttpServer(); + } + + @After + public void tearDown(TestContext context) { + vertx.close(context.asyncAssertSuccess()); + } + + @Test + public void cloudEventWithPayload(TestContext context) { + final Async async = context.async(); + + // Create the actuak CloudEvents object; + final CloudEvent cloudEvent = new CloudEventBuilder() + .source(URI.create("http://knative-eventing.com")) + .eventID("foo-bar") + .eventType("pushevent") + .data("{\"foo\":\"bar\"}}") + .build(); + + // set up the server and add a handler to check the values + server.requestHandler(req -> { + + CeVertx.readFromRequest(req, reply -> { + + if (reply.succeeded()) { + + final CloudEvent receivedEvent = reply.result(); + context.assertEquals(receivedEvent.getEventID(), cloudEvent.getEventID()); + context.assertEquals(receivedEvent.getSource().toString(), cloudEvent.getSource().toString()); + context.assertEquals(receivedEvent.getEventType(), cloudEvent.getEventType()); + context.assertEquals(receivedEvent.getData().isPresent(), Boolean.TRUE); + } + }); + + req.response().end(); + }).listen(port, ar -> { + if (ar.failed()) { + context.fail("could not start server"); + } else { + // sending it to the test-server + final HttpClientRequest request = vertx.createHttpClient().post(port, "localhost", "/"); + + request.handler(response -> { + context.assertEquals(response.statusCode(), 200); + + async.complete(); + }); + CeVertx.writeToHttpClientRequest(cloudEvent, request); + request.end(); + } + }); + logger.info("running on port: " + port); + + async.awaitSuccess(1000); + } + + @Test + public void cloudEventWithoutPayload(TestContext context) { + final Async async = context.async(); + + // Create the actuak CloudEvents object; + final CloudEvent cloudEvent = new CloudEventBuilder() + .source(URI.create("http://knative-eventing.com")) + .eventID("foo-bar") + .eventType("pushevent") + .build(); + + // set up the server and add a handler to check the values + server.requestHandler(req -> { + + CeVertx.readFromRequest(req, reply -> { + + if (reply.succeeded()) { + + final CloudEvent receivedEvent = reply.result(); + context.assertEquals(receivedEvent.getEventID(), cloudEvent.getEventID()); + context.assertEquals(receivedEvent.getSource().toString(), cloudEvent.getSource().toString()); + context.assertEquals(receivedEvent.getEventType(), cloudEvent.getEventType()); + context.assertEquals(receivedEvent.getData().isPresent(), Boolean.FALSE); + } + }); + + req.response().end(); + }).listen(port, ar -> { + if (ar.failed()) { + context.fail("could not start server"); + } else { + // sending it to the test-server + final HttpClientRequest request = vertx.createHttpClient().post(port, "localhost", "/"); + + + request.handler(resp -> { + context.assertEquals(resp.statusCode(), 200); + async.complete(); + + }); + CeVertx.writeToHttpClientRequest(cloudEvent, request); + request.end(); + + } + }); + logger.info("running on port: " + port); + + async.awaitSuccess(1000); + } + + @Test + public void incompleteCloudEvent(TestContext context) { + final Async async = context.async(); + + // set up the server and add a handler to check the values + server.requestHandler(req -> { + + CeVertx.readFromRequest(req, reply -> { + + if (reply.succeeded()) { + + context.fail("request was not complete"); + } else { + context.assertEquals(reply.failed(), Boolean.TRUE); + } + }); + + req.response().end(); + }).listen(port, ar -> { + + if (ar.failed()) { + context.fail("could not start server"); + } else { + // fire the request + // sending it to the test-server + final HttpClientRequest request = vertx.createHttpClient().post(port, "localhost", "/"); + // create incomplete CloudEvent request + request.putHeader(HttpHeaders.createOptimized(CLOUD_EVENTS_VERSION_KEY), HttpHeaders.createOptimized("0.1")); + request.putHeader(HttpHeaders.createOptimized(EVENT_TYPE_KEY), HttpHeaders.createOptimized("pushevent")); + request.putHeader(HttpHeaders.CONTENT_LENGTH, HttpHeaders.createOptimized("0")); + + + request.handler(resp -> { + context.assertEquals(resp.statusCode(), 200); + + async.complete(); + }); + request.end(); + } + }); + logger.info("running on port: " + port); + + async.awaitSuccess(1000); + } +} diff --git a/pom.xml b/pom.xml index 1b22bb37..43c2a613 100644 --- a/pom.xml +++ b/pom.xml @@ -53,6 +53,7 @@ api + http/vertx