From 2e72947bc7a69ee7730e767d84c75305dac9bf0b Mon Sep 17 00:00:00 2001 From: Jim Crossley Date: Thu, 29 Jul 2021 17:41:33 -0400 Subject: [PATCH] Encapsulate shared event deserialization behind a Headers trait Both warp and reqwest use the HeaderMap from the http crate. Actix has its own. Fortunately, both contain (HeaderName, HeaderValue) tuples. Further, actix uses a conflicting version of the bytes crate, so I store a Vec instead of a Bytes member in the Deserializer struct. Not sure if that's a problem, but the tests pass. :) We use an associated type in the Headers trait to facilitate static dispatch for warp/reqwest since their concrete iterator is public, but the actix Iter struct is private, so we use a Box for its impl. We're using AsHeaderName for the get() param to avoid having to call as_str() on any header constants, but of course actix uses its own AsName trait, which isn't public, so we must call as_str() for the passed header name in its impl. Signed-off-by: Jim Crossley --- Cargo.toml | 2 +- src/binding/actix/mod.rs | 1 - src/binding/actix/server_request.rs | 106 +++-------------- .../deserializer.rs} | 57 +++++----- src/binding/http/headers.rs | 21 ++++ src/binding/http/mod.rs | 18 +++ src/binding/reqwest/client_response.rs | 107 +----------------- src/binding/reqwest/mod.rs | 1 - src/binding/warp/filter.rs | 4 +- src/binding/warp/mod.rs | 1 - 10 files changed, 85 insertions(+), 233 deletions(-) rename src/binding/{warp/server_request.rs => http/deserializer.rs} (60%) create mode 100644 src/binding/http/headers.rs diff --git a/Cargo.toml b/Cargo.toml index c3c1a3a..a5f0e9a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,7 +18,7 @@ name = "cloudevents" [features] actix = ["actix-web", "async-trait", "bytes", "futures", "http"] -reqwest = ["reqwest-lib", "async-trait", "bytes"] +reqwest = ["reqwest-lib", "async-trait", "bytes", "http"] rdkafka = ["rdkafka-lib", "bytes", "futures"] warp = ["warp-lib", "bytes", "http", "hyper"] diff --git a/src/binding/actix/mod.rs b/src/binding/actix/mod.rs index 8e33852..9444f5e 100644 --- a/src/binding/actix/mod.rs +++ b/src/binding/actix/mod.rs @@ -45,7 +45,6 @@ mod server_request; mod server_response; pub use server_request::request_to_event; -pub use server_request::HttpRequestDeserializer; pub use server_request::HttpRequestExt; pub use server_response::event_to_response; pub use server_response::HttpResponseBuilderExt; diff --git a/src/binding/actix/server_request.rs b/src/binding/actix/server_request.rs index 67fc023..27d3130 100644 --- a/src/binding/actix/server_request.rs +++ b/src/binding/actix/server_request.rs @@ -1,99 +1,20 @@ -use crate::binding::http::SPEC_VERSION_HEADER; -use crate::event::SpecVersion; -use crate::message::{ - BinaryDeserializer, BinarySerializer, Encoding, MessageAttributeValue, MessageDeserializer, - Result, StructuredDeserializer, StructuredSerializer, -}; -use crate::{header_value_to_str, message, Event}; -use actix_web::web::{Bytes, BytesMut}; -use actix_web::{web, HttpMessage, HttpRequest}; +use crate::binding::http::{to_event, Headers}; +use crate::Event; +use actix_web::web::BytesMut; +use actix_web::{web, HttpRequest}; use async_trait::async_trait; use futures::future::LocalBoxFuture; use futures::{FutureExt, StreamExt}; -use std::convert::TryFrom; +use http::header::{AsHeaderName, HeaderName, HeaderValue}; -/// Wrapper for [`HttpRequest`] that implements [`MessageDeserializer`] trait. -pub struct HttpRequestDeserializer<'a> { - req: &'a HttpRequest, - body: Bytes, -} - -impl HttpRequestDeserializer<'_> { - pub fn new(req: &HttpRequest, body: Bytes) -> HttpRequestDeserializer { - HttpRequestDeserializer { req, body } +/// Implement Headers for the actix HeaderMap +impl<'a> Headers<'a> for actix_web::http::HeaderMap { + type Iterator = Box + 'a>; + fn get(&self, key: K) -> Option<&HeaderValue> { + self.get(key.as_str()) } -} - -impl<'a> BinaryDeserializer for HttpRequestDeserializer<'a> { - fn deserialize_binary>(self, mut visitor: V) -> Result { - if self.encoding() != Encoding::BINARY { - return Err(message::Error::WrongEncoding {}); - } - - let spec_version = SpecVersion::try_from( - self.req - .headers() - .get(SPEC_VERSION_HEADER) - .map(|a| header_value_to_str!(a)) - .unwrap()?, - )?; - - visitor = visitor.set_spec_version(spec_version.clone())?; - - let attributes = spec_version.attribute_names(); - - for (hn, hv) in self.req.headers().iter().filter(|(hn, _)| { - let key = hn.as_str(); - SPEC_VERSION_HEADER.ne(key) && key.starts_with("ce-") - }) { - let name = &hn.as_str()["ce-".len()..]; - - if attributes.contains(&name) { - visitor = visitor.set_attribute( - name, - MessageAttributeValue::String(String::from(header_value_to_str!(hv)?)), - )? - } else { - visitor = visitor.set_extension( - name, - MessageAttributeValue::String(String::from(header_value_to_str!(hv)?)), - )? - } - } - - if let Some(hv) = self.req.headers().get("content-type") { - visitor = visitor.set_attribute( - "datacontenttype", - MessageAttributeValue::String(String::from(header_value_to_str!(hv)?)), - )? - } - - if !self.body.is_empty() { - visitor.end_with_data(self.body.to_vec()) - } else { - visitor.end() - } - } -} - -impl<'a> StructuredDeserializer for HttpRequestDeserializer<'a> { - fn deserialize_structured>(self, visitor: V) -> Result { - if self.encoding() != Encoding::STRUCTURED { - return Err(message::Error::WrongEncoding {}); - } - visitor.set_structured_event(self.body.to_vec()) - } -} - -impl<'a> MessageDeserializer for HttpRequestDeserializer<'a> { - fn encoding(&self) -> Encoding { - if self.req.content_type() == "application/cloudevents+json" { - Encoding::STRUCTURED - } else if self.req.headers().get(SPEC_VERSION_HEADER).is_some() { - Encoding::BINARY - } else { - Encoding::UNKNOWN - } + fn iter(&'a self) -> Self::Iterator { + Box::new(self.iter()) } } @@ -106,8 +27,7 @@ pub async fn request_to_event( while let Some(item) = payload.next().await { bytes.extend_from_slice(&item?); } - MessageDeserializer::into_event(HttpRequestDeserializer::new(req, bytes.freeze())) - .map_err(actix_web::error::ErrorBadRequest) + to_event(req.headers(), bytes.to_vec()).map_err(actix_web::error::ErrorBadRequest) } /// So that an actix-web handler may take an Event parameter diff --git a/src/binding/warp/server_request.rs b/src/binding/http/deserializer.rs similarity index 60% rename from src/binding/warp/server_request.rs rename to src/binding/http/deserializer.rs index 694eca7..0b05da1 100644 --- a/src/binding/warp/server_request.rs +++ b/src/binding/http/deserializer.rs @@ -1,28 +1,27 @@ -use bytes::Bytes; -use http::HeaderMap; - -use crate::binding::http::SPEC_VERSION_HEADER; -use crate::event::SpecVersion; -use crate::header_value_to_str; -use crate::message::{ - BinaryDeserializer, BinarySerializer, Encoding, Error, MessageAttributeValue, - MessageDeserializer, Result, StructuredDeserializer, StructuredSerializer, +use super::{Headers, SPEC_VERSION_HEADER}; +use crate::{ + binding::CLOUDEVENTS_JSON_HEADER, + event::SpecVersion, + header_value_to_str, message, + message::{ + BinaryDeserializer, BinarySerializer, Encoding, MessageAttributeValue, MessageDeserializer, + Result, StructuredDeserializer, StructuredSerializer, + }, }; -use crate::{message, Event}; use std::convert::TryFrom; -pub struct RequestDeserializer { - headers: HeaderMap, - body: Bytes, +pub struct Deserializer<'a, T: Headers<'a>> { + headers: &'a T, + body: Vec, } -impl RequestDeserializer { - pub fn new(headers: HeaderMap, body: Bytes) -> RequestDeserializer { - RequestDeserializer { headers, body } +impl<'a, T: Headers<'a>> Deserializer<'a, T> { + pub fn new(headers: &'a T, body: Vec) -> Deserializer<'a, T> { + Deserializer { headers, body } } } -impl BinaryDeserializer for RequestDeserializer { +impl<'a, T: Headers<'a>> BinaryDeserializer for Deserializer<'a, T> { fn deserialize_binary>(self, mut visitor: V) -> Result { if self.encoding() != Encoding::BINARY { return Err(message::Error::WrongEncoding {}); @@ -58,7 +57,7 @@ impl BinaryDeserializer for RequestDeserializer { } } - if let Some(hv) = self.headers.get("content-type") { + if let Some(hv) = self.headers.get(http::header::CONTENT_TYPE) { visitor = visitor.set_attribute( "datacontenttype", MessageAttributeValue::String(String::from(header_value_to_str!(hv)?)), @@ -66,40 +65,36 @@ impl BinaryDeserializer for RequestDeserializer { } if !self.body.is_empty() { - visitor.end_with_data(self.body.to_vec()) + visitor.end_with_data(self.body) } else { visitor.end() } } } -impl StructuredDeserializer for RequestDeserializer { +impl<'a, T: Headers<'a>> StructuredDeserializer for Deserializer<'a, T> { fn deserialize_structured>(self, visitor: V) -> Result { if self.encoding() != Encoding::STRUCTURED { return Err(message::Error::WrongEncoding {}); } - visitor.set_structured_event(self.body.to_vec()) + visitor.set_structured_event(self.body) } } -impl MessageDeserializer for RequestDeserializer { +impl<'a, T: Headers<'a>> MessageDeserializer for Deserializer<'a, T> { fn encoding(&self) -> Encoding { if self .headers - .get("content-type") - .map(|v| v.to_str().unwrap_or("")) - .unwrap_or("") - == "application/cloudevents+json" + .get(http::header::CONTENT_TYPE) + .and_then(|v| v.to_str().ok()) + .filter(|&v| v.starts_with(CLOUDEVENTS_JSON_HEADER)) + .is_some() { Encoding::STRUCTURED - } else if self.headers.contains_key(SPEC_VERSION_HEADER) { + } else if self.headers.get(SPEC_VERSION_HEADER).is_some() { Encoding::BINARY } else { Encoding::UNKNOWN } } } - -pub fn request_to_event(req: HeaderMap, bytes: bytes::Bytes) -> std::result::Result { - MessageDeserializer::into_event(RequestDeserializer::new(req, bytes)) -} diff --git a/src/binding/http/headers.rs b/src/binding/http/headers.rs new file mode 100644 index 0000000..a129d87 --- /dev/null +++ b/src/binding/http/headers.rs @@ -0,0 +1,21 @@ +use http::header::{AsHeaderName, HeaderMap, HeaderName, HeaderValue}; + +/// Any http library should be able to use the +/// [`to_event`](super::to_event) function with an implementation of +/// this trait. +pub trait Headers<'a> { + type Iterator: Iterator; + fn get(&self, name: K) -> Option<&HeaderValue>; + fn iter(&'a self) -> Self::Iterator; +} + +/// Implemention for the HeaderMap used by warp/reqwest +impl<'a> Headers<'a> for HeaderMap { + type Iterator = http::header::Iter<'a, HeaderValue>; + fn get(&self, name: K) -> Option<&HeaderValue> { + self.get(name) + } + fn iter(&'a self) -> Self::Iterator { + self.iter() + } +} diff --git a/src/binding/http/mod.rs b/src/binding/http/mod.rs index bfe4338..08cb927 100644 --- a/src/binding/http/mod.rs +++ b/src/binding/http/mod.rs @@ -1,5 +1,23 @@ +mod deserializer; +mod headers; + +use crate::{ + message::{Error, MessageDeserializer}, + Event, +}; +use deserializer::Deserializer; +pub use headers::Headers; + pub static SPEC_VERSION_HEADER: &str = "ce-specversion"; +/// Turn a pile of HTTP headers and a body into a CloudEvent +pub fn to_event<'a, T: Headers<'a>>( + headers: &'a T, + body: Vec, +) -> std::result::Result { + MessageDeserializer::into_event(Deserializer::new(headers, body)) +} + pub fn header_prefix(name: &str) -> String { super::header_prefix("ce-", name) } diff --git a/src/binding/reqwest/client_response.rs b/src/binding/reqwest/client_response.rs index c0ad9bf..e139d37 100644 --- a/src/binding/reqwest/client_response.rs +++ b/src/binding/reqwest/client_response.rs @@ -1,108 +1,10 @@ use reqwest_lib as reqwest; -use crate::binding::http::SPEC_VERSION_HEADER; -use crate::event::SpecVersion; -use crate::message::{ - BinaryDeserializer, BinarySerializer, Encoding, Error, MessageAttributeValue, - MessageDeserializer, Result, StructuredDeserializer, StructuredSerializer, -}; -use crate::{header_value_to_str, message, Event}; +use crate::binding::http; +use crate::message::{Error, Result}; +use crate::Event; use async_trait::async_trait; -use bytes::Bytes; -use reqwest::header::HeaderMap; use reqwest::Response; -use std::convert::TryFrom; - -/// Wrapper for [`Response`] that implements [`MessageDeserializer`] trait. -pub struct ResponseDeserializer { - headers: HeaderMap, - body: Bytes, -} - -impl ResponseDeserializer { - pub fn new(headers: HeaderMap, body: Bytes) -> ResponseDeserializer { - ResponseDeserializer { headers, body } - } -} - -impl BinaryDeserializer for ResponseDeserializer { - fn deserialize_binary>(self, mut visitor: V) -> Result { - if self.encoding() != Encoding::BINARY { - return Err(message::Error::WrongEncoding {}); - } - - let spec_version = SpecVersion::try_from( - self.headers - .get(SPEC_VERSION_HEADER) - .map(|a| header_value_to_str!(a)) - .unwrap()?, - )?; - - visitor = visitor.set_spec_version(spec_version.clone())?; - - let attributes = spec_version.attribute_names(); - - for (hn, hv) in self.headers.iter().filter(|(hn, _)| { - let key = hn.as_str(); - SPEC_VERSION_HEADER.ne(key) && key.starts_with("ce-") - }) { - let name = &hn.as_str()["ce-".len()..]; - - if attributes.contains(&name) { - visitor = visitor.set_attribute( - name, - MessageAttributeValue::String(String::from(header_value_to_str!(hv)?)), - )? - } else { - visitor = visitor.set_extension( - name, - MessageAttributeValue::String(String::from(header_value_to_str!(hv)?)), - )? - } - } - - if let Some(hv) = self.headers.get("content-type") { - visitor = visitor.set_attribute( - "datacontenttype", - MessageAttributeValue::String(String::from(header_value_to_str!(hv)?)), - )? - } - - if !self.body.is_empty() { - visitor.end_with_data(self.body.to_vec()) - } else { - visitor.end() - } - } -} - -impl StructuredDeserializer for ResponseDeserializer { - fn deserialize_structured>(self, visitor: V) -> Result { - if self.encoding() != Encoding::STRUCTURED { - return Err(message::Error::WrongEncoding {}); - } - visitor.set_structured_event(self.body.to_vec()) - } -} - -impl MessageDeserializer for ResponseDeserializer { - fn encoding(&self) -> Encoding { - match ( - #[allow(clippy::borrow_interior_mutable_const)] - self.headers - .get(reqwest::header::CONTENT_TYPE) - .map(|a| header_value_to_str!(a)) - .map(|r| r.ok()) - .flatten() - .map(|e| e.starts_with("application/cloudevents+json")), - self.headers.get(SPEC_VERSION_HEADER), - ) { - (Some(true), _) => Encoding::STRUCTURED, - (_, Some(_)) => Encoding::BINARY, - _ => Encoding::UNKNOWN, - } - } -} /// Method to transform an incoming [`Response`] to [`Event`]. pub async fn response_to_event(res: Response) -> Result { @@ -110,8 +12,7 @@ pub async fn response_to_event(res: Response) -> Result { let b = res.bytes().await.map_err(|e| Error::Other { source: Box::new(e), })?; - - MessageDeserializer::into_event(ResponseDeserializer::new(h, b)) + http::to_event(&h, b.to_vec()) } /// Extension Trait for [`Response`] which acts as a wrapper for the function [`response_to_event()`]. diff --git a/src/binding/reqwest/mod.rs b/src/binding/reqwest/mod.rs index aad3bac..7cefe49 100644 --- a/src/binding/reqwest/mod.rs +++ b/src/binding/reqwest/mod.rs @@ -37,5 +37,4 @@ pub use client_request::event_to_request; pub use client_request::RequestBuilderExt; pub use client_request::RequestSerializer; pub use client_response::response_to_event; -pub use client_response::ResponseDeserializer; pub use client_response::ResponseExt; diff --git a/src/binding/warp/filter.rs b/src/binding/warp/filter.rs index b7546de..2a4490b 100644 --- a/src/binding/warp/filter.rs +++ b/src/binding/warp/filter.rs @@ -1,6 +1,6 @@ use warp_lib as warp; -use super::server_request::request_to_event; +use crate::binding::http; use crate::Event; use warp::http::HeaderMap; @@ -38,7 +38,7 @@ pub fn to_event() -> impl Filter + Copy { } async fn create_event(headers: HeaderMap, body: bytes::Bytes) -> Result { - request_to_event(headers, body) + http::to_event(&headers, body.to_vec()) .map_err(|error| warp::reject::custom(EventFilterError { error })) } diff --git a/src/binding/warp/mod.rs b/src/binding/warp/mod.rs index 5e9708c..3c6d452 100644 --- a/src/binding/warp/mod.rs +++ b/src/binding/warp/mod.rs @@ -61,7 +61,6 @@ //! warp::serve(routes).run(([127, 0, 0, 1], 3030)); //! ``` -mod server_request; mod server_response; pub mod filter;