From bbce65aa64e679013816e0cc6812224c0aaf60ac Mon Sep 17 00:00:00 2001 From: Alfusainey Jallow Date: Tue, 24 Nov 2020 09:38:37 +0100 Subject: [PATCH] Add AMQP example (#294) * Add AMQP example Signed-off-by: Alfusainey Jallow * close client connection after sending msg Signed-off-by: Alfusainey Jallow * use Vert.x 4 and refactor according to feedback Signed-off-by: Alfusainey Jallow * resolve conflicts + move to examples/amqp-proton Signed-off-by: Alfusainey Jallow --- examples/amqp-proton/README.md | 32 ++++ examples/amqp-proton/pom.xml | 34 ++++ .../examples/amqp/vertx/AmqpClient.java | 162 ++++++++++++++++++ .../examples/amqp/vertx/AmqpServer.java | 109 ++++++++++++ examples/pom.xml | 2 +- 5 files changed, 338 insertions(+), 1 deletion(-) create mode 100644 examples/amqp-proton/README.md create mode 100644 examples/amqp-proton/pom.xml create mode 100644 examples/amqp-proton/src/main/java/io/cloudevents/examples/amqp/vertx/AmqpClient.java create mode 100644 examples/amqp-proton/src/main/java/io/cloudevents/examples/amqp/vertx/AmqpServer.java diff --git a/examples/amqp-proton/README.md b/examples/amqp-proton/README.md new file mode 100644 index 00000000..b25555cd --- /dev/null +++ b/examples/amqp-proton/README.md @@ -0,0 +1,32 @@ +# AMQP 1.0 + CloudEvents sample +This example uses the vertx-proton library and the AMQP 1.0 protocol binding for cloud events to implement a client and server that communicates over AMQP to exchange cloud event messages. + +The vertx-proton library makes it easy to create (reactive) AMQP clients and servers and is a wrapper around Qpid proton--a library used to implement the AMQP 1.0 protocol binding for cloud events. + +## Build + +```shell +mvn package +``` + +## Start AMQP Server +Starts the AMQP server on `127.0.0.1` to listen for incoming connections on the default (insecure) AMQP port `5672`. Once the server is started, it can either receive or send messages to a connected client. The opening and closing of a connection is initiated by a client. + +```shell +mvn exec:java -Dexec.mainClass="io.cloudevents.examples.amqp.vertx.AmqpServer" +``` + +## Start AMQP Client +The client simply opens a connection with the server and either sends or receives a message containing a cloud event. Once the client completes sending or receiving a message, it initiates the closing of the connection by emitting the `AMQP CLOSE frame`. + +Send a message to the server. + +```shell +mvn exec:java -Dexec.mainClass="io.cloudevents.examples.amqp.vertx.AmqpClient" -Dexec.args="send" +``` + +Receive a message from the server. + +```shell +mvn exec:java -Dexec.mainClass="io.cloudevents.examples.amqp.vertx.AmqpClient" -Dexec.args="receive" +``` diff --git a/examples/amqp-proton/pom.xml b/examples/amqp-proton/pom.xml new file mode 100644 index 00000000..d0e3ff8c --- /dev/null +++ b/examples/amqp-proton/pom.xml @@ -0,0 +1,34 @@ + + + + cloudevents-examples + io.cloudevents + 2.0.0-SNAPSHOT + + 4.0.0 + + cloudevents-amqp-proton-example + + + 4.0.0.Beta1 + + + + io.cloudevents + cloudevents-amqp-proton + ${project.version} + + + io.vertx + vertx-core + ${vertx.version} + + + io.vertx + vertx-proton + ${vertx.version} + + + + + diff --git a/examples/amqp-proton/src/main/java/io/cloudevents/examples/amqp/vertx/AmqpClient.java b/examples/amqp-proton/src/main/java/io/cloudevents/examples/amqp/vertx/AmqpClient.java new file mode 100644 index 00000000..14710892 --- /dev/null +++ b/examples/amqp-proton/src/main/java/io/cloudevents/examples/amqp/vertx/AmqpClient.java @@ -0,0 +1,162 @@ +package io.cloudevents.examples.amqp.vertx; + +import java.io.PrintWriter; + +import org.apache.qpid.proton.amqp.messaging.Accepted; +import org.apache.qpid.proton.message.Message; + +import io.cloudevents.CloudEvent; +import io.cloudevents.amqp.ProtonAmqpMessageFactory; +import io.cloudevents.core.message.MessageReader; +import io.cloudevents.core.v1.CloudEventBuilder; +import io.cloudevents.core.v1.CloudEventV1; +import io.vertx.core.Future; +import io.vertx.core.Promise; +import io.vertx.core.Vertx; +import io.vertx.core.json.JsonObject; +import io.vertx.proton.ProtonClient; +import io.vertx.proton.ProtonClientOptions; +import io.vertx.proton.ProtonConnection; +import io.vertx.proton.ProtonMessageHandler; +import io.vertx.proton.ProtonQoS; +import io.vertx.proton.ProtonReceiver; +import io.vertx.proton.ProtonSender; + +/** + * A example vertx-based AMQP client that interacts with a remote AMQP server to send and receive CloudEvent messages. + */ +public class AmqpClient { + + private static ProtonConnection connection; + + private static final String SERVER_HOST = "127.0.0.1"; + private static final int SERVER_PORT = 5672; + private static final String SEND_MESSAGE = "send"; + private static final String RECEIVE_MESSAGE = "receive"; + + final static Vertx VERTX = Vertx.vertx(); + private static PrintWriter writer = new PrintWriter(System.out, true); + + public static void main(String args[]) { + + if (args.length < 1) { + writer.println("Usage: AmqpClient [send|receive]"); + return; + } + + final String action = args[0].toLowerCase(); + + switch (action) { + + case SEND_MESSAGE: + sendMessage(); + break; + + case RECEIVE_MESSAGE: + receiveMessage(); + break; + + default: + writer.println("Unknown action"); + } + } + + private static void sendMessage() { + connectToServer(SERVER_HOST, SERVER_PORT) + .compose(conn -> { + connection = conn; + writer.printf("[Client] Connected to %s:%s", SERVER_HOST, SERVER_PORT); + + return openSenderLink(); + }).onSuccess(sender -> { + + final JsonObject payload = new JsonObject().put("temp", 50); + + final CloudEvent event = new CloudEventBuilder() + .withAttribute(CloudEventV1.ID, "client-id") + .withAttribute(CloudEventV1.SOURCE, "http://127.0.0.1/amqp-client") + .withAttribute(CloudEventV1.TYPE, "com.example.sampletype1") + .withAttribute(CloudEventV1.TIME, "2020-11-06T21:47:12.037467+00:00") + .withData(payload.toString().getBytes()) + .build(); + + final Message message = ProtonAmqpMessageFactory.createWriter().writeBinary(event); + message.setAddress("/telemetry"); + sender.send(message, delivery -> { + if (Accepted.class.isInstance(delivery.getRemoteState())) { + writer.println("[Client:] message delivered and accepted by remote peer"); + } + connection.close(); + }); + }).onFailure(t -> { + writer.printf("[Client] Connection failed (%s)", t.getCause().getMessage()); + }); + + } + + private static void receiveMessage() { + connectToServer(SERVER_HOST, SERVER_PORT) + .compose(conn -> { + connection = conn; + writer.println("[Client] Connected"); + return Future.succeededFuture(); + }).onSuccess(success -> + openReceiverLink((delivery, message) -> { + final MessageReader reader = ProtonAmqpMessageFactory.createReader(message); + final CloudEvent event = reader.toEvent(); + writer.printf("[Client] received CloudEvent[Id=%s, Source=%s]", event.getId(), + event.getSource().toString()); + connection.close(); + }) + ).onFailure(t -> { + writer.println("[Client] Connection failed"); + }); + } + + private static Future connectToServer(final String host, final int port) { + + final Promise connectAttempt = Promise.promise(); + final ProtonClientOptions options = new ProtonClientOptions(); + final ProtonClient client = ProtonClient.create(VERTX); + + client.connect(options, host, port, connectAttempt); + + return connectAttempt.future() + .compose(unopenedConnection -> { + final Promise con = Promise.promise(); + unopenedConnection.openHandler(con); + unopenedConnection.open(); + return con.future(); + }); + } + + private static Future openSenderLink() { + if (connection == null || connection.isDisconnected()) { + throw new IllegalStateException("[Client] connection not established"); + } + + final Promise result = Promise.promise(); + final ProtonSender sender = connection.createSender(null); + sender.openHandler(result); + sender.open(); + return result.future(); + } + + private static Future openReceiverLink(final ProtonMessageHandler msgHandler) { + if (connection == null || connection.isDisconnected()) { + throw new IllegalStateException("[Client] connection not established"); + } + + final Promise result = Promise.promise(); + final ProtonReceiver receiver = connection.createReceiver(null); + receiver.setQoS(ProtonQoS.AT_LEAST_ONCE); + receiver.handler(msgHandler); + receiver.openHandler(result); + receiver.open(); + return result.future().map(recver -> { + // Ready to receive messages + return recver; + }); + } + +} diff --git a/examples/amqp-proton/src/main/java/io/cloudevents/examples/amqp/vertx/AmqpServer.java b/examples/amqp-proton/src/main/java/io/cloudevents/examples/amqp/vertx/AmqpServer.java new file mode 100644 index 00000000..0eb23070 --- /dev/null +++ b/examples/amqp-proton/src/main/java/io/cloudevents/examples/amqp/vertx/AmqpServer.java @@ -0,0 +1,109 @@ +package io.cloudevents.examples.amqp.vertx; + +import java.io.PrintWriter; +import java.net.URI; +import java.time.OffsetDateTime; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.apache.qpid.proton.message.Message; + +import io.cloudevents.CloudEvent; +import io.cloudevents.amqp.ProtonAmqpMessageFactory; +import io.cloudevents.core.builder.CloudEventBuilder; +import io.cloudevents.core.message.MessageReader; +import io.cloudevents.core.message.MessageWriter; +import io.vertx.core.Vertx; +import io.vertx.proton.ProtonConnection; +import io.vertx.proton.ProtonServer; +import io.vertx.proton.ProtonServerOptions; + +/** + * An example vertx-based AMQP server that receives and sends CloudEvent messages to/from a remote client. + */ +public class AmqpServer { + + private static final String DEFAULT_HOST = "127.0.0.1"; + private static final int DEFAULT_PORT = 5672; + + private static PrintWriter writer = new PrintWriter(System.out, true); + + public static void main(String argv[]) { + final Vertx vertx = Vertx.vertx(); + + final List args = new ArrayList<>(); + if (argv.length > 1) { + args.addAll(Arrays.asList(argv[0].split(":", 1))); + } + + final String host = args.isEmpty() ? DEFAULT_HOST : args.get(0); + final int port = args.isEmpty() ? DEFAULT_PORT : Integer.parseInt(args.get(1)); + + final ProtonServerOptions options = new ProtonServerOptions(); + options.setHost(host); + options.setPort(port); + + final ProtonServer server = ProtonServer.create(vertx, options); + server.connectHandler(con -> onConnectRequest(con)).listen(ar -> { + if (ar.succeeded()) { + writer.printf("[Server] started and listening on %s:%s\n", host, port); + } else { + writer.printf("[Server] failed to start (%s)\n", ar.cause()); + System.exit(1); + } + }); + } + + private static void onConnectRequest(final ProtonConnection con) { + // BEGIN frame received + con.sessionOpenHandler(remoteSession -> { + remoteSession.open(); + }); + // ATTACH frame received -> client wants to send messages to this server. + con.receiverOpenHandler(remoteReceiver -> { + remoteReceiver.handler((delivery, message) -> { + // message received -> convert to CloudEvent + final MessageReader reader = ProtonAmqpMessageFactory.createReader(message); + final CloudEvent event = reader.toEvent(); + + writer.printf("[Server] received CloudEvent[Id=%s, Source=%s]\n", event.getId(), + event.getSource().toString()); + + }).open(); + }); + // ATTACH frame received -> client wants to receive messages from this server. + con.senderOpenHandler(sender -> { + try { + MessageWriter writer = ProtonAmqpMessageFactory.createWriter(); + final CloudEvent event = CloudEventBuilder.v1() + .withId("amqp-server-id") + .withType("com.example.sampletype1") + .withSource(URI.create("http://127.0.0.1/amqp-server")) + .withTime(OffsetDateTime.now()) + .withData("{\"temp\": 5}".getBytes()) + .build(); + + final Message message = writer.writeBinary(event); + sender.send(message); + + } catch (final Exception e) { + writer.println("[Server] failed to send "); + } + sender.open(); + }); + //OPEN frame received + con.openHandler(remoteOpen -> { + if (remoteOpen.failed()) { + // connection with client failed. + writer.println(remoteOpen.cause()); + } else { + remoteOpen.result().open(); + } + }); + // CLOSE Frame received + con.closeHandler(remoteClose -> { + con.close(); + }); + } +} diff --git a/examples/pom.xml b/examples/pom.xml index 2384b5e4..ba885925 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -19,7 +19,7 @@ vertx basic-http restful-ws-spring-boot + amqp-proton -