Add AMQP example (#294)

* Add AMQP example

Signed-off-by: Alfusainey Jallow <alf.jallow@gmail.com>

* close client connection after sending msg

Signed-off-by: Alfusainey Jallow <alf.jallow@gmail.com>

* use Vert.x 4 and refactor according to feedback

Signed-off-by: Alfusainey Jallow <alf.jallow@gmail.com>

* resolve conflicts + move to examples/amqp-proton

Signed-off-by: Alfusainey Jallow <alf.jallow@gmail.com>
This commit is contained in:
Alfusainey Jallow 2020-11-24 09:38:37 +01:00 committed by GitHub
parent cc0892a440
commit bbce65aa64
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 338 additions and 1 deletions

View File

@ -0,0 +1,32 @@
# AMQP 1.0 + CloudEvents sample
This example uses the vertx-proton library and the AMQP 1.0 protocol binding for cloud events to implement a client and server that communicates over AMQP to exchange cloud event messages.
The vertx-proton library makes it easy to create (reactive) AMQP clients and servers and is a wrapper around Qpid proton--a library used to implement the AMQP 1.0 protocol binding for cloud events.
## Build
```shell
mvn package
```
## Start AMQP Server
Starts the AMQP server on `127.0.0.1` to listen for incoming connections on the default (insecure) AMQP port `5672`. Once the server is started, it can either receive or send messages to a connected client. The opening and closing of a connection is initiated by a client.
```shell
mvn exec:java -Dexec.mainClass="io.cloudevents.examples.amqp.vertx.AmqpServer"
```
## Start AMQP Client
The client simply opens a connection with the server and either sends or receives a message containing a cloud event. Once the client completes sending or receiving a message, it initiates the closing of the connection by emitting the `AMQP CLOSE frame`.
Send a message to the server.
```shell
mvn exec:java -Dexec.mainClass="io.cloudevents.examples.amqp.vertx.AmqpClient" -Dexec.args="send"
```
Receive a message from the server.
```shell
mvn exec:java -Dexec.mainClass="io.cloudevents.examples.amqp.vertx.AmqpClient" -Dexec.args="receive"
```

View File

@ -0,0 +1,34 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>cloudevents-examples</artifactId>
<groupId>io.cloudevents</groupId>
<version>2.0.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>cloudevents-amqp-proton-example</artifactId>
<properties>
<vertx.version>4.0.0.Beta1</vertx.version>
</properties>
<dependencies>
<dependency>
<groupId>io.cloudevents</groupId>
<artifactId>cloudevents-amqp-proton</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-core</artifactId>
<version>${vertx.version}</version>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-proton</artifactId>
<version>${vertx.version}</version>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,162 @@
package io.cloudevents.examples.amqp.vertx;
import java.io.PrintWriter;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.message.Message;
import io.cloudevents.CloudEvent;
import io.cloudevents.amqp.ProtonAmqpMessageFactory;
import io.cloudevents.core.message.MessageReader;
import io.cloudevents.core.v1.CloudEventBuilder;
import io.cloudevents.core.v1.CloudEventV1;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.json.JsonObject;
import io.vertx.proton.ProtonClient;
import io.vertx.proton.ProtonClientOptions;
import io.vertx.proton.ProtonConnection;
import io.vertx.proton.ProtonMessageHandler;
import io.vertx.proton.ProtonQoS;
import io.vertx.proton.ProtonReceiver;
import io.vertx.proton.ProtonSender;
/**
* A example vertx-based AMQP client that interacts with a remote AMQP server to send and receive CloudEvent messages.
*/
public class AmqpClient {
private static ProtonConnection connection;
private static final String SERVER_HOST = "127.0.0.1";
private static final int SERVER_PORT = 5672;
private static final String SEND_MESSAGE = "send";
private static final String RECEIVE_MESSAGE = "receive";
final static Vertx VERTX = Vertx.vertx();
private static PrintWriter writer = new PrintWriter(System.out, true);
public static void main(String args[]) {
if (args.length < 1) {
writer.println("Usage: AmqpClient [send|receive]");
return;
}
final String action = args[0].toLowerCase();
switch (action) {
case SEND_MESSAGE:
sendMessage();
break;
case RECEIVE_MESSAGE:
receiveMessage();
break;
default:
writer.println("Unknown action");
}
}
private static void sendMessage() {
connectToServer(SERVER_HOST, SERVER_PORT)
.compose(conn -> {
connection = conn;
writer.printf("[Client] Connected to %s:%s", SERVER_HOST, SERVER_PORT);
return openSenderLink();
}).onSuccess(sender -> {
final JsonObject payload = new JsonObject().put("temp", 50);
final CloudEvent event = new CloudEventBuilder()
.withAttribute(CloudEventV1.ID, "client-id")
.withAttribute(CloudEventV1.SOURCE, "http://127.0.0.1/amqp-client")
.withAttribute(CloudEventV1.TYPE, "com.example.sampletype1")
.withAttribute(CloudEventV1.TIME, "2020-11-06T21:47:12.037467+00:00")
.withData(payload.toString().getBytes())
.build();
final Message message = ProtonAmqpMessageFactory.createWriter().writeBinary(event);
message.setAddress("/telemetry");
sender.send(message, delivery -> {
if (Accepted.class.isInstance(delivery.getRemoteState())) {
writer.println("[Client:] message delivered and accepted by remote peer");
}
connection.close();
});
}).onFailure(t -> {
writer.printf("[Client] Connection failed (%s)", t.getCause().getMessage());
});
}
private static void receiveMessage() {
connectToServer(SERVER_HOST, SERVER_PORT)
.compose(conn -> {
connection = conn;
writer.println("[Client] Connected");
return Future.succeededFuture();
}).onSuccess(success ->
openReceiverLink((delivery, message) -> {
final MessageReader reader = ProtonAmqpMessageFactory.createReader(message);
final CloudEvent event = reader.toEvent();
writer.printf("[Client] received CloudEvent[Id=%s, Source=%s]", event.getId(),
event.getSource().toString());
connection.close();
})
).onFailure(t -> {
writer.println("[Client] Connection failed");
});
}
private static Future<ProtonConnection> connectToServer(final String host, final int port) {
final Promise<ProtonConnection> connectAttempt = Promise.promise();
final ProtonClientOptions options = new ProtonClientOptions();
final ProtonClient client = ProtonClient.create(VERTX);
client.connect(options, host, port, connectAttempt);
return connectAttempt.future()
.compose(unopenedConnection -> {
final Promise<ProtonConnection> con = Promise.promise();
unopenedConnection.openHandler(con);
unopenedConnection.open();
return con.future();
});
}
private static Future<ProtonSender> openSenderLink() {
if (connection == null || connection.isDisconnected()) {
throw new IllegalStateException("[Client] connection not established");
}
final Promise<ProtonSender> result = Promise.promise();
final ProtonSender sender = connection.createSender(null);
sender.openHandler(result);
sender.open();
return result.future();
}
private static Future<ProtonReceiver> openReceiverLink(final ProtonMessageHandler msgHandler) {
if (connection == null || connection.isDisconnected()) {
throw new IllegalStateException("[Client] connection not established");
}
final Promise<ProtonReceiver> result = Promise.promise();
final ProtonReceiver receiver = connection.createReceiver(null);
receiver.setQoS(ProtonQoS.AT_LEAST_ONCE);
receiver.handler(msgHandler);
receiver.openHandler(result);
receiver.open();
return result.future().map(recver -> {
// Ready to receive messages
return recver;
});
}
}

View File

@ -0,0 +1,109 @@
package io.cloudevents.examples.amqp.vertx;
import java.io.PrintWriter;
import java.net.URI;
import java.time.OffsetDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.qpid.proton.message.Message;
import io.cloudevents.CloudEvent;
import io.cloudevents.amqp.ProtonAmqpMessageFactory;
import io.cloudevents.core.builder.CloudEventBuilder;
import io.cloudevents.core.message.MessageReader;
import io.cloudevents.core.message.MessageWriter;
import io.vertx.core.Vertx;
import io.vertx.proton.ProtonConnection;
import io.vertx.proton.ProtonServer;
import io.vertx.proton.ProtonServerOptions;
/**
* An example vertx-based AMQP server that receives and sends CloudEvent messages to/from a remote client.
*/
public class AmqpServer {
private static final String DEFAULT_HOST = "127.0.0.1";
private static final int DEFAULT_PORT = 5672;
private static PrintWriter writer = new PrintWriter(System.out, true);
public static void main(String argv[]) {
final Vertx vertx = Vertx.vertx();
final List<String> args = new ArrayList<>();
if (argv.length > 1) {
args.addAll(Arrays.asList(argv[0].split(":", 1)));
}
final String host = args.isEmpty() ? DEFAULT_HOST : args.get(0);
final int port = args.isEmpty() ? DEFAULT_PORT : Integer.parseInt(args.get(1));
final ProtonServerOptions options = new ProtonServerOptions();
options.setHost(host);
options.setPort(port);
final ProtonServer server = ProtonServer.create(vertx, options);
server.connectHandler(con -> onConnectRequest(con)).listen(ar -> {
if (ar.succeeded()) {
writer.printf("[Server] started and listening on %s:%s\n", host, port);
} else {
writer.printf("[Server] failed to start (%s)\n", ar.cause());
System.exit(1);
}
});
}
private static void onConnectRequest(final ProtonConnection con) {
// BEGIN frame received
con.sessionOpenHandler(remoteSession -> {
remoteSession.open();
});
// ATTACH frame received -> client wants to send messages to this server.
con.receiverOpenHandler(remoteReceiver -> {
remoteReceiver.handler((delivery, message) -> {
// message received -> convert to CloudEvent
final MessageReader reader = ProtonAmqpMessageFactory.createReader(message);
final CloudEvent event = reader.toEvent();
writer.printf("[Server] received CloudEvent[Id=%s, Source=%s]\n", event.getId(),
event.getSource().toString());
}).open();
});
// ATTACH frame received -> client wants to receive messages from this server.
con.senderOpenHandler(sender -> {
try {
MessageWriter<?, Message> writer = ProtonAmqpMessageFactory.createWriter();
final CloudEvent event = CloudEventBuilder.v1()
.withId("amqp-server-id")
.withType("com.example.sampletype1")
.withSource(URI.create("http://127.0.0.1/amqp-server"))
.withTime(OffsetDateTime.now())
.withData("{\"temp\": 5}".getBytes())
.build();
final Message message = writer.writeBinary(event);
sender.send(message);
} catch (final Exception e) {
writer.println("[Server] failed to send ");
}
sender.open();
});
//OPEN frame received
con.openHandler(remoteOpen -> {
if (remoteOpen.failed()) {
// connection with client failed.
writer.println(remoteOpen.cause());
} else {
remoteOpen.result().open();
}
});
// CLOSE Frame received
con.closeHandler(remoteClose -> {
con.close();
});
}
}

View File

@ -19,7 +19,7 @@
<module>vertx</module>
<module>basic-http</module>
<module>restful-ws-spring-boot</module>
<module>amqp-proton</module>
</modules>
</project>