Added a cloudevents sample using Vert.x (#2480)

* Added a cloudevents sample using vertx

* Suggestions

* index

* image name fix

Signed-off-by: Francesco Guardiani <francescoguard@gmail.com>

* Support K_SINK

Signed-off-by: Francesco Guardiani <francescoguard@gmail.com>

* New APIs

Signed-off-by: Francesco Guardiani <francescoguard@gmail.com>

* Pretty md

Signed-off-by: Francesco Guardiani <francescoguard@gmail.com>

* Using 2.0.0-milestone1

Signed-off-by: Francesco Guardiani <francescoguard@gmail.com>
This commit is contained in:
Francesco Guardiani 2020-06-30 06:40:29 +02:00 committed by GitHub
parent 634df4693a
commit 100918f280
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 359 additions and 0 deletions

View File

@ -0,0 +1,6 @@
# Target dir
**/target/
# Idea files
**/*.iml
**/.idea/

View File

@ -0,0 +1,123 @@
# Vert.x + CloudEvents + Knative
A simple web app written in Java using Vert.x that can receive CloudEvents. It
supports running in two modes:
1. The default mode has the app reply to your input events with the output
event, which is simplest for demonstrating things working in isolation, but
is also the model for working for the Knative Eventing `Broker` concept. The
input event is modified assigning a new source and type attribute.
2. `K_SINK` mode has the app send events to the destination encoded in
`$K_SINK`, which is useful to demonstrate how folks can synthesize events to
send to a Service or Broker when not initiated by a Broker invocation (e.g.
implementing an event source). The input event is modified assigning a new
source and type attribute.
The application will use `$K_SINK`-mode whenever the environment variable is
specified.
Follow the steps below to create the sample code and then deploy the app to your
cluster. You can also download a working copy of the sample, by running the
following commands:
```shell
git clone -b "{{< branch >}}" https://github.com/knative/docs knative-docs
cd knative-docs/docs/serving/samples/cloudevents/cloudevents-vertx
```
## Before you begin
- A Kubernetes cluster with Knative installed and DNS configured. Follow the
[installation instructions](../../../../install/README.md) if you need to
create one.
- [Docker](https://www.docker.com) installed and running on your local machine,
and a Docker Hub account configured (we'll use it for a container registry).
## Build and deploy the sample
To build the image, run:
```shell
mvn compile jib:build -Dimage=<image_name>
```
To deploy the Knative Service, look in the `service.yaml` and replace `<image>`
with the deployed image name. Then run:
```shell
kubectl apply -f service.yaml
```
## Testing the sample
Get the URL for your Service with:
```shell
$ kubectl get ksvc
NAME URL LATESTCREATED LATESTREADY READY REASON
cloudevents-vertx http://cloudevents-java.xip.io cloudevents-vertx-86h28 cloudevents-vertx-86h28 True
```
Then send a CloudEvent to it with:
```shell
$ curl \
-X POST -v \
-H "content-type: application/json" \
-H "ce-specversion: 1.0" \
-H "ce-source: http://curl-command" \
-H "ce-type: curl.demo" \
-H "ce-id: 123-abc" \
-d '{"name":"Dave"}' \
http://cloudevents-java.xip.io
```
You can also send CloudEvents spawning a temporary curl pod in your cluster
with:
```shell
$ kubectl run curl \
--image=curlimages/curl --rm=true --restart=Never -ti -- \
-X POST -v \
-H "content-type: application/json" \
-H "ce-specversion: 1.0" \
-H "ce-source: http://curl-command" \
-H "ce-type: curl.demo" \
-H "ce-id: 123-abc" \
-d '{"name":"Dave"}' \
http://cloudevents-java.default.svc
```
You'll see on the console:
```shell
> POST / HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.69.1
> Accept: */*
> content-type: application/json
> ce-specversion: 1.0
> ce-source: http://curl-command
> ce-type: curl.demo
> ce-id: 123-abc
> Content-Length: 15
>
< HTTP/1.1 202 Accepted
< ce-specversion: 1.0
< ce-id: 123-abc
< ce-source: https://github.com/knative/docs/docs/serving/samples/cloudevents/cloudevents-vertx
< ce-type: curl.demo
< content-type: application/json
< content-length: 15
<
{"name":"Dave"}
```
## Removing the sample app deployment
To remove the sample app from your cluster, delete the service record:
```shell
kubectl delete --filename service.yaml
```

View File

@ -0,0 +1,8 @@
---
title: "Cloud Events - Java and Vert.x"
linkTitle: "Java and Vert.x"
weight: 1
type: "docs"
---
{{% readfile file="README.md" %}}

View File

@ -0,0 +1,61 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.knative.examples</groupId>
<artifactId>cloudevents-example-vertx</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<java.version>1.8</java.version>
<vertx.verticle>org.knative.examples.cloudevents.vertx.CloudEventSampleVerticle</vertx.verticle>
</properties>
<dependencies>
<dependency>
<groupId>io.cloudevents</groupId>
<artifactId>cloudevents-core</artifactId>
<version>2.0.0-milestone1</version>
</dependency>
<dependency>
<groupId>io.cloudevents</groupId>
<artifactId>cloudevents-http-vertx</artifactId>
<version>2.0.0-milestone1</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<groupId>com.google.cloud.tools</groupId>
<artifactId>jib-maven-plugin</artifactId>
<version>2.0.0</version>
<configuration>
<container>
<jvmFlags>
<jvmFlag>-Dorg.slf4j.simpleLogger.defaultLogLevel=warn</jvmFlag>
</jvmFlags>
<mainClass>org.knative.examples.cloudevents.vertx.CloudEventSampleVerticle</mainClass>
</container>
</configuration>
</plugin>
<!-- With this plugin you can locally test the application using `mvn vertx:run` -->
<plugin>
<groupId>io.reactiverse</groupId>
<artifactId>vertx-maven-plugin</artifactId>
<version>1.0.22</version>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1,10 @@
apiVersion: serving.knative.dev/v1
kind: Service
metadata:
name: cloudevents-vertx
namespace: default
spec:
template:
spec:
containers:
- image: <image>

View File

@ -0,0 +1,151 @@
package org.knative.examples.cloudevents.vertx;
import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;
import io.cloudevents.core.message.MessageReader;
import io.cloudevents.http.vertx.VertxMessageFactory;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.http.*;
import java.net.URI;
import java.util.Optional;
public class CloudEventSampleVerticle extends AbstractVerticle {
public static void main(String[] args) {
Vertx vertx = Vertx.vertx();
vertx.deployVerticle(new CloudEventSampleVerticle());
}
public void start(Promise<Void> startPromise) {
HttpServer server = vertx.createHttpServer();
// Get the port
int port = Optional.ofNullable(System.getenv("PORT")).map(Integer::parseInt).orElse(8080);
// Get the sink uri, if any
Optional<URI> env = Optional.ofNullable(System.getenv("K_SINK")).map(URI::create);
if (env.isPresent()) {
server.requestHandler(generateSinkHandler(vertx.createHttpClient(), env.get()));
} else {
// If K_SINK is not set, just echo back the events
server.requestHandler(generateEchoHandler());
}
server
// Listen and complete verticle deploy
.listen(port, serverResult -> {
if (serverResult.succeeded()) {
System.out.println("Server started on port " + serverResult.result().actualPort());
startPromise.complete();
} else {
System.out.println("Error starting the server");
serverResult.cause().printStackTrace();
startPromise.fail(serverResult.cause());
}
});
}
/**
* Generates an handler that does the echo of the received event
*/
public static Handler<HttpServerRequest> generateEchoHandler() {
return request -> {
// Transform the HttpRequest to Event
VertxMessageFactory
.createReader(request)
.map(MessageReader::toEvent)
.onComplete(asyncResult -> {
if (asyncResult.succeeded()) {
CloudEvent event = asyncResult.result();
System.out.println("Received event: " + event);
// Let's modify the event changing the source
CloudEvent outputEvent = CloudEventBuilder
.v1(event)
.withSource(URI.create("https://github.com/knative/docs/docs/serving/samples/cloudevents/cloudevents-vertx"))
.build();
// Set response status code
HttpServerResponse response = request
.response()
.setStatusCode(202);
// Reply with the event in binary mode
VertxMessageFactory
.createWriter(response)
.writeBinary(outputEvent);
} else {
System.out.println("Error while decoding the event: " + asyncResult.cause());
// Reply with a failure
request
.response()
.setStatusCode(400)
.end();
}
});
};
}
/**
* Generates an handler that sink the does the echo of the received event
*/
public static Handler<HttpServerRequest> generateSinkHandler(HttpClient client, URI sink) {
return serverRequest -> {
// Transform the HttpRequest to Event
VertxMessageFactory
.createReader(serverRequest)
.map(MessageReader::toEvent)
.onComplete(asyncResult -> {
if (asyncResult.succeeded()) {
CloudEvent event = asyncResult.result();
System.out.println("Received event: " + event);
// Let's modify the event changing the source
CloudEvent outputEvent = CloudEventBuilder
.v1(event)
.withSource(URI.create("https://github.com/knative/docs/docs/serving/samples/cloudevents/cloudevents-vertx"))
.build();
// Prepare the http request to the sink
HttpClientRequest sinkRequest = client.postAbs(sink.toString());
// Define how to handle the response from the sink
sinkRequest.handler(sinkResponse -> {
if (sinkResponse.statusCode() >= 200 && sinkResponse.statusCode() < 300) {
serverRequest
.response()
.setStatusCode(202)
.end();
} else {
System.out.println("Error received from sink: " + sinkResponse.statusCode() + " " + sinkResponse.statusMessage());
serverRequest
.response()
.setStatusCode(500)
.end();
}
});
// Send the event to K_SINK
VertxMessageFactory
.createWriter(sinkRequest)
.writeBinary(event);
} else {
System.out.println("Error while decoding the event: " + asyncResult.cause());
// Reply with a failure
serverRequest
.response()
.setStatusCode(400)
.end();
}
});
};
}
}