diff --git a/.github/workflows/rust_tests.yml b/.github/workflows/rust_tests.yml index aa97c70..8b3b3dd 100644 --- a/.github/workflows/rust_tests.yml +++ b/.github/workflows/rust_tests.yml @@ -60,14 +60,14 @@ jobs: with: command: build toolchain: ${{ matrix.toolchain }} - args: --target ${{ matrix.target }} --workspace --all-features + args: --target ${{ matrix.target }} --all-features - uses: actions-rs/cargo@v1 name: "Test" if: matrix.target == 'x86_64-unknown-linux-gnu' with: command: test toolchain: ${{ matrix.toolchain }} - args: --target ${{ matrix.target }} --workspace --all-features + args: --target ${{ matrix.target }} --all-features # If musl, compile and test all - uses: actions-rs/cargo@v1 @@ -76,7 +76,7 @@ jobs: with: command: build toolchain: ${{ matrix.toolchain }} - args: --target ${{ matrix.target }} --workspace --all-features + args: --target ${{ matrix.target }} --all-features env: CC: musl-gcc CXX: g++ @@ -86,7 +86,7 @@ jobs: with: command: test toolchain: ${{ matrix.toolchain }} - args: --target ${{ matrix.target }} --workspace --all-features + args: --target ${{ matrix.target }} --all-features env: CC: musl-gcc CXX: g++ diff --git a/Cargo.toml b/Cargo.toml index 510e047..a807df2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -66,18 +66,3 @@ chrono = { version = "^0.4", features = ["serde"] } mockito = "0.25.1" tokio = { version = "^1.0", features = ["full"] } mime = "0.3" - -[workspace] -members = [ - ".", - "cloudevents-sdk-actix-web", - "cloudevents-sdk-reqwest", - "cloudevents-sdk-rdkafka", - "cloudevents-sdk-warp", -] -exclude = [ - "example-projects/actix-web-example", - "example-projects/reqwest-wasm-example", - "example-projects/rdkafka-example", - "example-projects/warp-example", -] \ No newline at end of file diff --git a/cloudevents-sdk-actix-web/Cargo.toml b/cloudevents-sdk-actix-web/Cargo.toml deleted file mode 100644 index 3634482..0000000 --- a/cloudevents-sdk-actix-web/Cargo.toml +++ /dev/null @@ -1,28 +0,0 @@ -[package] -name = "cloudevents-sdk-actix-web" -version = "0.3.1" -authors = ["Francesco Guardiani "] -license-file = "../LICENSE" -edition = "2018" -description = "CloudEvents official Rust SDK - Actix-Web integration" -documentation = "https://docs.rs/cloudevents-sdk-actix-web" -repository = "https://github.com/cloudevents/sdk-rust" -readme = "README.md" -categories = ["web-programming", "encoding", "web-programming::http-server"] - -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - -[dependencies] -cloudevents-sdk = { version = "0.3.0", path = ".." } -actix-web = { version = "^3", default-features = false } -async-trait = "^0.1.33" -lazy_static = "1.4.0" -bytes = "^1.0" -futures = "^0.3" - -[dev-dependencies] -actix-rt = "^1" -url = { version = "^2.1", features = ["serde"] } -serde_json = "^1.0" -chrono = { version = "^0.4", features = ["serde"] } -version-sync = "^0.9" \ No newline at end of file diff --git a/cloudevents-sdk-actix-web/README.md b/cloudevents-sdk-actix-web/README.md deleted file mode 100644 index 12e1221..0000000 --- a/cloudevents-sdk-actix-web/README.md +++ /dev/null @@ -1,39 +0,0 @@ -# CloudEvents SDK Rust - Actix-Web [![Crates badge]][crates.io] [![Docs badge]][docs.rs] - -Integration of [CloudEvents SDK](https://github.com/cloudevents/sdk-rust/) with [Actix Web](https://github.com/actix/actix-web). - -Look at [CloudEvents SDK README](https://github.com/cloudevents/sdk-rust/) for more info. - -## Development & Contributing - -If you're interested in contributing to sdk-rust, look at [Contributing documentation](../CONTRIBUTING.md) - -## Community - -- There are bi-weekly calls immediately following the - [Serverless/CloudEvents call](https://github.com/cloudevents/spec#meeting-time) - at 9am PT (US Pacific). Which means they will typically start at 10am PT, but - if the other call ends early then the SDK call will start early as well. See - the - [CloudEvents meeting minutes](https://docs.google.com/document/d/1OVF68rpuPK5shIHILK9JOqlZBbfe91RNzQ7u_P7YCDE/edit#) - to determine which week will have the call. -- Slack: #cloudeventssdk (or #cloudevents-sdk-rust) channel under - [CNCF's Slack workspace](https://slack.cncf.io/). -- Email: https://lists.cncf.io/g/cncf-cloudevents-sdk -- Contact for additional information: Francesco Guardiani (`@slinkydeveloper` on slack). - -Each SDK may have its own unique processes, tooling and guidelines, common -governance related material can be found in the -[CloudEvents `community`](https://github.com/cloudevents/spec/tree/master/community) -directory. In particular, in there you will find information concerning -how SDK projects are -[managed](https://github.com/cloudevents/spec/blob/master/community/SDK-GOVERNANCE.md), -[guidelines](https://github.com/cloudevents/spec/blob/master/community/SDK-maintainer-guidelines.md) -for how PR reviews and approval, and our -[Code of Conduct](https://github.com/cloudevents/spec/blob/master/community/GOVERNANCE.md#additional-information) -information. - -[Crates badge]: https://img.shields.io/crates/v/cloudevents-sdk-actix-web.svg -[crates.io]: https://crates.io/crates/cloudevents-sdk-actix-web -[Docs badge]: https://docs.rs/cloudevents-sdk-actix-web/badge.svg -[docs.rs]: https://docs.rs/cloudevents-sdk-actix-web diff --git a/cloudevents-sdk-actix-web/src/headers.rs b/cloudevents-sdk-actix-web/src/headers.rs deleted file mode 100644 index ef9aa45..0000000 --- a/cloudevents-sdk-actix-web/src/headers.rs +++ /dev/null @@ -1,68 +0,0 @@ -use actix_web::http::header; -use actix_web::http::{HeaderName, HeaderValue}; -use cloudevents::event::SpecVersion; -use lazy_static::lazy_static; -use std::collections::HashMap; -use std::str::FromStr; - -macro_rules! unwrap_optional_header { - ($headers:expr, $name:expr) => { - $headers - .get::<&'static HeaderName>(&$name) - .map(|a| header_value_to_str!(a)) - }; -} - -macro_rules! header_value_to_str { - ($header_value:expr) => { - $header_value - .to_str() - .map_err(|e| cloudevents::message::Error::Other { - source: Box::new(e), - }) - }; -} - -macro_rules! str_to_header_value { - ($header_value:expr) => { - HeaderValue::from_str($header_value).map_err(|e| cloudevents::message::Error::Other { - source: Box::new(e), - }) - }; -} - -macro_rules! str_name_to_header { - ($attribute:expr) => { - HeaderName::from_str($attribute).map_err(|e| cloudevents::message::Error::Other { - source: Box::new(e), - }) - }; -} - -macro_rules! attribute_name_to_header { - ($attribute:expr) => { - str_name_to_header!(&["ce-", $attribute].concat()) - }; -} - -fn attributes_to_headers( - it: impl Iterator, -) -> HashMap<&'static str, HeaderName> { - it.map(|s| { - if s == "datacontenttype" { - (s, header::CONTENT_TYPE) - } else { - (s, attribute_name_to_header!(s).unwrap()) - } - }) - .collect() -} - -lazy_static! { - pub(crate) static ref ATTRIBUTES_TO_HEADERS: HashMap<&'static str, HeaderName> = - attributes_to_headers(SpecVersion::all_attribute_names()); - pub(crate) static ref SPEC_VERSION_HEADER: HeaderName = - HeaderName::from_static("ce-specversion"); - pub(crate) static ref CLOUDEVENTS_JSON_HEADER: HeaderValue = - HeaderValue::from_static("application/cloudevents+json"); -} diff --git a/cloudevents-sdk-actix-web/src/lib.rs b/cloudevents-sdk-actix-web/src/lib.rs deleted file mode 100644 index a3821aa..0000000 --- a/cloudevents-sdk-actix-web/src/lib.rs +++ /dev/null @@ -1,57 +0,0 @@ -//! This crate integrates the [cloudevents-sdk](https://docs.rs/cloudevents-sdk) with [Actix web](https://docs.rs/actix-web/) to easily send and receive CloudEvents. -//! -//! To deserialize an HTTP request as CloudEvent: -//! -//! ``` -//! use cloudevents_sdk_actix_web::HttpRequestExt; -//! use actix_web::{HttpRequest, web, post}; -//! -//! #[post("/")] -//! async fn post_event(req: HttpRequest, payload: web::Payload) -> Result { -//! let event = req.to_event(payload).await?; -//! println!("Received Event: {:?}", event); -//! Ok(format!("{:?}", event)) -//! } -//! ``` -//! -//! To serialize a CloudEvent to an HTTP response: -//! -//! ``` -//! use cloudevents_sdk_actix_web::HttpResponseBuilderExt; -//! use actix_web::{HttpRequest, web, get, HttpResponse}; -//! use cloudevents::{EventBuilderV10, EventBuilder}; -//! use serde_json::json; -//! -//! #[get("/")] -//! async fn get_event() -> Result { -//! Ok(HttpResponse::Ok() -//! .event( -//! EventBuilderV10::new() -//! .id("0001") -//! .ty("example.test") -//! .source("http://localhost/") -//! .data("application/json", json!({"hello": "world"})) -//! .build() -//! .expect("No error while building the event"), -//! ) -//! .await? -//! ) -//! } -//! ``` -//! -//! Check out the [cloudevents-sdk](https://docs.rs/cloudevents-sdk) docs for more details on how to use [`cloudevents::Event`] - -#![doc(html_root_url = "https://docs.rs/cloudevents-sdk-actix-web/0.3.1")] -#![deny(broken_intra_doc_links)] - -#[macro_use] -mod headers; -mod server_request; -mod server_response; - -pub use server_request::request_to_event; -pub use server_request::HttpRequestDeserializer; -pub use server_request::HttpRequestExt; -pub use server_response::event_to_response; -pub use server_response::HttpResponseBuilderExt; -pub use server_response::HttpResponseSerializer; diff --git a/cloudevents-sdk-actix-web/src/server_request.rs b/cloudevents-sdk-actix-web/src/server_request.rs deleted file mode 100644 index 5937f4b..0000000 --- a/cloudevents-sdk-actix-web/src/server_request.rs +++ /dev/null @@ -1,210 +0,0 @@ -use super::headers; -use actix_web::http::HeaderName; -use actix_web::web::{Bytes, BytesMut}; -use actix_web::{web, HttpMessage, HttpRequest}; -use async_trait::async_trait; -use cloudevents::event::SpecVersion; -use cloudevents::message::{ - BinaryDeserializer, BinarySerializer, Encoding, MessageAttributeValue, MessageDeserializer, - Result, StructuredDeserializer, StructuredSerializer, -}; -use cloudevents::{message, Event}; -use futures::StreamExt; -use std::convert::TryFrom; - -/// Wrapper for [`HttpRequest`] that implements [`MessageDeserializer`] trait. -pub struct HttpRequestDeserializer<'a> { - req: &'a HttpRequest, - body: Bytes, -} - -impl HttpRequestDeserializer<'_> { - pub fn new(req: &HttpRequest, body: Bytes) -> HttpRequestDeserializer { - HttpRequestDeserializer { req, body } - } -} - -impl<'a> BinaryDeserializer for HttpRequestDeserializer<'a> { - fn deserialize_binary>(self, mut visitor: V) -> Result { - if self.encoding() != Encoding::BINARY { - return Err(message::Error::WrongEncoding {}); - } - - let spec_version = SpecVersion::try_from( - unwrap_optional_header!(self.req.headers(), headers::SPEC_VERSION_HEADER).unwrap()?, - )?; - - visitor = visitor.set_spec_version(spec_version.clone())?; - - let attributes = spec_version.attribute_names(); - - for (hn, hv) in - self.req.headers().iter().filter(|(hn, _)| { - headers::SPEC_VERSION_HEADER.ne(hn) && hn.as_str().starts_with("ce-") - }) - { - let name = &hn.as_str()["ce-".len()..]; - - if attributes.contains(&name) { - visitor = visitor.set_attribute( - name, - MessageAttributeValue::String(String::from(header_value_to_str!(hv)?)), - )? - } else { - visitor = visitor.set_extension( - name, - MessageAttributeValue::String(String::from(header_value_to_str!(hv)?)), - )? - } - } - - if let Some(hv) = self.req.headers().get("content-type") { - visitor = visitor.set_attribute( - "datacontenttype", - MessageAttributeValue::String(String::from(header_value_to_str!(hv)?)), - )? - } - - if !self.body.is_empty() { - visitor.end_with_data(self.body.to_vec()) - } else { - visitor.end() - } - } -} - -impl<'a> StructuredDeserializer for HttpRequestDeserializer<'a> { - fn deserialize_structured>(self, visitor: V) -> Result { - if self.encoding() != Encoding::STRUCTURED { - return Err(message::Error::WrongEncoding {}); - } - visitor.set_structured_event(self.body.to_vec()) - } -} - -impl<'a> MessageDeserializer for HttpRequestDeserializer<'a> { - fn encoding(&self) -> Encoding { - if self.req.content_type() == "application/cloudevents+json" { - Encoding::STRUCTURED - } else if self - .req - .headers() - .get::<&'static HeaderName>(&super::headers::SPEC_VERSION_HEADER) - .is_some() - { - Encoding::BINARY - } else { - Encoding::UNKNOWN - } - } -} - -/// Method to transform an incoming [`HttpRequest`] to [`Event`]. -pub async fn request_to_event( - req: &HttpRequest, - mut payload: web::Payload, -) -> std::result::Result { - let mut bytes = BytesMut::new(); - while let Some(item) = payload.next().await { - bytes.extend_from_slice(&item?); - } - MessageDeserializer::into_event(HttpRequestDeserializer::new(req, bytes.freeze())) - .map_err(actix_web::error::ErrorBadRequest) -} - -/// Extention Trait for [`HttpRequest`] which acts as a wrapper for the function [`request_to_event()`]. -/// -/// This trait is sealed and cannot be implemented for types outside of this crate. -#[async_trait(?Send)] -pub trait HttpRequestExt: private::Sealed { - /// Convert this [`HttpRequest`] into an [`Event`]. - async fn to_event( - &self, - mut payload: web::Payload, - ) -> std::result::Result; -} - -#[async_trait(?Send)] -impl HttpRequestExt for HttpRequest { - async fn to_event( - &self, - payload: web::Payload, - ) -> std::result::Result { - request_to_event(self, payload).await - } -} - -mod private { - // Sealing the RequestExt - pub trait Sealed {} - impl Sealed for actix_web::HttpRequest {} -} - -#[cfg(test)] -mod tests { - use super::*; - use actix_web::test; - - use chrono::Utc; - use cloudevents::{EventBuilder, EventBuilderV10}; - use serde_json::json; - - #[actix_rt::test] - async fn test_request() { - let time = Utc::now(); - let expected = EventBuilderV10::new() - .id("0001") - .ty("example.test") - .source("http://localhost/") - //TODO this is required now because the message deserializer implictly set default values - // As soon as this defaulting doesn't happen anymore, we can remove it (Issues #40/#41) - .time(time) - .extension("someint", "10") - .build() - .unwrap(); - - let (req, payload) = test::TestRequest::post() - .header("ce-specversion", "1.0") - .header("ce-id", "0001") - .header("ce-type", "example.test") - .header("ce-source", "http://localhost/") - .header("ce-someint", "10") - .header("ce-time", time.to_rfc3339()) - .to_http_parts(); - - let resp = req.to_event(web::Payload(payload)).await.unwrap(); - assert_eq!(expected, resp); - } - - #[actix_rt::test] - async fn test_request_with_full_data() { - let time = Utc::now(); - let j = json!({"hello": "world"}); - - let expected = EventBuilderV10::new() - .id("0001") - .ty("example.test") - .source("http://localhost") - //TODO this is required now because the message deserializer implictly set default values - // As soon as this defaulting doesn't happen anymore, we can remove it (Issues #40/#41) - .time(time) - .data("application/json", j.to_string().into_bytes()) - .extension("someint", "10") - .build() - .unwrap(); - - let (req, payload) = test::TestRequest::post() - .header("ce-specversion", "1.0") - .header("ce-id", "0001") - .header("ce-type", "example.test") - .header("ce-source", "http://localhost") - .header("ce-someint", "10") - .header("ce-time", time.to_rfc3339()) - .header("content-type", "application/json") - .set_json(&j) - .to_http_parts(); - - let resp = req.to_event(web::Payload(payload)).await.unwrap(); - assert_eq!(expected, resp); - } -} diff --git a/cloudevents-sdk-actix-web/src/server_response.rs b/cloudevents-sdk-actix-web/src/server_response.rs deleted file mode 100644 index f22a724..0000000 --- a/cloudevents-sdk-actix-web/src/server_response.rs +++ /dev/null @@ -1,214 +0,0 @@ -use super::headers; -use actix_web::dev::HttpResponseBuilder; -use actix_web::http::{HeaderName, HeaderValue}; -use actix_web::HttpResponse; -use async_trait::async_trait; -use cloudevents::event::SpecVersion; -use cloudevents::message::{ - BinaryDeserializer, BinarySerializer, MessageAttributeValue, Result, StructuredSerializer, -}; -use cloudevents::Event; -use std::str::FromStr; - -/// Wrapper for [`HttpResponseBuilder`] that implements [`StructuredSerializer`] and [`BinarySerializer`]. -pub struct HttpResponseSerializer { - builder: HttpResponseBuilder, -} - -impl HttpResponseSerializer { - pub fn new(builder: HttpResponseBuilder) -> HttpResponseSerializer { - HttpResponseSerializer { builder } - } -} - -impl BinarySerializer for HttpResponseSerializer { - fn set_spec_version(mut self, spec_version: SpecVersion) -> Result { - self.builder.set_header( - headers::SPEC_VERSION_HEADER.clone(), - str_to_header_value!(spec_version.as_str())?, - ); - Ok(self) - } - - fn set_attribute(mut self, name: &str, value: MessageAttributeValue) -> Result { - self.builder.set_header( - headers::ATTRIBUTES_TO_HEADERS.get(name).unwrap().clone(), - str_to_header_value!(value.to_string().as_str())?, - ); - Ok(self) - } - - fn set_extension(mut self, name: &str, value: MessageAttributeValue) -> Result { - self.builder.set_header( - attribute_name_to_header!(name)?, - str_to_header_value!(value.to_string().as_str())?, - ); - Ok(self) - } - - fn end_with_data(mut self, bytes: Vec) -> Result { - Ok(self.builder.body(bytes)) - } - - fn end(mut self) -> Result { - Ok(self.builder.finish()) - } -} - -impl StructuredSerializer for HttpResponseSerializer { - fn set_structured_event(mut self, bytes: Vec) -> Result { - Ok(self - .builder - .set_header( - actix_web::http::header::CONTENT_TYPE, - headers::CLOUDEVENTS_JSON_HEADER.clone(), - ) - .body(bytes)) - } -} - -/// Method to fill an [`HttpResponseBuilder`] with an [`Event`]. -pub async fn event_to_response( - event: Event, - response: HttpResponseBuilder, -) -> std::result::Result { - BinaryDeserializer::deserialize_binary(event, HttpResponseSerializer::new(response)) - .map_err(actix_web::error::ErrorBadRequest) -} - -/// Extension Trait for [`HttpResponseBuilder`] which acts as a wrapper for the function [`event_to_response()`]. -/// -/// This trait is sealed and cannot be implemented for types outside of this crate. -#[async_trait(?Send)] -pub trait HttpResponseBuilderExt: private::Sealed { - /// Fill this [`HttpResponseBuilder`] with an [`Event`]. - async fn event( - self, - event: Event, - ) -> std::result::Result; -} - -#[async_trait(?Send)] -impl HttpResponseBuilderExt for HttpResponseBuilder { - async fn event( - self, - event: Event, - ) -> std::result::Result { - event_to_response(event, self).await - } -} - -// Sealing the HttpResponseBuilderExt -mod private { - pub trait Sealed {} - impl Sealed for actix_web::dev::HttpResponseBuilder {} -} - -#[cfg(test)] -mod tests { - use super::*; - - use actix_web::http::StatusCode; - use actix_web::test; - use cloudevents::{EventBuilder, EventBuilderV10}; - use futures::TryStreamExt; - use serde_json::json; - - #[actix_rt::test] - async fn test_response() { - let input = EventBuilderV10::new() - .id("0001") - .ty("example.test") - .source("http://localhost/") - .extension("someint", "10") - .build() - .unwrap(); - - let resp = HttpResponseBuilder::new(StatusCode::OK) - .event(input) - .await - .unwrap(); - - assert_eq!( - resp.headers() - .get("ce-specversion") - .unwrap() - .to_str() - .unwrap(), - "1.0" - ); - assert_eq!( - resp.headers().get("ce-id").unwrap().to_str().unwrap(), - "0001" - ); - assert_eq!( - resp.headers().get("ce-type").unwrap().to_str().unwrap(), - "example.test" - ); - assert_eq!( - resp.headers().get("ce-source").unwrap().to_str().unwrap(), - "http://localhost/" - ); - assert_eq!( - resp.headers().get("ce-someint").unwrap().to_str().unwrap(), - "10" - ); - } - - #[actix_rt::test] - async fn test_response_with_full_data() { - let j = json!({"hello": "world"}); - - let input = EventBuilderV10::new() - .id("0001") - .ty("example.test") - .source("http://localhost") - .data("application/json", j.clone()) - .extension("someint", "10") - .build() - .unwrap(); - - let mut resp = HttpResponseBuilder::new(StatusCode::OK) - .event(input) - .await - .unwrap(); - - assert_eq!( - resp.headers() - .get("ce-specversion") - .unwrap() - .to_str() - .unwrap(), - "1.0" - ); - assert_eq!( - resp.headers().get("ce-id").unwrap().to_str().unwrap(), - "0001" - ); - assert_eq!( - resp.headers().get("ce-type").unwrap().to_str().unwrap(), - "example.test" - ); - assert_eq!( - resp.headers().get("ce-source").unwrap().to_str().unwrap(), - "http://localhost" - ); - assert_eq!( - resp.headers() - .get("content-type") - .unwrap() - .to_str() - .unwrap(), - "application/json" - ); - assert_eq!( - resp.headers().get("ce-someint").unwrap().to_str().unwrap(), - "10" - ); - - let bytes = test::load_stream(resp.take_body().into_stream()) - .await - .unwrap(); - assert_eq!(j.to_string().as_bytes(), bytes.as_ref()) - } -} diff --git a/cloudevents-sdk-actix-web/tests/version_number.rs b/cloudevents-sdk-actix-web/tests/version_number.rs deleted file mode 100644 index 288592d..0000000 --- a/cloudevents-sdk-actix-web/tests/version_number.rs +++ /dev/null @@ -1,9 +0,0 @@ -#[test] -fn test_readme_deps() { - version_sync::assert_markdown_deps_updated!("README.md"); -} - -#[test] -fn test_html_root_url() { - version_sync::assert_html_root_url_updated!("src/lib.rs"); -} diff --git a/cloudevents-sdk-rdkafka/Cargo.toml b/cloudevents-sdk-rdkafka/Cargo.toml deleted file mode 100644 index 5030765..0000000 --- a/cloudevents-sdk-rdkafka/Cargo.toml +++ /dev/null @@ -1,26 +0,0 @@ -[package] -name = "cloudevents-sdk-rdkafka" -version = "0.3.1" -authors = ["Pranav Bhatt "] -edition = "2018" -license-file = "../LICENSE" -description = "CloudEvents official Rust SDK - Kafka integration" -documentation = "https://docs.rs/cloudevents-sdk-rdkafka" -repository = "https://github.com/cloudevents/sdk-rust" -readme = "README.md" -categories = ["web-programming", "encoding"] - -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - -[dependencies] -bytes = "^1.0" -cloudevents-sdk = { version = "0.3.0", path = ".." } -lazy_static = "1.4.0" -rdkafka = { version = "^0.25", features = ["cmake-build"] } - -[dev-dependencies] -url = { version = "^2.1" } -serde_json = "^1.0" -chrono = { version = "^0.4", features = ["serde"] } -futures = "^0.3" -version-sync = "^0.9" diff --git a/cloudevents-sdk-rdkafka/README.md b/cloudevents-sdk-rdkafka/README.md deleted file mode 100644 index 7e844cc..0000000 --- a/cloudevents-sdk-rdkafka/README.md +++ /dev/null @@ -1,39 +0,0 @@ -# CloudEvents SDK Rust - rdkafka [![Crates badge]][crates.io] [![Docs badge]][docs.rs] - -Integration of [CloudEvents SDK](https://github.com/cloudevents/sdk-rust/) with [rdkafka](https://github.com/fede1024/rust-rdkafka) - -Look at [CloudEvents SDK README](https://github.com/cloudevents/sdk-rust/) for more info. - -## Development & Contributing - -If you're interested in contributing to sdk-rust, look at [Contributing documentation](../CONTRIBUTING.md) - -## Community - -- There are bi-weekly calls immediately following the - [Serverless/CloudEvents call](https://github.com/cloudevents/spec#meeting-time) - at 9am PT (US Pacific). Which means they will typically start at 10am PT, but - if the other call ends early then the SDK call will start early as well. See - the - [CloudEvents meeting minutes](https://docs.google.com/document/d/1OVF68rpuPK5shIHILK9JOqlZBbfe91RNzQ7u_P7YCDE/edit#) - to determine which week will have the call. -- Slack: #cloudeventssdk (or #cloudevents-sdk-rust) channel under - [CNCF's Slack workspace](https://slack.cncf.io/). -- Email: https://lists.cncf.io/g/cncf-cloudevents-sdk -- Contact for additional information: Francesco Guardiani (`@slinkydeveloper` on slack). - -Each SDK may have its own unique processes, tooling and guidelines, common -governance related material can be found in the -[CloudEvents `community`](https://github.com/cloudevents/spec/tree/master/community) -directory. In particular, in there you will find information concerning -how SDK projects are -[managed](https://github.com/cloudevents/spec/blob/master/community/SDK-GOVERNANCE.md), -[guidelines](https://github.com/cloudevents/spec/blob/master/community/SDK-maintainer-guidelines.md) -for how PR reviews and approval, and our -[Code of Conduct](https://github.com/cloudevents/spec/blob/master/community/GOVERNANCE.md#additional-information) -information. - -[Crates badge]: https://img.shields.io/crates/v/cloudevents-sdk-rdkafka.svg -[crates.io]: https://crates.io/crates/cloudevents-sdk-rdkafka -[Docs badge]: https://docs.rs/cloudevents-sdk-rdkafka/badge.svg -[docs.rs]: https://docs.rs/cloudevents-sdk-rdkafka diff --git a/cloudevents-sdk-rdkafka/src/headers.rs b/cloudevents-sdk-rdkafka/src/headers.rs deleted file mode 100644 index 0bac9a8..0000000 --- a/cloudevents-sdk-rdkafka/src/headers.rs +++ /dev/null @@ -1,29 +0,0 @@ -use cloudevents::event::SpecVersion; -use lazy_static::lazy_static; -use std::collections::HashMap; - -macro_rules! attribute_name_to_header { - ($attribute:expr) => { - format!("ce_{}", $attribute) - }; -} - -fn attributes_to_headers(it: impl Iterator) -> HashMap<&'static str, String> { - it.map(|s| { - if s == "datacontenttype" { - (s, String::from("content-type")) - } else { - (s, attribute_name_to_header!(s)) - } - }) - .collect() -} - -lazy_static! { - pub(crate) static ref ATTRIBUTES_TO_HEADERS: HashMap<&'static str, String> = - attributes_to_headers(SpecVersion::all_attribute_names()); -} - -pub(crate) static SPEC_VERSION_HEADER: &str = "ce_specversion"; -pub(crate) static CLOUDEVENTS_JSON_HEADER: &str = "application/cloudevents+json"; -pub(crate) static CONTENT_TYPE: &str = "content-type"; diff --git a/cloudevents-sdk-rdkafka/src/kafka_consumer_record.rs b/cloudevents-sdk-rdkafka/src/kafka_consumer_record.rs deleted file mode 100644 index 5c65aa3..0000000 --- a/cloudevents-sdk-rdkafka/src/kafka_consumer_record.rs +++ /dev/null @@ -1,258 +0,0 @@ -use crate::headers; -use cloudevents::event::SpecVersion; -use cloudevents::message::{ - BinaryDeserializer, BinarySerializer, Encoding, MessageAttributeValue, MessageDeserializer, - Result, StructuredDeserializer, StructuredSerializer, -}; -use cloudevents::{message, Event}; -use rdkafka::message::{BorrowedMessage, Headers, Message, OwnedMessage}; -use std::collections::HashMap; -use std::convert::TryFrom; -use std::str; - -/// Wrapper for [`Message`] that implements [`MessageDeserializer`] trait. -pub struct ConsumerRecordDeserializer { - pub(crate) headers: HashMap>, - pub(crate) payload: Option>, -} - -impl ConsumerRecordDeserializer { - fn get_kafka_headers(message: &impl Message) -> Result>> { - let mut hm = HashMap::new(); - let headers = message - .headers() - // TODO create an error variant for invalid headers - .ok_or(cloudevents::message::Error::WrongEncoding {})?; - for i in 0..headers.count() { - let header = headers.get(i).unwrap(); - hm.insert(header.0.to_string(), Vec::from(header.1)); - } - Ok(hm) - } - - pub fn new(message: &impl Message) -> Result { - Ok(ConsumerRecordDeserializer { - headers: Self::get_kafka_headers(message)?, - payload: message.payload().map(Vec::from), - }) - } -} - -impl BinaryDeserializer for ConsumerRecordDeserializer { - fn deserialize_binary>(mut self, mut visitor: V) -> Result { - if self.encoding() != Encoding::BINARY { - return Err(message::Error::WrongEncoding {}); - } - - let spec_version = SpecVersion::try_from( - str::from_utf8(&self.headers.remove(headers::SPEC_VERSION_HEADER).unwrap()[..]) - .map_err(|e| cloudevents::message::Error::Other { - source: Box::new(e), - })?, - )?; - - visitor = visitor.set_spec_version(spec_version.clone())?; - - let attributes = spec_version.attribute_names(); - - if let Some(hv) = self.headers.remove(headers::CONTENT_TYPE) { - visitor = visitor.set_attribute( - "datacontenttype", - MessageAttributeValue::String(String::from_utf8(hv).map_err(|e| { - cloudevents::message::Error::Other { - source: Box::new(e), - } - })?), - )? - } - - for (hn, hv) in self - .headers - .into_iter() - .filter(|(hn, _)| headers::SPEC_VERSION_HEADER != *hn && hn.starts_with("ce_")) - { - let name = &hn["ce_".len()..]; - - if attributes.contains(&name) { - visitor = visitor.set_attribute( - name, - MessageAttributeValue::String(String::from_utf8(hv).map_err(|e| { - cloudevents::message::Error::Other { - source: Box::new(e), - } - })?), - )? - } else { - visitor = visitor.set_extension( - name, - MessageAttributeValue::String(String::from_utf8(hv).map_err(|e| { - cloudevents::message::Error::Other { - source: Box::new(e), - } - })?), - )? - } - } - - if self.payload != None { - visitor.end_with_data(self.payload.unwrap()) - } else { - visitor.end() - } - } -} - -impl StructuredDeserializer for ConsumerRecordDeserializer { - fn deserialize_structured>(self, visitor: V) -> Result { - if self.encoding() != Encoding::STRUCTURED { - return Err(message::Error::WrongEncoding {}); - } - visitor.set_structured_event(self.payload.unwrap()) - } -} - -impl MessageDeserializer for ConsumerRecordDeserializer { - fn encoding(&self) -> Encoding { - match ( - self.headers - .get("content-type") - .map(|s| String::from_utf8(s.to_vec()).ok()) - .flatten() - .map(|s| s.starts_with(headers::CLOUDEVENTS_JSON_HEADER)) - .unwrap_or(false), - self.headers.get(headers::SPEC_VERSION_HEADER), - ) { - (true, _) => Encoding::STRUCTURED, - (_, Some(_)) => Encoding::BINARY, - _ => Encoding::UNKNOWN, - } - } -} - -/// Method to transform a [`Message`] to [`Event`]. -pub fn record_to_event(msg: &impl Message) -> Result { - MessageDeserializer::into_event(ConsumerRecordDeserializer::new(msg)?) -} - -/// Extension Trait for [`Message`] which acts as a wrapper for the function [`record_to_event()`]. -/// -/// This trait is sealed and cannot be implemented for types outside of this crate. -pub trait MessageExt: private::Sealed { - /// Generates [`Event`] from [`BorrowedMessage`]. - fn to_event(&self) -> Result; -} - -impl MessageExt for BorrowedMessage<'_> { - fn to_event(&self) -> Result { - record_to_event(self) - } -} - -impl MessageExt for OwnedMessage { - fn to_event(&self) -> Result { - record_to_event(self) - } -} - -mod private { - // Sealing the MessageExt - pub trait Sealed {} - impl Sealed for rdkafka::message::OwnedMessage {} - impl Sealed for rdkafka::message::BorrowedMessage<'_> {} -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::kafka_producer_record::MessageRecord; - - use chrono::Utc; - use cloudevents::{EventBuilder, EventBuilderV10}; - use serde_json::json; - - #[test] - fn test_binary_record() { - let time = Utc::now(); - - let expected = EventBuilderV10::new() - .id("0001") - .ty("example.test") - .time(time) - .source("http://localhost") - .extension("someint", "10") - .build() - .unwrap(); - - // Since there is neither a way provided by rust-rdkafka to convert FutureProducer back into - // OwnedMessage or BorrowedMessage, nor is there a way to create a BorrowedMessage struct, - // the test uses OwnedMessage instead, which consumes the message instead of borrowing it like - // in the case of BorrowedMessage - - let message_record = MessageRecord::from_event( - EventBuilderV10::new() - .id("0001") - .ty("example.test") - .time(time) - .source("http://localhost") - .extension("someint", "10") - .build() - .unwrap(), - ) - .unwrap(); - - let owned_message = OwnedMessage::new( - message_record.payload, - Some(String::from("test key").into_bytes()), - String::from("test topic"), - rdkafka::message::Timestamp::NotAvailable, - 10, - 10, - Some(message_record.headers), - ); - - assert_eq!(owned_message.to_event().unwrap(), expected) - } - - #[test] - fn test_structured_record() { - let j = json!({"hello": "world"}); - - let expected = EventBuilderV10::new() - .id("0001") - .ty("example.test") - .source("http://localhost") - .data("application/json", j.clone()) - .extension("someint", "10") - .build() - .unwrap(); - - // Since there is neither a way provided by rust-rdkafka to convert FutureProducer back into - // OwnedMessage or BorrowedMessage, nor is there a way to create a BorrowedMessage struct, - // the test uses OwnedMessage instead, which consumes the message instead of borrowing it like - // in the case of BorrowedMessage - - let input = EventBuilderV10::new() - .id("0001") - .ty("example.test") - .source("http://localhost") - .data("application/json", j.clone()) - .extension("someint", "10") - .build() - .unwrap(); - - let serialized_event = - StructuredDeserializer::deserialize_structured(input, MessageRecord::new()).unwrap(); - - let owned_message = OwnedMessage::new( - serialized_event.payload, - Some(String::from("test key").into_bytes()), - String::from("test topic"), - rdkafka::message::Timestamp::NotAvailable, - 10, - 10, - Some(serialized_event.headers), - ); - - assert_eq!(owned_message.to_event().unwrap(), expected) - } -} diff --git a/cloudevents-sdk-rdkafka/src/kafka_producer_record.rs b/cloudevents-sdk-rdkafka/src/kafka_producer_record.rs deleted file mode 100644 index 88146f9..0000000 --- a/cloudevents-sdk-rdkafka/src/kafka_producer_record.rs +++ /dev/null @@ -1,153 +0,0 @@ -use super::headers; -use cloudevents::event::SpecVersion; -use cloudevents::message::{ - BinaryDeserializer, BinarySerializer, MessageAttributeValue, Result, StructuredSerializer, -}; -use cloudevents::Event; -use rdkafka::message::{OwnedHeaders, ToBytes}; -use rdkafka::producer::{BaseRecord, FutureRecord}; - -/// This struct contains a serialized CloudEvent message in the Kafka shape. -/// Implements [`StructuredSerializer`] & [`BinarySerializer`] traits. -/// -/// To instantiate a new `MessageRecord` from an [`Event`], -/// look at [`Self::from_event`] or use [`StructuredDeserializer::deserialize_structured`](cloudevents::message::StructuredDeserializer::deserialize_structured) -/// or [`BinaryDeserializer::deserialize_binary`]. -pub struct MessageRecord { - pub(crate) headers: OwnedHeaders, - pub(crate) payload: Option>, -} - -impl MessageRecord { - /// Create a new empty [`MessageRecord`] - pub fn new() -> Self { - MessageRecord { - headers: OwnedHeaders::new(), - payload: None, - } - } - - /// Create a new [`MessageRecord`], filled with `event` serialized in binary mode. - pub fn from_event(event: Event) -> Result { - BinaryDeserializer::deserialize_binary(event, MessageRecord::new()) - } -} - -impl Default for MessageRecord { - fn default() -> Self { - Self::new() - } -} - -impl BinarySerializer for MessageRecord { - fn set_spec_version(mut self, spec_version: SpecVersion) -> Result { - self.headers = self - .headers - .add(headers::SPEC_VERSION_HEADER, spec_version.as_str()); - - Ok(self) - } - - fn set_attribute(mut self, name: &str, value: MessageAttributeValue) -> Result { - self.headers = self.headers.add( - &headers::ATTRIBUTES_TO_HEADERS - .get(name) - .ok_or(cloudevents::message::Error::UnknownAttribute { - name: String::from(name), - })? - .clone()[..], - &value.to_string()[..], - ); - - Ok(self) - } - - fn set_extension(mut self, name: &str, value: MessageAttributeValue) -> Result { - self.headers = self - .headers - .add(&attribute_name_to_header!(name)[..], &value.to_string()[..]); - - Ok(self) - } - - fn end_with_data(mut self, bytes: Vec) -> Result { - self.payload = Some(bytes); - - Ok(self) - } - - fn end(self) -> Result { - Ok(self) - } -} - -impl StructuredSerializer for MessageRecord { - fn set_structured_event(mut self, bytes: Vec) -> Result { - self.headers = self - .headers - .add(headers::CONTENT_TYPE, headers::CLOUDEVENTS_JSON_HEADER); - - self.payload = Some(bytes); - - Ok(self) - } -} - -/// Extension Trait for [`BaseRecord`] that fills the record with a [`MessageRecord`]. -/// -/// This trait is sealed and cannot be implemented for types outside of this crate. -pub trait BaseRecordExt<'a, K: ToBytes + ?Sized>: private::Sealed { - /// Fill this [`BaseRecord`] with a [`MessageRecord`]. - fn message_record( - self, - message_record: &'a MessageRecord, - ) -> Result>>; -} - -impl<'a, K: ToBytes + ?Sized> BaseRecordExt<'a, K> for BaseRecord<'a, K, Vec> { - fn message_record( - mut self, - message_record: &'a MessageRecord, - ) -> Result>> { - self = self.headers(message_record.headers.clone()); - - if let Some(s) = message_record.payload.as_ref() { - self = self.payload(s); - } - - Ok(self) - } -} - -/// Extension Trait for [`FutureRecord`] that fills the record with a [`MessageRecord`]. -/// -/// This trait is sealed and cannot be implemented for types outside of this crate. -pub trait FutureRecordExt<'a, K: ToBytes + ?Sized>: private::Sealed { - /// Fill this [`FutureRecord`] with a [`MessageRecord`]. - fn message_record(self, message_record: &'a MessageRecord) -> FutureRecord<'a, K, Vec>; -} - -impl<'a, K: ToBytes + ?Sized> FutureRecordExt<'a, K> for FutureRecord<'a, K, Vec> { - fn message_record(mut self, message_record: &'a MessageRecord) -> FutureRecord<'a, K, Vec> { - self = self.headers(message_record.headers.clone()); - - if let Some(s) = message_record.payload.as_ref() { - self = self.payload(s); - } - - self - } -} - -mod private { - // Sealing the FutureRecordExt and BaseRecordExt - pub trait Sealed {} - impl Sealed - for rdkafka::producer::FutureRecord<'_, K, V> - { - } - impl Sealed - for rdkafka::producer::BaseRecord<'_, K, V> - { - } -} diff --git a/cloudevents-sdk-rdkafka/src/lib.rs b/cloudevents-sdk-rdkafka/src/lib.rs deleted file mode 100644 index 32d38da..0000000 --- a/cloudevents-sdk-rdkafka/src/lib.rs +++ /dev/null @@ -1,65 +0,0 @@ -//! This library provides Kafka protocol bindings for CloudEvents -//! using the [rust-rdkafka](https://fede1024.github.io/rust-rdkafka) library. -//! -//! To produce Cloudevents: -//! -//! ``` -//! -//! use cloudevents::Event; -//! use rdkafka::producer::{FutureProducer, FutureRecord}; -//! use rdkafka::util::Timeout; -//! use cloudevents_sdk_rdkafka::{MessageRecord, FutureRecordExt}; -//! -//! # async fn produce(producer: &FutureProducer, event: Event) -> Result<(), Box> { -//! let message_record = MessageRecord::from_event(event)?; -//! -//! producer.send( -//! FutureRecord::to("topic") -//! .key("some_event") -//! .message_record(&message_record), -//! Timeout::Never -//! ).await; -//! # Ok(()) -//! # } -//! -//! ``` -//! -//! To consume Cloudevents: -//! -//! ``` -//! use rdkafka::consumer::{StreamConsumer, DefaultConsumerContext, Consumer, CommitMode}; -//! use cloudevents_sdk_rdkafka::MessageExt; -//! use futures::StreamExt; -//! -//! # async fn consume(consumer: StreamConsumer) -> Result<(), Box> { -//! let mut message_stream = consumer.start(); -//! -//! while let Some(message) = message_stream.next().await { -//! match message { -//! Err(e) => println!("Kafka error: {}", e), -//! Ok(m) => { -//! let event = m.to_event()?; -//! println!("Received Event: {}", event); -//! consumer.commit_message(&m, CommitMode::Async)?; -//! } -//! }; -//! } -//! # Ok(()) -//! # } -//! ``` - -#![doc(html_root_url = "https://docs.rs/cloudevents-sdk-rdkafka/0.3.1")] -#![deny(broken_intra_doc_links)] - -#[macro_use] -mod headers; -mod kafka_consumer_record; -mod kafka_producer_record; - -pub use kafka_consumer_record::record_to_event; -pub use kafka_consumer_record::ConsumerRecordDeserializer; -pub use kafka_consumer_record::MessageExt; - -pub use kafka_producer_record::BaseRecordExt; -pub use kafka_producer_record::FutureRecordExt; -pub use kafka_producer_record::MessageRecord; diff --git a/cloudevents-sdk-rdkafka/tests/version_number.rs b/cloudevents-sdk-rdkafka/tests/version_number.rs deleted file mode 100644 index 288592d..0000000 --- a/cloudevents-sdk-rdkafka/tests/version_number.rs +++ /dev/null @@ -1,9 +0,0 @@ -#[test] -fn test_readme_deps() { - version_sync::assert_markdown_deps_updated!("README.md"); -} - -#[test] -fn test_html_root_url() { - version_sync::assert_html_root_url_updated!("src/lib.rs"); -} diff --git a/cloudevents-sdk-reqwest/Cargo.toml b/cloudevents-sdk-reqwest/Cargo.toml deleted file mode 100644 index 7f3d626..0000000 --- a/cloudevents-sdk-reqwest/Cargo.toml +++ /dev/null @@ -1,32 +0,0 @@ -[package] -name = "cloudevents-sdk-reqwest" -version = "0.3.1" -authors = ["Francesco Guardiani "] -license-file = "../LICENSE" -edition = "2018" -description = "CloudEvents official Rust SDK - Reqwest integration" -documentation = "https://docs.rs/cloudevents-sdk-reqwest" -repository = "https://github.com/cloudevents/sdk-rust" -readme = "README.md" -categories = ["web-programming", "encoding", "web-programming::http-client"] - -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - -[dependencies] -async-trait = "^0.1.33" -cloudevents-sdk = { version = "0.3.0", path = ".." } -lazy_static = "1.4.0" -bytes = "^1.0" - -[dependencies.reqwest] -version = "^0.11" -default-features = false -features = ["rustls-tls"] - -[dev-dependencies] -mockito = "0.25.1" -tokio = { version = "^1.0", features = ["full"] } -url = { version = "^2.1" } -serde_json = "^1.0" -chrono = { version = "^0.4", features = ["serde"] } -version-sync = "^0.9" \ No newline at end of file diff --git a/cloudevents-sdk-reqwest/README.md b/cloudevents-sdk-reqwest/README.md deleted file mode 100644 index 5db152f..0000000 --- a/cloudevents-sdk-reqwest/README.md +++ /dev/null @@ -1,39 +0,0 @@ -# CloudEvents SDK Rust - Reqwest [![Crates badge]][crates.io] [![Docs badge]][docs.rs] - -Integration of [CloudEvents SDK](https://github.com/cloudevents/sdk-rust/) with [reqwest](https://github.com/seanmonstar/reqwest) - -Look at [CloudEvents SDK README](https://github.com/cloudevents/sdk-rust/) for more info. - -## Development & Contributing - -If you're interested in contributing to sdk-rust, look at [Contributing documentation](../CONTRIBUTING.md) - -## Community - -- There are bi-weekly calls immediately following the - [Serverless/CloudEvents call](https://github.com/cloudevents/spec#meeting-time) - at 9am PT (US Pacific). Which means they will typically start at 10am PT, but - if the other call ends early then the SDK call will start early as well. See - the - [CloudEvents meeting minutes](https://docs.google.com/document/d/1OVF68rpuPK5shIHILK9JOqlZBbfe91RNzQ7u_P7YCDE/edit#) - to determine which week will have the call. -- Slack: #cloudeventssdk (or #cloudevents-sdk-rust) channel under - [CNCF's Slack workspace](https://slack.cncf.io/). -- Email: https://lists.cncf.io/g/cncf-cloudevents-sdk -- Contact for additional information: Francesco Guardiani (`@slinkydeveloper` on slack). - -Each SDK may have its own unique processes, tooling and guidelines, common -governance related material can be found in the -[CloudEvents `community`](https://github.com/cloudevents/spec/tree/master/community) -directory. In particular, in there you will find information concerning -how SDK projects are -[managed](https://github.com/cloudevents/spec/blob/master/community/SDK-GOVERNANCE.md), -[guidelines](https://github.com/cloudevents/spec/blob/master/community/SDK-maintainer-guidelines.md) -for how PR reviews and approval, and our -[Code of Conduct](https://github.com/cloudevents/spec/blob/master/community/GOVERNANCE.md#additional-information) -information. - -[Crates badge]: https://img.shields.io/crates/v/cloudevents-sdk-reqwest.svg -[crates.io]: https://crates.io/crates/cloudevents-sdk-reqwest -[Docs badge]: https://docs.rs/cloudevents-sdk-reqwest/badge.svg -[docs.rs]: https://docs.rs/cloudevents-sdk-reqwest diff --git a/cloudevents-sdk-reqwest/src/client_request.rs b/cloudevents-sdk-reqwest/src/client_request.rs deleted file mode 100644 index 0de580e..0000000 --- a/cloudevents-sdk-reqwest/src/client_request.rs +++ /dev/null @@ -1,199 +0,0 @@ -use super::headers; -use cloudevents::event::SpecVersion; -use cloudevents::message::{ - BinaryDeserializer, BinarySerializer, MessageAttributeValue, Result, StructuredSerializer, -}; -use cloudevents::Event; -use reqwest::RequestBuilder; -use std::str::FromStr; - -/// Wrapper for [`RequestBuilder`] that implements [`StructuredSerializer`] & [`BinarySerializer`] traits. -pub struct RequestSerializer { - req: RequestBuilder, -} - -impl RequestSerializer { - pub fn new(req: RequestBuilder) -> RequestSerializer { - RequestSerializer { req } - } -} - -impl BinarySerializer for RequestSerializer { - fn set_spec_version(mut self, spec_version: SpecVersion) -> Result { - self.req = self - .req - .header(headers::SPEC_VERSION_HEADER.clone(), spec_version.as_str()); - Ok(self) - } - - fn set_attribute(mut self, name: &str, value: MessageAttributeValue) -> Result { - self.req = self.req.header( - headers::ATTRIBUTES_TO_HEADERS.get(name).unwrap().clone(), - value.to_string(), - ); - Ok(self) - } - - fn set_extension(mut self, name: &str, value: MessageAttributeValue) -> Result { - self.req = self - .req - .header(attribute_name_to_header!(name)?, value.to_string()); - Ok(self) - } - - fn end_with_data(self, bytes: Vec) -> Result { - Ok(self.req.body(bytes)) - } - - fn end(self) -> Result { - Ok(self.req) - } -} - -impl StructuredSerializer for RequestSerializer { - fn set_structured_event(self, bytes: Vec) -> Result { - Ok(self - .req - .header( - reqwest::header::CONTENT_TYPE, - headers::CLOUDEVENTS_JSON_HEADER.clone(), - ) - .body(bytes)) - } -} - -/// Method to fill a [`RequestBuilder`] with an [`Event`]. -pub fn event_to_request(event: Event, request_builder: RequestBuilder) -> Result { - BinaryDeserializer::deserialize_binary(event, RequestSerializer::new(request_builder)) -} - -/// Extension Trait for [`RequestBuilder`] which acts as a wrapper for the function [`event_to_request()`]. -/// -/// This trait is sealed and cannot be implemented for types outside of this crate. -pub trait RequestBuilderExt: private::Sealed { - /// Write in this [`RequestBuilder`] the provided [`Event`]. Similar to invoking [`Event`]. - fn event(self, event: Event) -> Result; -} - -impl RequestBuilderExt for RequestBuilder { - fn event(self, event: Event) -> Result { - event_to_request(event, self) - } -} - -// Sealing the RequestBuilderExt -mod private { - pub trait Sealed {} - impl Sealed for reqwest::RequestBuilder {} -} - -#[cfg(test)] -mod tests { - use super::*; - use mockito::{mock, Matcher}; - - use cloudevents::message::StructuredDeserializer; - use cloudevents::{EventBuilder, EventBuilderV10}; - use serde_json::json; - - #[tokio::test] - async fn test_request() { - let url = mockito::server_url(); - let m = mock("POST", "/") - .match_header("ce-specversion", "1.0") - .match_header("ce-id", "0001") - .match_header("ce-type", "example.test") - .match_header("ce-source", "http://localhost/") - .match_header("ce-someint", "10") - .match_body(Matcher::Missing) - .create(); - - let input = EventBuilderV10::new() - .id("0001") - .ty("example.test") - .source("http://localhost/") - .extension("someint", "10") - .build() - .unwrap(); - - let client = reqwest::Client::new(); - client - .post(&url) - .event(input) - .unwrap() - .send() - .await - .unwrap(); - - m.assert(); - } - - #[tokio::test] - async fn test_request_with_full_data() { - let j = json!({"hello": "world"}); - - let url = mockito::server_url(); - let m = mock("POST", "/") - .match_header("ce-specversion", "1.0") - .match_header("ce-id", "0001") - .match_header("ce-type", "example.test") - .match_header("ce-source", "http://localhost/") - .match_header("content-type", "application/json") - .match_header("ce-someint", "10") - .match_body(Matcher::Exact(j.to_string())) - .create(); - - let input = EventBuilderV10::new() - .id("0001") - .ty("example.test") - .source("http://localhost/") - .data("application/json", j.clone()) - .extension("someint", "10") - .build() - .unwrap(); - - let client = reqwest::Client::new(); - - client - .post(&url) - .event(input) - .unwrap() - .send() - .await - .unwrap(); - - m.assert(); - } - - #[tokio::test] - async fn test_structured_request_with_full_data() { - let j = json!({"hello": "world"}); - - let input = EventBuilderV10::new() - .id("0001") - .ty("example.test") - .source("http://localhost") - .data("application/json", j.clone()) - .extension("someint", "10") - .build() - .unwrap(); - - let url = mockito::server_url(); - let m = mock("POST", "/") - .match_header("content-type", "application/cloudevents+json") - .match_body(Matcher::Exact(serde_json::to_string(&input).unwrap())) - .create(); - - let client = reqwest::Client::new(); - StructuredDeserializer::deserialize_structured( - input, - RequestSerializer::new(client.post(&url)), - ) - .unwrap() - .send() - .await - .unwrap(); - - m.assert(); - } -} diff --git a/cloudevents-sdk-reqwest/src/client_response.rs b/cloudevents-sdk-reqwest/src/client_response.rs deleted file mode 100644 index 5d11ee6..0000000 --- a/cloudevents-sdk-reqwest/src/client_response.rs +++ /dev/null @@ -1,263 +0,0 @@ -use super::headers; -use async_trait::async_trait; -use bytes::Bytes; -use cloudevents::event::SpecVersion; -use cloudevents::message::{ - BinaryDeserializer, BinarySerializer, Encoding, Error, MessageAttributeValue, - MessageDeserializer, Result, StructuredDeserializer, StructuredSerializer, -}; -use cloudevents::{message, Event}; -use reqwest::header::{HeaderMap, HeaderName}; -use reqwest::Response; -use std::convert::TryFrom; - -/// Wrapper for [`Response`] that implements [`MessageDeserializer`] trait. -pub struct ResponseDeserializer { - headers: HeaderMap, - body: Bytes, -} - -impl ResponseDeserializer { - pub fn new(headers: HeaderMap, body: Bytes) -> ResponseDeserializer { - ResponseDeserializer { headers, body } - } -} - -impl BinaryDeserializer for ResponseDeserializer { - fn deserialize_binary>(self, mut visitor: V) -> Result { - if self.encoding() != Encoding::BINARY { - return Err(message::Error::WrongEncoding {}); - } - - let spec_version = SpecVersion::try_from( - unwrap_optional_header!(self.headers, headers::SPEC_VERSION_HEADER).unwrap()?, - )?; - - visitor = visitor.set_spec_version(spec_version.clone())?; - - let attributes = spec_version.attribute_names(); - - for (hn, hv) in self - .headers - .iter() - .filter(|(hn, _)| headers::SPEC_VERSION_HEADER.ne(hn) && hn.as_str().starts_with("ce-")) - { - let name = &hn.as_str()["ce-".len()..]; - - if attributes.contains(&name) { - visitor = visitor.set_attribute( - name, - MessageAttributeValue::String(String::from(header_value_to_str!(hv)?)), - )? - } else { - visitor = visitor.set_extension( - name, - MessageAttributeValue::String(String::from(header_value_to_str!(hv)?)), - )? - } - } - - if let Some(hv) = self.headers.get("content-type") { - visitor = visitor.set_attribute( - "datacontenttype", - MessageAttributeValue::String(String::from(header_value_to_str!(hv)?)), - )? - } - - if !self.body.is_empty() { - visitor.end_with_data(self.body.to_vec()) - } else { - visitor.end() - } - } -} - -impl StructuredDeserializer for ResponseDeserializer { - fn deserialize_structured>(self, visitor: V) -> Result { - if self.encoding() != Encoding::STRUCTURED { - return Err(message::Error::WrongEncoding {}); - } - visitor.set_structured_event(self.body.to_vec()) - } -} - -impl MessageDeserializer for ResponseDeserializer { - fn encoding(&self) -> Encoding { - match ( - #[allow(clippy::borrow_interior_mutable_const)] - unwrap_optional_header!(self.headers, reqwest::header::CONTENT_TYPE) - .map(|r| r.ok()) - .flatten() - .map(|e| e.starts_with("application/cloudevents+json")), - self.headers - .get::<&'static HeaderName>(&headers::SPEC_VERSION_HEADER), - ) { - (Some(true), _) => Encoding::STRUCTURED, - (_, Some(_)) => Encoding::BINARY, - _ => Encoding::UNKNOWN, - } - } -} - -/// Method to transform an incoming [`Response`] to [`Event`]. -pub async fn response_to_event(res: Response) -> Result { - let h = res.headers().to_owned(); - let b = res.bytes().await.map_err(|e| Error::Other { - source: Box::new(e), - })?; - - MessageDeserializer::into_event(ResponseDeserializer::new(h, b)) -} - -/// Extension Trait for [`Response`] which acts as a wrapper for the function [`response_to_event()`]. -/// -/// This trait is sealed and cannot be implemented for types outside of this crate. -#[async_trait(?Send)] -pub trait ResponseExt: private::Sealed { - /// Convert this [`Response`] to [`Event`]. - async fn into_event(self) -> Result; -} - -#[async_trait(?Send)] -impl ResponseExt for Response { - async fn into_event(self) -> Result { - response_to_event(self).await - } -} - -// Sealing the ResponseExt -mod private { - pub trait Sealed {} - impl Sealed for reqwest::Response {} -} - -#[cfg(test)] -mod tests { - use super::*; - use mockito::mock; - - use chrono::Utc; - use cloudevents::{EventBuilder, EventBuilderV10}; - use serde_json::json; - - #[tokio::test] - async fn test_response() { - let time = Utc::now(); - let url = mockito::server_url(); - let _m = mock("GET", "/") - .with_status(200) - .with_header("ce-specversion", "1.0") - .with_header("ce-id", "0001") - .with_header("ce-type", "example.test") - .with_header("ce-source", "http://localhost") - .with_header("ce-someint", "10") - .with_header("ce-time", &time.to_rfc3339()) - .create(); - - let expected = EventBuilderV10::new() - .id("0001") - .ty("example.test") - //TODO this is required now because the message deserializer implictly set default values - // As soon as this defaulting doesn't happen anymore, we can remove it (Issues #40/#41) - .time(time) - .source("http://localhost") - .extension("someint", "10") - .build() - .unwrap(); - - let client = reqwest::Client::new(); - let res = client - .get(&url) - .send() - .await - .unwrap() - .into_event() - .await - .unwrap(); - - assert_eq!(expected, res); - } - - #[tokio::test] - async fn test_response_with_full_data() { - let time = Utc::now(); - let j = json!({"hello": "world"}); - - let url = mockito::server_url(); - let _m = mock("GET", "/") - .with_status(200) - .with_header("ce-specversion", "1.0") - .with_header("ce-id", "0001") - .with_header("ce-type", "example.test") - .with_header("ce-source", "http://localhost/") - .with_header("content-type", "application/json") - .with_header("ce-someint", "10") - .with_header("ce-time", &time.to_rfc3339()) - .with_body(j.to_string()) - .create(); - - let expected = EventBuilderV10::new() - .id("0001") - .ty("example.test") - //TODO this is required now because the message deserializer implictly set default values - // As soon as this defaulting doesn't happen anymore, we can remove it (Issues #40/#41) - .time(time) - .source("http://localhost/") - .data("application/json", j.to_string().into_bytes()) - .extension("someint", "10") - .build() - .unwrap(); - - let client = reqwest::Client::new(); - let res = client - .get(&url) - .send() - .await - .unwrap() - .into_event() - .await - .unwrap(); - - assert_eq!(expected, res); - } - - #[tokio::test] - async fn test_structured_response_with_full_data() { - let time = Utc::now(); - - let j = json!({"hello": "world"}); - let expected = EventBuilderV10::new() - .id("0001") - .ty("example.test") - //TODO this is required now because the message deserializer implictly set default values - // As soon as this defaulting doesn't happen anymore, we can remove it (Issues #40/#41) - .time(time) - .source("http://localhost") - .data("application/json", j.clone()) - .extension("someint", "10") - .build() - .unwrap(); - - let url = mockito::server_url(); - let _m = mock("GET", "/") - .with_status(200) - .with_header( - "content-type", - "application/cloudevents+json; charset=utf-8", - ) - .with_body(serde_json::to_string(&expected).unwrap()) - .create(); - - let client = reqwest::Client::new(); - let res = client - .get(&url) - .send() - .await - .unwrap() - .into_event() - .await - .unwrap(); - - assert_eq!(expected, res); - } -} diff --git a/cloudevents-sdk-reqwest/src/headers.rs b/cloudevents-sdk-reqwest/src/headers.rs deleted file mode 100644 index d4bea48..0000000 --- a/cloudevents-sdk-reqwest/src/headers.rs +++ /dev/null @@ -1,61 +0,0 @@ -use cloudevents::event::SpecVersion; -use lazy_static::lazy_static; -use reqwest::header::{HeaderName, HeaderValue}; -use std::collections::HashMap; -use std::str::FromStr; - -macro_rules! unwrap_optional_header { - ($headers:expr, $name:expr) => { - $headers - .get::<&'static reqwest::header::HeaderName>(&$name) - .map(|a| header_value_to_str!(a)) - }; -} - -macro_rules! header_value_to_str { - ($header_value:expr) => { - $header_value - .to_str() - .map_err(|e| cloudevents::message::Error::Other { - source: Box::new(e), - }) - }; -} - -macro_rules! str_name_to_header { - ($attribute:expr) => { - reqwest::header::HeaderName::from_str($attribute).map_err(|e| { - cloudevents::message::Error::Other { - source: Box::new(e), - } - }) - }; -} - -macro_rules! attribute_name_to_header { - ($attribute:expr) => { - str_name_to_header!(&["ce-", $attribute].concat()) - }; -} - -fn attributes_to_headers( - it: impl Iterator, -) -> HashMap<&'static str, HeaderName> { - it.map(|s| { - if s == "datacontenttype" { - (s, reqwest::header::CONTENT_TYPE) - } else { - (s, attribute_name_to_header!(s).unwrap()) - } - }) - .collect() -} - -lazy_static! { - pub(crate) static ref ATTRIBUTES_TO_HEADERS: HashMap<&'static str, HeaderName> = - attributes_to_headers(SpecVersion::all_attribute_names()); - pub(crate) static ref SPEC_VERSION_HEADER: HeaderName = - HeaderName::from_static("ce-specversion"); - pub(crate) static ref CLOUDEVENTS_JSON_HEADER: HeaderValue = - HeaderValue::from_static("application/cloudevents+json"); -} diff --git a/cloudevents-sdk-reqwest/src/lib.rs b/cloudevents-sdk-reqwest/src/lib.rs deleted file mode 100644 index 37a2165..0000000 --- a/cloudevents-sdk-reqwest/src/lib.rs +++ /dev/null @@ -1,45 +0,0 @@ -//! This crate integrates the [cloudevents-sdk](https://docs.rs/cloudevents-sdk) with [reqwest](https://docs.rs/reqwest/) to easily send and receive CloudEvents. -//! -//! ``` -//! use cloudevents_sdk_reqwest::{RequestBuilderExt, ResponseExt}; -//! use cloudevents::{EventBuilderV10, EventBuilder}; -//! use serde_json::json; -//! -//! # async fn example() -> Result<(), Box> { -//! let client = reqwest::Client::new(); -//! -//! // Prepare the event to send -//! let event_to_send = EventBuilderV10::new() -//! .id("0001") -//! .ty("example.test") -//! .source("http://localhost/") -//! .data("application/json", json!({"hello": "world"})) -//! .build()?; -//! -//! // Send request -//! let response = client.post("http://localhost") -//! .event(event_to_send)? -//! .send().await?; -//! // Parse response as event -//! let received_event = response -//! .into_event().await?; -//! # Ok(()) -//! # } -//! ``` -//! -//! Check out the [cloudevents-sdk](https://docs.rs/cloudevents-sdk) docs for more details on how to use [`cloudevents::Event`]. - -#![doc(html_root_url = "https://docs.rs/cloudevents-sdk-reqwest/0.3.1")] -#![deny(broken_intra_doc_links)] - -#[macro_use] -mod headers; -mod client_request; -mod client_response; - -pub use client_request::event_to_request; -pub use client_request::RequestBuilderExt; -pub use client_request::RequestSerializer; -pub use client_response::response_to_event; -pub use client_response::ResponseDeserializer; -pub use client_response::ResponseExt; diff --git a/cloudevents-sdk-reqwest/tests/version_number.rs b/cloudevents-sdk-reqwest/tests/version_number.rs deleted file mode 100644 index 288592d..0000000 --- a/cloudevents-sdk-reqwest/tests/version_number.rs +++ /dev/null @@ -1,9 +0,0 @@ -#[test] -fn test_readme_deps() { - version_sync::assert_markdown_deps_updated!("README.md"); -} - -#[test] -fn test_html_root_url() { - version_sync::assert_html_root_url_updated!("src/lib.rs"); -} diff --git a/cloudevents-sdk-warp/Cargo.toml b/cloudevents-sdk-warp/Cargo.toml deleted file mode 100644 index b97e8f8..0000000 --- a/cloudevents-sdk-warp/Cargo.toml +++ /dev/null @@ -1,23 +0,0 @@ -[package] -name = "cloudevents-sdk-warp" -version = "0.3.0" -authors = ["Marko Milenković "] -edition = "2018" -categories = ["web-programming", "encoding"] -license-file = "../LICENSE" - -[dependencies] -cloudevents-sdk = { path = ".." } -lazy_static = "1.4.0" -bytes = "^1.0" -warp = "^0.3" -http = "0.2" -hyper = "^0.14" - -[dev-dependencies] -tokio = { version = "^1.0", features = ["full"] } -url = { version = "^2.1" } -serde_json = "^1.0" -chrono = { version = "^0.4", features = ["serde"] } -mime = "0.3" -version-sync = "^0.9" \ No newline at end of file diff --git a/cloudevents-sdk-warp/README.md b/cloudevents-sdk-warp/README.md deleted file mode 100644 index 4fc82b9..0000000 --- a/cloudevents-sdk-warp/README.md +++ /dev/null @@ -1,110 +0,0 @@ -# CloudEvents SDK Rust - WARP [![Crates badge]][crates.io] [![Docs badge]][docs.rs] - -Integration of [CloudEvents SDK](https://github.com/cloudevents/sdk-rust/) with [Warp - Web Server Framework](https://github.com/seanmonstar/warp/). - -Look at [CloudEvents SDK README](https://github.com/cloudevents/sdk-rust/) for more info. - -Using this crate you can extract CloudEvent from requests and write CloudEvents to responses. - -To echo events: - -```rust -use cloudevents_sdk_warp::{filter, reply}; -use warp::Filter; - -#[tokio::main] -async fn main() { - let routes = warp::any() - // extracting event from request - .and(filter::to_event()) - // returning event back - .map(|event| reply::from_event(event)); - - warp::serve(routes).run(([127, 0, 0, 1], 3030)).await; -} -``` - -Executing `http` request: - -```bash -curl -v \ - -H "ce-specversion: 1.0" \ - -H "ce-id: 2" \ - -H "ce-type: example.event" \ - -H "ce-source: url://example_response/" \ - -H "content-type: application/json" \ - -X POST -d '{ "age": 43, "name": "John Doe", "phones": ["+44 1234567","+44 2345678"] }' \ - http://localhost:3030/ -``` - -Should produce response similar to: - -``` -* TCP_NODELAY set -* Connected to localhost (127.0.0.1) port 3030 (#0) -> POST / HTTP/1.1 -> Host: localhost:3030 -> User-Agent: curl/7.64.1 -> Accept: */* -> ce-specversion: 1.0 -> ce-id: 2 -> ce-type: example.event -> ce-source: url://example_response/ -> content-type: application/json -> Content-Length: 74 -> -* upload completely sent off: 74 out of 74 bytes -< HTTP/1.1 200 OK -< ce-specversion: 1.0 -< ce-id: 2 -< ce-type: example.event -< ce-source: url://example_response/ -< content-type: application/json -< content-length: 74 -< date: Mon, 02 Nov 2020 13:33:40 GMT -< -* Connection #0 to host localhost left intact -{ "age": 43, "name": "John Doe", "phones": ["+44 1234567","+44 2345678"] } -``` - -To create event inside request handlers and send them as responses: - -```rust -#[tokio::main] -async fn main() { - let routes = warp::any().map(|| { - let event = EventBuilderV10::new() - .id("1") - .source(url::Url::parse("url://example_response/").unwrap()) - .ty("example.ce") - .data( - mime::APPLICATION_JSON.to_string(), - json!({ - "name": "John Doe", - "age": 43, - "phones": [ - "+44 1234567", - "+44 2345678" - ] - }), - ) - .build(); - - match event { - Ok(event) => Ok(reply::from_event(event)), - Err(e) => Ok(warp::reply::with_status( - e.to_string(), - StatusCode::INTERNAL_SERVER_ERROR, - ) - .into_response()), - } - }); - - warp::serve(routes).run(([127, 0, 0, 1], 3030)).await; -} -``` - -[Crates badge]: https://img.shields.io/crates/v/cloudevents-sdk-warp.svg -[crates.io]: https://crates.io/crates/cloudevents-sdk-warp -[Docs badge]: https://docs.rs/cloudevents-sdk-warp/badge.svg -[docs.rs]: https://docs.rs/cloudevents-sdk-warp \ No newline at end of file diff --git a/cloudevents-sdk-warp/src/filter.rs b/cloudevents-sdk-warp/src/filter.rs deleted file mode 100644 index ac163ec..0000000 --- a/cloudevents-sdk-warp/src/filter.rs +++ /dev/null @@ -1,134 +0,0 @@ -use crate::server_request::request_to_event; - -use cloudevents::Event; -use warp::http::HeaderMap; -use warp::Filter; -use warp::Rejection; - -#[derive(Debug)] -pub struct EventFilterError { - error: cloudevents::message::Error, -} - -impl warp::reject::Reject for EventFilterError {} - -/// -/// # Extracts [`cloudevents::Event`] from incoming request -/// -/// ``` -/// use cloudevents_sdk_warp::filter::to_event; -/// use warp::Filter; -/// use warp::Reply; -/// -/// let routes = warp::any() -/// .and(to_event()) -/// .map(|event| { -/// // do something with the event -/// } -/// ); -/// ``` -/// -pub fn to_event() -> impl Filter + Copy { - warp::header::headers_cloned() - .and(warp::body::bytes()) - .and_then(create_event) -} - -async fn create_event(headers: HeaderMap, body: bytes::Bytes) -> Result { - request_to_event(headers, body) - .map_err(|error| warp::reject::custom(EventFilterError { error })) -} - -#[cfg(test)] -mod tests { - use super::to_event; - use warp::test; - - use chrono::Utc; - use cloudevents::{EventBuilder, EventBuilderV10}; - use serde_json::json; - - #[tokio::test] - async fn test_request() { - let time = Utc::now(); - let expected = EventBuilderV10::new() - .id("0001") - .ty("example.test") - .source("http://localhost/") - .time(time) - .extension("someint", "10") - .build() - .unwrap(); - - let result = test::request() - .method("POST") - .header("ce-specversion", "1.0") - .header("ce-id", "0001") - .header("ce-type", "example.test") - .header("ce-source", "http://localhost/") - .header("ce-someint", "10") - .header("ce-time", time.to_rfc3339()) - .filter(&to_event()) - .await - .unwrap(); - - assert_eq!(expected, result); - } - - #[tokio::test] - async fn test_bad_request() { - let time = Utc::now(); - - let result = test::request() - .method("POST") - .header("ce-specversion", "BAD SPECIFICATION") - .header("ce-id", "0001") - .header("ce-type", "example.test") - .header("ce-source", "http://localhost/") - .header("ce-someint", "10") - .header("ce-time", time.to_rfc3339()) - .filter(&to_event()) - .await; - - assert!(result.is_err()); - let rejection = result.unwrap_err(); - - let reason = rejection.find::().unwrap(); - assert_eq!( - reason.error.to_string(), - "Invalid specversion BAD SPECIFICATION" - ) - } - - #[tokio::test] - async fn test_request_with_full_data() { - let time = Utc::now(); - let j = json!({"hello": "world"}); - - let expected = EventBuilderV10::new() - .id("0001") - .ty("example.test") - .source("http://localhost") - .time(time) - .data("application/json", j.to_string().into_bytes()) - .extension("someint", "10") - .build() - .unwrap(); - - let result = test::request() - .method("POST") - .header("ce-specversion", "1.0") - .header("ce-id", "0001") - .header("ce-type", "example.test") - .header("ce-source", "http://localhost") - .header("ce-someint", "10") - .header("ce-time", time.to_rfc3339()) - .header("content-type", "application/json") - .json(&j) - .filter(&to_event()) - .await - .unwrap(); - - assert_eq!(expected, result); - } -} diff --git a/cloudevents-sdk-warp/src/headers.rs b/cloudevents-sdk-warp/src/headers.rs deleted file mode 100644 index 8fbd686..0000000 --- a/cloudevents-sdk-warp/src/headers.rs +++ /dev/null @@ -1,61 +0,0 @@ -use cloudevents::event::SpecVersion; -use http::header::HeaderName; -use lazy_static::lazy_static; -use warp::http::HeaderValue; - -use std::collections::HashMap; -use std::str::FromStr; - -macro_rules! unwrap_optional_header { - ($headers:expr, $name:expr) => { - $headers - .get::<&'static HeaderName>(&$name) - .map(|a| header_value_to_str!(a)) - }; -} - -macro_rules! header_value_to_str { - ($header_value:expr) => { - $header_value - .to_str() - .map_err(|e| cloudevents::message::Error::Other { - source: Box::new(e), - }) - }; -} - -macro_rules! str_name_to_header { - ($attribute:expr) => { - HeaderName::from_str($attribute).map_err(|e| cloudevents::message::Error::Other { - source: Box::new(e), - }) - }; -} - -macro_rules! attribute_name_to_header { - ($attribute:expr) => { - str_name_to_header!(&["ce-", $attribute].concat()) - }; -} - -fn attributes_to_headers( - it: impl Iterator, -) -> HashMap<&'static str, HeaderName> { - it.map(|s| { - if s == "datacontenttype" { - (s, http::header::CONTENT_TYPE) - } else { - (s, attribute_name_to_header!(s).unwrap()) - } - }) - .collect() -} - -lazy_static! { - pub(crate) static ref ATTRIBUTES_TO_HEADERS: HashMap<&'static str, HeaderName> = - attributes_to_headers(SpecVersion::all_attribute_names()); - pub(crate) static ref SPEC_VERSION_HEADER: HeaderName = - HeaderName::from_static("ce-specversion"); - pub(crate) static ref CLOUDEVENTS_JSON_HEADER: HeaderValue = - HeaderValue::from_static("application/cloudevents+json"); -} diff --git a/cloudevents-sdk-warp/src/lib.rs b/cloudevents-sdk-warp/src/lib.rs deleted file mode 100644 index 157b942..0000000 --- a/cloudevents-sdk-warp/src/lib.rs +++ /dev/null @@ -1,70 +0,0 @@ -//! This crate integrates the [cloudevents-sdk](https://docs.rs/cloudevents-sdk) with [Warp web service framework](https://docs.rs/warp/) -//! to easily send and receive CloudEvents. -//! -//! To deserialize an HTTP request as CloudEvent -//! -//! To echo events: -//! -//! ``` -//! use warp::{Filter, Reply}; -//! use cloudevents_sdk_warp::reply::from_event; -//! use cloudevents_sdk_warp::filter::to_event; -//! -//! let routes = warp::any() -//! // extracting event from request -//! .and(to_event()) -//! // returning event back -//! .map(|event| from_event(event)); -//! -//! warp::serve(routes).run(([127, 0, 0, 1], 3030)); -//! ``` -//! -//! To create event inside request handlers and send them as responses: -//! -//! ``` -//! use cloudevents::{Event, EventBuilder, EventBuilderV10}; -//! use http::StatusCode; -//! use serde_json::json; -//! use warp::{Filter, Reply}; -//! use cloudevents_sdk_warp::reply::from_event; -//! -//! let routes = warp::any().map(|| { -//! let event = EventBuilderV10::new() -//! .id("1") -//! .source("url://example_response/") -//! .ty("example.ce") -//! .data( -//! mime::APPLICATION_JSON.to_string(), -//! json!({ -//! "name": "John Doe", -//! "age": 43, -//! "phones": [ -//! "+44 1234567", -//! "+44 2345678" -//! ] -//! }), -//! ) -//! .build(); -//! -//! match event { -//! Ok(event) => Ok(from_event(event)), -//! Err(e) => Ok(warp::reply::with_status( -//! e.to_string(), -//! StatusCode::INTERNAL_SERVER_ERROR, -//! ) -//! .into_response()), -//! } -//! }); -//! warp::serve(routes).run(([127, 0, 0, 1], 3030)); -//! ``` -//! -//! Check out the [cloudevents-sdk](https://docs.rs/cloudevents-sdk) docs for more details on how to use [`cloudevents::Event`] - -#[macro_use] -mod headers; - -mod server_request; -mod server_response; - -pub mod filter; -pub mod reply; diff --git a/cloudevents-sdk-warp/src/reply.rs b/cloudevents-sdk-warp/src/reply.rs deleted file mode 100644 index dc5cfb1..0000000 --- a/cloudevents-sdk-warp/src/reply.rs +++ /dev/null @@ -1,126 +0,0 @@ -use crate::server_response::event_to_response; - -use cloudevents::Event; -use http::StatusCode; -use warp::reply::Response; - -/// -/// # Serializes [`cloudevents::Event`] as a http response -/// -/// ``` -/// use cloudevents_sdk_warp::reply::from_event; -/// use cloudevents::Event; -/// use warp::Filter; -/// use warp::Reply; -/// -/// let routes = warp::any() -/// .map(|| from_event(Event::default())); -/// ``` -pub fn from_event(event: Event) -> Response { - match event_to_response(event) { - Ok(response) => response, - Err(e) => warp::http::response::Response::builder() - .status(StatusCode::INTERNAL_SERVER_ERROR) - .body(hyper::body::Body::from(e.to_string())) - .unwrap(), - } -} - -#[cfg(test)] -mod tests { - - use cloudevents::{EventBuilder, EventBuilderV10}; - use serde_json::json; - - #[test] - fn test_response() { - let input = EventBuilderV10::new() - .id("0001") - .ty("example.test") - .source("http://localhost/") - .extension("someint", "10") - .build() - .unwrap(); - - let resp = super::from_event(input); - - assert_eq!( - resp.headers() - .get("ce-specversion") - .unwrap() - .to_str() - .unwrap(), - "1.0" - ); - assert_eq!( - resp.headers().get("ce-id").unwrap().to_str().unwrap(), - "0001" - ); - assert_eq!( - resp.headers().get("ce-type").unwrap().to_str().unwrap(), - "example.test" - ); - assert_eq!( - resp.headers().get("ce-source").unwrap().to_str().unwrap(), - "http://localhost/" - ); - assert_eq!( - resp.headers().get("ce-someint").unwrap().to_str().unwrap(), - "10" - ); - } - - #[tokio::test] - async fn test_response_with_full_data() { - let j = json!({"hello": "world"}); - - let input = EventBuilderV10::new() - .id("0001") - .ty("example.test") - .source("http://localhost") - .data("application/json", j.clone()) - .extension("someint", "10") - .build() - .unwrap(); - - let resp = super::from_event(input); - - assert_eq!( - resp.headers() - .get("ce-specversion") - .unwrap() - .to_str() - .unwrap(), - "1.0" - ); - assert_eq!( - resp.headers().get("ce-id").unwrap().to_str().unwrap(), - "0001" - ); - assert_eq!( - resp.headers().get("ce-type").unwrap().to_str().unwrap(), - "example.test" - ); - assert_eq!( - resp.headers().get("ce-source").unwrap().to_str().unwrap(), - "http://localhost" - ); - assert_eq!( - resp.headers() - .get("content-type") - .unwrap() - .to_str() - .unwrap(), - "application/json" - ); - assert_eq!( - resp.headers().get("ce-someint").unwrap().to_str().unwrap(), - "10" - ); - - let (_, body) = resp.into_parts(); - let body = hyper::body::to_bytes(body).await.unwrap(); - - assert_eq!(j.to_string().as_bytes(), body); - } -} diff --git a/cloudevents-sdk-warp/src/server_request.rs b/cloudevents-sdk-warp/src/server_request.rs deleted file mode 100644 index b7cf246..0000000 --- a/cloudevents-sdk-warp/src/server_request.rs +++ /dev/null @@ -1,107 +0,0 @@ -use super::headers; -use bytes::Bytes; -use http::{header::HeaderName, HeaderMap}; - -use cloudevents::event::SpecVersion; -use cloudevents::message::{ - BinaryDeserializer, BinarySerializer, Encoding, Error, MessageAttributeValue, - MessageDeserializer, Result, StructuredDeserializer, StructuredSerializer, -}; - -use cloudevents::{message, Event}; -use std::convert::TryFrom; - -pub struct RequestDeserializer { - headers: HeaderMap, - body: Bytes, -} - -impl RequestDeserializer { - pub fn new(headers: HeaderMap, body: Bytes) -> RequestDeserializer { - RequestDeserializer { headers, body } - } -} - -impl BinaryDeserializer for RequestDeserializer { - fn deserialize_binary>(self, mut visitor: V) -> Result { - if self.encoding() != Encoding::BINARY { - return Err(message::Error::WrongEncoding {}); - } - - let spec_version = SpecVersion::try_from( - unwrap_optional_header!(self.headers, headers::SPEC_VERSION_HEADER).unwrap()?, - )?; - - visitor = visitor.set_spec_version(spec_version.clone())?; - - let attributes = spec_version.attribute_names(); - - for (hn, hv) in self - .headers - .iter() - .filter(|(hn, _)| headers::SPEC_VERSION_HEADER.ne(hn) && hn.as_str().starts_with("ce-")) - { - let name = &hn.as_str()["ce-".len()..]; - - if attributes.contains(&name) { - visitor = visitor.set_attribute( - name, - MessageAttributeValue::String(String::from(header_value_to_str!(hv)?)), - )? - } else { - visitor = visitor.set_extension( - name, - MessageAttributeValue::String(String::from(header_value_to_str!(hv)?)), - )? - } - } - - if let Some(hv) = self.headers.get("content-type") { - visitor = visitor.set_attribute( - "datacontenttype", - MessageAttributeValue::String(String::from(header_value_to_str!(hv)?)), - )? - } - - if self.body.len() != 0 { - visitor.end_with_data(self.body.to_vec()) - } else { - visitor.end() - } - } -} - -impl StructuredDeserializer for RequestDeserializer { - fn deserialize_structured>(self, visitor: V) -> Result { - if self.encoding() != Encoding::STRUCTURED { - return Err(message::Error::WrongEncoding {}); - } - visitor.set_structured_event(self.body.to_vec()) - } -} - -impl MessageDeserializer for RequestDeserializer { - fn encoding(&self) -> Encoding { - if self - .headers - .get("content-type") - .map(|v| v.to_str().unwrap_or("")) - .unwrap_or("") - == "application/cloudevents+json" - { - Encoding::STRUCTURED - } else if self - .headers - .get::<&'static HeaderName>(&super::headers::SPEC_VERSION_HEADER) - .is_some() - { - Encoding::BINARY - } else { - Encoding::UNKNOWN - } - } -} - -pub fn request_to_event(req: HeaderMap, bytes: bytes::Bytes) -> std::result::Result { - MessageDeserializer::into_event(RequestDeserializer::new(req, bytes)) -} diff --git a/cloudevents-sdk-warp/src/server_response.rs b/cloudevents-sdk-warp/src/server_response.rs deleted file mode 100644 index c5a464c..0000000 --- a/cloudevents-sdk-warp/src/server_response.rs +++ /dev/null @@ -1,102 +0,0 @@ -use super::headers; - -use cloudevents::event::SpecVersion; -use cloudevents::message::{ - BinaryDeserializer, BinarySerializer, Error, MessageAttributeValue, Result, - StructuredSerializer, -}; -use cloudevents::Event; - -use warp::http::HeaderValue; -use warp::hyper::Body; -use warp::reply::Response; - -use http::header::HeaderName; -use http::response::Builder; - -use std::{convert::TryFrom, str::FromStr}; - -pub struct ResponseSerializer { - builder: Builder, -} - -impl ResponseSerializer { - fn new() -> Self { - ResponseSerializer { - builder: http::Response::builder(), - } - } -} - -impl BinarySerializer for ResponseSerializer { - fn set_spec_version(mut self, spec_version: SpecVersion) -> Result { - self.builder = self.builder.header( - headers::SPEC_VERSION_HEADER.clone(), - HeaderValue::try_from(spec_version.to_string().as_str()).map_err(|e| { - cloudevents::message::Error::Other { - source: Box::new(e), - } - })?, - ); - Ok(self) - } - - fn set_attribute(mut self, name: &str, value: MessageAttributeValue) -> Result { - self.builder = self.builder.header( - headers::ATTRIBUTES_TO_HEADERS.get(name).unwrap().clone(), - HeaderValue::try_from(value.to_string().as_str()).map_err(|e| { - cloudevents::message::Error::Other { - source: Box::new(e), - } - })?, - ); - Ok(self) - } - - fn set_extension(mut self, name: &str, value: MessageAttributeValue) -> Result { - self.builder = self.builder.header( - attribute_name_to_header!(name)?, - HeaderValue::try_from(value.to_string().as_str()).map_err(|e| { - cloudevents::message::Error::Other { - source: Box::new(e), - } - })?, - ); - Ok(self) - } - - fn end_with_data(self, bytes: Vec) -> Result { - self.builder - .body(Body::from(bytes)) - .map_err(|e| cloudevents::message::Error::Other { - source: Box::new(e), - }) - } - - fn end(self) -> Result { - self.builder - .body(Body::empty()) - .map_err(|e| cloudevents::message::Error::Other { - source: Box::new(e), - }) - } -} - -impl StructuredSerializer for ResponseSerializer { - fn set_structured_event(self, bytes: Vec) -> Result { - Ok(self - .builder - .header( - http::header::CONTENT_TYPE, - headers::CLOUDEVENTS_JSON_HEADER.clone(), - ) - .body(Body::from(bytes)) - .map_err(|e| cloudevents::message::Error::Other { - source: Box::new(e), - })?) - } -} - -pub fn event_to_response(event: Event) -> std::result::Result { - BinaryDeserializer::deserialize_binary(event, ResponseSerializer::new()) -} diff --git a/cloudevents-sdk-warp/tests/version_number.rs b/cloudevents-sdk-warp/tests/version_number.rs deleted file mode 100644 index 288592d..0000000 --- a/cloudevents-sdk-warp/tests/version_number.rs +++ /dev/null @@ -1,9 +0,0 @@ -#[test] -fn test_readme_deps() { - version_sync::assert_markdown_deps_updated!("README.md"); -} - -#[test] -fn test_html_root_url() { - version_sync::assert_html_root_url_updated!("src/lib.rs"); -} diff --git a/example-projects/actix-web-example/Cargo.toml b/example-projects/actix-web-example/Cargo.toml index f8ef30c..3e85b0c 100644 --- a/example-projects/actix-web-example/Cargo.toml +++ b/example-projects/actix-web-example/Cargo.toml @@ -14,5 +14,3 @@ futures = "^0.3" serde_json = "^1.0" url = { version = "^2.1" } env_logger = "0.7.1" - -[workspace] \ No newline at end of file diff --git a/example-projects/rdkafka-example/Cargo.toml b/example-projects/rdkafka-example/Cargo.toml index 30ccee4..598ef2e 100644 --- a/example-projects/rdkafka-example/Cargo.toml +++ b/example-projects/rdkafka-example/Cargo.toml @@ -17,5 +17,3 @@ futures = "^0.3" tokio = { version = "^1.0", features = ["full"] } clap = "2.33.1" rdkafka = { version = "^0.25", features = ["cmake-build"] } - -[workspace] diff --git a/example-projects/reqwest-wasm-example/Cargo.toml b/example-projects/reqwest-wasm-example/Cargo.toml index 85dd1bf..8d2e894 100644 --- a/example-projects/reqwest-wasm-example/Cargo.toml +++ b/example-projects/reqwest-wasm-example/Cargo.toml @@ -16,5 +16,3 @@ url = { version = "^2.1" } web-sys = { version = "0.3.39", features = ["Window", "Location"] } wasm-bindgen-futures = "0.4.12" wasm-bindgen = { version = "0.2.62", features = ["serde-serialize"] } - -[workspace] diff --git a/example-projects/warp-example/Cargo.toml b/example-projects/warp-example/Cargo.toml index 131ce92..0388e38 100644 --- a/example-projects/warp-example/Cargo.toml +++ b/example-projects/warp-example/Cargo.toml @@ -10,6 +10,3 @@ license-file = "../LICENSE" cloudevents-sdk = { path = "../..", features = ["cloudevents-warp"] } warp = "^0.3" tokio = { version = "^1.0", features = ["full"] } - -[workspace] -