New feature: cloudevents-warp

Conditionally compile warp module when enabled

Signed-off-by: Jim Crossley <jim@crossleys.org>
This commit is contained in:
Jim Crossley 2021-06-17 19:34:49 -04:00
parent 9055d71fb2
commit 538b647804
10 changed files with 609 additions and 3 deletions

View File

@ -20,6 +20,7 @@ name = "cloudevents"
cloudevents-actix = ["actix-web", "async-trait", "lazy_static", "bytes", "futures"]
cloudevents-reqwest = ["reqwest", "async-trait", "lazy_static", "bytes"]
cloudevents-rdkafka = ["rdkafka", "lazy_static", "bytes"]
cloudevents-warp = ["warp", "lazy_static", "bytes", "http", "hyper"]
[dependencies]
serde = { version = "^1.0", features = ["derive"] }
@ -35,10 +36,13 @@ bitflags = "^1.2"
actix-web = { version = "^3", default-features = false, optional = true }
reqwest = { version = "^0.11", default-features = false, features = ["rustls-tls"], optional = true }
rdkafka = { version = "^0.25", features = ["cmake-build"], optional = true }
warp = { version = "^0.3", optional = true }
async-trait = { version = "^0.1.33", optional = true }
lazy_static = { version = "1.4.0", optional = true }
bytes = { version = "^1.0", optional = true }
futures = { version = "^0.3", optional = true }
http = { version = "0.2", optional = true }
hyper = { version = "^0.14", optional = true }
[target."cfg(not(target_arch = \"wasm32\"))".dependencies]
hostname = "^0.3"
@ -61,6 +65,7 @@ serde_json = { version = "^1.0" }
chrono = { version = "^0.4", features = ["serde"] }
mockito = "0.25.1"
tokio = { version = "^1.0", features = ["full"] }
mime = "0.3"
[workspace]
members = [

View File

@ -7,8 +7,7 @@ categories = ["web-programming", "encoding"]
license-file = "../LICENSE"
[dependencies]
cloudevents-sdk = { path = "../.." }
cloudevents-sdk-warp = { path = "../../cloudevents-sdk-warp"}
cloudevents-sdk = { path = "../..", features = ["cloudevents-warp"] }
warp = "^0.3"
tokio = { version = "^1.0", features = ["full"] }

View File

@ -1,4 +1,4 @@
use cloudevents_sdk_warp::{filter, reply};
use cloudevents::warp::{filter, reply};
use warp::Filter;
#[tokio::main]

View File

@ -46,6 +46,8 @@ pub mod actix;
pub mod rdkafka;
#[cfg(feature = "cloudevents-reqwest")]
pub mod reqwest;
#[cfg(feature = "cloudevents-warp")]
pub mod warp;
pub mod event;
pub mod message;

134
src/warp/filter.rs Normal file
View File

@ -0,0 +1,134 @@
use super::server_request::request_to_event;
use crate::Event;
use warp::http::HeaderMap;
use warp::Filter;
use warp::Rejection;
#[derive(Debug)]
pub struct EventFilterError {
error: crate::message::Error,
}
impl warp::reject::Reject for EventFilterError {}
///
/// # Extracts [`cloudevents::Event`] from incoming request
///
/// ```
/// use cloudevents::warp::filter::to_event;
/// use warp::Filter;
/// use warp::Reply;
///
/// let routes = warp::any()
/// .and(to_event())
/// .map(|event| {
/// // do something with the event
/// }
/// );
/// ```
///
pub fn to_event() -> impl Filter<Extract = (Event,), Error = Rejection> + Copy {
warp::header::headers_cloned()
.and(warp::body::bytes())
.and_then(create_event)
}
async fn create_event(headers: HeaderMap, body: bytes::Bytes) -> Result<Event, Rejection> {
request_to_event(headers, body)
.map_err(|error| warp::reject::custom(EventFilterError { error }))
}
#[cfg(test)]
mod tests {
use super::to_event;
use warp::test;
use crate::{EventBuilder, EventBuilderV10};
use chrono::Utc;
use serde_json::json;
#[tokio::test]
async fn test_request() {
let time = Utc::now();
let expected = EventBuilderV10::new()
.id("0001")
.ty("example.test")
.source("http://localhost/")
.time(time)
.extension("someint", "10")
.build()
.unwrap();
let result = test::request()
.method("POST")
.header("ce-specversion", "1.0")
.header("ce-id", "0001")
.header("ce-type", "example.test")
.header("ce-source", "http://localhost/")
.header("ce-someint", "10")
.header("ce-time", time.to_rfc3339())
.filter(&to_event())
.await
.unwrap();
assert_eq!(expected, result);
}
#[tokio::test]
async fn test_bad_request() {
let time = Utc::now();
let result = test::request()
.method("POST")
.header("ce-specversion", "BAD SPECIFICATION")
.header("ce-id", "0001")
.header("ce-type", "example.test")
.header("ce-source", "http://localhost/")
.header("ce-someint", "10")
.header("ce-time", time.to_rfc3339())
.filter(&to_event())
.await;
assert!(result.is_err());
let rejection = result.unwrap_err();
let reason = rejection.find::<super::EventFilterError>().unwrap();
assert_eq!(
reason.error.to_string(),
"Invalid specversion BAD SPECIFICATION"
)
}
#[tokio::test]
async fn test_request_with_full_data() {
let time = Utc::now();
let j = json!({"hello": "world"});
let expected = EventBuilderV10::new()
.id("0001")
.ty("example.test")
.source("http://localhost")
.time(time)
.data("application/json", j.to_string().into_bytes())
.extension("someint", "10")
.build()
.unwrap();
let result = test::request()
.method("POST")
.header("ce-specversion", "1.0")
.header("ce-id", "0001")
.header("ce-type", "example.test")
.header("ce-source", "http://localhost")
.header("ce-someint", "10")
.header("ce-time", time.to_rfc3339())
.header("content-type", "application/json")
.json(&j)
.filter(&to_event())
.await
.unwrap();
assert_eq!(expected, result);
}
}

61
src/warp/headers.rs Normal file
View File

@ -0,0 +1,61 @@
use crate::event::SpecVersion;
use http::header::HeaderName;
use lazy_static::lazy_static;
use warp::http::HeaderValue;
use std::collections::HashMap;
use std::str::FromStr;
macro_rules! unwrap_optional_header {
($headers:expr, $name:expr) => {
$headers
.get::<&'static HeaderName>(&$name)
.map(|a| header_value_to_str!(a))
};
}
macro_rules! header_value_to_str {
($header_value:expr) => {
$header_value
.to_str()
.map_err(|e| crate::message::Error::Other {
source: Box::new(e),
})
};
}
macro_rules! str_name_to_header {
($attribute:expr) => {
HeaderName::from_str($attribute).map_err(|e| crate::message::Error::Other {
source: Box::new(e),
})
};
}
macro_rules! attribute_name_to_header {
($attribute:expr) => {
str_name_to_header!(&["ce-", $attribute].concat())
};
}
fn attributes_to_headers(
it: impl Iterator<Item = &'static str>,
) -> HashMap<&'static str, HeaderName> {
it.map(|s| {
if s == "datacontenttype" {
(s, http::header::CONTENT_TYPE)
} else {
(s, attribute_name_to_header!(s).unwrap())
}
})
.collect()
}
lazy_static! {
pub(crate) static ref ATTRIBUTES_TO_HEADERS: HashMap<&'static str, HeaderName> =
attributes_to_headers(SpecVersion::all_attribute_names());
pub(crate) static ref SPEC_VERSION_HEADER: HeaderName =
HeaderName::from_static("ce-specversion");
pub(crate) static ref CLOUDEVENTS_JSON_HEADER: HeaderValue =
HeaderValue::from_static("application/cloudevents+json");
}

70
src/warp/mod.rs Normal file
View File

@ -0,0 +1,70 @@
//! This crate integrates the [cloudevents-sdk](https://docs.rs/cloudevents-sdk) with [Warp web service framework](https://docs.rs/warp/)
//! to easily send and receive CloudEvents.
//!
//! To deserialize an HTTP request as CloudEvent
//!
//! To echo events:
//!
//! ```
//! use warp::{Filter, Reply};
//! use cloudevents::warp::reply::from_event;
//! use cloudevents::warp::filter::to_event;
//!
//! let routes = warp::any()
//! // extracting event from request
//! .and(to_event())
//! // returning event back
//! .map(|event| from_event(event));
//!
//! warp::serve(routes).run(([127, 0, 0, 1], 3030));
//! ```
//!
//! To create event inside request handlers and send them as responses:
//!
//! ```
//! use cloudevents::{Event, EventBuilder, EventBuilderV10};
//! use http::StatusCode;
//! use serde_json::json;
//! use warp::{Filter, Reply};
//! use cloudevents::warp::reply::from_event;
//!
//! let routes = warp::any().map(|| {
//! let event = EventBuilderV10::new()
//! .id("1")
//! .source("url://example_response/")
//! .ty("example.ce")
//! .data(
//! mime::APPLICATION_JSON.to_string(),
//! json!({
//! "name": "John Doe",
//! "age": 43,
//! "phones": [
//! "+44 1234567",
//! "+44 2345678"
//! ]
//! }),
//! )
//! .build();
//!
//! match event {
//! Ok(event) => Ok(from_event(event)),
//! Err(e) => Ok(warp::reply::with_status(
//! e.to_string(),
//! StatusCode::INTERNAL_SERVER_ERROR,
//! )
//! .into_response()),
//! }
//! });
//! warp::serve(routes).run(([127, 0, 0, 1], 3030));
//! ```
//!
//! Check out the [cloudevents-sdk](https://docs.rs/cloudevents-sdk) docs for more details on how to use [`cloudevents::Event`]
#[macro_use]
mod headers;
mod server_request;
mod server_response;
pub mod filter;
pub mod reply;

126
src/warp/reply.rs Normal file
View File

@ -0,0 +1,126 @@
use super::server_response::event_to_response;
use crate::Event;
use http::StatusCode;
use warp::reply::Response;
///
/// # Serializes [`cloudevents::Event`] as a http response
///
/// ```
/// use cloudevents::warp::reply::from_event;
/// use cloudevents::Event;
/// use warp::Filter;
/// use warp::Reply;
///
/// let routes = warp::any()
/// .map(|| from_event(Event::default()));
/// ```
pub fn from_event(event: Event) -> Response {
match event_to_response(event) {
Ok(response) => response,
Err(e) => warp::http::response::Response::builder()
.status(StatusCode::INTERNAL_SERVER_ERROR)
.body(hyper::body::Body::from(e.to_string()))
.unwrap(),
}
}
#[cfg(test)]
mod tests {
use crate::{EventBuilder, EventBuilderV10};
use serde_json::json;
#[test]
fn test_response() {
let input = EventBuilderV10::new()
.id("0001")
.ty("example.test")
.source("http://localhost/")
.extension("someint", "10")
.build()
.unwrap();
let resp = super::from_event(input);
assert_eq!(
resp.headers()
.get("ce-specversion")
.unwrap()
.to_str()
.unwrap(),
"1.0"
);
assert_eq!(
resp.headers().get("ce-id").unwrap().to_str().unwrap(),
"0001"
);
assert_eq!(
resp.headers().get("ce-type").unwrap().to_str().unwrap(),
"example.test"
);
assert_eq!(
resp.headers().get("ce-source").unwrap().to_str().unwrap(),
"http://localhost/"
);
assert_eq!(
resp.headers().get("ce-someint").unwrap().to_str().unwrap(),
"10"
);
}
#[tokio::test]
async fn test_response_with_full_data() {
let j = json!({"hello": "world"});
let input = EventBuilderV10::new()
.id("0001")
.ty("example.test")
.source("http://localhost")
.data("application/json", j.clone())
.extension("someint", "10")
.build()
.unwrap();
let resp = super::from_event(input);
assert_eq!(
resp.headers()
.get("ce-specversion")
.unwrap()
.to_str()
.unwrap(),
"1.0"
);
assert_eq!(
resp.headers().get("ce-id").unwrap().to_str().unwrap(),
"0001"
);
assert_eq!(
resp.headers().get("ce-type").unwrap().to_str().unwrap(),
"example.test"
);
assert_eq!(
resp.headers().get("ce-source").unwrap().to_str().unwrap(),
"http://localhost"
);
assert_eq!(
resp.headers()
.get("content-type")
.unwrap()
.to_str()
.unwrap(),
"application/json"
);
assert_eq!(
resp.headers().get("ce-someint").unwrap().to_str().unwrap(),
"10"
);
let (_, body) = resp.into_parts();
let body = hyper::body::to_bytes(body).await.unwrap();
assert_eq!(j.to_string().as_bytes(), body);
}
}

107
src/warp/server_request.rs Normal file
View File

@ -0,0 +1,107 @@
use super::headers;
use bytes::Bytes;
use http::{header::HeaderName, HeaderMap};
use crate::event::SpecVersion;
use crate::message::{
BinaryDeserializer, BinarySerializer, Encoding, Error, MessageAttributeValue,
MessageDeserializer, Result, StructuredDeserializer, StructuredSerializer,
};
use crate::{message, Event};
use std::convert::TryFrom;
pub struct RequestDeserializer {
headers: HeaderMap,
body: Bytes,
}
impl RequestDeserializer {
pub fn new(headers: HeaderMap, body: Bytes) -> RequestDeserializer {
RequestDeserializer { headers, body }
}
}
impl BinaryDeserializer for RequestDeserializer {
fn deserialize_binary<R: Sized, V: BinarySerializer<R>>(self, mut visitor: V) -> Result<R> {
if self.encoding() != Encoding::BINARY {
return Err(message::Error::WrongEncoding {});
}
let spec_version = SpecVersion::try_from(
unwrap_optional_header!(self.headers, headers::SPEC_VERSION_HEADER).unwrap()?,
)?;
visitor = visitor.set_spec_version(spec_version.clone())?;
let attributes = spec_version.attribute_names();
for (hn, hv) in self
.headers
.iter()
.filter(|(hn, _)| headers::SPEC_VERSION_HEADER.ne(hn) && hn.as_str().starts_with("ce-"))
{
let name = &hn.as_str()["ce-".len()..];
if attributes.contains(&name) {
visitor = visitor.set_attribute(
name,
MessageAttributeValue::String(String::from(header_value_to_str!(hv)?)),
)?
} else {
visitor = visitor.set_extension(
name,
MessageAttributeValue::String(String::from(header_value_to_str!(hv)?)),
)?
}
}
if let Some(hv) = self.headers.get("content-type") {
visitor = visitor.set_attribute(
"datacontenttype",
MessageAttributeValue::String(String::from(header_value_to_str!(hv)?)),
)?
}
if self.body.len() != 0 {
visitor.end_with_data(self.body.to_vec())
} else {
visitor.end()
}
}
}
impl StructuredDeserializer for RequestDeserializer {
fn deserialize_structured<R: Sized, V: StructuredSerializer<R>>(self, visitor: V) -> Result<R> {
if self.encoding() != Encoding::STRUCTURED {
return Err(message::Error::WrongEncoding {});
}
visitor.set_structured_event(self.body.to_vec())
}
}
impl MessageDeserializer for RequestDeserializer {
fn encoding(&self) -> Encoding {
if self
.headers
.get("content-type")
.map(|v| v.to_str().unwrap_or(""))
.unwrap_or("")
== "application/cloudevents+json"
{
Encoding::STRUCTURED
} else if self
.headers
.get::<&'static HeaderName>(&super::headers::SPEC_VERSION_HEADER)
.is_some()
{
Encoding::BINARY
} else {
Encoding::UNKNOWN
}
}
}
pub fn request_to_event(req: HeaderMap, bytes: bytes::Bytes) -> std::result::Result<Event, Error> {
MessageDeserializer::into_event(RequestDeserializer::new(req, bytes))
}

102
src/warp/server_response.rs Normal file
View File

@ -0,0 +1,102 @@
use super::headers;
use crate::event::SpecVersion;
use crate::message::{
BinaryDeserializer, BinarySerializer, Error, MessageAttributeValue, Result,
StructuredSerializer,
};
use crate::Event;
use warp::http::HeaderValue;
use warp::hyper::Body;
use warp::reply::Response;
use http::header::HeaderName;
use http::response::Builder;
use std::{convert::TryFrom, str::FromStr};
pub struct ResponseSerializer {
builder: Builder,
}
impl ResponseSerializer {
fn new() -> Self {
ResponseSerializer {
builder: http::Response::builder(),
}
}
}
impl BinarySerializer<Response> for ResponseSerializer {
fn set_spec_version(mut self, spec_version: SpecVersion) -> Result<Self> {
self.builder = self.builder.header(
headers::SPEC_VERSION_HEADER.clone(),
HeaderValue::try_from(spec_version.to_string().as_str()).map_err(|e| {
crate::message::Error::Other {
source: Box::new(e),
}
})?,
);
Ok(self)
}
fn set_attribute(mut self, name: &str, value: MessageAttributeValue) -> Result<Self> {
self.builder = self.builder.header(
headers::ATTRIBUTES_TO_HEADERS.get(name).unwrap().clone(),
HeaderValue::try_from(value.to_string().as_str()).map_err(|e| {
crate::message::Error::Other {
source: Box::new(e),
}
})?,
);
Ok(self)
}
fn set_extension(mut self, name: &str, value: MessageAttributeValue) -> Result<Self> {
self.builder = self.builder.header(
attribute_name_to_header!(name)?,
HeaderValue::try_from(value.to_string().as_str()).map_err(|e| {
crate::message::Error::Other {
source: Box::new(e),
}
})?,
);
Ok(self)
}
fn end_with_data(self, bytes: Vec<u8>) -> Result<Response> {
self.builder
.body(Body::from(bytes))
.map_err(|e| crate::message::Error::Other {
source: Box::new(e),
})
}
fn end(self) -> Result<Response> {
self.builder
.body(Body::empty())
.map_err(|e| crate::message::Error::Other {
source: Box::new(e),
})
}
}
impl StructuredSerializer<Response> for ResponseSerializer {
fn set_structured_event(self, bytes: Vec<u8>) -> Result<Response> {
Ok(self
.builder
.header(
http::header::CONTENT_TYPE,
headers::CLOUDEVENTS_JSON_HEADER.clone(),
)
.body(Body::from(bytes))
.map_err(|e| crate::message::Error::Other {
source: Box::new(e),
})?)
}
}
pub fn event_to_response(event: Event) -> std::result::Result<Response, Error> {
BinaryDeserializer::deserialize_binary(event, ResponseSerializer::new())
}