Generic http binding (#225)

Signed-off-by: Matej Vasek <mvasek@redhat.com>
This commit is contained in:
Matej Vasek 2020-09-09 19:28:40 +02:00 committed by GitHub
parent 29c9eaa23f
commit 6eecf29cf7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 1056 additions and 0 deletions

View File

@ -0,0 +1,47 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Copyright 2018-Present The CloudEvents Authors
~ <p>
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~ <p>
~ http://www.apache.org/licenses/LICENSE-2.0
~ <p>
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>cloudevents-examples</artifactId>
<groupId>io.cloudevents</groupId>
<version>2.0.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>cloudevents-basic-http-example</artifactId>
<dependencies>
<dependency>
<groupId>io.cloudevents</groupId>
<artifactId>cloudevents-http-basic</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.cloudevents</groupId>
<artifactId>cloudevents-json-jackson</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
<version>9.4.28.v20200408</version>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,90 @@
/*
* Copyright 2018-Present The CloudEvents Authors
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.cloudevents.examples.http.basic;
import com.sun.net.httpserver.Headers;
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpServer;
import io.cloudevents.CloudEvent;
import io.cloudevents.core.message.MessageReader;
import io.cloudevents.core.message.MessageWriter;
import io.cloudevents.http.HttpMessageFactory;
import java.io.*;
import java.net.InetSocketAddress;
public class BasicHttpServer {
public static void main(String[] args) throws IOException {
HttpServer httpServer = HttpServer.create(new InetSocketAddress("localhost", 8080), 0);
httpServer.createContext("/echo", BasicHttpServer::echoHandler);
httpServer.start();
}
private static void echoHandler(HttpExchange exchange) throws IOException {
if (!"POST".equalsIgnoreCase(exchange.getRequestMethod())) {
exchange.sendResponseHeaders(405, 0);
return;
}
try {
MessageReader messageReader = createMessageReader(exchange);
CloudEvent cloudEvent = messageReader.toEvent();
System.out.println("Handling event: " + cloudEvent);
MessageWriter messageWriter = createMessageWriter(exchange);
messageWriter.writeBinary(cloudEvent);
} catch (Throwable t) {
try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()) {
try (PrintWriter pw = new PrintWriter(byteArrayOutputStream)) {
t.printStackTrace(pw);
}
byte[] body = byteArrayOutputStream.toByteArray();
exchange.sendResponseHeaders(500, body.length);
try (OutputStream outputStream = exchange.getResponseBody()) {
outputStream.write(body);
}
}
}
}
private static MessageReader createMessageReader(HttpExchange httpExchange) throws IOException {
Headers headers = httpExchange.getRequestHeaders();
byte[] body = IOUtils.toByteArray(httpExchange.getRequestBody());
return HttpMessageFactory.createReaderFromMultimap(headers, body);
}
private static MessageWriter createMessageWriter(HttpExchange httpExchange) {
return HttpMessageFactory.createWriter(
httpExchange.getResponseHeaders()::add,
body -> {
try {
try (OutputStream os = httpExchange.getResponseBody()){
if (body != null) {
httpExchange.sendResponseHeaders(200, body.length);
os.write(body);
} else {
httpExchange.sendResponseHeaders(204, -1);
}
}
} catch (IOException t) {
throw new UncheckedIOException(t);
}
}
);
}
}

View File

@ -0,0 +1,87 @@
/*
* Copyright 2018-Present The CloudEvents Authors
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.cloudevents.examples.http.basic;
import io.cloudevents.CloudEvent;
import io.cloudevents.core.message.MessageReader;
import io.cloudevents.core.message.MessageWriter;
import io.cloudevents.core.v1.CloudEventBuilder;
import io.cloudevents.http.HttpMessageFactory;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.HttpURLConnection;
import java.net.URI;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
public class HttpURLConnectionClient {
public static void main(String[] args) throws IOException {
CloudEvent ceToSend = new CloudEventBuilder()
.withId("my-id")
.withSource(URI.create("/myClient"))
.withType("dev.knative.cronjob.event")
.withDataContentType("application/json")
.withData("{ \"msg\" : \"hello\" }".getBytes(StandardCharsets.UTF_8))
.build();
URL url = new URL("http://localhost:8080/echo");
HttpURLConnection httpUrlConnection = (HttpURLConnection) url.openConnection();
httpUrlConnection.setRequestMethod("POST");
httpUrlConnection.setDoOutput(true);
httpUrlConnection.setDoInput(true);
MessageWriter messageWriter = createMessageWriter(httpUrlConnection);
messageWriter.writeBinary(ceToSend);
MessageReader messageReader = createMessageReader(httpUrlConnection);
CloudEvent receivedCE = messageReader.toEvent();
System.out.println("CloudEvent: " + receivedCE);
System.out.println("Data: " + new String(receivedCE.getData(), StandardCharsets.UTF_8));
}
private static MessageReader createMessageReader(HttpURLConnection httpUrlConnection) throws IOException {
Map<String, List<String>> headers = httpUrlConnection.getHeaderFields();
byte[] body = IOUtils.toByteArray(httpUrlConnection.getInputStream());
return HttpMessageFactory.createReaderFromMultimap(headers, body);
}
private static MessageWriter createMessageWriter(HttpURLConnection httpUrlConnection) {
return HttpMessageFactory.createWriter(
httpUrlConnection::setRequestProperty,
body -> {
try {
if (body != null) {
httpUrlConnection.setRequestProperty("content-length", String.valueOf(body.length));
try (OutputStream outputStream = httpUrlConnection.getOutputStream()) {
outputStream.write(body);
}
} else {
httpUrlConnection.setRequestProperty("content-length", "0");
}
} catch (IOException t) {
throw new UncheckedIOException(t);
}
});
}
}

View File

@ -0,0 +1,43 @@
/*
* Copyright 2018-Present The CloudEvents Authors
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.cloudevents.examples.http.basic;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
// based on apache's commons-io
// which is licensed under the Apache License, Version 2.0
// https://commons.apache.org/proper/commons-io/
// https://github.com/apache/commons-io/blob/master/src/main/java/org/apache/commons/io/IOUtils.java
public final class IOUtils {
private IOUtils() {}
// since Java 9+ you may call InputStream.readAllBytes() instead of this method
public static byte[] toByteArray(InputStream body) throws IOException {
try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()) {
byte[] buff = new byte[(1<<10) * 8];
int read;
while ((read = body.read(buff)) != -1) {
byteArrayOutputStream.write(buff, 0, read);
}
return byteArrayOutputStream.toByteArray();
}
}
}

View File

@ -0,0 +1,103 @@
/*
* Copyright 2018-Present The CloudEvents Authors
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.cloudevents.examples.http.basic;
import javax.servlet.ServletOutputStream;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.servlet.ServletException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.util.Enumeration;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import io.cloudevents.CloudEvent;
import io.cloudevents.core.message.MessageReader;
import io.cloudevents.core.message.MessageWriter;
import io.cloudevents.http.HttpMessageFactory;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.handler.AbstractHandler;
public class JettyServer {
private static class Handler extends AbstractHandler {
@Override
public void handle(String uri,
Request request,
HttpServletRequest httpServletRequest,
HttpServletResponse httpServletResponse) throws IOException, ServletException {
if (!"/echo".equalsIgnoreCase(uri)) {
httpServletResponse.setStatus(HttpStatus.NOT_FOUND_404);
return;
}
if (!"POST".equalsIgnoreCase(request.getMethod())) {
httpServletResponse.setStatus(HttpStatus.METHOD_NOT_ALLOWED_405);
return;
}
CloudEvent receivedEvent = createMessageReader(httpServletRequest).toEvent();
System.out.println("Handling event: " + receivedEvent);
createMessageWriter(httpServletResponse).writeBinary(receivedEvent);
}
}
private static MessageReader createMessageReader(HttpServletRequest httpServletRequest) throws IOException {
Consumer<BiConsumer<String, String>> forEachHeader = processHeader -> {
Enumeration<String> headerNames = httpServletRequest.getHeaderNames();
while (headerNames.hasMoreElements()) {
String name = headerNames.nextElement();
processHeader.accept(name, httpServletRequest.getHeader(name));
}
};
byte[] body = IOUtils.toByteArray(httpServletRequest.getInputStream());
return HttpMessageFactory.createReader(forEachHeader, body);
}
private static MessageWriter createMessageWriter(HttpServletResponse httpServletResponse) throws IOException {
return HttpMessageFactory.createWriter(
httpServletResponse::addHeader,
body -> {
try {
try (ServletOutputStream outputStream = httpServletResponse.getOutputStream()) {
if (body != null) {
httpServletResponse.setContentLength(body.length);
httpServletResponse.setStatus(HttpStatus.OK_200);
outputStream.write(body);
} else {
httpServletResponse.setStatus(HttpStatus.NO_CONTENT_204);
}
}
} catch (IOException e) {
throw new UncheckedIOException(e);
}
});
}
public static void main(String[] args) throws Exception {
Server server = new Server(new InetSocketAddress("localhost", 8080));
server.setHandler(new Handler());
server.start();
server.join();
}
}

View File

@ -17,6 +17,7 @@
<module>kafka</module>
<module>restful-ws-quarkus</module>
<module>vertx</module>
<module>basic-http</module>
</modules>

42
http/basic/README.md Normal file
View File

@ -0,0 +1,42 @@
# Generic HTTP Protocol Binding
[![Javadocs](http://www.javadoc.io/badge/io.cloudevents/cloudevents-http-basic.svg?color=green)](http://www.javadoc.io/doc/io.cloudevents/cloudevents-http-basic)
This module is designed to be usable with various HTTP APIs.
There are also more specialized HTTP bindings:
* [`cloudevents-http-vertx`](../vertx)
* [`cloudevents-http-restful-ws`](../restful-ws)
Since this module is generic it doesn't offer optimal performance for all HTTP implementations.
For better performance consider implementing `MessageReader` and `MessageWriter` that are
tailored for specific HTTP implementation. As a reference you can take aforementioned existing bindings.
For Maven based projects, use the following to configure the CloudEvents Generic HTTP Transport:
```xml
<dependency>
<groupId>io.cloudevents</groupId>
<artifactId>cloudevents-http-basic</artifactId>
<version>2.0.0-milestone1</version>
</dependency>
```
## Sending and Receiving CloudEvents
To send and receive CloudEvents we use `MessageWriter` and `MessageReader`, respectively.
This module offers factory methods for creation of those in `HttpMessageFactory`.
```java
public class HttpMessageFactory {
public static MessageReader createReader(Consumer<BiConsumer<String,String>> forEachHeader, byte[] body);
public static MessageReader createReader(Map<String,String> headers, byte[] body);
public static MessageReader createReaderFromMultimap(Map<String,List<String>> headers, byte[] body);
public static MessageWriter createWriter(BiConsumer<String, String> putHeader, Consumer<byte[]> sendBody);
}
```
Examples of usage:
* [Standard Java HttpServer](../../examples/basic-http/src/main/java/io/cloudevents/examples/http/basic/HttpServer.java)
* [Http Client with HttpURLConnection](../../examples/basic-http/src/main/java/io/cloudevents/examples/http/basic/HttpURLConnectionClient.java)
* [Http Servlet with Jetty](../../examples/basic-http/src/main/java/io/cloudevents/examples/http/basic/JettyServer.java)

66
http/basic/pom.xml Normal file
View File

@ -0,0 +1,66 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Copyright 2018-Present The CloudEvents Authors
~ <p>
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~ <p>
~ http://www.apache.org/licenses/LICENSE-2.0
~ <p>
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>io.cloudevents</groupId>
<artifactId>cloudevents-parent</artifactId>
<version>2.0.0-SNAPSHOT</version>
<relativePath>../../</relativePath>
</parent>
<artifactId>cloudevents-http-basic</artifactId>
<name>CloudEvents - Basic Http Binding</name>
<packaging>jar</packaging>
<properties>
<module-name>io.cloudevents.http.basic</module-name>
</properties>
<dependencies>
<dependency>
<groupId>io.cloudevents</groupId>
<artifactId>cloudevents-core</artifactId>
<version>${project.version}</version>
</dependency>
<!-- Test deps -->
<dependency>
<groupId>io.cloudevents</groupId>
<artifactId>cloudevents-core</artifactId>
<classifier>tests</classifier>
<type>test-jar</type>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<version>${assertj-core.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<version>${junit-jupiter.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,152 @@
/*
* Copyright 2018-Present The CloudEvents Authors
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.cloudevents.http;
import io.cloudevents.core.message.MessageReader;
import io.cloudevents.core.message.MessageWriter;
import io.cloudevents.core.message.impl.GenericStructuredMessageReader;
import io.cloudevents.core.message.impl.MessageUtils;
import io.cloudevents.core.message.impl.UnknownEncodingMessageReader;
import io.cloudevents.http.impl.CloudEventsHeaders;
import io.cloudevents.http.impl.HttpMessageReader;
import io.cloudevents.http.impl.HttpMessageWriter;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import static io.cloudevents.http.impl.CloudEventsHeaders.CONTENT_TYPE;
/**
* This class provides a collection of methods to create {@link io.cloudevents.core.message.MessageReader}
* and {@link io.cloudevents.core.message.MessageWriter} for various HTTP APIs.
*/
public final class HttpMessageFactory {
private HttpMessageFactory() {}
/**
* Creates a new {@link MessageReader} that can read both structured and binary messages from a HTTP response (client) or request (server).
*
* <pre>
* Example of usage with <a href="https://docs.oracle.com/javaee/6/api/javax/servlet/http/HttpServletRequest.html">HttpServletRequest</a>:
* {@code
* Consumer<BiConsumer<String,String>> forEachHeader = processHeader -> {
* Enumeration<String> headerNames = httpServletRequest.getHeaderNames();
* while (headerNames.hasMoreElements()) {
* String name = headerNames.nextElement();
* processHeader.accept(name, httpServletRequest.getHeader(name));
*
* }
* };
* byte[] body = httpServletRequest.getInputStream().readAllBytes();
* HttpMessageFactory.createReader(forEachHeader, body);
* }
* </pre>
* @param forEachHeader http headers visitor function
* @param body nullable buffer of the body
* @return a message reader implementation with potentially an unknown encoding
* @throws IllegalArgumentException If, in case of binary mode, the spec version is invalid
*/
public static MessageReader createReader(Consumer<BiConsumer<String, String>> forEachHeader, byte[] body) {
final AtomicReference<String> contentType = new AtomicReference<>();
final AtomicReference<String> specVersion = new AtomicReference<>();
forEachHeader.accept((k, v) -> {
if (CONTENT_TYPE.equalsIgnoreCase(k)) {
contentType.set(v);
} else if (CloudEventsHeaders.SPEC_VERSION.equalsIgnoreCase(k)) {
specVersion.set(v);
}
});
return MessageUtils.parseStructuredOrBinaryMessage(
contentType::get,
format -> new GenericStructuredMessageReader(format, body),
specVersion::get,
sv -> new HttpMessageReader(sv, forEachHeader, body),
UnknownEncodingMessageReader::new
);
}
/**
* Creates a new {@link MessageReader} that can read both structured and binary messages from a HTTP response (client) or request (server).
*
* <pre>
* This overload is equivalent to calling:
* {@code HttpMessageFactory.createReader(headers::forEach, body);}
* </pre>
* @param headers http headers as map
* @param body nullable buffer of the body
* @return a message reader implementation with potentially an unknown encoding
* @throws IllegalArgumentException If, in case of binary mode, the spec version is invalid
*/
public static MessageReader createReader(Map<String, String> headers, byte[] body) {
return createReader(headers::forEach, body);
}
/**
* Creates a new {@link MessageReader} that can read both structured and binary messages from a HTTP response (client) or request (server).
*
* @param headers http headers as multimap
* @param body nullable buffer of the body
* @return a message reader implementation with potentially an unknown encoding
* @throws IllegalArgumentException If, in case of binary mode, the spec version is invalid
*/
public static MessageReader createReaderFromMultimap(Map<String, List<String>> headers, byte[] body) {
Consumer<BiConsumer<String, String>> forEachHeader = processHeader ->
headers.forEach((key, values) ->
values.forEach(value -> processHeader.accept(key, value)));
return createReader(forEachHeader, body);
}
/**
* Creates a new {@link MessageWriter} that can write both structured and binary messages to a HTTP response (server) or request (client).
*
* <pre>
* Example of usage with <a href="https://docs.oracle.com/javaee/6/api/javax/servlet/http/HttpServletResponse.html">HttpServletResponse</a>:
* {@code
* HttpMessageFactory.createWriter(
* httpServletResponse::addHeader,
* body -> {
* try {
* if (body != null) {
* httpServletResponse.setContentLength(body.length);
* httpServletResponse.setStatus(HttpStatus.OK_200);
* try (ServletOutputStream outputStream = httpServletResponse.getOutputStream()) {
* outputStream.write(body);
* }
* } else {
* httpServletResponse.setStatus(HttpStatus.NO_CONTENT_204);
* }
* } catch (IOException e) {
* throw new UncheckedIOException(e);
* }
* });
* }
* </pre>
* @param putHeader a function that puts header into HTTP request or response.
* @param sendBody a function that sends body (e.g. sets HTTP status code, content-length and writes the bytes into output stream).
* @return a message writer
*/
public static MessageWriter createWriter(BiConsumer<String, String> putHeader, Consumer<byte[]> sendBody) {
return new HttpMessageWriter(putHeader, sendBody);
}
}

View File

@ -0,0 +1,41 @@
/*
* Copyright 2018-Present The CloudEvents Authors
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.cloudevents.http.impl;
import io.cloudevents.core.message.impl.MessageUtils;
import java.util.Collections;
import java.util.Map;
public final class CloudEventsHeaders {
private CloudEventsHeaders() {}
public static final String CONTENT_TYPE = "Content-Type";
public static final String CE_PREFIX = "ce-";
public static final Map<String, String> ATTRIBUTES_TO_HEADERS = Collections.unmodifiableMap(MessageUtils.generateAttributesToHeadersMapping(v -> {
if (v.equals("datacontenttype")) {
return CONTENT_TYPE;
}
return CE_PREFIX + v;
}));
public static final String SPEC_VERSION = ATTRIBUTES_TO_HEADERS.get("specversion");
}

View File

@ -0,0 +1,61 @@
/*
* Copyright 2018-Present The CloudEvents Authors
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.cloudevents.http.impl;
import io.cloudevents.SpecVersion;
import io.cloudevents.core.message.impl.BaseGenericBinaryMessageReaderImpl;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import static io.cloudevents.http.impl.CloudEventsHeaders.CE_PREFIX;
import static io.cloudevents.http.impl.CloudEventsHeaders.CONTENT_TYPE;
public class HttpMessageReader extends BaseGenericBinaryMessageReaderImpl<String, String> {
private final Consumer<BiConsumer<String,String>> forEachHeader;
public HttpMessageReader(SpecVersion version, Consumer<BiConsumer<String,String>> forEachHeader, byte[] body) {
super(version, body);
this.forEachHeader = forEachHeader;
}
@Override
protected boolean isContentTypeHeader(String key) {
return CONTENT_TYPE.equalsIgnoreCase(key);
}
@Override
protected boolean isCloudEventsHeader(String key) {
return key != null && key.length() > 3 && key.substring(0, CE_PREFIX.length()).toLowerCase().startsWith(CE_PREFIX);
}
@Override
protected String toCloudEventsKey(String key) {
return key.substring(CE_PREFIX.length()).toLowerCase();
}
@Override
protected void forEachHeader(BiConsumer<String, String> fn) {
forEachHeader.accept(fn);
}
@Override
protected String toCloudEventsValue(String value) {
return value;
}
}

View File

@ -0,0 +1,74 @@
/*
* Copyright 2018-Present The CloudEvents Authors
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.cloudevents.http.impl;
import io.cloudevents.SpecVersion;
import io.cloudevents.core.format.EventFormat;
import io.cloudevents.core.message.MessageWriter;
import io.cloudevents.rw.CloudEventRWException;
import io.cloudevents.rw.CloudEventWriter;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import static io.cloudevents.http.impl.CloudEventsHeaders.CONTENT_TYPE;
public class HttpMessageWriter implements CloudEventWriter<Void>, MessageWriter<HttpMessageWriter, Void> {
private final BiConsumer<String, String> putHeader;
private final Consumer<byte[]> putBody;
public HttpMessageWriter(BiConsumer<String, String> putHeader, Consumer<byte[]> putBody) {
this.putHeader = putHeader;
this.putBody = putBody;
}
@Override
public Void setEvent(EventFormat format, byte[] value) throws CloudEventRWException {
putHeader.accept(CONTENT_TYPE, format.serializedContentType());
putBody.accept(value);
return null;
}
@Override
public Void end(byte[] value) throws CloudEventRWException {
putBody.accept(value);
return null;
}
@Override
public Void end() {
putBody.accept(null);
return null;
}
@Override
public void setAttribute(String name, String value) throws CloudEventRWException {
putHeader.accept(CloudEventsHeaders.ATTRIBUTES_TO_HEADERS.get(name), value);
}
@Override
public void setExtension(String name, String value) throws CloudEventRWException {
putHeader.accept("ce-" + name, value);
}
@Override
public HttpMessageWriter create(SpecVersion version) {
putHeader.accept(CloudEventsHeaders.SPEC_VERSION, version.toString());
return this;
}
}

View File

@ -0,0 +1,248 @@
/*
* Copyright 2018-Present The CloudEvents Authors
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.cloudevents.http;
import io.cloudevents.CloudEvent;
import io.cloudevents.SpecVersion;
import io.cloudevents.core.message.Encoding;
import io.cloudevents.core.message.MessageReader;
import io.cloudevents.core.message.impl.GenericStructuredMessageReader;
import io.cloudevents.core.mock.CSVFormat;
import io.cloudevents.http.HttpMessageFactory;
import io.cloudevents.http.impl.HttpMessageWriter;
import io.cloudevents.types.Time;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;
import static io.cloudevents.core.test.Data.*;
import static org.assertj.core.api.Assertions.assertThat;
public class HttpMessageReaderWriterTest {
@ParameterizedTest
@MethodSource("io.cloudevents.core.test.Data#allEventsWithoutExtensions")
void testWriteStructured(CloudEvent event) {
final AtomicReference<byte[]> body = new AtomicReference<>();
final Map<String, String> headers = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
GenericStructuredMessageReader.from(event, CSVFormat.INSTANCE).visit(
HttpMessageFactory.createWriter(headers::put, body::set)
);
assertThat(headers.get("content-type"))
.isEqualTo(CSVFormat.INSTANCE.serializedContentType());
assertThat(body.get())
.isEqualTo(CSVFormat.INSTANCE.serialize(event));
}
@ParameterizedTest
@MethodSource("binaryTestArguments")
void testWriteBinary(CloudEvent event, Map<String,String> expectedHeaders, byte[] expectedBody) {
final AtomicReference<byte[]> body = new AtomicReference<>();
final Map<String, String> headers = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
new HttpMessageWriter(headers::put, body::set).writeBinary(event);
expectedHeaders.forEach((k, v) -> {
assertThat(headers.get(k)).isEqualTo(v);
});
if (expectedBody != null) {
assertThat(body.get()).isEqualTo(expectedBody);
}
}
@ParameterizedTest
@MethodSource("io.cloudevents.core.test.Data#allEventsWithoutExtensions")
public void testReadStructured(CloudEvent event) {
byte[] serializedEvent = CSVFormat.INSTANCE.serialize(event);
Map<String,String> headers = new HashMap<String,String>() {{
put("content-type", CSVFormat.INSTANCE.serializedContentType() + "; charset=utf8");
}};
MessageReader message = HttpMessageFactory.createReader(
headers,
serializedEvent
);
assertThat(message.getEncoding())
.isEqualTo(Encoding.STRUCTURED);
assertThat(message.toEvent())
.isEqualTo(event);
}
@ParameterizedTest
@MethodSource("binaryTestArguments")
public void testReadBinary(CloudEvent expectedEvent, Map<String,String> headers, byte[] body) {
MessageReader message = HttpMessageFactory.createReader(headers::forEach, body);
assertThat(message.getEncoding())
.isEqualTo(Encoding.BINARY);
assertThat(message.toEvent())
.isEqualTo(expectedEvent);
}
public static Stream<Arguments> binaryTestArguments() {
return Stream.of(
// V03
Arguments.of(
V03_MIN,
createHeaders()
.add("ce-specversion", SpecVersion.V03.toString())
.add("ce-id", ID)
.add("ce-type", TYPE)
.add("ce-source", SOURCE.toString()),
null
),
Arguments.of(
V03_WITH_JSON_DATA,
createHeaders()
.add("ce-specversion", SpecVersion.V03.toString())
.add("ce-id", ID)
.add("ce-type", TYPE)
.add("ce-source", SOURCE.toString())
.add("ce-schemaurl", DATASCHEMA.toString())
.add("content-type", DATACONTENTTYPE_JSON)
.add("ce-subject", SUBJECT)
.add("ce-time", Time.writeTime(TIME)),
DATA_JSON_SERIALIZED
),
Arguments.of(
V03_WITH_JSON_DATA_WITH_EXT_STRING,
createHeaders()
.add("ce-specversion", SpecVersion.V03.toString())
.add("ce-id", ID)
.add("ce-type", TYPE)
.add("ce-source", SOURCE.toString())
.add("ce-schemaurl", DATASCHEMA.toString())
.add("content-type", DATACONTENTTYPE_JSON)
.add("ce-subject", SUBJECT)
.add("ce-time", Time.writeTime(TIME))
.add("ce-astring", "aaa")
.add("ce-aboolean", "true")
.add("ce-anumber", "10"),
DATA_JSON_SERIALIZED
),
Arguments.of(
V03_WITH_XML_DATA,
createHeaders()
.add("ce-specversion", SpecVersion.V03.toString())
.add("ce-id", ID)
.add("ce-type", TYPE)
.add("ce-source", SOURCE.toString())
.add("content-type", DATACONTENTTYPE_XML)
.add("ce-subject", SUBJECT)
.add("ce-time", Time.writeTime(TIME)),
DATA_XML_SERIALIZED
),
Arguments.of(
V03_WITH_TEXT_DATA,
createHeaders()
.add("ce-specversion", SpecVersion.V03.toString())
.add("ce-id", ID)
.add("ce-type", TYPE)
.add("ce-source", SOURCE.toString())
.add("content-type", DATACONTENTTYPE_TEXT)
.add("ce-subject", SUBJECT)
.add("ce-time", Time.writeTime(TIME)),
DATA_TEXT_SERIALIZED
),
// V1
Arguments.of(
V1_MIN,
createHeaders()
.add("ce-specversion", SpecVersion.V1.toString())
.add("ce-id", ID)
.add("ce-type", TYPE)
.add("ce-source", SOURCE.toString()),
null
),
Arguments.of(
V1_WITH_JSON_DATA,
createHeaders()
.add("ce-specversion", SpecVersion.V1.toString())
.add("ce-id", ID)
.add("ce-type", TYPE)
.add("ce-source", SOURCE.toString())
.add("ce-dataschema", DATASCHEMA.toString())
.add("content-type", DATACONTENTTYPE_JSON)
.add("ce-subject", SUBJECT)
.add("ce-time", Time.writeTime(TIME)),
DATA_JSON_SERIALIZED
),
Arguments.of(
V1_WITH_JSON_DATA_WITH_EXT_STRING,
createHeaders()
.add("ce-specversion", SpecVersion.V1.toString())
.add("ce-id", ID)
.add("ce-type", TYPE)
.add("ce-source", SOURCE.toString())
.add("ce-dataschema", DATASCHEMA.toString())
.add("content-type", DATACONTENTTYPE_JSON)
.add("ce-subject", SUBJECT)
.add("ce-time", Time.writeTime(TIME))
.add("ce-astring", "aaa")
.add("ce-aboolean", "true")
.add("ce-anumber", "10"),
DATA_JSON_SERIALIZED
),
Arguments.of(
V1_WITH_XML_DATA,
createHeaders()
.add("ce-specversion", SpecVersion.V1.toString())
.add("ce-id", ID)
.add("ce-type", TYPE)
.add("ce-source", SOURCE.toString())
.add("content-type", DATACONTENTTYPE_XML)
.add("ce-subject", SUBJECT)
.add("ce-time", Time.writeTime(TIME)),
DATA_XML_SERIALIZED
),
Arguments.of(
V1_WITH_TEXT_DATA,
createHeaders()
.add("ce-specversion", SpecVersion.V1.toString())
.add("ce-id", ID)
.add("ce-type", TYPE)
.add("ce-source", SOURCE.toString())
.add("content-type", DATACONTENTTYPE_TEXT)
.add("ce-subject", SUBJECT)
.add("ce-time", Time.writeTime(TIME)),
DATA_TEXT_SERIALIZED
)
);
}
private static Headers createHeaders() {
return new Headers();
}
private static class Headers extends HashMap<String, String> {
public Headers add(String k, String v) {
this.put(k, v);
return this;
}
}
}

View File

@ -69,6 +69,7 @@
<module>api</module>
<module>core</module>
<module>formats/json-jackson</module>
<module>http/basic</module>
<module>http/vertx</module>
<module>http/restful-ws</module>
<module>kafka</module>