New feature: cloudevents-actix

Conditionally compile actix module when enabled

Mostly straightforward, though I don't particularly love that dev-deps
can't be optional: https://github.com/rust-lang/cargo/issues/1596

Signed-off-by: Jim Crossley <jim@crossleys.org>
This commit is contained in:
Jim Crossley 2021-06-17 18:14:22 -04:00
parent 303ed92545
commit 935234a9cb
11 changed files with 575 additions and 8 deletions

View File

@ -16,6 +16,9 @@ categories = ["web-programming", "encoding", "data-structures"]
[lib]
name = "cloudevents"
[features]
cloudevents-actix = ["actix-web", "async-trait", "lazy_static", "bytes", "futures"]
[dependencies]
serde = { version = "^1.0", features = ["derive"] }
serde_json = "^1.0"
@ -26,6 +29,13 @@ url = { version = "^2.1", features = ["serde"] }
snafu = "^0.6"
bitflags = "^1.2"
# actix optional deps
actix-web = { version = "^3", default-features = false, optional = true }
async-trait = { version = "^0.1.33", optional = true }
lazy_static = { version = "1.4.0", optional = true }
bytes = { version = "^1.0", optional = true }
futures = { version = "^0.3", optional = true }
[target."cfg(not(target_arch = \"wasm32\"))".dependencies]
hostname = "^0.3"
uuid = { version = "^0.8", features = ["v4"] }
@ -40,13 +50,19 @@ claim = "0.3.1"
version-sync = "^0.9"
serde_yaml = "0.8"
# actix dev-deps
actix-rt = { version = "^1" }
url = { version = "^2.1", features = ["serde"] }
serde_json = { version = "^1.0" }
chrono = { version = "^0.4", features = ["serde"] }
[workspace]
members = [
".",
"cloudevents-sdk-actix-web",
"cloudevents-sdk-reqwest",
"cloudevents-sdk-rdkafka",
"cloudevents-sdk-warp"
"cloudevents-sdk-warp",
]
exclude = [
"example-projects/actix-web-example",

View File

@ -17,7 +17,7 @@ cloudevents-sdk = { version = "0.3.0", path = ".." }
actix-web = { version = "^3", default-features = false }
async-trait = "^0.1.33"
lazy_static = "1.4.0"
bytes = "^0.5"
bytes = "^1.0"
futures = "^0.3"
[dev-dependencies]

View File

@ -14,7 +14,7 @@ categories = ["web-programming", "encoding"]
[dependencies]
bytes = "^1.0"
cloudevents-sdk = { version = "0.3.0", path = ".." }
cloudevents-sdk = { path = ".." }
lazy_static = "1.4.0"
rdkafka = { version = "^0.25", features = ["cmake-build"] }
@ -22,5 +22,5 @@ rdkafka = { version = "^0.25", features = ["cmake-build"] }
url = { version = "^2.1" }
serde_json = "^1.0"
chrono = { version = "^0.4", features = ["serde"] }
futures = "0.3.5"
futures = "^0.3"
version-sync = "^0.9"

View File

@ -14,7 +14,7 @@ categories = ["web-programming", "encoding", "web-programming::http-client"]
[dependencies]
async-trait = "^0.1.33"
cloudevents-sdk = { version = "0.3.0", path = ".." }
cloudevents-sdk = { path = ".." }
lazy_static = "1.4.0"
bytes = "^1.0"

View File

@ -5,8 +5,7 @@ authors = ["Francesco Guardiani <francescoguard@gmail.com>"]
edition = "2018"
[dependencies]
cloudevents-sdk = { path = "../.." }
cloudevents-sdk-actix-web = { path = "../../cloudevents-sdk-actix-web" }
cloudevents-sdk = { path = "../..", features = ["cloudevents-actix"] }
actix-web = "^3"
actix-cors = "^0.5"
lazy_static = "1.4.0"

View File

@ -1,6 +1,6 @@
use actix_web::{get, post, web, App, HttpRequest, HttpResponse, HttpServer};
use cloudevents::actix::{HttpRequestExt, HttpResponseBuilderExt};
use cloudevents::{EventBuilder, EventBuilderV10};
use cloudevents_sdk_actix_web::{HttpResponseBuilderExt, HttpRequestExt};
use serde_json::json;
#[post("/")]

68
src/actix/headers.rs Normal file
View File

@ -0,0 +1,68 @@
use crate::event::SpecVersion;
use actix_web::http::header;
use actix_web::http::{HeaderName, HeaderValue};
use lazy_static::lazy_static;
use std::collections::HashMap;
use std::str::FromStr;
macro_rules! unwrap_optional_header {
($headers:expr, $name:expr) => {
$headers
.get::<&'static HeaderName>(&$name)
.map(|a| header_value_to_str!(a))
};
}
macro_rules! header_value_to_str {
($header_value:expr) => {
$header_value
.to_str()
.map_err(|e| crate::message::Error::Other {
source: Box::new(e),
})
};
}
macro_rules! str_to_header_value {
($header_value:expr) => {
HeaderValue::from_str($header_value).map_err(|e| crate::message::Error::Other {
source: Box::new(e),
})
};
}
macro_rules! str_name_to_header {
($attribute:expr) => {
HeaderName::from_str($attribute).map_err(|e| crate::message::Error::Other {
source: Box::new(e),
})
};
}
macro_rules! attribute_name_to_header {
($attribute:expr) => {
str_name_to_header!(&["ce-", $attribute].concat())
};
}
fn attributes_to_headers(
it: impl Iterator<Item = &'static str>,
) -> HashMap<&'static str, HeaderName> {
it.map(|s| {
if s == "datacontenttype" {
(s, header::CONTENT_TYPE)
} else {
(s, attribute_name_to_header!(s).unwrap())
}
})
.collect()
}
lazy_static! {
pub(crate) static ref ATTRIBUTES_TO_HEADERS: HashMap<&'static str, HeaderName> =
attributes_to_headers(SpecVersion::all_attribute_names());
pub(crate) static ref SPEC_VERSION_HEADER: HeaderName =
HeaderName::from_static("ce-specversion");
pub(crate) static ref CLOUDEVENTS_JSON_HEADER: HeaderValue =
HeaderValue::from_static("application/cloudevents+json");
}

57
src/actix/mod.rs Normal file
View File

@ -0,0 +1,57 @@
//! This crate integrates the [cloudevents-sdk](https://docs.rs/cloudevents-sdk) with [Actix web](https://docs.rs/actix-web/) to easily send and receive CloudEvents.
//!
//! To deserialize an HTTP request as CloudEvent:
//!
//! ```
//! use cloudevents::actix::HttpRequestExt;
//! use actix_web::{HttpRequest, web, post};
//!
//! #[post("/")]
//! async fn post_event(req: HttpRequest, payload: web::Payload) -> Result<String, actix_web::Error> {
//! let event = req.to_event(payload).await?;
//! println!("Received Event: {:?}", event);
//! Ok(format!("{:?}", event))
//! }
//! ```
//!
//! To serialize a CloudEvent to an HTTP response:
//!
//! ```
//! use cloudevents::actix::HttpResponseBuilderExt;
//! use actix_web::{HttpRequest, web, get, HttpResponse};
//! use cloudevents::{EventBuilderV10, EventBuilder};
//! use serde_json::json;
//!
//! #[get("/")]
//! async fn get_event() -> Result<HttpResponse, actix_web::Error> {
//! Ok(HttpResponse::Ok()
//! .event(
//! EventBuilderV10::new()
//! .id("0001")
//! .ty("example.test")
//! .source("http://localhost/")
//! .data("application/json", json!({"hello": "world"}))
//! .build()
//! .expect("No error while building the event"),
//! )
//! .await?
//! )
//! }
//! ```
//!
//! Check out the [cloudevents-sdk](https://docs.rs/cloudevents-sdk) docs for more details on how to use [`cloudevents::Event`]
#![doc(html_root_url = "https://docs.rs/cloudevents-sdk/0.3.1")]
#![deny(broken_intra_doc_links)]
#[macro_use]
mod headers;
mod server_request;
mod server_response;
pub use server_request::request_to_event;
pub use server_request::HttpRequestDeserializer;
pub use server_request::HttpRequestExt;
pub use server_response::event_to_response;
pub use server_response::HttpResponseBuilderExt;
pub use server_response::HttpResponseSerializer;

210
src/actix/server_request.rs Normal file
View File

@ -0,0 +1,210 @@
use super::headers;
use actix_web::http::HeaderName;
use actix_web::web::{Bytes, BytesMut};
use actix_web::{web, HttpMessage, HttpRequest};
use async_trait::async_trait;
use crate::event::SpecVersion;
use crate::message::{
BinaryDeserializer, BinarySerializer, Encoding, MessageAttributeValue, MessageDeserializer,
Result, StructuredDeserializer, StructuredSerializer,
};
use crate::{message, Event};
use futures::StreamExt;
use std::convert::TryFrom;
/// Wrapper for [`HttpRequest`] that implements [`MessageDeserializer`] trait.
pub struct HttpRequestDeserializer<'a> {
req: &'a HttpRequest,
body: Bytes,
}
impl HttpRequestDeserializer<'_> {
pub fn new(req: &HttpRequest, body: Bytes) -> HttpRequestDeserializer {
HttpRequestDeserializer { req, body }
}
}
impl<'a> BinaryDeserializer for HttpRequestDeserializer<'a> {
fn deserialize_binary<R: Sized, V: BinarySerializer<R>>(self, mut visitor: V) -> Result<R> {
if self.encoding() != Encoding::BINARY {
return Err(message::Error::WrongEncoding {});
}
let spec_version = SpecVersion::try_from(
unwrap_optional_header!(self.req.headers(), headers::SPEC_VERSION_HEADER).unwrap()?,
)?;
visitor = visitor.set_spec_version(spec_version.clone())?;
let attributes = spec_version.attribute_names();
for (hn, hv) in
self.req.headers().iter().filter(|(hn, _)| {
headers::SPEC_VERSION_HEADER.ne(hn) && hn.as_str().starts_with("ce-")
})
{
let name = &hn.as_str()["ce-".len()..];
if attributes.contains(&name) {
visitor = visitor.set_attribute(
name,
MessageAttributeValue::String(String::from(header_value_to_str!(hv)?)),
)?
} else {
visitor = visitor.set_extension(
name,
MessageAttributeValue::String(String::from(header_value_to_str!(hv)?)),
)?
}
}
if let Some(hv) = self.req.headers().get("content-type") {
visitor = visitor.set_attribute(
"datacontenttype",
MessageAttributeValue::String(String::from(header_value_to_str!(hv)?)),
)?
}
if !self.body.is_empty() {
visitor.end_with_data(self.body.to_vec())
} else {
visitor.end()
}
}
}
impl<'a> StructuredDeserializer for HttpRequestDeserializer<'a> {
fn deserialize_structured<R: Sized, V: StructuredSerializer<R>>(self, visitor: V) -> Result<R> {
if self.encoding() != Encoding::STRUCTURED {
return Err(message::Error::WrongEncoding {});
}
visitor.set_structured_event(self.body.to_vec())
}
}
impl<'a> MessageDeserializer for HttpRequestDeserializer<'a> {
fn encoding(&self) -> Encoding {
if self.req.content_type() == "application/cloudevents+json" {
Encoding::STRUCTURED
} else if self
.req
.headers()
.get::<&'static HeaderName>(&super::headers::SPEC_VERSION_HEADER)
.is_some()
{
Encoding::BINARY
} else {
Encoding::UNKNOWN
}
}
}
/// Method to transform an incoming [`HttpRequest`] to [`Event`].
pub async fn request_to_event(
req: &HttpRequest,
mut payload: web::Payload,
) -> std::result::Result<Event, actix_web::error::Error> {
let mut bytes = BytesMut::new();
while let Some(item) = payload.next().await {
bytes.extend_from_slice(&item?);
}
MessageDeserializer::into_event(HttpRequestDeserializer::new(req, bytes.freeze()))
.map_err(actix_web::error::ErrorBadRequest)
}
/// Extention Trait for [`HttpRequest`] which acts as a wrapper for the function [`request_to_event()`].
///
/// This trait is sealed and cannot be implemented for types outside of this crate.
#[async_trait(?Send)]
pub trait HttpRequestExt: private::Sealed {
/// Convert this [`HttpRequest`] into an [`Event`].
async fn to_event(
&self,
mut payload: web::Payload,
) -> std::result::Result<Event, actix_web::error::Error>;
}
#[async_trait(?Send)]
impl HttpRequestExt for HttpRequest {
async fn to_event(
&self,
payload: web::Payload,
) -> std::result::Result<Event, actix_web::error::Error> {
request_to_event(self, payload).await
}
}
mod private {
// Sealing the RequestExt
pub trait Sealed {}
impl Sealed for actix_web::HttpRequest {}
}
#[cfg(test)]
mod tests {
use super::*;
use actix_web::test;
use chrono::Utc;
use crate::{EventBuilder, EventBuilderV10};
use serde_json::json;
#[actix_rt::test]
async fn test_request() {
let time = Utc::now();
let expected = EventBuilderV10::new()
.id("0001")
.ty("example.test")
.source("http://localhost/")
//TODO this is required now because the message deserializer implictly set default values
// As soon as this defaulting doesn't happen anymore, we can remove it (Issues #40/#41)
.time(time)
.extension("someint", "10")
.build()
.unwrap();
let (req, payload) = test::TestRequest::post()
.header("ce-specversion", "1.0")
.header("ce-id", "0001")
.header("ce-type", "example.test")
.header("ce-source", "http://localhost/")
.header("ce-someint", "10")
.header("ce-time", time.to_rfc3339())
.to_http_parts();
let resp = req.to_event(web::Payload(payload)).await.unwrap();
assert_eq!(expected, resp);
}
#[actix_rt::test]
async fn test_request_with_full_data() {
let time = Utc::now();
let j = json!({"hello": "world"});
let expected = EventBuilderV10::new()
.id("0001")
.ty("example.test")
.source("http://localhost")
//TODO this is required now because the message deserializer implictly set default values
// As soon as this defaulting doesn't happen anymore, we can remove it (Issues #40/#41)
.time(time)
.data("application/json", j.to_string().into_bytes())
.extension("someint", "10")
.build()
.unwrap();
let (req, payload) = test::TestRequest::post()
.header("ce-specversion", "1.0")
.header("ce-id", "0001")
.header("ce-type", "example.test")
.header("ce-source", "http://localhost")
.header("ce-someint", "10")
.header("ce-time", time.to_rfc3339())
.header("content-type", "application/json")
.set_json(&j)
.to_http_parts();
let resp = req.to_event(web::Payload(payload)).await.unwrap();
assert_eq!(expected, resp);
}
}

View File

@ -0,0 +1,214 @@
use super::headers;
use crate::event::SpecVersion;
use crate::message::{
BinaryDeserializer, BinarySerializer, MessageAttributeValue, Result, StructuredSerializer,
};
use crate::Event;
use actix_web::dev::HttpResponseBuilder;
use actix_web::http::{HeaderName, HeaderValue};
use actix_web::HttpResponse;
use async_trait::async_trait;
use std::str::FromStr;
/// Wrapper for [`HttpResponseBuilder`] that implements [`StructuredSerializer`] and [`BinarySerializer`].
pub struct HttpResponseSerializer {
builder: HttpResponseBuilder,
}
impl HttpResponseSerializer {
pub fn new(builder: HttpResponseBuilder) -> HttpResponseSerializer {
HttpResponseSerializer { builder }
}
}
impl BinarySerializer<HttpResponse> for HttpResponseSerializer {
fn set_spec_version(mut self, spec_version: SpecVersion) -> Result<Self> {
self.builder.set_header(
headers::SPEC_VERSION_HEADER.clone(),
str_to_header_value!(spec_version.as_str())?,
);
Ok(self)
}
fn set_attribute(mut self, name: &str, value: MessageAttributeValue) -> Result<Self> {
self.builder.set_header(
headers::ATTRIBUTES_TO_HEADERS.get(name).unwrap().clone(),
str_to_header_value!(value.to_string().as_str())?,
);
Ok(self)
}
fn set_extension(mut self, name: &str, value: MessageAttributeValue) -> Result<Self> {
self.builder.set_header(
attribute_name_to_header!(name)?,
str_to_header_value!(value.to_string().as_str())?,
);
Ok(self)
}
fn end_with_data(mut self, bytes: Vec<u8>) -> Result<HttpResponse> {
Ok(self.builder.body(bytes))
}
fn end(mut self) -> Result<HttpResponse> {
Ok(self.builder.finish())
}
}
impl StructuredSerializer<HttpResponse> for HttpResponseSerializer {
fn set_structured_event(mut self, bytes: Vec<u8>) -> Result<HttpResponse> {
Ok(self
.builder
.set_header(
actix_web::http::header::CONTENT_TYPE,
headers::CLOUDEVENTS_JSON_HEADER.clone(),
)
.body(bytes))
}
}
/// Method to fill an [`HttpResponseBuilder`] with an [`Event`].
pub async fn event_to_response(
event: Event,
response: HttpResponseBuilder,
) -> std::result::Result<HttpResponse, actix_web::error::Error> {
BinaryDeserializer::deserialize_binary(event, HttpResponseSerializer::new(response))
.map_err(actix_web::error::ErrorBadRequest)
}
/// Extension Trait for [`HttpResponseBuilder`] which acts as a wrapper for the function [`event_to_response()`].
///
/// This trait is sealed and cannot be implemented for types outside of this crate.
#[async_trait(?Send)]
pub trait HttpResponseBuilderExt: private::Sealed {
/// Fill this [`HttpResponseBuilder`] with an [`Event`].
async fn event(
self,
event: Event,
) -> std::result::Result<HttpResponse, actix_web::error::Error>;
}
#[async_trait(?Send)]
impl HttpResponseBuilderExt for HttpResponseBuilder {
async fn event(
self,
event: Event,
) -> std::result::Result<HttpResponse, actix_web::error::Error> {
event_to_response(event, self).await
}
}
// Sealing the HttpResponseBuilderExt
mod private {
pub trait Sealed {}
impl Sealed for actix_web::dev::HttpResponseBuilder {}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{EventBuilder, EventBuilderV10};
use actix_web::http::StatusCode;
use actix_web::test;
use futures::TryStreamExt;
use serde_json::json;
#[actix_rt::test]
async fn test_response() {
let input = EventBuilderV10::new()
.id("0001")
.ty("example.test")
.source("http://localhost/")
.extension("someint", "10")
.build()
.unwrap();
let resp = HttpResponseBuilder::new(StatusCode::OK)
.event(input)
.await
.unwrap();
assert_eq!(
resp.headers()
.get("ce-specversion")
.unwrap()
.to_str()
.unwrap(),
"1.0"
);
assert_eq!(
resp.headers().get("ce-id").unwrap().to_str().unwrap(),
"0001"
);
assert_eq!(
resp.headers().get("ce-type").unwrap().to_str().unwrap(),
"example.test"
);
assert_eq!(
resp.headers().get("ce-source").unwrap().to_str().unwrap(),
"http://localhost/"
);
assert_eq!(
resp.headers().get("ce-someint").unwrap().to_str().unwrap(),
"10"
);
}
#[actix_rt::test]
async fn test_response_with_full_data() {
let j = json!({"hello": "world"});
let input = EventBuilderV10::new()
.id("0001")
.ty("example.test")
.source("http://localhost")
.data("application/json", j.clone())
.extension("someint", "10")
.build()
.unwrap();
let mut resp = HttpResponseBuilder::new(StatusCode::OK)
.event(input)
.await
.unwrap();
assert_eq!(
resp.headers()
.get("ce-specversion")
.unwrap()
.to_str()
.unwrap(),
"1.0"
);
assert_eq!(
resp.headers().get("ce-id").unwrap().to_str().unwrap(),
"0001"
);
assert_eq!(
resp.headers().get("ce-type").unwrap().to_str().unwrap(),
"example.test"
);
assert_eq!(
resp.headers().get("ce-source").unwrap().to_str().unwrap(),
"http://localhost"
);
assert_eq!(
resp.headers()
.get("content-type")
.unwrap()
.to_str()
.unwrap(),
"application/json"
);
assert_eq!(
resp.headers().get("ce-someint").unwrap().to_str().unwrap(),
"10"
);
let bytes = test::load_stream(resp.take_body().into_stream())
.await
.unwrap();
assert_eq!(j.to_string().as_bytes(), bytes.as_ref())
}
}

View File

@ -40,6 +40,9 @@
#![doc(html_root_url = "https://docs.rs/cloudevents-sdk/0.3.1")]
#![deny(broken_intra_doc_links)]
#[cfg(feature = "cloudevents-actix")]
pub mod actix;
pub mod event;
pub mod message;