From 6eecf29cf7c21dbd392ce35dac616cdb2a0d0a78 Mon Sep 17 00:00:00 2001 From: Matej Vasek Date: Wed, 9 Sep 2020 19:28:40 +0200 Subject: [PATCH] Generic http binding (#225) Signed-off-by: Matej Vasek --- examples/basic-http/pom.xml | 47 ++++ .../examples/http/basic/BasicHttpServer.java | 90 +++++++ .../http/basic/HttpURLConnectionClient.java | 87 ++++++ .../examples/http/basic/IOUtils.java | 43 +++ .../examples/http/basic/JettyServer.java | 103 ++++++++ examples/pom.xml | 1 + http/basic/README.md | 42 +++ http/basic/pom.xml | 66 +++++ .../cloudevents/http/HttpMessageFactory.java | 152 +++++++++++ .../http/impl/CloudEventsHeaders.java | 41 +++ .../http/impl/HttpMessageReader.java | 61 +++++ .../http/impl/HttpMessageWriter.java | 74 ++++++ .../http/HttpMessageReaderWriterTest.java | 248 ++++++++++++++++++ pom.xml | 1 + 14 files changed, 1056 insertions(+) create mode 100644 examples/basic-http/pom.xml create mode 100644 examples/basic-http/src/main/java/io/cloudevents/examples/http/basic/BasicHttpServer.java create mode 100644 examples/basic-http/src/main/java/io/cloudevents/examples/http/basic/HttpURLConnectionClient.java create mode 100644 examples/basic-http/src/main/java/io/cloudevents/examples/http/basic/IOUtils.java create mode 100644 examples/basic-http/src/main/java/io/cloudevents/examples/http/basic/JettyServer.java create mode 100644 http/basic/README.md create mode 100644 http/basic/pom.xml create mode 100644 http/basic/src/main/java/io/cloudevents/http/HttpMessageFactory.java create mode 100644 http/basic/src/main/java/io/cloudevents/http/impl/CloudEventsHeaders.java create mode 100644 http/basic/src/main/java/io/cloudevents/http/impl/HttpMessageReader.java create mode 100644 http/basic/src/main/java/io/cloudevents/http/impl/HttpMessageWriter.java create mode 100644 http/basic/src/test/java/io/cloudevents/http/HttpMessageReaderWriterTest.java diff --git a/examples/basic-http/pom.xml b/examples/basic-http/pom.xml new file mode 100644 index 00000000..a8aac155 --- /dev/null +++ b/examples/basic-http/pom.xml @@ -0,0 +1,47 @@ + + + + + + cloudevents-examples + io.cloudevents + 2.0.0-SNAPSHOT + + 4.0.0 + + cloudevents-basic-http-example + + + + io.cloudevents + cloudevents-http-basic + ${project.version} + + + io.cloudevents + cloudevents-json-jackson + ${project.version} + + + org.eclipse.jetty + jetty-server + 9.4.28.v20200408 + + + diff --git a/examples/basic-http/src/main/java/io/cloudevents/examples/http/basic/BasicHttpServer.java b/examples/basic-http/src/main/java/io/cloudevents/examples/http/basic/BasicHttpServer.java new file mode 100644 index 00000000..0bcd84d6 --- /dev/null +++ b/examples/basic-http/src/main/java/io/cloudevents/examples/http/basic/BasicHttpServer.java @@ -0,0 +1,90 @@ +/* + * Copyright 2018-Present The CloudEvents Authors + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.cloudevents.examples.http.basic; + +import com.sun.net.httpserver.Headers; +import com.sun.net.httpserver.HttpExchange; +import com.sun.net.httpserver.HttpServer; +import io.cloudevents.CloudEvent; +import io.cloudevents.core.message.MessageReader; +import io.cloudevents.core.message.MessageWriter; +import io.cloudevents.http.HttpMessageFactory; + +import java.io.*; +import java.net.InetSocketAddress; + +public class BasicHttpServer { + + public static void main(String[] args) throws IOException { + HttpServer httpServer = HttpServer.create(new InetSocketAddress("localhost", 8080), 0); + httpServer.createContext("/echo", BasicHttpServer::echoHandler); + httpServer.start(); + } + + private static void echoHandler(HttpExchange exchange) throws IOException { + if (!"POST".equalsIgnoreCase(exchange.getRequestMethod())) { + exchange.sendResponseHeaders(405, 0); + return; + } + try { + MessageReader messageReader = createMessageReader(exchange); + CloudEvent cloudEvent = messageReader.toEvent(); + + System.out.println("Handling event: " + cloudEvent); + + MessageWriter messageWriter = createMessageWriter(exchange); + messageWriter.writeBinary(cloudEvent); + } catch (Throwable t) { + try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()) { + try (PrintWriter pw = new PrintWriter(byteArrayOutputStream)) { + t.printStackTrace(pw); + } + byte[] body = byteArrayOutputStream.toByteArray(); + exchange.sendResponseHeaders(500, body.length); + try (OutputStream outputStream = exchange.getResponseBody()) { + outputStream.write(body); + } + } + } + } + + private static MessageReader createMessageReader(HttpExchange httpExchange) throws IOException { + Headers headers = httpExchange.getRequestHeaders(); + byte[] body = IOUtils.toByteArray(httpExchange.getRequestBody()); + return HttpMessageFactory.createReaderFromMultimap(headers, body); + } + + private static MessageWriter createMessageWriter(HttpExchange httpExchange) { + return HttpMessageFactory.createWriter( + httpExchange.getResponseHeaders()::add, + body -> { + try { + try (OutputStream os = httpExchange.getResponseBody()){ + if (body != null) { + httpExchange.sendResponseHeaders(200, body.length); + os.write(body); + } else { + httpExchange.sendResponseHeaders(204, -1); + } + } + } catch (IOException t) { + throw new UncheckedIOException(t); + } + } + ); + } +} diff --git a/examples/basic-http/src/main/java/io/cloudevents/examples/http/basic/HttpURLConnectionClient.java b/examples/basic-http/src/main/java/io/cloudevents/examples/http/basic/HttpURLConnectionClient.java new file mode 100644 index 00000000..21b16db2 --- /dev/null +++ b/examples/basic-http/src/main/java/io/cloudevents/examples/http/basic/HttpURLConnectionClient.java @@ -0,0 +1,87 @@ +/* + * Copyright 2018-Present The CloudEvents Authors + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.cloudevents.examples.http.basic; + +import io.cloudevents.CloudEvent; +import io.cloudevents.core.message.MessageReader; +import io.cloudevents.core.message.MessageWriter; +import io.cloudevents.core.v1.CloudEventBuilder; +import io.cloudevents.http.HttpMessageFactory; + +import java.io.IOException; +import java.io.OutputStream; +import java.io.UncheckedIOException; +import java.net.HttpURLConnection; +import java.net.URI; +import java.net.URL; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.Map; + +public class HttpURLConnectionClient { + + public static void main(String[] args) throws IOException { + CloudEvent ceToSend = new CloudEventBuilder() + .withId("my-id") + .withSource(URI.create("/myClient")) + .withType("dev.knative.cronjob.event") + .withDataContentType("application/json") + .withData("{ \"msg\" : \"hello\" }".getBytes(StandardCharsets.UTF_8)) + .build(); + + + URL url = new URL("http://localhost:8080/echo"); + HttpURLConnection httpUrlConnection = (HttpURLConnection) url.openConnection(); + httpUrlConnection.setRequestMethod("POST"); + httpUrlConnection.setDoOutput(true); + httpUrlConnection.setDoInput(true); + + MessageWriter messageWriter = createMessageWriter(httpUrlConnection); + messageWriter.writeBinary(ceToSend); + + MessageReader messageReader = createMessageReader(httpUrlConnection); + CloudEvent receivedCE = messageReader.toEvent(); + + System.out.println("CloudEvent: " + receivedCE); + System.out.println("Data: " + new String(receivedCE.getData(), StandardCharsets.UTF_8)); + } + + private static MessageReader createMessageReader(HttpURLConnection httpUrlConnection) throws IOException { + Map> headers = httpUrlConnection.getHeaderFields(); + byte[] body = IOUtils.toByteArray(httpUrlConnection.getInputStream()); + return HttpMessageFactory.createReaderFromMultimap(headers, body); + } + + private static MessageWriter createMessageWriter(HttpURLConnection httpUrlConnection) { + return HttpMessageFactory.createWriter( + httpUrlConnection::setRequestProperty, + body -> { + try { + if (body != null) { + httpUrlConnection.setRequestProperty("content-length", String.valueOf(body.length)); + try (OutputStream outputStream = httpUrlConnection.getOutputStream()) { + outputStream.write(body); + } + } else { + httpUrlConnection.setRequestProperty("content-length", "0"); + } + } catch (IOException t) { + throw new UncheckedIOException(t); + } + }); + } +} diff --git a/examples/basic-http/src/main/java/io/cloudevents/examples/http/basic/IOUtils.java b/examples/basic-http/src/main/java/io/cloudevents/examples/http/basic/IOUtils.java new file mode 100644 index 00000000..25725064 --- /dev/null +++ b/examples/basic-http/src/main/java/io/cloudevents/examples/http/basic/IOUtils.java @@ -0,0 +1,43 @@ +/* + * Copyright 2018-Present The CloudEvents Authors + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package io.cloudevents.examples.http.basic; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; + +// based on apache's commons-io +// which is licensed under the Apache License, Version 2.0 +// https://commons.apache.org/proper/commons-io/ +// https://github.com/apache/commons-io/blob/master/src/main/java/org/apache/commons/io/IOUtils.java + +public final class IOUtils { + + private IOUtils() {} + + // since Java 9+ you may call InputStream.readAllBytes() instead of this method + public static byte[] toByteArray(InputStream body) throws IOException { + try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()) { + byte[] buff = new byte[(1<<10) * 8]; + int read; + while ((read = body.read(buff)) != -1) { + byteArrayOutputStream.write(buff, 0, read); + } + return byteArrayOutputStream.toByteArray(); + } + } +} diff --git a/examples/basic-http/src/main/java/io/cloudevents/examples/http/basic/JettyServer.java b/examples/basic-http/src/main/java/io/cloudevents/examples/http/basic/JettyServer.java new file mode 100644 index 00000000..94670447 --- /dev/null +++ b/examples/basic-http/src/main/java/io/cloudevents/examples/http/basic/JettyServer.java @@ -0,0 +1,103 @@ +/* + * Copyright 2018-Present The CloudEvents Authors + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.cloudevents.examples.http.basic; + +import javax.servlet.ServletOutputStream; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import javax.servlet.ServletException; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.net.InetSocketAddress; +import java.util.Enumeration; +import java.util.function.BiConsumer; +import java.util.function.Consumer; + +import io.cloudevents.CloudEvent; +import io.cloudevents.core.message.MessageReader; +import io.cloudevents.core.message.MessageWriter; +import io.cloudevents.http.HttpMessageFactory; + +import org.eclipse.jetty.http.HttpStatus; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.Request; +import org.eclipse.jetty.server.handler.AbstractHandler; + +public class JettyServer { + + private static class Handler extends AbstractHandler { + + @Override + public void handle(String uri, + Request request, + HttpServletRequest httpServletRequest, + HttpServletResponse httpServletResponse) throws IOException, ServletException { + if (!"/echo".equalsIgnoreCase(uri)) { + httpServletResponse.setStatus(HttpStatus.NOT_FOUND_404); + return; + } + if (!"POST".equalsIgnoreCase(request.getMethod())) { + httpServletResponse.setStatus(HttpStatus.METHOD_NOT_ALLOWED_405); + return; + } + + CloudEvent receivedEvent = createMessageReader(httpServletRequest).toEvent(); + System.out.println("Handling event: " + receivedEvent); + createMessageWriter(httpServletResponse).writeBinary(receivedEvent); + } + } + + private static MessageReader createMessageReader(HttpServletRequest httpServletRequest) throws IOException { + Consumer> forEachHeader = processHeader -> { + Enumeration headerNames = httpServletRequest.getHeaderNames(); + while (headerNames.hasMoreElements()) { + String name = headerNames.nextElement(); + processHeader.accept(name, httpServletRequest.getHeader(name)); + + } + }; + byte[] body = IOUtils.toByteArray(httpServletRequest.getInputStream()); + return HttpMessageFactory.createReader(forEachHeader, body); + } + + private static MessageWriter createMessageWriter(HttpServletResponse httpServletResponse) throws IOException { + return HttpMessageFactory.createWriter( + httpServletResponse::addHeader, + body -> { + try { + try (ServletOutputStream outputStream = httpServletResponse.getOutputStream()) { + if (body != null) { + httpServletResponse.setContentLength(body.length); + httpServletResponse.setStatus(HttpStatus.OK_200); + outputStream.write(body); + } else { + httpServletResponse.setStatus(HttpStatus.NO_CONTENT_204); + } + } + } catch (IOException e) { + throw new UncheckedIOException(e); + } + }); + } + + public static void main(String[] args) throws Exception { + Server server = new Server(new InetSocketAddress("localhost", 8080)); + server.setHandler(new Handler()); + server.start(); + server.join(); + } +} diff --git a/examples/pom.xml b/examples/pom.xml index eb75f3b5..2863c129 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -17,6 +17,7 @@ kafka restful-ws-quarkus vertx + basic-http diff --git a/http/basic/README.md b/http/basic/README.md new file mode 100644 index 00000000..9b861be2 --- /dev/null +++ b/http/basic/README.md @@ -0,0 +1,42 @@ +# Generic HTTP Protocol Binding + +[![Javadocs](http://www.javadoc.io/badge/io.cloudevents/cloudevents-http-basic.svg?color=green)](http://www.javadoc.io/doc/io.cloudevents/cloudevents-http-basic) + +This module is designed to be usable with various HTTP APIs. + +There are also more specialized HTTP bindings: +* [`cloudevents-http-vertx`](../vertx) +* [`cloudevents-http-restful-ws`](../restful-ws) + +Since this module is generic it doesn't offer optimal performance for all HTTP implementations. +For better performance consider implementing `MessageReader` and `MessageWriter` that are +tailored for specific HTTP implementation. As a reference you can take aforementioned existing bindings. + +For Maven based projects, use the following to configure the CloudEvents Generic HTTP Transport: + +```xml + + io.cloudevents + cloudevents-http-basic + 2.0.0-milestone1 + +``` + +## Sending and Receiving CloudEvents + +To send and receive CloudEvents we use `MessageWriter` and `MessageReader`, respectively. +This module offers factory methods for creation of those in `HttpMessageFactory`. + +```java +public class HttpMessageFactory { + public static MessageReader createReader(Consumer> forEachHeader, byte[] body); + public static MessageReader createReader(Map headers, byte[] body); + public static MessageReader createReaderFromMultimap(Map> headers, byte[] body); + public static MessageWriter createWriter(BiConsumer putHeader, Consumer sendBody); +} +``` + +Examples of usage: +* [Standard Java HttpServer](../../examples/basic-http/src/main/java/io/cloudevents/examples/http/basic/HttpServer.java) +* [Http Client with HttpURLConnection](../../examples/basic-http/src/main/java/io/cloudevents/examples/http/basic/HttpURLConnectionClient.java) +* [Http Servlet with Jetty](../../examples/basic-http/src/main/java/io/cloudevents/examples/http/basic/JettyServer.java) diff --git a/http/basic/pom.xml b/http/basic/pom.xml new file mode 100644 index 00000000..67d9e8c0 --- /dev/null +++ b/http/basic/pom.xml @@ -0,0 +1,66 @@ + + + + 4.0.0 + + io.cloudevents + cloudevents-parent + 2.0.0-SNAPSHOT + ../../ + + + cloudevents-http-basic + CloudEvents - Basic Http Binding + jar + + + io.cloudevents.http.basic + + + + + io.cloudevents + cloudevents-core + ${project.version} + + + + + io.cloudevents + cloudevents-core + tests + test-jar + ${project.version} + test + + + org.assertj + assertj-core + ${assertj-core.version} + test + + + org.junit.jupiter + junit-jupiter + ${junit-jupiter.version} + test + + + + diff --git a/http/basic/src/main/java/io/cloudevents/http/HttpMessageFactory.java b/http/basic/src/main/java/io/cloudevents/http/HttpMessageFactory.java new file mode 100644 index 00000000..d6d02f5e --- /dev/null +++ b/http/basic/src/main/java/io/cloudevents/http/HttpMessageFactory.java @@ -0,0 +1,152 @@ +/* + * Copyright 2018-Present The CloudEvents Authors + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.cloudevents.http; + +import io.cloudevents.core.message.MessageReader; +import io.cloudevents.core.message.MessageWriter; +import io.cloudevents.core.message.impl.GenericStructuredMessageReader; +import io.cloudevents.core.message.impl.MessageUtils; +import io.cloudevents.core.message.impl.UnknownEncodingMessageReader; +import io.cloudevents.http.impl.CloudEventsHeaders; +import io.cloudevents.http.impl.HttpMessageReader; +import io.cloudevents.http.impl.HttpMessageWriter; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiConsumer; +import java.util.function.Consumer; + +import static io.cloudevents.http.impl.CloudEventsHeaders.CONTENT_TYPE; + +/** + * This class provides a collection of methods to create {@link io.cloudevents.core.message.MessageReader} + * and {@link io.cloudevents.core.message.MessageWriter} for various HTTP APIs. + */ +public final class HttpMessageFactory { + + private HttpMessageFactory() {} + + /** + * Creates a new {@link MessageReader} that can read both structured and binary messages from a HTTP response (client) or request (server). + * + *

+     * Example of usage with HttpServletRequest:
+     * {@code
+     * Consumer> forEachHeader = processHeader -> {
+     *     Enumeration headerNames = httpServletRequest.getHeaderNames();
+     *     while (headerNames.hasMoreElements()) {
+     *         String name = headerNames.nextElement();
+     *         processHeader.accept(name, httpServletRequest.getHeader(name));
+     *
+     *     }
+     * };
+     * byte[] body = httpServletRequest.getInputStream().readAllBytes();
+     * HttpMessageFactory.createReader(forEachHeader, body);
+     * }
+     * 
+ * @param forEachHeader http headers visitor function + * @param body nullable buffer of the body + * @return a message reader implementation with potentially an unknown encoding + * @throws IllegalArgumentException If, in case of binary mode, the spec version is invalid + */ + public static MessageReader createReader(Consumer> forEachHeader, byte[] body) { + final AtomicReference contentType = new AtomicReference<>(); + final AtomicReference specVersion = new AtomicReference<>(); + + forEachHeader.accept((k, v) -> { + if (CONTENT_TYPE.equalsIgnoreCase(k)) { + contentType.set(v); + } else if (CloudEventsHeaders.SPEC_VERSION.equalsIgnoreCase(k)) { + specVersion.set(v); + } + }); + + return MessageUtils.parseStructuredOrBinaryMessage( + contentType::get, + format -> new GenericStructuredMessageReader(format, body), + specVersion::get, + sv -> new HttpMessageReader(sv, forEachHeader, body), + UnknownEncodingMessageReader::new + ); + } + + /** + * Creates a new {@link MessageReader} that can read both structured and binary messages from a HTTP response (client) or request (server). + * + *
+     * This overload is equivalent to calling:
+     * {@code HttpMessageFactory.createReader(headers::forEach, body);}
+     * 
+ * @param headers http headers as map + * @param body nullable buffer of the body + * @return a message reader implementation with potentially an unknown encoding + * @throws IllegalArgumentException If, in case of binary mode, the spec version is invalid + */ + public static MessageReader createReader(Map headers, byte[] body) { + return createReader(headers::forEach, body); + } + + /** + * Creates a new {@link MessageReader} that can read both structured and binary messages from a HTTP response (client) or request (server). + * + * @param headers http headers as multimap + * @param body nullable buffer of the body + * @return a message reader implementation with potentially an unknown encoding + * @throws IllegalArgumentException If, in case of binary mode, the spec version is invalid + */ + public static MessageReader createReaderFromMultimap(Map> headers, byte[] body) { + Consumer> forEachHeader = processHeader -> + headers.forEach((key, values) -> + values.forEach(value -> processHeader.accept(key, value))); + return createReader(forEachHeader, body); + } + + /** + * Creates a new {@link MessageWriter} that can write both structured and binary messages to a HTTP response (server) or request (client). + * + *
+     * Example of usage with HttpServletResponse:
+     * {@code
+     * HttpMessageFactory.createWriter(
+     *     httpServletResponse::addHeader,
+     *     body -> {
+     *         try {
+     *             if (body != null) {
+     *                 httpServletResponse.setContentLength(body.length);
+     *                 httpServletResponse.setStatus(HttpStatus.OK_200);
+     *                 try (ServletOutputStream outputStream = httpServletResponse.getOutputStream()) {
+     *                     outputStream.write(body);
+     *                 }
+     *             } else {
+     *                 httpServletResponse.setStatus(HttpStatus.NO_CONTENT_204);
+     *             }
+     *         } catch (IOException e) {
+     *             throw new UncheckedIOException(e);
+     *         }
+     * });
+     * }
+     * 
+ * @param putHeader a function that puts header into HTTP request or response. + * @param sendBody a function that sends body (e.g. sets HTTP status code, content-length and writes the bytes into output stream). + * @return a message writer + */ + public static MessageWriter createWriter(BiConsumer putHeader, Consumer sendBody) { + return new HttpMessageWriter(putHeader, sendBody); + } + +} diff --git a/http/basic/src/main/java/io/cloudevents/http/impl/CloudEventsHeaders.java b/http/basic/src/main/java/io/cloudevents/http/impl/CloudEventsHeaders.java new file mode 100644 index 00000000..96fa38ad --- /dev/null +++ b/http/basic/src/main/java/io/cloudevents/http/impl/CloudEventsHeaders.java @@ -0,0 +1,41 @@ +/* + * Copyright 2018-Present The CloudEvents Authors + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.cloudevents.http.impl; + +import io.cloudevents.core.message.impl.MessageUtils; + +import java.util.Collections; +import java.util.Map; + +public final class CloudEventsHeaders { + + private CloudEventsHeaders() {} + + public static final String CONTENT_TYPE = "Content-Type"; + + public static final String CE_PREFIX = "ce-"; + + public static final Map ATTRIBUTES_TO_HEADERS = Collections.unmodifiableMap(MessageUtils.generateAttributesToHeadersMapping(v -> { + if (v.equals("datacontenttype")) { + return CONTENT_TYPE; + } + return CE_PREFIX + v; + })); + + public static final String SPEC_VERSION = ATTRIBUTES_TO_HEADERS.get("specversion"); + +} diff --git a/http/basic/src/main/java/io/cloudevents/http/impl/HttpMessageReader.java b/http/basic/src/main/java/io/cloudevents/http/impl/HttpMessageReader.java new file mode 100644 index 00000000..bef3fd98 --- /dev/null +++ b/http/basic/src/main/java/io/cloudevents/http/impl/HttpMessageReader.java @@ -0,0 +1,61 @@ +/* + * Copyright 2018-Present The CloudEvents Authors + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.cloudevents.http.impl; + +import io.cloudevents.SpecVersion; +import io.cloudevents.core.message.impl.BaseGenericBinaryMessageReaderImpl; + +import java.util.function.BiConsumer; +import java.util.function.Consumer; + +import static io.cloudevents.http.impl.CloudEventsHeaders.CE_PREFIX; +import static io.cloudevents.http.impl.CloudEventsHeaders.CONTENT_TYPE; + +public class HttpMessageReader extends BaseGenericBinaryMessageReaderImpl { + + private final Consumer> forEachHeader; + + public HttpMessageReader(SpecVersion version, Consumer> forEachHeader, byte[] body) { + super(version, body); + this.forEachHeader = forEachHeader; + } + + @Override + protected boolean isContentTypeHeader(String key) { + return CONTENT_TYPE.equalsIgnoreCase(key); + } + + @Override + protected boolean isCloudEventsHeader(String key) { + return key != null && key.length() > 3 && key.substring(0, CE_PREFIX.length()).toLowerCase().startsWith(CE_PREFIX); + } + + @Override + protected String toCloudEventsKey(String key) { + return key.substring(CE_PREFIX.length()).toLowerCase(); + } + + @Override + protected void forEachHeader(BiConsumer fn) { + forEachHeader.accept(fn); + } + + @Override + protected String toCloudEventsValue(String value) { + return value; + } +} diff --git a/http/basic/src/main/java/io/cloudevents/http/impl/HttpMessageWriter.java b/http/basic/src/main/java/io/cloudevents/http/impl/HttpMessageWriter.java new file mode 100644 index 00000000..84d7eeaf --- /dev/null +++ b/http/basic/src/main/java/io/cloudevents/http/impl/HttpMessageWriter.java @@ -0,0 +1,74 @@ +/* + * Copyright 2018-Present The CloudEvents Authors + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.cloudevents.http.impl; + +import io.cloudevents.SpecVersion; +import io.cloudevents.core.format.EventFormat; +import io.cloudevents.core.message.MessageWriter; +import io.cloudevents.rw.CloudEventRWException; +import io.cloudevents.rw.CloudEventWriter; + +import java.util.function.BiConsumer; +import java.util.function.Consumer; + +import static io.cloudevents.http.impl.CloudEventsHeaders.CONTENT_TYPE; + +public class HttpMessageWriter implements CloudEventWriter, MessageWriter { + + private final BiConsumer putHeader; + private final Consumer putBody; + + public HttpMessageWriter(BiConsumer putHeader, Consumer putBody) { + this.putHeader = putHeader; + this.putBody = putBody; + } + + @Override + public Void setEvent(EventFormat format, byte[] value) throws CloudEventRWException { + putHeader.accept(CONTENT_TYPE, format.serializedContentType()); + putBody.accept(value); + return null; + } + + @Override + public Void end(byte[] value) throws CloudEventRWException { + putBody.accept(value); + return null; + } + + @Override + public Void end() { + putBody.accept(null); + return null; + } + + @Override + public void setAttribute(String name, String value) throws CloudEventRWException { + putHeader.accept(CloudEventsHeaders.ATTRIBUTES_TO_HEADERS.get(name), value); + } + + @Override + public void setExtension(String name, String value) throws CloudEventRWException { + putHeader.accept("ce-" + name, value); + } + + @Override + public HttpMessageWriter create(SpecVersion version) { + putHeader.accept(CloudEventsHeaders.SPEC_VERSION, version.toString()); + return this; + } +} diff --git a/http/basic/src/test/java/io/cloudevents/http/HttpMessageReaderWriterTest.java b/http/basic/src/test/java/io/cloudevents/http/HttpMessageReaderWriterTest.java new file mode 100644 index 00000000..edfa33d0 --- /dev/null +++ b/http/basic/src/test/java/io/cloudevents/http/HttpMessageReaderWriterTest.java @@ -0,0 +1,248 @@ +/* + * Copyright 2018-Present The CloudEvents Authors + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.cloudevents.http; + +import io.cloudevents.CloudEvent; +import io.cloudevents.SpecVersion; +import io.cloudevents.core.message.Encoding; +import io.cloudevents.core.message.MessageReader; +import io.cloudevents.core.message.impl.GenericStructuredMessageReader; +import io.cloudevents.core.mock.CSVFormat; +import io.cloudevents.http.HttpMessageFactory; +import io.cloudevents.http.impl.HttpMessageWriter; +import io.cloudevents.types.Time; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.util.HashMap; +import java.util.Map; +import java.util.TreeMap; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Stream; + +import static io.cloudevents.core.test.Data.*; +import static org.assertj.core.api.Assertions.assertThat; + +public class HttpMessageReaderWriterTest { + + @ParameterizedTest + @MethodSource("io.cloudevents.core.test.Data#allEventsWithoutExtensions") + void testWriteStructured(CloudEvent event) { + + final AtomicReference body = new AtomicReference<>(); + final Map headers = new TreeMap<>(String.CASE_INSENSITIVE_ORDER); + GenericStructuredMessageReader.from(event, CSVFormat.INSTANCE).visit( + HttpMessageFactory.createWriter(headers::put, body::set) + ); + assertThat(headers.get("content-type")) + .isEqualTo(CSVFormat.INSTANCE.serializedContentType()); + assertThat(body.get()) + .isEqualTo(CSVFormat.INSTANCE.serialize(event)); + + } + + @ParameterizedTest + @MethodSource("binaryTestArguments") + void testWriteBinary(CloudEvent event, Map expectedHeaders, byte[] expectedBody) { + + final AtomicReference body = new AtomicReference<>(); + final Map headers = new TreeMap<>(String.CASE_INSENSITIVE_ORDER); + + new HttpMessageWriter(headers::put, body::set).writeBinary(event); + + expectedHeaders.forEach((k, v) -> { + assertThat(headers.get(k)).isEqualTo(v); + }); + if (expectedBody != null) { + assertThat(body.get()).isEqualTo(expectedBody); + } + } + + @ParameterizedTest + @MethodSource("io.cloudevents.core.test.Data#allEventsWithoutExtensions") + public void testReadStructured(CloudEvent event) { + byte[] serializedEvent = CSVFormat.INSTANCE.serialize(event); + + Map headers = new HashMap() {{ + put("content-type", CSVFormat.INSTANCE.serializedContentType() + "; charset=utf8"); + }}; + + MessageReader message = HttpMessageFactory.createReader( + headers, + serializedEvent + ); + + assertThat(message.getEncoding()) + .isEqualTo(Encoding.STRUCTURED); + assertThat(message.toEvent()) + .isEqualTo(event); + } + + @ParameterizedTest + @MethodSource("binaryTestArguments") + public void testReadBinary(CloudEvent expectedEvent, Map headers, byte[] body) { + MessageReader message = HttpMessageFactory.createReader(headers::forEach, body); + + assertThat(message.getEncoding()) + .isEqualTo(Encoding.BINARY); + assertThat(message.toEvent()) + .isEqualTo(expectedEvent); + } + + public static Stream binaryTestArguments() { + return Stream.of( + // V03 + Arguments.of( + V03_MIN, + createHeaders() + .add("ce-specversion", SpecVersion.V03.toString()) + .add("ce-id", ID) + .add("ce-type", TYPE) + .add("ce-source", SOURCE.toString()), + null + ), + Arguments.of( + V03_WITH_JSON_DATA, + createHeaders() + .add("ce-specversion", SpecVersion.V03.toString()) + .add("ce-id", ID) + .add("ce-type", TYPE) + .add("ce-source", SOURCE.toString()) + .add("ce-schemaurl", DATASCHEMA.toString()) + .add("content-type", DATACONTENTTYPE_JSON) + .add("ce-subject", SUBJECT) + .add("ce-time", Time.writeTime(TIME)), + DATA_JSON_SERIALIZED + ), + Arguments.of( + V03_WITH_JSON_DATA_WITH_EXT_STRING, + createHeaders() + .add("ce-specversion", SpecVersion.V03.toString()) + .add("ce-id", ID) + .add("ce-type", TYPE) + .add("ce-source", SOURCE.toString()) + .add("ce-schemaurl", DATASCHEMA.toString()) + .add("content-type", DATACONTENTTYPE_JSON) + .add("ce-subject", SUBJECT) + .add("ce-time", Time.writeTime(TIME)) + .add("ce-astring", "aaa") + .add("ce-aboolean", "true") + .add("ce-anumber", "10"), + DATA_JSON_SERIALIZED + ), + Arguments.of( + V03_WITH_XML_DATA, + createHeaders() + .add("ce-specversion", SpecVersion.V03.toString()) + .add("ce-id", ID) + .add("ce-type", TYPE) + .add("ce-source", SOURCE.toString()) + .add("content-type", DATACONTENTTYPE_XML) + .add("ce-subject", SUBJECT) + .add("ce-time", Time.writeTime(TIME)), + DATA_XML_SERIALIZED + ), + Arguments.of( + V03_WITH_TEXT_DATA, + createHeaders() + .add("ce-specversion", SpecVersion.V03.toString()) + .add("ce-id", ID) + .add("ce-type", TYPE) + .add("ce-source", SOURCE.toString()) + .add("content-type", DATACONTENTTYPE_TEXT) + .add("ce-subject", SUBJECT) + .add("ce-time", Time.writeTime(TIME)), + DATA_TEXT_SERIALIZED + ), + // V1 + Arguments.of( + V1_MIN, + createHeaders() + .add("ce-specversion", SpecVersion.V1.toString()) + .add("ce-id", ID) + .add("ce-type", TYPE) + .add("ce-source", SOURCE.toString()), + null + ), + Arguments.of( + V1_WITH_JSON_DATA, + createHeaders() + .add("ce-specversion", SpecVersion.V1.toString()) + .add("ce-id", ID) + .add("ce-type", TYPE) + .add("ce-source", SOURCE.toString()) + .add("ce-dataschema", DATASCHEMA.toString()) + .add("content-type", DATACONTENTTYPE_JSON) + .add("ce-subject", SUBJECT) + .add("ce-time", Time.writeTime(TIME)), + DATA_JSON_SERIALIZED + ), + Arguments.of( + V1_WITH_JSON_DATA_WITH_EXT_STRING, + createHeaders() + .add("ce-specversion", SpecVersion.V1.toString()) + .add("ce-id", ID) + .add("ce-type", TYPE) + .add("ce-source", SOURCE.toString()) + .add("ce-dataschema", DATASCHEMA.toString()) + .add("content-type", DATACONTENTTYPE_JSON) + .add("ce-subject", SUBJECT) + .add("ce-time", Time.writeTime(TIME)) + .add("ce-astring", "aaa") + .add("ce-aboolean", "true") + .add("ce-anumber", "10"), + DATA_JSON_SERIALIZED + ), + Arguments.of( + V1_WITH_XML_DATA, + createHeaders() + .add("ce-specversion", SpecVersion.V1.toString()) + .add("ce-id", ID) + .add("ce-type", TYPE) + .add("ce-source", SOURCE.toString()) + .add("content-type", DATACONTENTTYPE_XML) + .add("ce-subject", SUBJECT) + .add("ce-time", Time.writeTime(TIME)), + DATA_XML_SERIALIZED + ), + Arguments.of( + V1_WITH_TEXT_DATA, + createHeaders() + .add("ce-specversion", SpecVersion.V1.toString()) + .add("ce-id", ID) + .add("ce-type", TYPE) + .add("ce-source", SOURCE.toString()) + .add("content-type", DATACONTENTTYPE_TEXT) + .add("ce-subject", SUBJECT) + .add("ce-time", Time.writeTime(TIME)), + DATA_TEXT_SERIALIZED + ) + ); + } + + private static Headers createHeaders() { + return new Headers(); + } + + private static class Headers extends HashMap { + public Headers add(String k, String v) { + this.put(k, v); + return this; + } + } +} diff --git a/pom.xml b/pom.xml index de18259d..55484ef2 100644 --- a/pom.xml +++ b/pom.xml @@ -69,6 +69,7 @@ api core formats/json-jackson + http/basic http/vertx http/restful-ws kafka