diff --git a/.github/workflows/rust_tests.yml b/.github/workflows/rust_tests.yml index aa69096..f244fca 100644 --- a/.github/workflows/rust_tests.yml +++ b/.github/workflows/rust_tests.yml @@ -146,3 +146,10 @@ jobs: toolchain: ${{ matrix.toolchain }} args: --target ${{ matrix.target }} --manifest-path ./example-projects/poem-example/Cargo.toml + - uses: actions-rs/cargo@v1 + name: "Build nats-example" + if: matrix.target == 'x86_64-unknown-linux-gnu' && matrix.toolchain == 'stable' + with: + command: build + toolchain: ${{ matrix.toolchain }} + args: --target ${{ matrix.target }} --manifest-path ./example-projects/nats-example/Cargo.toml diff --git a/.gitignore b/.gitignore index a6f3b3e..1693344 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,6 @@ **/target .idea +.vscode +.DS_Store **/Cargo.lock diff --git a/Cargo.toml b/Cargo.toml index 080446a..5a7b505 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,6 +24,7 @@ rdkafka = ["rdkafka-lib", "bytes", "futures"] warp = ["warp-lib", "bytes", "http", "hyper"] axum = ["bytes", "http", "hyper", "axum-lib", "http-body", "async-trait"] poem = ["bytes", "http", "poem-lib", "hyper", "async-trait"] +nats = ["nats-lib"] [dependencies] serde = { version = "^1.0", features = ["derive"] } @@ -50,6 +51,7 @@ hyper = { version = "^0.14", optional = true } axum-lib = { version = "^0.5", optional = true , package="axum"} http-body = { version = "^0.4", optional = true } poem-lib = { version = "=1.2.34", optional = true, package = "poem" } +nats-lib = { version = "0.21.0", optional = true, package = "nats" } [target."cfg(not(target_arch = \"wasm32\"))".dependencies] hostname = "^0.3" diff --git a/README.md b/README.md index 3475def..4879d7f 100644 --- a/README.md +++ b/README.md @@ -15,7 +15,7 @@ Note: This project is WIP under active development, hence all APIs are considere | JSON Event Format | ✓ | ✓ | | Kafka Protocol Binding | ✓ | ✓ | | MQTT Protocol Binding | ✕ | ✕ | -| NATS Protocol Binding | ✕ | ✕ | +| NATS Protocol Binding | ✓ | ✓ | | Web hook | ✕ | ✕ | ## Crate Structure @@ -30,6 +30,7 @@ enabled by a specific [feature flag]: * `warp`: Integration with [warp](https://github.com/seanmonstar/warp/). * `reqwest`: Integration with [reqwest](https://github.com/seanmonstar/reqwest). * `rdkafka`: Integration with [rdkafka](https://fede1024.github.io/rust-rdkafka). +* `nats`: Integration with [nats](https://github.com/nats-io/nats.rs) This crate is continuously tested to work with GNU libc, WASM and musl toolchains. @@ -64,6 +65,7 @@ Checkout the examples using our integrations to learn how to send and receive ev * [Reqwest/WASM Example](example-projects/reqwest-wasm-example) * [Kafka Example](example-projects/rdkafka-example) * [Warp Example](example-projects/warp-example) +* [NATS Example](example-projects/nats-example) ## Development & Contributing diff --git a/example-projects/nats-example/Cargo.toml b/example-projects/nats-example/Cargo.toml new file mode 100644 index 0000000..c038d77 --- /dev/null +++ b/example-projects/nats-example/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "nats-example" +version = "0.1.0" +authors = ["Jakub Noga "] +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +cloudevents-sdk = { path = "../..", features = ["nats"] } +serde_json = "^1.0" +nats = "0.21.0" \ No newline at end of file diff --git a/example-projects/nats-example/src/main.rs b/example-projects/nats-example/src/main.rs new file mode 100644 index 0000000..3d2ca16 --- /dev/null +++ b/example-projects/nats-example/src/main.rs @@ -0,0 +1,47 @@ +use std::{error::Error, thread}; + +use cloudevents::binding::nats::{MessageExt, NatsCloudEvent}; +use cloudevents::{Event, EventBuilder, EventBuilderV10}; +use serde_json::json; + +/// First spin up a nats server i.e. +/// ```bash +/// docker run -p 4222:4222 -ti nats:latest +/// ``` +fn main() -> Result<(), Box> { + let nc = nats::connect("localhost:4222").unwrap(); + + let event = EventBuilderV10::new() + .id("123".to_string()) + .ty("example.test") + .source("http://localhost/") + .data("application/json", json!({"hello": "world"})) + .build() + .unwrap(); + + let n_msg = NatsCloudEvent::from_event(event).unwrap(); + + let sub = nc.subscribe("test").unwrap(); + + let t = thread::spawn(move || -> Result { + match sub.next() { + Some(msg) => match msg.to_event() { + Ok(evt) => Ok(evt), + Err(e) => Err(e.to_string()), + }, + None => Err("Unsubed or disconnected".to_string()), + } + }); + + nc.publish("test", n_msg)?; + + let maybe_event = t.join().unwrap(); + + if let Ok(evt) = maybe_event { + println!("{}", evt.to_string()); + } else { + println!("{}", maybe_event.unwrap_err().to_string()); + } + + Ok(()) +} diff --git a/src/binding/mod.rs b/src/binding/mod.rs index 6466a36..282008e 100644 --- a/src/binding/mod.rs +++ b/src/binding/mod.rs @@ -13,6 +13,8 @@ pub mod axum; feature = "poem" ))] pub mod http; +#[cfg(feature = "nats")] +pub mod nats; #[cfg(feature = "poem")] pub mod poem; #[cfg(feature = "rdkafka")] diff --git a/src/binding/nats/deserializer.rs b/src/binding/nats/deserializer.rs new file mode 100644 index 0000000..ec0e3b6 --- /dev/null +++ b/src/binding/nats/deserializer.rs @@ -0,0 +1,77 @@ +use crate::{ + message::{Result, StructuredDeserializer}, + Event, +}; + +use nats_lib as nats; + +impl StructuredDeserializer for nats::Message { + fn deserialize_structured>( + self, + serializer: V, + ) -> crate::message::Result { + serializer.set_structured_event(self.data.to_vec()) + } +} + +/// Trait implemented by [`nats::Message`] to enable convenient deserialization to [`Event`] +/// +/// Trait sealed +pub trait MessageExt: private::Sealed { + fn to_event(&self) -> Result; +} + +impl MessageExt for nats::Message { + fn to_event(&self) -> Result { + StructuredDeserializer::into_event(self.to_owned()) + } +} + +mod private { + use nats_lib as nats; + + // Sealing the MessageExt + pub trait Sealed {} + impl Sealed for nats::Message {} +} + +#[cfg(test)] +mod tests { + use crate::test::fixtures; + use nats_lib as nats; + use serde_json::json; + + use super::*; + + #[test] + fn test_structured_deserialize_v10() { + let expected = fixtures::v10::full_json_data_string_extension(); + + let nats_message = nats::Message::new( + "not_relevant", + None, + json!(expected.clone()).to_string().as_bytes().to_vec(), + None, + ); + + let actual = nats_message.to_event().unwrap(); + + assert_eq!(expected, actual) + } + + #[test] + fn test_structured_deserialize_v03() { + let expected = fixtures::v03::full_json_data(); + + let nats_message = nats::Message::new( + "not_relevant", + None, + json!(expected.clone()).to_string().as_bytes().to_vec(), + None, + ); + + let actual = nats_message.to_event().unwrap(); + + assert_eq!(expected, actual) + } +} diff --git a/src/binding/nats/mod.rs b/src/binding/nats/mod.rs new file mode 100644 index 0000000..fef603c --- /dev/null +++ b/src/binding/nats/mod.rs @@ -0,0 +1,43 @@ +//! This module provides bindings between [cloudevents-sdk](https://docs.rs/cloudevents-sdk) and [nats](https://docs.rs/nats) +//! ## Examples +//! Deserialize [nats::Message](https://docs.rs/nats/0.21.0/nats/struct.Message.html) into [Event](https://docs.rs/cloudevents-sdk/latest/cloudevents/event/struct.Event.html) +//! ``` +//! use nats_lib as nats; +//! use cloudevents::binding::nats::MessageExt; +//! +//! fn consume() { +//! let nc = nats::connect("localhost:4222").unwrap(); +//! let sub = nc.subscribe("test").unwrap(); +//! let nats_message = sub.next().unwrap(); +//! let cloud_event = nats_message.to_event().unwrap(); +//! +//! println!("{}", cloud_event.to_string()); +//! } +//! ``` +//! +//! Serialize [Event](https://docs.rs/cloudevents-sdk/latest/cloudevents/event/struct.Event.html) into [NatsCloudEvent] and publish to nats subject +//! ``` +//! use nats_lib as nats; +//! use cloudevents::binding::nats::NatsCloudEvent; +//! use cloudevents::{EventBuilder, EventBuilderV10, Event}; +//! use serde_json::json; +//! +//! fn publish() { +//! let nc = nats::connect("localhost:4222").unwrap(); +//! +//! let event = EventBuilderV10::new() +//! .id("123".to_string()) +//! .ty("example.test") +//! .source("http://localhost/") +//! .data("application/json", json!({"hello": "world"})) +//! .build() +//! .unwrap(); +//! +//! nc.publish("whatever.subject.you.like", NatsCloudEvent::from_event(event).unwrap()).unwrap(); +//! } +//! ``` +mod deserializer; +mod serializer; + +pub use deserializer::MessageExt; +pub use serializer::NatsCloudEvent; diff --git a/src/binding/nats/serializer.rs b/src/binding/nats/serializer.rs new file mode 100644 index 0000000..ad99d6b --- /dev/null +++ b/src/binding/nats/serializer.rs @@ -0,0 +1,26 @@ +use crate::{ + message::{Error, Result}, + Event, +}; + +/// Helper struct containing text data bytes of JSON serialized [Event] +/// +/// Implements [`AsRef`] so it can be directly passed to [`nats::Connection`](https://docs.rs/nats/0.21.0/nats/struct.Connection.html) methods as payload. +pub struct NatsCloudEvent { + pub payload: Vec, +} + +impl AsRef<[u8]> for NatsCloudEvent { + fn as_ref(&self) -> &[u8] { + &self.payload.as_ref() + } +} + +impl NatsCloudEvent { + pub fn from_event(event: Event) -> Result { + match serde_json::to_vec(&event) { + Ok(payload) => Ok(Self { payload }), + Err(e) => Err(Error::SerdeJsonError { source: e }), + } + } +}