diff --git a/.github/workflows/rust_tests.yml b/.github/workflows/rust_tests.yml index 3c7db15..c071688 100644 --- a/.github/workflows/rust_tests.yml +++ b/.github/workflows/rust_tests.yml @@ -129,3 +129,11 @@ jobs: command: build toolchain: ${{ matrix.toolchain }} args: --target ${{ matrix.target }} --manifest-path ./example-projects/warp-example/Cargo.toml + + - uses: actions-rs/cargo@v1 + name: "Build axum-example" + if: matrix.target == 'x86_64-unknown-linux-gnu' && matrix.toolchain == 'stable' + with: + command: build + toolchain: ${{ matrix.toolchain }} + args: --target ${{ matrix.target }} --manifest-path ./example-projects/axum-example/Cargo.toml diff --git a/Cargo.toml b/Cargo.toml index a5f0e9a..03e2f23 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,6 +21,7 @@ actix = ["actix-web", "async-trait", "bytes", "futures", "http"] reqwest = ["reqwest-lib", "async-trait", "bytes", "http"] rdkafka = ["rdkafka-lib", "bytes", "futures"] warp = ["warp-lib", "bytes", "http", "hyper"] +axum = ["bytes", "http", "hyper", "axum-lib", "http-body", "async-trait"] [dependencies] serde = { version = "^1.0", features = ["derive"] } @@ -42,6 +43,8 @@ bytes = { version = "^1.0", optional = true } futures = { version = "^0.3", optional = true } http = { version = "0.2", optional = true } hyper = { version = "^0.14", optional = true } +axum-lib = { version = "^0.2", optional = true , package="axum"} +http-body = { version = "^0.4", optional = true} [target."cfg(not(target_arch = \"wasm32\"))".dependencies] hostname = "^0.3" @@ -65,3 +68,4 @@ chrono = { version = "^0.4", features = ["serde"] } mockito = "0.25.1" tokio = { version = "^1.0", features = ["full"] } mime = "0.3" +tower = { version = "0.4", features = ["util"] } diff --git a/example-projects/axum-example/Cargo.toml b/example-projects/axum-example/Cargo.toml new file mode 100644 index 0000000..cb6cc09 --- /dev/null +++ b/example-projects/axum-example/Cargo.toml @@ -0,0 +1,21 @@ +[package] +name = "axum-example" +version = "0.3.0" +authors = ["Andrew Webber "] +edition = "2018" + +[dependencies] +cloudevents-sdk = { path = "../..", features = ["axum"] } +axum = "^0.2" +http = "^0.2" +tokio = { version = "^1", features = ["full"] } +tracing = "^0.1" +tracing-subscriber = "^0.2" +tower-http = { version = "^0.1", features = ["trace"] } + +[dev-dependencies] +tower = { version = "^0.4", features = ["util"] } +serde = { version = "^1.0", features = ["derive"] } +serde_json = "^1.0" +chrono = { version = "^0.4", features = ["serde"] } +hyper = { version = "^0.14" } diff --git a/example-projects/axum-example/src/main.rs b/example-projects/axum-example/src/main.rs new file mode 100644 index 0000000..8caaec2 --- /dev/null +++ b/example-projects/axum-example/src/main.rs @@ -0,0 +1,115 @@ +use axum::{ + handler::{get, post}, + routing::BoxRoute, + Router, +}; +use cloudevents::Event; +use http::StatusCode; +use std::net::SocketAddr; +use tower_http::trace::TraceLayer; + +fn echo_app() -> Router { + Router::new() + .route("/", get(|| async { "hello from cloudevents server" })) + .route( + "/", + post(|event: Event| async move { + tracing::debug!("received cloudevent {}", &event); + (StatusCode::OK, event) + }), + ) + .layer(TraceLayer::new_for_http()) + .boxed() +} + +#[tokio::main] +async fn main() { + if std::env::var("RUST_LOG").is_err() { + std::env::set_var("RUST_LOG", "axum_example=debug,tower_http=debug") + } + tracing_subscriber::fmt::init(); + let service = echo_app(); + let addr = SocketAddr::from(([127, 0, 0, 1], 8080)); + tracing::debug!("listening on {}", addr); + axum::Server::bind(&addr) + .serve(service.into_make_service()) + .await + .unwrap(); +} + +#[cfg(test)] +mod tests { + + use super::echo_app; + + use axum::{ + body::Body, + http::{self, Request}, + }; + use chrono::Utc; + use hyper; + use serde_json::json; + use tower::ServiceExt; // for `app.oneshot()` + + #[tokio::test] + async fn axum_mod_test() { + if std::env::var("RUST_LOG").is_err() { + std::env::set_var("RUST_LOG", "axum_example=debug,tower_http=debug") + } + tracing_subscriber::fmt::init(); + + let app = echo_app(); + let time = Utc::now(); + let j = json!({"hello": "world"}); + let request = Request::builder() + .method(http::Method::POST) + .header("ce-specversion", "1.0") + .header("ce-id", "0001") + .header("ce-type", "example.test") + .header("ce-source", "http://localhost/") + .header("ce-someint", "10") + .header("ce-time", time.to_rfc3339()) + .header("content-type", "application/json") + .body(Body::from(serde_json::to_vec(&j).unwrap())) + .unwrap(); + + let resp = app.oneshot(request).await.unwrap(); + assert_eq!( + resp.headers() + .get("ce-specversion") + .unwrap() + .to_str() + .unwrap(), + "1.0" + ); + assert_eq!( + resp.headers().get("ce-id").unwrap().to_str().unwrap(), + "0001" + ); + assert_eq!( + resp.headers().get("ce-type").unwrap().to_str().unwrap(), + "example.test" + ); + assert_eq!( + resp.headers().get("ce-source").unwrap().to_str().unwrap(), + "http://localhost/" + ); + assert_eq!( + resp.headers() + .get("content-type") + .unwrap() + .to_str() + .unwrap(), + "application/json" + ); + assert_eq!( + resp.headers().get("ce-someint").unwrap().to_str().unwrap(), + "10" + ); + + let (_, body) = resp.into_parts(); + let body = hyper::body::to_bytes(body).await.unwrap(); + + assert_eq!(j.to_string().as_bytes(), body); + } +} diff --git a/src/binding/axum/extract.rs b/src/binding/axum/extract.rs new file mode 100644 index 0000000..0acfa31 --- /dev/null +++ b/src/binding/axum/extract.rs @@ -0,0 +1,143 @@ +use axum_lib as axum; + +use async_trait::async_trait; +use axum::extract::{FromRequest, RequestParts}; +use http::StatusCode; +use http_body::Body; +use hyper::body; + +use crate::binding::http::to_event; +use crate::event::Event; + +type BoxError = Box; + +#[async_trait] +impl FromRequest for Event +where + B: Body + Send, + B::Data: Send, + B::Error: Into, +{ + type Rejection = (StatusCode, String); + + async fn from_request(req: &mut RequestParts) -> Result { + let headers = req.headers().cloned().ok_or(0).map_err(|_| { + ( + StatusCode::BAD_REQUEST, + "unexpected empty headers".to_string(), + ) + })?; + + let req_body = req + .take_body() + .ok_or(0) + .map_err(|_| (StatusCode::BAD_REQUEST, "unexpected empty body".to_string()))?; + + let buf = body::to_bytes(req_body) + .await + .map_err(|e| (StatusCode::BAD_REQUEST, format!("{}", e.into())))?; + to_event(&headers, buf.to_vec()).map_err(|e| (StatusCode::BAD_REQUEST, format!("{}", e))) + } +} + +#[cfg(test)] +mod tests { + use axum_lib as axum; + + use super::*; + use axum::body::Body; + use axum::http::{self, Request, StatusCode}; + use chrono::Utc; + use serde_json::json; + + use crate::{EventBuilder, EventBuilderV10}; + + #[tokio::test] + async fn axum_test_request() { + let time = Utc::now(); + let expected = EventBuilderV10::new() + .id("0001") + .ty("example.test") + .source("http://localhost/") + .time(time) + .extension("someint", "10") + .build() + .unwrap(); + + let mut request = RequestParts::new( + Request::builder() + .method(http::Method::POST) + .header("ce-specversion", "1.0") + .header("ce-id", "0001") + .header("ce-type", "example.test") + .header("ce-source", "http://localhost/") + .header("ce-someint", "10") + .header("ce-time", time.to_rfc3339()) + .body(Body::empty()) + .unwrap(), + ); + + let result = Event::from_request(&mut request).await.unwrap(); + + assert_eq!(expected, result); + } + + #[tokio::test] + async fn axum_test_bad_request() { + let time = Utc::now(); + + let mut request = RequestParts::new( + Request::builder() + .method(http::Method::POST) + .header("ce-specversion", "BAD SPECIFICATION") + .header("ce-id", "0001") + .header("ce-type", "example.test") + .header("ce-source", "http://localhost/") + .header("ce-someint", "10") + .header("ce-time", time.to_rfc3339()) + .body(Body::empty()) + .unwrap(), + ); + + let result = Event::from_request(&mut request).await; + assert!(result.is_err()); + let rejection = result.unwrap_err(); + + let reason = rejection.0; + assert_eq!(reason, StatusCode::BAD_REQUEST) + } + + #[tokio::test] + async fn axum_test_request_with_full_data() { + let time = Utc::now(); + let j = json!({"hello": "world"}); + + let expected = EventBuilderV10::new() + .id("0001") + .ty("example.test") + .source("http://localhost") + .time(time) + .data("application/json", j.to_string().into_bytes()) + .extension("someint", "10") + .build() + .unwrap(); + + let mut request = RequestParts::new( + Request::builder() + .method(http::Method::POST) + .header("ce-specversion", "1.0") + .header("ce-id", "0001") + .header("ce-type", "example.test") + .header("ce-source", "http://localhost") + .header("ce-someint", "10") + .header("ce-time", time.to_rfc3339()) + .header("content-type", "application/json") + .body(Body::from(serde_json::to_vec(&j).unwrap())) + .unwrap(), + ); + + let result = Event::from_request(&mut request).await.unwrap(); + + assert_eq!(expected, result); + } +} diff --git a/src/binding/axum/mod.rs b/src/binding/axum/mod.rs new file mode 100644 index 0000000..3904a8b --- /dev/null +++ b/src/binding/axum/mod.rs @@ -0,0 +1,169 @@ +//! This module integrates the [cloudevents-sdk](https://docs.rs/cloudevents-sdk) with [Axum web service framework](https://docs.rs/axum/) +//! to easily send and receive CloudEvents. +//! +//! To deserialize an HTTP request as CloudEvent +//! +//! To echo events: +//! +//! ``` +//! use axum_lib as axum; +//! use axum::{ +//! handler::{get, post}, +//! routing::BoxRoute, +//! Router, +//! }; +//! use cloudevents::Event; +//! use http::StatusCode; +//! +//! fn app() -> Router { +//! Router::new() +//! .route("/", get(|| async { "hello from cloudevents server" })) +//! .route( +//! "/", +//! post(|event: Event| async move { +//! println!("received cloudevent {}", &event); +//! (StatusCode::OK, event) +//! }), +//! ) +//! .boxed() +//! } +//! +//! ``` +//! +//! To create event inside request handlers and send them as responses: +//! +//! ``` +//! use axum_lib as axum; +//! use axum::{ +//! handler::{get, post}, +//! routing::BoxRoute, +//! Router, +//! }; +//! use cloudevents::{Event, EventBuilder, EventBuilderV10}; +//! use http::StatusCode; +//! use serde_json::json; +//! +//! fn app() -> Router { +//! Router::new() +//! .route("/", get(|| async { "hello from cloudevents server" })) +//! .route( +//! "/", +//! post(|| async move { +//! let event = EventBuilderV10::new() +//! .id("1") +//! .source("url://example_response/") +//! .ty("example.ce") +//! .data( +//! mime::APPLICATION_JSON.to_string(), +//! json!({ +//! "name": "John Doe", +//! "age": 43, +//! "phones": [ +//! "+44 1234567", +//! "+44 2345678" +//! ] +//! }), +//! ) +//! .build() +//! .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; +//! +//! Ok::(event) +//! }), +//! ) +//! .boxed() +//! } +//! +//! ``` + +pub mod extract; +pub mod response; +mod server_response; + +#[cfg(test)] +mod tests { + + use axum_lib as axum; + + use axum::{ + body::Body, + handler::{get, post}, + http::{self, Request, StatusCode}, + routing::BoxRoute, + Router, + }; + use chrono::Utc; + use serde_json::json; + use tower::ServiceExt; // for `app.oneshot()` + + use crate::Event; + + fn echo_app() -> Router { + Router::new() + .route("/", get(|| async { "hello from cloudevents server" })) + .route( + "/", + post(|event: Event| async move { + println!("received cloudevent {}", &event); + (StatusCode::OK, event) + }), + ) + .boxed() + } + + #[tokio::test] + async fn axum_mod_test() { + let app = echo_app(); + let time = Utc::now(); + let j = json!({"hello": "world"}); + let request = Request::builder() + .method(http::Method::POST) + .header("ce-specversion", "1.0") + .header("ce-id", "0001") + .header("ce-type", "example.test") + .header("ce-source", "http://localhost/") + .header("ce-someint", "10") + .header("ce-time", time.to_rfc3339()) + .header("content-type", "application/json") + .body(Body::from(serde_json::to_vec(&j).unwrap())) + .unwrap(); + + let resp = app.oneshot(request).await.unwrap(); + assert_eq!( + resp.headers() + .get("ce-specversion") + .unwrap() + .to_str() + .unwrap(), + "1.0" + ); + assert_eq!( + resp.headers().get("ce-id").unwrap().to_str().unwrap(), + "0001" + ); + assert_eq!( + resp.headers().get("ce-type").unwrap().to_str().unwrap(), + "example.test" + ); + assert_eq!( + resp.headers().get("ce-source").unwrap().to_str().unwrap(), + "http://localhost/" + ); + assert_eq!( + resp.headers() + .get("content-type") + .unwrap() + .to_str() + .unwrap(), + "application/json" + ); + assert_eq!( + resp.headers().get("ce-someint").unwrap().to_str().unwrap(), + "10" + ); + + let (_, body) = resp.into_parts(); + let body = hyper::body::to_bytes(body).await.unwrap(); + + assert_eq!(j.to_string().as_bytes(), body); + } +} diff --git a/src/binding/axum/response.rs b/src/binding/axum/response.rs new file mode 100644 index 0000000..d6694d1 --- /dev/null +++ b/src/binding/axum/response.rs @@ -0,0 +1,123 @@ +use axum_lib as axum; + +use axum::{body::Body, http::Response, response::IntoResponse}; +use http::{header, StatusCode}; + +use super::server_response::event_to_response; +use crate::event::Event; + +impl IntoResponse for Event { + type Body = Body; + type BodyError = ::Error; + + fn into_response(self) -> Response { + match event_to_response(self) { + Ok(resp) => resp, + Err(err) => Response::builder() + .status(StatusCode::INTERNAL_SERVER_ERROR) + .header(header::CONTENT_TYPE, "text/plain") + .body(err.to_string().into()) + .unwrap(), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use serde_json::json; + + use crate::{EventBuilder, EventBuilderV10}; + + #[test] + fn axum_test_response() { + let input = EventBuilderV10::new() + .id("0001") + .ty("example.test") + .source("http://localhost/") + .extension("someint", "10") + .build() + .unwrap(); + + let resp = input.into_response(); + + assert_eq!( + resp.headers() + .get("ce-specversion") + .unwrap() + .to_str() + .unwrap(), + "1.0" + ); + assert_eq!( + resp.headers().get("ce-id").unwrap().to_str().unwrap(), + "0001" + ); + assert_eq!( + resp.headers().get("ce-type").unwrap().to_str().unwrap(), + "example.test" + ); + assert_eq!( + resp.headers().get("ce-source").unwrap().to_str().unwrap(), + "http://localhost/" + ); + assert_eq!( + resp.headers().get("ce-someint").unwrap().to_str().unwrap(), + "10" + ); + } + + #[tokio::test] + async fn axum_test_response_with_full_data() { + let j = json!({"hello": "world"}); + + let input = EventBuilderV10::new() + .id("0001") + .ty("example.test") + .source("http://localhost") + .data("application/json", j.clone()) + .extension("someint", "10") + .build() + .unwrap(); + + let resp = input.into_response(); + + assert_eq!( + resp.headers() + .get("ce-specversion") + .unwrap() + .to_str() + .unwrap(), + "1.0" + ); + assert_eq!( + resp.headers().get("ce-id").unwrap().to_str().unwrap(), + "0001" + ); + assert_eq!( + resp.headers().get("ce-type").unwrap().to_str().unwrap(), + "example.test" + ); + assert_eq!( + resp.headers().get("ce-source").unwrap().to_str().unwrap(), + "http://localhost" + ); + assert_eq!( + resp.headers() + .get("content-type") + .unwrap() + .to_str() + .unwrap(), + "application/json" + ); + assert_eq!( + resp.headers().get("ce-someint").unwrap().to_str().unwrap(), + "10" + ); + + let (_, body) = resp.into_parts(); + let body = hyper::body::to_bytes(body).await.unwrap(); + + assert_eq!(j.to_string().as_bytes(), body); + } +} diff --git a/src/binding/axum/server_response.rs b/src/binding/axum/server_response.rs new file mode 100644 index 0000000..0d76ce7 --- /dev/null +++ b/src/binding/axum/server_response.rs @@ -0,0 +1,38 @@ +use axum_lib as axum; + +use axum::{body::Body, http::Response}; +use std::cell::Cell; + +use crate::binding::http::{Builder, Serializer}; +use crate::message::{BinaryDeserializer, Error, Result}; +use crate::Event; + +struct Adapter { + builder: Cell, +} + +impl Builder> for Adapter { + fn header(&mut self, key: &str, value: http::header::HeaderValue) { + self.builder.set(self.builder.take().header(key, value)); + } + fn body(&mut self, bytes: Vec) -> Result> { + self.builder + .take() + .body(Body::from(bytes)) + .map_err(|e| crate::message::Error::Other { + source: Box::new(e), + }) + } + fn finish(&mut self) -> Result> { + self.body(Vec::new()) + } +} + +pub fn event_to_response(event: Event) -> std::result::Result, Error> { + BinaryDeserializer::deserialize_binary( + event, + Serializer::new(Adapter { + builder: Cell::new(http::Response::builder()), + }), + ) +} diff --git a/src/binding/mod.rs b/src/binding/mod.rs index 2f1a296..245cb1d 100644 --- a/src/binding/mod.rs +++ b/src/binding/mod.rs @@ -2,7 +2,14 @@ #[cfg(feature = "actix")] pub mod actix; -#[cfg(any(feature = "actix", feature = "warp", feature = "reqwest"))] +#[cfg(feature = "axum")] +pub mod axum; +#[cfg(any( + feature = "actix", + feature = "warp", + feature = "reqwest", + feature = "axum" +))] pub mod http; #[cfg(feature = "rdkafka")] pub mod rdkafka; diff --git a/src/binding/rdkafka/kafka_consumer_record.rs b/src/binding/rdkafka/kafka_consumer_record.rs index 15310a3..63ba042 100644 --- a/src/binding/rdkafka/kafka_consumer_record.rs +++ b/src/binding/rdkafka/kafka_consumer_record.rs @@ -242,7 +242,7 @@ mod tests { .id("0001") .ty("example.test") .source("http://localhost") - .data("application/json", j.clone()) + .data("application/json", j) .extension("someint", "10") .build() .unwrap(); diff --git a/src/event/attributes.rs b/src/event/attributes.rs index 30c1d30..12cce2a 100644 --- a/src/event/attributes.rs +++ b/src/event/attributes.rs @@ -35,8 +35,8 @@ impl fmt::Display for AttributeValue<'_> { match self { AttributeValue::SpecVersion(s) => s.fmt(f), AttributeValue::String(s) => f.write_str(s), - AttributeValue::URI(s) => f.write_str(&s.as_str()), - AttributeValue::URIRef(s) => f.write_str(&s.as_str()), + AttributeValue::URI(s) => f.write_str(s.as_str()), + AttributeValue::URIRef(s) => f.write_str(s.as_str()), AttributeValue::Time(s) => f.write_str(&s.to_rfc3339()), AttributeValue::Boolean(b) => f.serialize_bool(**b), AttributeValue::Integer(i) => f.serialize_i64(**i), diff --git a/src/event/v03/builder.rs b/src/event/v03/builder.rs index 98403ad..6f18403 100644 --- a/src/event/v03/builder.rs +++ b/src/event/v03/builder.rs @@ -235,7 +235,7 @@ mod tests { let mut event = EventBuilderV03::new() .id(id) - .source(source.clone()) + .source(source.to_string()) .ty(ty) .subject(subject) .time(time) diff --git a/src/event/v10/builder.rs b/src/event/v10/builder.rs index 0acf3bf..f37de74 100644 --- a/src/event/v10/builder.rs +++ b/src/event/v10/builder.rs @@ -235,7 +235,7 @@ mod tests { let mut event = EventBuilderV10::new() .id(id) - .source(source.clone()) + .source(source.to_string()) .ty(ty) .subject(subject) .time(time) diff --git a/src/lib.rs b/src/lib.rs index 9a015cd..652f18f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -46,6 +46,7 @@ //! [Extractors] and [Responders] //! - `reqwest`: Enables the [`binding::reqwest`] protocol binding module. //! - `warp`: Enables the [`binding::warp`] protocol binding module. +//! - `axum`: Enables the [`binding::axum`] protocol binding module. //! - `rdkafka`: Enables the [`binding::rdkafka`] protocol binding module to //! seamlessly consume/produce cloudevents within Kafka messages. //!