diff --git a/api/src/main/java/io/cloudevents/CloudEvent.java b/api/src/main/java/io/cloudevents/CloudEvent.java index f1a3b4b3..068cdbec 100644 --- a/api/src/main/java/io/cloudevents/CloudEvent.java +++ b/api/src/main/java/io/cloudevents/CloudEvent.java @@ -17,7 +17,7 @@ package io.cloudevents; import java.net.URI; import java.time.ZonedDateTime; -import java.util.Map; +import java.util.List; import java.util.Optional; /** @@ -66,4 +66,9 @@ public interface CloudEvent { * The event payload. The payload depends on the eventType, schemaURL and eventTypeVersion, the payload is encoded into a media format which is specified by the contentType attribute (e.g. application/json). */ Optional getData(); + + /** + * + */ + Optional> getExtensions(); } \ No newline at end of file diff --git a/api/src/main/java/io/cloudevents/CloudEventBuilder.java b/api/src/main/java/io/cloudevents/CloudEventBuilder.java index cd736f6f..2e37231c 100644 --- a/api/src/main/java/io/cloudevents/CloudEventBuilder.java +++ b/api/src/main/java/io/cloudevents/CloudEventBuilder.java @@ -19,7 +19,9 @@ import io.cloudevents.impl.DefaultCloudEventImpl; import java.net.URI; import java.time.ZonedDateTime; +import java.util.ArrayList; import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; /** @@ -36,6 +38,7 @@ public class CloudEventBuilder { private ZonedDateTime time; private URI schemaURL; private T data; + private final List extensions = new ArrayList<>(); /** * The version of the CloudEvents specification which the event uses. @@ -103,6 +106,11 @@ public class CloudEventBuilder { return this; } + public CloudEventBuilder extension(final Extension extension) { + this.extensions.add(extension); + return this; + } + /** * Constructs a new {@link CloudEvent} with the previously-set configuration. */ @@ -117,6 +125,6 @@ public class CloudEventBuilder { throw new IllegalArgumentException("please provide all required fields"); } - return new DefaultCloudEventImpl(type, specversion, source, id, time, schemaURL, contentType, data); + return new DefaultCloudEventImpl(type, specversion, source, id, time, schemaURL, contentType, data, extensions); } -} \ No newline at end of file +} diff --git a/api/src/main/java/io/cloudevents/Extension.java b/api/src/main/java/io/cloudevents/Extension.java new file mode 100644 index 00000000..62d15512 --- /dev/null +++ b/api/src/main/java/io/cloudevents/Extension.java @@ -0,0 +1,19 @@ +/** + * Copyright 2018 The CloudEvents Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.cloudevents; + +public interface Extension { +} diff --git a/api/src/main/java/io/cloudevents/extensions/DistributedTracingExtension.java b/api/src/main/java/io/cloudevents/extensions/DistributedTracingExtension.java new file mode 100644 index 00000000..402ceb6b --- /dev/null +++ b/api/src/main/java/io/cloudevents/extensions/DistributedTracingExtension.java @@ -0,0 +1,33 @@ +package io.cloudevents.extensions; + +import io.cloudevents.Extension; + +public class DistributedTracingExtension implements Extension { + + private String traceparent; + private String tracestate; + + public String getTraceparent() { + return traceparent; + } + + public void setTraceparent(String traceparent) { + this.traceparent = traceparent; + } + + public String getTracestate() { + return tracestate; + } + + public void setTracestate(String tracestate) { + this.tracestate = tracestate; + } + + @Override + public String toString() { + return "DistributedTracingExtension{" + + "traceparent='" + traceparent + '\'' + + ", tracestate='" + tracestate + '\'' + + '}'; + } +} diff --git a/api/src/main/java/io/cloudevents/impl/DefaultCloudEventImpl.java b/api/src/main/java/io/cloudevents/impl/DefaultCloudEventImpl.java index dc7369ba..d92e0946 100644 --- a/api/src/main/java/io/cloudevents/impl/DefaultCloudEventImpl.java +++ b/api/src/main/java/io/cloudevents/impl/DefaultCloudEventImpl.java @@ -19,11 +19,13 @@ import com.fasterxml.jackson.annotation.JsonAlias; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.databind.annotation.JsonDeserialize; import io.cloudevents.CloudEvent; +import io.cloudevents.Extension; import io.cloudevents.SpecVersion; import java.io.Serializable; import java.net.URI; import java.time.ZonedDateTime; +import java.util.List; import java.util.Map; import java.util.Optional; @@ -45,8 +47,9 @@ public class DefaultCloudEventImpl implements CloudEvent, Serializable { private URI schemaURL = null; private String contentType = null; private T data = null; + private List extensions = null; - public DefaultCloudEventImpl(final String type, final String specversion, final URI source, final String id, final ZonedDateTime time, final URI schemaURL, final String contentType, final T data) { + public DefaultCloudEventImpl(final String type, final String specversion, final URI source, final String id, final ZonedDateTime time, final URI schemaURL, final String contentType, final T data, final List extensions) { this.specversion = specversion; this.type = type; this.source = source; @@ -55,6 +58,7 @@ public class DefaultCloudEventImpl implements CloudEvent, Serializable { this.schemaURL = schemaURL; this.contentType = contentType; this.data = data; + this.extensions = extensions; } DefaultCloudEventImpl() { @@ -101,6 +105,11 @@ public class DefaultCloudEventImpl implements CloudEvent, Serializable { return Optional.ofNullable(data); } + @Override + public Optional> getExtensions() { + return Optional.ofNullable(extensions); + } + // protected setters, used for (JSON) deserialization @JsonAlias({"specversion", "cloudEventsVersion"}) diff --git a/api/src/test/java/io/cloudevents/CloudEventBuilderTest.java b/api/src/test/java/io/cloudevents/CloudEventBuilderTest.java index ab6dbde8..898fc028 100644 --- a/api/src/test/java/io/cloudevents/CloudEventBuilderTest.java +++ b/api/src/test/java/io/cloudevents/CloudEventBuilderTest.java @@ -15,6 +15,7 @@ */ package io.cloudevents; +import io.cloudevents.extensions.DistributedTracingExtension; import org.junit.Test; import java.net.URI; @@ -168,4 +169,31 @@ public class CloudEventBuilderTest { assertThat(simpleKeyValueEvent.getSource()).isEqualTo(src); } + @Test + public void testBuilderWithoutDataAndDistributedTracingExtension() { + + // given + final String id = UUID.randomUUID().toString(); + final URI src = URI.create("mailto:cncf-wg-serverless@lists.cncf.io"); + final String type = "My.Cloud.Event.Type"; + final DistributedTracingExtension dte = new DistributedTracingExtension(); + dte.setTraceparent("00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01"); + dte.setTracestate("congo=BleGNlZWRzIHRohbCBwbGVhc3VyZS4"); + + // when + final CloudEvent> simpleKeyValueEvent = new CloudEventBuilder() + .type(type) + .id(id) + .source(src) + .extension(dte) + .build(); + // than + assertThat(simpleKeyValueEvent.getSource()).isEqualTo(src); + assertThat(simpleKeyValueEvent.getExtensions().get()).contains(dte); + + Extension receivedDte = simpleKeyValueEvent.getExtensions().get().get(0); + assertThat(receivedDte).extracting("traceparent", "tracestate") + .contains("00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01", "congo=BleGNlZWRzIHRohbCBwbGVhc3VyZS4"); + } + } diff --git a/http/vertx/src/main/java/io/cloudevents/http/vertx/VertxCloudEvents.java b/http/vertx/src/main/java/io/cloudevents/http/vertx/VertxCloudEvents.java index 510f52bd..a99f2448 100644 --- a/http/vertx/src/main/java/io/cloudevents/http/vertx/VertxCloudEvents.java +++ b/http/vertx/src/main/java/io/cloudevents/http/vertx/VertxCloudEvents.java @@ -16,6 +16,7 @@ package io.cloudevents.http.vertx; import io.cloudevents.CloudEvent; +import io.cloudevents.Extension; import io.cloudevents.http.vertx.impl.VertxCloudEventsImpl; import io.vertx.codegen.annotations.GenIgnore; import io.vertx.codegen.annotations.VertxGen; @@ -24,6 +25,8 @@ import io.vertx.core.Handler; import io.vertx.core.http.HttpClientRequest; import io.vertx.core.http.HttpServerRequest; +import java.util.List; + @VertxGen public interface VertxCloudEvents { @@ -34,6 +37,9 @@ public interface VertxCloudEvents { @GenIgnore(GenIgnore.PERMITTED_TYPE) void readFromRequest(HttpServerRequest request, Handler>> resultHandler); + @GenIgnore(GenIgnore.PERMITTED_TYPE) + void readFromRequest(HttpServerRequest request, Class[] extensions, Handler>> resultHandler); + @GenIgnore(GenIgnore.PERMITTED_TYPE) void writeToHttpClientRequest(CloudEvent ce, HttpClientRequest request); } diff --git a/http/vertx/src/main/java/io/cloudevents/http/vertx/impl/VertxCloudEventsImpl.java b/http/vertx/src/main/java/io/cloudevents/http/vertx/impl/VertxCloudEventsImpl.java index ac67ca49..5fc2f0c4 100644 --- a/http/vertx/src/main/java/io/cloudevents/http/vertx/impl/VertxCloudEventsImpl.java +++ b/http/vertx/src/main/java/io/cloudevents/http/vertx/impl/VertxCloudEventsImpl.java @@ -17,7 +17,9 @@ package io.cloudevents.http.vertx.impl; import io.cloudevents.CloudEvent; import io.cloudevents.CloudEventBuilder; +import io.cloudevents.Extension; import io.cloudevents.SpecVersion; +import io.cloudevents.extensions.DistributedTracingExtension; import io.cloudevents.http.HttpTransportAttributes; import io.cloudevents.http.V02HttpTransportMappers; import io.cloudevents.http.vertx.VertxCloudEvents; @@ -29,10 +31,19 @@ import io.vertx.core.buffer.Buffer; import io.vertx.core.http.HttpClientRequest; import io.vertx.core.http.HttpHeaders; import io.vertx.core.http.HttpServerRequest; +import io.vertx.core.json.Json; +import io.vertx.core.json.JsonObject; +import java.io.Serializable; +import java.lang.reflect.Field; import java.net.URI; import java.time.ZonedDateTime; import java.time.format.DateTimeFormatter; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; public final class VertxCloudEventsImpl implements VertxCloudEvents { @@ -50,6 +61,12 @@ public final class VertxCloudEventsImpl implements VertxCloudEvents { @Override public void readFromRequest(HttpServerRequest request, Handler>> resultHandler) { + this.readFromRequest(request, null, resultHandler); + + } + + @Override + public void readFromRequest(HttpServerRequest request, Class[] extensions, Handler>> resultHandler) { final MultiMap headers = request.headers(); final CloudEventBuilder builder = new CloudEventBuilder(); @@ -85,6 +102,34 @@ public final class VertxCloudEventsImpl implements VertxCloudEvents { builder.schemaURL(URI.create(schemaURL)); } + + if (extensions != null && extensions.length > 0) { + + // move this out + Arrays.asList(extensions).forEach(ext -> { + + try { + Object extObj = ext.newInstance(); + final JsonObject extension = new JsonObject(); + Field[] fields = ext.getDeclaredFields(); + + for (Field field : fields) { + boolean accessible = field.isAccessible(); + field.setAccessible(true); + field.set(extObj, request.headers().get(field.getName())); + field.setAccessible(accessible); + } + builder.extension((Extension) extObj); + } catch (InstantiationException e) { + e.printStackTrace(); + } catch (IllegalAccessException e) { + e.printStackTrace(); + } + }); + } + + + request.bodyHandler((Buffer buff) -> { if (buff.length()>0) { @@ -126,8 +171,21 @@ public final class VertxCloudEventsImpl implements VertxCloudEvents { request.putHeader(HttpHeaders.createOptimized(httpTransportAttributes.schemaUrlKey()), HttpHeaders.createOptimized(schemaUrl.toString())); }); + + ce.getExtensions().ifPresent(extensions -> { + + extensions.forEach(ext -> { + JsonObject.mapFrom(ext).forEach(extEntry -> { + request.putHeader(HttpHeaders.createOptimized(extEntry.getKey()), HttpHeaders.createOptimized(extEntry.getValue().toString())); + }); + }); + }); + + ce.getData().ifPresent(data -> { request.write(data.toString()); }); + + } } diff --git a/http/vertx/src/test/java/io/cloudevents/http/vertx/VertxCloudEventsTests.java b/http/vertx/src/test/java/io/cloudevents/http/vertx/VertxCloudEventsTests.java index e544be7b..3ddb0456 100644 --- a/http/vertx/src/test/java/io/cloudevents/http/vertx/VertxCloudEventsTests.java +++ b/http/vertx/src/test/java/io/cloudevents/http/vertx/VertxCloudEventsTests.java @@ -17,6 +17,7 @@ package io.cloudevents.http.vertx; import io.cloudevents.CloudEvent; import io.cloudevents.CloudEventBuilder; +import io.cloudevents.extensions.DistributedTracingExtension; import io.cloudevents.http.V01HttpTransportMappers; import io.cloudevents.http.V02HttpTransportMappers; import io.cloudevents.http.reactivex.vertx.VertxCloudEvents; @@ -31,6 +32,7 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import java.net.URI; +import java.util.List; import java.util.logging.Logger; import static io.cloudevents.SpecVersion.V_01; @@ -211,4 +213,59 @@ class VertxCloudEventsTests { req.end(); }); } + + @Test + @DisplayName("Post a 0.2 CloudEvents object with a payload") + void cloudEventWithExtension(Vertx vertx, VertxTestContext testContext) { + Checkpoint serverCheckpoint = testContext.checkpoint(); + Checkpoint clientCheckpoint = testContext.checkpoint(); + + final DistributedTracingExtension dte = new DistributedTracingExtension(); + dte.setTraceparent("00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01"); + dte.setTracestate("congo=BleGNlZWRzIHRohbCBwbGVhc3VyZS4"); + + + final CloudEvent cloudEvent = new CloudEventBuilder() + .specVersion("0.2") + .source(URI.create("http://knative-eventing.com")) + .id("foo-bar") + .extension(dte) + .type("pushevent") + .build(); + + Class[] exceptions = {DistributedTracingExtension.class}; + + vertx.createHttpServer() + .requestHandler(req -> VertxCloudEvents + .create() + .rxReadFromRequest(req, exceptions) + .doOnError(testContext::failNow) + .subscribe(event -> testContext.verify(() -> { + assertThat(event.getId()).isEqualTo(cloudEvent.getId()); + + // extension headers + assertThat(req.headers().get("traceparent")).isEqualTo("00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01"); + assertThat(req.headers().get("tracestate")).isEqualTo("congo=BleGNlZWRzIHRohbCBwbGVhc3VyZS4"); + + assertThat(event.getExtensions().get().get(0)).isNotNull(); + + assertThat(event.getExtensions().get().get(0)).extracting("traceparent", "tracestate") + .contains("00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01", "congo=BleGNlZWRzIHRohbCBwbGVhc3VyZS4"); + + + req.response().end(); + serverCheckpoint.flag(); + }))) + .rxListen(8080) + .doOnError(testContext::failNow) + .subscribe(server -> { + HttpClientRequest req = vertx.createHttpClient().post(server.actualPort(), "localhost", "/"); + req.handler(resp -> testContext.verify(() -> { + assertThat(resp.statusCode()).isEqualTo(200); + clientCheckpoint.flag(); + })); + VertxCloudEvents.create().writeToHttpClientRequest(cloudEvent, req); + req.end(); + }); + } }