From b1303914bfa1ea833523740f8826bc41d062970e Mon Sep 17 00:00:00 2001 From: Fabrizio Lazzaretti Date: Thu, 3 Jul 2025 16:09:23 +0200 Subject: [PATCH] fix: nats by updating to the new async-nats the old nats crate is depricted Signed-off-by: Fabrizio Lazzaretti --- Cargo.toml | 2 +- example-projects/nats-example/Cargo.toml | 2 +- src/binding/nats/deserializer.rs | 32 ++++++++++++++---------- src/binding/nats/mod.rs | 32 ++++++++++++++---------- 4 files changed, 40 insertions(+), 28 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 529bc74..dd3e881 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -58,7 +58,7 @@ http-0-2 = { version = "0.2", optional = true, package = "http"} axum-lib = { version = "^0.8", optional = true, package="axum"} http-body-util = {version = "^0.1", optional = true} poem-lib = { version = "^3.1", optional = true, package = "poem" } -nats-lib = { version = "0.25.0", optional = true, package = "nats" } +nats-lib = { version = "0.42.0", optional = true, package = "async-nats" } [target."cfg(not(target_arch = \"wasm32\"))".dependencies] hostname = "^0.4" diff --git a/example-projects/nats-example/Cargo.toml b/example-projects/nats-example/Cargo.toml index cdf4ed7..554b9c6 100644 --- a/example-projects/nats-example/Cargo.toml +++ b/example-projects/nats-example/Cargo.toml @@ -9,4 +9,4 @@ edition = "2021" [dependencies] cloudevents-sdk = { path = "../..", features = ["nats"] } serde_json = "^1.0" -nats = "^0.25" \ No newline at end of file +async-nats = "^0.42.0" \ No newline at end of file diff --git a/src/binding/nats/deserializer.rs b/src/binding/nats/deserializer.rs index bd4f71e..c4a5038 100644 --- a/src/binding/nats/deserializer.rs +++ b/src/binding/nats/deserializer.rs @@ -10,7 +10,7 @@ impl StructuredDeserializer for nats::Message { self, serializer: V, ) -> crate::message::Result { - serializer.set_structured_event(self.data.to_vec()) + serializer.set_structured_event(self.payload.to_vec()) } } @@ -47,12 +47,15 @@ mod tests { fn test_structured_deserialize_v10() { let expected = fixtures::v10::full_json_data_string_extension(); - let nats_message = nats::Message::new( - "not_relevant", - None, - json!(expected).to_string().as_bytes(), - None, - ); + let nats_message = nats::Message { + subject: "not_relevant".into(), + payload: json!(expected).to_string().into_bytes().into(), + reply: None, + headers: None, + status: None, + description: None, + length: 0, + }; let actual = nats_message.to_event().unwrap(); @@ -63,12 +66,15 @@ mod tests { fn test_structured_deserialize_v03() { let expected = fixtures::v03::full_json_data(); - let nats_message = nats::Message::new( - "not_relevant", - None, - json!(expected).to_string().as_bytes(), - None, - ); + let nats_message = nats::Message { + subject: "not_relevant".into(), + payload: json!(expected).to_string().into_bytes().into(), + reply: None, + headers: None, + status: None, + description: None, + length: 0, + }; let actual = nats_message.to_event().unwrap(); diff --git a/src/binding/nats/mod.rs b/src/binding/nats/mod.rs index fef603c..45dc0fd 100644 --- a/src/binding/nats/mod.rs +++ b/src/binding/nats/mod.rs @@ -1,29 +1,34 @@ //! This module provides bindings between [cloudevents-sdk](https://docs.rs/cloudevents-sdk) and [nats](https://docs.rs/nats) //! ## Examples //! Deserialize [nats::Message](https://docs.rs/nats/0.21.0/nats/struct.Message.html) into [Event](https://docs.rs/cloudevents-sdk/latest/cloudevents/event/struct.Event.html) -//! ``` +//! ```no_run //! use nats_lib as nats; //! use cloudevents::binding::nats::MessageExt; +//! use futures::StreamExt; //! -//! fn consume() { -//! let nc = nats::connect("localhost:4222").unwrap(); -//! let sub = nc.subscribe("test").unwrap(); -//! let nats_message = sub.next().unwrap(); -//! let cloud_event = nats_message.to_event().unwrap(); -//! -//! println!("{}", cloud_event.to_string()); +//! #[tokio::main] +//! async fn main() { +//! let nc = nats::connect("localhost:4222").await.unwrap(); +//! let mut sub = nc.subscribe("test").await.unwrap(); +//! +//! // Process messages one at a time +//! sub.for_each_concurrent(1, |nats_message| async move { +//! let cloud_event = nats_message.to_event().unwrap(); +//! println!("{}", cloud_event.to_string()); +//! }).await; //! } //! ``` //! //! Serialize [Event](https://docs.rs/cloudevents-sdk/latest/cloudevents/event/struct.Event.html) into [NatsCloudEvent] and publish to nats subject -//! ``` +//! ```no_run //! use nats_lib as nats; -//! use cloudevents::binding::nats::NatsCloudEvent; +//! use cloudevents``::binding::nats::NatsCloudEvent; //! use cloudevents::{EventBuilder, EventBuilderV10, Event}; //! use serde_json::json; //! -//! fn publish() { -//! let nc = nats::connect("localhost:4222").unwrap(); +//! #[tokio::main] +//! async fn main() { +//! let nc = nats::connect("localhost:4222").await.unwrap(); //! //! let event = EventBuilderV10::new() //! .id("123".to_string()) @@ -33,7 +38,8 @@ //! .build() //! .unwrap(); //! -//! nc.publish("whatever.subject.you.like", NatsCloudEvent::from_event(event).unwrap()).unwrap(); +//! let nats_payload = NatsCloudEvent::from_event(event).unwrap(); +//! nc.publish("whatever.subject.you.like", nats_payload.payload.into()).await.unwrap(); //! } //! ``` mod deserializer;