Compatible with actix-web 4.0.0-beta.19
Main challenge here was no longer being able to to construct web::Payload instances from dev::Payload instances. Instead, we now invoke the web::Payload FromRequest impl directly. Also adapted to the latest upstream test and body redesign. Signed-off-by: Jim Crossley <jim@crossleys.org>
This commit is contained in:
parent
a984b638ab
commit
7871ea63eb
|
@ -18,7 +18,7 @@ name = "cloudevents"
|
||||||
|
|
||||||
[features]
|
[features]
|
||||||
http-binding = ["async-trait", "bytes", "futures", "http"]
|
http-binding = ["async-trait", "bytes", "futures", "http"]
|
||||||
actix = ["actix-web", "async-trait", "bytes", "futures", "http"]
|
actix = ["actix-web", "actix-http", "async-trait", "bytes", "futures", "http"]
|
||||||
reqwest = ["reqwest-lib", "async-trait", "bytes", "http"]
|
reqwest = ["reqwest-lib", "async-trait", "bytes", "http"]
|
||||||
rdkafka = ["rdkafka-lib", "bytes", "futures"]
|
rdkafka = ["rdkafka-lib", "bytes", "futures"]
|
||||||
warp = ["warp-lib", "bytes", "http", "hyper"]
|
warp = ["warp-lib", "bytes", "http", "hyper"]
|
||||||
|
@ -36,9 +36,8 @@ snafu = "^0.6"
|
||||||
bitflags = "^1.2"
|
bitflags = "^1.2"
|
||||||
|
|
||||||
# runtime optional deps
|
# runtime optional deps
|
||||||
actix-web = { version = "=4.0.0-beta.8", default-features = false, optional = true }
|
actix-web = { version = "=4.0.0-beta.19", default-features = false, optional = true }
|
||||||
actix-http = { version = "=3.0.0-beta.8", default-features = false, optional = true }
|
actix-http = { version = "=3.0.0-beta.18", default-features = false, optional = true }
|
||||||
actix-tls = { version = "=3.0.0-beta.5", default-features = false, optional = true }
|
|
||||||
reqwest-lib = { version = "^0.11", default-features = false, features = ["rustls-tls"], optional = true, package = "reqwest" }
|
reqwest-lib = { version = "^0.11", default-features = false, features = ["rustls-tls"], optional = true, package = "reqwest" }
|
||||||
rdkafka-lib = { version = "^0.28", features = ["cmake-build"], optional = true, package = "rdkafka" }
|
rdkafka-lib = { version = "^0.28", features = ["cmake-build"], optional = true, package = "rdkafka" }
|
||||||
warp-lib = { version = "^0.3", optional = true, package = "warp" }
|
warp-lib = { version = "^0.3", optional = true, package = "warp" }
|
||||||
|
|
|
@ -6,8 +6,8 @@ edition = "2018"
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
cloudevents-sdk = { path = "../..", features = ["actix"] }
|
cloudevents-sdk = { path = "../..", features = ["actix"] }
|
||||||
actix-web = "4.0.0-beta.8"
|
actix-web = "4.0.0-beta.19"
|
||||||
actix-cors = "0.6.0-beta.2"
|
actix-cors = "0.6.0-beta.8"
|
||||||
serde_json = "^1.0"
|
serde_json = "^1.0"
|
||||||
url = { version = "^2.1" }
|
url = { version = "^2.1" }
|
||||||
env_logger = "0.7.1"
|
env_logger = "0.7.1"
|
||||||
|
|
|
@ -3,6 +3,19 @@
|
||||||
//! To deserialize an HTTP request as CloudEvent:
|
//! To deserialize an HTTP request as CloudEvent:
|
||||||
//!
|
//!
|
||||||
//! ```
|
//! ```
|
||||||
|
//! use cloudevents::Event;
|
||||||
|
//! use actix_web::post;
|
||||||
|
//!
|
||||||
|
//! #[post("/")]
|
||||||
|
//! async fn post_event(event: Event) -> Result<String, actix_web::Error> {
|
||||||
|
//! println!("Received Event: {:?}", event);
|
||||||
|
//! Ok(format!("{:?}", event))
|
||||||
|
//! }
|
||||||
|
//! ```
|
||||||
|
//!
|
||||||
|
//! For more complex applications, access the Payload directly:
|
||||||
|
//!
|
||||||
|
//! ```
|
||||||
//! use cloudevents::binding::actix::HttpRequestExt;
|
//! use cloudevents::binding::actix::HttpRequestExt;
|
||||||
//! use actix_web::{HttpRequest, web, post};
|
//! use actix_web::{HttpRequest, web, post};
|
||||||
//!
|
//!
|
||||||
|
@ -17,8 +30,30 @@
|
||||||
//! To serialize a CloudEvent to an HTTP response:
|
//! To serialize a CloudEvent to an HTTP response:
|
||||||
//!
|
//!
|
||||||
//! ```
|
//! ```
|
||||||
|
//! use actix_web::get;
|
||||||
|
//! use cloudevents::{Event, EventBuilderV10, EventBuilder};
|
||||||
|
//! use serde_json::json;
|
||||||
|
//!
|
||||||
|
//! #[get("/")]
|
||||||
|
//! async fn get_event() -> Event {
|
||||||
|
//! let payload = json!({"hello": "world"});
|
||||||
|
//!
|
||||||
|
//! EventBuilderV10::new()
|
||||||
|
//! .id("0001")
|
||||||
|
//! .ty("example.test")
|
||||||
|
//! .source("http://localhost/")
|
||||||
|
//! .data("application/json", payload)
|
||||||
|
//! .extension("someint", "10")
|
||||||
|
//! .build()
|
||||||
|
//! .unwrap()
|
||||||
|
//! }
|
||||||
|
//! ```
|
||||||
|
//!
|
||||||
|
//! For more complex applications, use the HTTP response builder extension:
|
||||||
|
//!
|
||||||
|
//! ```
|
||||||
//! use cloudevents::binding::actix::HttpResponseBuilderExt;
|
//! use cloudevents::binding::actix::HttpResponseBuilderExt;
|
||||||
//! use actix_web::{HttpRequest, web, get, HttpResponse};
|
//! use actix_web::{get, HttpResponse};
|
||||||
//! use cloudevents::{EventBuilderV10, EventBuilder};
|
//! use cloudevents::{EventBuilderV10, EventBuilder};
|
||||||
//! use serde_json::json;
|
//! use serde_json::json;
|
||||||
//!
|
//!
|
||||||
|
|
|
@ -1,14 +1,14 @@
|
||||||
use crate::binding::http::{to_event, Headers};
|
use crate::binding::http::{to_event, Headers};
|
||||||
use crate::Event;
|
use crate::Event;
|
||||||
|
use actix_web::dev::Payload;
|
||||||
use actix_web::web::BytesMut;
|
use actix_web::web::BytesMut;
|
||||||
use actix_web::{web, HttpRequest};
|
use actix_web::{web, HttpRequest};
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use futures::future::LocalBoxFuture;
|
use futures::{future::LocalBoxFuture, FutureExt, StreamExt};
|
||||||
use futures::{FutureExt, StreamExt};
|
|
||||||
use http::header::{AsHeaderName, HeaderName, HeaderValue};
|
use http::header::{AsHeaderName, HeaderName, HeaderValue};
|
||||||
|
|
||||||
/// Implement Headers for the actix HeaderMap
|
/// Implement Headers for the actix HeaderMap
|
||||||
impl<'a> Headers<'a> for actix_web::http::HeaderMap {
|
impl<'a> Headers<'a> for actix_http::header::HeaderMap {
|
||||||
type Iterator = Box<dyn Iterator<Item = (&'a HeaderName, &'a HeaderValue)> + 'a>;
|
type Iterator = Box<dyn Iterator<Item = (&'a HeaderName, &'a HeaderValue)> + 'a>;
|
||||||
fn get<K: AsHeaderName>(&self, key: K) -> Option<&HeaderValue> {
|
fn get<K: AsHeaderName>(&self, key: K) -> Option<&HeaderValue> {
|
||||||
self.get(key.as_str())
|
self.get(key.as_str())
|
||||||
|
@ -32,14 +32,18 @@ pub async fn request_to_event(
|
||||||
|
|
||||||
/// So that an actix-web handler may take an Event parameter
|
/// So that an actix-web handler may take an Event parameter
|
||||||
impl actix_web::FromRequest for Event {
|
impl actix_web::FromRequest for Event {
|
||||||
type Config = ();
|
|
||||||
type Error = actix_web::Error;
|
type Error = actix_web::Error;
|
||||||
type Future = LocalBoxFuture<'static, std::result::Result<Self, Self::Error>>;
|
type Future = LocalBoxFuture<'static, std::result::Result<Self, Self::Error>>;
|
||||||
|
|
||||||
fn from_request(r: &HttpRequest, p: &mut actix_web::dev::Payload) -> Self::Future {
|
fn from_request(r: &HttpRequest, p: &mut Payload) -> Self::Future {
|
||||||
let payload = web::Payload(p.take());
|
|
||||||
let request = r.to_owned();
|
let request = r.to_owned();
|
||||||
async move { request_to_event(&request, payload).await }.boxed_local()
|
bytes::Bytes::from_request(&request, p)
|
||||||
|
.map(move |bytes| match bytes {
|
||||||
|
Ok(b) => to_event(request.headers(), b.to_vec())
|
||||||
|
.map_err(actix_web::error::ErrorBadRequest),
|
||||||
|
Err(e) => Err(e),
|
||||||
|
})
|
||||||
|
.boxed_local()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -74,10 +78,18 @@ mod private {
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use super::*;
|
use super::*;
|
||||||
use actix_web::test;
|
use actix_web::{test, FromRequest};
|
||||||
|
|
||||||
use crate::test::fixtures;
|
use crate::test::fixtures;
|
||||||
use serde_json::json;
|
use serde_json::json;
|
||||||
|
|
||||||
|
async fn to_event(req: &HttpRequest, mut payload: Payload) -> Event {
|
||||||
|
web::Payload::from_request(&req, &mut payload)
|
||||||
|
.then(|p| req.to_event(p.unwrap()))
|
||||||
|
.await
|
||||||
|
.unwrap()
|
||||||
|
}
|
||||||
|
|
||||||
#[actix_rt::test]
|
#[actix_rt::test]
|
||||||
async fn test_request() {
|
async fn test_request() {
|
||||||
let expected = fixtures::v10::minimal_string_extension();
|
let expected = fixtures::v10::minimal_string_extension();
|
||||||
|
@ -90,8 +102,7 @@ mod tests {
|
||||||
.insert_header(("ce-someint", "10"))
|
.insert_header(("ce-someint", "10"))
|
||||||
.to_http_parts();
|
.to_http_parts();
|
||||||
|
|
||||||
let resp = req.to_event(web::Payload(payload)).await.unwrap();
|
assert_eq!(expected, to_event(&req, payload).await);
|
||||||
assert_eq!(expected, resp);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[actix_rt::test]
|
#[actix_rt::test]
|
||||||
|
@ -112,8 +123,7 @@ mod tests {
|
||||||
.set_json(&fixtures::json_data())
|
.set_json(&fixtures::json_data())
|
||||||
.to_http_parts();
|
.to_http_parts();
|
||||||
|
|
||||||
let resp = req.to_event(web::Payload(payload)).await.unwrap();
|
assert_eq!(expected, to_event(&req, payload).await);
|
||||||
assert_eq!(expected, resp);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[actix_rt::test]
|
#[actix_rt::test]
|
||||||
|
@ -140,7 +150,6 @@ mod tests {
|
||||||
.set_payload(bytes)
|
.set_payload(bytes)
|
||||||
.to_http_parts();
|
.to_http_parts();
|
||||||
|
|
||||||
let resp = req.to_event(web::Payload(payload)).await.unwrap();
|
assert_eq!(expected, to_event(&req, payload).await);
|
||||||
assert_eq!(expected, resp);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -27,6 +27,7 @@ pub fn event_to_response<T: Builder<HttpResponse> + 'static>(
|
||||||
|
|
||||||
/// So that an actix-web handler may return an Event
|
/// So that an actix-web handler may return an Event
|
||||||
impl actix_web::Responder for Event {
|
impl actix_web::Responder for Event {
|
||||||
|
type Body = actix_web::body::BoxBody;
|
||||||
fn respond_to(self, _: &HttpRequest) -> HttpResponse {
|
fn respond_to(self, _: &HttpRequest) -> HttpResponse {
|
||||||
HttpResponse::build(StatusCode::OK).event(self).unwrap()
|
HttpResponse::build(StatusCode::OK).event(self).unwrap()
|
||||||
}
|
}
|
||||||
|
@ -135,10 +136,7 @@ mod tests {
|
||||||
"10"
|
"10"
|
||||||
);
|
);
|
||||||
|
|
||||||
// let bytes = test::load_stream(resp.take_body().into_stream())
|
let sr = test::TestRequest::default().to_srv_response(resp);
|
||||||
// .await
|
assert_eq!(fixtures::json_data_binary(), test::read_body(sr).await);
|
||||||
// .unwrap();
|
|
||||||
let bytes = test::load_body(resp.into_body()).await.unwrap();
|
|
||||||
assert_eq!(fixtures::json_data_binary(), bytes.as_ref())
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue