feat: add support for async_nats

Signed-off-by: Julien Teruel <julien.teruel@gmail.com>
This commit is contained in:
Julien Teruel 2023-06-21 14:46:21 +02:00
parent 4f265cd142
commit a765e9a856
5 changed files with 171 additions and 1 deletions

View File

@ -30,6 +30,7 @@ warp = ["warp-lib", "bytes", "http", "hyper"]
axum = ["bytes", "http", "hyper", "axum-lib", "http-body", "async-trait"]
poem = ["bytes", "http", "poem-lib", "hyper", "async-trait"]
nats = ["nats-lib"]
async-nats = ["async-nats-lib"]
[dependencies]
serde = { version = "^1.0", features = ["derive"] }
@ -56,7 +57,8 @@ hyper = { version = "^0.14", optional = true }
axum-lib = { version = "^0.6", optional = true, package="axum"}
http-body = { version = "^0.4", optional = true }
poem-lib = { version = "=1.2.34", optional = true, package = "poem" }
nats-lib = { version = "0.21.0", optional = true, package = "nats" }
nats-lib = { version = "0.24.0", optional = true, package = "nats" }
async-nats-lib = { version = "0.31.0", optional = true, package = "async-nats" }
[target."cfg(not(target_arch = \"wasm32\"))".dependencies]
hostname = "^0.3"
@ -73,6 +75,7 @@ rmp-serde = "1"
# runtime dev-deps
actix-rt = { version = "^2" }
bytes = "^1.5"
url = { version = "^2.1", features = ["serde"] }
serde_json = { version = "^1.0" }
chrono = { version = "^0.4", features = ["serde"] }

View File

@ -0,0 +1,94 @@
use crate::{
message::{Result, StructuredDeserializer},
Event,
};
use async_nats_lib as async_nats;
impl StructuredDeserializer for async_nats::Message {
fn deserialize_structured<R: Sized, V: crate::message::StructuredSerializer<R>>(
self,
serializer: V,
) -> crate::message::Result<R> {
serializer.set_structured_event(self.payload.to_vec())
}
}
/// Trait implemented by [`async_nats::Message`] to enable convenient deserialization to [`Event`]
///
/// Trait sealed <https://rust-lang.github.io/api-guidelines/future-proofing.html#sealed-traits-protect-against-downstream-implementations-c-sealed>
pub trait MessageExt: private::Sealed {
fn to_event(&self) -> Result<Event>;
}
impl MessageExt for async_nats::Message {
fn to_event(&self) -> Result<Event> {
let message = async_nats::Message {
subject: self.subject.clone(),
reply: self.reply.clone(),
payload: self.payload.clone(),
headers: self.headers.clone(),
status: self.status.clone(),
description: self.description.clone(),
length: self.length,
};
StructuredDeserializer::into_event(message)
}
}
mod private {
use async_nats_lib as nats;
// Sealing the MessageExt
pub trait Sealed {}
impl Sealed for nats::Message {}
}
#[cfg(test)]
mod tests {
use crate::test::fixtures;
use async_nats_lib as async_nats;
use bytes::Bytes;
use serde_json::json;
use MessageExt;
use super::*;
#[test]
fn test_structured_deserialize_v10() {
let expected = fixtures::v10::full_json_data_string_extension();
let nats_message = async_nats::Message {
subject: "not_relevant".to_string(),
reply: None,
payload: Bytes::from(json!(expected).to_string()),
headers: None,
status: None,
description: None,
length: 0,
};
let actual = nats_message.to_event().unwrap();
assert_eq!(expected, actual)
}
#[test]
fn test_structured_deserialize_v03() {
let expected = fixtures::v03::full_json_data();
let nats_message = async_nats::Message {
subject: "not_relevant".to_string(),
reply: None,
payload: Bytes::from(json!(expected).to_string()),
headers: None,
status: None,
description: None,
length: 0,
};
let actual = nats_message.to_event().unwrap();
assert_eq!(expected, actual)
}
}

View File

@ -0,0 +1,44 @@
//! This module provides bindings between [cloudevents-sdk](https://docs.rs/cloudevents-sdk) and [async-nats](https://docs.rs/async-nats)
//! ## Examples
//! Deserialize [async_nats::Message](https://docs.rs/async-nats/latest/async_nats/message/struct.Message.html) into [Event](https://docs.rs/cloudevents-sdk/latest/cloudevents/event/struct.Event.html)
//! ```
//! use async_nats_lib as async_nats;
//! use futures::StreamExt;
//! use cloudevents::binding::async_nats::MessageExt;
//!
//! async fn consume() {
//! let nc = async_nats::connect("localhost:4222").await.unwrap();
//! let mut sub = nc.subscribe("test".to_string()).await.unwrap();
//! let nats_message = sub.next().await.unwrap();
//! let cloud_event = nats_message.to_event().unwrap();
//!
//! println!("{}", cloud_event.to_string());
//! }
//! ```
//!
//! Serialize [Event](https://docs.rs/cloudevents-sdk/latest/cloudevents/event/struct.Event.html) into [NatsCloudEvent] and publish to nats subject
//! ```
//! use async_nats_lib as async_nats;
//! use cloudevents::binding::nats::NatsCloudEvent;
//! use cloudevents::{EventBuilder, EventBuilderV10, Event};
//! use serde_json::json;
//!
//! async fn publish() {
//! let nc = async_nats::connect("localhost:4222").await.unwrap();
//!
//! let event = EventBuilderV10::new()
//! .id("123".to_string())
//! .ty("example.test")
//! .source("http://localhost/")
//! .data("application/json", json!({"hello": "world"}))
//! .build()
//! .unwrap();
//!
//! nc.publish("whatever.subject.you.like".to_string(), NatsCloudEvent::from_event(event).unwrap().payload.into()).await.unwrap();
//! }
//! ```
mod deserializer;
mod serializer;
pub use deserializer::MessageExt;
pub use serializer::NatsCloudEvent;

View File

@ -0,0 +1,26 @@
use crate::{
message::{Error, Result},
Event,
};
/// Helper struct containing text data bytes of JSON serialized [Event]
///
/// Implements [`AsRef`] so it can be directly passed to [`nats::Connection`](https://docs.rs/nats/0.21.0/nats/struct.Connection.html) methods as payload.
pub struct NatsCloudEvent {
pub payload: Vec<u8>,
}
impl AsRef<[u8]> for NatsCloudEvent {
fn as_ref(&self) -> &[u8] {
self.payload.as_ref()
}
}
impl NatsCloudEvent {
pub fn from_event(event: Event) -> Result<Self> {
match serde_json::to_vec(&event) {
Ok(payload) => Ok(Self { payload }),
Err(e) => Err(Error::SerdeJsonError { source: e }),
}
}
}

View File

@ -7,6 +7,9 @@ pub mod actix;
#[cfg(feature = "axum")]
pub mod axum;
#[cfg_attr(docsrs, doc(cfg(feature = "async-nats")))]
#[cfg(feature = "async-nats")]
pub mod async_nats;
#[cfg_attr(
docsrs,
doc(cfg(any(