diff --git a/Cargo.toml b/Cargo.toml index 3ce7dbd..df99710 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,7 +22,8 @@ actix = ["actix-web", "async-trait", "bytes", "futures", "http"] reqwest = ["reqwest-lib", "async-trait", "bytes", "http"] rdkafka = ["rdkafka-lib", "bytes", "futures"] warp = ["warp-lib", "bytes", "http", "hyper"] -axum = ["bytes", "http", "hyper", "axum-lib", "http-body", "async-trait"] +axum = ["bytes", "http", "hyper", "axum-lib", "http-body", "async-trait"] +poem = ["bytes", "http", "poem-lib", "hyper", "async-trait"] [dependencies] serde = { version = "^1.0", features = ["derive"] } @@ -44,8 +45,9 @@ bytes = { version = "^1.0", optional = true } futures = { version = "^0.3", optional = true } http = { version = "0.2", optional = true } hyper = { version = "^0.14", optional = true } -axum-lib = { version = "^0.2", optional = true, package="axum"} -http-body = { version = "^0.4", optional = true} +axum-lib = { version = "^0.2", optional = true, package = "axum" } +http-body = { version = "^0.4", optional = true } +poem-lib = { version = "1.0.21", optional = true, package = "poem" } [target."cfg(not(target_arch = \"wasm32\"))".dependencies] hostname = "^0.3" diff --git a/src/binding/mod.rs b/src/binding/mod.rs index 6cd36a3..6466a36 100644 --- a/src/binding/mod.rs +++ b/src/binding/mod.rs @@ -9,9 +9,12 @@ pub mod axum; feature = "actix", feature = "warp", feature = "reqwest", - feature = "axum" + feature = "axum", + feature = "poem" ))] pub mod http; +#[cfg(feature = "poem")] +pub mod poem; #[cfg(feature = "rdkafka")] pub mod rdkafka; #[cfg(feature = "reqwest")] diff --git a/src/binding/poem/extractor.rs b/src/binding/poem/extractor.rs new file mode 100644 index 0000000..cd2118b --- /dev/null +++ b/src/binding/poem/extractor.rs @@ -0,0 +1,112 @@ +use async_trait::async_trait; +use poem_lib::error::ReadBodyError; +use poem_lib::http::StatusCode; +use poem_lib::{FromRequest, IntoResponse, Request, RequestBody, Response}; + +use crate::binding::http::to_event; +use crate::Event; + +#[derive(Debug)] +pub enum ParseEventError { + ReadBody(ReadBodyError), + ParseEvent(crate::message::Error), +} + +impl From for ParseEventError { + fn from(err: ReadBodyError) -> Self { + ParseEventError::ReadBody(err) + } +} + +impl From for ParseEventError { + fn from(err: crate::message::Error) -> Self { + ParseEventError::ParseEvent(err) + } +} + +impl IntoResponse for ParseEventError { + fn into_response(self) -> Response { + match self { + ParseEventError::ReadBody(err) => err.into_response(), + ParseEventError::ParseEvent(err) => (StatusCode::BAD_REQUEST, err.to_string()).into(), + } + } +} + +#[async_trait] +impl<'a> FromRequest<'a> for Event { + type Error = ParseEventError; + + async fn from_request(req: &'a Request, body: &mut RequestBody) -> Result { + Ok(to_event(req.headers(), body.take()?.into_vec().await?)?) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::test::fixtures; + use poem_lib::http::Method; + + #[tokio::test] + async fn test_request() { + let expected = fixtures::v10::minimal_string_extension(); + + let req = Request::builder() + .method(Method::POST) + .header("ce-specversion", "1.0") + .header("ce-id", "0001") + .header("ce-type", "test_event.test_application") + .header("ce-source", "http://localhost/") + .header("ce-someint", "10") + .finish(); + let (req, mut body) = req.split(); + let result = Event::from_request(&req, &mut body).await.unwrap(); + + assert_eq!(expected, result); + } + + #[tokio::test] + async fn test_bad_request() { + let req = Request::builder() + .method(Method::POST) + .header("ce-specversion", "BAD SPECIFICATION") + .header("ce-id", "0001") + .header("ce-type", "example.test") + .header("ce-source", "http://localhost/") + .header("ce-someint", "10") + .header("ce-time", fixtures::time().to_rfc3339()) + .finish(); + + let (req, mut body) = req.split(); + let resp = Event::from_request(&req, &mut body).await.into_response(); + assert_eq!(resp.status(), StatusCode::BAD_REQUEST); + assert_eq!( + resp.into_body().into_string().await.unwrap(), + "Invalid specversion BAD SPECIFICATION" + ); + } + + #[tokio::test] + async fn test_request_with_full_data() { + let expected = fixtures::v10::full_binary_json_data_string_extension(); + + let req = Request::builder() + .method(Method::POST) + .header("ce-specversion", "1.0") + .header("ce-id", "0001") + .header("ce-type", "test_event.test_application") + .header("ce-source", "http://localhost/") + .header("ce-subject", "cloudevents-sdk") + .header("content-type", "application/json") + .header("ce-string_ex", "val") + .header("ce-int_ex", "10") + .header("ce-bool_ex", "true") + .header("ce-time", &fixtures::time().to_rfc3339()) + .body(fixtures::json_data_binary()); + let (req, mut body) = req.split(); + let result = Event::from_request(&req, &mut body).await.unwrap(); + + assert_eq!(expected, result); + } +} diff --git a/src/binding/poem/mod.rs b/src/binding/poem/mod.rs new file mode 100644 index 0000000..c26d8c0 --- /dev/null +++ b/src/binding/poem/mod.rs @@ -0,0 +1,59 @@ +//! This module integrates the [cloudevents-sdk](https://docs.rs/cloudevents-sdk) with +//! [Poem](https://docs.rs/poem/) to easily send and receive CloudEvents. +//! +//! To deserialize an HTTP request as CloudEvent +//! +//! To echo events: +//! +//! ```rust +//! use cloudevents::Event; +//! use poem_lib as poem; +//! use poem::{handler, Route, post}; +//! +//! #[handler] +//! async fn index(event: Event) -> Event { +//! println!("received cloudevent {}", &event); +//! event +//! } +//! +//! let app = Route::new().at("/", post(index)); +//! ``` +//! +//! To create event inside request handlers and send them as responses: +//! +//! ```rust +//! use cloudevents::{Event, EventBuilder, EventBuilderV10}; +//! use poem_lib as poem; +//! use poem::{handler, Route, post, Result}; +//! use poem::error::InternalServerError; +//! use serde_json::json; +//! +//! #[handler] +//! async fn index() -> Result { +//! let event = EventBuilderV10::new() +//! .id("1") +//! .source("url://example_response/") +//! .ty("example.ce") +//! .data( +//! mime::APPLICATION_JSON.to_string(), +//! json!({ +//! "name": "John Doe", +//! "age": 43, +//! "phones": [ +//! "+44 1234567", +//! "+44 2345678" +//! ] +//! }), +//! ) +//! .build() +//! .map_err(InternalServerError)?; +//! Ok(event) +//! } +//! +//! let app = Route::new().at("/", post(index)); +//! ``` + +mod extractor; +mod response; + +pub use extractor::ParseEventError; diff --git a/src/binding/poem/response.rs b/src/binding/poem/response.rs new file mode 100644 index 0000000..20560b2 --- /dev/null +++ b/src/binding/poem/response.rs @@ -0,0 +1,95 @@ +use poem_lib::http::StatusCode; +use poem_lib::{IntoResponse, Response}; + +use crate::binding::http::builder::adapter::to_response; +use crate::Event; + +impl IntoResponse for Event { + fn into_response(self) -> Response { + match to_response(self) { + Ok(resp) => resp.into(), + Err(err) => (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()).into(), + } + } +} + +#[cfg(test)] +mod tests { + use crate::test::fixtures; + use poem_lib::IntoResponse; + + #[test] + fn test_response() { + let input = fixtures::v10::minimal_string_extension(); + + let resp = input.into_response(); + + assert_eq!( + resp.headers() + .get("ce-specversion") + .unwrap() + .to_str() + .unwrap(), + "1.0" + ); + assert_eq!( + resp.headers().get("ce-id").unwrap().to_str().unwrap(), + "0001" + ); + assert_eq!( + resp.headers().get("ce-type").unwrap().to_str().unwrap(), + "test_event.test_application" + ); + assert_eq!( + resp.headers().get("ce-source").unwrap().to_str().unwrap(), + "http://localhost/" + ); + assert_eq!( + resp.headers().get("ce-someint").unwrap().to_str().unwrap(), + "10" + ); + } + + #[tokio::test] + async fn test_response_with_full_data() { + let input = fixtures::v10::full_binary_json_data_string_extension(); + + let resp = input.into_response(); + + assert_eq!( + resp.headers() + .get("ce-specversion") + .unwrap() + .to_str() + .unwrap(), + "1.0" + ); + assert_eq!( + resp.headers().get("ce-id").unwrap().to_str().unwrap(), + "0001" + ); + assert_eq!( + resp.headers().get("ce-type").unwrap().to_str().unwrap(), + "test_event.test_application" + ); + assert_eq!( + resp.headers().get("ce-source").unwrap().to_str().unwrap(), + "http://localhost/" + ); + assert_eq!( + resp.headers() + .get("content-type") + .unwrap() + .to_str() + .unwrap(), + "application/json" + ); + assert_eq!( + resp.headers().get("ce-int_ex").unwrap().to_str().unwrap(), + "10" + ); + + let body = resp.into_body().into_vec().await.unwrap(); + assert_eq!(fixtures::json_data_binary(), body); + } +} diff --git a/src/lib.rs b/src/lib.rs index bde72ed..b90aebe 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,3 +1,4 @@ +#![feature(in_band_lifetimes)] //! This crate implements the [CloudEvents](https://cloudevents.io/) Spec for Rust. //! //! ```