diff --git a/.github/workflows/rust_tests.yml b/.github/workflows/rust_tests.yml index a7a2f83..481c137 100644 --- a/.github/workflows/rust_tests.yml +++ b/.github/workflows/rust_tests.yml @@ -34,7 +34,7 @@ jobs: - run: sudo apt-get install -y musl musl-dev musl-tools cmake if: matrix.target == 'x86_64-unknown-linux-musl' # # Caching stuff - - uses: actions/cache@v2 + - uses: actions/cache@v4 with: path: | ~/.cargo/bin/ @@ -42,7 +42,7 @@ jobs: ~/.cargo/registry/cache/ ~/.cargo/git/db/ key: ${{ runner.os }}-cargo-deps-${{ hashFiles('**/Cargo.toml') }} - - uses: actions/cache@v2 + - uses: actions/cache@v4 with: path: | target/ diff --git a/Cargo.toml b/Cargo.toml index 529bc74..dd3e881 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -58,7 +58,7 @@ http-0-2 = { version = "0.2", optional = true, package = "http"} axum-lib = { version = "^0.8", optional = true, package="axum"} http-body-util = {version = "^0.1", optional = true} poem-lib = { version = "^3.1", optional = true, package = "poem" } -nats-lib = { version = "0.25.0", optional = true, package = "nats" } +nats-lib = { version = "0.42.0", optional = true, package = "async-nats" } [target."cfg(not(target_arch = \"wasm32\"))".dependencies] hostname = "^0.4" diff --git a/example-projects/nats-example/Cargo.toml b/example-projects/nats-example/Cargo.toml index cdf4ed7..2f00f78 100644 --- a/example-projects/nats-example/Cargo.toml +++ b/example-projects/nats-example/Cargo.toml @@ -9,4 +9,6 @@ edition = "2021" [dependencies] cloudevents-sdk = { path = "../..", features = ["nats"] } serde_json = "^1.0" -nats = "^0.25" \ No newline at end of file +async-nats = "^0.42.0" +tokio = { version = "^1.0", features = ["full"] } +futures = "^0.3" \ No newline at end of file diff --git a/example-projects/nats-example/src/main.rs b/example-projects/nats-example/src/main.rs index 3d2ca16..7a5ea04 100644 --- a/example-projects/nats-example/src/main.rs +++ b/example-projects/nats-example/src/main.rs @@ -1,15 +1,17 @@ -use std::{error::Error, thread}; +use std::error::Error; use cloudevents::binding::nats::{MessageExt, NatsCloudEvent}; use cloudevents::{Event, EventBuilder, EventBuilderV10}; use serde_json::json; +use futures::StreamExt; /// First spin up a nats server i.e. /// ```bash /// docker run -p 4222:4222 -ti nats:latest /// ``` -fn main() -> Result<(), Box> { - let nc = nats::connect("localhost:4222").unwrap(); +#[tokio::main] +async fn main() -> Result<(), Box> { + let client = async_nats::connect("localhost:4222").await?; let event = EventBuilderV10::new() .id("123".to_string()) @@ -21,26 +23,27 @@ fn main() -> Result<(), Box> { let n_msg = NatsCloudEvent::from_event(event).unwrap(); - let sub = nc.subscribe("test").unwrap(); + let mut sub = client.subscribe("test").await?; - let t = thread::spawn(move || -> Result { - match sub.next() { - Some(msg) => match msg.to_event() { + let receive_task = tokio::spawn(async move { + if let Some(msg) = sub.next().await { + match msg.to_event() { Ok(evt) => Ok(evt), Err(e) => Err(e.to_string()), - }, - None => Err("Unsubed or disconnected".to_string()), + } + } else { + Err("No event received".to_string()) } }); - nc.publish("test", n_msg)?; + client.publish("test", n_msg.payload.into()).await?; - let maybe_event = t.join().unwrap(); + let maybe_event = receive_task.await?; if let Ok(evt) = maybe_event { println!("{}", evt.to_string()); } else { - println!("{}", maybe_event.unwrap_err().to_string()); + println!("{}", maybe_event.unwrap_err()); } Ok(()) diff --git a/src/binding/nats/deserializer.rs b/src/binding/nats/deserializer.rs index bd4f71e..c4a5038 100644 --- a/src/binding/nats/deserializer.rs +++ b/src/binding/nats/deserializer.rs @@ -10,7 +10,7 @@ impl StructuredDeserializer for nats::Message { self, serializer: V, ) -> crate::message::Result { - serializer.set_structured_event(self.data.to_vec()) + serializer.set_structured_event(self.payload.to_vec()) } } @@ -47,12 +47,15 @@ mod tests { fn test_structured_deserialize_v10() { let expected = fixtures::v10::full_json_data_string_extension(); - let nats_message = nats::Message::new( - "not_relevant", - None, - json!(expected).to_string().as_bytes(), - None, - ); + let nats_message = nats::Message { + subject: "not_relevant".into(), + payload: json!(expected).to_string().into_bytes().into(), + reply: None, + headers: None, + status: None, + description: None, + length: 0, + }; let actual = nats_message.to_event().unwrap(); @@ -63,12 +66,15 @@ mod tests { fn test_structured_deserialize_v03() { let expected = fixtures::v03::full_json_data(); - let nats_message = nats::Message::new( - "not_relevant", - None, - json!(expected).to_string().as_bytes(), - None, - ); + let nats_message = nats::Message { + subject: "not_relevant".into(), + payload: json!(expected).to_string().into_bytes().into(), + reply: None, + headers: None, + status: None, + description: None, + length: 0, + }; let actual = nats_message.to_event().unwrap(); diff --git a/src/binding/nats/mod.rs b/src/binding/nats/mod.rs index fef603c..fa7d32e 100644 --- a/src/binding/nats/mod.rs +++ b/src/binding/nats/mod.rs @@ -1,29 +1,34 @@ //! This module provides bindings between [cloudevents-sdk](https://docs.rs/cloudevents-sdk) and [nats](https://docs.rs/nats) //! ## Examples //! Deserialize [nats::Message](https://docs.rs/nats/0.21.0/nats/struct.Message.html) into [Event](https://docs.rs/cloudevents-sdk/latest/cloudevents/event/struct.Event.html) -//! ``` +//! ```no_run //! use nats_lib as nats; //! use cloudevents::binding::nats::MessageExt; +//! use futures::StreamExt; //! -//! fn consume() { -//! let nc = nats::connect("localhost:4222").unwrap(); -//! let sub = nc.subscribe("test").unwrap(); -//! let nats_message = sub.next().unwrap(); -//! let cloud_event = nats_message.to_event().unwrap(); -//! -//! println!("{}", cloud_event.to_string()); +//! #[tokio::main] +//! async fn main() { +//! let nc = nats::connect("localhost:4222").await.unwrap(); +//! let mut sub = nc.subscribe("test").await.unwrap(); +//! +//! // Process messages one at a time +//! sub.for_each_concurrent(1, |nats_message| async move { +//! let cloud_event = nats_message.to_event().unwrap(); +//! println!("{}", cloud_event.to_string()); +//! }).await; //! } //! ``` //! //! Serialize [Event](https://docs.rs/cloudevents-sdk/latest/cloudevents/event/struct.Event.html) into [NatsCloudEvent] and publish to nats subject -//! ``` +//! ```no_run //! use nats_lib as nats; //! use cloudevents::binding::nats::NatsCloudEvent; //! use cloudevents::{EventBuilder, EventBuilderV10, Event}; //! use serde_json::json; //! -//! fn publish() { -//! let nc = nats::connect("localhost:4222").unwrap(); +//! #[tokio::main] +//! async fn main() { +//! let nc = nats::connect("localhost:4222").await.unwrap(); //! //! let event = EventBuilderV10::new() //! .id("123".to_string()) @@ -33,7 +38,8 @@ //! .build() //! .unwrap(); //! -//! nc.publish("whatever.subject.you.like", NatsCloudEvent::from_event(event).unwrap()).unwrap(); +//! let nats_payload = NatsCloudEvent::from_event(event).unwrap(); +//! nc.publish("whatever.subject.you.like", nats_payload.payload.into()).await.unwrap(); //! } //! ``` mod deserializer;