De-workspace-ification (#135)

Now that we've refactored the protocol bindings from crates to
feature-guarded modules (PR #131), we can remove the workspaces for
those crates.

Signed-off-by: Jim Crossley <jim@crossleys.org>
This commit is contained in:
Jim Crossley 2021-06-30 08:13:41 -04:00 committed by GitHub
parent 12230429b8
commit 432259bd26
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
36 changed files with 4 additions and 2622 deletions

View File

@ -60,14 +60,14 @@ jobs:
with:
command: build
toolchain: ${{ matrix.toolchain }}
args: --target ${{ matrix.target }} --workspace --all-features
args: --target ${{ matrix.target }} --all-features
- uses: actions-rs/cargo@v1
name: "Test"
if: matrix.target == 'x86_64-unknown-linux-gnu'
with:
command: test
toolchain: ${{ matrix.toolchain }}
args: --target ${{ matrix.target }} --workspace --all-features
args: --target ${{ matrix.target }} --all-features
# If musl, compile and test all
- uses: actions-rs/cargo@v1
@ -76,7 +76,7 @@ jobs:
with:
command: build
toolchain: ${{ matrix.toolchain }}
args: --target ${{ matrix.target }} --workspace --all-features
args: --target ${{ matrix.target }} --all-features
env:
CC: musl-gcc
CXX: g++
@ -86,7 +86,7 @@ jobs:
with:
command: test
toolchain: ${{ matrix.toolchain }}
args: --target ${{ matrix.target }} --workspace --all-features
args: --target ${{ matrix.target }} --all-features
env:
CC: musl-gcc
CXX: g++

View File

@ -66,18 +66,3 @@ chrono = { version = "^0.4", features = ["serde"] }
mockito = "0.25.1"
tokio = { version = "^1.0", features = ["full"] }
mime = "0.3"
[workspace]
members = [
".",
"cloudevents-sdk-actix-web",
"cloudevents-sdk-reqwest",
"cloudevents-sdk-rdkafka",
"cloudevents-sdk-warp",
]
exclude = [
"example-projects/actix-web-example",
"example-projects/reqwest-wasm-example",
"example-projects/rdkafka-example",
"example-projects/warp-example",
]

View File

@ -1,28 +0,0 @@
[package]
name = "cloudevents-sdk-actix-web"
version = "0.3.1"
authors = ["Francesco Guardiani <francescoguard@gmail.com>"]
license-file = "../LICENSE"
edition = "2018"
description = "CloudEvents official Rust SDK - Actix-Web integration"
documentation = "https://docs.rs/cloudevents-sdk-actix-web"
repository = "https://github.com/cloudevents/sdk-rust"
readme = "README.md"
categories = ["web-programming", "encoding", "web-programming::http-server"]
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
cloudevents-sdk = { version = "0.3.0", path = ".." }
actix-web = { version = "^3", default-features = false }
async-trait = "^0.1.33"
lazy_static = "1.4.0"
bytes = "^1.0"
futures = "^0.3"
[dev-dependencies]
actix-rt = "^1"
url = { version = "^2.1", features = ["serde"] }
serde_json = "^1.0"
chrono = { version = "^0.4", features = ["serde"] }
version-sync = "^0.9"

View File

@ -1,39 +0,0 @@
# CloudEvents SDK Rust - Actix-Web [![Crates badge]][crates.io] [![Docs badge]][docs.rs]
Integration of [CloudEvents SDK](https://github.com/cloudevents/sdk-rust/) with [Actix Web](https://github.com/actix/actix-web).
Look at [CloudEvents SDK README](https://github.com/cloudevents/sdk-rust/) for more info.
## Development & Contributing
If you're interested in contributing to sdk-rust, look at [Contributing documentation](../CONTRIBUTING.md)
## Community
- There are bi-weekly calls immediately following the
[Serverless/CloudEvents call](https://github.com/cloudevents/spec#meeting-time)
at 9am PT (US Pacific). Which means they will typically start at 10am PT, but
if the other call ends early then the SDK call will start early as well. See
the
[CloudEvents meeting minutes](https://docs.google.com/document/d/1OVF68rpuPK5shIHILK9JOqlZBbfe91RNzQ7u_P7YCDE/edit#)
to determine which week will have the call.
- Slack: #cloudeventssdk (or #cloudevents-sdk-rust) channel under
[CNCF's Slack workspace](https://slack.cncf.io/).
- Email: https://lists.cncf.io/g/cncf-cloudevents-sdk
- Contact for additional information: Francesco Guardiani (`@slinkydeveloper` on slack).
Each SDK may have its own unique processes, tooling and guidelines, common
governance related material can be found in the
[CloudEvents `community`](https://github.com/cloudevents/spec/tree/master/community)
directory. In particular, in there you will find information concerning
how SDK projects are
[managed](https://github.com/cloudevents/spec/blob/master/community/SDK-GOVERNANCE.md),
[guidelines](https://github.com/cloudevents/spec/blob/master/community/SDK-maintainer-guidelines.md)
for how PR reviews and approval, and our
[Code of Conduct](https://github.com/cloudevents/spec/blob/master/community/GOVERNANCE.md#additional-information)
information.
[Crates badge]: https://img.shields.io/crates/v/cloudevents-sdk-actix-web.svg
[crates.io]: https://crates.io/crates/cloudevents-sdk-actix-web
[Docs badge]: https://docs.rs/cloudevents-sdk-actix-web/badge.svg
[docs.rs]: https://docs.rs/cloudevents-sdk-actix-web

View File

@ -1,68 +0,0 @@
use actix_web::http::header;
use actix_web::http::{HeaderName, HeaderValue};
use cloudevents::event::SpecVersion;
use lazy_static::lazy_static;
use std::collections::HashMap;
use std::str::FromStr;
macro_rules! unwrap_optional_header {
($headers:expr, $name:expr) => {
$headers
.get::<&'static HeaderName>(&$name)
.map(|a| header_value_to_str!(a))
};
}
macro_rules! header_value_to_str {
($header_value:expr) => {
$header_value
.to_str()
.map_err(|e| cloudevents::message::Error::Other {
source: Box::new(e),
})
};
}
macro_rules! str_to_header_value {
($header_value:expr) => {
HeaderValue::from_str($header_value).map_err(|e| cloudevents::message::Error::Other {
source: Box::new(e),
})
};
}
macro_rules! str_name_to_header {
($attribute:expr) => {
HeaderName::from_str($attribute).map_err(|e| cloudevents::message::Error::Other {
source: Box::new(e),
})
};
}
macro_rules! attribute_name_to_header {
($attribute:expr) => {
str_name_to_header!(&["ce-", $attribute].concat())
};
}
fn attributes_to_headers(
it: impl Iterator<Item = &'static str>,
) -> HashMap<&'static str, HeaderName> {
it.map(|s| {
if s == "datacontenttype" {
(s, header::CONTENT_TYPE)
} else {
(s, attribute_name_to_header!(s).unwrap())
}
})
.collect()
}
lazy_static! {
pub(crate) static ref ATTRIBUTES_TO_HEADERS: HashMap<&'static str, HeaderName> =
attributes_to_headers(SpecVersion::all_attribute_names());
pub(crate) static ref SPEC_VERSION_HEADER: HeaderName =
HeaderName::from_static("ce-specversion");
pub(crate) static ref CLOUDEVENTS_JSON_HEADER: HeaderValue =
HeaderValue::from_static("application/cloudevents+json");
}

View File

@ -1,57 +0,0 @@
//! This crate integrates the [cloudevents-sdk](https://docs.rs/cloudevents-sdk) with [Actix web](https://docs.rs/actix-web/) to easily send and receive CloudEvents.
//!
//! To deserialize an HTTP request as CloudEvent:
//!
//! ```
//! use cloudevents_sdk_actix_web::HttpRequestExt;
//! use actix_web::{HttpRequest, web, post};
//!
//! #[post("/")]
//! async fn post_event(req: HttpRequest, payload: web::Payload) -> Result<String, actix_web::Error> {
//! let event = req.to_event(payload).await?;
//! println!("Received Event: {:?}", event);
//! Ok(format!("{:?}", event))
//! }
//! ```
//!
//! To serialize a CloudEvent to an HTTP response:
//!
//! ```
//! use cloudevents_sdk_actix_web::HttpResponseBuilderExt;
//! use actix_web::{HttpRequest, web, get, HttpResponse};
//! use cloudevents::{EventBuilderV10, EventBuilder};
//! use serde_json::json;
//!
//! #[get("/")]
//! async fn get_event() -> Result<HttpResponse, actix_web::Error> {
//! Ok(HttpResponse::Ok()
//! .event(
//! EventBuilderV10::new()
//! .id("0001")
//! .ty("example.test")
//! .source("http://localhost/")
//! .data("application/json", json!({"hello": "world"}))
//! .build()
//! .expect("No error while building the event"),
//! )
//! .await?
//! )
//! }
//! ```
//!
//! Check out the [cloudevents-sdk](https://docs.rs/cloudevents-sdk) docs for more details on how to use [`cloudevents::Event`]
#![doc(html_root_url = "https://docs.rs/cloudevents-sdk-actix-web/0.3.1")]
#![deny(broken_intra_doc_links)]
#[macro_use]
mod headers;
mod server_request;
mod server_response;
pub use server_request::request_to_event;
pub use server_request::HttpRequestDeserializer;
pub use server_request::HttpRequestExt;
pub use server_response::event_to_response;
pub use server_response::HttpResponseBuilderExt;
pub use server_response::HttpResponseSerializer;

View File

@ -1,210 +0,0 @@
use super::headers;
use actix_web::http::HeaderName;
use actix_web::web::{Bytes, BytesMut};
use actix_web::{web, HttpMessage, HttpRequest};
use async_trait::async_trait;
use cloudevents::event::SpecVersion;
use cloudevents::message::{
BinaryDeserializer, BinarySerializer, Encoding, MessageAttributeValue, MessageDeserializer,
Result, StructuredDeserializer, StructuredSerializer,
};
use cloudevents::{message, Event};
use futures::StreamExt;
use std::convert::TryFrom;
/// Wrapper for [`HttpRequest`] that implements [`MessageDeserializer`] trait.
pub struct HttpRequestDeserializer<'a> {
req: &'a HttpRequest,
body: Bytes,
}
impl HttpRequestDeserializer<'_> {
pub fn new(req: &HttpRequest, body: Bytes) -> HttpRequestDeserializer {
HttpRequestDeserializer { req, body }
}
}
impl<'a> BinaryDeserializer for HttpRequestDeserializer<'a> {
fn deserialize_binary<R: Sized, V: BinarySerializer<R>>(self, mut visitor: V) -> Result<R> {
if self.encoding() != Encoding::BINARY {
return Err(message::Error::WrongEncoding {});
}
let spec_version = SpecVersion::try_from(
unwrap_optional_header!(self.req.headers(), headers::SPEC_VERSION_HEADER).unwrap()?,
)?;
visitor = visitor.set_spec_version(spec_version.clone())?;
let attributes = spec_version.attribute_names();
for (hn, hv) in
self.req.headers().iter().filter(|(hn, _)| {
headers::SPEC_VERSION_HEADER.ne(hn) && hn.as_str().starts_with("ce-")
})
{
let name = &hn.as_str()["ce-".len()..];
if attributes.contains(&name) {
visitor = visitor.set_attribute(
name,
MessageAttributeValue::String(String::from(header_value_to_str!(hv)?)),
)?
} else {
visitor = visitor.set_extension(
name,
MessageAttributeValue::String(String::from(header_value_to_str!(hv)?)),
)?
}
}
if let Some(hv) = self.req.headers().get("content-type") {
visitor = visitor.set_attribute(
"datacontenttype",
MessageAttributeValue::String(String::from(header_value_to_str!(hv)?)),
)?
}
if !self.body.is_empty() {
visitor.end_with_data(self.body.to_vec())
} else {
visitor.end()
}
}
}
impl<'a> StructuredDeserializer for HttpRequestDeserializer<'a> {
fn deserialize_structured<R: Sized, V: StructuredSerializer<R>>(self, visitor: V) -> Result<R> {
if self.encoding() != Encoding::STRUCTURED {
return Err(message::Error::WrongEncoding {});
}
visitor.set_structured_event(self.body.to_vec())
}
}
impl<'a> MessageDeserializer for HttpRequestDeserializer<'a> {
fn encoding(&self) -> Encoding {
if self.req.content_type() == "application/cloudevents+json" {
Encoding::STRUCTURED
} else if self
.req
.headers()
.get::<&'static HeaderName>(&super::headers::SPEC_VERSION_HEADER)
.is_some()
{
Encoding::BINARY
} else {
Encoding::UNKNOWN
}
}
}
/// Method to transform an incoming [`HttpRequest`] to [`Event`].
pub async fn request_to_event(
req: &HttpRequest,
mut payload: web::Payload,
) -> std::result::Result<Event, actix_web::error::Error> {
let mut bytes = BytesMut::new();
while let Some(item) = payload.next().await {
bytes.extend_from_slice(&item?);
}
MessageDeserializer::into_event(HttpRequestDeserializer::new(req, bytes.freeze()))
.map_err(actix_web::error::ErrorBadRequest)
}
/// Extention Trait for [`HttpRequest`] which acts as a wrapper for the function [`request_to_event()`].
///
/// This trait is sealed and cannot be implemented for types outside of this crate.
#[async_trait(?Send)]
pub trait HttpRequestExt: private::Sealed {
/// Convert this [`HttpRequest`] into an [`Event`].
async fn to_event(
&self,
mut payload: web::Payload,
) -> std::result::Result<Event, actix_web::error::Error>;
}
#[async_trait(?Send)]
impl HttpRequestExt for HttpRequest {
async fn to_event(
&self,
payload: web::Payload,
) -> std::result::Result<Event, actix_web::error::Error> {
request_to_event(self, payload).await
}
}
mod private {
// Sealing the RequestExt
pub trait Sealed {}
impl Sealed for actix_web::HttpRequest {}
}
#[cfg(test)]
mod tests {
use super::*;
use actix_web::test;
use chrono::Utc;
use cloudevents::{EventBuilder, EventBuilderV10};
use serde_json::json;
#[actix_rt::test]
async fn test_request() {
let time = Utc::now();
let expected = EventBuilderV10::new()
.id("0001")
.ty("example.test")
.source("http://localhost/")
//TODO this is required now because the message deserializer implictly set default values
// As soon as this defaulting doesn't happen anymore, we can remove it (Issues #40/#41)
.time(time)
.extension("someint", "10")
.build()
.unwrap();
let (req, payload) = test::TestRequest::post()
.header("ce-specversion", "1.0")
.header("ce-id", "0001")
.header("ce-type", "example.test")
.header("ce-source", "http://localhost/")
.header("ce-someint", "10")
.header("ce-time", time.to_rfc3339())
.to_http_parts();
let resp = req.to_event(web::Payload(payload)).await.unwrap();
assert_eq!(expected, resp);
}
#[actix_rt::test]
async fn test_request_with_full_data() {
let time = Utc::now();
let j = json!({"hello": "world"});
let expected = EventBuilderV10::new()
.id("0001")
.ty("example.test")
.source("http://localhost")
//TODO this is required now because the message deserializer implictly set default values
// As soon as this defaulting doesn't happen anymore, we can remove it (Issues #40/#41)
.time(time)
.data("application/json", j.to_string().into_bytes())
.extension("someint", "10")
.build()
.unwrap();
let (req, payload) = test::TestRequest::post()
.header("ce-specversion", "1.0")
.header("ce-id", "0001")
.header("ce-type", "example.test")
.header("ce-source", "http://localhost")
.header("ce-someint", "10")
.header("ce-time", time.to_rfc3339())
.header("content-type", "application/json")
.set_json(&j)
.to_http_parts();
let resp = req.to_event(web::Payload(payload)).await.unwrap();
assert_eq!(expected, resp);
}
}

View File

@ -1,214 +0,0 @@
use super::headers;
use actix_web::dev::HttpResponseBuilder;
use actix_web::http::{HeaderName, HeaderValue};
use actix_web::HttpResponse;
use async_trait::async_trait;
use cloudevents::event::SpecVersion;
use cloudevents::message::{
BinaryDeserializer, BinarySerializer, MessageAttributeValue, Result, StructuredSerializer,
};
use cloudevents::Event;
use std::str::FromStr;
/// Wrapper for [`HttpResponseBuilder`] that implements [`StructuredSerializer`] and [`BinarySerializer`].
pub struct HttpResponseSerializer {
builder: HttpResponseBuilder,
}
impl HttpResponseSerializer {
pub fn new(builder: HttpResponseBuilder) -> HttpResponseSerializer {
HttpResponseSerializer { builder }
}
}
impl BinarySerializer<HttpResponse> for HttpResponseSerializer {
fn set_spec_version(mut self, spec_version: SpecVersion) -> Result<Self> {
self.builder.set_header(
headers::SPEC_VERSION_HEADER.clone(),
str_to_header_value!(spec_version.as_str())?,
);
Ok(self)
}
fn set_attribute(mut self, name: &str, value: MessageAttributeValue) -> Result<Self> {
self.builder.set_header(
headers::ATTRIBUTES_TO_HEADERS.get(name).unwrap().clone(),
str_to_header_value!(value.to_string().as_str())?,
);
Ok(self)
}
fn set_extension(mut self, name: &str, value: MessageAttributeValue) -> Result<Self> {
self.builder.set_header(
attribute_name_to_header!(name)?,
str_to_header_value!(value.to_string().as_str())?,
);
Ok(self)
}
fn end_with_data(mut self, bytes: Vec<u8>) -> Result<HttpResponse> {
Ok(self.builder.body(bytes))
}
fn end(mut self) -> Result<HttpResponse> {
Ok(self.builder.finish())
}
}
impl StructuredSerializer<HttpResponse> for HttpResponseSerializer {
fn set_structured_event(mut self, bytes: Vec<u8>) -> Result<HttpResponse> {
Ok(self
.builder
.set_header(
actix_web::http::header::CONTENT_TYPE,
headers::CLOUDEVENTS_JSON_HEADER.clone(),
)
.body(bytes))
}
}
/// Method to fill an [`HttpResponseBuilder`] with an [`Event`].
pub async fn event_to_response(
event: Event,
response: HttpResponseBuilder,
) -> std::result::Result<HttpResponse, actix_web::error::Error> {
BinaryDeserializer::deserialize_binary(event, HttpResponseSerializer::new(response))
.map_err(actix_web::error::ErrorBadRequest)
}
/// Extension Trait for [`HttpResponseBuilder`] which acts as a wrapper for the function [`event_to_response()`].
///
/// This trait is sealed and cannot be implemented for types outside of this crate.
#[async_trait(?Send)]
pub trait HttpResponseBuilderExt: private::Sealed {
/// Fill this [`HttpResponseBuilder`] with an [`Event`].
async fn event(
self,
event: Event,
) -> std::result::Result<HttpResponse, actix_web::error::Error>;
}
#[async_trait(?Send)]
impl HttpResponseBuilderExt for HttpResponseBuilder {
async fn event(
self,
event: Event,
) -> std::result::Result<HttpResponse, actix_web::error::Error> {
event_to_response(event, self).await
}
}
// Sealing the HttpResponseBuilderExt
mod private {
pub trait Sealed {}
impl Sealed for actix_web::dev::HttpResponseBuilder {}
}
#[cfg(test)]
mod tests {
use super::*;
use actix_web::http::StatusCode;
use actix_web::test;
use cloudevents::{EventBuilder, EventBuilderV10};
use futures::TryStreamExt;
use serde_json::json;
#[actix_rt::test]
async fn test_response() {
let input = EventBuilderV10::new()
.id("0001")
.ty("example.test")
.source("http://localhost/")
.extension("someint", "10")
.build()
.unwrap();
let resp = HttpResponseBuilder::new(StatusCode::OK)
.event(input)
.await
.unwrap();
assert_eq!(
resp.headers()
.get("ce-specversion")
.unwrap()
.to_str()
.unwrap(),
"1.0"
);
assert_eq!(
resp.headers().get("ce-id").unwrap().to_str().unwrap(),
"0001"
);
assert_eq!(
resp.headers().get("ce-type").unwrap().to_str().unwrap(),
"example.test"
);
assert_eq!(
resp.headers().get("ce-source").unwrap().to_str().unwrap(),
"http://localhost/"
);
assert_eq!(
resp.headers().get("ce-someint").unwrap().to_str().unwrap(),
"10"
);
}
#[actix_rt::test]
async fn test_response_with_full_data() {
let j = json!({"hello": "world"});
let input = EventBuilderV10::new()
.id("0001")
.ty("example.test")
.source("http://localhost")
.data("application/json", j.clone())
.extension("someint", "10")
.build()
.unwrap();
let mut resp = HttpResponseBuilder::new(StatusCode::OK)
.event(input)
.await
.unwrap();
assert_eq!(
resp.headers()
.get("ce-specversion")
.unwrap()
.to_str()
.unwrap(),
"1.0"
);
assert_eq!(
resp.headers().get("ce-id").unwrap().to_str().unwrap(),
"0001"
);
assert_eq!(
resp.headers().get("ce-type").unwrap().to_str().unwrap(),
"example.test"
);
assert_eq!(
resp.headers().get("ce-source").unwrap().to_str().unwrap(),
"http://localhost"
);
assert_eq!(
resp.headers()
.get("content-type")
.unwrap()
.to_str()
.unwrap(),
"application/json"
);
assert_eq!(
resp.headers().get("ce-someint").unwrap().to_str().unwrap(),
"10"
);
let bytes = test::load_stream(resp.take_body().into_stream())
.await
.unwrap();
assert_eq!(j.to_string().as_bytes(), bytes.as_ref())
}
}

View File

@ -1,9 +0,0 @@
#[test]
fn test_readme_deps() {
version_sync::assert_markdown_deps_updated!("README.md");
}
#[test]
fn test_html_root_url() {
version_sync::assert_html_root_url_updated!("src/lib.rs");
}

View File

@ -1,26 +0,0 @@
[package]
name = "cloudevents-sdk-rdkafka"
version = "0.3.1"
authors = ["Pranav Bhatt <adpranavb2000@gmail.com>"]
edition = "2018"
license-file = "../LICENSE"
description = "CloudEvents official Rust SDK - Kafka integration"
documentation = "https://docs.rs/cloudevents-sdk-rdkafka"
repository = "https://github.com/cloudevents/sdk-rust"
readme = "README.md"
categories = ["web-programming", "encoding"]
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
bytes = "^1.0"
cloudevents-sdk = { version = "0.3.0", path = ".." }
lazy_static = "1.4.0"
rdkafka = { version = "^0.25", features = ["cmake-build"] }
[dev-dependencies]
url = { version = "^2.1" }
serde_json = "^1.0"
chrono = { version = "^0.4", features = ["serde"] }
futures = "^0.3"
version-sync = "^0.9"

View File

@ -1,39 +0,0 @@
# CloudEvents SDK Rust - rdkafka [![Crates badge]][crates.io] [![Docs badge]][docs.rs]
Integration of [CloudEvents SDK](https://github.com/cloudevents/sdk-rust/) with [rdkafka](https://github.com/fede1024/rust-rdkafka)
Look at [CloudEvents SDK README](https://github.com/cloudevents/sdk-rust/) for more info.
## Development & Contributing
If you're interested in contributing to sdk-rust, look at [Contributing documentation](../CONTRIBUTING.md)
## Community
- There are bi-weekly calls immediately following the
[Serverless/CloudEvents call](https://github.com/cloudevents/spec#meeting-time)
at 9am PT (US Pacific). Which means they will typically start at 10am PT, but
if the other call ends early then the SDK call will start early as well. See
the
[CloudEvents meeting minutes](https://docs.google.com/document/d/1OVF68rpuPK5shIHILK9JOqlZBbfe91RNzQ7u_P7YCDE/edit#)
to determine which week will have the call.
- Slack: #cloudeventssdk (or #cloudevents-sdk-rust) channel under
[CNCF's Slack workspace](https://slack.cncf.io/).
- Email: https://lists.cncf.io/g/cncf-cloudevents-sdk
- Contact for additional information: Francesco Guardiani (`@slinkydeveloper` on slack).
Each SDK may have its own unique processes, tooling and guidelines, common
governance related material can be found in the
[CloudEvents `community`](https://github.com/cloudevents/spec/tree/master/community)
directory. In particular, in there you will find information concerning
how SDK projects are
[managed](https://github.com/cloudevents/spec/blob/master/community/SDK-GOVERNANCE.md),
[guidelines](https://github.com/cloudevents/spec/blob/master/community/SDK-maintainer-guidelines.md)
for how PR reviews and approval, and our
[Code of Conduct](https://github.com/cloudevents/spec/blob/master/community/GOVERNANCE.md#additional-information)
information.
[Crates badge]: https://img.shields.io/crates/v/cloudevents-sdk-rdkafka.svg
[crates.io]: https://crates.io/crates/cloudevents-sdk-rdkafka
[Docs badge]: https://docs.rs/cloudevents-sdk-rdkafka/badge.svg
[docs.rs]: https://docs.rs/cloudevents-sdk-rdkafka

View File

@ -1,29 +0,0 @@
use cloudevents::event::SpecVersion;
use lazy_static::lazy_static;
use std::collections::HashMap;
macro_rules! attribute_name_to_header {
($attribute:expr) => {
format!("ce_{}", $attribute)
};
}
fn attributes_to_headers(it: impl Iterator<Item = &'static str>) -> HashMap<&'static str, String> {
it.map(|s| {
if s == "datacontenttype" {
(s, String::from("content-type"))
} else {
(s, attribute_name_to_header!(s))
}
})
.collect()
}
lazy_static! {
pub(crate) static ref ATTRIBUTES_TO_HEADERS: HashMap<&'static str, String> =
attributes_to_headers(SpecVersion::all_attribute_names());
}
pub(crate) static SPEC_VERSION_HEADER: &str = "ce_specversion";
pub(crate) static CLOUDEVENTS_JSON_HEADER: &str = "application/cloudevents+json";
pub(crate) static CONTENT_TYPE: &str = "content-type";

View File

@ -1,258 +0,0 @@
use crate::headers;
use cloudevents::event::SpecVersion;
use cloudevents::message::{
BinaryDeserializer, BinarySerializer, Encoding, MessageAttributeValue, MessageDeserializer,
Result, StructuredDeserializer, StructuredSerializer,
};
use cloudevents::{message, Event};
use rdkafka::message::{BorrowedMessage, Headers, Message, OwnedMessage};
use std::collections::HashMap;
use std::convert::TryFrom;
use std::str;
/// Wrapper for [`Message`] that implements [`MessageDeserializer`] trait.
pub struct ConsumerRecordDeserializer {
pub(crate) headers: HashMap<String, Vec<u8>>,
pub(crate) payload: Option<Vec<u8>>,
}
impl ConsumerRecordDeserializer {
fn get_kafka_headers(message: &impl Message) -> Result<HashMap<String, Vec<u8>>> {
let mut hm = HashMap::new();
let headers = message
.headers()
// TODO create an error variant for invalid headers
.ok_or(cloudevents::message::Error::WrongEncoding {})?;
for i in 0..headers.count() {
let header = headers.get(i).unwrap();
hm.insert(header.0.to_string(), Vec::from(header.1));
}
Ok(hm)
}
pub fn new(message: &impl Message) -> Result<ConsumerRecordDeserializer> {
Ok(ConsumerRecordDeserializer {
headers: Self::get_kafka_headers(message)?,
payload: message.payload().map(Vec::from),
})
}
}
impl BinaryDeserializer for ConsumerRecordDeserializer {
fn deserialize_binary<R: Sized, V: BinarySerializer<R>>(mut self, mut visitor: V) -> Result<R> {
if self.encoding() != Encoding::BINARY {
return Err(message::Error::WrongEncoding {});
}
let spec_version = SpecVersion::try_from(
str::from_utf8(&self.headers.remove(headers::SPEC_VERSION_HEADER).unwrap()[..])
.map_err(|e| cloudevents::message::Error::Other {
source: Box::new(e),
})?,
)?;
visitor = visitor.set_spec_version(spec_version.clone())?;
let attributes = spec_version.attribute_names();
if let Some(hv) = self.headers.remove(headers::CONTENT_TYPE) {
visitor = visitor.set_attribute(
"datacontenttype",
MessageAttributeValue::String(String::from_utf8(hv).map_err(|e| {
cloudevents::message::Error::Other {
source: Box::new(e),
}
})?),
)?
}
for (hn, hv) in self
.headers
.into_iter()
.filter(|(hn, _)| headers::SPEC_VERSION_HEADER != *hn && hn.starts_with("ce_"))
{
let name = &hn["ce_".len()..];
if attributes.contains(&name) {
visitor = visitor.set_attribute(
name,
MessageAttributeValue::String(String::from_utf8(hv).map_err(|e| {
cloudevents::message::Error::Other {
source: Box::new(e),
}
})?),
)?
} else {
visitor = visitor.set_extension(
name,
MessageAttributeValue::String(String::from_utf8(hv).map_err(|e| {
cloudevents::message::Error::Other {
source: Box::new(e),
}
})?),
)?
}
}
if self.payload != None {
visitor.end_with_data(self.payload.unwrap())
} else {
visitor.end()
}
}
}
impl StructuredDeserializer for ConsumerRecordDeserializer {
fn deserialize_structured<R: Sized, V: StructuredSerializer<R>>(self, visitor: V) -> Result<R> {
if self.encoding() != Encoding::STRUCTURED {
return Err(message::Error::WrongEncoding {});
}
visitor.set_structured_event(self.payload.unwrap())
}
}
impl MessageDeserializer for ConsumerRecordDeserializer {
fn encoding(&self) -> Encoding {
match (
self.headers
.get("content-type")
.map(|s| String::from_utf8(s.to_vec()).ok())
.flatten()
.map(|s| s.starts_with(headers::CLOUDEVENTS_JSON_HEADER))
.unwrap_or(false),
self.headers.get(headers::SPEC_VERSION_HEADER),
) {
(true, _) => Encoding::STRUCTURED,
(_, Some(_)) => Encoding::BINARY,
_ => Encoding::UNKNOWN,
}
}
}
/// Method to transform a [`Message`] to [`Event`].
pub fn record_to_event(msg: &impl Message) -> Result<Event> {
MessageDeserializer::into_event(ConsumerRecordDeserializer::new(msg)?)
}
/// Extension Trait for [`Message`] which acts as a wrapper for the function [`record_to_event()`].
///
/// This trait is sealed and cannot be implemented for types outside of this crate.
pub trait MessageExt: private::Sealed {
/// Generates [`Event`] from [`BorrowedMessage`].
fn to_event(&self) -> Result<Event>;
}
impl MessageExt for BorrowedMessage<'_> {
fn to_event(&self) -> Result<Event> {
record_to_event(self)
}
}
impl MessageExt for OwnedMessage {
fn to_event(&self) -> Result<Event> {
record_to_event(self)
}
}
mod private {
// Sealing the MessageExt
pub trait Sealed {}
impl Sealed for rdkafka::message::OwnedMessage {}
impl Sealed for rdkafka::message::BorrowedMessage<'_> {}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::kafka_producer_record::MessageRecord;
use chrono::Utc;
use cloudevents::{EventBuilder, EventBuilderV10};
use serde_json::json;
#[test]
fn test_binary_record() {
let time = Utc::now();
let expected = EventBuilderV10::new()
.id("0001")
.ty("example.test")
.time(time)
.source("http://localhost")
.extension("someint", "10")
.build()
.unwrap();
// Since there is neither a way provided by rust-rdkafka to convert FutureProducer back into
// OwnedMessage or BorrowedMessage, nor is there a way to create a BorrowedMessage struct,
// the test uses OwnedMessage instead, which consumes the message instead of borrowing it like
// in the case of BorrowedMessage
let message_record = MessageRecord::from_event(
EventBuilderV10::new()
.id("0001")
.ty("example.test")
.time(time)
.source("http://localhost")
.extension("someint", "10")
.build()
.unwrap(),
)
.unwrap();
let owned_message = OwnedMessage::new(
message_record.payload,
Some(String::from("test key").into_bytes()),
String::from("test topic"),
rdkafka::message::Timestamp::NotAvailable,
10,
10,
Some(message_record.headers),
);
assert_eq!(owned_message.to_event().unwrap(), expected)
}
#[test]
fn test_structured_record() {
let j = json!({"hello": "world"});
let expected = EventBuilderV10::new()
.id("0001")
.ty("example.test")
.source("http://localhost")
.data("application/json", j.clone())
.extension("someint", "10")
.build()
.unwrap();
// Since there is neither a way provided by rust-rdkafka to convert FutureProducer back into
// OwnedMessage or BorrowedMessage, nor is there a way to create a BorrowedMessage struct,
// the test uses OwnedMessage instead, which consumes the message instead of borrowing it like
// in the case of BorrowedMessage
let input = EventBuilderV10::new()
.id("0001")
.ty("example.test")
.source("http://localhost")
.data("application/json", j.clone())
.extension("someint", "10")
.build()
.unwrap();
let serialized_event =
StructuredDeserializer::deserialize_structured(input, MessageRecord::new()).unwrap();
let owned_message = OwnedMessage::new(
serialized_event.payload,
Some(String::from("test key").into_bytes()),
String::from("test topic"),
rdkafka::message::Timestamp::NotAvailable,
10,
10,
Some(serialized_event.headers),
);
assert_eq!(owned_message.to_event().unwrap(), expected)
}
}

View File

@ -1,153 +0,0 @@
use super::headers;
use cloudevents::event::SpecVersion;
use cloudevents::message::{
BinaryDeserializer, BinarySerializer, MessageAttributeValue, Result, StructuredSerializer,
};
use cloudevents::Event;
use rdkafka::message::{OwnedHeaders, ToBytes};
use rdkafka::producer::{BaseRecord, FutureRecord};
/// This struct contains a serialized CloudEvent message in the Kafka shape.
/// Implements [`StructuredSerializer`] & [`BinarySerializer`] traits.
///
/// To instantiate a new `MessageRecord` from an [`Event`],
/// look at [`Self::from_event`] or use [`StructuredDeserializer::deserialize_structured`](cloudevents::message::StructuredDeserializer::deserialize_structured)
/// or [`BinaryDeserializer::deserialize_binary`].
pub struct MessageRecord {
pub(crate) headers: OwnedHeaders,
pub(crate) payload: Option<Vec<u8>>,
}
impl MessageRecord {
/// Create a new empty [`MessageRecord`]
pub fn new() -> Self {
MessageRecord {
headers: OwnedHeaders::new(),
payload: None,
}
}
/// Create a new [`MessageRecord`], filled with `event` serialized in binary mode.
pub fn from_event(event: Event) -> Result<Self> {
BinaryDeserializer::deserialize_binary(event, MessageRecord::new())
}
}
impl Default for MessageRecord {
fn default() -> Self {
Self::new()
}
}
impl BinarySerializer<MessageRecord> for MessageRecord {
fn set_spec_version(mut self, spec_version: SpecVersion) -> Result<Self> {
self.headers = self
.headers
.add(headers::SPEC_VERSION_HEADER, spec_version.as_str());
Ok(self)
}
fn set_attribute(mut self, name: &str, value: MessageAttributeValue) -> Result<Self> {
self.headers = self.headers.add(
&headers::ATTRIBUTES_TO_HEADERS
.get(name)
.ok_or(cloudevents::message::Error::UnknownAttribute {
name: String::from(name),
})?
.clone()[..],
&value.to_string()[..],
);
Ok(self)
}
fn set_extension(mut self, name: &str, value: MessageAttributeValue) -> Result<Self> {
self.headers = self
.headers
.add(&attribute_name_to_header!(name)[..], &value.to_string()[..]);
Ok(self)
}
fn end_with_data(mut self, bytes: Vec<u8>) -> Result<MessageRecord> {
self.payload = Some(bytes);
Ok(self)
}
fn end(self) -> Result<MessageRecord> {
Ok(self)
}
}
impl StructuredSerializer<MessageRecord> for MessageRecord {
fn set_structured_event(mut self, bytes: Vec<u8>) -> Result<MessageRecord> {
self.headers = self
.headers
.add(headers::CONTENT_TYPE, headers::CLOUDEVENTS_JSON_HEADER);
self.payload = Some(bytes);
Ok(self)
}
}
/// Extension Trait for [`BaseRecord`] that fills the record with a [`MessageRecord`].
///
/// This trait is sealed and cannot be implemented for types outside of this crate.
pub trait BaseRecordExt<'a, K: ToBytes + ?Sized>: private::Sealed {
/// Fill this [`BaseRecord`] with a [`MessageRecord`].
fn message_record(
self,
message_record: &'a MessageRecord,
) -> Result<BaseRecord<'a, K, Vec<u8>>>;
}
impl<'a, K: ToBytes + ?Sized> BaseRecordExt<'a, K> for BaseRecord<'a, K, Vec<u8>> {
fn message_record(
mut self,
message_record: &'a MessageRecord,
) -> Result<BaseRecord<'a, K, Vec<u8>>> {
self = self.headers(message_record.headers.clone());
if let Some(s) = message_record.payload.as_ref() {
self = self.payload(s);
}
Ok(self)
}
}
/// Extension Trait for [`FutureRecord`] that fills the record with a [`MessageRecord`].
///
/// This trait is sealed and cannot be implemented for types outside of this crate.
pub trait FutureRecordExt<'a, K: ToBytes + ?Sized>: private::Sealed {
/// Fill this [`FutureRecord`] with a [`MessageRecord`].
fn message_record(self, message_record: &'a MessageRecord) -> FutureRecord<'a, K, Vec<u8>>;
}
impl<'a, K: ToBytes + ?Sized> FutureRecordExt<'a, K> for FutureRecord<'a, K, Vec<u8>> {
fn message_record(mut self, message_record: &'a MessageRecord) -> FutureRecord<'a, K, Vec<u8>> {
self = self.headers(message_record.headers.clone());
if let Some(s) = message_record.payload.as_ref() {
self = self.payload(s);
}
self
}
}
mod private {
// Sealing the FutureRecordExt and BaseRecordExt
pub trait Sealed {}
impl<K: rdkafka::message::ToBytes + ?Sized, V: rdkafka::message::ToBytes> Sealed
for rdkafka::producer::FutureRecord<'_, K, V>
{
}
impl<K: rdkafka::message::ToBytes + ?Sized, V: rdkafka::message::ToBytes> Sealed
for rdkafka::producer::BaseRecord<'_, K, V>
{
}
}

View File

@ -1,65 +0,0 @@
//! This library provides Kafka protocol bindings for CloudEvents
//! using the [rust-rdkafka](https://fede1024.github.io/rust-rdkafka) library.
//!
//! To produce Cloudevents:
//!
//! ```
//!
//! use cloudevents::Event;
//! use rdkafka::producer::{FutureProducer, FutureRecord};
//! use rdkafka::util::Timeout;
//! use cloudevents_sdk_rdkafka::{MessageRecord, FutureRecordExt};
//!
//! # async fn produce(producer: &FutureProducer, event: Event) -> Result<(), Box<dyn std::error::Error>> {
//! let message_record = MessageRecord::from_event(event)?;
//!
//! producer.send(
//! FutureRecord::to("topic")
//! .key("some_event")
//! .message_record(&message_record),
//! Timeout::Never
//! ).await;
//! # Ok(())
//! # }
//!
//! ```
//!
//! To consume Cloudevents:
//!
//! ```
//! use rdkafka::consumer::{StreamConsumer, DefaultConsumerContext, Consumer, CommitMode};
//! use cloudevents_sdk_rdkafka::MessageExt;
//! use futures::StreamExt;
//!
//! # async fn consume(consumer: StreamConsumer<DefaultConsumerContext>) -> Result<(), Box<dyn std::error::Error>> {
//! let mut message_stream = consumer.start();
//!
//! while let Some(message) = message_stream.next().await {
//! match message {
//! Err(e) => println!("Kafka error: {}", e),
//! Ok(m) => {
//! let event = m.to_event()?;
//! println!("Received Event: {}", event);
//! consumer.commit_message(&m, CommitMode::Async)?;
//! }
//! };
//! }
//! # Ok(())
//! # }
//! ```
#![doc(html_root_url = "https://docs.rs/cloudevents-sdk-rdkafka/0.3.1")]
#![deny(broken_intra_doc_links)]
#[macro_use]
mod headers;
mod kafka_consumer_record;
mod kafka_producer_record;
pub use kafka_consumer_record::record_to_event;
pub use kafka_consumer_record::ConsumerRecordDeserializer;
pub use kafka_consumer_record::MessageExt;
pub use kafka_producer_record::BaseRecordExt;
pub use kafka_producer_record::FutureRecordExt;
pub use kafka_producer_record::MessageRecord;

View File

@ -1,9 +0,0 @@
#[test]
fn test_readme_deps() {
version_sync::assert_markdown_deps_updated!("README.md");
}
#[test]
fn test_html_root_url() {
version_sync::assert_html_root_url_updated!("src/lib.rs");
}

View File

@ -1,32 +0,0 @@
[package]
name = "cloudevents-sdk-reqwest"
version = "0.3.1"
authors = ["Francesco Guardiani <francescoguard@gmail.com>"]
license-file = "../LICENSE"
edition = "2018"
description = "CloudEvents official Rust SDK - Reqwest integration"
documentation = "https://docs.rs/cloudevents-sdk-reqwest"
repository = "https://github.com/cloudevents/sdk-rust"
readme = "README.md"
categories = ["web-programming", "encoding", "web-programming::http-client"]
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
async-trait = "^0.1.33"
cloudevents-sdk = { version = "0.3.0", path = ".." }
lazy_static = "1.4.0"
bytes = "^1.0"
[dependencies.reqwest]
version = "^0.11"
default-features = false
features = ["rustls-tls"]
[dev-dependencies]
mockito = "0.25.1"
tokio = { version = "^1.0", features = ["full"] }
url = { version = "^2.1" }
serde_json = "^1.0"
chrono = { version = "^0.4", features = ["serde"] }
version-sync = "^0.9"

View File

@ -1,39 +0,0 @@
# CloudEvents SDK Rust - Reqwest [![Crates badge]][crates.io] [![Docs badge]][docs.rs]
Integration of [CloudEvents SDK](https://github.com/cloudevents/sdk-rust/) with [reqwest](https://github.com/seanmonstar/reqwest)
Look at [CloudEvents SDK README](https://github.com/cloudevents/sdk-rust/) for more info.
## Development & Contributing
If you're interested in contributing to sdk-rust, look at [Contributing documentation](../CONTRIBUTING.md)
## Community
- There are bi-weekly calls immediately following the
[Serverless/CloudEvents call](https://github.com/cloudevents/spec#meeting-time)
at 9am PT (US Pacific). Which means they will typically start at 10am PT, but
if the other call ends early then the SDK call will start early as well. See
the
[CloudEvents meeting minutes](https://docs.google.com/document/d/1OVF68rpuPK5shIHILK9JOqlZBbfe91RNzQ7u_P7YCDE/edit#)
to determine which week will have the call.
- Slack: #cloudeventssdk (or #cloudevents-sdk-rust) channel under
[CNCF's Slack workspace](https://slack.cncf.io/).
- Email: https://lists.cncf.io/g/cncf-cloudevents-sdk
- Contact for additional information: Francesco Guardiani (`@slinkydeveloper` on slack).
Each SDK may have its own unique processes, tooling and guidelines, common
governance related material can be found in the
[CloudEvents `community`](https://github.com/cloudevents/spec/tree/master/community)
directory. In particular, in there you will find information concerning
how SDK projects are
[managed](https://github.com/cloudevents/spec/blob/master/community/SDK-GOVERNANCE.md),
[guidelines](https://github.com/cloudevents/spec/blob/master/community/SDK-maintainer-guidelines.md)
for how PR reviews and approval, and our
[Code of Conduct](https://github.com/cloudevents/spec/blob/master/community/GOVERNANCE.md#additional-information)
information.
[Crates badge]: https://img.shields.io/crates/v/cloudevents-sdk-reqwest.svg
[crates.io]: https://crates.io/crates/cloudevents-sdk-reqwest
[Docs badge]: https://docs.rs/cloudevents-sdk-reqwest/badge.svg
[docs.rs]: https://docs.rs/cloudevents-sdk-reqwest

View File

@ -1,199 +0,0 @@
use super::headers;
use cloudevents::event::SpecVersion;
use cloudevents::message::{
BinaryDeserializer, BinarySerializer, MessageAttributeValue, Result, StructuredSerializer,
};
use cloudevents::Event;
use reqwest::RequestBuilder;
use std::str::FromStr;
/// Wrapper for [`RequestBuilder`] that implements [`StructuredSerializer`] & [`BinarySerializer`] traits.
pub struct RequestSerializer {
req: RequestBuilder,
}
impl RequestSerializer {
pub fn new(req: RequestBuilder) -> RequestSerializer {
RequestSerializer { req }
}
}
impl BinarySerializer<RequestBuilder> for RequestSerializer {
fn set_spec_version(mut self, spec_version: SpecVersion) -> Result<Self> {
self.req = self
.req
.header(headers::SPEC_VERSION_HEADER.clone(), spec_version.as_str());
Ok(self)
}
fn set_attribute(mut self, name: &str, value: MessageAttributeValue) -> Result<Self> {
self.req = self.req.header(
headers::ATTRIBUTES_TO_HEADERS.get(name).unwrap().clone(),
value.to_string(),
);
Ok(self)
}
fn set_extension(mut self, name: &str, value: MessageAttributeValue) -> Result<Self> {
self.req = self
.req
.header(attribute_name_to_header!(name)?, value.to_string());
Ok(self)
}
fn end_with_data(self, bytes: Vec<u8>) -> Result<RequestBuilder> {
Ok(self.req.body(bytes))
}
fn end(self) -> Result<RequestBuilder> {
Ok(self.req)
}
}
impl StructuredSerializer<RequestBuilder> for RequestSerializer {
fn set_structured_event(self, bytes: Vec<u8>) -> Result<RequestBuilder> {
Ok(self
.req
.header(
reqwest::header::CONTENT_TYPE,
headers::CLOUDEVENTS_JSON_HEADER.clone(),
)
.body(bytes))
}
}
/// Method to fill a [`RequestBuilder`] with an [`Event`].
pub fn event_to_request(event: Event, request_builder: RequestBuilder) -> Result<RequestBuilder> {
BinaryDeserializer::deserialize_binary(event, RequestSerializer::new(request_builder))
}
/// Extension Trait for [`RequestBuilder`] which acts as a wrapper for the function [`event_to_request()`].
///
/// This trait is sealed and cannot be implemented for types outside of this crate.
pub trait RequestBuilderExt: private::Sealed {
/// Write in this [`RequestBuilder`] the provided [`Event`]. Similar to invoking [`Event`].
fn event(self, event: Event) -> Result<RequestBuilder>;
}
impl RequestBuilderExt for RequestBuilder {
fn event(self, event: Event) -> Result<RequestBuilder> {
event_to_request(event, self)
}
}
// Sealing the RequestBuilderExt
mod private {
pub trait Sealed {}
impl Sealed for reqwest::RequestBuilder {}
}
#[cfg(test)]
mod tests {
use super::*;
use mockito::{mock, Matcher};
use cloudevents::message::StructuredDeserializer;
use cloudevents::{EventBuilder, EventBuilderV10};
use serde_json::json;
#[tokio::test]
async fn test_request() {
let url = mockito::server_url();
let m = mock("POST", "/")
.match_header("ce-specversion", "1.0")
.match_header("ce-id", "0001")
.match_header("ce-type", "example.test")
.match_header("ce-source", "http://localhost/")
.match_header("ce-someint", "10")
.match_body(Matcher::Missing)
.create();
let input = EventBuilderV10::new()
.id("0001")
.ty("example.test")
.source("http://localhost/")
.extension("someint", "10")
.build()
.unwrap();
let client = reqwest::Client::new();
client
.post(&url)
.event(input)
.unwrap()
.send()
.await
.unwrap();
m.assert();
}
#[tokio::test]
async fn test_request_with_full_data() {
let j = json!({"hello": "world"});
let url = mockito::server_url();
let m = mock("POST", "/")
.match_header("ce-specversion", "1.0")
.match_header("ce-id", "0001")
.match_header("ce-type", "example.test")
.match_header("ce-source", "http://localhost/")
.match_header("content-type", "application/json")
.match_header("ce-someint", "10")
.match_body(Matcher::Exact(j.to_string()))
.create();
let input = EventBuilderV10::new()
.id("0001")
.ty("example.test")
.source("http://localhost/")
.data("application/json", j.clone())
.extension("someint", "10")
.build()
.unwrap();
let client = reqwest::Client::new();
client
.post(&url)
.event(input)
.unwrap()
.send()
.await
.unwrap();
m.assert();
}
#[tokio::test]
async fn test_structured_request_with_full_data() {
let j = json!({"hello": "world"});
let input = EventBuilderV10::new()
.id("0001")
.ty("example.test")
.source("http://localhost")
.data("application/json", j.clone())
.extension("someint", "10")
.build()
.unwrap();
let url = mockito::server_url();
let m = mock("POST", "/")
.match_header("content-type", "application/cloudevents+json")
.match_body(Matcher::Exact(serde_json::to_string(&input).unwrap()))
.create();
let client = reqwest::Client::new();
StructuredDeserializer::deserialize_structured(
input,
RequestSerializer::new(client.post(&url)),
)
.unwrap()
.send()
.await
.unwrap();
m.assert();
}
}

View File

@ -1,263 +0,0 @@
use super::headers;
use async_trait::async_trait;
use bytes::Bytes;
use cloudevents::event::SpecVersion;
use cloudevents::message::{
BinaryDeserializer, BinarySerializer, Encoding, Error, MessageAttributeValue,
MessageDeserializer, Result, StructuredDeserializer, StructuredSerializer,
};
use cloudevents::{message, Event};
use reqwest::header::{HeaderMap, HeaderName};
use reqwest::Response;
use std::convert::TryFrom;
/// Wrapper for [`Response`] that implements [`MessageDeserializer`] trait.
pub struct ResponseDeserializer {
headers: HeaderMap,
body: Bytes,
}
impl ResponseDeserializer {
pub fn new(headers: HeaderMap, body: Bytes) -> ResponseDeserializer {
ResponseDeserializer { headers, body }
}
}
impl BinaryDeserializer for ResponseDeserializer {
fn deserialize_binary<R: Sized, V: BinarySerializer<R>>(self, mut visitor: V) -> Result<R> {
if self.encoding() != Encoding::BINARY {
return Err(message::Error::WrongEncoding {});
}
let spec_version = SpecVersion::try_from(
unwrap_optional_header!(self.headers, headers::SPEC_VERSION_HEADER).unwrap()?,
)?;
visitor = visitor.set_spec_version(spec_version.clone())?;
let attributes = spec_version.attribute_names();
for (hn, hv) in self
.headers
.iter()
.filter(|(hn, _)| headers::SPEC_VERSION_HEADER.ne(hn) && hn.as_str().starts_with("ce-"))
{
let name = &hn.as_str()["ce-".len()..];
if attributes.contains(&name) {
visitor = visitor.set_attribute(
name,
MessageAttributeValue::String(String::from(header_value_to_str!(hv)?)),
)?
} else {
visitor = visitor.set_extension(
name,
MessageAttributeValue::String(String::from(header_value_to_str!(hv)?)),
)?
}
}
if let Some(hv) = self.headers.get("content-type") {
visitor = visitor.set_attribute(
"datacontenttype",
MessageAttributeValue::String(String::from(header_value_to_str!(hv)?)),
)?
}
if !self.body.is_empty() {
visitor.end_with_data(self.body.to_vec())
} else {
visitor.end()
}
}
}
impl StructuredDeserializer for ResponseDeserializer {
fn deserialize_structured<R: Sized, V: StructuredSerializer<R>>(self, visitor: V) -> Result<R> {
if self.encoding() != Encoding::STRUCTURED {
return Err(message::Error::WrongEncoding {});
}
visitor.set_structured_event(self.body.to_vec())
}
}
impl MessageDeserializer for ResponseDeserializer {
fn encoding(&self) -> Encoding {
match (
#[allow(clippy::borrow_interior_mutable_const)]
unwrap_optional_header!(self.headers, reqwest::header::CONTENT_TYPE)
.map(|r| r.ok())
.flatten()
.map(|e| e.starts_with("application/cloudevents+json")),
self.headers
.get::<&'static HeaderName>(&headers::SPEC_VERSION_HEADER),
) {
(Some(true), _) => Encoding::STRUCTURED,
(_, Some(_)) => Encoding::BINARY,
_ => Encoding::UNKNOWN,
}
}
}
/// Method to transform an incoming [`Response`] to [`Event`].
pub async fn response_to_event(res: Response) -> Result<Event> {
let h = res.headers().to_owned();
let b = res.bytes().await.map_err(|e| Error::Other {
source: Box::new(e),
})?;
MessageDeserializer::into_event(ResponseDeserializer::new(h, b))
}
/// Extension Trait for [`Response`] which acts as a wrapper for the function [`response_to_event()`].
///
/// This trait is sealed and cannot be implemented for types outside of this crate.
#[async_trait(?Send)]
pub trait ResponseExt: private::Sealed {
/// Convert this [`Response`] to [`Event`].
async fn into_event(self) -> Result<Event>;
}
#[async_trait(?Send)]
impl ResponseExt for Response {
async fn into_event(self) -> Result<Event> {
response_to_event(self).await
}
}
// Sealing the ResponseExt
mod private {
pub trait Sealed {}
impl Sealed for reqwest::Response {}
}
#[cfg(test)]
mod tests {
use super::*;
use mockito::mock;
use chrono::Utc;
use cloudevents::{EventBuilder, EventBuilderV10};
use serde_json::json;
#[tokio::test]
async fn test_response() {
let time = Utc::now();
let url = mockito::server_url();
let _m = mock("GET", "/")
.with_status(200)
.with_header("ce-specversion", "1.0")
.with_header("ce-id", "0001")
.with_header("ce-type", "example.test")
.with_header("ce-source", "http://localhost")
.with_header("ce-someint", "10")
.with_header("ce-time", &time.to_rfc3339())
.create();
let expected = EventBuilderV10::new()
.id("0001")
.ty("example.test")
//TODO this is required now because the message deserializer implictly set default values
// As soon as this defaulting doesn't happen anymore, we can remove it (Issues #40/#41)
.time(time)
.source("http://localhost")
.extension("someint", "10")
.build()
.unwrap();
let client = reqwest::Client::new();
let res = client
.get(&url)
.send()
.await
.unwrap()
.into_event()
.await
.unwrap();
assert_eq!(expected, res);
}
#[tokio::test]
async fn test_response_with_full_data() {
let time = Utc::now();
let j = json!({"hello": "world"});
let url = mockito::server_url();
let _m = mock("GET", "/")
.with_status(200)
.with_header("ce-specversion", "1.0")
.with_header("ce-id", "0001")
.with_header("ce-type", "example.test")
.with_header("ce-source", "http://localhost/")
.with_header("content-type", "application/json")
.with_header("ce-someint", "10")
.with_header("ce-time", &time.to_rfc3339())
.with_body(j.to_string())
.create();
let expected = EventBuilderV10::new()
.id("0001")
.ty("example.test")
//TODO this is required now because the message deserializer implictly set default values
// As soon as this defaulting doesn't happen anymore, we can remove it (Issues #40/#41)
.time(time)
.source("http://localhost/")
.data("application/json", j.to_string().into_bytes())
.extension("someint", "10")
.build()
.unwrap();
let client = reqwest::Client::new();
let res = client
.get(&url)
.send()
.await
.unwrap()
.into_event()
.await
.unwrap();
assert_eq!(expected, res);
}
#[tokio::test]
async fn test_structured_response_with_full_data() {
let time = Utc::now();
let j = json!({"hello": "world"});
let expected = EventBuilderV10::new()
.id("0001")
.ty("example.test")
//TODO this is required now because the message deserializer implictly set default values
// As soon as this defaulting doesn't happen anymore, we can remove it (Issues #40/#41)
.time(time)
.source("http://localhost")
.data("application/json", j.clone())
.extension("someint", "10")
.build()
.unwrap();
let url = mockito::server_url();
let _m = mock("GET", "/")
.with_status(200)
.with_header(
"content-type",
"application/cloudevents+json; charset=utf-8",
)
.with_body(serde_json::to_string(&expected).unwrap())
.create();
let client = reqwest::Client::new();
let res = client
.get(&url)
.send()
.await
.unwrap()
.into_event()
.await
.unwrap();
assert_eq!(expected, res);
}
}

View File

@ -1,61 +0,0 @@
use cloudevents::event::SpecVersion;
use lazy_static::lazy_static;
use reqwest::header::{HeaderName, HeaderValue};
use std::collections::HashMap;
use std::str::FromStr;
macro_rules! unwrap_optional_header {
($headers:expr, $name:expr) => {
$headers
.get::<&'static reqwest::header::HeaderName>(&$name)
.map(|a| header_value_to_str!(a))
};
}
macro_rules! header_value_to_str {
($header_value:expr) => {
$header_value
.to_str()
.map_err(|e| cloudevents::message::Error::Other {
source: Box::new(e),
})
};
}
macro_rules! str_name_to_header {
($attribute:expr) => {
reqwest::header::HeaderName::from_str($attribute).map_err(|e| {
cloudevents::message::Error::Other {
source: Box::new(e),
}
})
};
}
macro_rules! attribute_name_to_header {
($attribute:expr) => {
str_name_to_header!(&["ce-", $attribute].concat())
};
}
fn attributes_to_headers(
it: impl Iterator<Item = &'static str>,
) -> HashMap<&'static str, HeaderName> {
it.map(|s| {
if s == "datacontenttype" {
(s, reqwest::header::CONTENT_TYPE)
} else {
(s, attribute_name_to_header!(s).unwrap())
}
})
.collect()
}
lazy_static! {
pub(crate) static ref ATTRIBUTES_TO_HEADERS: HashMap<&'static str, HeaderName> =
attributes_to_headers(SpecVersion::all_attribute_names());
pub(crate) static ref SPEC_VERSION_HEADER: HeaderName =
HeaderName::from_static("ce-specversion");
pub(crate) static ref CLOUDEVENTS_JSON_HEADER: HeaderValue =
HeaderValue::from_static("application/cloudevents+json");
}

View File

@ -1,45 +0,0 @@
//! This crate integrates the [cloudevents-sdk](https://docs.rs/cloudevents-sdk) with [reqwest](https://docs.rs/reqwest/) to easily send and receive CloudEvents.
//!
//! ```
//! use cloudevents_sdk_reqwest::{RequestBuilderExt, ResponseExt};
//! use cloudevents::{EventBuilderV10, EventBuilder};
//! use serde_json::json;
//!
//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
//! let client = reqwest::Client::new();
//!
//! // Prepare the event to send
//! let event_to_send = EventBuilderV10::new()
//! .id("0001")
//! .ty("example.test")
//! .source("http://localhost/")
//! .data("application/json", json!({"hello": "world"}))
//! .build()?;
//!
//! // Send request
//! let response = client.post("http://localhost")
//! .event(event_to_send)?
//! .send().await?;
//! // Parse response as event
//! let received_event = response
//! .into_event().await?;
//! # Ok(())
//! # }
//! ```
//!
//! Check out the [cloudevents-sdk](https://docs.rs/cloudevents-sdk) docs for more details on how to use [`cloudevents::Event`].
#![doc(html_root_url = "https://docs.rs/cloudevents-sdk-reqwest/0.3.1")]
#![deny(broken_intra_doc_links)]
#[macro_use]
mod headers;
mod client_request;
mod client_response;
pub use client_request::event_to_request;
pub use client_request::RequestBuilderExt;
pub use client_request::RequestSerializer;
pub use client_response::response_to_event;
pub use client_response::ResponseDeserializer;
pub use client_response::ResponseExt;

View File

@ -1,9 +0,0 @@
#[test]
fn test_readme_deps() {
version_sync::assert_markdown_deps_updated!("README.md");
}
#[test]
fn test_html_root_url() {
version_sync::assert_html_root_url_updated!("src/lib.rs");
}

View File

@ -1,23 +0,0 @@
[package]
name = "cloudevents-sdk-warp"
version = "0.3.0"
authors = ["Marko Milenković <milenkovicm@users.noreply.github.com>"]
edition = "2018"
categories = ["web-programming", "encoding"]
license-file = "../LICENSE"
[dependencies]
cloudevents-sdk = { path = ".." }
lazy_static = "1.4.0"
bytes = "^1.0"
warp = "^0.3"
http = "0.2"
hyper = "^0.14"
[dev-dependencies]
tokio = { version = "^1.0", features = ["full"] }
url = { version = "^2.1" }
serde_json = "^1.0"
chrono = { version = "^0.4", features = ["serde"] }
mime = "0.3"
version-sync = "^0.9"

View File

@ -1,110 +0,0 @@
# CloudEvents SDK Rust - WARP [![Crates badge]][crates.io] [![Docs badge]][docs.rs]
Integration of [CloudEvents SDK](https://github.com/cloudevents/sdk-rust/) with [Warp - Web Server Framework](https://github.com/seanmonstar/warp/).
Look at [CloudEvents SDK README](https://github.com/cloudevents/sdk-rust/) for more info.
Using this crate you can extract CloudEvent from requests and write CloudEvents to responses.
To echo events:
```rust
use cloudevents_sdk_warp::{filter, reply};
use warp::Filter;
#[tokio::main]
async fn main() {
let routes = warp::any()
// extracting event from request
.and(filter::to_event())
// returning event back
.map(|event| reply::from_event(event));
warp::serve(routes).run(([127, 0, 0, 1], 3030)).await;
}
```
Executing `http` request:
```bash
curl -v \
-H "ce-specversion: 1.0" \
-H "ce-id: 2" \
-H "ce-type: example.event" \
-H "ce-source: url://example_response/" \
-H "content-type: application/json" \
-X POST -d '{ "age": 43, "name": "John Doe", "phones": ["+44 1234567","+44 2345678"] }' \
http://localhost:3030/
```
Should produce response similar to:
```
* TCP_NODELAY set
* Connected to localhost (127.0.0.1) port 3030 (#0)
> POST / HTTP/1.1
> Host: localhost:3030
> User-Agent: curl/7.64.1
> Accept: */*
> ce-specversion: 1.0
> ce-id: 2
> ce-type: example.event
> ce-source: url://example_response/
> content-type: application/json
> Content-Length: 74
>
* upload completely sent off: 74 out of 74 bytes
< HTTP/1.1 200 OK
< ce-specversion: 1.0
< ce-id: 2
< ce-type: example.event
< ce-source: url://example_response/
< content-type: application/json
< content-length: 74
< date: Mon, 02 Nov 2020 13:33:40 GMT
<
* Connection #0 to host localhost left intact
{ "age": 43, "name": "John Doe", "phones": ["+44 1234567","+44 2345678"] }
```
To create event inside request handlers and send them as responses:
```rust
#[tokio::main]
async fn main() {
let routes = warp::any().map(|| {
let event = EventBuilderV10::new()
.id("1")
.source(url::Url::parse("url://example_response/").unwrap())
.ty("example.ce")
.data(
mime::APPLICATION_JSON.to_string(),
json!({
"name": "John Doe",
"age": 43,
"phones": [
"+44 1234567",
"+44 2345678"
]
}),
)
.build();
match event {
Ok(event) => Ok(reply::from_event(event)),
Err(e) => Ok(warp::reply::with_status(
e.to_string(),
StatusCode::INTERNAL_SERVER_ERROR,
)
.into_response()),
}
});
warp::serve(routes).run(([127, 0, 0, 1], 3030)).await;
}
```
[Crates badge]: https://img.shields.io/crates/v/cloudevents-sdk-warp.svg
[crates.io]: https://crates.io/crates/cloudevents-sdk-warp
[Docs badge]: https://docs.rs/cloudevents-sdk-warp/badge.svg
[docs.rs]: https://docs.rs/cloudevents-sdk-warp

View File

@ -1,134 +0,0 @@
use crate::server_request::request_to_event;
use cloudevents::Event;
use warp::http::HeaderMap;
use warp::Filter;
use warp::Rejection;
#[derive(Debug)]
pub struct EventFilterError {
error: cloudevents::message::Error,
}
impl warp::reject::Reject for EventFilterError {}
///
/// # Extracts [`cloudevents::Event`] from incoming request
///
/// ```
/// use cloudevents_sdk_warp::filter::to_event;
/// use warp::Filter;
/// use warp::Reply;
///
/// let routes = warp::any()
/// .and(to_event())
/// .map(|event| {
/// // do something with the event
/// }
/// );
/// ```
///
pub fn to_event() -> impl Filter<Extract = (Event,), Error = Rejection> + Copy {
warp::header::headers_cloned()
.and(warp::body::bytes())
.and_then(create_event)
}
async fn create_event(headers: HeaderMap, body: bytes::Bytes) -> Result<Event, Rejection> {
request_to_event(headers, body)
.map_err(|error| warp::reject::custom(EventFilterError { error }))
}
#[cfg(test)]
mod tests {
use super::to_event;
use warp::test;
use chrono::Utc;
use cloudevents::{EventBuilder, EventBuilderV10};
use serde_json::json;
#[tokio::test]
async fn test_request() {
let time = Utc::now();
let expected = EventBuilderV10::new()
.id("0001")
.ty("example.test")
.source("http://localhost/")
.time(time)
.extension("someint", "10")
.build()
.unwrap();
let result = test::request()
.method("POST")
.header("ce-specversion", "1.0")
.header("ce-id", "0001")
.header("ce-type", "example.test")
.header("ce-source", "http://localhost/")
.header("ce-someint", "10")
.header("ce-time", time.to_rfc3339())
.filter(&to_event())
.await
.unwrap();
assert_eq!(expected, result);
}
#[tokio::test]
async fn test_bad_request() {
let time = Utc::now();
let result = test::request()
.method("POST")
.header("ce-specversion", "BAD SPECIFICATION")
.header("ce-id", "0001")
.header("ce-type", "example.test")
.header("ce-source", "http://localhost/")
.header("ce-someint", "10")
.header("ce-time", time.to_rfc3339())
.filter(&to_event())
.await;
assert!(result.is_err());
let rejection = result.unwrap_err();
let reason = rejection.find::<super::EventFilterError>().unwrap();
assert_eq!(
reason.error.to_string(),
"Invalid specversion BAD SPECIFICATION"
)
}
#[tokio::test]
async fn test_request_with_full_data() {
let time = Utc::now();
let j = json!({"hello": "world"});
let expected = EventBuilderV10::new()
.id("0001")
.ty("example.test")
.source("http://localhost")
.time(time)
.data("application/json", j.to_string().into_bytes())
.extension("someint", "10")
.build()
.unwrap();
let result = test::request()
.method("POST")
.header("ce-specversion", "1.0")
.header("ce-id", "0001")
.header("ce-type", "example.test")
.header("ce-source", "http://localhost")
.header("ce-someint", "10")
.header("ce-time", time.to_rfc3339())
.header("content-type", "application/json")
.json(&j)
.filter(&to_event())
.await
.unwrap();
assert_eq!(expected, result);
}
}

View File

@ -1,61 +0,0 @@
use cloudevents::event::SpecVersion;
use http::header::HeaderName;
use lazy_static::lazy_static;
use warp::http::HeaderValue;
use std::collections::HashMap;
use std::str::FromStr;
macro_rules! unwrap_optional_header {
($headers:expr, $name:expr) => {
$headers
.get::<&'static HeaderName>(&$name)
.map(|a| header_value_to_str!(a))
};
}
macro_rules! header_value_to_str {
($header_value:expr) => {
$header_value
.to_str()
.map_err(|e| cloudevents::message::Error::Other {
source: Box::new(e),
})
};
}
macro_rules! str_name_to_header {
($attribute:expr) => {
HeaderName::from_str($attribute).map_err(|e| cloudevents::message::Error::Other {
source: Box::new(e),
})
};
}
macro_rules! attribute_name_to_header {
($attribute:expr) => {
str_name_to_header!(&["ce-", $attribute].concat())
};
}
fn attributes_to_headers(
it: impl Iterator<Item = &'static str>,
) -> HashMap<&'static str, HeaderName> {
it.map(|s| {
if s == "datacontenttype" {
(s, http::header::CONTENT_TYPE)
} else {
(s, attribute_name_to_header!(s).unwrap())
}
})
.collect()
}
lazy_static! {
pub(crate) static ref ATTRIBUTES_TO_HEADERS: HashMap<&'static str, HeaderName> =
attributes_to_headers(SpecVersion::all_attribute_names());
pub(crate) static ref SPEC_VERSION_HEADER: HeaderName =
HeaderName::from_static("ce-specversion");
pub(crate) static ref CLOUDEVENTS_JSON_HEADER: HeaderValue =
HeaderValue::from_static("application/cloudevents+json");
}

View File

@ -1,70 +0,0 @@
//! This crate integrates the [cloudevents-sdk](https://docs.rs/cloudevents-sdk) with [Warp web service framework](https://docs.rs/warp/)
//! to easily send and receive CloudEvents.
//!
//! To deserialize an HTTP request as CloudEvent
//!
//! To echo events:
//!
//! ```
//! use warp::{Filter, Reply};
//! use cloudevents_sdk_warp::reply::from_event;
//! use cloudevents_sdk_warp::filter::to_event;
//!
//! let routes = warp::any()
//! // extracting event from request
//! .and(to_event())
//! // returning event back
//! .map(|event| from_event(event));
//!
//! warp::serve(routes).run(([127, 0, 0, 1], 3030));
//! ```
//!
//! To create event inside request handlers and send them as responses:
//!
//! ```
//! use cloudevents::{Event, EventBuilder, EventBuilderV10};
//! use http::StatusCode;
//! use serde_json::json;
//! use warp::{Filter, Reply};
//! use cloudevents_sdk_warp::reply::from_event;
//!
//! let routes = warp::any().map(|| {
//! let event = EventBuilderV10::new()
//! .id("1")
//! .source("url://example_response/")
//! .ty("example.ce")
//! .data(
//! mime::APPLICATION_JSON.to_string(),
//! json!({
//! "name": "John Doe",
//! "age": 43,
//! "phones": [
//! "+44 1234567",
//! "+44 2345678"
//! ]
//! }),
//! )
//! .build();
//!
//! match event {
//! Ok(event) => Ok(from_event(event)),
//! Err(e) => Ok(warp::reply::with_status(
//! e.to_string(),
//! StatusCode::INTERNAL_SERVER_ERROR,
//! )
//! .into_response()),
//! }
//! });
//! warp::serve(routes).run(([127, 0, 0, 1], 3030));
//! ```
//!
//! Check out the [cloudevents-sdk](https://docs.rs/cloudevents-sdk) docs for more details on how to use [`cloudevents::Event`]
#[macro_use]
mod headers;
mod server_request;
mod server_response;
pub mod filter;
pub mod reply;

View File

@ -1,126 +0,0 @@
use crate::server_response::event_to_response;
use cloudevents::Event;
use http::StatusCode;
use warp::reply::Response;
///
/// # Serializes [`cloudevents::Event`] as a http response
///
/// ```
/// use cloudevents_sdk_warp::reply::from_event;
/// use cloudevents::Event;
/// use warp::Filter;
/// use warp::Reply;
///
/// let routes = warp::any()
/// .map(|| from_event(Event::default()));
/// ```
pub fn from_event(event: Event) -> Response {
match event_to_response(event) {
Ok(response) => response,
Err(e) => warp::http::response::Response::builder()
.status(StatusCode::INTERNAL_SERVER_ERROR)
.body(hyper::body::Body::from(e.to_string()))
.unwrap(),
}
}
#[cfg(test)]
mod tests {
use cloudevents::{EventBuilder, EventBuilderV10};
use serde_json::json;
#[test]
fn test_response() {
let input = EventBuilderV10::new()
.id("0001")
.ty("example.test")
.source("http://localhost/")
.extension("someint", "10")
.build()
.unwrap();
let resp = super::from_event(input);
assert_eq!(
resp.headers()
.get("ce-specversion")
.unwrap()
.to_str()
.unwrap(),
"1.0"
);
assert_eq!(
resp.headers().get("ce-id").unwrap().to_str().unwrap(),
"0001"
);
assert_eq!(
resp.headers().get("ce-type").unwrap().to_str().unwrap(),
"example.test"
);
assert_eq!(
resp.headers().get("ce-source").unwrap().to_str().unwrap(),
"http://localhost/"
);
assert_eq!(
resp.headers().get("ce-someint").unwrap().to_str().unwrap(),
"10"
);
}
#[tokio::test]
async fn test_response_with_full_data() {
let j = json!({"hello": "world"});
let input = EventBuilderV10::new()
.id("0001")
.ty("example.test")
.source("http://localhost")
.data("application/json", j.clone())
.extension("someint", "10")
.build()
.unwrap();
let resp = super::from_event(input);
assert_eq!(
resp.headers()
.get("ce-specversion")
.unwrap()
.to_str()
.unwrap(),
"1.0"
);
assert_eq!(
resp.headers().get("ce-id").unwrap().to_str().unwrap(),
"0001"
);
assert_eq!(
resp.headers().get("ce-type").unwrap().to_str().unwrap(),
"example.test"
);
assert_eq!(
resp.headers().get("ce-source").unwrap().to_str().unwrap(),
"http://localhost"
);
assert_eq!(
resp.headers()
.get("content-type")
.unwrap()
.to_str()
.unwrap(),
"application/json"
);
assert_eq!(
resp.headers().get("ce-someint").unwrap().to_str().unwrap(),
"10"
);
let (_, body) = resp.into_parts();
let body = hyper::body::to_bytes(body).await.unwrap();
assert_eq!(j.to_string().as_bytes(), body);
}
}

View File

@ -1,107 +0,0 @@
use super::headers;
use bytes::Bytes;
use http::{header::HeaderName, HeaderMap};
use cloudevents::event::SpecVersion;
use cloudevents::message::{
BinaryDeserializer, BinarySerializer, Encoding, Error, MessageAttributeValue,
MessageDeserializer, Result, StructuredDeserializer, StructuredSerializer,
};
use cloudevents::{message, Event};
use std::convert::TryFrom;
pub struct RequestDeserializer {
headers: HeaderMap,
body: Bytes,
}
impl RequestDeserializer {
pub fn new(headers: HeaderMap, body: Bytes) -> RequestDeserializer {
RequestDeserializer { headers, body }
}
}
impl BinaryDeserializer for RequestDeserializer {
fn deserialize_binary<R: Sized, V: BinarySerializer<R>>(self, mut visitor: V) -> Result<R> {
if self.encoding() != Encoding::BINARY {
return Err(message::Error::WrongEncoding {});
}
let spec_version = SpecVersion::try_from(
unwrap_optional_header!(self.headers, headers::SPEC_VERSION_HEADER).unwrap()?,
)?;
visitor = visitor.set_spec_version(spec_version.clone())?;
let attributes = spec_version.attribute_names();
for (hn, hv) in self
.headers
.iter()
.filter(|(hn, _)| headers::SPEC_VERSION_HEADER.ne(hn) && hn.as_str().starts_with("ce-"))
{
let name = &hn.as_str()["ce-".len()..];
if attributes.contains(&name) {
visitor = visitor.set_attribute(
name,
MessageAttributeValue::String(String::from(header_value_to_str!(hv)?)),
)?
} else {
visitor = visitor.set_extension(
name,
MessageAttributeValue::String(String::from(header_value_to_str!(hv)?)),
)?
}
}
if let Some(hv) = self.headers.get("content-type") {
visitor = visitor.set_attribute(
"datacontenttype",
MessageAttributeValue::String(String::from(header_value_to_str!(hv)?)),
)?
}
if self.body.len() != 0 {
visitor.end_with_data(self.body.to_vec())
} else {
visitor.end()
}
}
}
impl StructuredDeserializer for RequestDeserializer {
fn deserialize_structured<R: Sized, V: StructuredSerializer<R>>(self, visitor: V) -> Result<R> {
if self.encoding() != Encoding::STRUCTURED {
return Err(message::Error::WrongEncoding {});
}
visitor.set_structured_event(self.body.to_vec())
}
}
impl MessageDeserializer for RequestDeserializer {
fn encoding(&self) -> Encoding {
if self
.headers
.get("content-type")
.map(|v| v.to_str().unwrap_or(""))
.unwrap_or("")
== "application/cloudevents+json"
{
Encoding::STRUCTURED
} else if self
.headers
.get::<&'static HeaderName>(&super::headers::SPEC_VERSION_HEADER)
.is_some()
{
Encoding::BINARY
} else {
Encoding::UNKNOWN
}
}
}
pub fn request_to_event(req: HeaderMap, bytes: bytes::Bytes) -> std::result::Result<Event, Error> {
MessageDeserializer::into_event(RequestDeserializer::new(req, bytes))
}

View File

@ -1,102 +0,0 @@
use super::headers;
use cloudevents::event::SpecVersion;
use cloudevents::message::{
BinaryDeserializer, BinarySerializer, Error, MessageAttributeValue, Result,
StructuredSerializer,
};
use cloudevents::Event;
use warp::http::HeaderValue;
use warp::hyper::Body;
use warp::reply::Response;
use http::header::HeaderName;
use http::response::Builder;
use std::{convert::TryFrom, str::FromStr};
pub struct ResponseSerializer {
builder: Builder,
}
impl ResponseSerializer {
fn new() -> Self {
ResponseSerializer {
builder: http::Response::builder(),
}
}
}
impl BinarySerializer<Response> for ResponseSerializer {
fn set_spec_version(mut self, spec_version: SpecVersion) -> Result<Self> {
self.builder = self.builder.header(
headers::SPEC_VERSION_HEADER.clone(),
HeaderValue::try_from(spec_version.to_string().as_str()).map_err(|e| {
cloudevents::message::Error::Other {
source: Box::new(e),
}
})?,
);
Ok(self)
}
fn set_attribute(mut self, name: &str, value: MessageAttributeValue) -> Result<Self> {
self.builder = self.builder.header(
headers::ATTRIBUTES_TO_HEADERS.get(name).unwrap().clone(),
HeaderValue::try_from(value.to_string().as_str()).map_err(|e| {
cloudevents::message::Error::Other {
source: Box::new(e),
}
})?,
);
Ok(self)
}
fn set_extension(mut self, name: &str, value: MessageAttributeValue) -> Result<Self> {
self.builder = self.builder.header(
attribute_name_to_header!(name)?,
HeaderValue::try_from(value.to_string().as_str()).map_err(|e| {
cloudevents::message::Error::Other {
source: Box::new(e),
}
})?,
);
Ok(self)
}
fn end_with_data(self, bytes: Vec<u8>) -> Result<Response> {
self.builder
.body(Body::from(bytes))
.map_err(|e| cloudevents::message::Error::Other {
source: Box::new(e),
})
}
fn end(self) -> Result<Response> {
self.builder
.body(Body::empty())
.map_err(|e| cloudevents::message::Error::Other {
source: Box::new(e),
})
}
}
impl StructuredSerializer<Response> for ResponseSerializer {
fn set_structured_event(self, bytes: Vec<u8>) -> Result<Response> {
Ok(self
.builder
.header(
http::header::CONTENT_TYPE,
headers::CLOUDEVENTS_JSON_HEADER.clone(),
)
.body(Body::from(bytes))
.map_err(|e| cloudevents::message::Error::Other {
source: Box::new(e),
})?)
}
}
pub fn event_to_response(event: Event) -> std::result::Result<Response, Error> {
BinaryDeserializer::deserialize_binary(event, ResponseSerializer::new())
}

View File

@ -1,9 +0,0 @@
#[test]
fn test_readme_deps() {
version_sync::assert_markdown_deps_updated!("README.md");
}
#[test]
fn test_html_root_url() {
version_sync::assert_html_root_url_updated!("src/lib.rs");
}

View File

@ -14,5 +14,3 @@ futures = "^0.3"
serde_json = "^1.0"
url = { version = "^2.1" }
env_logger = "0.7.1"
[workspace]

View File

@ -17,5 +17,3 @@ futures = "^0.3"
tokio = { version = "^1.0", features = ["full"] }
clap = "2.33.1"
rdkafka = { version = "^0.25", features = ["cmake-build"] }
[workspace]

View File

@ -16,5 +16,3 @@ url = { version = "^2.1" }
web-sys = { version = "0.3.39", features = ["Window", "Location"] }
wasm-bindgen-futures = "0.4.12"
wasm-bindgen = { version = "0.2.62", features = ["serde-serialize"] }
[workspace]

View File

@ -10,6 +10,3 @@ license-file = "../LICENSE"
cloudevents-sdk = { path = "../..", features = ["cloudevents-warp"] }
warp = "^0.3"
tokio = { version = "^1.0", features = ["full"] }
[workspace]