From 538b6478045b9ffe05ac11d194bef5f68798c873 Mon Sep 17 00:00:00 2001 From: Jim Crossley Date: Thu, 17 Jun 2021 19:34:49 -0400 Subject: [PATCH] New feature: cloudevents-warp Conditionally compile warp module when enabled Signed-off-by: Jim Crossley --- Cargo.toml | 5 + example-projects/warp-example/Cargo.toml | 3 +- example-projects/warp-example/src/main.rs | 2 +- src/lib.rs | 2 + src/warp/filter.rs | 134 ++++++++++++++++++++++ src/warp/headers.rs | 61 ++++++++++ src/warp/mod.rs | 70 +++++++++++ src/warp/reply.rs | 126 ++++++++++++++++++++ src/warp/server_request.rs | 107 +++++++++++++++++ src/warp/server_response.rs | 102 ++++++++++++++++ 10 files changed, 609 insertions(+), 3 deletions(-) create mode 100644 src/warp/filter.rs create mode 100644 src/warp/headers.rs create mode 100644 src/warp/mod.rs create mode 100644 src/warp/reply.rs create mode 100644 src/warp/server_request.rs create mode 100644 src/warp/server_response.rs diff --git a/Cargo.toml b/Cargo.toml index c5a36eb..510e047 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,6 +20,7 @@ name = "cloudevents" cloudevents-actix = ["actix-web", "async-trait", "lazy_static", "bytes", "futures"] cloudevents-reqwest = ["reqwest", "async-trait", "lazy_static", "bytes"] cloudevents-rdkafka = ["rdkafka", "lazy_static", "bytes"] +cloudevents-warp = ["warp", "lazy_static", "bytes", "http", "hyper"] [dependencies] serde = { version = "^1.0", features = ["derive"] } @@ -35,10 +36,13 @@ bitflags = "^1.2" actix-web = { version = "^3", default-features = false, optional = true } reqwest = { version = "^0.11", default-features = false, features = ["rustls-tls"], optional = true } rdkafka = { version = "^0.25", features = ["cmake-build"], optional = true } +warp = { version = "^0.3", optional = true } async-trait = { version = "^0.1.33", optional = true } lazy_static = { version = "1.4.0", optional = true } bytes = { version = "^1.0", optional = true } futures = { version = "^0.3", optional = true } +http = { version = "0.2", optional = true } +hyper = { version = "^0.14", optional = true } [target."cfg(not(target_arch = \"wasm32\"))".dependencies] hostname = "^0.3" @@ -61,6 +65,7 @@ serde_json = { version = "^1.0" } chrono = { version = "^0.4", features = ["serde"] } mockito = "0.25.1" tokio = { version = "^1.0", features = ["full"] } +mime = "0.3" [workspace] members = [ diff --git a/example-projects/warp-example/Cargo.toml b/example-projects/warp-example/Cargo.toml index ab6f86c..131ce92 100644 --- a/example-projects/warp-example/Cargo.toml +++ b/example-projects/warp-example/Cargo.toml @@ -7,8 +7,7 @@ categories = ["web-programming", "encoding"] license-file = "../LICENSE" [dependencies] -cloudevents-sdk = { path = "../.." } -cloudevents-sdk-warp = { path = "../../cloudevents-sdk-warp"} +cloudevents-sdk = { path = "../..", features = ["cloudevents-warp"] } warp = "^0.3" tokio = { version = "^1.0", features = ["full"] } diff --git a/example-projects/warp-example/src/main.rs b/example-projects/warp-example/src/main.rs index a5db9af..7b7ec45 100644 --- a/example-projects/warp-example/src/main.rs +++ b/example-projects/warp-example/src/main.rs @@ -1,4 +1,4 @@ -use cloudevents_sdk_warp::{filter, reply}; +use cloudevents::warp::{filter, reply}; use warp::Filter; #[tokio::main] diff --git a/src/lib.rs b/src/lib.rs index 742d665..8f4018a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -46,6 +46,8 @@ pub mod actix; pub mod rdkafka; #[cfg(feature = "cloudevents-reqwest")] pub mod reqwest; +#[cfg(feature = "cloudevents-warp")] +pub mod warp; pub mod event; pub mod message; diff --git a/src/warp/filter.rs b/src/warp/filter.rs new file mode 100644 index 0000000..965ea86 --- /dev/null +++ b/src/warp/filter.rs @@ -0,0 +1,134 @@ +use super::server_request::request_to_event; + +use crate::Event; +use warp::http::HeaderMap; +use warp::Filter; +use warp::Rejection; + +#[derive(Debug)] +pub struct EventFilterError { + error: crate::message::Error, +} + +impl warp::reject::Reject for EventFilterError {} + +/// +/// # Extracts [`cloudevents::Event`] from incoming request +/// +/// ``` +/// use cloudevents::warp::filter::to_event; +/// use warp::Filter; +/// use warp::Reply; +/// +/// let routes = warp::any() +/// .and(to_event()) +/// .map(|event| { +/// // do something with the event +/// } +/// ); +/// ``` +/// +pub fn to_event() -> impl Filter + Copy { + warp::header::headers_cloned() + .and(warp::body::bytes()) + .and_then(create_event) +} + +async fn create_event(headers: HeaderMap, body: bytes::Bytes) -> Result { + request_to_event(headers, body) + .map_err(|error| warp::reject::custom(EventFilterError { error })) +} + +#[cfg(test)] +mod tests { + use super::to_event; + use warp::test; + + use crate::{EventBuilder, EventBuilderV10}; + use chrono::Utc; + use serde_json::json; + + #[tokio::test] + async fn test_request() { + let time = Utc::now(); + let expected = EventBuilderV10::new() + .id("0001") + .ty("example.test") + .source("http://localhost/") + .time(time) + .extension("someint", "10") + .build() + .unwrap(); + + let result = test::request() + .method("POST") + .header("ce-specversion", "1.0") + .header("ce-id", "0001") + .header("ce-type", "example.test") + .header("ce-source", "http://localhost/") + .header("ce-someint", "10") + .header("ce-time", time.to_rfc3339()) + .filter(&to_event()) + .await + .unwrap(); + + assert_eq!(expected, result); + } + + #[tokio::test] + async fn test_bad_request() { + let time = Utc::now(); + + let result = test::request() + .method("POST") + .header("ce-specversion", "BAD SPECIFICATION") + .header("ce-id", "0001") + .header("ce-type", "example.test") + .header("ce-source", "http://localhost/") + .header("ce-someint", "10") + .header("ce-time", time.to_rfc3339()) + .filter(&to_event()) + .await; + + assert!(result.is_err()); + let rejection = result.unwrap_err(); + + let reason = rejection.find::().unwrap(); + assert_eq!( + reason.error.to_string(), + "Invalid specversion BAD SPECIFICATION" + ) + } + + #[tokio::test] + async fn test_request_with_full_data() { + let time = Utc::now(); + let j = json!({"hello": "world"}); + + let expected = EventBuilderV10::new() + .id("0001") + .ty("example.test") + .source("http://localhost") + .time(time) + .data("application/json", j.to_string().into_bytes()) + .extension("someint", "10") + .build() + .unwrap(); + + let result = test::request() + .method("POST") + .header("ce-specversion", "1.0") + .header("ce-id", "0001") + .header("ce-type", "example.test") + .header("ce-source", "http://localhost") + .header("ce-someint", "10") + .header("ce-time", time.to_rfc3339()) + .header("content-type", "application/json") + .json(&j) + .filter(&to_event()) + .await + .unwrap(); + + assert_eq!(expected, result); + } +} diff --git a/src/warp/headers.rs b/src/warp/headers.rs new file mode 100644 index 0000000..ea5c998 --- /dev/null +++ b/src/warp/headers.rs @@ -0,0 +1,61 @@ +use crate::event::SpecVersion; +use http::header::HeaderName; +use lazy_static::lazy_static; +use warp::http::HeaderValue; + +use std::collections::HashMap; +use std::str::FromStr; + +macro_rules! unwrap_optional_header { + ($headers:expr, $name:expr) => { + $headers + .get::<&'static HeaderName>(&$name) + .map(|a| header_value_to_str!(a)) + }; +} + +macro_rules! header_value_to_str { + ($header_value:expr) => { + $header_value + .to_str() + .map_err(|e| crate::message::Error::Other { + source: Box::new(e), + }) + }; +} + +macro_rules! str_name_to_header { + ($attribute:expr) => { + HeaderName::from_str($attribute).map_err(|e| crate::message::Error::Other { + source: Box::new(e), + }) + }; +} + +macro_rules! attribute_name_to_header { + ($attribute:expr) => { + str_name_to_header!(&["ce-", $attribute].concat()) + }; +} + +fn attributes_to_headers( + it: impl Iterator, +) -> HashMap<&'static str, HeaderName> { + it.map(|s| { + if s == "datacontenttype" { + (s, http::header::CONTENT_TYPE) + } else { + (s, attribute_name_to_header!(s).unwrap()) + } + }) + .collect() +} + +lazy_static! { + pub(crate) static ref ATTRIBUTES_TO_HEADERS: HashMap<&'static str, HeaderName> = + attributes_to_headers(SpecVersion::all_attribute_names()); + pub(crate) static ref SPEC_VERSION_HEADER: HeaderName = + HeaderName::from_static("ce-specversion"); + pub(crate) static ref CLOUDEVENTS_JSON_HEADER: HeaderValue = + HeaderValue::from_static("application/cloudevents+json"); +} diff --git a/src/warp/mod.rs b/src/warp/mod.rs new file mode 100644 index 0000000..93691f8 --- /dev/null +++ b/src/warp/mod.rs @@ -0,0 +1,70 @@ +//! This crate integrates the [cloudevents-sdk](https://docs.rs/cloudevents-sdk) with [Warp web service framework](https://docs.rs/warp/) +//! to easily send and receive CloudEvents. +//! +//! To deserialize an HTTP request as CloudEvent +//! +//! To echo events: +//! +//! ``` +//! use warp::{Filter, Reply}; +//! use cloudevents::warp::reply::from_event; +//! use cloudevents::warp::filter::to_event; +//! +//! let routes = warp::any() +//! // extracting event from request +//! .and(to_event()) +//! // returning event back +//! .map(|event| from_event(event)); +//! +//! warp::serve(routes).run(([127, 0, 0, 1], 3030)); +//! ``` +//! +//! To create event inside request handlers and send them as responses: +//! +//! ``` +//! use cloudevents::{Event, EventBuilder, EventBuilderV10}; +//! use http::StatusCode; +//! use serde_json::json; +//! use warp::{Filter, Reply}; +//! use cloudevents::warp::reply::from_event; +//! +//! let routes = warp::any().map(|| { +//! let event = EventBuilderV10::new() +//! .id("1") +//! .source("url://example_response/") +//! .ty("example.ce") +//! .data( +//! mime::APPLICATION_JSON.to_string(), +//! json!({ +//! "name": "John Doe", +//! "age": 43, +//! "phones": [ +//! "+44 1234567", +//! "+44 2345678" +//! ] +//! }), +//! ) +//! .build(); +//! +//! match event { +//! Ok(event) => Ok(from_event(event)), +//! Err(e) => Ok(warp::reply::with_status( +//! e.to_string(), +//! StatusCode::INTERNAL_SERVER_ERROR, +//! ) +//! .into_response()), +//! } +//! }); +//! warp::serve(routes).run(([127, 0, 0, 1], 3030)); +//! ``` +//! +//! Check out the [cloudevents-sdk](https://docs.rs/cloudevents-sdk) docs for more details on how to use [`cloudevents::Event`] + +#[macro_use] +mod headers; + +mod server_request; +mod server_response; + +pub mod filter; +pub mod reply; diff --git a/src/warp/reply.rs b/src/warp/reply.rs new file mode 100644 index 0000000..9103ba6 --- /dev/null +++ b/src/warp/reply.rs @@ -0,0 +1,126 @@ +use super::server_response::event_to_response; + +use crate::Event; +use http::StatusCode; +use warp::reply::Response; + +/// +/// # Serializes [`cloudevents::Event`] as a http response +/// +/// ``` +/// use cloudevents::warp::reply::from_event; +/// use cloudevents::Event; +/// use warp::Filter; +/// use warp::Reply; +/// +/// let routes = warp::any() +/// .map(|| from_event(Event::default())); +/// ``` +pub fn from_event(event: Event) -> Response { + match event_to_response(event) { + Ok(response) => response, + Err(e) => warp::http::response::Response::builder() + .status(StatusCode::INTERNAL_SERVER_ERROR) + .body(hyper::body::Body::from(e.to_string())) + .unwrap(), + } +} + +#[cfg(test)] +mod tests { + + use crate::{EventBuilder, EventBuilderV10}; + use serde_json::json; + + #[test] + fn test_response() { + let input = EventBuilderV10::new() + .id("0001") + .ty("example.test") + .source("http://localhost/") + .extension("someint", "10") + .build() + .unwrap(); + + let resp = super::from_event(input); + + assert_eq!( + resp.headers() + .get("ce-specversion") + .unwrap() + .to_str() + .unwrap(), + "1.0" + ); + assert_eq!( + resp.headers().get("ce-id").unwrap().to_str().unwrap(), + "0001" + ); + assert_eq!( + resp.headers().get("ce-type").unwrap().to_str().unwrap(), + "example.test" + ); + assert_eq!( + resp.headers().get("ce-source").unwrap().to_str().unwrap(), + "http://localhost/" + ); + assert_eq!( + resp.headers().get("ce-someint").unwrap().to_str().unwrap(), + "10" + ); + } + + #[tokio::test] + async fn test_response_with_full_data() { + let j = json!({"hello": "world"}); + + let input = EventBuilderV10::new() + .id("0001") + .ty("example.test") + .source("http://localhost") + .data("application/json", j.clone()) + .extension("someint", "10") + .build() + .unwrap(); + + let resp = super::from_event(input); + + assert_eq!( + resp.headers() + .get("ce-specversion") + .unwrap() + .to_str() + .unwrap(), + "1.0" + ); + assert_eq!( + resp.headers().get("ce-id").unwrap().to_str().unwrap(), + "0001" + ); + assert_eq!( + resp.headers().get("ce-type").unwrap().to_str().unwrap(), + "example.test" + ); + assert_eq!( + resp.headers().get("ce-source").unwrap().to_str().unwrap(), + "http://localhost" + ); + assert_eq!( + resp.headers() + .get("content-type") + .unwrap() + .to_str() + .unwrap(), + "application/json" + ); + assert_eq!( + resp.headers().get("ce-someint").unwrap().to_str().unwrap(), + "10" + ); + + let (_, body) = resp.into_parts(); + let body = hyper::body::to_bytes(body).await.unwrap(); + + assert_eq!(j.to_string().as_bytes(), body); + } +} diff --git a/src/warp/server_request.rs b/src/warp/server_request.rs new file mode 100644 index 0000000..c1c7a47 --- /dev/null +++ b/src/warp/server_request.rs @@ -0,0 +1,107 @@ +use super::headers; +use bytes::Bytes; +use http::{header::HeaderName, HeaderMap}; + +use crate::event::SpecVersion; +use crate::message::{ + BinaryDeserializer, BinarySerializer, Encoding, Error, MessageAttributeValue, + MessageDeserializer, Result, StructuredDeserializer, StructuredSerializer, +}; + +use crate::{message, Event}; +use std::convert::TryFrom; + +pub struct RequestDeserializer { + headers: HeaderMap, + body: Bytes, +} + +impl RequestDeserializer { + pub fn new(headers: HeaderMap, body: Bytes) -> RequestDeserializer { + RequestDeserializer { headers, body } + } +} + +impl BinaryDeserializer for RequestDeserializer { + fn deserialize_binary>(self, mut visitor: V) -> Result { + if self.encoding() != Encoding::BINARY { + return Err(message::Error::WrongEncoding {}); + } + + let spec_version = SpecVersion::try_from( + unwrap_optional_header!(self.headers, headers::SPEC_VERSION_HEADER).unwrap()?, + )?; + + visitor = visitor.set_spec_version(spec_version.clone())?; + + let attributes = spec_version.attribute_names(); + + for (hn, hv) in self + .headers + .iter() + .filter(|(hn, _)| headers::SPEC_VERSION_HEADER.ne(hn) && hn.as_str().starts_with("ce-")) + { + let name = &hn.as_str()["ce-".len()..]; + + if attributes.contains(&name) { + visitor = visitor.set_attribute( + name, + MessageAttributeValue::String(String::from(header_value_to_str!(hv)?)), + )? + } else { + visitor = visitor.set_extension( + name, + MessageAttributeValue::String(String::from(header_value_to_str!(hv)?)), + )? + } + } + + if let Some(hv) = self.headers.get("content-type") { + visitor = visitor.set_attribute( + "datacontenttype", + MessageAttributeValue::String(String::from(header_value_to_str!(hv)?)), + )? + } + + if self.body.len() != 0 { + visitor.end_with_data(self.body.to_vec()) + } else { + visitor.end() + } + } +} + +impl StructuredDeserializer for RequestDeserializer { + fn deserialize_structured>(self, visitor: V) -> Result { + if self.encoding() != Encoding::STRUCTURED { + return Err(message::Error::WrongEncoding {}); + } + visitor.set_structured_event(self.body.to_vec()) + } +} + +impl MessageDeserializer for RequestDeserializer { + fn encoding(&self) -> Encoding { + if self + .headers + .get("content-type") + .map(|v| v.to_str().unwrap_or("")) + .unwrap_or("") + == "application/cloudevents+json" + { + Encoding::STRUCTURED + } else if self + .headers + .get::<&'static HeaderName>(&super::headers::SPEC_VERSION_HEADER) + .is_some() + { + Encoding::BINARY + } else { + Encoding::UNKNOWN + } + } +} + +pub fn request_to_event(req: HeaderMap, bytes: bytes::Bytes) -> std::result::Result { + MessageDeserializer::into_event(RequestDeserializer::new(req, bytes)) +} diff --git a/src/warp/server_response.rs b/src/warp/server_response.rs new file mode 100644 index 0000000..edcfd7f --- /dev/null +++ b/src/warp/server_response.rs @@ -0,0 +1,102 @@ +use super::headers; + +use crate::event::SpecVersion; +use crate::message::{ + BinaryDeserializer, BinarySerializer, Error, MessageAttributeValue, Result, + StructuredSerializer, +}; +use crate::Event; + +use warp::http::HeaderValue; +use warp::hyper::Body; +use warp::reply::Response; + +use http::header::HeaderName; +use http::response::Builder; + +use std::{convert::TryFrom, str::FromStr}; + +pub struct ResponseSerializer { + builder: Builder, +} + +impl ResponseSerializer { + fn new() -> Self { + ResponseSerializer { + builder: http::Response::builder(), + } + } +} + +impl BinarySerializer for ResponseSerializer { + fn set_spec_version(mut self, spec_version: SpecVersion) -> Result { + self.builder = self.builder.header( + headers::SPEC_VERSION_HEADER.clone(), + HeaderValue::try_from(spec_version.to_string().as_str()).map_err(|e| { + crate::message::Error::Other { + source: Box::new(e), + } + })?, + ); + Ok(self) + } + + fn set_attribute(mut self, name: &str, value: MessageAttributeValue) -> Result { + self.builder = self.builder.header( + headers::ATTRIBUTES_TO_HEADERS.get(name).unwrap().clone(), + HeaderValue::try_from(value.to_string().as_str()).map_err(|e| { + crate::message::Error::Other { + source: Box::new(e), + } + })?, + ); + Ok(self) + } + + fn set_extension(mut self, name: &str, value: MessageAttributeValue) -> Result { + self.builder = self.builder.header( + attribute_name_to_header!(name)?, + HeaderValue::try_from(value.to_string().as_str()).map_err(|e| { + crate::message::Error::Other { + source: Box::new(e), + } + })?, + ); + Ok(self) + } + + fn end_with_data(self, bytes: Vec) -> Result { + self.builder + .body(Body::from(bytes)) + .map_err(|e| crate::message::Error::Other { + source: Box::new(e), + }) + } + + fn end(self) -> Result { + self.builder + .body(Body::empty()) + .map_err(|e| crate::message::Error::Other { + source: Box::new(e), + }) + } +} + +impl StructuredSerializer for ResponseSerializer { + fn set_structured_event(self, bytes: Vec) -> Result { + Ok(self + .builder + .header( + http::header::CONTENT_TYPE, + headers::CLOUDEVENTS_JSON_HEADER.clone(), + ) + .body(Body::from(bytes)) + .map_err(|e| crate::message::Error::Other { + source: Box::new(e), + })?) + } +} + +pub fn event_to_response(event: Event) -> std::result::Result { + BinaryDeserializer::deserialize_binary(event, ResponseSerializer::new()) +}