diff --git a/examples/amqp-proton/README.md b/examples/amqp-proton/README.md
new file mode 100644
index 00000000..b25555cd
--- /dev/null
+++ b/examples/amqp-proton/README.md
@@ -0,0 +1,32 @@
+# AMQP 1.0 + CloudEvents sample
+This example uses the vertx-proton library and the AMQP 1.0 protocol binding for cloud events to implement a client and server that communicates over AMQP to exchange cloud event messages.
+
+The vertx-proton library makes it easy to create (reactive) AMQP clients and servers and is a wrapper around Qpid proton--a library used to implement the AMQP 1.0 protocol binding for cloud events.
+
+## Build
+
+```shell
+mvn package
+```
+
+## Start AMQP Server
+Starts the AMQP server on `127.0.0.1` to listen for incoming connections on the default (insecure) AMQP port `5672`. Once the server is started, it can either receive or send messages to a connected client. The opening and closing of a connection is initiated by a client.
+
+```shell
+mvn exec:java -Dexec.mainClass="io.cloudevents.examples.amqp.vertx.AmqpServer"
+```
+
+## Start AMQP Client
+The client simply opens a connection with the server and either sends or receives a message containing a cloud event. Once the client completes sending or receiving a message, it initiates the closing of the connection by emitting the `AMQP CLOSE frame`.
+
+Send a message to the server.
+
+```shell
+mvn exec:java -Dexec.mainClass="io.cloudevents.examples.amqp.vertx.AmqpClient" -Dexec.args="send"
+```
+
+Receive a message from the server.
+
+```shell
+mvn exec:java -Dexec.mainClass="io.cloudevents.examples.amqp.vertx.AmqpClient" -Dexec.args="receive"
+```
diff --git a/examples/amqp-proton/pom.xml b/examples/amqp-proton/pom.xml
new file mode 100644
index 00000000..d0e3ff8c
--- /dev/null
+++ b/examples/amqp-proton/pom.xml
@@ -0,0 +1,34 @@
+
+
+
+ cloudevents-examples
+ io.cloudevents
+ 2.0.0-SNAPSHOT
+
+ 4.0.0
+
+ cloudevents-amqp-proton-example
+
+
+ 4.0.0.Beta1
+
+
+
+ io.cloudevents
+ cloudevents-amqp-proton
+ ${project.version}
+
+
+ io.vertx
+ vertx-core
+ ${vertx.version}
+
+
+ io.vertx
+ vertx-proton
+ ${vertx.version}
+
+
+
+
+
diff --git a/examples/amqp-proton/src/main/java/io/cloudevents/examples/amqp/vertx/AmqpClient.java b/examples/amqp-proton/src/main/java/io/cloudevents/examples/amqp/vertx/AmqpClient.java
new file mode 100644
index 00000000..14710892
--- /dev/null
+++ b/examples/amqp-proton/src/main/java/io/cloudevents/examples/amqp/vertx/AmqpClient.java
@@ -0,0 +1,162 @@
+package io.cloudevents.examples.amqp.vertx;
+
+import java.io.PrintWriter;
+
+import org.apache.qpid.proton.amqp.messaging.Accepted;
+import org.apache.qpid.proton.message.Message;
+
+import io.cloudevents.CloudEvent;
+import io.cloudevents.amqp.ProtonAmqpMessageFactory;
+import io.cloudevents.core.message.MessageReader;
+import io.cloudevents.core.v1.CloudEventBuilder;
+import io.cloudevents.core.v1.CloudEventV1;
+import io.vertx.core.Future;
+import io.vertx.core.Promise;
+import io.vertx.core.Vertx;
+import io.vertx.core.json.JsonObject;
+import io.vertx.proton.ProtonClient;
+import io.vertx.proton.ProtonClientOptions;
+import io.vertx.proton.ProtonConnection;
+import io.vertx.proton.ProtonMessageHandler;
+import io.vertx.proton.ProtonQoS;
+import io.vertx.proton.ProtonReceiver;
+import io.vertx.proton.ProtonSender;
+
+/**
+ * A example vertx-based AMQP client that interacts with a remote AMQP server to send and receive CloudEvent messages.
+ */
+public class AmqpClient {
+
+ private static ProtonConnection connection;
+
+ private static final String SERVER_HOST = "127.0.0.1";
+ private static final int SERVER_PORT = 5672;
+ private static final String SEND_MESSAGE = "send";
+ private static final String RECEIVE_MESSAGE = "receive";
+
+ final static Vertx VERTX = Vertx.vertx();
+ private static PrintWriter writer = new PrintWriter(System.out, true);
+
+ public static void main(String args[]) {
+
+ if (args.length < 1) {
+ writer.println("Usage: AmqpClient [send|receive]");
+ return;
+ }
+
+ final String action = args[0].toLowerCase();
+
+ switch (action) {
+
+ case SEND_MESSAGE:
+ sendMessage();
+ break;
+
+ case RECEIVE_MESSAGE:
+ receiveMessage();
+ break;
+
+ default:
+ writer.println("Unknown action");
+ }
+ }
+
+ private static void sendMessage() {
+ connectToServer(SERVER_HOST, SERVER_PORT)
+ .compose(conn -> {
+ connection = conn;
+ writer.printf("[Client] Connected to %s:%s", SERVER_HOST, SERVER_PORT);
+
+ return openSenderLink();
+ }).onSuccess(sender -> {
+
+ final JsonObject payload = new JsonObject().put("temp", 50);
+
+ final CloudEvent event = new CloudEventBuilder()
+ .withAttribute(CloudEventV1.ID, "client-id")
+ .withAttribute(CloudEventV1.SOURCE, "http://127.0.0.1/amqp-client")
+ .withAttribute(CloudEventV1.TYPE, "com.example.sampletype1")
+ .withAttribute(CloudEventV1.TIME, "2020-11-06T21:47:12.037467+00:00")
+ .withData(payload.toString().getBytes())
+ .build();
+
+ final Message message = ProtonAmqpMessageFactory.createWriter().writeBinary(event);
+ message.setAddress("/telemetry");
+ sender.send(message, delivery -> {
+ if (Accepted.class.isInstance(delivery.getRemoteState())) {
+ writer.println("[Client:] message delivered and accepted by remote peer");
+ }
+ connection.close();
+ });
+ }).onFailure(t -> {
+ writer.printf("[Client] Connection failed (%s)", t.getCause().getMessage());
+ });
+
+ }
+
+ private static void receiveMessage() {
+ connectToServer(SERVER_HOST, SERVER_PORT)
+ .compose(conn -> {
+ connection = conn;
+ writer.println("[Client] Connected");
+ return Future.succeededFuture();
+ }).onSuccess(success ->
+ openReceiverLink((delivery, message) -> {
+ final MessageReader reader = ProtonAmqpMessageFactory.createReader(message);
+ final CloudEvent event = reader.toEvent();
+ writer.printf("[Client] received CloudEvent[Id=%s, Source=%s]", event.getId(),
+ event.getSource().toString());
+ connection.close();
+ })
+ ).onFailure(t -> {
+ writer.println("[Client] Connection failed");
+ });
+ }
+
+ private static Future connectToServer(final String host, final int port) {
+
+ final Promise connectAttempt = Promise.promise();
+ final ProtonClientOptions options = new ProtonClientOptions();
+ final ProtonClient client = ProtonClient.create(VERTX);
+
+ client.connect(options, host, port, connectAttempt);
+
+ return connectAttempt.future()
+ .compose(unopenedConnection -> {
+ final Promise con = Promise.promise();
+ unopenedConnection.openHandler(con);
+ unopenedConnection.open();
+ return con.future();
+ });
+ }
+
+ private static Future openSenderLink() {
+ if (connection == null || connection.isDisconnected()) {
+ throw new IllegalStateException("[Client] connection not established");
+ }
+
+ final Promise result = Promise.promise();
+ final ProtonSender sender = connection.createSender(null);
+ sender.openHandler(result);
+ sender.open();
+ return result.future();
+ }
+
+ private static Future openReceiverLink(final ProtonMessageHandler msgHandler) {
+ if (connection == null || connection.isDisconnected()) {
+ throw new IllegalStateException("[Client] connection not established");
+ }
+
+ final Promise result = Promise.promise();
+ final ProtonReceiver receiver = connection.createReceiver(null);
+ receiver.setQoS(ProtonQoS.AT_LEAST_ONCE);
+ receiver.handler(msgHandler);
+ receiver.openHandler(result);
+ receiver.open();
+ return result.future().map(recver -> {
+ // Ready to receive messages
+ return recver;
+ });
+ }
+
+}
diff --git a/examples/amqp-proton/src/main/java/io/cloudevents/examples/amqp/vertx/AmqpServer.java b/examples/amqp-proton/src/main/java/io/cloudevents/examples/amqp/vertx/AmqpServer.java
new file mode 100644
index 00000000..0eb23070
--- /dev/null
+++ b/examples/amqp-proton/src/main/java/io/cloudevents/examples/amqp/vertx/AmqpServer.java
@@ -0,0 +1,109 @@
+package io.cloudevents.examples.amqp.vertx;
+
+import java.io.PrintWriter;
+import java.net.URI;
+import java.time.OffsetDateTime;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.qpid.proton.message.Message;
+
+import io.cloudevents.CloudEvent;
+import io.cloudevents.amqp.ProtonAmqpMessageFactory;
+import io.cloudevents.core.builder.CloudEventBuilder;
+import io.cloudevents.core.message.MessageReader;
+import io.cloudevents.core.message.MessageWriter;
+import io.vertx.core.Vertx;
+import io.vertx.proton.ProtonConnection;
+import io.vertx.proton.ProtonServer;
+import io.vertx.proton.ProtonServerOptions;
+
+/**
+ * An example vertx-based AMQP server that receives and sends CloudEvent messages to/from a remote client.
+ */
+public class AmqpServer {
+
+ private static final String DEFAULT_HOST = "127.0.0.1";
+ private static final int DEFAULT_PORT = 5672;
+
+ private static PrintWriter writer = new PrintWriter(System.out, true);
+
+ public static void main(String argv[]) {
+ final Vertx vertx = Vertx.vertx();
+
+ final List args = new ArrayList<>();
+ if (argv.length > 1) {
+ args.addAll(Arrays.asList(argv[0].split(":", 1)));
+ }
+
+ final String host = args.isEmpty() ? DEFAULT_HOST : args.get(0);
+ final int port = args.isEmpty() ? DEFAULT_PORT : Integer.parseInt(args.get(1));
+
+ final ProtonServerOptions options = new ProtonServerOptions();
+ options.setHost(host);
+ options.setPort(port);
+
+ final ProtonServer server = ProtonServer.create(vertx, options);
+ server.connectHandler(con -> onConnectRequest(con)).listen(ar -> {
+ if (ar.succeeded()) {
+ writer.printf("[Server] started and listening on %s:%s\n", host, port);
+ } else {
+ writer.printf("[Server] failed to start (%s)\n", ar.cause());
+ System.exit(1);
+ }
+ });
+ }
+
+ private static void onConnectRequest(final ProtonConnection con) {
+ // BEGIN frame received
+ con.sessionOpenHandler(remoteSession -> {
+ remoteSession.open();
+ });
+ // ATTACH frame received -> client wants to send messages to this server.
+ con.receiverOpenHandler(remoteReceiver -> {
+ remoteReceiver.handler((delivery, message) -> {
+ // message received -> convert to CloudEvent
+ final MessageReader reader = ProtonAmqpMessageFactory.createReader(message);
+ final CloudEvent event = reader.toEvent();
+
+ writer.printf("[Server] received CloudEvent[Id=%s, Source=%s]\n", event.getId(),
+ event.getSource().toString());
+
+ }).open();
+ });
+ // ATTACH frame received -> client wants to receive messages from this server.
+ con.senderOpenHandler(sender -> {
+ try {
+ MessageWriter, Message> writer = ProtonAmqpMessageFactory.createWriter();
+ final CloudEvent event = CloudEventBuilder.v1()
+ .withId("amqp-server-id")
+ .withType("com.example.sampletype1")
+ .withSource(URI.create("http://127.0.0.1/amqp-server"))
+ .withTime(OffsetDateTime.now())
+ .withData("{\"temp\": 5}".getBytes())
+ .build();
+
+ final Message message = writer.writeBinary(event);
+ sender.send(message);
+
+ } catch (final Exception e) {
+ writer.println("[Server] failed to send ");
+ }
+ sender.open();
+ });
+ //OPEN frame received
+ con.openHandler(remoteOpen -> {
+ if (remoteOpen.failed()) {
+ // connection with client failed.
+ writer.println(remoteOpen.cause());
+ } else {
+ remoteOpen.result().open();
+ }
+ });
+ // CLOSE Frame received
+ con.closeHandler(remoteClose -> {
+ con.close();
+ });
+ }
+}
diff --git a/examples/pom.xml b/examples/pom.xml
index 2384b5e4..ba885925 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -19,7 +19,7 @@
vertx
basic-http
restful-ws-spring-boot
+ amqp-proton
-