fix: nats by updating to the new async-nats

the old nats crate is depricted

Signed-off-by: Fabrizio Lazzaretti <fabrizio@lazzaretti.me>
This commit is contained in:
Fabrizio Lazzaretti 2025-07-03 16:09:23 +02:00
parent fa0aadb31d
commit b1303914bf
No known key found for this signature in database
4 changed files with 40 additions and 28 deletions

View File

@ -58,7 +58,7 @@ http-0-2 = { version = "0.2", optional = true, package = "http"}
axum-lib = { version = "^0.8", optional = true, package="axum"} axum-lib = { version = "^0.8", optional = true, package="axum"}
http-body-util = {version = "^0.1", optional = true} http-body-util = {version = "^0.1", optional = true}
poem-lib = { version = "^3.1", optional = true, package = "poem" } poem-lib = { version = "^3.1", optional = true, package = "poem" }
nats-lib = { version = "0.25.0", optional = true, package = "nats" } nats-lib = { version = "0.42.0", optional = true, package = "async-nats" }
[target."cfg(not(target_arch = \"wasm32\"))".dependencies] [target."cfg(not(target_arch = \"wasm32\"))".dependencies]
hostname = "^0.4" hostname = "^0.4"

View File

@ -9,4 +9,4 @@ edition = "2021"
[dependencies] [dependencies]
cloudevents-sdk = { path = "../..", features = ["nats"] } cloudevents-sdk = { path = "../..", features = ["nats"] }
serde_json = "^1.0" serde_json = "^1.0"
nats = "^0.25" async-nats = "^0.42.0"

View File

@ -10,7 +10,7 @@ impl StructuredDeserializer for nats::Message {
self, self,
serializer: V, serializer: V,
) -> crate::message::Result<R> { ) -> crate::message::Result<R> {
serializer.set_structured_event(self.data.to_vec()) serializer.set_structured_event(self.payload.to_vec())
} }
} }
@ -47,12 +47,15 @@ mod tests {
fn test_structured_deserialize_v10() { fn test_structured_deserialize_v10() {
let expected = fixtures::v10::full_json_data_string_extension(); let expected = fixtures::v10::full_json_data_string_extension();
let nats_message = nats::Message::new( let nats_message = nats::Message {
"not_relevant", subject: "not_relevant".into(),
None, payload: json!(expected).to_string().into_bytes().into(),
json!(expected).to_string().as_bytes(), reply: None,
None, headers: None,
); status: None,
description: None,
length: 0,
};
let actual = nats_message.to_event().unwrap(); let actual = nats_message.to_event().unwrap();
@ -63,12 +66,15 @@ mod tests {
fn test_structured_deserialize_v03() { fn test_structured_deserialize_v03() {
let expected = fixtures::v03::full_json_data(); let expected = fixtures::v03::full_json_data();
let nats_message = nats::Message::new( let nats_message = nats::Message {
"not_relevant", subject: "not_relevant".into(),
None, payload: json!(expected).to_string().into_bytes().into(),
json!(expected).to_string().as_bytes(), reply: None,
None, headers: None,
); status: None,
description: None,
length: 0,
};
let actual = nats_message.to_event().unwrap(); let actual = nats_message.to_event().unwrap();

View File

@ -1,29 +1,34 @@
//! This module provides bindings between [cloudevents-sdk](https://docs.rs/cloudevents-sdk) and [nats](https://docs.rs/nats) //! This module provides bindings between [cloudevents-sdk](https://docs.rs/cloudevents-sdk) and [nats](https://docs.rs/nats)
//! ## Examples //! ## Examples
//! Deserialize [nats::Message](https://docs.rs/nats/0.21.0/nats/struct.Message.html) into [Event](https://docs.rs/cloudevents-sdk/latest/cloudevents/event/struct.Event.html) //! Deserialize [nats::Message](https://docs.rs/nats/0.21.0/nats/struct.Message.html) into [Event](https://docs.rs/cloudevents-sdk/latest/cloudevents/event/struct.Event.html)
//! ``` //! ```no_run
//! use nats_lib as nats; //! use nats_lib as nats;
//! use cloudevents::binding::nats::MessageExt; //! use cloudevents::binding::nats::MessageExt;
//! use futures::StreamExt;
//! //!
//! fn consume() { //! #[tokio::main]
//! let nc = nats::connect("localhost:4222").unwrap(); //! async fn main() {
//! let sub = nc.subscribe("test").unwrap(); //! let nc = nats::connect("localhost:4222").await.unwrap();
//! let nats_message = sub.next().unwrap(); //! let mut sub = nc.subscribe("test").await.unwrap();
//! let cloud_event = nats_message.to_event().unwrap(); //!
//! //! // Process messages one at a time
//! println!("{}", cloud_event.to_string()); //! sub.for_each_concurrent(1, |nats_message| async move {
//! let cloud_event = nats_message.to_event().unwrap();
//! println!("{}", cloud_event.to_string());
//! }).await;
//! } //! }
//! ``` //! ```
//! //!
//! Serialize [Event](https://docs.rs/cloudevents-sdk/latest/cloudevents/event/struct.Event.html) into [NatsCloudEvent] and publish to nats subject //! Serialize [Event](https://docs.rs/cloudevents-sdk/latest/cloudevents/event/struct.Event.html) into [NatsCloudEvent] and publish to nats subject
//! ``` //! ```no_run
//! use nats_lib as nats; //! use nats_lib as nats;
//! use cloudevents::binding::nats::NatsCloudEvent; //! use cloudevents``::binding::nats::NatsCloudEvent;
//! use cloudevents::{EventBuilder, EventBuilderV10, Event}; //! use cloudevents::{EventBuilder, EventBuilderV10, Event};
//! use serde_json::json; //! use serde_json::json;
//! //!
//! fn publish() { //! #[tokio::main]
//! let nc = nats::connect("localhost:4222").unwrap(); //! async fn main() {
//! let nc = nats::connect("localhost:4222").await.unwrap();
//! //!
//! let event = EventBuilderV10::new() //! let event = EventBuilderV10::new()
//! .id("123".to_string()) //! .id("123".to_string())
@ -33,7 +38,8 @@
//! .build() //! .build()
//! .unwrap(); //! .unwrap();
//! //!
//! nc.publish("whatever.subject.you.like", NatsCloudEvent::from_event(event).unwrap()).unwrap(); //! let nats_payload = NatsCloudEvent::from_event(event).unwrap();
//! nc.publish("whatever.subject.you.like", nats_payload.payload.into()).await.unwrap();
//! } //! }
//! ``` //! ```
mod deserializer; mod deserializer;