New feature: cloudevents-poem

Signed-off-by: Sunli <scott_s829@163.com>
This commit is contained in:
Sunli 2021-11-07 10:28:11 +08:00
parent 1e89203cbc
commit d987002173
6 changed files with 276 additions and 4 deletions

View File

@ -22,7 +22,8 @@ actix = ["actix-web", "async-trait", "bytes", "futures", "http"]
reqwest = ["reqwest-lib", "async-trait", "bytes", "http"]
rdkafka = ["rdkafka-lib", "bytes", "futures"]
warp = ["warp-lib", "bytes", "http", "hyper"]
axum = ["bytes", "http", "hyper", "axum-lib", "http-body", "async-trait"]
axum = ["bytes", "http", "hyper", "axum-lib", "http-body", "async-trait"]
poem = ["bytes", "http", "poem-lib", "hyper", "async-trait"]
[dependencies]
serde = { version = "^1.0", features = ["derive"] }
@ -44,8 +45,9 @@ bytes = { version = "^1.0", optional = true }
futures = { version = "^0.3", optional = true }
http = { version = "0.2", optional = true }
hyper = { version = "^0.14", optional = true }
axum-lib = { version = "^0.2", optional = true, package="axum"}
http-body = { version = "^0.4", optional = true}
axum-lib = { version = "^0.2", optional = true, package = "axum" }
http-body = { version = "^0.4", optional = true }
poem-lib = { version = "1.0.21", optional = true, package = "poem" }
[target."cfg(not(target_arch = \"wasm32\"))".dependencies]
hostname = "^0.3"

View File

@ -9,9 +9,12 @@ pub mod axum;
feature = "actix",
feature = "warp",
feature = "reqwest",
feature = "axum"
feature = "axum",
feature = "poem"
))]
pub mod http;
#[cfg(feature = "poem")]
pub mod poem;
#[cfg(feature = "rdkafka")]
pub mod rdkafka;
#[cfg(feature = "reqwest")]

View File

@ -0,0 +1,112 @@
use async_trait::async_trait;
use poem_lib::error::ReadBodyError;
use poem_lib::http::StatusCode;
use poem_lib::{FromRequest, IntoResponse, Request, RequestBody, Response};
use crate::binding::http::to_event;
use crate::Event;
#[derive(Debug)]
pub enum ParseEventError {
ReadBody(ReadBodyError),
ParseEvent(crate::message::Error),
}
impl From<ReadBodyError> for ParseEventError {
fn from(err: ReadBodyError) -> Self {
ParseEventError::ReadBody(err)
}
}
impl From<crate::message::Error> for ParseEventError {
fn from(err: crate::message::Error) -> Self {
ParseEventError::ParseEvent(err)
}
}
impl IntoResponse for ParseEventError {
fn into_response(self) -> Response {
match self {
ParseEventError::ReadBody(err) => err.into_response(),
ParseEventError::ParseEvent(err) => (StatusCode::BAD_REQUEST, err.to_string()).into(),
}
}
}
#[async_trait]
impl<'a> FromRequest<'a> for Event {
type Error = ParseEventError;
async fn from_request(req: &'a Request, body: &mut RequestBody) -> Result<Self, Self::Error> {
Ok(to_event(req.headers(), body.take()?.into_vec().await?)?)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::test::fixtures;
use poem_lib::http::Method;
#[tokio::test]
async fn test_request() {
let expected = fixtures::v10::minimal_string_extension();
let req = Request::builder()
.method(Method::POST)
.header("ce-specversion", "1.0")
.header("ce-id", "0001")
.header("ce-type", "test_event.test_application")
.header("ce-source", "http://localhost/")
.header("ce-someint", "10")
.finish();
let (req, mut body) = req.split();
let result = Event::from_request(&req, &mut body).await.unwrap();
assert_eq!(expected, result);
}
#[tokio::test]
async fn test_bad_request() {
let req = Request::builder()
.method(Method::POST)
.header("ce-specversion", "BAD SPECIFICATION")
.header("ce-id", "0001")
.header("ce-type", "example.test")
.header("ce-source", "http://localhost/")
.header("ce-someint", "10")
.header("ce-time", fixtures::time().to_rfc3339())
.finish();
let (req, mut body) = req.split();
let resp = Event::from_request(&req, &mut body).await.into_response();
assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
assert_eq!(
resp.into_body().into_string().await.unwrap(),
"Invalid specversion BAD SPECIFICATION"
);
}
#[tokio::test]
async fn test_request_with_full_data() {
let expected = fixtures::v10::full_binary_json_data_string_extension();
let req = Request::builder()
.method(Method::POST)
.header("ce-specversion", "1.0")
.header("ce-id", "0001")
.header("ce-type", "test_event.test_application")
.header("ce-source", "http://localhost/")
.header("ce-subject", "cloudevents-sdk")
.header("content-type", "application/json")
.header("ce-string_ex", "val")
.header("ce-int_ex", "10")
.header("ce-bool_ex", "true")
.header("ce-time", &fixtures::time().to_rfc3339())
.body(fixtures::json_data_binary());
let (req, mut body) = req.split();
let result = Event::from_request(&req, &mut body).await.unwrap();
assert_eq!(expected, result);
}
}

59
src/binding/poem/mod.rs Normal file
View File

@ -0,0 +1,59 @@
//! This module integrates the [cloudevents-sdk](https://docs.rs/cloudevents-sdk) with
//! [Poem](https://docs.rs/poem/) to easily send and receive CloudEvents.
//!
//! To deserialize an HTTP request as CloudEvent
//!
//! To echo events:
//!
//! ```rust
//! use cloudevents::Event;
//! use poem_lib as poem;
//! use poem::{handler, Route, post};
//!
//! #[handler]
//! async fn index(event: Event) -> Event {
//! println!("received cloudevent {}", &event);
//! event
//! }
//!
//! let app = Route::new().at("/", post(index));
//! ```
//!
//! To create event inside request handlers and send them as responses:
//!
//! ```rust
//! use cloudevents::{Event, EventBuilder, EventBuilderV10};
//! use poem_lib as poem;
//! use poem::{handler, Route, post, Result};
//! use poem::error::InternalServerError;
//! use serde_json::json;
//!
//! #[handler]
//! async fn index() -> Result<Event> {
//! let event = EventBuilderV10::new()
//! .id("1")
//! .source("url://example_response/")
//! .ty("example.ce")
//! .data(
//! mime::APPLICATION_JSON.to_string(),
//! json!({
//! "name": "John Doe",
//! "age": 43,
//! "phones": [
//! "+44 1234567",
//! "+44 2345678"
//! ]
//! }),
//! )
//! .build()
//! .map_err(InternalServerError)?;
//! Ok(event)
//! }
//!
//! let app = Route::new().at("/", post(index));
//! ```
mod extractor;
mod response;
pub use extractor::ParseEventError;

View File

@ -0,0 +1,95 @@
use poem_lib::http::StatusCode;
use poem_lib::{IntoResponse, Response};
use crate::binding::http::builder::adapter::to_response;
use crate::Event;
impl IntoResponse for Event {
fn into_response(self) -> Response {
match to_response(self) {
Ok(resp) => resp.into(),
Err(err) => (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()).into(),
}
}
}
#[cfg(test)]
mod tests {
use crate::test::fixtures;
use poem_lib::IntoResponse;
#[test]
fn test_response() {
let input = fixtures::v10::minimal_string_extension();
let resp = input.into_response();
assert_eq!(
resp.headers()
.get("ce-specversion")
.unwrap()
.to_str()
.unwrap(),
"1.0"
);
assert_eq!(
resp.headers().get("ce-id").unwrap().to_str().unwrap(),
"0001"
);
assert_eq!(
resp.headers().get("ce-type").unwrap().to_str().unwrap(),
"test_event.test_application"
);
assert_eq!(
resp.headers().get("ce-source").unwrap().to_str().unwrap(),
"http://localhost/"
);
assert_eq!(
resp.headers().get("ce-someint").unwrap().to_str().unwrap(),
"10"
);
}
#[tokio::test]
async fn test_response_with_full_data() {
let input = fixtures::v10::full_binary_json_data_string_extension();
let resp = input.into_response();
assert_eq!(
resp.headers()
.get("ce-specversion")
.unwrap()
.to_str()
.unwrap(),
"1.0"
);
assert_eq!(
resp.headers().get("ce-id").unwrap().to_str().unwrap(),
"0001"
);
assert_eq!(
resp.headers().get("ce-type").unwrap().to_str().unwrap(),
"test_event.test_application"
);
assert_eq!(
resp.headers().get("ce-source").unwrap().to_str().unwrap(),
"http://localhost/"
);
assert_eq!(
resp.headers()
.get("content-type")
.unwrap()
.to_str()
.unwrap(),
"application/json"
);
assert_eq!(
resp.headers().get("ce-int_ex").unwrap().to_str().unwrap(),
"10"
);
let body = resp.into_body().into_vec().await.unwrap();
assert_eq!(fixtures::json_data_binary(), body);
}
}

View File

@ -1,3 +1,4 @@
#![feature(in_band_lifetimes)]
//! This crate implements the [CloudEvents](https://cloudevents.io/) Spec for Rust.
//!
//! ```