New feature: NATS bindings (#185)

* feat: add NATS protocol bindings

Signed-off-by: Jakub Noga <jakub.noga@softchameleon.io>

* chore: run cargo fix & fmt

Signed-off-by: Jakub Noga <jakub.noga@softchameleon.io>

* fix: issues with docs

Signed-off-by: Jakub Noga <jakub.noga@softchameleon.io>

* Apply suggestions from code review

Co-authored-by: Lazzaretti <fabrizio@lazzaretti.me>

Signed-off-by: Jakub Noga <jakub.noga@softchameleon.io>

* feat: apply suggestions from code review

Signed-off-by: Jakub Noga <jakub.noga@softchameleon.io>

* feat: add test for v0.3 deserialization

Signed-off-by: Jakub Noga <jakub.noga@softchameleon.io>

* chore: run cargo fmt

Signed-off-by: Fabrizio Lazzaretti <fabrizio@lazzaretti.me>

Co-authored-by: Jakub Noga <jakub.noga@softchameleon.io>
Co-authored-by: Lazzaretti <fabrizio@lazzaretti.me>
This commit is contained in:
Jakub Noga 2022-07-12 21:51:04 +02:00 committed by GitHub
parent 6848cb15bd
commit 8dde763c06
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 221 additions and 1 deletions

View File

@ -146,3 +146,10 @@ jobs:
toolchain: ${{ matrix.toolchain }}
args: --target ${{ matrix.target }} --manifest-path ./example-projects/poem-example/Cargo.toml
- uses: actions-rs/cargo@v1
name: "Build nats-example"
if: matrix.target == 'x86_64-unknown-linux-gnu' && matrix.toolchain == 'stable'
with:
command: build
toolchain: ${{ matrix.toolchain }}
args: --target ${{ matrix.target }} --manifest-path ./example-projects/nats-example/Cargo.toml

2
.gitignore vendored
View File

@ -1,4 +1,6 @@
**/target
.idea
.vscode
.DS_Store
**/Cargo.lock

View File

@ -24,6 +24,7 @@ rdkafka = ["rdkafka-lib", "bytes", "futures"]
warp = ["warp-lib", "bytes", "http", "hyper"]
axum = ["bytes", "http", "hyper", "axum-lib", "http-body", "async-trait"]
poem = ["bytes", "http", "poem-lib", "hyper", "async-trait"]
nats = ["nats-lib"]
[dependencies]
serde = { version = "^1.0", features = ["derive"] }
@ -50,6 +51,7 @@ hyper = { version = "^0.14", optional = true }
axum-lib = { version = "^0.5", optional = true , package="axum"}
http-body = { version = "^0.4", optional = true }
poem-lib = { version = "=1.2.34", optional = true, package = "poem" }
nats-lib = { version = "0.21.0", optional = true, package = "nats" }
[target."cfg(not(target_arch = \"wasm32\"))".dependencies]
hostname = "^0.3"

View File

@ -15,7 +15,7 @@ Note: This project is WIP under active development, hence all APIs are considere
| JSON Event Format | ✓ | ✓ |
| Kafka Protocol Binding | ✓ | ✓ |
| MQTT Protocol Binding | ✕ | ✕ |
| NATS Protocol Binding | ✕ | ✕ |
| NATS Protocol Binding | ✓ | ✓ |
| Web hook | ✕ | ✕ |
## Crate Structure
@ -30,6 +30,7 @@ enabled by a specific [feature flag]:
* `warp`: Integration with [warp](https://github.com/seanmonstar/warp/).
* `reqwest`: Integration with [reqwest](https://github.com/seanmonstar/reqwest).
* `rdkafka`: Integration with [rdkafka](https://fede1024.github.io/rust-rdkafka).
* `nats`: Integration with [nats](https://github.com/nats-io/nats.rs)
This crate is continuously tested to work with GNU libc, WASM and musl
toolchains.
@ -64,6 +65,7 @@ Checkout the examples using our integrations to learn how to send and receive ev
* [Reqwest/WASM Example](example-projects/reqwest-wasm-example)
* [Kafka Example](example-projects/rdkafka-example)
* [Warp Example](example-projects/warp-example)
* [NATS Example](example-projects/nats-example)
## Development & Contributing

View File

@ -0,0 +1,12 @@
[package]
name = "nats-example"
version = "0.1.0"
authors = ["Jakub Noga <jakub.noga@softchameleon.io>"]
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
cloudevents-sdk = { path = "../..", features = ["nats"] }
serde_json = "^1.0"
nats = "0.21.0"

View File

@ -0,0 +1,47 @@
use std::{error::Error, thread};
use cloudevents::binding::nats::{MessageExt, NatsCloudEvent};
use cloudevents::{Event, EventBuilder, EventBuilderV10};
use serde_json::json;
/// First spin up a nats server i.e.
/// ```bash
/// docker run -p 4222:4222 -ti nats:latest
/// ```
fn main() -> Result<(), Box<dyn Error>> {
let nc = nats::connect("localhost:4222").unwrap();
let event = EventBuilderV10::new()
.id("123".to_string())
.ty("example.test")
.source("http://localhost/")
.data("application/json", json!({"hello": "world"}))
.build()
.unwrap();
let n_msg = NatsCloudEvent::from_event(event).unwrap();
let sub = nc.subscribe("test").unwrap();
let t = thread::spawn(move || -> Result<Event, String> {
match sub.next() {
Some(msg) => match msg.to_event() {
Ok(evt) => Ok(evt),
Err(e) => Err(e.to_string()),
},
None => Err("Unsubed or disconnected".to_string()),
}
});
nc.publish("test", n_msg)?;
let maybe_event = t.join().unwrap();
if let Ok(evt) = maybe_event {
println!("{}", evt.to_string());
} else {
println!("{}", maybe_event.unwrap_err().to_string());
}
Ok(())
}

View File

@ -13,6 +13,8 @@ pub mod axum;
feature = "poem"
))]
pub mod http;
#[cfg(feature = "nats")]
pub mod nats;
#[cfg(feature = "poem")]
pub mod poem;
#[cfg(feature = "rdkafka")]

View File

@ -0,0 +1,77 @@
use crate::{
message::{Result, StructuredDeserializer},
Event,
};
use nats_lib as nats;
impl StructuredDeserializer for nats::Message {
fn deserialize_structured<R: Sized, V: crate::message::StructuredSerializer<R>>(
self,
serializer: V,
) -> crate::message::Result<R> {
serializer.set_structured_event(self.data.to_vec())
}
}
/// Trait implemented by [`nats::Message`] to enable convenient deserialization to [`Event`]
///
/// Trait sealed <https://rust-lang.github.io/api-guidelines/future-proofing.html#sealed-traits-protect-against-downstream-implementations-c-sealed>
pub trait MessageExt: private::Sealed {
fn to_event(&self) -> Result<Event>;
}
impl MessageExt for nats::Message {
fn to_event(&self) -> Result<Event> {
StructuredDeserializer::into_event(self.to_owned())
}
}
mod private {
use nats_lib as nats;
// Sealing the MessageExt
pub trait Sealed {}
impl Sealed for nats::Message {}
}
#[cfg(test)]
mod tests {
use crate::test::fixtures;
use nats_lib as nats;
use serde_json::json;
use super::*;
#[test]
fn test_structured_deserialize_v10() {
let expected = fixtures::v10::full_json_data_string_extension();
let nats_message = nats::Message::new(
"not_relevant",
None,
json!(expected.clone()).to_string().as_bytes().to_vec(),
None,
);
let actual = nats_message.to_event().unwrap();
assert_eq!(expected, actual)
}
#[test]
fn test_structured_deserialize_v03() {
let expected = fixtures::v03::full_json_data();
let nats_message = nats::Message::new(
"not_relevant",
None,
json!(expected.clone()).to_string().as_bytes().to_vec(),
None,
);
let actual = nats_message.to_event().unwrap();
assert_eq!(expected, actual)
}
}

43
src/binding/nats/mod.rs Normal file
View File

@ -0,0 +1,43 @@
//! This module provides bindings between [cloudevents-sdk](https://docs.rs/cloudevents-sdk) and [nats](https://docs.rs/nats)
//! ## Examples
//! Deserialize [nats::Message](https://docs.rs/nats/0.21.0/nats/struct.Message.html) into [Event](https://docs.rs/cloudevents-sdk/latest/cloudevents/event/struct.Event.html)
//! ```
//! use nats_lib as nats;
//! use cloudevents::binding::nats::MessageExt;
//!
//! fn consume() {
//! let nc = nats::connect("localhost:4222").unwrap();
//! let sub = nc.subscribe("test").unwrap();
//! let nats_message = sub.next().unwrap();
//! let cloud_event = nats_message.to_event().unwrap();
//!
//! println!("{}", cloud_event.to_string());
//! }
//! ```
//!
//! Serialize [Event](https://docs.rs/cloudevents-sdk/latest/cloudevents/event/struct.Event.html) into [NatsCloudEvent] and publish to nats subject
//! ```
//! use nats_lib as nats;
//! use cloudevents::binding::nats::NatsCloudEvent;
//! use cloudevents::{EventBuilder, EventBuilderV10, Event};
//! use serde_json::json;
//!
//! fn publish() {
//! let nc = nats::connect("localhost:4222").unwrap();
//!
//! let event = EventBuilderV10::new()
//! .id("123".to_string())
//! .ty("example.test")
//! .source("http://localhost/")
//! .data("application/json", json!({"hello": "world"}))
//! .build()
//! .unwrap();
//!
//! nc.publish("whatever.subject.you.like", NatsCloudEvent::from_event(event).unwrap()).unwrap();
//! }
//! ```
mod deserializer;
mod serializer;
pub use deserializer::MessageExt;
pub use serializer::NatsCloudEvent;

View File

@ -0,0 +1,26 @@
use crate::{
message::{Error, Result},
Event,
};
/// Helper struct containing text data bytes of JSON serialized [Event]
///
/// Implements [`AsRef`] so it can be directly passed to [`nats::Connection`](https://docs.rs/nats/0.21.0/nats/struct.Connection.html) methods as payload.
pub struct NatsCloudEvent {
pub payload: Vec<u8>,
}
impl AsRef<[u8]> for NatsCloudEvent {
fn as_ref(&self) -> &[u8] {
&self.payload.as_ref()
}
}
impl NatsCloudEvent {
pub fn from_event(event: Event) -> Result<Self> {
match serde_json::to_vec(&event) {
Ok(payload) => Ok(Self { payload }),
Err(e) => Err(Error::SerdeJsonError { source: e }),
}
}
}