From 84d95a7aef46c1644b7b3f8be8b7db3b71d46fdc Mon Sep 17 00:00:00 2001 From: Francesco Guardiani Date: Tue, 8 Sep 2020 15:52:32 +0200 Subject: [PATCH] Updated to latest sdk-java 2.0.0-milestone2 (#2788) Signed-off-by: Francesco Guardiani --- .../cloudevents/cloudevents-vertx/pom.xml | 5 +- .../vertx/CloudEventSampleVerticle.java | 87 +++++++++++-------- 2 files changed, 53 insertions(+), 39 deletions(-) diff --git a/docs/serving/samples/cloudevents/cloudevents-vertx/pom.xml b/docs/serving/samples/cloudevents/cloudevents-vertx/pom.xml index cfe1f63ba..0dd3451e1 100644 --- a/docs/serving/samples/cloudevents/cloudevents-vertx/pom.xml +++ b/docs/serving/samples/cloudevents/cloudevents-vertx/pom.xml @@ -11,18 +11,19 @@ 1.8 org.knative.examples.cloudevents.vertx.CloudEventSampleVerticle + 2.0.0-milestone2 io.cloudevents cloudevents-core - 2.0.0-milestone1 + ${cloudevents.version} io.cloudevents cloudevents-http-vertx - 2.0.0-milestone1 + ${cloudevents.version} diff --git a/docs/serving/samples/cloudevents/cloudevents-vertx/src/main/java/org/knative/examples/cloudevents/vertx/CloudEventSampleVerticle.java b/docs/serving/samples/cloudevents/cloudevents-vertx/src/main/java/org/knative/examples/cloudevents/vertx/CloudEventSampleVerticle.java index 4c3ddb6ec..06f6ce3d8 100644 --- a/docs/serving/samples/cloudevents/cloudevents-vertx/src/main/java/org/knative/examples/cloudevents/vertx/CloudEventSampleVerticle.java +++ b/docs/serving/samples/cloudevents/cloudevents-vertx/src/main/java/org/knative/examples/cloudevents/vertx/CloudEventSampleVerticle.java @@ -8,7 +8,12 @@ import io.vertx.core.AbstractVerticle; import io.vertx.core.Handler; import io.vertx.core.Promise; import io.vertx.core.Vertx; -import io.vertx.core.http.*; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.http.HttpServer; +import io.vertx.core.http.HttpServerRequest; +import io.vertx.core.http.HttpServerResponse; +import io.vertx.ext.web.client.HttpResponse; +import io.vertx.ext.web.client.WebClient; import java.net.URI; import java.util.Optional; @@ -29,7 +34,7 @@ public class CloudEventSampleVerticle extends AbstractVerticle { // Get the sink uri, if any Optional env = Optional.ofNullable(System.getenv("K_SINK")).map(URI::create); if (env.isPresent()) { - server.requestHandler(generateSinkHandler(vertx.createHttpClient(), env.get())); + server.requestHandler(generateSinkHandler(WebClient.create(vertx), env.get())); } else { // If K_SINK is not set, just echo back the events server.requestHandler(generateEchoHandler()); @@ -94,47 +99,14 @@ public class CloudEventSampleVerticle extends AbstractVerticle { /** * Generates an handler that sink the does the echo of the received event */ - public static Handler generateSinkHandler(HttpClient client, URI sink) { + public static Handler generateSinkHandler(WebClient client, URI sink) { return serverRequest -> { // Transform the HttpRequest to Event VertxMessageFactory .createReader(serverRequest) .map(MessageReader::toEvent) .onComplete(asyncResult -> { - if (asyncResult.succeeded()) { - CloudEvent event = asyncResult.result(); - System.out.println("Received event: " + event); - - // Let's modify the event changing the source - CloudEvent outputEvent = CloudEventBuilder - .v1(event) - .withSource(URI.create("https://github.com/knative/docs/docs/serving/samples/cloudevents/cloudevents-vertx")) - .build(); - - // Prepare the http request to the sink - HttpClientRequest sinkRequest = client.postAbs(sink.toString()); - - // Define how to handle the response from the sink - sinkRequest.handler(sinkResponse -> { - if (sinkResponse.statusCode() >= 200 && sinkResponse.statusCode() < 300) { - serverRequest - .response() - .setStatusCode(202) - .end(); - } else { - System.out.println("Error received from sink: " + sinkResponse.statusCode() + " " + sinkResponse.statusMessage()); - serverRequest - .response() - .setStatusCode(500) - .end(); - } - }); - - // Send the event to K_SINK - VertxMessageFactory - .createWriter(sinkRequest) - .writeBinary(event); - } else { + if (asyncResult.failed()) { System.out.println("Error while decoding the event: " + asyncResult.cause()); // Reply with a failure @@ -142,7 +114,48 @@ public class CloudEventSampleVerticle extends AbstractVerticle { .response() .setStatusCode(400) .end(); + return; } + + CloudEvent event = asyncResult.result(); + System.out.println("Received event: " + event); + + // Let's modify the event changing the source + CloudEvent outputEvent = CloudEventBuilder + .v1(event) + .withSource(URI.create("https://github.com/knative/docs/docs/serving/samples/cloudevents/cloudevents-vertx")) + .build(); + + // Send the request to the sink and check the response + VertxMessageFactory + .createWriter(client.postAbs(sink.toString())) + .writeBinary(outputEvent) + .onComplete(ar -> { + if (ar.failed()) { + System.out.println("Something bad happened while sending event to the sink: " + ar.cause()); + serverRequest + .response() + .setStatusCode(500) + .end(); + return; + } + + HttpResponse response = ar.result(); + + + if (response.statusCode() >= 200 && response.statusCode() < 300) { + serverRequest + .response() + .setStatusCode(202) + .end(); + } else if (ar.succeeded()) { + System.out.println("Error received from sink: " + response.statusCode() + " " + response.statusMessage()); + serverRequest + .response() + .setStatusCode(500) + .end(); + } + }); }); }; }