diff --git a/Cargo.toml b/Cargo.toml index cc16bd4..f7011bd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,7 +18,7 @@ name = "cloudevents" [features] http-binding = ["async-trait", "bytes", "futures", "http"] -actix = ["actix-web", "async-trait", "bytes", "futures", "http"] +actix = ["actix-web", "actix-http", "async-trait", "bytes", "futures", "http"] reqwest = ["reqwest-lib", "async-trait", "bytes", "http"] rdkafka = ["rdkafka-lib", "bytes", "futures"] warp = ["warp-lib", "bytes", "http", "hyper"] @@ -36,9 +36,8 @@ snafu = "^0.6" bitflags = "^1.2" # runtime optional deps -actix-web = { version = "=4.0.0-beta.8", default-features = false, optional = true } -actix-http = { version = "=3.0.0-beta.8", default-features = false, optional = true } -actix-tls = { version = "=3.0.0-beta.5", default-features = false, optional = true } +actix-web = { version = "=4.0.0-beta.19", default-features = false, optional = true } +actix-http = { version = "=3.0.0-beta.18", default-features = false, optional = true } reqwest-lib = { version = "^0.11", default-features = false, features = ["rustls-tls"], optional = true, package = "reqwest" } rdkafka-lib = { version = "^0.28", features = ["cmake-build"], optional = true, package = "rdkafka" } warp-lib = { version = "^0.3", optional = true, package = "warp" } diff --git a/example-projects/actix-web-example/Cargo.toml b/example-projects/actix-web-example/Cargo.toml index 5521dee..533a57f 100644 --- a/example-projects/actix-web-example/Cargo.toml +++ b/example-projects/actix-web-example/Cargo.toml @@ -6,8 +6,8 @@ edition = "2018" [dependencies] cloudevents-sdk = { path = "../..", features = ["actix"] } -actix-web = "4.0.0-beta.8" -actix-cors = "0.6.0-beta.2" +actix-web = "4.0.0-beta.19" +actix-cors = "0.6.0-beta.8" serde_json = "^1.0" url = { version = "^2.1" } env_logger = "0.7.1" diff --git a/src/binding/actix/mod.rs b/src/binding/actix/mod.rs index a9cb666..25ad7c6 100644 --- a/src/binding/actix/mod.rs +++ b/src/binding/actix/mod.rs @@ -3,6 +3,19 @@ //! To deserialize an HTTP request as CloudEvent: //! //! ``` +//! use cloudevents::Event; +//! use actix_web::post; +//! +//! #[post("/")] +//! async fn post_event(event: Event) -> Result { +//! println!("Received Event: {:?}", event); +//! Ok(format!("{:?}", event)) +//! } +//! ``` +//! +//! For more complex applications, access the Payload directly: +//! +//! ``` //! use cloudevents::binding::actix::HttpRequestExt; //! use actix_web::{HttpRequest, web, post}; //! @@ -17,8 +30,30 @@ //! To serialize a CloudEvent to an HTTP response: //! //! ``` +//! use actix_web::get; +//! use cloudevents::{Event, EventBuilderV10, EventBuilder}; +//! use serde_json::json; +//! +//! #[get("/")] +//! async fn get_event() -> Event { +//! let payload = json!({"hello": "world"}); +//! +//! EventBuilderV10::new() +//! .id("0001") +//! .ty("example.test") +//! .source("http://localhost/") +//! .data("application/json", payload) +//! .extension("someint", "10") +//! .build() +//! .unwrap() +//! } +//! ``` +//! +//! For more complex applications, use the HTTP response builder extension: +//! +//! ``` //! use cloudevents::binding::actix::HttpResponseBuilderExt; -//! use actix_web::{HttpRequest, web, get, HttpResponse}; +//! use actix_web::{get, HttpResponse}; //! use cloudevents::{EventBuilderV10, EventBuilder}; //! use serde_json::json; //! diff --git a/src/binding/actix/server_request.rs b/src/binding/actix/server_request.rs index b12b345..2f3591f 100644 --- a/src/binding/actix/server_request.rs +++ b/src/binding/actix/server_request.rs @@ -1,14 +1,14 @@ use crate::binding::http::{to_event, Headers}; use crate::Event; +use actix_web::dev::Payload; use actix_web::web::BytesMut; use actix_web::{web, HttpRequest}; use async_trait::async_trait; -use futures::future::LocalBoxFuture; -use futures::{FutureExt, StreamExt}; +use futures::{future::LocalBoxFuture, FutureExt, StreamExt}; use http::header::{AsHeaderName, HeaderName, HeaderValue}; /// Implement Headers for the actix HeaderMap -impl<'a> Headers<'a> for actix_web::http::HeaderMap { +impl<'a> Headers<'a> for actix_http::header::HeaderMap { type Iterator = Box + 'a>; fn get(&self, key: K) -> Option<&HeaderValue> { self.get(key.as_str()) @@ -32,14 +32,18 @@ pub async fn request_to_event( /// So that an actix-web handler may take an Event parameter impl actix_web::FromRequest for Event { - type Config = (); type Error = actix_web::Error; type Future = LocalBoxFuture<'static, std::result::Result>; - fn from_request(r: &HttpRequest, p: &mut actix_web::dev::Payload) -> Self::Future { - let payload = web::Payload(p.take()); + fn from_request(r: &HttpRequest, p: &mut Payload) -> Self::Future { let request = r.to_owned(); - async move { request_to_event(&request, payload).await }.boxed_local() + bytes::Bytes::from_request(&request, p) + .map(move |bytes| match bytes { + Ok(b) => to_event(request.headers(), b.to_vec()) + .map_err(actix_web::error::ErrorBadRequest), + Err(e) => Err(e), + }) + .boxed_local() } } @@ -74,10 +78,18 @@ mod private { #[cfg(test)] mod tests { use super::*; - use actix_web::test; + use actix_web::{test, FromRequest}; use crate::test::fixtures; use serde_json::json; + + async fn to_event(req: &HttpRequest, mut payload: Payload) -> Event { + web::Payload::from_request(&req, &mut payload) + .then(|p| req.to_event(p.unwrap())) + .await + .unwrap() + } + #[actix_rt::test] async fn test_request() { let expected = fixtures::v10::minimal_string_extension(); @@ -90,8 +102,7 @@ mod tests { .insert_header(("ce-someint", "10")) .to_http_parts(); - let resp = req.to_event(web::Payload(payload)).await.unwrap(); - assert_eq!(expected, resp); + assert_eq!(expected, to_event(&req, payload).await); } #[actix_rt::test] @@ -112,8 +123,7 @@ mod tests { .set_json(&fixtures::json_data()) .to_http_parts(); - let resp = req.to_event(web::Payload(payload)).await.unwrap(); - assert_eq!(expected, resp); + assert_eq!(expected, to_event(&req, payload).await); } #[actix_rt::test] @@ -140,7 +150,6 @@ mod tests { .set_payload(bytes) .to_http_parts(); - let resp = req.to_event(web::Payload(payload)).await.unwrap(); - assert_eq!(expected, resp); + assert_eq!(expected, to_event(&req, payload).await); } } diff --git a/src/binding/actix/server_response.rs b/src/binding/actix/server_response.rs index eca06de..9f7a36c 100644 --- a/src/binding/actix/server_response.rs +++ b/src/binding/actix/server_response.rs @@ -27,6 +27,7 @@ pub fn event_to_response + 'static>( /// So that an actix-web handler may return an Event impl actix_web::Responder for Event { + type Body = actix_web::body::BoxBody; fn respond_to(self, _: &HttpRequest) -> HttpResponse { HttpResponse::build(StatusCode::OK).event(self).unwrap() } @@ -135,10 +136,7 @@ mod tests { "10" ); - // let bytes = test::load_stream(resp.take_body().into_stream()) - // .await - // .unwrap(); - let bytes = test::load_body(resp.into_body()).await.unwrap(); - assert_eq!(fixtures::json_data_binary(), bytes.as_ref()) + let sr = test::TestRequest::default().to_srv_response(resp); + assert_eq!(fixtures::json_data_binary(), test::read_body(sr).await); } }