Add support for `seanmonstar/warp` web server framework (#97)
Initial take on cloud events support for `seanmonstar/warp` Signed-off-by: Marko Milenković <milenkovicm@users.noreply.github.com>
This commit is contained in:
parent
d69ab904d1
commit
5e5aca54be
|
@ -124,3 +124,11 @@ jobs:
|
||||||
command: build
|
command: build
|
||||||
toolchain: ${{ matrix.toolchain }}
|
toolchain: ${{ matrix.toolchain }}
|
||||||
args: --target ${{ matrix.target }} --manifest-path ./example-projects/actix-web-example/Cargo.toml
|
args: --target ${{ matrix.target }} --manifest-path ./example-projects/actix-web-example/Cargo.toml
|
||||||
|
|
||||||
|
- uses: actions-rs/cargo@v1
|
||||||
|
name: "Build warp-example"
|
||||||
|
if: matrix.target == 'x86_64-unknown-linux-gnu' && matrix.toolchain == 'stable'
|
||||||
|
with:
|
||||||
|
command: build
|
||||||
|
toolchain: ${{ matrix.toolchain }}
|
||||||
|
args: --target ${{ matrix.target }} --manifest-path ./example-projects/warp-example/Cargo.toml
|
||||||
|
|
|
@ -44,10 +44,12 @@ members = [
|
||||||
".",
|
".",
|
||||||
"cloudevents-sdk-actix-web",
|
"cloudevents-sdk-actix-web",
|
||||||
"cloudevents-sdk-reqwest",
|
"cloudevents-sdk-reqwest",
|
||||||
"cloudevents-sdk-rdkafka"
|
"cloudevents-sdk-rdkafka",
|
||||||
|
"cloudevents-sdk-warp"
|
||||||
]
|
]
|
||||||
exclude = [
|
exclude = [
|
||||||
"example-projects/actix-web-example",
|
"example-projects/actix-web-example",
|
||||||
"example-projects/reqwest-wasm-example",
|
"example-projects/reqwest-wasm-example",
|
||||||
"example-projects/rdkafka-example",
|
"example-projects/rdkafka-example",
|
||||||
|
"example-projects/warp-example",
|
||||||
]
|
]
|
|
@ -24,6 +24,7 @@ Note: This project is WIP under active development, hence all APIs are considere
|
||||||
* `cloudevents-sdk-actix-web`: Integration with [Actix Web](https://github.com/actix/actix-web).
|
* `cloudevents-sdk-actix-web`: Integration with [Actix Web](https://github.com/actix/actix-web).
|
||||||
* `cloudevents-sdk-reqwest`: Integration with [reqwest](https://github.com/seanmonstar/reqwest).
|
* `cloudevents-sdk-reqwest`: Integration with [reqwest](https://github.com/seanmonstar/reqwest).
|
||||||
* `cloudevents-sdk-rdkafka`: Integration with [rust-rdkafka](https://fede1024.github.io/rust-rdkafka).
|
* `cloudevents-sdk-rdkafka`: Integration with [rust-rdkafka](https://fede1024.github.io/rust-rdkafka).
|
||||||
|
* `cloudevents-sdk-warp`: Integration with [rust-rdkafka](https://github.com/seanmonstar/warp/).
|
||||||
|
|
||||||
## Get Started
|
## Get Started
|
||||||
|
|
||||||
|
@ -52,6 +53,7 @@ Checkout the examples using our integrations to learn how to send and receive ev
|
||||||
* [Actix Web Example](example-projects/actix-web-example)
|
* [Actix Web Example](example-projects/actix-web-example)
|
||||||
* [Reqwest/WASM Example](example-projects/reqwest-wasm-example)
|
* [Reqwest/WASM Example](example-projects/reqwest-wasm-example)
|
||||||
* [Kafka Example](example-projects/rdkafka-example)
|
* [Kafka Example](example-projects/rdkafka-example)
|
||||||
|
* [Warp Example](example-projects/warp-example)
|
||||||
|
|
||||||
## Development & Contributing
|
## Development & Contributing
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,23 @@
|
||||||
|
[package]
|
||||||
|
name = "cloudevents-sdk-warp"
|
||||||
|
version = "0.3.0"
|
||||||
|
authors = ["Marko Milenković <milenkovicm@users.noreply.github.com>"]
|
||||||
|
edition = "2018"
|
||||||
|
categories = ["web-programming", "encoding"]
|
||||||
|
license-file = "../LICENSE"
|
||||||
|
|
||||||
|
[dependencies]
|
||||||
|
cloudevents-sdk = { path = ".." }
|
||||||
|
lazy_static = "1.4.0"
|
||||||
|
bytes = "^0.5"
|
||||||
|
warp = "0.2"
|
||||||
|
http = "0.2"
|
||||||
|
hyper = "0.13"
|
||||||
|
|
||||||
|
[dev-dependencies]
|
||||||
|
tokio = { version = "^0.2", features = ["full"] }
|
||||||
|
url = { version = "^2.1" }
|
||||||
|
serde_json = "^1.0"
|
||||||
|
chrono = { version = "^0.4", features = ["serde"] }
|
||||||
|
mime = "0.3"
|
||||||
|
version-sync = "^0.9"
|
|
@ -0,0 +1,110 @@
|
||||||
|
# CloudEvents SDK Rust - WARP [![Crates badge]][crates.io] [![Docs badge]][docs.rs]
|
||||||
|
|
||||||
|
Integration of [CloudEvents SDK](https://github.com/cloudevents/sdk-rust/) with [Warp - Web Server Framework](https://github.com/seanmonstar/warp/).
|
||||||
|
|
||||||
|
Look at [CloudEvents SDK README](https://github.com/cloudevents/sdk-rust/) for more info.
|
||||||
|
|
||||||
|
Using this crate you can extract CloudEvent from requests and write CloudEvents to responses.
|
||||||
|
|
||||||
|
To echo events:
|
||||||
|
|
||||||
|
```rust
|
||||||
|
use cloudevents_sdk_warp::{filter, reply};
|
||||||
|
use warp::Filter;
|
||||||
|
|
||||||
|
#[tokio::main]
|
||||||
|
async fn main() {
|
||||||
|
let routes = warp::any()
|
||||||
|
// extracting event from request
|
||||||
|
.and(filter::to_event())
|
||||||
|
// returning event back
|
||||||
|
.map(|event| reply::from_event(event));
|
||||||
|
|
||||||
|
warp::serve(routes).run(([127, 0, 0, 1], 3030)).await;
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
Executing `http` request:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
curl -v \
|
||||||
|
-H "ce-specversion: 1.0" \
|
||||||
|
-H "ce-id: 2" \
|
||||||
|
-H "ce-type: example.event" \
|
||||||
|
-H "ce-source: url://example_response/" \
|
||||||
|
-H "content-type: application/json" \
|
||||||
|
-X POST -d '{ "age": 43, "name": "John Doe", "phones": ["+44 1234567","+44 2345678"] }' \
|
||||||
|
http://localhost:3030/
|
||||||
|
```
|
||||||
|
|
||||||
|
Should produce response similar to:
|
||||||
|
|
||||||
|
```
|
||||||
|
* TCP_NODELAY set
|
||||||
|
* Connected to localhost (127.0.0.1) port 3030 (#0)
|
||||||
|
> POST / HTTP/1.1
|
||||||
|
> Host: localhost:3030
|
||||||
|
> User-Agent: curl/7.64.1
|
||||||
|
> Accept: */*
|
||||||
|
> ce-specversion: 1.0
|
||||||
|
> ce-id: 2
|
||||||
|
> ce-type: example.event
|
||||||
|
> ce-source: url://example_response/
|
||||||
|
> content-type: application/json
|
||||||
|
> Content-Length: 74
|
||||||
|
>
|
||||||
|
* upload completely sent off: 74 out of 74 bytes
|
||||||
|
< HTTP/1.1 200 OK
|
||||||
|
< ce-specversion: 1.0
|
||||||
|
< ce-id: 2
|
||||||
|
< ce-type: example.event
|
||||||
|
< ce-source: url://example_response/
|
||||||
|
< content-type: application/json
|
||||||
|
< content-length: 74
|
||||||
|
< date: Mon, 02 Nov 2020 13:33:40 GMT
|
||||||
|
<
|
||||||
|
* Connection #0 to host localhost left intact
|
||||||
|
{ "age": 43, "name": "John Doe", "phones": ["+44 1234567","+44 2345678"] }
|
||||||
|
```
|
||||||
|
|
||||||
|
To create event inside request handlers and send them as responses:
|
||||||
|
|
||||||
|
```rust
|
||||||
|
#[tokio::main]
|
||||||
|
async fn main() {
|
||||||
|
let routes = warp::any().map(|| {
|
||||||
|
let event = EventBuilderV10::new()
|
||||||
|
.id("1")
|
||||||
|
.source(url::Url::parse("url://example_response/").unwrap())
|
||||||
|
.ty("example.ce")
|
||||||
|
.data(
|
||||||
|
mime::APPLICATION_JSON.to_string(),
|
||||||
|
json!({
|
||||||
|
"name": "John Doe",
|
||||||
|
"age": 43,
|
||||||
|
"phones": [
|
||||||
|
"+44 1234567",
|
||||||
|
"+44 2345678"
|
||||||
|
]
|
||||||
|
}),
|
||||||
|
)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
match event {
|
||||||
|
Ok(event) => Ok(reply::from_event(event)),
|
||||||
|
Err(e) => Ok(warp::reply::with_status(
|
||||||
|
e.to_string(),
|
||||||
|
StatusCode::INTERNAL_SERVER_ERROR,
|
||||||
|
)
|
||||||
|
.into_response()),
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
warp::serve(routes).run(([127, 0, 0, 1], 3030)).await;
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
[Crates badge]: https://img.shields.io/crates/v/cloudevents-sdk-warp.svg
|
||||||
|
[crates.io]: https://crates.io/crates/cloudevents-sdk-warp
|
||||||
|
[Docs badge]: https://docs.rs/cloudevents-sdk-warp/badge.svg
|
||||||
|
[docs.rs]: https://docs.rs/cloudevents-sdk-warp
|
|
@ -0,0 +1,136 @@
|
||||||
|
use crate::server_request::request_to_event;
|
||||||
|
|
||||||
|
use cloudevents::Event;
|
||||||
|
use warp::http::HeaderMap;
|
||||||
|
use warp::Filter;
|
||||||
|
use warp::Rejection;
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct EventFilterError {
|
||||||
|
error: cloudevents::message::Error,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl warp::reject::Reject for EventFilterError {}
|
||||||
|
|
||||||
|
///
|
||||||
|
/// # Extracts [`cloudevents::Event`] from incoming request
|
||||||
|
///
|
||||||
|
/// ```
|
||||||
|
/// use cloudevents_sdk_warp::filter::to_event;
|
||||||
|
/// use warp::Filter;
|
||||||
|
/// use warp::Reply;
|
||||||
|
///
|
||||||
|
/// let routes = warp::any()
|
||||||
|
/// .and(to_event())
|
||||||
|
/// .map(|event| {
|
||||||
|
/// // do something with the event
|
||||||
|
/// }
|
||||||
|
/// );
|
||||||
|
/// ```
|
||||||
|
///
|
||||||
|
pub fn to_event() -> impl Filter<Extract = (Event,), Error = Rejection> + Copy {
|
||||||
|
warp::header::headers_cloned()
|
||||||
|
.and(warp::body::bytes())
|
||||||
|
.and_then(create_event)
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn create_event(headers: HeaderMap, body: bytes::Bytes) -> Result<Event, Rejection> {
|
||||||
|
request_to_event(headers, body)
|
||||||
|
.map_err(|error| warp::reject::custom(EventFilterError { error }))
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::to_event;
|
||||||
|
use url::Url;
|
||||||
|
use warp::test;
|
||||||
|
|
||||||
|
use chrono::Utc;
|
||||||
|
use cloudevents::{EventBuilder, EventBuilderV10};
|
||||||
|
use serde_json::json;
|
||||||
|
use std::str::FromStr;
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_request() {
|
||||||
|
let time = Utc::now();
|
||||||
|
let expected = EventBuilderV10::new()
|
||||||
|
.id("0001")
|
||||||
|
.ty("example.test")
|
||||||
|
.source("http://localhost/")
|
||||||
|
.time(time)
|
||||||
|
.extension("someint", "10")
|
||||||
|
.build()
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let result = test::request()
|
||||||
|
.method("POST")
|
||||||
|
.header("ce-specversion", "1.0")
|
||||||
|
.header("ce-id", "0001")
|
||||||
|
.header("ce-type", "example.test")
|
||||||
|
.header("ce-source", "http://localhost/")
|
||||||
|
.header("ce-someint", "10")
|
||||||
|
.header("ce-time", time.to_rfc3339())
|
||||||
|
.filter(&to_event())
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
assert_eq!(expected, result);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_bad_request() {
|
||||||
|
let time = Utc::now();
|
||||||
|
|
||||||
|
let result = test::request()
|
||||||
|
.method("POST")
|
||||||
|
.header("ce-specversion", "BAD SPECIFICATION")
|
||||||
|
.header("ce-id", "0001")
|
||||||
|
.header("ce-type", "example.test")
|
||||||
|
.header("ce-source", "http://localhost/")
|
||||||
|
.header("ce-someint", "10")
|
||||||
|
.header("ce-time", time.to_rfc3339())
|
||||||
|
.filter(&to_event())
|
||||||
|
.await;
|
||||||
|
|
||||||
|
assert!(result.is_err());
|
||||||
|
let rejection = result.unwrap_err();
|
||||||
|
|
||||||
|
let reason = rejection.find::<super::EventFilterError>().unwrap();
|
||||||
|
assert_eq!(
|
||||||
|
reason.error.to_string(),
|
||||||
|
"Invalid specversion BAD SPECIFICATION"
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_request_with_full_data() {
|
||||||
|
let time = Utc::now();
|
||||||
|
let j = json!({"hello": "world"});
|
||||||
|
|
||||||
|
let expected = EventBuilderV10::new()
|
||||||
|
.id("0001")
|
||||||
|
.ty("example.test")
|
||||||
|
.source(Url::from_str("http://localhost").unwrap())
|
||||||
|
.time(time)
|
||||||
|
.data("application/json", j.to_string().into_bytes())
|
||||||
|
.extension("someint", "10")
|
||||||
|
.build()
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let result = test::request()
|
||||||
|
.method("POST")
|
||||||
|
.header("ce-specversion", "1.0")
|
||||||
|
.header("ce-id", "0001")
|
||||||
|
.header("ce-type", "example.test")
|
||||||
|
.header("ce-source", "http://localhost")
|
||||||
|
.header("ce-someint", "10")
|
||||||
|
.header("ce-time", time.to_rfc3339())
|
||||||
|
.header("content-type", "application/json")
|
||||||
|
.json(&j)
|
||||||
|
.filter(&to_event())
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
assert_eq!(expected, result);
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,61 @@
|
||||||
|
use cloudevents::event::SpecVersion;
|
||||||
|
use http::header::HeaderName;
|
||||||
|
use lazy_static::lazy_static;
|
||||||
|
use warp::http::HeaderValue;
|
||||||
|
|
||||||
|
use std::collections::HashMap;
|
||||||
|
use std::str::FromStr;
|
||||||
|
|
||||||
|
macro_rules! unwrap_optional_header {
|
||||||
|
($headers:expr, $name:expr) => {
|
||||||
|
$headers
|
||||||
|
.get::<&'static HeaderName>(&$name)
|
||||||
|
.map(|a| header_value_to_str!(a))
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
macro_rules! header_value_to_str {
|
||||||
|
($header_value:expr) => {
|
||||||
|
$header_value
|
||||||
|
.to_str()
|
||||||
|
.map_err(|e| cloudevents::message::Error::Other {
|
||||||
|
source: Box::new(e),
|
||||||
|
})
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
macro_rules! str_name_to_header {
|
||||||
|
($attribute:expr) => {
|
||||||
|
HeaderName::from_str($attribute).map_err(|e| cloudevents::message::Error::Other {
|
||||||
|
source: Box::new(e),
|
||||||
|
})
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
macro_rules! attribute_name_to_header {
|
||||||
|
($attribute:expr) => {
|
||||||
|
str_name_to_header!(&["ce-", $attribute].concat())
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
fn attributes_to_headers(
|
||||||
|
it: impl Iterator<Item = &'static str>,
|
||||||
|
) -> HashMap<&'static str, HeaderName> {
|
||||||
|
it.map(|s| {
|
||||||
|
if s == "datacontenttype" {
|
||||||
|
(s, http::header::CONTENT_TYPE)
|
||||||
|
} else {
|
||||||
|
(s, attribute_name_to_header!(s).unwrap())
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.collect()
|
||||||
|
}
|
||||||
|
|
||||||
|
lazy_static! {
|
||||||
|
pub(crate) static ref ATTRIBUTES_TO_HEADERS: HashMap<&'static str, HeaderName> =
|
||||||
|
attributes_to_headers(SpecVersion::all_attribute_names());
|
||||||
|
pub(crate) static ref SPEC_VERSION_HEADER: HeaderName =
|
||||||
|
HeaderName::from_static("ce-specversion");
|
||||||
|
pub(crate) static ref CLOUDEVENTS_JSON_HEADER: HeaderValue =
|
||||||
|
HeaderValue::from_static("application/cloudevents+json");
|
||||||
|
}
|
|
@ -0,0 +1,70 @@
|
||||||
|
//! This crate integrates the [cloudevents-sdk](https://docs.rs/cloudevents-sdk) with [Warp web service framework](https://docs.rs/warp/)
|
||||||
|
//! to easily send and receive CloudEvents.
|
||||||
|
//!
|
||||||
|
//! To deserialize an HTTP request as CloudEvent
|
||||||
|
//!
|
||||||
|
//! To echo events:
|
||||||
|
//!
|
||||||
|
//! ```
|
||||||
|
//! use warp::{Filter, Reply};
|
||||||
|
//! use cloudevents_sdk_warp::reply::from_event;
|
||||||
|
//! use cloudevents_sdk_warp::filter::to_event;
|
||||||
|
//!
|
||||||
|
//! let routes = warp::any()
|
||||||
|
//! // extracting event from request
|
||||||
|
//! .and(to_event())
|
||||||
|
//! // returning event back
|
||||||
|
//! .map(|event| from_event(event));
|
||||||
|
//!
|
||||||
|
//! warp::serve(routes).run(([127, 0, 0, 1], 3030));
|
||||||
|
//! ```
|
||||||
|
//!
|
||||||
|
//! To create event inside request handlers and send them as responses:
|
||||||
|
//!
|
||||||
|
//! ```
|
||||||
|
//! use cloudevents::{Event, EventBuilder, EventBuilderV10};
|
||||||
|
//! use http::StatusCode;
|
||||||
|
//! use serde_json::json;
|
||||||
|
//! use warp::{Filter, Reply};
|
||||||
|
//! use cloudevents_sdk_warp::reply::from_event;
|
||||||
|
//!
|
||||||
|
//! let routes = warp::any().map(|| {
|
||||||
|
//! let event = EventBuilderV10::new()
|
||||||
|
//! .id("1")
|
||||||
|
//! .source(url::Url::parse("url://example_response/").unwrap())
|
||||||
|
//! .ty("example.ce")
|
||||||
|
//! .data(
|
||||||
|
//! mime::APPLICATION_JSON.to_string(),
|
||||||
|
//! json!({
|
||||||
|
//! "name": "John Doe",
|
||||||
|
//! "age": 43,
|
||||||
|
//! "phones": [
|
||||||
|
//! "+44 1234567",
|
||||||
|
//! "+44 2345678"
|
||||||
|
//! ]
|
||||||
|
//! }),
|
||||||
|
//! )
|
||||||
|
//! .build();
|
||||||
|
//!
|
||||||
|
//! match event {
|
||||||
|
//! Ok(event) => Ok(from_event(event)),
|
||||||
|
//! Err(e) => Ok(warp::reply::with_status(
|
||||||
|
//! e.to_string(),
|
||||||
|
//! StatusCode::INTERNAL_SERVER_ERROR,
|
||||||
|
//! )
|
||||||
|
//! .into_response()),
|
||||||
|
//! }
|
||||||
|
//! });
|
||||||
|
//! warp::serve(routes).run(([127, 0, 0, 1], 3030));
|
||||||
|
//! ```
|
||||||
|
//!
|
||||||
|
//! Check out the [cloudevents-sdk](https://docs.rs/cloudevents-sdk) docs for more details on how to use [`cloudevents::Event`]
|
||||||
|
|
||||||
|
#[macro_use]
|
||||||
|
mod headers;
|
||||||
|
|
||||||
|
mod server_request;
|
||||||
|
mod server_response;
|
||||||
|
|
||||||
|
pub mod filter;
|
||||||
|
pub mod reply;
|
|
@ -0,0 +1,128 @@
|
||||||
|
use crate::server_response::event_to_response;
|
||||||
|
|
||||||
|
use cloudevents::Event;
|
||||||
|
use http::StatusCode;
|
||||||
|
use warp::reply::Response;
|
||||||
|
|
||||||
|
///
|
||||||
|
/// # Serializes [`cloudevents::Event`] as a http response
|
||||||
|
///
|
||||||
|
/// ```
|
||||||
|
/// use cloudevents_sdk_warp::reply::from_event;
|
||||||
|
/// use cloudevents::Event;
|
||||||
|
/// use warp::Filter;
|
||||||
|
/// use warp::Reply;
|
||||||
|
///
|
||||||
|
/// let routes = warp::any()
|
||||||
|
/// .map(|| from_event(Event::default()));
|
||||||
|
/// ```
|
||||||
|
pub fn from_event(event: Event) -> Response {
|
||||||
|
match event_to_response(event) {
|
||||||
|
Ok(response) => response,
|
||||||
|
Err(e) => warp::http::response::Response::builder()
|
||||||
|
.status(StatusCode::INTERNAL_SERVER_ERROR)
|
||||||
|
.body(hyper::body::Body::from(e.to_string()))
|
||||||
|
.unwrap(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
|
||||||
|
use cloudevents::{EventBuilder, EventBuilderV10};
|
||||||
|
use serde_json::json;
|
||||||
|
use std::str::FromStr;
|
||||||
|
use url::Url;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_response() {
|
||||||
|
let input = EventBuilderV10::new()
|
||||||
|
.id("0001")
|
||||||
|
.ty("example.test")
|
||||||
|
.source(Url::from_str("http://localhost/").unwrap())
|
||||||
|
.extension("someint", "10")
|
||||||
|
.build()
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let resp = super::from_event(input);
|
||||||
|
|
||||||
|
assert_eq!(
|
||||||
|
resp.headers()
|
||||||
|
.get("ce-specversion")
|
||||||
|
.unwrap()
|
||||||
|
.to_str()
|
||||||
|
.unwrap(),
|
||||||
|
"1.0"
|
||||||
|
);
|
||||||
|
assert_eq!(
|
||||||
|
resp.headers().get("ce-id").unwrap().to_str().unwrap(),
|
||||||
|
"0001"
|
||||||
|
);
|
||||||
|
assert_eq!(
|
||||||
|
resp.headers().get("ce-type").unwrap().to_str().unwrap(),
|
||||||
|
"example.test"
|
||||||
|
);
|
||||||
|
assert_eq!(
|
||||||
|
resp.headers().get("ce-source").unwrap().to_str().unwrap(),
|
||||||
|
"http://localhost/"
|
||||||
|
);
|
||||||
|
assert_eq!(
|
||||||
|
resp.headers().get("ce-someint").unwrap().to_str().unwrap(),
|
||||||
|
"10"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_response_with_full_data() {
|
||||||
|
let j = json!({"hello": "world"});
|
||||||
|
|
||||||
|
let input = EventBuilderV10::new()
|
||||||
|
.id("0001")
|
||||||
|
.ty("example.test")
|
||||||
|
.source(Url::from_str("http://localhost").unwrap())
|
||||||
|
.data("application/json", j.clone())
|
||||||
|
.extension("someint", "10")
|
||||||
|
.build()
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let resp = super::from_event(input);
|
||||||
|
|
||||||
|
assert_eq!(
|
||||||
|
resp.headers()
|
||||||
|
.get("ce-specversion")
|
||||||
|
.unwrap()
|
||||||
|
.to_str()
|
||||||
|
.unwrap(),
|
||||||
|
"1.0"
|
||||||
|
);
|
||||||
|
assert_eq!(
|
||||||
|
resp.headers().get("ce-id").unwrap().to_str().unwrap(),
|
||||||
|
"0001"
|
||||||
|
);
|
||||||
|
assert_eq!(
|
||||||
|
resp.headers().get("ce-type").unwrap().to_str().unwrap(),
|
||||||
|
"example.test"
|
||||||
|
);
|
||||||
|
assert_eq!(
|
||||||
|
resp.headers().get("ce-source").unwrap().to_str().unwrap(),
|
||||||
|
"http://localhost/"
|
||||||
|
);
|
||||||
|
assert_eq!(
|
||||||
|
resp.headers()
|
||||||
|
.get("content-type")
|
||||||
|
.unwrap()
|
||||||
|
.to_str()
|
||||||
|
.unwrap(),
|
||||||
|
"application/json"
|
||||||
|
);
|
||||||
|
assert_eq!(
|
||||||
|
resp.headers().get("ce-someint").unwrap().to_str().unwrap(),
|
||||||
|
"10"
|
||||||
|
);
|
||||||
|
|
||||||
|
let (_, body) = resp.into_parts();
|
||||||
|
let body = hyper::body::to_bytes(body).await.unwrap();
|
||||||
|
|
||||||
|
assert_eq!(j.to_string().as_bytes(), body);
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,107 @@
|
||||||
|
use super::headers;
|
||||||
|
use bytes::Bytes;
|
||||||
|
use http::{header::HeaderName, HeaderMap};
|
||||||
|
|
||||||
|
use cloudevents::event::SpecVersion;
|
||||||
|
use cloudevents::message::{
|
||||||
|
BinaryDeserializer, BinarySerializer, Encoding, Error, MessageAttributeValue,
|
||||||
|
MessageDeserializer, Result, StructuredDeserializer, StructuredSerializer,
|
||||||
|
};
|
||||||
|
|
||||||
|
use cloudevents::{message, Event};
|
||||||
|
use std::convert::TryFrom;
|
||||||
|
|
||||||
|
pub struct RequestDeserializer {
|
||||||
|
headers: HeaderMap,
|
||||||
|
body: Bytes,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl RequestDeserializer {
|
||||||
|
pub fn new(headers: HeaderMap, body: Bytes) -> RequestDeserializer {
|
||||||
|
RequestDeserializer { headers, body }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl BinaryDeserializer for RequestDeserializer {
|
||||||
|
fn deserialize_binary<R: Sized, V: BinarySerializer<R>>(self, mut visitor: V) -> Result<R> {
|
||||||
|
if self.encoding() != Encoding::BINARY {
|
||||||
|
return Err(message::Error::WrongEncoding {});
|
||||||
|
}
|
||||||
|
|
||||||
|
let spec_version = SpecVersion::try_from(
|
||||||
|
unwrap_optional_header!(self.headers, headers::SPEC_VERSION_HEADER).unwrap()?,
|
||||||
|
)?;
|
||||||
|
|
||||||
|
visitor = visitor.set_spec_version(spec_version.clone())?;
|
||||||
|
|
||||||
|
let attributes = spec_version.attribute_names();
|
||||||
|
|
||||||
|
for (hn, hv) in self
|
||||||
|
.headers
|
||||||
|
.iter()
|
||||||
|
.filter(|(hn, _)| headers::SPEC_VERSION_HEADER.ne(hn) && hn.as_str().starts_with("ce-"))
|
||||||
|
{
|
||||||
|
let name = &hn.as_str()["ce-".len()..];
|
||||||
|
|
||||||
|
if attributes.contains(&name) {
|
||||||
|
visitor = visitor.set_attribute(
|
||||||
|
name,
|
||||||
|
MessageAttributeValue::String(String::from(header_value_to_str!(hv)?)),
|
||||||
|
)?
|
||||||
|
} else {
|
||||||
|
visitor = visitor.set_extension(
|
||||||
|
name,
|
||||||
|
MessageAttributeValue::String(String::from(header_value_to_str!(hv)?)),
|
||||||
|
)?
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if let Some(hv) = self.headers.get("content-type") {
|
||||||
|
visitor = visitor.set_attribute(
|
||||||
|
"datacontenttype",
|
||||||
|
MessageAttributeValue::String(String::from(header_value_to_str!(hv)?)),
|
||||||
|
)?
|
||||||
|
}
|
||||||
|
|
||||||
|
if self.body.len() != 0 {
|
||||||
|
visitor.end_with_data(self.body.to_vec())
|
||||||
|
} else {
|
||||||
|
visitor.end()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl StructuredDeserializer for RequestDeserializer {
|
||||||
|
fn deserialize_structured<R: Sized, V: StructuredSerializer<R>>(self, visitor: V) -> Result<R> {
|
||||||
|
if self.encoding() != Encoding::STRUCTURED {
|
||||||
|
return Err(message::Error::WrongEncoding {});
|
||||||
|
}
|
||||||
|
visitor.set_structured_event(self.body.to_vec())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl MessageDeserializer for RequestDeserializer {
|
||||||
|
fn encoding(&self) -> Encoding {
|
||||||
|
if self
|
||||||
|
.headers
|
||||||
|
.get("content-type")
|
||||||
|
.map(|v| v.to_str().unwrap_or(""))
|
||||||
|
.unwrap_or("")
|
||||||
|
== "application/cloudevents+json"
|
||||||
|
{
|
||||||
|
Encoding::STRUCTURED
|
||||||
|
} else if self
|
||||||
|
.headers
|
||||||
|
.get::<&'static HeaderName>(&super::headers::SPEC_VERSION_HEADER)
|
||||||
|
.is_some()
|
||||||
|
{
|
||||||
|
Encoding::BINARY
|
||||||
|
} else {
|
||||||
|
Encoding::UNKNOWN
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn request_to_event(req: HeaderMap, bytes: bytes::Bytes) -> std::result::Result<Event, Error> {
|
||||||
|
MessageDeserializer::into_event(RequestDeserializer::new(req, bytes))
|
||||||
|
}
|
|
@ -0,0 +1,102 @@
|
||||||
|
use super::headers;
|
||||||
|
|
||||||
|
use cloudevents::event::SpecVersion;
|
||||||
|
use cloudevents::message::{
|
||||||
|
BinaryDeserializer, BinarySerializer, Error, MessageAttributeValue, Result,
|
||||||
|
StructuredSerializer,
|
||||||
|
};
|
||||||
|
use cloudevents::Event;
|
||||||
|
|
||||||
|
use warp::http::HeaderValue;
|
||||||
|
use warp::hyper::Body;
|
||||||
|
use warp::reply::Response;
|
||||||
|
|
||||||
|
use http::header::HeaderName;
|
||||||
|
use http::response::Builder;
|
||||||
|
|
||||||
|
use std::{convert::TryFrom, str::FromStr};
|
||||||
|
|
||||||
|
pub struct ResponseSerializer {
|
||||||
|
builder: Builder,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ResponseSerializer {
|
||||||
|
fn new() -> Self {
|
||||||
|
ResponseSerializer {
|
||||||
|
builder: http::Response::builder(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl BinarySerializer<Response> for ResponseSerializer {
|
||||||
|
fn set_spec_version(mut self, spec_version: SpecVersion) -> Result<Self> {
|
||||||
|
self.builder = self.builder.header(
|
||||||
|
headers::SPEC_VERSION_HEADER.clone(),
|
||||||
|
HeaderValue::try_from(spec_version.to_string().as_str()).map_err(|e| {
|
||||||
|
cloudevents::message::Error::Other {
|
||||||
|
source: Box::new(e),
|
||||||
|
}
|
||||||
|
})?,
|
||||||
|
);
|
||||||
|
Ok(self)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn set_attribute(mut self, name: &str, value: MessageAttributeValue) -> Result<Self> {
|
||||||
|
self.builder = self.builder.header(
|
||||||
|
headers::ATTRIBUTES_TO_HEADERS.get(name).unwrap().clone(),
|
||||||
|
HeaderValue::try_from(value.to_string().as_str()).map_err(|e| {
|
||||||
|
cloudevents::message::Error::Other {
|
||||||
|
source: Box::new(e),
|
||||||
|
}
|
||||||
|
})?,
|
||||||
|
);
|
||||||
|
Ok(self)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn set_extension(mut self, name: &str, value: MessageAttributeValue) -> Result<Self> {
|
||||||
|
self.builder = self.builder.header(
|
||||||
|
attribute_name_to_header!(name)?,
|
||||||
|
HeaderValue::try_from(value.to_string().as_str()).map_err(|e| {
|
||||||
|
cloudevents::message::Error::Other {
|
||||||
|
source: Box::new(e),
|
||||||
|
}
|
||||||
|
})?,
|
||||||
|
);
|
||||||
|
Ok(self)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn end_with_data(self, bytes: Vec<u8>) -> Result<Response> {
|
||||||
|
self.builder
|
||||||
|
.body(Body::from(bytes))
|
||||||
|
.map_err(|e| cloudevents::message::Error::Other {
|
||||||
|
source: Box::new(e),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
fn end(self) -> Result<Response> {
|
||||||
|
self.builder
|
||||||
|
.body(Body::empty())
|
||||||
|
.map_err(|e| cloudevents::message::Error::Other {
|
||||||
|
source: Box::new(e),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl StructuredSerializer<Response> for ResponseSerializer {
|
||||||
|
fn set_structured_event(self, bytes: Vec<u8>) -> Result<Response> {
|
||||||
|
Ok(self
|
||||||
|
.builder
|
||||||
|
.header(
|
||||||
|
http::header::CONTENT_TYPE,
|
||||||
|
headers::CLOUDEVENTS_JSON_HEADER.clone(),
|
||||||
|
)
|
||||||
|
.body(Body::from(bytes))
|
||||||
|
.map_err(|e| cloudevents::message::Error::Other {
|
||||||
|
source: Box::new(e),
|
||||||
|
})?)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn event_to_response(event: Event) -> std::result::Result<Response, Error> {
|
||||||
|
BinaryDeserializer::deserialize_binary(event, ResponseSerializer::new())
|
||||||
|
}
|
|
@ -0,0 +1,9 @@
|
||||||
|
#[test]
|
||||||
|
fn test_readme_deps() {
|
||||||
|
version_sync::assert_markdown_deps_updated!("README.md");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_html_root_url() {
|
||||||
|
version_sync::assert_html_root_url_updated!("src/lib.rs");
|
||||||
|
}
|
|
@ -0,0 +1,16 @@
|
||||||
|
[package]
|
||||||
|
name = "warp-example"
|
||||||
|
version = "0.3.0"
|
||||||
|
authors = ["Marko Milenković <milenkovicm@users.noreply.github.com>"]
|
||||||
|
edition = "2018"
|
||||||
|
categories = ["web-programming", "encoding"]
|
||||||
|
license-file = "../LICENSE"
|
||||||
|
|
||||||
|
[dependencies]
|
||||||
|
cloudevents-sdk = { path = "../.." }
|
||||||
|
cloudevents-sdk-warp = { path = "../../cloudevents-sdk-warp"}
|
||||||
|
warp = "0.2"
|
||||||
|
tokio = { version = "^0.2", features = ["full"] }
|
||||||
|
|
||||||
|
[workspace]
|
||||||
|
|
|
@ -0,0 +1,13 @@
|
||||||
|
use cloudevents_sdk_warp::{filter, reply};
|
||||||
|
use warp::Filter;
|
||||||
|
|
||||||
|
#[tokio::main]
|
||||||
|
async fn main() {
|
||||||
|
let routes = warp::any()
|
||||||
|
// extracting event from request
|
||||||
|
.and(filter::to_event())
|
||||||
|
// returning event back
|
||||||
|
.map(|event| reply::from_event(event));
|
||||||
|
|
||||||
|
warp::serve(routes).run(([127, 0, 0, 1], 3030)).await;
|
||||||
|
}
|
Loading…
Reference in New Issue