sendBody) {
+ return HttpMessageFactory.createWriter(headers::set, sendBody);
+ }
+
+ /**
+ * Helper method for extracting {@link HttpHeaders} from a {@link CloudEvent}. Can,
+ * for instance, be used in a @RequestMapping to return a
+ * {@link ResponseEntity} that has headers copied from a {@link CloudEvent}.
+ * @param event the input {@link CloudEvent}
+ * @return the response headers represented by the event
+ */
+ public static HttpHeaders toHttp(CloudEventContext event) {
+ HttpHeaders headers = new HttpHeaders();
+ CloudEventUtils.toReader(CloudEventBuilder.fromContext(event).build()).read(toWriter(headers, bytes -> {
+ }));
+ return headers;
+ }
+
+ /**
+ * Helper method for converting {@link HttpHeaders} to a {@link CloudEvent}. The input
+ * headers must represent a valid event in "binary" form, i.e. it must have headers
+ * "ce-id", "ce-specversion" etc.
+ * @param headers the input request headers
+ * @return a {@link CloudEventBuilder} that can be used to create a new
+ * {@link CloudEvent}
+ *
+ */
+ public static CloudEventBuilder fromHttp(HttpHeaders headers) {
+ return CloudEventBuilder
+ .fromContext(CloudEventUtils.toEvent(CloudEventHttpUtils.toReader(headers, () -> null)));
+ }
+
+}
diff --git a/spring/src/main/java/io/cloudevents/spring/http/CloudEventsHeaders.java b/spring/src/main/java/io/cloudevents/spring/http/CloudEventsHeaders.java
new file mode 100644
index 00000000..40644dda
--- /dev/null
+++ b/spring/src/main/java/io/cloudevents/spring/http/CloudEventsHeaders.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2018-Present The CloudEvents Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package io.cloudevents.spring.http;
+
+import java.util.Map;
+
+import io.cloudevents.core.message.impl.MessageUtils;
+
+import org.springframework.http.HttpHeaders;
+
+public class CloudEventsHeaders {
+
+ public static final String CE_PREFIX = "ce-";
+
+ public static final Map ATTRIBUTES_TO_HEADERS = MessageUtils
+ .generateAttributesToHeadersMapping(v -> {
+ if (v.equals("datacontenttype")) {
+ return HttpHeaders.CONTENT_TYPE;
+ }
+ return CE_PREFIX + v;
+ });
+
+ public static final String SPEC_VERSION = ATTRIBUTES_TO_HEADERS.get("specversion");
+
+}
\ No newline at end of file
diff --git a/spring/src/main/java/io/cloudevents/spring/http/package-info.java b/spring/src/main/java/io/cloudevents/spring/http/package-info.java
new file mode 100644
index 00000000..0c3b2352
--- /dev/null
+++ b/spring/src/main/java/io/cloudevents/spring/http/package-info.java
@@ -0,0 +1,5 @@
+/**
+ * Provides classes related to working with Cloud Events within the context of Spring and
+ * HTTP.
+ */
+package io.cloudevents.spring.http;
\ No newline at end of file
diff --git a/spring/src/main/java/io/cloudevents/spring/mvc/CloudEventHttpMessageConverter.java b/spring/src/main/java/io/cloudevents/spring/mvc/CloudEventHttpMessageConverter.java
new file mode 100644
index 00000000..fcfe5794
--- /dev/null
+++ b/spring/src/main/java/io/cloudevents/spring/mvc/CloudEventHttpMessageConverter.java
@@ -0,0 +1,75 @@
+/*
+ * Copyright 2019-2019 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.cloudevents.spring.mvc;
+
+import java.io.IOException;
+
+import io.cloudevents.CloudEvent;
+import io.cloudevents.core.CloudEventUtils;
+import io.cloudevents.spring.http.CloudEventHttpUtils;
+
+import org.springframework.http.HttpInputMessage;
+import org.springframework.http.HttpOutputMessage;
+import org.springframework.http.MediaType;
+import org.springframework.http.converter.AbstractHttpMessageConverter;
+import org.springframework.http.converter.HttpMessageConverter;
+import org.springframework.http.converter.HttpMessageNotReadableException;
+import org.springframework.http.converter.HttpMessageNotWritableException;
+import org.springframework.util.StreamUtils;
+
+/**
+ * An {@link HttpMessageConverter} for {@link CloudEvent CloudEvents}. Supports the use of
+ * {@link CloudEvent} in a @RequestMapping as either a method parameter
+ * or a return value.
+ *
+ * @author Dave Syer
+ *
+ */
+public class CloudEventHttpMessageConverter extends AbstractHttpMessageConverter {
+
+ public CloudEventHttpMessageConverter() {
+ super(MediaType.APPLICATION_OCTET_STREAM, MediaType.ALL);
+ }
+
+ @Override
+ protected boolean supports(Class> clazz) {
+ return CloudEvent.class.isAssignableFrom(clazz);
+ }
+
+ @Override
+ protected CloudEvent readInternal(Class extends CloudEvent> clazz, HttpInputMessage inputMessage)
+ throws IOException, HttpMessageNotReadableException {
+ byte[] body = StreamUtils.copyToByteArray(inputMessage.getBody());
+ return CloudEventHttpUtils.toReader(inputMessage.getHeaders(), () -> body).toEvent();
+ }
+
+ @Override
+ protected void writeInternal(CloudEvent event, HttpOutputMessage outputMessage)
+ throws IOException, HttpMessageNotWritableException {
+ CloudEventUtils.toReader(event)
+ .read(CloudEventHttpUtils.toWriter(outputMessage.getHeaders(), body -> copy(body, outputMessage)));
+ }
+
+ private void copy(byte[] body, HttpOutputMessage outputMessage) {
+ try {
+ StreamUtils.copy(body, outputMessage.getBody());
+ }
+ catch (IOException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+}
diff --git a/spring/src/main/java/io/cloudevents/spring/mvc/package-info.java b/spring/src/main/java/io/cloudevents/spring/mvc/package-info.java
new file mode 100644
index 00000000..ce3ccd50
--- /dev/null
+++ b/spring/src/main/java/io/cloudevents/spring/mvc/package-info.java
@@ -0,0 +1,4 @@
+/**
+ * Provides classes related to working with Cloud Events within the context of Spring MVC.
+ */
+package io.cloudevents.spring.mvc;
\ No newline at end of file
diff --git a/spring/src/main/java/io/cloudevents/spring/webflux/CloudEventHttpMessageReader.java b/spring/src/main/java/io/cloudevents/spring/webflux/CloudEventHttpMessageReader.java
new file mode 100644
index 00000000..b2a6efd9
--- /dev/null
+++ b/spring/src/main/java/io/cloudevents/spring/webflux/CloudEventHttpMessageReader.java
@@ -0,0 +1,77 @@
+/*
+ * Copyright 2019-2019 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.cloudevents.spring.webflux;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import io.cloudevents.CloudEvent;
+import io.cloudevents.core.message.MessageReader;
+import io.cloudevents.spring.http.CloudEventHttpUtils;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import org.springframework.core.ResolvableType;
+import org.springframework.core.io.buffer.DataBufferUtils;
+import org.springframework.http.HttpHeaders;
+import org.springframework.http.MediaType;
+import org.springframework.http.ReactiveHttpInputMessage;
+import org.springframework.http.codec.HttpMessageReader;
+import org.springframework.util.StreamUtils;
+
+/**
+ * A reactive {@link HttpMessageReader} for {@link CloudEvent CloudEvents}, converting
+ * from an HTTP request to a cloud event. Supports the use of {@link CloudEvent} as an
+ * input to a reactive endpoint.
+ *
+ * @author Dave Syer
+ *
+ */
+public class CloudEventHttpMessageReader implements HttpMessageReader {
+
+ @Override
+ public List getReadableMediaTypes() {
+ return Arrays.asList(MediaType.APPLICATION_OCTET_STREAM, MediaType.ALL);
+ }
+
+ @Override
+ public boolean canRead(ResolvableType elementType, MediaType mediaType) {
+ return CloudEvent.class.isAssignableFrom(elementType.toClass());
+ }
+
+ @Override
+ public Flux read(ResolvableType elementType, ReactiveHttpInputMessage message,
+ Map hints) {
+ return Flux.from(readMono(elementType, message, hints));
+ }
+
+ @Override
+ public Mono readMono(ResolvableType elementType, ReactiveHttpInputMessage message,
+ Map hints) {
+ HttpHeaders headers = message.getHeaders();
+ Mono body = DataBufferUtils.join(message.getBody()).map(buffer -> {
+ try {
+ return StreamUtils.copyToByteArray(buffer.asInputStream(true));
+ }
+ catch (Exception e) {
+ throw new IllegalArgumentException(e);
+ }
+ });
+ return body.map(bytes -> CloudEventHttpUtils.toReader(headers, () -> bytes)).map(MessageReader::toEvent);
+ }
+
+}
\ No newline at end of file
diff --git a/spring/src/main/java/io/cloudevents/spring/webflux/CloudEventHttpMessageWriter.java b/spring/src/main/java/io/cloudevents/spring/webflux/CloudEventHttpMessageWriter.java
new file mode 100644
index 00000000..27eeaad4
--- /dev/null
+++ b/spring/src/main/java/io/cloudevents/spring/webflux/CloudEventHttpMessageWriter.java
@@ -0,0 +1,125 @@
+/*
+ * Copyright 2019-2019 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.cloudevents.spring.webflux;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import io.cloudevents.CloudEvent;
+import io.cloudevents.CloudEventData;
+import io.cloudevents.SpecVersion;
+import io.cloudevents.core.CloudEventUtils;
+import io.cloudevents.core.format.EventFormat;
+import io.cloudevents.core.message.MessageWriter;
+import io.cloudevents.rw.CloudEventRWException;
+import io.cloudevents.rw.CloudEventWriter;
+import io.cloudevents.spring.http.CloudEventsHeaders;
+import org.reactivestreams.Publisher;
+import reactor.core.publisher.Mono;
+
+import org.springframework.core.ResolvableType;
+import org.springframework.core.io.buffer.DataBuffer;
+import org.springframework.http.HttpHeaders;
+import org.springframework.http.MediaType;
+import org.springframework.http.ReactiveHttpOutputMessage;
+import org.springframework.http.codec.HttpMessageWriter;
+
+/**
+ * A reactive {@link HttpMessageWriter} for {@link CloudEvent CloudEvents}, converting
+ * from a cloud event to an HTTP response. Supports the use of {@link CloudEvent} as an
+ * output from a reactive endpoint.
+ *
+ * @author Dave Syer
+ *
+ */
+public class CloudEventHttpMessageWriter implements HttpMessageWriter {
+
+ @Override
+ public List getWritableMediaTypes() {
+ return Arrays.asList(MediaType.APPLICATION_OCTET_STREAM, MediaType.ALL);
+ }
+
+ @Override
+ public boolean canWrite(ResolvableType elementType, MediaType mediaType) {
+ return CloudEvent.class.isAssignableFrom(elementType.toClass());
+ }
+
+ @Override
+ public Mono write(Publisher extends CloudEvent> inputStream, ResolvableType elementType,
+ MediaType mediaType, ReactiveHttpOutputMessage message, Map hints) {
+ return Mono.from(inputStream).map(CloudEventUtils::toReader)
+ .flatMap(reader -> reader.read(new ReactiveHttpMessageWriter(message)));
+ }
+
+ private static class ReactiveHttpMessageWriter
+ implements MessageWriter>, Mono>, CloudEventWriter> {
+
+ private final ReactiveHttpOutputMessage response;
+
+ public ReactiveHttpMessageWriter(ReactiveHttpOutputMessage response) {
+ this.response = response;
+ }
+
+ // Binary visitor factory
+
+ @Override
+ public CloudEventWriter> create(SpecVersion version) {
+ this.response.getHeaders().set(CloudEventsHeaders.SPEC_VERSION, version.toString());
+ return this;
+ }
+
+ // Binary visitor
+
+ @Override
+ public ReactiveHttpMessageWriter withAttribute(String name, String value) throws CloudEventRWException {
+ this.response.getHeaders().set(CloudEventsHeaders.ATTRIBUTES_TO_HEADERS.get(name), value);
+ return this;
+ }
+
+ @Override
+ public ReactiveHttpMessageWriter withExtension(String name, String value) throws CloudEventRWException {
+ this.response.getHeaders().set("ce-" + name, value);
+ return this;
+ }
+
+ @Override
+ public Mono end(CloudEventData value) throws CloudEventRWException {
+ return copy(value.toBytes(), this.response);
+ }
+
+ @Override
+ public Mono end() {
+ return copy(new byte[0], this.response);
+ }
+
+ // Structured visitor
+
+ @Override
+ public Mono setEvent(EventFormat format, byte[] value) throws CloudEventRWException {
+ this.response.getHeaders().set(HttpHeaders.CONTENT_TYPE, format.serializedContentType());
+ return copy(value, this.response);
+ }
+
+ private Mono copy(byte[] bytes, ReactiveHttpOutputMessage message) {
+ DataBuffer data = message.bufferFactory().wrap(bytes);
+ message.getHeaders().setContentLength(bytes.length);
+ return message.writeWith(Mono.just(data));
+ }
+
+ }
+
+}
\ No newline at end of file
diff --git a/spring/src/main/java/io/cloudevents/spring/webflux/package-info.java b/spring/src/main/java/io/cloudevents/spring/webflux/package-info.java
new file mode 100644
index 00000000..9f8e18d5
--- /dev/null
+++ b/spring/src/main/java/io/cloudevents/spring/webflux/package-info.java
@@ -0,0 +1,5 @@
+/**
+ * Provides classes related to working with Cloud Events within the context of Spring
+ * WebFlux.
+ */
+package io.cloudevents.spring.webflux;
\ No newline at end of file
diff --git a/spring/src/test/java/io/cloudevents/spring/mvc/MvcRestControllerTests.java b/spring/src/test/java/io/cloudevents/spring/mvc/MvcRestControllerTests.java
new file mode 100644
index 00000000..53477b5c
--- /dev/null
+++ b/spring/src/test/java/io/cloudevents/spring/mvc/MvcRestControllerTests.java
@@ -0,0 +1,207 @@
+/*
+ * Copyright 2019-2019 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.cloudevents.spring.mvc;
+
+import java.net.URI;
+import java.util.List;
+import java.util.UUID;
+
+import io.cloudevents.CloudEvent;
+import io.cloudevents.core.builder.CloudEventBuilder;
+import io.cloudevents.spring.http.CloudEventHttpUtils;
+import org.junit.jupiter.api.Test;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
+import org.springframework.boot.test.web.client.TestRestTemplate;
+import org.springframework.boot.web.server.LocalServerPort;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.http.HttpHeaders;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.MediaType;
+import org.springframework.http.RequestEntity;
+import org.springframework.http.ResponseEntity;
+import org.springframework.http.converter.HttpMessageConverter;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestHeader;
+import org.springframework.web.bind.annotation.RestController;
+import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * @author Dave Syer
+ *
+ */
+@SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT)
+class MvcRestControllerTests {
+
+ @Autowired
+ private TestRestTemplate rest;
+
+ @LocalServerPort
+ private int port;
+
+ @Test
+ void echoWithCorrectHeaders() {
+
+ ResponseEntity response = rest.exchange(RequestEntity.post(URI.create("http://localhost:" + port + "/")) //
+ .header("ce-id", "12345") //
+ .header("ce-specversion", "1.0") //
+ .header("ce-type", "io.spring.event") //
+ .header("ce-source", "https://spring.io/events") //
+ .contentType(MediaType.APPLICATION_JSON) //
+ .body("{\"value\":\"Dave\"}"), String.class);
+
+ assertThat(response.getStatusCode()).isEqualTo(HttpStatus.OK);
+ assertThat(response.getBody()).isEqualTo("{\"value\":\"Dave\"}");
+
+ HttpHeaders headers = response.getHeaders();
+
+ assertThat(headers).containsKey("ce-id");
+ assertThat(headers).containsKey("ce-source");
+ assertThat(headers).containsKey("ce-type");
+
+ // assertThat(headers.getFirst("ce-id")).isNotEqualTo("12345");
+ assertThat(headers.getFirst("ce-type")).isEqualTo("io.spring.event.Foo");
+ assertThat(headers.getFirst("ce-source")).isEqualTo("https://spring.io/foos");
+
+ }
+
+ @Test
+ void structuredRequestResponseEvents() {
+
+ ResponseEntity response = rest
+ .exchange(RequestEntity.post(URI.create("http://localhost:" + port + "/event")) //
+ .contentType(new MediaType("application", "cloudevents+json")) //
+ .body("{" //
+ + "\"id\":\"12345\"," //
+ + "\"specversion\":\"1.0\"," //
+ + "\"type\":\"io.spring.event\"," //
+ + "\"source\":\"https://spring.io/events\"," //
+ + "\"data\":{\"value\":\"Dave\"}}"),
+ String.class);
+
+ assertThat(response.getStatusCode()).isEqualTo(HttpStatus.OK);
+ assertThat(response.getBody()).isEqualTo("{\"value\":\"Dave\"}");
+
+ HttpHeaders headers = response.getHeaders();
+
+ assertThat(headers).containsKey("ce-id");
+ assertThat(headers).containsKey("ce-source");
+ assertThat(headers).containsKey("ce-type");
+
+ // assertThat(headers.getFirst("ce-id")).isNotEqualTo("12345");
+ assertThat(headers.getFirst("ce-type")).isEqualTo("io.spring.event.Foo");
+ assertThat(headers.getFirst("ce-source")).isEqualTo("https://spring.io/foos");
+
+ }
+
+ @Test
+ void requestResponseEvents() {
+
+ ResponseEntity response = rest
+ .exchange(RequestEntity.post(URI.create("http://localhost:" + port + "/event")) //
+ .header("ce-id", "12345") //
+ .header("ce-specversion", "1.0") //
+ .header("ce-type", "io.spring.event") //
+ .header("ce-source", "https://spring.io/events") //
+ .contentType(MediaType.APPLICATION_JSON) //
+ .body("{\"value\":\"Dave\"}"), String.class);
+
+ assertThat(response.getStatusCode()).isEqualTo(HttpStatus.OK);
+ assertThat(response.getBody()).isEqualTo("{\"value\":\"Dave\"}");
+
+ HttpHeaders headers = response.getHeaders();
+
+ assertThat(headers).containsKey("ce-id");
+ assertThat(headers).containsKey("ce-source");
+ assertThat(headers).containsKey("ce-type");
+
+ assertThat(headers.getFirst("ce-id")).isNotEqualTo("12345");
+ assertThat(headers.getFirst("ce-type")).isEqualTo("io.spring.event.Foo");
+ assertThat(headers.getFirst("ce-source")).isEqualTo("https://spring.io/foos");
+
+ }
+
+ @SpringBootApplication
+ @RestController
+ static class TestApplication {
+
+ @PostMapping("/")
+ public ResponseEntity echo(@RequestBody Foo foo, @RequestHeader HttpHeaders headers) {
+ CloudEvent attributes = CloudEventHttpUtils.fromHttp(headers) //
+ .withId(UUID.randomUUID().toString()) //
+ .withSource(URI.create("https://spring.io/foos")) //
+ .withType("io.spring.event.Foo") //
+ .build();
+ HttpHeaders outgoing = CloudEventHttpUtils.toHttp(attributes);
+ return ResponseEntity.ok().headers(outgoing).body(foo);
+ }
+
+ @PostMapping("/event")
+ public CloudEvent ce(@RequestBody CloudEvent event) {
+ CloudEvent attributes = CloudEventBuilder.from(event) //
+ .withId(UUID.randomUUID().toString()) //
+ .withSource(URI.create("https://spring.io/foos")) //
+ .withType("io.spring.event.Foo") //
+ .withData(event.getData().toBytes()) //
+ .build();
+ return attributes;
+ }
+
+ @Configuration
+ public static class CloudEventHandlerConfiguration implements WebMvcConfigurer {
+
+ @Override
+ public void configureMessageConverters(List> converters) {
+ converters.add(0, new CloudEventHttpMessageConverter());
+ }
+
+ }
+
+ }
+
+}
+
+class Foo {
+
+ private String value;
+
+ public Foo() {
+ }
+
+ public Foo(String value) {
+ this.value = value;
+ }
+
+ public String getValue() {
+ return this.value;
+ }
+
+ public void setValue(String value) {
+ this.value = value;
+ }
+
+ @Override
+ public String toString() {
+ return "Foo [value=" + this.value + "]";
+ }
+
+}
diff --git a/spring/src/test/java/io/cloudevents/spring/webflux/WebFluxRestControllerTests.java b/spring/src/test/java/io/cloudevents/spring/webflux/WebFluxRestControllerTests.java
new file mode 100644
index 00000000..05815a46
--- /dev/null
+++ b/spring/src/test/java/io/cloudevents/spring/webflux/WebFluxRestControllerTests.java
@@ -0,0 +1,201 @@
+/*
+ * Copyright 2019-2019 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.cloudevents.spring.webflux;
+
+import java.net.URI;
+import java.util.UUID;
+
+import io.cloudevents.CloudEvent;
+import io.cloudevents.core.builder.CloudEventBuilder;
+import io.cloudevents.spring.http.CloudEventHttpUtils;
+import org.junit.jupiter.api.Test;
+import reactor.core.publisher.Mono;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
+import org.springframework.boot.test.web.client.TestRestTemplate;
+import org.springframework.boot.web.codec.CodecCustomizer;
+import org.springframework.boot.web.server.LocalServerPort;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.http.HttpHeaders;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.MediaType;
+import org.springframework.http.RequestEntity;
+import org.springframework.http.ResponseEntity;
+import org.springframework.http.codec.CodecConfigurer;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestHeader;
+import org.springframework.web.bind.annotation.RestController;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * @author Dave Syer
+ *
+ */
+@SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT, properties = "spring.main.web-application-type=REACTIVE")
+class WebFluxRestControllerTests {
+
+ @Autowired
+ private TestRestTemplate rest;
+
+ @LocalServerPort
+ private int port;
+
+ @Test
+ void echoWithCorrectHeaders() {
+
+ ResponseEntity response = rest.exchange(RequestEntity.post(URI.create("http://localhost:" + port + "/")) //
+ .header("ce-id", "12345") //
+ .header("ce-specversion", "1.0") //
+ .header("ce-type", "io.spring.event") //
+ .header("ce-source", "https://spring.io/events") //
+ .contentType(MediaType.APPLICATION_JSON) //
+ .body("{\"value\":\"Dave\"}"), String.class);
+
+ assertThat(response.getStatusCode()).isEqualTo(HttpStatus.OK);
+ assertThat(response.getBody()).isEqualTo("{\"value\":\"Dave\"}");
+
+ HttpHeaders headers = response.getHeaders();
+
+ assertThat(headers).containsKey("ce-id");
+ assertThat(headers).containsKey("ce-source");
+ assertThat(headers).containsKey("ce-type");
+
+ // assertThat(headers.getFirst("ce-id")).isNotEqualTo("12345");
+ assertThat(headers.getFirst("ce-type")).isEqualTo("io.spring.event.Foo");
+ assertThat(headers.getFirst("ce-source")).isEqualTo("https://spring.io/foos");
+
+ }
+
+ @Test
+ void structuredRequestResponseEvents() {
+
+ ResponseEntity response = rest
+ .exchange(RequestEntity.post(URI.create("http://localhost:" + port + "/event")) //
+ .contentType(new MediaType("application", "cloudevents+json")) //
+ .body("{" //
+ + "\"id\":\"12345\"," //
+ + "\"specversion\":\"1.0\"," //
+ + "\"type\":\"io.spring.event\"," //
+ + "\"source\":\"https://spring.io/events\"," //
+ + "\"data\":{\"value\":\"Dave\"}}"),
+ String.class);
+
+ assertThat(response.getStatusCode()).isEqualTo(HttpStatus.OK);
+ assertThat(response.getBody()).isEqualTo("{\"value\":\"Dave\"}");
+
+ HttpHeaders headers = response.getHeaders();
+
+ assertThat(headers).containsKey("ce-id");
+ assertThat(headers).containsKey("ce-source");
+ assertThat(headers).containsKey("ce-type");
+
+ // assertThat(headers.getFirst("ce-id")).isNotEqualTo("12345");
+ assertThat(headers.getFirst("ce-type")).isEqualTo("io.spring.event.Foo");
+ assertThat(headers.getFirst("ce-source")).isEqualTo("https://spring.io/foos");
+
+ }
+
+ @Test
+ void requestResponseEvents() {
+
+ ResponseEntity response = rest
+ .exchange(RequestEntity.post(URI.create("http://localhost:" + port + "/event")) //
+ .header("ce-id", "12345") //
+ .header("ce-specversion", "1.0") //
+ .header("ce-type", "io.spring.event") //
+ .header("ce-source", "https://spring.io/events") //
+ .contentType(MediaType.APPLICATION_JSON) //
+ .body("{\"value\":\"Dave\"}"), String.class);
+
+ assertThat(response.getStatusCode()).isEqualTo(HttpStatus.OK);
+ assertThat(response.getBody()).isEqualTo("{\"value\":\"Dave\"}");
+
+ HttpHeaders headers = response.getHeaders();
+
+ assertThat(headers).containsKey("ce-id");
+ assertThat(headers).containsKey("ce-source");
+ assertThat(headers).containsKey("ce-type");
+
+ // assertThat(headers.getFirst("ce-id")).isNotEqualTo("12345");
+ assertThat(headers.getFirst("ce-type")).isEqualTo("io.spring.event.Foo");
+ assertThat(headers.getFirst("ce-source")).isEqualTo("https://spring.io/foos");
+
+ }
+
+ @SpringBootApplication
+ @RestController
+ static class TestApplication {
+
+ @PostMapping("/")
+ public ResponseEntity echo(@RequestBody Foo foo, @RequestHeader HttpHeaders headers) {
+ CloudEvent attributes = CloudEventHttpUtils.fromHttp(headers).withId(UUID.randomUUID().toString())
+ .withSource(URI.create("https://spring.io/foos")).withType("io.spring.event.Foo").build();
+ HttpHeaders outgoing = CloudEventHttpUtils.toHttp(attributes);
+ return ResponseEntity.ok().headers(outgoing).body(foo);
+ }
+
+ @PostMapping("/event")
+ public Mono event(@RequestBody Mono body) {
+ return body.map(event -> CloudEventBuilder.from(event).withId(UUID.randomUUID().toString())
+ .withSource(URI.create("https://spring.io/foos")).withType("io.spring.event.Foo")
+ .withData(event.getData().toBytes()).build());
+ }
+
+ @Configuration
+ public static class CloudEventHandlerConfiguration implements CodecCustomizer {
+
+ @Override
+ public void customize(CodecConfigurer configurer) {
+ configurer.customCodecs().register(new CloudEventHttpMessageReader());
+ configurer.customCodecs().register(new CloudEventHttpMessageWriter());
+ }
+
+ }
+
+ }
+
+}
+
+class Foo {
+
+ private String value;
+
+ public Foo() {
+ }
+
+ public Foo(String value) {
+ this.value = value;
+ }
+
+ public String getValue() {
+ return this.value;
+ }
+
+ public void setValue(String value) {
+ this.value = value;
+ }
+
+ @Override
+ public String toString() {
+ return "Foo [value=" + this.value + "]";
+ }
+
+}
\ No newline at end of file