Encapsulate shared event deserialization behind a Headers trait
Both warp and reqwest use the HeaderMap from the http crate. Actix has its own. Fortunately, both contain (HeaderName, HeaderValue) tuples. Further, actix uses a conflicting version of the bytes crate, so I store a Vec<u8> instead of a Bytes member in the Deserializer struct. Not sure if that's a problem, but the tests pass. :) We use an associated type in the Headers trait to facilitate static dispatch for warp/reqwest since their concrete iterator is public, but the actix Iter struct is private, so we use a Box for its impl. We're using AsHeaderName for the get() param to avoid having to call as_str() on any header constants, but of course actix uses its own AsName trait, which isn't public, so we must call as_str() for the passed header name in its impl. Signed-off-by: Jim Crossley <jim@crossleys.org>
This commit is contained in:
parent
ca3ba3b88c
commit
2e72947bc7
|
|
@ -18,7 +18,7 @@ name = "cloudevents"
|
||||||
|
|
||||||
[features]
|
[features]
|
||||||
actix = ["actix-web", "async-trait", "bytes", "futures", "http"]
|
actix = ["actix-web", "async-trait", "bytes", "futures", "http"]
|
||||||
reqwest = ["reqwest-lib", "async-trait", "bytes"]
|
reqwest = ["reqwest-lib", "async-trait", "bytes", "http"]
|
||||||
rdkafka = ["rdkafka-lib", "bytes", "futures"]
|
rdkafka = ["rdkafka-lib", "bytes", "futures"]
|
||||||
warp = ["warp-lib", "bytes", "http", "hyper"]
|
warp = ["warp-lib", "bytes", "http", "hyper"]
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -45,7 +45,6 @@ mod server_request;
|
||||||
mod server_response;
|
mod server_response;
|
||||||
|
|
||||||
pub use server_request::request_to_event;
|
pub use server_request::request_to_event;
|
||||||
pub use server_request::HttpRequestDeserializer;
|
|
||||||
pub use server_request::HttpRequestExt;
|
pub use server_request::HttpRequestExt;
|
||||||
pub use server_response::event_to_response;
|
pub use server_response::event_to_response;
|
||||||
pub use server_response::HttpResponseBuilderExt;
|
pub use server_response::HttpResponseBuilderExt;
|
||||||
|
|
|
||||||
|
|
@ -1,99 +1,20 @@
|
||||||
use crate::binding::http::SPEC_VERSION_HEADER;
|
use crate::binding::http::{to_event, Headers};
|
||||||
use crate::event::SpecVersion;
|
use crate::Event;
|
||||||
use crate::message::{
|
use actix_web::web::BytesMut;
|
||||||
BinaryDeserializer, BinarySerializer, Encoding, MessageAttributeValue, MessageDeserializer,
|
use actix_web::{web, HttpRequest};
|
||||||
Result, StructuredDeserializer, StructuredSerializer,
|
|
||||||
};
|
|
||||||
use crate::{header_value_to_str, message, Event};
|
|
||||||
use actix_web::web::{Bytes, BytesMut};
|
|
||||||
use actix_web::{web, HttpMessage, HttpRequest};
|
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use futures::future::LocalBoxFuture;
|
use futures::future::LocalBoxFuture;
|
||||||
use futures::{FutureExt, StreamExt};
|
use futures::{FutureExt, StreamExt};
|
||||||
use std::convert::TryFrom;
|
use http::header::{AsHeaderName, HeaderName, HeaderValue};
|
||||||
|
|
||||||
/// Wrapper for [`HttpRequest`] that implements [`MessageDeserializer`] trait.
|
/// Implement Headers for the actix HeaderMap
|
||||||
pub struct HttpRequestDeserializer<'a> {
|
impl<'a> Headers<'a> for actix_web::http::HeaderMap {
|
||||||
req: &'a HttpRequest,
|
type Iterator = Box<dyn Iterator<Item = (&'a HeaderName, &'a HeaderValue)> + 'a>;
|
||||||
body: Bytes,
|
fn get<K: AsHeaderName>(&self, key: K) -> Option<&HeaderValue> {
|
||||||
}
|
self.get(key.as_str())
|
||||||
|
|
||||||
impl HttpRequestDeserializer<'_> {
|
|
||||||
pub fn new(req: &HttpRequest, body: Bytes) -> HttpRequestDeserializer {
|
|
||||||
HttpRequestDeserializer { req, body }
|
|
||||||
}
|
}
|
||||||
}
|
fn iter(&'a self) -> Self::Iterator {
|
||||||
|
Box::new(self.iter())
|
||||||
impl<'a> BinaryDeserializer for HttpRequestDeserializer<'a> {
|
|
||||||
fn deserialize_binary<R: Sized, V: BinarySerializer<R>>(self, mut visitor: V) -> Result<R> {
|
|
||||||
if self.encoding() != Encoding::BINARY {
|
|
||||||
return Err(message::Error::WrongEncoding {});
|
|
||||||
}
|
|
||||||
|
|
||||||
let spec_version = SpecVersion::try_from(
|
|
||||||
self.req
|
|
||||||
.headers()
|
|
||||||
.get(SPEC_VERSION_HEADER)
|
|
||||||
.map(|a| header_value_to_str!(a))
|
|
||||||
.unwrap()?,
|
|
||||||
)?;
|
|
||||||
|
|
||||||
visitor = visitor.set_spec_version(spec_version.clone())?;
|
|
||||||
|
|
||||||
let attributes = spec_version.attribute_names();
|
|
||||||
|
|
||||||
for (hn, hv) in self.req.headers().iter().filter(|(hn, _)| {
|
|
||||||
let key = hn.as_str();
|
|
||||||
SPEC_VERSION_HEADER.ne(key) && key.starts_with("ce-")
|
|
||||||
}) {
|
|
||||||
let name = &hn.as_str()["ce-".len()..];
|
|
||||||
|
|
||||||
if attributes.contains(&name) {
|
|
||||||
visitor = visitor.set_attribute(
|
|
||||||
name,
|
|
||||||
MessageAttributeValue::String(String::from(header_value_to_str!(hv)?)),
|
|
||||||
)?
|
|
||||||
} else {
|
|
||||||
visitor = visitor.set_extension(
|
|
||||||
name,
|
|
||||||
MessageAttributeValue::String(String::from(header_value_to_str!(hv)?)),
|
|
||||||
)?
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if let Some(hv) = self.req.headers().get("content-type") {
|
|
||||||
visitor = visitor.set_attribute(
|
|
||||||
"datacontenttype",
|
|
||||||
MessageAttributeValue::String(String::from(header_value_to_str!(hv)?)),
|
|
||||||
)?
|
|
||||||
}
|
|
||||||
|
|
||||||
if !self.body.is_empty() {
|
|
||||||
visitor.end_with_data(self.body.to_vec())
|
|
||||||
} else {
|
|
||||||
visitor.end()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<'a> StructuredDeserializer for HttpRequestDeserializer<'a> {
|
|
||||||
fn deserialize_structured<R: Sized, V: StructuredSerializer<R>>(self, visitor: V) -> Result<R> {
|
|
||||||
if self.encoding() != Encoding::STRUCTURED {
|
|
||||||
return Err(message::Error::WrongEncoding {});
|
|
||||||
}
|
|
||||||
visitor.set_structured_event(self.body.to_vec())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<'a> MessageDeserializer for HttpRequestDeserializer<'a> {
|
|
||||||
fn encoding(&self) -> Encoding {
|
|
||||||
if self.req.content_type() == "application/cloudevents+json" {
|
|
||||||
Encoding::STRUCTURED
|
|
||||||
} else if self.req.headers().get(SPEC_VERSION_HEADER).is_some() {
|
|
||||||
Encoding::BINARY
|
|
||||||
} else {
|
|
||||||
Encoding::UNKNOWN
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -106,8 +27,7 @@ pub async fn request_to_event(
|
||||||
while let Some(item) = payload.next().await {
|
while let Some(item) = payload.next().await {
|
||||||
bytes.extend_from_slice(&item?);
|
bytes.extend_from_slice(&item?);
|
||||||
}
|
}
|
||||||
MessageDeserializer::into_event(HttpRequestDeserializer::new(req, bytes.freeze()))
|
to_event(req.headers(), bytes.to_vec()).map_err(actix_web::error::ErrorBadRequest)
|
||||||
.map_err(actix_web::error::ErrorBadRequest)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// So that an actix-web handler may take an Event parameter
|
/// So that an actix-web handler may take an Event parameter
|
||||||
|
|
|
||||||
|
|
@ -1,28 +1,27 @@
|
||||||
use bytes::Bytes;
|
use super::{Headers, SPEC_VERSION_HEADER};
|
||||||
use http::HeaderMap;
|
use crate::{
|
||||||
|
binding::CLOUDEVENTS_JSON_HEADER,
|
||||||
use crate::binding::http::SPEC_VERSION_HEADER;
|
event::SpecVersion,
|
||||||
use crate::event::SpecVersion;
|
header_value_to_str, message,
|
||||||
use crate::header_value_to_str;
|
message::{
|
||||||
use crate::message::{
|
BinaryDeserializer, BinarySerializer, Encoding, MessageAttributeValue, MessageDeserializer,
|
||||||
BinaryDeserializer, BinarySerializer, Encoding, Error, MessageAttributeValue,
|
Result, StructuredDeserializer, StructuredSerializer,
|
||||||
MessageDeserializer, Result, StructuredDeserializer, StructuredSerializer,
|
},
|
||||||
};
|
};
|
||||||
use crate::{message, Event};
|
|
||||||
use std::convert::TryFrom;
|
use std::convert::TryFrom;
|
||||||
|
|
||||||
pub struct RequestDeserializer {
|
pub struct Deserializer<'a, T: Headers<'a>> {
|
||||||
headers: HeaderMap,
|
headers: &'a T,
|
||||||
body: Bytes,
|
body: Vec<u8>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl RequestDeserializer {
|
impl<'a, T: Headers<'a>> Deserializer<'a, T> {
|
||||||
pub fn new(headers: HeaderMap, body: Bytes) -> RequestDeserializer {
|
pub fn new(headers: &'a T, body: Vec<u8>) -> Deserializer<'a, T> {
|
||||||
RequestDeserializer { headers, body }
|
Deserializer { headers, body }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl BinaryDeserializer for RequestDeserializer {
|
impl<'a, T: Headers<'a>> BinaryDeserializer for Deserializer<'a, T> {
|
||||||
fn deserialize_binary<R: Sized, V: BinarySerializer<R>>(self, mut visitor: V) -> Result<R> {
|
fn deserialize_binary<R: Sized, V: BinarySerializer<R>>(self, mut visitor: V) -> Result<R> {
|
||||||
if self.encoding() != Encoding::BINARY {
|
if self.encoding() != Encoding::BINARY {
|
||||||
return Err(message::Error::WrongEncoding {});
|
return Err(message::Error::WrongEncoding {});
|
||||||
|
|
@ -58,7 +57,7 @@ impl BinaryDeserializer for RequestDeserializer {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if let Some(hv) = self.headers.get("content-type") {
|
if let Some(hv) = self.headers.get(http::header::CONTENT_TYPE) {
|
||||||
visitor = visitor.set_attribute(
|
visitor = visitor.set_attribute(
|
||||||
"datacontenttype",
|
"datacontenttype",
|
||||||
MessageAttributeValue::String(String::from(header_value_to_str!(hv)?)),
|
MessageAttributeValue::String(String::from(header_value_to_str!(hv)?)),
|
||||||
|
|
@ -66,40 +65,36 @@ impl BinaryDeserializer for RequestDeserializer {
|
||||||
}
|
}
|
||||||
|
|
||||||
if !self.body.is_empty() {
|
if !self.body.is_empty() {
|
||||||
visitor.end_with_data(self.body.to_vec())
|
visitor.end_with_data(self.body)
|
||||||
} else {
|
} else {
|
||||||
visitor.end()
|
visitor.end()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl StructuredDeserializer for RequestDeserializer {
|
impl<'a, T: Headers<'a>> StructuredDeserializer for Deserializer<'a, T> {
|
||||||
fn deserialize_structured<R: Sized, V: StructuredSerializer<R>>(self, visitor: V) -> Result<R> {
|
fn deserialize_structured<R: Sized, V: StructuredSerializer<R>>(self, visitor: V) -> Result<R> {
|
||||||
if self.encoding() != Encoding::STRUCTURED {
|
if self.encoding() != Encoding::STRUCTURED {
|
||||||
return Err(message::Error::WrongEncoding {});
|
return Err(message::Error::WrongEncoding {});
|
||||||
}
|
}
|
||||||
visitor.set_structured_event(self.body.to_vec())
|
visitor.set_structured_event(self.body)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl MessageDeserializer for RequestDeserializer {
|
impl<'a, T: Headers<'a>> MessageDeserializer for Deserializer<'a, T> {
|
||||||
fn encoding(&self) -> Encoding {
|
fn encoding(&self) -> Encoding {
|
||||||
if self
|
if self
|
||||||
.headers
|
.headers
|
||||||
.get("content-type")
|
.get(http::header::CONTENT_TYPE)
|
||||||
.map(|v| v.to_str().unwrap_or(""))
|
.and_then(|v| v.to_str().ok())
|
||||||
.unwrap_or("")
|
.filter(|&v| v.starts_with(CLOUDEVENTS_JSON_HEADER))
|
||||||
== "application/cloudevents+json"
|
.is_some()
|
||||||
{
|
{
|
||||||
Encoding::STRUCTURED
|
Encoding::STRUCTURED
|
||||||
} else if self.headers.contains_key(SPEC_VERSION_HEADER) {
|
} else if self.headers.get(SPEC_VERSION_HEADER).is_some() {
|
||||||
Encoding::BINARY
|
Encoding::BINARY
|
||||||
} else {
|
} else {
|
||||||
Encoding::UNKNOWN
|
Encoding::UNKNOWN
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn request_to_event(req: HeaderMap, bytes: bytes::Bytes) -> std::result::Result<Event, Error> {
|
|
||||||
MessageDeserializer::into_event(RequestDeserializer::new(req, bytes))
|
|
||||||
}
|
|
||||||
|
|
@ -0,0 +1,21 @@
|
||||||
|
use http::header::{AsHeaderName, HeaderMap, HeaderName, HeaderValue};
|
||||||
|
|
||||||
|
/// Any http library should be able to use the
|
||||||
|
/// [`to_event`](super::to_event) function with an implementation of
|
||||||
|
/// this trait.
|
||||||
|
pub trait Headers<'a> {
|
||||||
|
type Iterator: Iterator<Item = (&'a HeaderName, &'a HeaderValue)>;
|
||||||
|
fn get<K: AsHeaderName>(&self, name: K) -> Option<&HeaderValue>;
|
||||||
|
fn iter(&'a self) -> Self::Iterator;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Implemention for the HeaderMap used by warp/reqwest
|
||||||
|
impl<'a> Headers<'a> for HeaderMap<HeaderValue> {
|
||||||
|
type Iterator = http::header::Iter<'a, HeaderValue>;
|
||||||
|
fn get<K: AsHeaderName>(&self, name: K) -> Option<&HeaderValue> {
|
||||||
|
self.get(name)
|
||||||
|
}
|
||||||
|
fn iter(&'a self) -> Self::Iterator {
|
||||||
|
self.iter()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -1,5 +1,23 @@
|
||||||
|
mod deserializer;
|
||||||
|
mod headers;
|
||||||
|
|
||||||
|
use crate::{
|
||||||
|
message::{Error, MessageDeserializer},
|
||||||
|
Event,
|
||||||
|
};
|
||||||
|
use deserializer::Deserializer;
|
||||||
|
pub use headers::Headers;
|
||||||
|
|
||||||
pub static SPEC_VERSION_HEADER: &str = "ce-specversion";
|
pub static SPEC_VERSION_HEADER: &str = "ce-specversion";
|
||||||
|
|
||||||
|
/// Turn a pile of HTTP headers and a body into a CloudEvent
|
||||||
|
pub fn to_event<'a, T: Headers<'a>>(
|
||||||
|
headers: &'a T,
|
||||||
|
body: Vec<u8>,
|
||||||
|
) -> std::result::Result<Event, Error> {
|
||||||
|
MessageDeserializer::into_event(Deserializer::new(headers, body))
|
||||||
|
}
|
||||||
|
|
||||||
pub fn header_prefix(name: &str) -> String {
|
pub fn header_prefix(name: &str) -> String {
|
||||||
super::header_prefix("ce-", name)
|
super::header_prefix("ce-", name)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,108 +1,10 @@
|
||||||
use reqwest_lib as reqwest;
|
use reqwest_lib as reqwest;
|
||||||
|
|
||||||
use crate::binding::http::SPEC_VERSION_HEADER;
|
use crate::binding::http;
|
||||||
use crate::event::SpecVersion;
|
use crate::message::{Error, Result};
|
||||||
use crate::message::{
|
use crate::Event;
|
||||||
BinaryDeserializer, BinarySerializer, Encoding, Error, MessageAttributeValue,
|
|
||||||
MessageDeserializer, Result, StructuredDeserializer, StructuredSerializer,
|
|
||||||
};
|
|
||||||
use crate::{header_value_to_str, message, Event};
|
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use bytes::Bytes;
|
|
||||||
use reqwest::header::HeaderMap;
|
|
||||||
use reqwest::Response;
|
use reqwest::Response;
|
||||||
use std::convert::TryFrom;
|
|
||||||
|
|
||||||
/// Wrapper for [`Response`] that implements [`MessageDeserializer`] trait.
|
|
||||||
pub struct ResponseDeserializer {
|
|
||||||
headers: HeaderMap,
|
|
||||||
body: Bytes,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl ResponseDeserializer {
|
|
||||||
pub fn new(headers: HeaderMap, body: Bytes) -> ResponseDeserializer {
|
|
||||||
ResponseDeserializer { headers, body }
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl BinaryDeserializer for ResponseDeserializer {
|
|
||||||
fn deserialize_binary<R: Sized, V: BinarySerializer<R>>(self, mut visitor: V) -> Result<R> {
|
|
||||||
if self.encoding() != Encoding::BINARY {
|
|
||||||
return Err(message::Error::WrongEncoding {});
|
|
||||||
}
|
|
||||||
|
|
||||||
let spec_version = SpecVersion::try_from(
|
|
||||||
self.headers
|
|
||||||
.get(SPEC_VERSION_HEADER)
|
|
||||||
.map(|a| header_value_to_str!(a))
|
|
||||||
.unwrap()?,
|
|
||||||
)?;
|
|
||||||
|
|
||||||
visitor = visitor.set_spec_version(spec_version.clone())?;
|
|
||||||
|
|
||||||
let attributes = spec_version.attribute_names();
|
|
||||||
|
|
||||||
for (hn, hv) in self.headers.iter().filter(|(hn, _)| {
|
|
||||||
let key = hn.as_str();
|
|
||||||
SPEC_VERSION_HEADER.ne(key) && key.starts_with("ce-")
|
|
||||||
}) {
|
|
||||||
let name = &hn.as_str()["ce-".len()..];
|
|
||||||
|
|
||||||
if attributes.contains(&name) {
|
|
||||||
visitor = visitor.set_attribute(
|
|
||||||
name,
|
|
||||||
MessageAttributeValue::String(String::from(header_value_to_str!(hv)?)),
|
|
||||||
)?
|
|
||||||
} else {
|
|
||||||
visitor = visitor.set_extension(
|
|
||||||
name,
|
|
||||||
MessageAttributeValue::String(String::from(header_value_to_str!(hv)?)),
|
|
||||||
)?
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if let Some(hv) = self.headers.get("content-type") {
|
|
||||||
visitor = visitor.set_attribute(
|
|
||||||
"datacontenttype",
|
|
||||||
MessageAttributeValue::String(String::from(header_value_to_str!(hv)?)),
|
|
||||||
)?
|
|
||||||
}
|
|
||||||
|
|
||||||
if !self.body.is_empty() {
|
|
||||||
visitor.end_with_data(self.body.to_vec())
|
|
||||||
} else {
|
|
||||||
visitor.end()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl StructuredDeserializer for ResponseDeserializer {
|
|
||||||
fn deserialize_structured<R: Sized, V: StructuredSerializer<R>>(self, visitor: V) -> Result<R> {
|
|
||||||
if self.encoding() != Encoding::STRUCTURED {
|
|
||||||
return Err(message::Error::WrongEncoding {});
|
|
||||||
}
|
|
||||||
visitor.set_structured_event(self.body.to_vec())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl MessageDeserializer for ResponseDeserializer {
|
|
||||||
fn encoding(&self) -> Encoding {
|
|
||||||
match (
|
|
||||||
#[allow(clippy::borrow_interior_mutable_const)]
|
|
||||||
self.headers
|
|
||||||
.get(reqwest::header::CONTENT_TYPE)
|
|
||||||
.map(|a| header_value_to_str!(a))
|
|
||||||
.map(|r| r.ok())
|
|
||||||
.flatten()
|
|
||||||
.map(|e| e.starts_with("application/cloudevents+json")),
|
|
||||||
self.headers.get(SPEC_VERSION_HEADER),
|
|
||||||
) {
|
|
||||||
(Some(true), _) => Encoding::STRUCTURED,
|
|
||||||
(_, Some(_)) => Encoding::BINARY,
|
|
||||||
_ => Encoding::UNKNOWN,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Method to transform an incoming [`Response`] to [`Event`].
|
/// Method to transform an incoming [`Response`] to [`Event`].
|
||||||
pub async fn response_to_event(res: Response) -> Result<Event> {
|
pub async fn response_to_event(res: Response) -> Result<Event> {
|
||||||
|
|
@ -110,8 +12,7 @@ pub async fn response_to_event(res: Response) -> Result<Event> {
|
||||||
let b = res.bytes().await.map_err(|e| Error::Other {
|
let b = res.bytes().await.map_err(|e| Error::Other {
|
||||||
source: Box::new(e),
|
source: Box::new(e),
|
||||||
})?;
|
})?;
|
||||||
|
http::to_event(&h, b.to_vec())
|
||||||
MessageDeserializer::into_event(ResponseDeserializer::new(h, b))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Extension Trait for [`Response`] which acts as a wrapper for the function [`response_to_event()`].
|
/// Extension Trait for [`Response`] which acts as a wrapper for the function [`response_to_event()`].
|
||||||
|
|
|
||||||
|
|
@ -37,5 +37,4 @@ pub use client_request::event_to_request;
|
||||||
pub use client_request::RequestBuilderExt;
|
pub use client_request::RequestBuilderExt;
|
||||||
pub use client_request::RequestSerializer;
|
pub use client_request::RequestSerializer;
|
||||||
pub use client_response::response_to_event;
|
pub use client_response::response_to_event;
|
||||||
pub use client_response::ResponseDeserializer;
|
|
||||||
pub use client_response::ResponseExt;
|
pub use client_response::ResponseExt;
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,6 @@
|
||||||
use warp_lib as warp;
|
use warp_lib as warp;
|
||||||
|
|
||||||
use super::server_request::request_to_event;
|
use crate::binding::http;
|
||||||
|
|
||||||
use crate::Event;
|
use crate::Event;
|
||||||
use warp::http::HeaderMap;
|
use warp::http::HeaderMap;
|
||||||
|
|
@ -38,7 +38,7 @@ pub fn to_event() -> impl Filter<Extract = (Event,), Error = Rejection> + Copy {
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn create_event(headers: HeaderMap, body: bytes::Bytes) -> Result<Event, Rejection> {
|
async fn create_event(headers: HeaderMap, body: bytes::Bytes) -> Result<Event, Rejection> {
|
||||||
request_to_event(headers, body)
|
http::to_event(&headers, body.to_vec())
|
||||||
.map_err(|error| warp::reject::custom(EventFilterError { error }))
|
.map_err(|error| warp::reject::custom(EventFilterError { error }))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -61,7 +61,6 @@
|
||||||
//! warp::serve(routes).run(([127, 0, 0, 1], 3030));
|
//! warp::serve(routes).run(([127, 0, 0, 1], 3030));
|
||||||
//! ```
|
//! ```
|
||||||
|
|
||||||
mod server_request;
|
|
||||||
mod server_response;
|
mod server_response;
|
||||||
|
|
||||||
pub mod filter;
|
pub mod filter;
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue