fix: nats by updating to the new async-nats & GitHub action/cache (bump version) (#255)

* fix: GitHub action/cache (bump version)
  https://github.blog/changelog/2024-12-05-notice-of-upcoming-releases-and-breaking-changes-for-github-actions/#actions-cache-v1-v2-and-actions-toolkit-cache-package-closing-down
* fix: nats by updating to the new async-nats
   the old nats crate is depricted
* fix: nats example

---------

Signed-off-by: Fabrizio Lazzaretti <fabrizio@lazzaretti.me>
This commit is contained in:
Fabrizio Lazzaretti 2025-07-10 16:18:55 +02:00 committed by GitHub
parent fa0aadb31d
commit 6f02d238e4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 58 additions and 41 deletions

View File

@ -34,7 +34,7 @@ jobs:
- run: sudo apt-get install -y musl musl-dev musl-tools cmake
if: matrix.target == 'x86_64-unknown-linux-musl'
# # Caching stuff
- uses: actions/cache@v2
- uses: actions/cache@v4
with:
path: |
~/.cargo/bin/
@ -42,7 +42,7 @@ jobs:
~/.cargo/registry/cache/
~/.cargo/git/db/
key: ${{ runner.os }}-cargo-deps-${{ hashFiles('**/Cargo.toml') }}
- uses: actions/cache@v2
- uses: actions/cache@v4
with:
path: |
target/

View File

@ -58,7 +58,7 @@ http-0-2 = { version = "0.2", optional = true, package = "http"}
axum-lib = { version = "^0.8", optional = true, package="axum"}
http-body-util = {version = "^0.1", optional = true}
poem-lib = { version = "^3.1", optional = true, package = "poem" }
nats-lib = { version = "0.25.0", optional = true, package = "nats" }
nats-lib = { version = "0.42.0", optional = true, package = "async-nats" }
[target."cfg(not(target_arch = \"wasm32\"))".dependencies]
hostname = "^0.4"

View File

@ -9,4 +9,6 @@ edition = "2021"
[dependencies]
cloudevents-sdk = { path = "../..", features = ["nats"] }
serde_json = "^1.0"
nats = "^0.25"
async-nats = "^0.42.0"
tokio = { version = "^1.0", features = ["full"] }
futures = "^0.3"

View File

@ -1,15 +1,17 @@
use std::{error::Error, thread};
use std::error::Error;
use cloudevents::binding::nats::{MessageExt, NatsCloudEvent};
use cloudevents::{Event, EventBuilder, EventBuilderV10};
use serde_json::json;
use futures::StreamExt;
/// First spin up a nats server i.e.
/// ```bash
/// docker run -p 4222:4222 -ti nats:latest
/// ```
fn main() -> Result<(), Box<dyn Error>> {
let nc = nats::connect("localhost:4222").unwrap();
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let client = async_nats::connect("localhost:4222").await?;
let event = EventBuilderV10::new()
.id("123".to_string())
@ -21,26 +23,27 @@ fn main() -> Result<(), Box<dyn Error>> {
let n_msg = NatsCloudEvent::from_event(event).unwrap();
let sub = nc.subscribe("test").unwrap();
let mut sub = client.subscribe("test").await?;
let t = thread::spawn(move || -> Result<Event, String> {
match sub.next() {
Some(msg) => match msg.to_event() {
let receive_task = tokio::spawn(async move {
if let Some(msg) = sub.next().await {
match msg.to_event() {
Ok(evt) => Ok(evt),
Err(e) => Err(e.to_string()),
},
None => Err("Unsubed or disconnected".to_string()),
}
} else {
Err("No event received".to_string())
}
});
nc.publish("test", n_msg)?;
client.publish("test", n_msg.payload.into()).await?;
let maybe_event = t.join().unwrap();
let maybe_event = receive_task.await?;
if let Ok(evt) = maybe_event {
println!("{}", evt.to_string());
} else {
println!("{}", maybe_event.unwrap_err().to_string());
println!("{}", maybe_event.unwrap_err());
}
Ok(())

View File

@ -10,7 +10,7 @@ impl StructuredDeserializer for nats::Message {
self,
serializer: V,
) -> crate::message::Result<R> {
serializer.set_structured_event(self.data.to_vec())
serializer.set_structured_event(self.payload.to_vec())
}
}
@ -47,12 +47,15 @@ mod tests {
fn test_structured_deserialize_v10() {
let expected = fixtures::v10::full_json_data_string_extension();
let nats_message = nats::Message::new(
"not_relevant",
None,
json!(expected).to_string().as_bytes(),
None,
);
let nats_message = nats::Message {
subject: "not_relevant".into(),
payload: json!(expected).to_string().into_bytes().into(),
reply: None,
headers: None,
status: None,
description: None,
length: 0,
};
let actual = nats_message.to_event().unwrap();
@ -63,12 +66,15 @@ mod tests {
fn test_structured_deserialize_v03() {
let expected = fixtures::v03::full_json_data();
let nats_message = nats::Message::new(
"not_relevant",
None,
json!(expected).to_string().as_bytes(),
None,
);
let nats_message = nats::Message {
subject: "not_relevant".into(),
payload: json!(expected).to_string().into_bytes().into(),
reply: None,
headers: None,
status: None,
description: None,
length: 0,
};
let actual = nats_message.to_event().unwrap();

View File

@ -1,29 +1,34 @@
//! This module provides bindings between [cloudevents-sdk](https://docs.rs/cloudevents-sdk) and [nats](https://docs.rs/nats)
//! ## Examples
//! Deserialize [nats::Message](https://docs.rs/nats/0.21.0/nats/struct.Message.html) into [Event](https://docs.rs/cloudevents-sdk/latest/cloudevents/event/struct.Event.html)
//! ```
//! ```no_run
//! use nats_lib as nats;
//! use cloudevents::binding::nats::MessageExt;
//! use futures::StreamExt;
//!
//! fn consume() {
//! let nc = nats::connect("localhost:4222").unwrap();
//! let sub = nc.subscribe("test").unwrap();
//! let nats_message = sub.next().unwrap();
//! let cloud_event = nats_message.to_event().unwrap();
//! #[tokio::main]
//! async fn main() {
//! let nc = nats::connect("localhost:4222").await.unwrap();
//! let mut sub = nc.subscribe("test").await.unwrap();
//!
//! println!("{}", cloud_event.to_string());
//! // Process messages one at a time
//! sub.for_each_concurrent(1, |nats_message| async move {
//! let cloud_event = nats_message.to_event().unwrap();
//! println!("{}", cloud_event.to_string());
//! }).await;
//! }
//! ```
//!
//! Serialize [Event](https://docs.rs/cloudevents-sdk/latest/cloudevents/event/struct.Event.html) into [NatsCloudEvent] and publish to nats subject
//! ```
//! ```no_run
//! use nats_lib as nats;
//! use cloudevents::binding::nats::NatsCloudEvent;
//! use cloudevents::{EventBuilder, EventBuilderV10, Event};
//! use serde_json::json;
//!
//! fn publish() {
//! let nc = nats::connect("localhost:4222").unwrap();
//! #[tokio::main]
//! async fn main() {
//! let nc = nats::connect("localhost:4222").await.unwrap();
//!
//! let event = EventBuilderV10::new()
//! .id("123".to_string())
@ -33,7 +38,8 @@
//! .build()
//! .unwrap();
//!
//! nc.publish("whatever.subject.you.like", NatsCloudEvent::from_event(event).unwrap()).unwrap();
//! let nats_payload = NatsCloudEvent::from_event(event).unwrap();
//! nc.publish("whatever.subject.you.like", nats_payload.payload.into()).await.unwrap();
//! }
//! ```
mod deserializer;