From 51b49f13356b7597dba65cd031a47157d3433494 Mon Sep 17 00:00:00 2001 From: Jim Crossley Date: Thu, 17 Jun 2021 18:55:42 -0400 Subject: [PATCH] New feature: cloudevents-reqwest Conditionally compile reqwest module when enabled This resulted in a naming conflict between my desired feature name, "reqwest", and the optional dependency itself. So I adopted the convention of prefixing the features with "cloudevents-". Signed-off-by: Jim Crossley --- .github/workflows/rust_tests.yml | 3 +- Cargo.toml | 8 +- cloudevents-sdk-reqwest/Cargo.toml | 2 +- .../reqwest-wasm-example/Cargo.toml | 3 +- .../reqwest-wasm-example/src/lib.rs | 2 +- src/actix/server_request.rs | 10 +- src/lib.rs | 2 + src/reqwest/client_request.rs | 199 +++++++++++++ src/reqwest/client_response.rs | 263 ++++++++++++++++++ src/reqwest/headers.rs | 61 ++++ src/reqwest/mod.rs | 45 +++ 11 files changed, 585 insertions(+), 13 deletions(-) create mode 100644 src/reqwest/client_request.rs create mode 100644 src/reqwest/client_response.rs create mode 100644 src/reqwest/headers.rs create mode 100644 src/reqwest/mod.rs diff --git a/.github/workflows/rust_tests.yml b/.github/workflows/rust_tests.yml index d0ffe98..2b91b47 100644 --- a/.github/workflows/rust_tests.yml +++ b/.github/workflows/rust_tests.yml @@ -91,14 +91,13 @@ jobs: CC: musl-gcc CXX: g++ - # If wasm, then we test only the main module and cloudevents-sdk-reqwest - uses: actions-rs/cargo@v1 name: "Build" if: matrix.target == 'wasm32-unknown-unknown' with: command: build toolchain: ${{ matrix.toolchain }} - args: --target wasm32-unknown-unknown --package cloudevents-sdk --package cloudevents-sdk-reqwest + args: --target wasm32-unknown-unknown --package cloudevents-sdk # Build examples - uses: actions-rs/cargo@v1 diff --git a/Cargo.toml b/Cargo.toml index 109ef00..3787601 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,6 +18,7 @@ name = "cloudevents" [features] cloudevents-actix = ["actix-web", "async-trait", "lazy_static", "bytes", "futures"] +cloudevents-reqwest = ["reqwest", "async-trait", "lazy_static", "bytes"] [dependencies] serde = { version = "^1.0", features = ["derive"] } @@ -29,8 +30,9 @@ url = { version = "^2.1", features = ["serde"] } snafu = "^0.6" bitflags = "^1.2" -# actix optional deps +# runtime optional deps actix-web = { version = "^3", default-features = false, optional = true } +reqwest = { version = "^0.11", default-features = false, features = ["rustls-tls"], optional = true } async-trait = { version = "^0.1.33", optional = true } lazy_static = { version = "1.4.0", optional = true } bytes = { version = "^1.0", optional = true } @@ -50,11 +52,13 @@ claim = "0.3.1" version-sync = "^0.9" serde_yaml = "0.8" -# actix dev-deps +# runtime dev-deps actix-rt = { version = "^1" } url = { version = "^2.1", features = ["serde"] } serde_json = { version = "^1.0" } chrono = { version = "^0.4", features = ["serde"] } +mockito = "0.25.1" +tokio = { version = "^1.0", features = ["full"] } [workspace] members = [ diff --git a/cloudevents-sdk-reqwest/Cargo.toml b/cloudevents-sdk-reqwest/Cargo.toml index 7aaca04..7f3d626 100644 --- a/cloudevents-sdk-reqwest/Cargo.toml +++ b/cloudevents-sdk-reqwest/Cargo.toml @@ -14,7 +14,7 @@ categories = ["web-programming", "encoding", "web-programming::http-client"] [dependencies] async-trait = "^0.1.33" -cloudevents-sdk = { path = ".." } +cloudevents-sdk = { version = "0.3.0", path = ".." } lazy_static = "1.4.0" bytes = "^1.0" diff --git a/example-projects/reqwest-wasm-example/Cargo.toml b/example-projects/reqwest-wasm-example/Cargo.toml index f3c635a..85dd1bf 100644 --- a/example-projects/reqwest-wasm-example/Cargo.toml +++ b/example-projects/reqwest-wasm-example/Cargo.toml @@ -11,8 +11,7 @@ crate-type = ["cdylib"] [dependencies] reqwest = "^0.11" -cloudevents-sdk = { path = "../.." } -cloudevents-sdk-reqwest = { path = "../../cloudevents-sdk-reqwest" } +cloudevents-sdk = { path = "../..", features = ["cloudevents-reqwest"] } url = { version = "^2.1" } web-sys = { version = "0.3.39", features = ["Window", "Location"] } wasm-bindgen-futures = "0.4.12" diff --git a/example-projects/reqwest-wasm-example/src/lib.rs b/example-projects/reqwest-wasm-example/src/lib.rs index 2efe584..8d9e6fb 100644 --- a/example-projects/reqwest-wasm-example/src/lib.rs +++ b/example-projects/reqwest-wasm-example/src/lib.rs @@ -1,5 +1,5 @@ +use cloudevents::reqwest::RequestBuilderExt; use cloudevents::{EventBuilder, EventBuilderV10}; -use cloudevents_sdk_reqwest::RequestBuilderExt; use wasm_bindgen::prelude::*; #[wasm_bindgen] diff --git a/src/actix/server_request.rs b/src/actix/server_request.rs index 92a1ddc..8a0e3c2 100644 --- a/src/actix/server_request.rs +++ b/src/actix/server_request.rs @@ -1,14 +1,14 @@ use super::headers; -use actix_web::http::HeaderName; -use actix_web::web::{Bytes, BytesMut}; -use actix_web::{web, HttpMessage, HttpRequest}; -use async_trait::async_trait; use crate::event::SpecVersion; use crate::message::{ BinaryDeserializer, BinarySerializer, Encoding, MessageAttributeValue, MessageDeserializer, Result, StructuredDeserializer, StructuredSerializer, }; use crate::{message, Event}; +use actix_web::http::HeaderName; +use actix_web::web::{Bytes, BytesMut}; +use actix_web::{web, HttpMessage, HttpRequest}; +use async_trait::async_trait; use futures::StreamExt; use std::convert::TryFrom; @@ -145,8 +145,8 @@ mod tests { use super::*; use actix_web::test; - use chrono::Utc; use crate::{EventBuilder, EventBuilderV10}; + use chrono::Utc; use serde_json::json; #[actix_rt::test] diff --git a/src/lib.rs b/src/lib.rs index 4a528b8..f02ed01 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -42,6 +42,8 @@ #[cfg(feature = "cloudevents-actix")] pub mod actix; +#[cfg(feature = "cloudevents-reqwest")] +pub mod reqwest; pub mod event; pub mod message; diff --git a/src/reqwest/client_request.rs b/src/reqwest/client_request.rs new file mode 100644 index 0000000..64b3703 --- /dev/null +++ b/src/reqwest/client_request.rs @@ -0,0 +1,199 @@ +use super::headers; +use crate::event::SpecVersion; +use crate::message::{ + BinaryDeserializer, BinarySerializer, MessageAttributeValue, Result, StructuredSerializer, +}; +use crate::Event; +use reqwest::RequestBuilder; +use std::str::FromStr; + +/// Wrapper for [`RequestBuilder`] that implements [`StructuredSerializer`] & [`BinarySerializer`] traits. +pub struct RequestSerializer { + req: RequestBuilder, +} + +impl RequestSerializer { + pub fn new(req: RequestBuilder) -> RequestSerializer { + RequestSerializer { req } + } +} + +impl BinarySerializer for RequestSerializer { + fn set_spec_version(mut self, spec_version: SpecVersion) -> Result { + self.req = self + .req + .header(headers::SPEC_VERSION_HEADER.clone(), spec_version.as_str()); + Ok(self) + } + + fn set_attribute(mut self, name: &str, value: MessageAttributeValue) -> Result { + self.req = self.req.header( + headers::ATTRIBUTES_TO_HEADERS.get(name).unwrap().clone(), + value.to_string(), + ); + Ok(self) + } + + fn set_extension(mut self, name: &str, value: MessageAttributeValue) -> Result { + self.req = self + .req + .header(attribute_name_to_header!(name)?, value.to_string()); + Ok(self) + } + + fn end_with_data(self, bytes: Vec) -> Result { + Ok(self.req.body(bytes)) + } + + fn end(self) -> Result { + Ok(self.req) + } +} + +impl StructuredSerializer for RequestSerializer { + fn set_structured_event(self, bytes: Vec) -> Result { + Ok(self + .req + .header( + reqwest::header::CONTENT_TYPE, + headers::CLOUDEVENTS_JSON_HEADER.clone(), + ) + .body(bytes)) + } +} + +/// Method to fill a [`RequestBuilder`] with an [`Event`]. +pub fn event_to_request(event: Event, request_builder: RequestBuilder) -> Result { + BinaryDeserializer::deserialize_binary(event, RequestSerializer::new(request_builder)) +} + +/// Extension Trait for [`RequestBuilder`] which acts as a wrapper for the function [`event_to_request()`]. +/// +/// This trait is sealed and cannot be implemented for types outside of this crate. +pub trait RequestBuilderExt: private::Sealed { + /// Write in this [`RequestBuilder`] the provided [`Event`]. Similar to invoking [`Event`]. + fn event(self, event: Event) -> Result; +} + +impl RequestBuilderExt for RequestBuilder { + fn event(self, event: Event) -> Result { + event_to_request(event, self) + } +} + +// Sealing the RequestBuilderExt +mod private { + pub trait Sealed {} + impl Sealed for reqwest::RequestBuilder {} +} + +#[cfg(test)] +mod tests { + use super::*; + use mockito::{mock, Matcher}; + + use crate::message::StructuredDeserializer; + use crate::{EventBuilder, EventBuilderV10}; + use serde_json::json; + + #[tokio::test] + async fn test_request() { + let url = mockito::server_url(); + let m = mock("POST", "/") + .match_header("ce-specversion", "1.0") + .match_header("ce-id", "0001") + .match_header("ce-type", "example.test") + .match_header("ce-source", "http://localhost/") + .match_header("ce-someint", "10") + .match_body(Matcher::Missing) + .create(); + + let input = EventBuilderV10::new() + .id("0001") + .ty("example.test") + .source("http://localhost/") + .extension("someint", "10") + .build() + .unwrap(); + + let client = reqwest::Client::new(); + client + .post(&url) + .event(input) + .unwrap() + .send() + .await + .unwrap(); + + m.assert(); + } + + #[tokio::test] + async fn test_request_with_full_data() { + let j = json!({"hello": "world"}); + + let url = mockito::server_url(); + let m = mock("POST", "/") + .match_header("ce-specversion", "1.0") + .match_header("ce-id", "0001") + .match_header("ce-type", "example.test") + .match_header("ce-source", "http://localhost/") + .match_header("content-type", "application/json") + .match_header("ce-someint", "10") + .match_body(Matcher::Exact(j.to_string())) + .create(); + + let input = EventBuilderV10::new() + .id("0001") + .ty("example.test") + .source("http://localhost/") + .data("application/json", j.clone()) + .extension("someint", "10") + .build() + .unwrap(); + + let client = reqwest::Client::new(); + + client + .post(&url) + .event(input) + .unwrap() + .send() + .await + .unwrap(); + + m.assert(); + } + + #[tokio::test] + async fn test_structured_request_with_full_data() { + let j = json!({"hello": "world"}); + + let input = EventBuilderV10::new() + .id("0001") + .ty("example.test") + .source("http://localhost") + .data("application/json", j.clone()) + .extension("someint", "10") + .build() + .unwrap(); + + let url = mockito::server_url(); + let m = mock("POST", "/") + .match_header("content-type", "application/cloudevents+json") + .match_body(Matcher::Exact(serde_json::to_string(&input).unwrap())) + .create(); + + let client = reqwest::Client::new(); + StructuredDeserializer::deserialize_structured( + input, + RequestSerializer::new(client.post(&url)), + ) + .unwrap() + .send() + .await + .unwrap(); + + m.assert(); + } +} diff --git a/src/reqwest/client_response.rs b/src/reqwest/client_response.rs new file mode 100644 index 0000000..7aa2ab9 --- /dev/null +++ b/src/reqwest/client_response.rs @@ -0,0 +1,263 @@ +use super::headers; +use crate::event::SpecVersion; +use crate::message::{ + BinaryDeserializer, BinarySerializer, Encoding, Error, MessageAttributeValue, + MessageDeserializer, Result, StructuredDeserializer, StructuredSerializer, +}; +use crate::{message, Event}; +use async_trait::async_trait; +use bytes::Bytes; +use reqwest::header::{HeaderMap, HeaderName}; +use reqwest::Response; +use std::convert::TryFrom; + +/// Wrapper for [`Response`] that implements [`MessageDeserializer`] trait. +pub struct ResponseDeserializer { + headers: HeaderMap, + body: Bytes, +} + +impl ResponseDeserializer { + pub fn new(headers: HeaderMap, body: Bytes) -> ResponseDeserializer { + ResponseDeserializer { headers, body } + } +} + +impl BinaryDeserializer for ResponseDeserializer { + fn deserialize_binary>(self, mut visitor: V) -> Result { + if self.encoding() != Encoding::BINARY { + return Err(message::Error::WrongEncoding {}); + } + + let spec_version = SpecVersion::try_from( + unwrap_optional_header!(self.headers, headers::SPEC_VERSION_HEADER).unwrap()?, + )?; + + visitor = visitor.set_spec_version(spec_version.clone())?; + + let attributes = spec_version.attribute_names(); + + for (hn, hv) in self + .headers + .iter() + .filter(|(hn, _)| headers::SPEC_VERSION_HEADER.ne(hn) && hn.as_str().starts_with("ce-")) + { + let name = &hn.as_str()["ce-".len()..]; + + if attributes.contains(&name) { + visitor = visitor.set_attribute( + name, + MessageAttributeValue::String(String::from(header_value_to_str!(hv)?)), + )? + } else { + visitor = visitor.set_extension( + name, + MessageAttributeValue::String(String::from(header_value_to_str!(hv)?)), + )? + } + } + + if let Some(hv) = self.headers.get("content-type") { + visitor = visitor.set_attribute( + "datacontenttype", + MessageAttributeValue::String(String::from(header_value_to_str!(hv)?)), + )? + } + + if !self.body.is_empty() { + visitor.end_with_data(self.body.to_vec()) + } else { + visitor.end() + } + } +} + +impl StructuredDeserializer for ResponseDeserializer { + fn deserialize_structured>(self, visitor: V) -> Result { + if self.encoding() != Encoding::STRUCTURED { + return Err(message::Error::WrongEncoding {}); + } + visitor.set_structured_event(self.body.to_vec()) + } +} + +impl MessageDeserializer for ResponseDeserializer { + fn encoding(&self) -> Encoding { + match ( + #[allow(clippy::borrow_interior_mutable_const)] + unwrap_optional_header!(self.headers, reqwest::header::CONTENT_TYPE) + .map(|r| r.ok()) + .flatten() + .map(|e| e.starts_with("application/cloudevents+json")), + self.headers + .get::<&'static HeaderName>(&headers::SPEC_VERSION_HEADER), + ) { + (Some(true), _) => Encoding::STRUCTURED, + (_, Some(_)) => Encoding::BINARY, + _ => Encoding::UNKNOWN, + } + } +} + +/// Method to transform an incoming [`Response`] to [`Event`]. +pub async fn response_to_event(res: Response) -> Result { + let h = res.headers().to_owned(); + let b = res.bytes().await.map_err(|e| Error::Other { + source: Box::new(e), + })?; + + MessageDeserializer::into_event(ResponseDeserializer::new(h, b)) +} + +/// Extension Trait for [`Response`] which acts as a wrapper for the function [`response_to_event()`]. +/// +/// This trait is sealed and cannot be implemented for types outside of this crate. +#[async_trait(?Send)] +pub trait ResponseExt: private::Sealed { + /// Convert this [`Response`] to [`Event`]. + async fn into_event(self) -> Result; +} + +#[async_trait(?Send)] +impl ResponseExt for Response { + async fn into_event(self) -> Result { + response_to_event(self).await + } +} + +// Sealing the ResponseExt +mod private { + pub trait Sealed {} + impl Sealed for reqwest::Response {} +} + +#[cfg(test)] +mod tests { + use super::*; + use mockito::mock; + + use crate::{EventBuilder, EventBuilderV10}; + use chrono::Utc; + use serde_json::json; + + #[tokio::test] + async fn test_response() { + let time = Utc::now(); + let url = mockito::server_url(); + let _m = mock("GET", "/") + .with_status(200) + .with_header("ce-specversion", "1.0") + .with_header("ce-id", "0001") + .with_header("ce-type", "example.test") + .with_header("ce-source", "http://localhost") + .with_header("ce-someint", "10") + .with_header("ce-time", &time.to_rfc3339()) + .create(); + + let expected = EventBuilderV10::new() + .id("0001") + .ty("example.test") + //TODO this is required now because the message deserializer implictly set default values + // As soon as this defaulting doesn't happen anymore, we can remove it (Issues #40/#41) + .time(time) + .source("http://localhost") + .extension("someint", "10") + .build() + .unwrap(); + + let client = reqwest::Client::new(); + let res = client + .get(&url) + .send() + .await + .unwrap() + .into_event() + .await + .unwrap(); + + assert_eq!(expected, res); + } + + #[tokio::test] + async fn test_response_with_full_data() { + let time = Utc::now(); + let j = json!({"hello": "world"}); + + let url = mockito::server_url(); + let _m = mock("GET", "/") + .with_status(200) + .with_header("ce-specversion", "1.0") + .with_header("ce-id", "0001") + .with_header("ce-type", "example.test") + .with_header("ce-source", "http://localhost/") + .with_header("content-type", "application/json") + .with_header("ce-someint", "10") + .with_header("ce-time", &time.to_rfc3339()) + .with_body(j.to_string()) + .create(); + + let expected = EventBuilderV10::new() + .id("0001") + .ty("example.test") + //TODO this is required now because the message deserializer implictly set default values + // As soon as this defaulting doesn't happen anymore, we can remove it (Issues #40/#41) + .time(time) + .source("http://localhost/") + .data("application/json", j.to_string().into_bytes()) + .extension("someint", "10") + .build() + .unwrap(); + + let client = reqwest::Client::new(); + let res = client + .get(&url) + .send() + .await + .unwrap() + .into_event() + .await + .unwrap(); + + assert_eq!(expected, res); + } + + #[tokio::test] + async fn test_structured_response_with_full_data() { + let time = Utc::now(); + + let j = json!({"hello": "world"}); + let expected = EventBuilderV10::new() + .id("0001") + .ty("example.test") + //TODO this is required now because the message deserializer implictly set default values + // As soon as this defaulting doesn't happen anymore, we can remove it (Issues #40/#41) + .time(time) + .source("http://localhost") + .data("application/json", j.clone()) + .extension("someint", "10") + .build() + .unwrap(); + + let url = mockito::server_url(); + let _m = mock("GET", "/") + .with_status(200) + .with_header( + "content-type", + "application/cloudevents+json; charset=utf-8", + ) + .with_body(serde_json::to_string(&expected).unwrap()) + .create(); + + let client = reqwest::Client::new(); + let res = client + .get(&url) + .send() + .await + .unwrap() + .into_event() + .await + .unwrap(); + + assert_eq!(expected, res); + } +} diff --git a/src/reqwest/headers.rs b/src/reqwest/headers.rs new file mode 100644 index 0000000..4cb98d6 --- /dev/null +++ b/src/reqwest/headers.rs @@ -0,0 +1,61 @@ +use crate::event::SpecVersion; +use lazy_static::lazy_static; +use reqwest::header::{HeaderName, HeaderValue}; +use std::collections::HashMap; +use std::str::FromStr; + +macro_rules! unwrap_optional_header { + ($headers:expr, $name:expr) => { + $headers + .get::<&'static reqwest::header::HeaderName>(&$name) + .map(|a| header_value_to_str!(a)) + }; +} + +macro_rules! header_value_to_str { + ($header_value:expr) => { + $header_value + .to_str() + .map_err(|e| crate::message::Error::Other { + source: Box::new(e), + }) + }; +} + +macro_rules! str_name_to_header { + ($attribute:expr) => { + reqwest::header::HeaderName::from_str($attribute).map_err(|e| { + crate::message::Error::Other { + source: Box::new(e), + } + }) + }; +} + +macro_rules! attribute_name_to_header { + ($attribute:expr) => { + str_name_to_header!(&["ce-", $attribute].concat()) + }; +} + +fn attributes_to_headers( + it: impl Iterator, +) -> HashMap<&'static str, HeaderName> { + it.map(|s| { + if s == "datacontenttype" { + (s, reqwest::header::CONTENT_TYPE) + } else { + (s, attribute_name_to_header!(s).unwrap()) + } + }) + .collect() +} + +lazy_static! { + pub(crate) static ref ATTRIBUTES_TO_HEADERS: HashMap<&'static str, HeaderName> = + attributes_to_headers(SpecVersion::all_attribute_names()); + pub(crate) static ref SPEC_VERSION_HEADER: HeaderName = + HeaderName::from_static("ce-specversion"); + pub(crate) static ref CLOUDEVENTS_JSON_HEADER: HeaderValue = + HeaderValue::from_static("application/cloudevents+json"); +} diff --git a/src/reqwest/mod.rs b/src/reqwest/mod.rs new file mode 100644 index 0000000..edb25ed --- /dev/null +++ b/src/reqwest/mod.rs @@ -0,0 +1,45 @@ +//! This crate integrates the [cloudevents-sdk](https://docs.rs/cloudevents-sdk) with [reqwest](https://docs.rs/reqwest/) to easily send and receive CloudEvents. +//! +//! ``` +//! use cloudevents::reqwest::{RequestBuilderExt, ResponseExt}; +//! use cloudevents::{EventBuilderV10, EventBuilder}; +//! use serde_json::json; +//! +//! # async fn example() -> Result<(), Box> { +//! let client = reqwest::Client::new(); +//! +//! // Prepare the event to send +//! let event_to_send = EventBuilderV10::new() +//! .id("0001") +//! .ty("example.test") +//! .source("http://localhost/") +//! .data("application/json", json!({"hello": "world"})) +//! .build()?; +//! +//! // Send request +//! let response = client.post("http://localhost") +//! .event(event_to_send)? +//! .send().await?; +//! // Parse response as event +//! let received_event = response +//! .into_event().await?; +//! # Ok(()) +//! # } +//! ``` +//! +//! Check out the [cloudevents-sdk](https://docs.rs/cloudevents-sdk) docs for more details on how to use [`cloudevents::Event`]. + +#![doc(html_root_url = "https://docs.rs/cloudevents-sdk/0.3.1")] +#![deny(broken_intra_doc_links)] + +#[macro_use] +mod headers; +mod client_request; +mod client_response; + +pub use client_request::event_to_request; +pub use client_request::RequestBuilderExt; +pub use client_request::RequestSerializer; +pub use client_response::response_to_event; +pub use client_response::ResponseDeserializer; +pub use client_response::ResponseExt;