From a765e9a856189150de296b9ca3f06f161c6673c4 Mon Sep 17 00:00:00 2001 From: Julien Teruel Date: Wed, 21 Jun 2023 14:46:21 +0200 Subject: [PATCH] feat: add support for async_nats Signed-off-by: Julien Teruel --- Cargo.toml | 5 +- src/binding/async_nats/deserializer.rs | 94 ++++++++++++++++++++++++++ src/binding/async_nats/mod.rs | 44 ++++++++++++ src/binding/async_nats/serializer.rs | 26 +++++++ src/binding/mod.rs | 3 + 5 files changed, 171 insertions(+), 1 deletion(-) create mode 100644 src/binding/async_nats/deserializer.rs create mode 100644 src/binding/async_nats/mod.rs create mode 100644 src/binding/async_nats/serializer.rs diff --git a/Cargo.toml b/Cargo.toml index 29f150b..ea9ddc2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,6 +30,7 @@ warp = ["warp-lib", "bytes", "http", "hyper"] axum = ["bytes", "http", "hyper", "axum-lib", "http-body", "async-trait"] poem = ["bytes", "http", "poem-lib", "hyper", "async-trait"] nats = ["nats-lib"] +async-nats = ["async-nats-lib"] [dependencies] serde = { version = "^1.0", features = ["derive"] } @@ -56,7 +57,8 @@ hyper = { version = "^0.14", optional = true } axum-lib = { version = "^0.6", optional = true, package="axum"} http-body = { version = "^0.4", optional = true } poem-lib = { version = "=1.2.34", optional = true, package = "poem" } -nats-lib = { version = "0.21.0", optional = true, package = "nats" } +nats-lib = { version = "0.24.0", optional = true, package = "nats" } +async-nats-lib = { version = "0.31.0", optional = true, package = "async-nats" } [target."cfg(not(target_arch = \"wasm32\"))".dependencies] hostname = "^0.3" @@ -73,6 +75,7 @@ rmp-serde = "1" # runtime dev-deps actix-rt = { version = "^2" } +bytes = "^1.5" url = { version = "^2.1", features = ["serde"] } serde_json = { version = "^1.0" } chrono = { version = "^0.4", features = ["serde"] } diff --git a/src/binding/async_nats/deserializer.rs b/src/binding/async_nats/deserializer.rs new file mode 100644 index 0000000..33a23ca --- /dev/null +++ b/src/binding/async_nats/deserializer.rs @@ -0,0 +1,94 @@ +use crate::{ + message::{Result, StructuredDeserializer}, + Event, +}; + +use async_nats_lib as async_nats; + +impl StructuredDeserializer for async_nats::Message { + fn deserialize_structured>( + self, + serializer: V, + ) -> crate::message::Result { + serializer.set_structured_event(self.payload.to_vec()) + } +} + +/// Trait implemented by [`async_nats::Message`] to enable convenient deserialization to [`Event`] +/// +/// Trait sealed +pub trait MessageExt: private::Sealed { + fn to_event(&self) -> Result; +} + +impl MessageExt for async_nats::Message { + fn to_event(&self) -> Result { + let message = async_nats::Message { + subject: self.subject.clone(), + reply: self.reply.clone(), + payload: self.payload.clone(), + headers: self.headers.clone(), + status: self.status.clone(), + description: self.description.clone(), + length: self.length, + }; + StructuredDeserializer::into_event(message) + } +} + +mod private { + use async_nats_lib as nats; + + // Sealing the MessageExt + pub trait Sealed {} + impl Sealed for nats::Message {} +} + +#[cfg(test)] +mod tests { + use crate::test::fixtures; + use async_nats_lib as async_nats; + use bytes::Bytes; + use serde_json::json; + use MessageExt; + + use super::*; + + #[test] + fn test_structured_deserialize_v10() { + let expected = fixtures::v10::full_json_data_string_extension(); + + let nats_message = async_nats::Message { + subject: "not_relevant".to_string(), + reply: None, + payload: Bytes::from(json!(expected).to_string()), + headers: None, + status: None, + description: None, + length: 0, + }; + + let actual = nats_message.to_event().unwrap(); + + assert_eq!(expected, actual) + } + + #[test] + fn test_structured_deserialize_v03() { + let expected = fixtures::v03::full_json_data(); + + let nats_message = async_nats::Message { + subject: "not_relevant".to_string(), + reply: None, + payload: Bytes::from(json!(expected).to_string()), + headers: None, + status: None, + description: None, + length: 0, + }; + + let actual = nats_message.to_event().unwrap(); + + assert_eq!(expected, actual) + } +} diff --git a/src/binding/async_nats/mod.rs b/src/binding/async_nats/mod.rs new file mode 100644 index 0000000..c0e3741 --- /dev/null +++ b/src/binding/async_nats/mod.rs @@ -0,0 +1,44 @@ +//! This module provides bindings between [cloudevents-sdk](https://docs.rs/cloudevents-sdk) and [async-nats](https://docs.rs/async-nats) +//! ## Examples +//! Deserialize [async_nats::Message](https://docs.rs/async-nats/latest/async_nats/message/struct.Message.html) into [Event](https://docs.rs/cloudevents-sdk/latest/cloudevents/event/struct.Event.html) +//! ``` +//! use async_nats_lib as async_nats; +//! use futures::StreamExt; +//! use cloudevents::binding::async_nats::MessageExt; +//! +//! async fn consume() { +//! let nc = async_nats::connect("localhost:4222").await.unwrap(); +//! let mut sub = nc.subscribe("test".to_string()).await.unwrap(); +//! let nats_message = sub.next().await.unwrap(); +//! let cloud_event = nats_message.to_event().unwrap(); +//! +//! println!("{}", cloud_event.to_string()); +//! } +//! ``` +//! +//! Serialize [Event](https://docs.rs/cloudevents-sdk/latest/cloudevents/event/struct.Event.html) into [NatsCloudEvent] and publish to nats subject +//! ``` +//! use async_nats_lib as async_nats; +//! use cloudevents::binding::nats::NatsCloudEvent; +//! use cloudevents::{EventBuilder, EventBuilderV10, Event}; +//! use serde_json::json; +//! +//! async fn publish() { +//! let nc = async_nats::connect("localhost:4222").await.unwrap(); +//! +//! let event = EventBuilderV10::new() +//! .id("123".to_string()) +//! .ty("example.test") +//! .source("http://localhost/") +//! .data("application/json", json!({"hello": "world"})) +//! .build() +//! .unwrap(); +//! +//! nc.publish("whatever.subject.you.like".to_string(), NatsCloudEvent::from_event(event).unwrap().payload.into()).await.unwrap(); +//! } +//! ``` +mod deserializer; +mod serializer; + +pub use deserializer::MessageExt; +pub use serializer::NatsCloudEvent; diff --git a/src/binding/async_nats/serializer.rs b/src/binding/async_nats/serializer.rs new file mode 100644 index 0000000..f987aa6 --- /dev/null +++ b/src/binding/async_nats/serializer.rs @@ -0,0 +1,26 @@ +use crate::{ + message::{Error, Result}, + Event, +}; + +/// Helper struct containing text data bytes of JSON serialized [Event] +/// +/// Implements [`AsRef`] so it can be directly passed to [`nats::Connection`](https://docs.rs/nats/0.21.0/nats/struct.Connection.html) methods as payload. +pub struct NatsCloudEvent { + pub payload: Vec, +} + +impl AsRef<[u8]> for NatsCloudEvent { + fn as_ref(&self) -> &[u8] { + self.payload.as_ref() + } +} + +impl NatsCloudEvent { + pub fn from_event(event: Event) -> Result { + match serde_json::to_vec(&event) { + Ok(payload) => Ok(Self { payload }), + Err(e) => Err(Error::SerdeJsonError { source: e }), + } + } +} diff --git a/src/binding/mod.rs b/src/binding/mod.rs index 1333212..e3aeffb 100644 --- a/src/binding/mod.rs +++ b/src/binding/mod.rs @@ -7,6 +7,9 @@ pub mod actix; #[cfg(feature = "axum")] pub mod axum; +#[cfg_attr(docsrs, doc(cfg(feature = "async-nats")))] +#[cfg(feature = "async-nats")] +pub mod async_nats; #[cfg_attr( docsrs, doc(cfg(any(