Add Axum binding

Signed-off-by: andrew webber (personal) <andrewvwebber@googlemail.com>
This commit is contained in:
andrew webber (personal) 2021-08-09 22:29:09 +02:00 committed by Jim Crossley
parent b540542040
commit 6074b4db7e
14 changed files with 635 additions and 6 deletions

View File

@ -129,3 +129,11 @@ jobs:
command: build
toolchain: ${{ matrix.toolchain }}
args: --target ${{ matrix.target }} --manifest-path ./example-projects/warp-example/Cargo.toml
- uses: actions-rs/cargo@v1
name: "Build axum-example"
if: matrix.target == 'x86_64-unknown-linux-gnu' && matrix.toolchain == 'stable'
with:
command: build
toolchain: ${{ matrix.toolchain }}
args: --target ${{ matrix.target }} --manifest-path ./example-projects/axum-example/Cargo.toml

View File

@ -21,6 +21,7 @@ actix = ["actix-web", "async-trait", "bytes", "futures", "http"]
reqwest = ["reqwest-lib", "async-trait", "bytes", "http"]
rdkafka = ["rdkafka-lib", "bytes", "futures"]
warp = ["warp-lib", "bytes", "http", "hyper"]
axum = ["bytes", "http", "hyper", "axum-lib", "http-body", "async-trait"]
[dependencies]
serde = { version = "^1.0", features = ["derive"] }
@ -42,6 +43,8 @@ bytes = { version = "^1.0", optional = true }
futures = { version = "^0.3", optional = true }
http = { version = "0.2", optional = true }
hyper = { version = "^0.14", optional = true }
axum-lib = { version = "^0.2", optional = true , package="axum"}
http-body = { version = "^0.4", optional = true}
[target."cfg(not(target_arch = \"wasm32\"))".dependencies]
hostname = "^0.3"
@ -65,3 +68,4 @@ chrono = { version = "^0.4", features = ["serde"] }
mockito = "0.25.1"
tokio = { version = "^1.0", features = ["full"] }
mime = "0.3"
tower = { version = "0.4", features = ["util"] }

View File

@ -0,0 +1,21 @@
[package]
name = "axum-example"
version = "0.3.0"
authors = ["Andrew Webber <andrewvwebber@googlemail.com>"]
edition = "2018"
[dependencies]
cloudevents-sdk = { path = "../..", features = ["axum"] }
axum = "^0.2"
http = "^0.2"
tokio = { version = "^1", features = ["full"] }
tracing = "^0.1"
tracing-subscriber = "^0.2"
tower-http = { version = "^0.1", features = ["trace"] }
[dev-dependencies]
tower = { version = "^0.4", features = ["util"] }
serde = { version = "^1.0", features = ["derive"] }
serde_json = "^1.0"
chrono = { version = "^0.4", features = ["serde"] }
hyper = { version = "^0.14" }

View File

@ -0,0 +1,115 @@
use axum::{
handler::{get, post},
routing::BoxRoute,
Router,
};
use cloudevents::Event;
use http::StatusCode;
use std::net::SocketAddr;
use tower_http::trace::TraceLayer;
fn echo_app() -> Router<BoxRoute> {
Router::new()
.route("/", get(|| async { "hello from cloudevents server" }))
.route(
"/",
post(|event: Event| async move {
tracing::debug!("received cloudevent {}", &event);
(StatusCode::OK, event)
}),
)
.layer(TraceLayer::new_for_http())
.boxed()
}
#[tokio::main]
async fn main() {
if std::env::var("RUST_LOG").is_err() {
std::env::set_var("RUST_LOG", "axum_example=debug,tower_http=debug")
}
tracing_subscriber::fmt::init();
let service = echo_app();
let addr = SocketAddr::from(([127, 0, 0, 1], 8080));
tracing::debug!("listening on {}", addr);
axum::Server::bind(&addr)
.serve(service.into_make_service())
.await
.unwrap();
}
#[cfg(test)]
mod tests {
use super::echo_app;
use axum::{
body::Body,
http::{self, Request},
};
use chrono::Utc;
use hyper;
use serde_json::json;
use tower::ServiceExt; // for `app.oneshot()`
#[tokio::test]
async fn axum_mod_test() {
if std::env::var("RUST_LOG").is_err() {
std::env::set_var("RUST_LOG", "axum_example=debug,tower_http=debug")
}
tracing_subscriber::fmt::init();
let app = echo_app();
let time = Utc::now();
let j = json!({"hello": "world"});
let request = Request::builder()
.method(http::Method::POST)
.header("ce-specversion", "1.0")
.header("ce-id", "0001")
.header("ce-type", "example.test")
.header("ce-source", "http://localhost/")
.header("ce-someint", "10")
.header("ce-time", time.to_rfc3339())
.header("content-type", "application/json")
.body(Body::from(serde_json::to_vec(&j).unwrap()))
.unwrap();
let resp = app.oneshot(request).await.unwrap();
assert_eq!(
resp.headers()
.get("ce-specversion")
.unwrap()
.to_str()
.unwrap(),
"1.0"
);
assert_eq!(
resp.headers().get("ce-id").unwrap().to_str().unwrap(),
"0001"
);
assert_eq!(
resp.headers().get("ce-type").unwrap().to_str().unwrap(),
"example.test"
);
assert_eq!(
resp.headers().get("ce-source").unwrap().to_str().unwrap(),
"http://localhost/"
);
assert_eq!(
resp.headers()
.get("content-type")
.unwrap()
.to_str()
.unwrap(),
"application/json"
);
assert_eq!(
resp.headers().get("ce-someint").unwrap().to_str().unwrap(),
"10"
);
let (_, body) = resp.into_parts();
let body = hyper::body::to_bytes(body).await.unwrap();
assert_eq!(j.to_string().as_bytes(), body);
}
}

143
src/binding/axum/extract.rs Normal file
View File

@ -0,0 +1,143 @@
use axum_lib as axum;
use async_trait::async_trait;
use axum::extract::{FromRequest, RequestParts};
use http::StatusCode;
use http_body::Body;
use hyper::body;
use crate::binding::http::to_event;
use crate::event::Event;
type BoxError = Box<dyn std::error::Error + Send + Sync>;
#[async_trait]
impl<B> FromRequest<B> for Event
where
B: Body + Send,
B::Data: Send,
B::Error: Into<BoxError>,
{
type Rejection = (StatusCode, String);
async fn from_request(req: &mut RequestParts<B>) -> Result<Self, Self::Rejection> {
let headers = req.headers().cloned().ok_or(0).map_err(|_| {
(
StatusCode::BAD_REQUEST,
"unexpected empty headers".to_string(),
)
})?;
let req_body = req
.take_body()
.ok_or(0)
.map_err(|_| (StatusCode::BAD_REQUEST, "unexpected empty body".to_string()))?;
let buf = body::to_bytes(req_body)
.await
.map_err(|e| (StatusCode::BAD_REQUEST, format!("{}", e.into())))?;
to_event(&headers, buf.to_vec()).map_err(|e| (StatusCode::BAD_REQUEST, format!("{}", e)))
}
}
#[cfg(test)]
mod tests {
use axum_lib as axum;
use super::*;
use axum::body::Body;
use axum::http::{self, Request, StatusCode};
use chrono::Utc;
use serde_json::json;
use crate::{EventBuilder, EventBuilderV10};
#[tokio::test]
async fn axum_test_request() {
let time = Utc::now();
let expected = EventBuilderV10::new()
.id("0001")
.ty("example.test")
.source("http://localhost/")
.time(time)
.extension("someint", "10")
.build()
.unwrap();
let mut request = RequestParts::new(
Request::builder()
.method(http::Method::POST)
.header("ce-specversion", "1.0")
.header("ce-id", "0001")
.header("ce-type", "example.test")
.header("ce-source", "http://localhost/")
.header("ce-someint", "10")
.header("ce-time", time.to_rfc3339())
.body(Body::empty())
.unwrap(),
);
let result = Event::from_request(&mut request).await.unwrap();
assert_eq!(expected, result);
}
#[tokio::test]
async fn axum_test_bad_request() {
let time = Utc::now();
let mut request = RequestParts::new(
Request::builder()
.method(http::Method::POST)
.header("ce-specversion", "BAD SPECIFICATION")
.header("ce-id", "0001")
.header("ce-type", "example.test")
.header("ce-source", "http://localhost/")
.header("ce-someint", "10")
.header("ce-time", time.to_rfc3339())
.body(Body::empty())
.unwrap(),
);
let result = Event::from_request(&mut request).await;
assert!(result.is_err());
let rejection = result.unwrap_err();
let reason = rejection.0;
assert_eq!(reason, StatusCode::BAD_REQUEST)
}
#[tokio::test]
async fn axum_test_request_with_full_data() {
let time = Utc::now();
let j = json!({"hello": "world"});
let expected = EventBuilderV10::new()
.id("0001")
.ty("example.test")
.source("http://localhost")
.time(time)
.data("application/json", j.to_string().into_bytes())
.extension("someint", "10")
.build()
.unwrap();
let mut request = RequestParts::new(
Request::builder()
.method(http::Method::POST)
.header("ce-specversion", "1.0")
.header("ce-id", "0001")
.header("ce-type", "example.test")
.header("ce-source", "http://localhost")
.header("ce-someint", "10")
.header("ce-time", time.to_rfc3339())
.header("content-type", "application/json")
.body(Body::from(serde_json::to_vec(&j).unwrap()))
.unwrap(),
);
let result = Event::from_request(&mut request).await.unwrap();
assert_eq!(expected, result);
}
}

169
src/binding/axum/mod.rs Normal file
View File

@ -0,0 +1,169 @@
//! This module integrates the [cloudevents-sdk](https://docs.rs/cloudevents-sdk) with [Axum web service framework](https://docs.rs/axum/)
//! to easily send and receive CloudEvents.
//!
//! To deserialize an HTTP request as CloudEvent
//!
//! To echo events:
//!
//! ```
//! use axum_lib as axum;
//! use axum::{
//! handler::{get, post},
//! routing::BoxRoute,
//! Router,
//! };
//! use cloudevents::Event;
//! use http::StatusCode;
//!
//! fn app() -> Router<BoxRoute> {
//! Router::new()
//! .route("/", get(|| async { "hello from cloudevents server" }))
//! .route(
//! "/",
//! post(|event: Event| async move {
//! println!("received cloudevent {}", &event);
//! (StatusCode::OK, event)
//! }),
//! )
//! .boxed()
//! }
//!
//! ```
//!
//! To create event inside request handlers and send them as responses:
//!
//! ```
//! use axum_lib as axum;
//! use axum::{
//! handler::{get, post},
//! routing::BoxRoute,
//! Router,
//! };
//! use cloudevents::{Event, EventBuilder, EventBuilderV10};
//! use http::StatusCode;
//! use serde_json::json;
//!
//! fn app() -> Router<BoxRoute> {
//! Router::new()
//! .route("/", get(|| async { "hello from cloudevents server" }))
//! .route(
//! "/",
//! post(|| async move {
//! let event = EventBuilderV10::new()
//! .id("1")
//! .source("url://example_response/")
//! .ty("example.ce")
//! .data(
//! mime::APPLICATION_JSON.to_string(),
//! json!({
//! "name": "John Doe",
//! "age": 43,
//! "phones": [
//! "+44 1234567",
//! "+44 2345678"
//! ]
//! }),
//! )
//! .build()
//! .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
//!
//! Ok::<Event, (StatusCode, String)>(event)
//! }),
//! )
//! .boxed()
//! }
//!
//! ```
pub mod extract;
pub mod response;
mod server_response;
#[cfg(test)]
mod tests {
use axum_lib as axum;
use axum::{
body::Body,
handler::{get, post},
http::{self, Request, StatusCode},
routing::BoxRoute,
Router,
};
use chrono::Utc;
use serde_json::json;
use tower::ServiceExt; // for `app.oneshot()`
use crate::Event;
fn echo_app() -> Router<BoxRoute> {
Router::new()
.route("/", get(|| async { "hello from cloudevents server" }))
.route(
"/",
post(|event: Event| async move {
println!("received cloudevent {}", &event);
(StatusCode::OK, event)
}),
)
.boxed()
}
#[tokio::test]
async fn axum_mod_test() {
let app = echo_app();
let time = Utc::now();
let j = json!({"hello": "world"});
let request = Request::builder()
.method(http::Method::POST)
.header("ce-specversion", "1.0")
.header("ce-id", "0001")
.header("ce-type", "example.test")
.header("ce-source", "http://localhost/")
.header("ce-someint", "10")
.header("ce-time", time.to_rfc3339())
.header("content-type", "application/json")
.body(Body::from(serde_json::to_vec(&j).unwrap()))
.unwrap();
let resp = app.oneshot(request).await.unwrap();
assert_eq!(
resp.headers()
.get("ce-specversion")
.unwrap()
.to_str()
.unwrap(),
"1.0"
);
assert_eq!(
resp.headers().get("ce-id").unwrap().to_str().unwrap(),
"0001"
);
assert_eq!(
resp.headers().get("ce-type").unwrap().to_str().unwrap(),
"example.test"
);
assert_eq!(
resp.headers().get("ce-source").unwrap().to_str().unwrap(),
"http://localhost/"
);
assert_eq!(
resp.headers()
.get("content-type")
.unwrap()
.to_str()
.unwrap(),
"application/json"
);
assert_eq!(
resp.headers().get("ce-someint").unwrap().to_str().unwrap(),
"10"
);
let (_, body) = resp.into_parts();
let body = hyper::body::to_bytes(body).await.unwrap();
assert_eq!(j.to_string().as_bytes(), body);
}
}

View File

@ -0,0 +1,123 @@
use axum_lib as axum;
use axum::{body::Body, http::Response, response::IntoResponse};
use http::{header, StatusCode};
use super::server_response::event_to_response;
use crate::event::Event;
impl IntoResponse for Event {
type Body = Body;
type BodyError = <Self::Body as axum::body::HttpBody>::Error;
fn into_response(self) -> Response<Body> {
match event_to_response(self) {
Ok(resp) => resp,
Err(err) => Response::builder()
.status(StatusCode::INTERNAL_SERVER_ERROR)
.header(header::CONTENT_TYPE, "text/plain")
.body(err.to_string().into())
.unwrap(),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use serde_json::json;
use crate::{EventBuilder, EventBuilderV10};
#[test]
fn axum_test_response() {
let input = EventBuilderV10::new()
.id("0001")
.ty("example.test")
.source("http://localhost/")
.extension("someint", "10")
.build()
.unwrap();
let resp = input.into_response();
assert_eq!(
resp.headers()
.get("ce-specversion")
.unwrap()
.to_str()
.unwrap(),
"1.0"
);
assert_eq!(
resp.headers().get("ce-id").unwrap().to_str().unwrap(),
"0001"
);
assert_eq!(
resp.headers().get("ce-type").unwrap().to_str().unwrap(),
"example.test"
);
assert_eq!(
resp.headers().get("ce-source").unwrap().to_str().unwrap(),
"http://localhost/"
);
assert_eq!(
resp.headers().get("ce-someint").unwrap().to_str().unwrap(),
"10"
);
}
#[tokio::test]
async fn axum_test_response_with_full_data() {
let j = json!({"hello": "world"});
let input = EventBuilderV10::new()
.id("0001")
.ty("example.test")
.source("http://localhost")
.data("application/json", j.clone())
.extension("someint", "10")
.build()
.unwrap();
let resp = input.into_response();
assert_eq!(
resp.headers()
.get("ce-specversion")
.unwrap()
.to_str()
.unwrap(),
"1.0"
);
assert_eq!(
resp.headers().get("ce-id").unwrap().to_str().unwrap(),
"0001"
);
assert_eq!(
resp.headers().get("ce-type").unwrap().to_str().unwrap(),
"example.test"
);
assert_eq!(
resp.headers().get("ce-source").unwrap().to_str().unwrap(),
"http://localhost"
);
assert_eq!(
resp.headers()
.get("content-type")
.unwrap()
.to_str()
.unwrap(),
"application/json"
);
assert_eq!(
resp.headers().get("ce-someint").unwrap().to_str().unwrap(),
"10"
);
let (_, body) = resp.into_parts();
let body = hyper::body::to_bytes(body).await.unwrap();
assert_eq!(j.to_string().as_bytes(), body);
}
}

View File

@ -0,0 +1,38 @@
use axum_lib as axum;
use axum::{body::Body, http::Response};
use std::cell::Cell;
use crate::binding::http::{Builder, Serializer};
use crate::message::{BinaryDeserializer, Error, Result};
use crate::Event;
struct Adapter {
builder: Cell<http::response::Builder>,
}
impl Builder<Response<Body>> for Adapter {
fn header(&mut self, key: &str, value: http::header::HeaderValue) {
self.builder.set(self.builder.take().header(key, value));
}
fn body(&mut self, bytes: Vec<u8>) -> Result<Response<Body>> {
self.builder
.take()
.body(Body::from(bytes))
.map_err(|e| crate::message::Error::Other {
source: Box::new(e),
})
}
fn finish(&mut self) -> Result<Response<Body>> {
self.body(Vec::new())
}
}
pub fn event_to_response(event: Event) -> std::result::Result<Response<Body>, Error> {
BinaryDeserializer::deserialize_binary(
event,
Serializer::new(Adapter {
builder: Cell::new(http::Response::builder()),
}),
)
}

View File

@ -2,7 +2,14 @@
#[cfg(feature = "actix")]
pub mod actix;
#[cfg(any(feature = "actix", feature = "warp", feature = "reqwest"))]
#[cfg(feature = "axum")]
pub mod axum;
#[cfg(any(
feature = "actix",
feature = "warp",
feature = "reqwest",
feature = "axum"
))]
pub mod http;
#[cfg(feature = "rdkafka")]
pub mod rdkafka;

View File

@ -242,7 +242,7 @@ mod tests {
.id("0001")
.ty("example.test")
.source("http://localhost")
.data("application/json", j.clone())
.data("application/json", j)
.extension("someint", "10")
.build()
.unwrap();

View File

@ -35,8 +35,8 @@ impl fmt::Display for AttributeValue<'_> {
match self {
AttributeValue::SpecVersion(s) => s.fmt(f),
AttributeValue::String(s) => f.write_str(s),
AttributeValue::URI(s) => f.write_str(&s.as_str()),
AttributeValue::URIRef(s) => f.write_str(&s.as_str()),
AttributeValue::URI(s) => f.write_str(s.as_str()),
AttributeValue::URIRef(s) => f.write_str(s.as_str()),
AttributeValue::Time(s) => f.write_str(&s.to_rfc3339()),
AttributeValue::Boolean(b) => f.serialize_bool(**b),
AttributeValue::Integer(i) => f.serialize_i64(**i),

View File

@ -235,7 +235,7 @@ mod tests {
let mut event = EventBuilderV03::new()
.id(id)
.source(source.clone())
.source(source.to_string())
.ty(ty)
.subject(subject)
.time(time)

View File

@ -235,7 +235,7 @@ mod tests {
let mut event = EventBuilderV10::new()
.id(id)
.source(source.clone())
.source(source.to_string())
.ty(ty)
.subject(subject)
.time(time)

View File

@ -46,6 +46,7 @@
//! [Extractors] and [Responders]
//! - `reqwest`: Enables the [`binding::reqwest`] protocol binding module.
//! - `warp`: Enables the [`binding::warp`] protocol binding module.
//! - `axum`: Enables the [`binding::axum`] protocol binding module.
//! - `rdkafka`: Enables the [`binding::rdkafka`] protocol binding module to
//! seamlessly consume/produce cloudevents within Kafka messages.
//!