New feature: cloudevents-reqwest

Conditionally compile reqwest module when enabled

This resulted in a naming conflict between my desired feature name,
"reqwest", and the optional dependency itself. So I adopted the
convention of prefixing the features with "cloudevents-".

Signed-off-by: Jim Crossley <jim@crossleys.org>
This commit is contained in:
Jim Crossley 2021-06-17 18:55:42 -04:00
parent 935234a9cb
commit 51b49f1335
11 changed files with 585 additions and 13 deletions

View File

@ -91,14 +91,13 @@ jobs:
CC: musl-gcc
CXX: g++
# If wasm, then we test only the main module and cloudevents-sdk-reqwest
- uses: actions-rs/cargo@v1
name: "Build"
if: matrix.target == 'wasm32-unknown-unknown'
with:
command: build
toolchain: ${{ matrix.toolchain }}
args: --target wasm32-unknown-unknown --package cloudevents-sdk --package cloudevents-sdk-reqwest
args: --target wasm32-unknown-unknown --package cloudevents-sdk
# Build examples
- uses: actions-rs/cargo@v1

View File

@ -18,6 +18,7 @@ name = "cloudevents"
[features]
cloudevents-actix = ["actix-web", "async-trait", "lazy_static", "bytes", "futures"]
cloudevents-reqwest = ["reqwest", "async-trait", "lazy_static", "bytes"]
[dependencies]
serde = { version = "^1.0", features = ["derive"] }
@ -29,8 +30,9 @@ url = { version = "^2.1", features = ["serde"] }
snafu = "^0.6"
bitflags = "^1.2"
# actix optional deps
# runtime optional deps
actix-web = { version = "^3", default-features = false, optional = true }
reqwest = { version = "^0.11", default-features = false, features = ["rustls-tls"], optional = true }
async-trait = { version = "^0.1.33", optional = true }
lazy_static = { version = "1.4.0", optional = true }
bytes = { version = "^1.0", optional = true }
@ -50,11 +52,13 @@ claim = "0.3.1"
version-sync = "^0.9"
serde_yaml = "0.8"
# actix dev-deps
# runtime dev-deps
actix-rt = { version = "^1" }
url = { version = "^2.1", features = ["serde"] }
serde_json = { version = "^1.0" }
chrono = { version = "^0.4", features = ["serde"] }
mockito = "0.25.1"
tokio = { version = "^1.0", features = ["full"] }
[workspace]
members = [

View File

@ -14,7 +14,7 @@ categories = ["web-programming", "encoding", "web-programming::http-client"]
[dependencies]
async-trait = "^0.1.33"
cloudevents-sdk = { path = ".." }
cloudevents-sdk = { version = "0.3.0", path = ".." }
lazy_static = "1.4.0"
bytes = "^1.0"

View File

@ -11,8 +11,7 @@ crate-type = ["cdylib"]
[dependencies]
reqwest = "^0.11"
cloudevents-sdk = { path = "../.." }
cloudevents-sdk-reqwest = { path = "../../cloudevents-sdk-reqwest" }
cloudevents-sdk = { path = "../..", features = ["cloudevents-reqwest"] }
url = { version = "^2.1" }
web-sys = { version = "0.3.39", features = ["Window", "Location"] }
wasm-bindgen-futures = "0.4.12"

View File

@ -1,5 +1,5 @@
use cloudevents::reqwest::RequestBuilderExt;
use cloudevents::{EventBuilder, EventBuilderV10};
use cloudevents_sdk_reqwest::RequestBuilderExt;
use wasm_bindgen::prelude::*;
#[wasm_bindgen]

View File

@ -1,14 +1,14 @@
use super::headers;
use actix_web::http::HeaderName;
use actix_web::web::{Bytes, BytesMut};
use actix_web::{web, HttpMessage, HttpRequest};
use async_trait::async_trait;
use crate::event::SpecVersion;
use crate::message::{
BinaryDeserializer, BinarySerializer, Encoding, MessageAttributeValue, MessageDeserializer,
Result, StructuredDeserializer, StructuredSerializer,
};
use crate::{message, Event};
use actix_web::http::HeaderName;
use actix_web::web::{Bytes, BytesMut};
use actix_web::{web, HttpMessage, HttpRequest};
use async_trait::async_trait;
use futures::StreamExt;
use std::convert::TryFrom;
@ -145,8 +145,8 @@ mod tests {
use super::*;
use actix_web::test;
use chrono::Utc;
use crate::{EventBuilder, EventBuilderV10};
use chrono::Utc;
use serde_json::json;
#[actix_rt::test]

View File

@ -42,6 +42,8 @@
#[cfg(feature = "cloudevents-actix")]
pub mod actix;
#[cfg(feature = "cloudevents-reqwest")]
pub mod reqwest;
pub mod event;
pub mod message;

View File

@ -0,0 +1,199 @@
use super::headers;
use crate::event::SpecVersion;
use crate::message::{
BinaryDeserializer, BinarySerializer, MessageAttributeValue, Result, StructuredSerializer,
};
use crate::Event;
use reqwest::RequestBuilder;
use std::str::FromStr;
/// Wrapper for [`RequestBuilder`] that implements [`StructuredSerializer`] & [`BinarySerializer`] traits.
pub struct RequestSerializer {
req: RequestBuilder,
}
impl RequestSerializer {
pub fn new(req: RequestBuilder) -> RequestSerializer {
RequestSerializer { req }
}
}
impl BinarySerializer<RequestBuilder> for RequestSerializer {
fn set_spec_version(mut self, spec_version: SpecVersion) -> Result<Self> {
self.req = self
.req
.header(headers::SPEC_VERSION_HEADER.clone(), spec_version.as_str());
Ok(self)
}
fn set_attribute(mut self, name: &str, value: MessageAttributeValue) -> Result<Self> {
self.req = self.req.header(
headers::ATTRIBUTES_TO_HEADERS.get(name).unwrap().clone(),
value.to_string(),
);
Ok(self)
}
fn set_extension(mut self, name: &str, value: MessageAttributeValue) -> Result<Self> {
self.req = self
.req
.header(attribute_name_to_header!(name)?, value.to_string());
Ok(self)
}
fn end_with_data(self, bytes: Vec<u8>) -> Result<RequestBuilder> {
Ok(self.req.body(bytes))
}
fn end(self) -> Result<RequestBuilder> {
Ok(self.req)
}
}
impl StructuredSerializer<RequestBuilder> for RequestSerializer {
fn set_structured_event(self, bytes: Vec<u8>) -> Result<RequestBuilder> {
Ok(self
.req
.header(
reqwest::header::CONTENT_TYPE,
headers::CLOUDEVENTS_JSON_HEADER.clone(),
)
.body(bytes))
}
}
/// Method to fill a [`RequestBuilder`] with an [`Event`].
pub fn event_to_request(event: Event, request_builder: RequestBuilder) -> Result<RequestBuilder> {
BinaryDeserializer::deserialize_binary(event, RequestSerializer::new(request_builder))
}
/// Extension Trait for [`RequestBuilder`] which acts as a wrapper for the function [`event_to_request()`].
///
/// This trait is sealed and cannot be implemented for types outside of this crate.
pub trait RequestBuilderExt: private::Sealed {
/// Write in this [`RequestBuilder`] the provided [`Event`]. Similar to invoking [`Event`].
fn event(self, event: Event) -> Result<RequestBuilder>;
}
impl RequestBuilderExt for RequestBuilder {
fn event(self, event: Event) -> Result<RequestBuilder> {
event_to_request(event, self)
}
}
// Sealing the RequestBuilderExt
mod private {
pub trait Sealed {}
impl Sealed for reqwest::RequestBuilder {}
}
#[cfg(test)]
mod tests {
use super::*;
use mockito::{mock, Matcher};
use crate::message::StructuredDeserializer;
use crate::{EventBuilder, EventBuilderV10};
use serde_json::json;
#[tokio::test]
async fn test_request() {
let url = mockito::server_url();
let m = mock("POST", "/")
.match_header("ce-specversion", "1.0")
.match_header("ce-id", "0001")
.match_header("ce-type", "example.test")
.match_header("ce-source", "http://localhost/")
.match_header("ce-someint", "10")
.match_body(Matcher::Missing)
.create();
let input = EventBuilderV10::new()
.id("0001")
.ty("example.test")
.source("http://localhost/")
.extension("someint", "10")
.build()
.unwrap();
let client = reqwest::Client::new();
client
.post(&url)
.event(input)
.unwrap()
.send()
.await
.unwrap();
m.assert();
}
#[tokio::test]
async fn test_request_with_full_data() {
let j = json!({"hello": "world"});
let url = mockito::server_url();
let m = mock("POST", "/")
.match_header("ce-specversion", "1.0")
.match_header("ce-id", "0001")
.match_header("ce-type", "example.test")
.match_header("ce-source", "http://localhost/")
.match_header("content-type", "application/json")
.match_header("ce-someint", "10")
.match_body(Matcher::Exact(j.to_string()))
.create();
let input = EventBuilderV10::new()
.id("0001")
.ty("example.test")
.source("http://localhost/")
.data("application/json", j.clone())
.extension("someint", "10")
.build()
.unwrap();
let client = reqwest::Client::new();
client
.post(&url)
.event(input)
.unwrap()
.send()
.await
.unwrap();
m.assert();
}
#[tokio::test]
async fn test_structured_request_with_full_data() {
let j = json!({"hello": "world"});
let input = EventBuilderV10::new()
.id("0001")
.ty("example.test")
.source("http://localhost")
.data("application/json", j.clone())
.extension("someint", "10")
.build()
.unwrap();
let url = mockito::server_url();
let m = mock("POST", "/")
.match_header("content-type", "application/cloudevents+json")
.match_body(Matcher::Exact(serde_json::to_string(&input).unwrap()))
.create();
let client = reqwest::Client::new();
StructuredDeserializer::deserialize_structured(
input,
RequestSerializer::new(client.post(&url)),
)
.unwrap()
.send()
.await
.unwrap();
m.assert();
}
}

View File

@ -0,0 +1,263 @@
use super::headers;
use crate::event::SpecVersion;
use crate::message::{
BinaryDeserializer, BinarySerializer, Encoding, Error, MessageAttributeValue,
MessageDeserializer, Result, StructuredDeserializer, StructuredSerializer,
};
use crate::{message, Event};
use async_trait::async_trait;
use bytes::Bytes;
use reqwest::header::{HeaderMap, HeaderName};
use reqwest::Response;
use std::convert::TryFrom;
/// Wrapper for [`Response`] that implements [`MessageDeserializer`] trait.
pub struct ResponseDeserializer {
headers: HeaderMap,
body: Bytes,
}
impl ResponseDeserializer {
pub fn new(headers: HeaderMap, body: Bytes) -> ResponseDeserializer {
ResponseDeserializer { headers, body }
}
}
impl BinaryDeserializer for ResponseDeserializer {
fn deserialize_binary<R: Sized, V: BinarySerializer<R>>(self, mut visitor: V) -> Result<R> {
if self.encoding() != Encoding::BINARY {
return Err(message::Error::WrongEncoding {});
}
let spec_version = SpecVersion::try_from(
unwrap_optional_header!(self.headers, headers::SPEC_VERSION_HEADER).unwrap()?,
)?;
visitor = visitor.set_spec_version(spec_version.clone())?;
let attributes = spec_version.attribute_names();
for (hn, hv) in self
.headers
.iter()
.filter(|(hn, _)| headers::SPEC_VERSION_HEADER.ne(hn) && hn.as_str().starts_with("ce-"))
{
let name = &hn.as_str()["ce-".len()..];
if attributes.contains(&name) {
visitor = visitor.set_attribute(
name,
MessageAttributeValue::String(String::from(header_value_to_str!(hv)?)),
)?
} else {
visitor = visitor.set_extension(
name,
MessageAttributeValue::String(String::from(header_value_to_str!(hv)?)),
)?
}
}
if let Some(hv) = self.headers.get("content-type") {
visitor = visitor.set_attribute(
"datacontenttype",
MessageAttributeValue::String(String::from(header_value_to_str!(hv)?)),
)?
}
if !self.body.is_empty() {
visitor.end_with_data(self.body.to_vec())
} else {
visitor.end()
}
}
}
impl StructuredDeserializer for ResponseDeserializer {
fn deserialize_structured<R: Sized, V: StructuredSerializer<R>>(self, visitor: V) -> Result<R> {
if self.encoding() != Encoding::STRUCTURED {
return Err(message::Error::WrongEncoding {});
}
visitor.set_structured_event(self.body.to_vec())
}
}
impl MessageDeserializer for ResponseDeserializer {
fn encoding(&self) -> Encoding {
match (
#[allow(clippy::borrow_interior_mutable_const)]
unwrap_optional_header!(self.headers, reqwest::header::CONTENT_TYPE)
.map(|r| r.ok())
.flatten()
.map(|e| e.starts_with("application/cloudevents+json")),
self.headers
.get::<&'static HeaderName>(&headers::SPEC_VERSION_HEADER),
) {
(Some(true), _) => Encoding::STRUCTURED,
(_, Some(_)) => Encoding::BINARY,
_ => Encoding::UNKNOWN,
}
}
}
/// Method to transform an incoming [`Response`] to [`Event`].
pub async fn response_to_event(res: Response) -> Result<Event> {
let h = res.headers().to_owned();
let b = res.bytes().await.map_err(|e| Error::Other {
source: Box::new(e),
})?;
MessageDeserializer::into_event(ResponseDeserializer::new(h, b))
}
/// Extension Trait for [`Response`] which acts as a wrapper for the function [`response_to_event()`].
///
/// This trait is sealed and cannot be implemented for types outside of this crate.
#[async_trait(?Send)]
pub trait ResponseExt: private::Sealed {
/// Convert this [`Response`] to [`Event`].
async fn into_event(self) -> Result<Event>;
}
#[async_trait(?Send)]
impl ResponseExt for Response {
async fn into_event(self) -> Result<Event> {
response_to_event(self).await
}
}
// Sealing the ResponseExt
mod private {
pub trait Sealed {}
impl Sealed for reqwest::Response {}
}
#[cfg(test)]
mod tests {
use super::*;
use mockito::mock;
use crate::{EventBuilder, EventBuilderV10};
use chrono::Utc;
use serde_json::json;
#[tokio::test]
async fn test_response() {
let time = Utc::now();
let url = mockito::server_url();
let _m = mock("GET", "/")
.with_status(200)
.with_header("ce-specversion", "1.0")
.with_header("ce-id", "0001")
.with_header("ce-type", "example.test")
.with_header("ce-source", "http://localhost")
.with_header("ce-someint", "10")
.with_header("ce-time", &time.to_rfc3339())
.create();
let expected = EventBuilderV10::new()
.id("0001")
.ty("example.test")
//TODO this is required now because the message deserializer implictly set default values
// As soon as this defaulting doesn't happen anymore, we can remove it (Issues #40/#41)
.time(time)
.source("http://localhost")
.extension("someint", "10")
.build()
.unwrap();
let client = reqwest::Client::new();
let res = client
.get(&url)
.send()
.await
.unwrap()
.into_event()
.await
.unwrap();
assert_eq!(expected, res);
}
#[tokio::test]
async fn test_response_with_full_data() {
let time = Utc::now();
let j = json!({"hello": "world"});
let url = mockito::server_url();
let _m = mock("GET", "/")
.with_status(200)
.with_header("ce-specversion", "1.0")
.with_header("ce-id", "0001")
.with_header("ce-type", "example.test")
.with_header("ce-source", "http://localhost/")
.with_header("content-type", "application/json")
.with_header("ce-someint", "10")
.with_header("ce-time", &time.to_rfc3339())
.with_body(j.to_string())
.create();
let expected = EventBuilderV10::new()
.id("0001")
.ty("example.test")
//TODO this is required now because the message deserializer implictly set default values
// As soon as this defaulting doesn't happen anymore, we can remove it (Issues #40/#41)
.time(time)
.source("http://localhost/")
.data("application/json", j.to_string().into_bytes())
.extension("someint", "10")
.build()
.unwrap();
let client = reqwest::Client::new();
let res = client
.get(&url)
.send()
.await
.unwrap()
.into_event()
.await
.unwrap();
assert_eq!(expected, res);
}
#[tokio::test]
async fn test_structured_response_with_full_data() {
let time = Utc::now();
let j = json!({"hello": "world"});
let expected = EventBuilderV10::new()
.id("0001")
.ty("example.test")
//TODO this is required now because the message deserializer implictly set default values
// As soon as this defaulting doesn't happen anymore, we can remove it (Issues #40/#41)
.time(time)
.source("http://localhost")
.data("application/json", j.clone())
.extension("someint", "10")
.build()
.unwrap();
let url = mockito::server_url();
let _m = mock("GET", "/")
.with_status(200)
.with_header(
"content-type",
"application/cloudevents+json; charset=utf-8",
)
.with_body(serde_json::to_string(&expected).unwrap())
.create();
let client = reqwest::Client::new();
let res = client
.get(&url)
.send()
.await
.unwrap()
.into_event()
.await
.unwrap();
assert_eq!(expected, res);
}
}

61
src/reqwest/headers.rs Normal file
View File

@ -0,0 +1,61 @@
use crate::event::SpecVersion;
use lazy_static::lazy_static;
use reqwest::header::{HeaderName, HeaderValue};
use std::collections::HashMap;
use std::str::FromStr;
macro_rules! unwrap_optional_header {
($headers:expr, $name:expr) => {
$headers
.get::<&'static reqwest::header::HeaderName>(&$name)
.map(|a| header_value_to_str!(a))
};
}
macro_rules! header_value_to_str {
($header_value:expr) => {
$header_value
.to_str()
.map_err(|e| crate::message::Error::Other {
source: Box::new(e),
})
};
}
macro_rules! str_name_to_header {
($attribute:expr) => {
reqwest::header::HeaderName::from_str($attribute).map_err(|e| {
crate::message::Error::Other {
source: Box::new(e),
}
})
};
}
macro_rules! attribute_name_to_header {
($attribute:expr) => {
str_name_to_header!(&["ce-", $attribute].concat())
};
}
fn attributes_to_headers(
it: impl Iterator<Item = &'static str>,
) -> HashMap<&'static str, HeaderName> {
it.map(|s| {
if s == "datacontenttype" {
(s, reqwest::header::CONTENT_TYPE)
} else {
(s, attribute_name_to_header!(s).unwrap())
}
})
.collect()
}
lazy_static! {
pub(crate) static ref ATTRIBUTES_TO_HEADERS: HashMap<&'static str, HeaderName> =
attributes_to_headers(SpecVersion::all_attribute_names());
pub(crate) static ref SPEC_VERSION_HEADER: HeaderName =
HeaderName::from_static("ce-specversion");
pub(crate) static ref CLOUDEVENTS_JSON_HEADER: HeaderValue =
HeaderValue::from_static("application/cloudevents+json");
}

45
src/reqwest/mod.rs Normal file
View File

@ -0,0 +1,45 @@
//! This crate integrates the [cloudevents-sdk](https://docs.rs/cloudevents-sdk) with [reqwest](https://docs.rs/reqwest/) to easily send and receive CloudEvents.
//!
//! ```
//! use cloudevents::reqwest::{RequestBuilderExt, ResponseExt};
//! use cloudevents::{EventBuilderV10, EventBuilder};
//! use serde_json::json;
//!
//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
//! let client = reqwest::Client::new();
//!
//! // Prepare the event to send
//! let event_to_send = EventBuilderV10::new()
//! .id("0001")
//! .ty("example.test")
//! .source("http://localhost/")
//! .data("application/json", json!({"hello": "world"}))
//! .build()?;
//!
//! // Send request
//! let response = client.post("http://localhost")
//! .event(event_to_send)?
//! .send().await?;
//! // Parse response as event
//! let received_event = response
//! .into_event().await?;
//! # Ok(())
//! # }
//! ```
//!
//! Check out the [cloudevents-sdk](https://docs.rs/cloudevents-sdk) docs for more details on how to use [`cloudevents::Event`].
#![doc(html_root_url = "https://docs.rs/cloudevents-sdk/0.3.1")]
#![deny(broken_intra_doc_links)]
#[macro_use]
mod headers;
mod client_request;
mod client_response;
pub use client_request::event_to_request;
pub use client_request::RequestBuilderExt;
pub use client_request::RequestSerializer;
pub use client_response::response_to_event;
pub use client_response::ResponseDeserializer;
pub use client_response::ResponseExt;