HTTP converters for CloudEvent in Spring (#312)

Supports MVC and WebFlux (blocking and non-blocking) HTTP.
User can work with `CloudEvent` as a `POJO` type and inject it
into `@ReqestMapping` methods.

Signed-off-by: Dave Syer <dsyer@vmware.com>

Co-authored-by: Oleg Zhurakousky <ozhurakousky@pivotal.io>
This commit is contained in:
Dave Syer 2020-11-30 10:27:44 +00:00 committed by GitHub
parent bcc1434a39
commit 5099b31f6c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 952 additions and 0 deletions

View File

@ -74,6 +74,7 @@
<module>http/vertx</module>
<module>http/restful-ws</module>
<module>kafka</module>
<module>spring</module>
</modules>
<properties>

12
spring/README.md Normal file
View File

@ -0,0 +1,12 @@
## Spring Support
### Introduction
This module provides classes and interfaces that can be used by [Spring frameworks](https://spring.io/) and integrations to assist with Cloud Event processing.
Given that Spring defines [Message](https://docs.spring.io/spring-framework/docs/current/javadoc-api/org/springframework/messaging/Message.html) abstraction,
which perfectly maps to the structure defined by Cloud Events specification, one may say Cloud Events are already supported by any Spring framework that
relies on `Message`. So this modules provides several utilities and strategies to simplify working with Cloud Events in the context of Spring
frameworks and integrations (see individual component's javadocs for more details).
Please see individual samples in `examples/spring` directory of this SDK for more details.

105
spring/pom.xml Normal file
View File

@ -0,0 +1,105 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Copyright 2018-Present The CloudEvents Authors
~ <p>
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~ <p>
~ http://www.apache.org/licenses/LICENSE-2.0
~ <p>
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>io.cloudevents</groupId>
<artifactId>cloudevents-parent</artifactId>
<version>2.0.0-SNAPSHOT</version>
</parent>
<artifactId>cloudevents-spring</artifactId>
<name>CloudEvents - support for Spring</name>
<packaging>jar</packaging>
<properties>
<module-name>io.cloudevents.spring</module-name>
<spring-boot.version>2.4.0</spring-boot.version>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>${spring-boot.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-webmvc</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-webflux</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>io.cloudevents</groupId>
<artifactId>cloudevents-core</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.cloudevents</groupId>
<artifactId>cloudevents-http-basic</artifactId>
<version>${project.version}</version>
<!-- This will be optional too if we want to support messaging -->
</dependency>
<dependency>
<groupId>org.apache.tomcat.embed</groupId>
<artifactId>tomcat-embed-core</artifactId>
<optional>true</optional>
</dependency>
<!-- Test deps -->
<dependency>
<groupId>io.cloudevents</groupId>
<artifactId>cloudevents-json-jackson</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.cloudevents</groupId>
<artifactId>cloudevents-core</artifactId>
<classifier>tests</classifier>
<type>test-jar</type>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,95 @@
/*
* Copyright 2019-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.cloudevents.spring.http;
import java.util.function.Consumer;
import java.util.function.Supplier;
import io.cloudevents.CloudEvent;
import io.cloudevents.CloudEventContext;
import io.cloudevents.core.CloudEventUtils;
import io.cloudevents.core.builder.CloudEventBuilder;
import io.cloudevents.core.message.MessageReader;
import io.cloudevents.http.HttpMessageFactory;
import io.cloudevents.http.impl.HttpMessageWriter;
import org.springframework.http.HttpHeaders;
import org.springframework.http.ResponseEntity;
/**
* Miscellaneous utility methods to assist with Cloud Events in the context of Spring Web
* frameworks. Primarily intended for the internal use within Spring-based frameworks or
* integrations.
*
* @author Dave Syer
* @since 2.0
*/
public class CloudEventHttpUtils {
private CloudEventHttpUtils() {
}
/**
* Create a {@link MessageReader} to assist in conversion of an HTTP request to a
* {@link CloudEvent}.
* @param headers the HTTP request headers
* @param body the HTTP request body as a byte array
* @return a {@link MessageReader} representing the {@link CloudEvent}
*/
public static MessageReader toReader(HttpHeaders headers, Supplier<byte[]> body) {
return HttpMessageFactory.createReaderFromMultimap(headers, body.get());
}
/**
* Create an {@link HttpMessageWriter} that can hand off a {@link CloudEvent} to an
* HTTP response. Mainly useful in a blocking (not async) setting because the response
* body has to be consumed directly.
* @param headers the response headers (will be mutated)
* @param sendBody a consumer for the response body that puts the bytes on the wire
*/
public static HttpMessageWriter toWriter(HttpHeaders headers, Consumer<byte[]> sendBody) {
return HttpMessageFactory.createWriter(headers::set, sendBody);
}
/**
* Helper method for extracting {@link HttpHeaders} from a {@link CloudEvent}. Can,
* for instance, be used in a <code>&#64;RequestMapping</code> to return a
* {@link ResponseEntity} that has headers copied from a {@link CloudEvent}.
* @param event the input {@link CloudEvent}
* @return the response headers represented by the event
*/
public static HttpHeaders toHttp(CloudEventContext event) {
HttpHeaders headers = new HttpHeaders();
CloudEventUtils.toReader(CloudEventBuilder.fromContext(event).build()).read(toWriter(headers, bytes -> {
}));
return headers;
}
/**
* Helper method for converting {@link HttpHeaders} to a {@link CloudEvent}. The input
* headers must represent a valid event in "binary" form, i.e. it must have headers
* "ce-id", "ce-specversion" etc.
* @param headers the input request headers
* @return a {@link CloudEventBuilder} that can be used to create a new
* {@link CloudEvent}
*
*/
public static CloudEventBuilder fromHttp(HttpHeaders headers) {
return CloudEventBuilder
.fromContext(CloudEventUtils.toEvent(CloudEventHttpUtils.toReader(headers, () -> null)));
}
}

View File

@ -0,0 +1,40 @@
/*
* Copyright 2018-Present The CloudEvents Authors
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package io.cloudevents.spring.http;
import java.util.Map;
import io.cloudevents.core.message.impl.MessageUtils;
import org.springframework.http.HttpHeaders;
public class CloudEventsHeaders {
public static final String CE_PREFIX = "ce-";
public static final Map<String, String> ATTRIBUTES_TO_HEADERS = MessageUtils
.generateAttributesToHeadersMapping(v -> {
if (v.equals("datacontenttype")) {
return HttpHeaders.CONTENT_TYPE;
}
return CE_PREFIX + v;
});
public static final String SPEC_VERSION = ATTRIBUTES_TO_HEADERS.get("specversion");
}

View File

@ -0,0 +1,5 @@
/**
* Provides classes related to working with Cloud Events within the context of Spring and
* HTTP.
*/
package io.cloudevents.spring.http;

View File

@ -0,0 +1,75 @@
/*
* Copyright 2019-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.cloudevents.spring.mvc;
import java.io.IOException;
import io.cloudevents.CloudEvent;
import io.cloudevents.core.CloudEventUtils;
import io.cloudevents.spring.http.CloudEventHttpUtils;
import org.springframework.http.HttpInputMessage;
import org.springframework.http.HttpOutputMessage;
import org.springframework.http.MediaType;
import org.springframework.http.converter.AbstractHttpMessageConverter;
import org.springframework.http.converter.HttpMessageConverter;
import org.springframework.http.converter.HttpMessageNotReadableException;
import org.springframework.http.converter.HttpMessageNotWritableException;
import org.springframework.util.StreamUtils;
/**
* An {@link HttpMessageConverter} for {@link CloudEvent CloudEvents}. Supports the use of
* {@link CloudEvent} in a <code>&#64;RequestMapping</code> as either a method parameter
* or a return value.
*
* @author Dave Syer
*
*/
public class CloudEventHttpMessageConverter extends AbstractHttpMessageConverter<CloudEvent> {
public CloudEventHttpMessageConverter() {
super(MediaType.APPLICATION_OCTET_STREAM, MediaType.ALL);
}
@Override
protected boolean supports(Class<?> clazz) {
return CloudEvent.class.isAssignableFrom(clazz);
}
@Override
protected CloudEvent readInternal(Class<? extends CloudEvent> clazz, HttpInputMessage inputMessage)
throws IOException, HttpMessageNotReadableException {
byte[] body = StreamUtils.copyToByteArray(inputMessage.getBody());
return CloudEventHttpUtils.toReader(inputMessage.getHeaders(), () -> body).toEvent();
}
@Override
protected void writeInternal(CloudEvent event, HttpOutputMessage outputMessage)
throws IOException, HttpMessageNotWritableException {
CloudEventUtils.toReader(event)
.read(CloudEventHttpUtils.toWriter(outputMessage.getHeaders(), body -> copy(body, outputMessage)));
}
private void copy(byte[] body, HttpOutputMessage outputMessage) {
try {
StreamUtils.copy(body, outputMessage.getBody());
}
catch (IOException e) {
throw new IllegalStateException(e);
}
}
}

View File

@ -0,0 +1,4 @@
/**
* Provides classes related to working with Cloud Events within the context of Spring MVC.
*/
package io.cloudevents.spring.mvc;

View File

@ -0,0 +1,77 @@
/*
* Copyright 2019-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.cloudevents.spring.webflux;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import io.cloudevents.CloudEvent;
import io.cloudevents.core.message.MessageReader;
import io.cloudevents.spring.http.CloudEventHttpUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.springframework.core.ResolvableType;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.ReactiveHttpInputMessage;
import org.springframework.http.codec.HttpMessageReader;
import org.springframework.util.StreamUtils;
/**
* A reactive {@link HttpMessageReader} for {@link CloudEvent CloudEvents}, converting
* from an HTTP request to a cloud event. Supports the use of {@link CloudEvent} as an
* input to a reactive endpoint.
*
* @author Dave Syer
*
*/
public class CloudEventHttpMessageReader implements HttpMessageReader<CloudEvent> {
@Override
public List<MediaType> getReadableMediaTypes() {
return Arrays.asList(MediaType.APPLICATION_OCTET_STREAM, MediaType.ALL);
}
@Override
public boolean canRead(ResolvableType elementType, MediaType mediaType) {
return CloudEvent.class.isAssignableFrom(elementType.toClass());
}
@Override
public Flux<CloudEvent> read(ResolvableType elementType, ReactiveHttpInputMessage message,
Map<String, Object> hints) {
return Flux.from(readMono(elementType, message, hints));
}
@Override
public Mono<CloudEvent> readMono(ResolvableType elementType, ReactiveHttpInputMessage message,
Map<String, Object> hints) {
HttpHeaders headers = message.getHeaders();
Mono<byte[]> body = DataBufferUtils.join(message.getBody()).map(buffer -> {
try {
return StreamUtils.copyToByteArray(buffer.asInputStream(true));
}
catch (Exception e) {
throw new IllegalArgumentException(e);
}
});
return body.map(bytes -> CloudEventHttpUtils.toReader(headers, () -> bytes)).map(MessageReader::toEvent);
}
}

View File

@ -0,0 +1,125 @@
/*
* Copyright 2019-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.cloudevents.spring.webflux;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import io.cloudevents.CloudEvent;
import io.cloudevents.CloudEventData;
import io.cloudevents.SpecVersion;
import io.cloudevents.core.CloudEventUtils;
import io.cloudevents.core.format.EventFormat;
import io.cloudevents.core.message.MessageWriter;
import io.cloudevents.rw.CloudEventRWException;
import io.cloudevents.rw.CloudEventWriter;
import io.cloudevents.spring.http.CloudEventsHeaders;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;
import org.springframework.core.ResolvableType;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.ReactiveHttpOutputMessage;
import org.springframework.http.codec.HttpMessageWriter;
/**
* A reactive {@link HttpMessageWriter} for {@link CloudEvent CloudEvents}, converting
* from a cloud event to an HTTP response. Supports the use of {@link CloudEvent} as an
* output from a reactive endpoint.
*
* @author Dave Syer
*
*/
public class CloudEventHttpMessageWriter implements HttpMessageWriter<CloudEvent> {
@Override
public List<MediaType> getWritableMediaTypes() {
return Arrays.asList(MediaType.APPLICATION_OCTET_STREAM, MediaType.ALL);
}
@Override
public boolean canWrite(ResolvableType elementType, MediaType mediaType) {
return CloudEvent.class.isAssignableFrom(elementType.toClass());
}
@Override
public Mono<Void> write(Publisher<? extends CloudEvent> inputStream, ResolvableType elementType,
MediaType mediaType, ReactiveHttpOutputMessage message, Map<String, Object> hints) {
return Mono.from(inputStream).map(CloudEventUtils::toReader)
.flatMap(reader -> reader.read(new ReactiveHttpMessageWriter(message)));
}
private static class ReactiveHttpMessageWriter
implements MessageWriter<CloudEventWriter<Mono<Void>>, Mono<Void>>, CloudEventWriter<Mono<Void>> {
private final ReactiveHttpOutputMessage response;
public ReactiveHttpMessageWriter(ReactiveHttpOutputMessage response) {
this.response = response;
}
// Binary visitor factory
@Override
public CloudEventWriter<Mono<Void>> create(SpecVersion version) {
this.response.getHeaders().set(CloudEventsHeaders.SPEC_VERSION, version.toString());
return this;
}
// Binary visitor
@Override
public ReactiveHttpMessageWriter withAttribute(String name, String value) throws CloudEventRWException {
this.response.getHeaders().set(CloudEventsHeaders.ATTRIBUTES_TO_HEADERS.get(name), value);
return this;
}
@Override
public ReactiveHttpMessageWriter withExtension(String name, String value) throws CloudEventRWException {
this.response.getHeaders().set("ce-" + name, value);
return this;
}
@Override
public Mono<Void> end(CloudEventData value) throws CloudEventRWException {
return copy(value.toBytes(), this.response);
}
@Override
public Mono<Void> end() {
return copy(new byte[0], this.response);
}
// Structured visitor
@Override
public Mono<Void> setEvent(EventFormat format, byte[] value) throws CloudEventRWException {
this.response.getHeaders().set(HttpHeaders.CONTENT_TYPE, format.serializedContentType());
return copy(value, this.response);
}
private Mono<Void> copy(byte[] bytes, ReactiveHttpOutputMessage message) {
DataBuffer data = message.bufferFactory().wrap(bytes);
message.getHeaders().setContentLength(bytes.length);
return message.writeWith(Mono.just(data));
}
}
}

View File

@ -0,0 +1,5 @@
/**
* Provides classes related to working with Cloud Events within the context of Spring
* WebFlux.
*/
package io.cloudevents.spring.webflux;

View File

@ -0,0 +1,207 @@
/*
* Copyright 2019-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.cloudevents.spring.mvc;
import java.net.URI;
import java.util.List;
import java.util.UUID;
import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;
import io.cloudevents.spring.http.CloudEventHttpUtils;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
import org.springframework.boot.test.web.client.TestRestTemplate;
import org.springframework.boot.web.server.LocalServerPort;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.RequestEntity;
import org.springframework.http.ResponseEntity;
import org.springframework.http.converter.HttpMessageConverter;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestHeader;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
import static org.assertj.core.api.Assertions.assertThat;
/**
* @author Dave Syer
*
*/
@SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT)
class MvcRestControllerTests {
@Autowired
private TestRestTemplate rest;
@LocalServerPort
private int port;
@Test
void echoWithCorrectHeaders() {
ResponseEntity<String> response = rest.exchange(RequestEntity.post(URI.create("http://localhost:" + port + "/")) //
.header("ce-id", "12345") //
.header("ce-specversion", "1.0") //
.header("ce-type", "io.spring.event") //
.header("ce-source", "https://spring.io/events") //
.contentType(MediaType.APPLICATION_JSON) //
.body("{\"value\":\"Dave\"}"), String.class);
assertThat(response.getStatusCode()).isEqualTo(HttpStatus.OK);
assertThat(response.getBody()).isEqualTo("{\"value\":\"Dave\"}");
HttpHeaders headers = response.getHeaders();
assertThat(headers).containsKey("ce-id");
assertThat(headers).containsKey("ce-source");
assertThat(headers).containsKey("ce-type");
// assertThat(headers.getFirst("ce-id")).isNotEqualTo("12345");
assertThat(headers.getFirst("ce-type")).isEqualTo("io.spring.event.Foo");
assertThat(headers.getFirst("ce-source")).isEqualTo("https://spring.io/foos");
}
@Test
void structuredRequestResponseEvents() {
ResponseEntity<String> response = rest
.exchange(RequestEntity.post(URI.create("http://localhost:" + port + "/event")) //
.contentType(new MediaType("application", "cloudevents+json")) //
.body("{" //
+ "\"id\":\"12345\"," //
+ "\"specversion\":\"1.0\"," //
+ "\"type\":\"io.spring.event\"," //
+ "\"source\":\"https://spring.io/events\"," //
+ "\"data\":{\"value\":\"Dave\"}}"),
String.class);
assertThat(response.getStatusCode()).isEqualTo(HttpStatus.OK);
assertThat(response.getBody()).isEqualTo("{\"value\":\"Dave\"}");
HttpHeaders headers = response.getHeaders();
assertThat(headers).containsKey("ce-id");
assertThat(headers).containsKey("ce-source");
assertThat(headers).containsKey("ce-type");
// assertThat(headers.getFirst("ce-id")).isNotEqualTo("12345");
assertThat(headers.getFirst("ce-type")).isEqualTo("io.spring.event.Foo");
assertThat(headers.getFirst("ce-source")).isEqualTo("https://spring.io/foos");
}
@Test
void requestResponseEvents() {
ResponseEntity<String> response = rest
.exchange(RequestEntity.post(URI.create("http://localhost:" + port + "/event")) //
.header("ce-id", "12345") //
.header("ce-specversion", "1.0") //
.header("ce-type", "io.spring.event") //
.header("ce-source", "https://spring.io/events") //
.contentType(MediaType.APPLICATION_JSON) //
.body("{\"value\":\"Dave\"}"), String.class);
assertThat(response.getStatusCode()).isEqualTo(HttpStatus.OK);
assertThat(response.getBody()).isEqualTo("{\"value\":\"Dave\"}");
HttpHeaders headers = response.getHeaders();
assertThat(headers).containsKey("ce-id");
assertThat(headers).containsKey("ce-source");
assertThat(headers).containsKey("ce-type");
assertThat(headers.getFirst("ce-id")).isNotEqualTo("12345");
assertThat(headers.getFirst("ce-type")).isEqualTo("io.spring.event.Foo");
assertThat(headers.getFirst("ce-source")).isEqualTo("https://spring.io/foos");
}
@SpringBootApplication
@RestController
static class TestApplication {
@PostMapping("/")
public ResponseEntity<Foo> echo(@RequestBody Foo foo, @RequestHeader HttpHeaders headers) {
CloudEvent attributes = CloudEventHttpUtils.fromHttp(headers) //
.withId(UUID.randomUUID().toString()) //
.withSource(URI.create("https://spring.io/foos")) //
.withType("io.spring.event.Foo") //
.build();
HttpHeaders outgoing = CloudEventHttpUtils.toHttp(attributes);
return ResponseEntity.ok().headers(outgoing).body(foo);
}
@PostMapping("/event")
public CloudEvent ce(@RequestBody CloudEvent event) {
CloudEvent attributes = CloudEventBuilder.from(event) //
.withId(UUID.randomUUID().toString()) //
.withSource(URI.create("https://spring.io/foos")) //
.withType("io.spring.event.Foo") //
.withData(event.getData().toBytes()) //
.build();
return attributes;
}
@Configuration
public static class CloudEventHandlerConfiguration implements WebMvcConfigurer {
@Override
public void configureMessageConverters(List<HttpMessageConverter<?>> converters) {
converters.add(0, new CloudEventHttpMessageConverter());
}
}
}
}
class Foo {
private String value;
public Foo() {
}
public Foo(String value) {
this.value = value;
}
public String getValue() {
return this.value;
}
public void setValue(String value) {
this.value = value;
}
@Override
public String toString() {
return "Foo [value=" + this.value + "]";
}
}

View File

@ -0,0 +1,201 @@
/*
* Copyright 2019-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.cloudevents.spring.webflux;
import java.net.URI;
import java.util.UUID;
import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;
import io.cloudevents.spring.http.CloudEventHttpUtils;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Mono;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
import org.springframework.boot.test.web.client.TestRestTemplate;
import org.springframework.boot.web.codec.CodecCustomizer;
import org.springframework.boot.web.server.LocalServerPort;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.RequestEntity;
import org.springframework.http.ResponseEntity;
import org.springframework.http.codec.CodecConfigurer;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestHeader;
import org.springframework.web.bind.annotation.RestController;
import static org.assertj.core.api.Assertions.assertThat;
/**
* @author Dave Syer
*
*/
@SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT, properties = "spring.main.web-application-type=REACTIVE")
class WebFluxRestControllerTests {
@Autowired
private TestRestTemplate rest;
@LocalServerPort
private int port;
@Test
void echoWithCorrectHeaders() {
ResponseEntity<String> response = rest.exchange(RequestEntity.post(URI.create("http://localhost:" + port + "/")) //
.header("ce-id", "12345") //
.header("ce-specversion", "1.0") //
.header("ce-type", "io.spring.event") //
.header("ce-source", "https://spring.io/events") //
.contentType(MediaType.APPLICATION_JSON) //
.body("{\"value\":\"Dave\"}"), String.class);
assertThat(response.getStatusCode()).isEqualTo(HttpStatus.OK);
assertThat(response.getBody()).isEqualTo("{\"value\":\"Dave\"}");
HttpHeaders headers = response.getHeaders();
assertThat(headers).containsKey("ce-id");
assertThat(headers).containsKey("ce-source");
assertThat(headers).containsKey("ce-type");
// assertThat(headers.getFirst("ce-id")).isNotEqualTo("12345");
assertThat(headers.getFirst("ce-type")).isEqualTo("io.spring.event.Foo");
assertThat(headers.getFirst("ce-source")).isEqualTo("https://spring.io/foos");
}
@Test
void structuredRequestResponseEvents() {
ResponseEntity<String> response = rest
.exchange(RequestEntity.post(URI.create("http://localhost:" + port + "/event")) //
.contentType(new MediaType("application", "cloudevents+json")) //
.body("{" //
+ "\"id\":\"12345\"," //
+ "\"specversion\":\"1.0\"," //
+ "\"type\":\"io.spring.event\"," //
+ "\"source\":\"https://spring.io/events\"," //
+ "\"data\":{\"value\":\"Dave\"}}"),
String.class);
assertThat(response.getStatusCode()).isEqualTo(HttpStatus.OK);
assertThat(response.getBody()).isEqualTo("{\"value\":\"Dave\"}");
HttpHeaders headers = response.getHeaders();
assertThat(headers).containsKey("ce-id");
assertThat(headers).containsKey("ce-source");
assertThat(headers).containsKey("ce-type");
// assertThat(headers.getFirst("ce-id")).isNotEqualTo("12345");
assertThat(headers.getFirst("ce-type")).isEqualTo("io.spring.event.Foo");
assertThat(headers.getFirst("ce-source")).isEqualTo("https://spring.io/foos");
}
@Test
void requestResponseEvents() {
ResponseEntity<String> response = rest
.exchange(RequestEntity.post(URI.create("http://localhost:" + port + "/event")) //
.header("ce-id", "12345") //
.header("ce-specversion", "1.0") //
.header("ce-type", "io.spring.event") //
.header("ce-source", "https://spring.io/events") //
.contentType(MediaType.APPLICATION_JSON) //
.body("{\"value\":\"Dave\"}"), String.class);
assertThat(response.getStatusCode()).isEqualTo(HttpStatus.OK);
assertThat(response.getBody()).isEqualTo("{\"value\":\"Dave\"}");
HttpHeaders headers = response.getHeaders();
assertThat(headers).containsKey("ce-id");
assertThat(headers).containsKey("ce-source");
assertThat(headers).containsKey("ce-type");
// assertThat(headers.getFirst("ce-id")).isNotEqualTo("12345");
assertThat(headers.getFirst("ce-type")).isEqualTo("io.spring.event.Foo");
assertThat(headers.getFirst("ce-source")).isEqualTo("https://spring.io/foos");
}
@SpringBootApplication
@RestController
static class TestApplication {
@PostMapping("/")
public ResponseEntity<Foo> echo(@RequestBody Foo foo, @RequestHeader HttpHeaders headers) {
CloudEvent attributes = CloudEventHttpUtils.fromHttp(headers).withId(UUID.randomUUID().toString())
.withSource(URI.create("https://spring.io/foos")).withType("io.spring.event.Foo").build();
HttpHeaders outgoing = CloudEventHttpUtils.toHttp(attributes);
return ResponseEntity.ok().headers(outgoing).body(foo);
}
@PostMapping("/event")
public Mono<CloudEvent> event(@RequestBody Mono<CloudEvent> body) {
return body.map(event -> CloudEventBuilder.from(event).withId(UUID.randomUUID().toString())
.withSource(URI.create("https://spring.io/foos")).withType("io.spring.event.Foo")
.withData(event.getData().toBytes()).build());
}
@Configuration
public static class CloudEventHandlerConfiguration implements CodecCustomizer {
@Override
public void customize(CodecConfigurer configurer) {
configurer.customCodecs().register(new CloudEventHttpMessageReader());
configurer.customCodecs().register(new CloudEventHttpMessageWriter());
}
}
}
}
class Foo {
private String value;
public Foo() {
}
public Foo(String value) {
this.value = value;
}
public String getValue() {
return this.value;
}
public void setValue(String value) {
this.value = value;
}
@Override
public String toString() {
return "Foo [value=" + this.value + "]";
}
}