Updated to latest sdk-java 2.0.0-milestone2 (#2788)

Signed-off-by: Francesco Guardiani <francescoguard@gmail.com>
This commit is contained in:
Francesco Guardiani 2020-09-08 15:52:32 +02:00 committed by GitHub
parent a46422142e
commit 84d95a7aef
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 53 additions and 39 deletions

View File

@ -11,18 +11,19 @@
<properties>
<java.version>1.8</java.version>
<vertx.verticle>org.knative.examples.cloudevents.vertx.CloudEventSampleVerticle</vertx.verticle>
<cloudevents.version>2.0.0-milestone2</cloudevents.version>
</properties>
<dependencies>
<dependency>
<groupId>io.cloudevents</groupId>
<artifactId>cloudevents-core</artifactId>
<version>2.0.0-milestone1</version>
<version>${cloudevents.version}</version>
</dependency>
<dependency>
<groupId>io.cloudevents</groupId>
<artifactId>cloudevents-http-vertx</artifactId>
<version>2.0.0-milestone1</version>
<version>${cloudevents.version}</version>
</dependency>
</dependencies>

View File

@ -8,7 +8,12 @@ import io.vertx.core.AbstractVerticle;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.http.*;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpServer;
import io.vertx.core.http.HttpServerRequest;
import io.vertx.core.http.HttpServerResponse;
import io.vertx.ext.web.client.HttpResponse;
import io.vertx.ext.web.client.WebClient;
import java.net.URI;
import java.util.Optional;
@ -29,7 +34,7 @@ public class CloudEventSampleVerticle extends AbstractVerticle {
// Get the sink uri, if any
Optional<URI> env = Optional.ofNullable(System.getenv("K_SINK")).map(URI::create);
if (env.isPresent()) {
server.requestHandler(generateSinkHandler(vertx.createHttpClient(), env.get()));
server.requestHandler(generateSinkHandler(WebClient.create(vertx), env.get()));
} else {
// If K_SINK is not set, just echo back the events
server.requestHandler(generateEchoHandler());
@ -94,47 +99,14 @@ public class CloudEventSampleVerticle extends AbstractVerticle {
/**
* Generates an handler that sink the does the echo of the received event
*/
public static Handler<HttpServerRequest> generateSinkHandler(HttpClient client, URI sink) {
public static Handler<HttpServerRequest> generateSinkHandler(WebClient client, URI sink) {
return serverRequest -> {
// Transform the HttpRequest to Event
VertxMessageFactory
.createReader(serverRequest)
.map(MessageReader::toEvent)
.onComplete(asyncResult -> {
if (asyncResult.succeeded()) {
CloudEvent event = asyncResult.result();
System.out.println("Received event: " + event);
// Let's modify the event changing the source
CloudEvent outputEvent = CloudEventBuilder
.v1(event)
.withSource(URI.create("https://github.com/knative/docs/docs/serving/samples/cloudevents/cloudevents-vertx"))
.build();
// Prepare the http request to the sink
HttpClientRequest sinkRequest = client.postAbs(sink.toString());
// Define how to handle the response from the sink
sinkRequest.handler(sinkResponse -> {
if (sinkResponse.statusCode() >= 200 && sinkResponse.statusCode() < 300) {
serverRequest
.response()
.setStatusCode(202)
.end();
} else {
System.out.println("Error received from sink: " + sinkResponse.statusCode() + " " + sinkResponse.statusMessage());
serverRequest
.response()
.setStatusCode(500)
.end();
}
});
// Send the event to K_SINK
VertxMessageFactory
.createWriter(sinkRequest)
.writeBinary(event);
} else {
if (asyncResult.failed()) {
System.out.println("Error while decoding the event: " + asyncResult.cause());
// Reply with a failure
@ -142,7 +114,48 @@ public class CloudEventSampleVerticle extends AbstractVerticle {
.response()
.setStatusCode(400)
.end();
return;
}
CloudEvent event = asyncResult.result();
System.out.println("Received event: " + event);
// Let's modify the event changing the source
CloudEvent outputEvent = CloudEventBuilder
.v1(event)
.withSource(URI.create("https://github.com/knative/docs/docs/serving/samples/cloudevents/cloudevents-vertx"))
.build();
// Send the request to the sink and check the response
VertxMessageFactory
.createWriter(client.postAbs(sink.toString()))
.writeBinary(outputEvent)
.onComplete(ar -> {
if (ar.failed()) {
System.out.println("Something bad happened while sending event to the sink: " + ar.cause());
serverRequest
.response()
.setStatusCode(500)
.end();
return;
}
HttpResponse<Buffer> response = ar.result();
if (response.statusCode() >= 200 && response.statusCode() < 300) {
serverRequest
.response()
.setStatusCode(202)
.end();
} else if (ar.succeeded()) {
System.out.println("Error received from sink: " + response.statusCode() + " " + response.statusMessage());
serverRequest
.response()
.setStatusCode(500)
.end();
}
});
});
};
}