Refactor redundant header logic to a shared lib
Fixes #145 Signed-off-by: Jim Crossley <jim@crossleys.org>
This commit is contained in:
parent
2cae3f0b36
commit
d18985eea9
|
|
@ -1,68 +1,9 @@
|
||||||
use crate::event::SpecVersion;
|
|
||||||
use actix_web::http::header;
|
|
||||||
use actix_web::http::{HeaderName, HeaderValue};
|
|
||||||
use lazy_static::lazy_static;
|
|
||||||
use std::collections::HashMap;
|
|
||||||
use std::str::FromStr;
|
|
||||||
|
|
||||||
macro_rules! unwrap_optional_header {
|
|
||||||
($headers:expr, $name:expr) => {
|
|
||||||
$headers
|
|
||||||
.get::<&'static HeaderName>(&$name)
|
|
||||||
.map(|a| header_value_to_str!(a))
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
macro_rules! header_value_to_str {
|
|
||||||
($header_value:expr) => {
|
|
||||||
$header_value
|
|
||||||
.to_str()
|
|
||||||
.map_err(|e| crate::message::Error::Other {
|
|
||||||
source: Box::new(e),
|
|
||||||
})
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
macro_rules! str_to_header_value {
|
macro_rules! str_to_header_value {
|
||||||
($header_value:expr) => {
|
($header_value:expr) => {
|
||||||
HeaderValue::from_str($header_value).map_err(|e| crate::message::Error::Other {
|
HeaderValue::from_str($header_value.to_string().as_str()).map_err(|e| {
|
||||||
source: Box::new(e),
|
crate::message::Error::Other {
|
||||||
|
source: Box::new(e),
|
||||||
|
}
|
||||||
})
|
})
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
macro_rules! str_name_to_header {
|
|
||||||
($attribute:expr) => {
|
|
||||||
HeaderName::from_str($attribute).map_err(|e| crate::message::Error::Other {
|
|
||||||
source: Box::new(e),
|
|
||||||
})
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
macro_rules! attribute_name_to_header {
|
|
||||||
($attribute:expr) => {
|
|
||||||
str_name_to_header!(&["ce-", $attribute].concat())
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
fn attributes_to_headers(
|
|
||||||
it: impl Iterator<Item = &'static str>,
|
|
||||||
) -> HashMap<&'static str, HeaderName> {
|
|
||||||
it.map(|s| {
|
|
||||||
if s == "datacontenttype" {
|
|
||||||
(s, header::CONTENT_TYPE)
|
|
||||||
} else {
|
|
||||||
(s, attribute_name_to_header!(s).unwrap())
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.collect()
|
|
||||||
}
|
|
||||||
|
|
||||||
lazy_static! {
|
|
||||||
pub(crate) static ref ATTRIBUTES_TO_HEADERS: HashMap<&'static str, HeaderName> =
|
|
||||||
attributes_to_headers(SpecVersion::all_attribute_names());
|
|
||||||
pub(crate) static ref SPEC_VERSION_HEADER: HeaderName =
|
|
||||||
HeaderName::from_static("ce-specversion");
|
|
||||||
pub(crate) static ref CLOUDEVENTS_JSON_HEADER: HeaderValue =
|
|
||||||
HeaderValue::from_static("application/cloudevents+json");
|
|
||||||
}
|
|
||||||
|
|
|
||||||
|
|
@ -1,11 +1,10 @@
|
||||||
use super::headers;
|
use crate::binding::http::SPEC_VERSION_HEADER;
|
||||||
use crate::event::SpecVersion;
|
use crate::event::SpecVersion;
|
||||||
use crate::message::{
|
use crate::message::{
|
||||||
BinaryDeserializer, BinarySerializer, Encoding, MessageAttributeValue, MessageDeserializer,
|
BinaryDeserializer, BinarySerializer, Encoding, MessageAttributeValue, MessageDeserializer,
|
||||||
Result, StructuredDeserializer, StructuredSerializer,
|
Result, StructuredDeserializer, StructuredSerializer,
|
||||||
};
|
};
|
||||||
use crate::{message, Event};
|
use crate::{header_value_to_str, message, Event};
|
||||||
use actix_web::http::HeaderName;
|
|
||||||
use actix_web::web::{Bytes, BytesMut};
|
use actix_web::web::{Bytes, BytesMut};
|
||||||
use actix_web::{web, HttpMessage, HttpRequest};
|
use actix_web::{web, HttpMessage, HttpRequest};
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
|
|
@ -32,18 +31,21 @@ impl<'a> BinaryDeserializer for HttpRequestDeserializer<'a> {
|
||||||
}
|
}
|
||||||
|
|
||||||
let spec_version = SpecVersion::try_from(
|
let spec_version = SpecVersion::try_from(
|
||||||
unwrap_optional_header!(self.req.headers(), headers::SPEC_VERSION_HEADER).unwrap()?,
|
self.req
|
||||||
|
.headers()
|
||||||
|
.get(SPEC_VERSION_HEADER)
|
||||||
|
.map(|a| header_value_to_str!(a))
|
||||||
|
.unwrap()?,
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
visitor = visitor.set_spec_version(spec_version.clone())?;
|
visitor = visitor.set_spec_version(spec_version.clone())?;
|
||||||
|
|
||||||
let attributes = spec_version.attribute_names();
|
let attributes = spec_version.attribute_names();
|
||||||
|
|
||||||
for (hn, hv) in
|
for (hn, hv) in self.req.headers().iter().filter(|(hn, _)| {
|
||||||
self.req.headers().iter().filter(|(hn, _)| {
|
let key = hn.as_str();
|
||||||
headers::SPEC_VERSION_HEADER.ne(hn) && hn.as_str().starts_with("ce-")
|
SPEC_VERSION_HEADER.ne(key) && key.starts_with("ce-")
|
||||||
})
|
}) {
|
||||||
{
|
|
||||||
let name = &hn.as_str()["ce-".len()..];
|
let name = &hn.as_str()["ce-".len()..];
|
||||||
|
|
||||||
if attributes.contains(&name) {
|
if attributes.contains(&name) {
|
||||||
|
|
@ -87,17 +89,50 @@ impl<'a> MessageDeserializer for HttpRequestDeserializer<'a> {
|
||||||
fn encoding(&self) -> Encoding {
|
fn encoding(&self) -> Encoding {
|
||||||
if self.req.content_type() == "application/cloudevents+json" {
|
if self.req.content_type() == "application/cloudevents+json" {
|
||||||
Encoding::STRUCTURED
|
Encoding::STRUCTURED
|
||||||
} else if self
|
} else if self.req.headers().get(SPEC_VERSION_HEADER).is_some() {
|
||||||
.req
|
|
||||||
.headers()
|
|
||||||
.get::<&'static HeaderName>(&super::headers::SPEC_VERSION_HEADER)
|
|
||||||
.is_some()
|
|
||||||
{
|
|
||||||
Encoding::BINARY
|
Encoding::BINARY
|
||||||
} else {
|
} else {
|
||||||
Encoding::UNKNOWN
|
Encoding::UNKNOWN
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn into_event(self) -> Result<Event> {
|
||||||
|
match self.encoding() {
|
||||||
|
Encoding::BINARY => BinaryDeserializer::into_event(self),
|
||||||
|
Encoding::STRUCTURED => StructuredDeserializer::into_event(self),
|
||||||
|
_ => Err(message::Error::WrongEncoding {}),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn deserialize_to_binary<R: Sized, T: BinarySerializer<R>>(self, serializer: T) -> Result<R> {
|
||||||
|
if self.encoding() == Encoding::BINARY {
|
||||||
|
return self.deserialize_binary(serializer);
|
||||||
|
}
|
||||||
|
|
||||||
|
MessageDeserializer::into_event(self)?.deserialize_binary(serializer)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn deserialize_to_structured<R: Sized, T: StructuredSerializer<R>>(
|
||||||
|
self,
|
||||||
|
serializer: T,
|
||||||
|
) -> Result<R> {
|
||||||
|
if self.encoding() == Encoding::STRUCTURED {
|
||||||
|
return self.deserialize_structured(serializer);
|
||||||
|
}
|
||||||
|
|
||||||
|
MessageDeserializer::into_event(self)?.deserialize_structured(serializer)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn deserialize_to<R: Sized, T: BinarySerializer<R> + StructuredSerializer<R>>(
|
||||||
|
self,
|
||||||
|
serializer: T,
|
||||||
|
) -> Result<R> {
|
||||||
|
if self.encoding() == Encoding::STRUCTURED {
|
||||||
|
self.deserialize_structured(serializer)
|
||||||
|
} else {
|
||||||
|
self.deserialize_binary(serializer)
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Method to transform an incoming [`HttpRequest`] to [`Event`].
|
/// Method to transform an incoming [`HttpRequest`] to [`Event`].
|
||||||
|
|
|
||||||
|
|
@ -1,16 +1,19 @@
|
||||||
use super::headers;
|
use crate::binding::{
|
||||||
|
attribute_header,
|
||||||
|
http::{PREFIX, SPEC_VERSION_HEADER},
|
||||||
|
CLOUDEVENTS_JSON_HEADER,
|
||||||
|
};
|
||||||
use crate::event::SpecVersion;
|
use crate::event::SpecVersion;
|
||||||
use crate::message::{
|
use crate::message::{
|
||||||
BinaryDeserializer, BinarySerializer, MessageAttributeValue, Result, StructuredSerializer,
|
BinaryDeserializer, BinarySerializer, MessageAttributeValue, Result, StructuredSerializer,
|
||||||
};
|
};
|
||||||
use crate::Event;
|
use crate::Event;
|
||||||
use actix_web::dev::HttpResponseBuilder;
|
use actix_web::dev::HttpResponseBuilder;
|
||||||
use actix_web::http::{HeaderName, HeaderValue, StatusCode};
|
use actix_web::http::{HeaderValue, StatusCode};
|
||||||
use actix_web::HttpResponse;
|
use actix_web::HttpResponse;
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use futures::future::LocalBoxFuture;
|
use futures::future::LocalBoxFuture;
|
||||||
use futures::FutureExt;
|
use futures::FutureExt;
|
||||||
use std::str::FromStr;
|
|
||||||
|
|
||||||
/// Wrapper for [`HttpResponseBuilder`] that implements [`StructuredSerializer`] and [`BinarySerializer`].
|
/// Wrapper for [`HttpResponseBuilder`] that implements [`StructuredSerializer`] and [`BinarySerializer`].
|
||||||
pub struct HttpResponseSerializer {
|
pub struct HttpResponseSerializer {
|
||||||
|
|
@ -25,25 +28,23 @@ impl HttpResponseSerializer {
|
||||||
|
|
||||||
impl BinarySerializer<HttpResponse> for HttpResponseSerializer {
|
impl BinarySerializer<HttpResponse> for HttpResponseSerializer {
|
||||||
fn set_spec_version(mut self, spec_version: SpecVersion) -> Result<Self> {
|
fn set_spec_version(mut self, spec_version: SpecVersion) -> Result<Self> {
|
||||||
self.builder.set_header(
|
self.builder
|
||||||
headers::SPEC_VERSION_HEADER.clone(),
|
.set_header(SPEC_VERSION_HEADER, str_to_header_value!(spec_version)?);
|
||||||
str_to_header_value!(spec_version.as_str())?,
|
|
||||||
);
|
|
||||||
Ok(self)
|
Ok(self)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn set_attribute(mut self, name: &str, value: MessageAttributeValue) -> Result<Self> {
|
fn set_attribute(mut self, name: &str, value: MessageAttributeValue) -> Result<Self> {
|
||||||
self.builder.set_header(
|
self.builder.set_header(
|
||||||
headers::ATTRIBUTES_TO_HEADERS.get(name).unwrap().clone(),
|
&attribute_header(PREFIX, name),
|
||||||
str_to_header_value!(value.to_string().as_str())?,
|
str_to_header_value!(value)?,
|
||||||
);
|
);
|
||||||
Ok(self)
|
Ok(self)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn set_extension(mut self, name: &str, value: MessageAttributeValue) -> Result<Self> {
|
fn set_extension(mut self, name: &str, value: MessageAttributeValue) -> Result<Self> {
|
||||||
self.builder.set_header(
|
self.builder.set_header(
|
||||||
attribute_name_to_header!(name)?,
|
&attribute_header(PREFIX, name),
|
||||||
str_to_header_value!(value.to_string().as_str())?,
|
str_to_header_value!(value)?,
|
||||||
);
|
);
|
||||||
Ok(self)
|
Ok(self)
|
||||||
}
|
}
|
||||||
|
|
@ -63,7 +64,7 @@ impl StructuredSerializer<HttpResponse> for HttpResponseSerializer {
|
||||||
.builder
|
.builder
|
||||||
.set_header(
|
.set_header(
|
||||||
actix_web::http::header::CONTENT_TYPE,
|
actix_web::http::header::CONTENT_TYPE,
|
||||||
headers::CLOUDEVENTS_JSON_HEADER.clone(),
|
CLOUDEVENTS_JSON_HEADER,
|
||||||
)
|
)
|
||||||
.body(bytes))
|
.body(bytes))
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -8,3 +8,36 @@ pub mod rdkafka;
|
||||||
pub mod reqwest;
|
pub mod reqwest;
|
||||||
#[cfg(feature = "warp")]
|
#[cfg(feature = "warp")]
|
||||||
pub mod warp;
|
pub mod warp;
|
||||||
|
|
||||||
|
#[cfg(feature = "rdkafka")]
|
||||||
|
pub(crate) mod kafka {
|
||||||
|
pub static SPEC_VERSION_HEADER: &str = "ce_specversion";
|
||||||
|
pub static PREFIX: &str = "ce_";
|
||||||
|
}
|
||||||
|
#[cfg(any(feature = "actix", feature = "warp", feature = "reqwest"))]
|
||||||
|
pub(crate) mod http {
|
||||||
|
pub static SPEC_VERSION_HEADER: &str = "ce-specversion";
|
||||||
|
pub static PREFIX: &str = "ce-";
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) static CLOUDEVENTS_JSON_HEADER: &str = "application/cloudevents+json";
|
||||||
|
pub(crate) static CONTENT_TYPE: &str = "content-type";
|
||||||
|
|
||||||
|
pub(crate) fn attribute_header(prefix: &str, name: &str) -> String {
|
||||||
|
if name == "datacontenttype" {
|
||||||
|
CONTENT_TYPE.to_string()
|
||||||
|
} else {
|
||||||
|
[prefix, name].concat()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[macro_export]
|
||||||
|
macro_rules! header_value_to_str {
|
||||||
|
($header_value:expr) => {
|
||||||
|
$header_value
|
||||||
|
.to_str()
|
||||||
|
.map_err(|e| crate::message::Error::Other {
|
||||||
|
source: Box::new(e),
|
||||||
|
})
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,29 +0,0 @@
|
||||||
use crate::event::SpecVersion;
|
|
||||||
use lazy_static::lazy_static;
|
|
||||||
use std::collections::HashMap;
|
|
||||||
|
|
||||||
macro_rules! attribute_name_to_header {
|
|
||||||
($attribute:expr) => {
|
|
||||||
format!("ce_{}", $attribute)
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
fn attributes_to_headers(it: impl Iterator<Item = &'static str>) -> HashMap<&'static str, String> {
|
|
||||||
it.map(|s| {
|
|
||||||
if s == "datacontenttype" {
|
|
||||||
(s, String::from("content-type"))
|
|
||||||
} else {
|
|
||||||
(s, attribute_name_to_header!(s))
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.collect()
|
|
||||||
}
|
|
||||||
|
|
||||||
lazy_static! {
|
|
||||||
pub(crate) static ref ATTRIBUTES_TO_HEADERS: HashMap<&'static str, String> =
|
|
||||||
attributes_to_headers(SpecVersion::all_attribute_names());
|
|
||||||
}
|
|
||||||
|
|
||||||
pub(crate) static SPEC_VERSION_HEADER: &str = "ce_specversion";
|
|
||||||
pub(crate) static CLOUDEVENTS_JSON_HEADER: &str = "application/cloudevents+json";
|
|
||||||
pub(crate) static CONTENT_TYPE: &str = "content-type";
|
|
||||||
|
|
@ -1,6 +1,6 @@
|
||||||
use rdkafka_lib as rdkafka;
|
use rdkafka_lib as rdkafka;
|
||||||
|
|
||||||
use super::headers;
|
use crate::binding::{kafka::SPEC_VERSION_HEADER, CLOUDEVENTS_JSON_HEADER, CONTENT_TYPE};
|
||||||
use crate::event::SpecVersion;
|
use crate::event::SpecVersion;
|
||||||
use crate::message::{
|
use crate::message::{
|
||||||
BinaryDeserializer, BinarySerializer, Encoding, MessageAttributeValue, MessageDeserializer,
|
BinaryDeserializer, BinarySerializer, Encoding, MessageAttributeValue, MessageDeserializer,
|
||||||
|
|
@ -47,17 +47,18 @@ impl BinaryDeserializer for ConsumerRecordDeserializer {
|
||||||
}
|
}
|
||||||
|
|
||||||
let spec_version = SpecVersion::try_from(
|
let spec_version = SpecVersion::try_from(
|
||||||
str::from_utf8(&self.headers.remove(headers::SPEC_VERSION_HEADER).unwrap()[..])
|
str::from_utf8(&self.headers.remove(SPEC_VERSION_HEADER).unwrap()).map_err(|e| {
|
||||||
.map_err(|e| crate::message::Error::Other {
|
crate::message::Error::Other {
|
||||||
source: Box::new(e),
|
source: Box::new(e),
|
||||||
})?,
|
}
|
||||||
|
})?,
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
visitor = visitor.set_spec_version(spec_version.clone())?;
|
visitor = visitor.set_spec_version(spec_version.clone())?;
|
||||||
|
|
||||||
let attributes = spec_version.attribute_names();
|
let attributes = spec_version.attribute_names();
|
||||||
|
|
||||||
if let Some(hv) = self.headers.remove(headers::CONTENT_TYPE) {
|
if let Some(hv) = self.headers.remove(CONTENT_TYPE) {
|
||||||
visitor = visitor.set_attribute(
|
visitor = visitor.set_attribute(
|
||||||
"datacontenttype",
|
"datacontenttype",
|
||||||
MessageAttributeValue::String(String::from_utf8(hv).map_err(|e| {
|
MessageAttributeValue::String(String::from_utf8(hv).map_err(|e| {
|
||||||
|
|
@ -71,7 +72,7 @@ impl BinaryDeserializer for ConsumerRecordDeserializer {
|
||||||
for (hn, hv) in self
|
for (hn, hv) in self
|
||||||
.headers
|
.headers
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.filter(|(hn, _)| headers::SPEC_VERSION_HEADER != *hn && hn.starts_with("ce_"))
|
.filter(|(hn, _)| SPEC_VERSION_HEADER != *hn && hn.starts_with("ce_"))
|
||||||
{
|
{
|
||||||
let name = &hn["ce_".len()..];
|
let name = &hn["ce_".len()..];
|
||||||
|
|
||||||
|
|
@ -120,9 +121,9 @@ impl MessageDeserializer for ConsumerRecordDeserializer {
|
||||||
.get("content-type")
|
.get("content-type")
|
||||||
.map(|s| String::from_utf8(s.to_vec()).ok())
|
.map(|s| String::from_utf8(s.to_vec()).ok())
|
||||||
.flatten()
|
.flatten()
|
||||||
.map(|s| s.starts_with(headers::CLOUDEVENTS_JSON_HEADER))
|
.map(|s| s.starts_with(CLOUDEVENTS_JSON_HEADER))
|
||||||
.unwrap_or(false),
|
.unwrap_or(false),
|
||||||
self.headers.get(headers::SPEC_VERSION_HEADER),
|
self.headers.get(SPEC_VERSION_HEADER),
|
||||||
) {
|
) {
|
||||||
(true, _) => Encoding::STRUCTURED,
|
(true, _) => Encoding::STRUCTURED,
|
||||||
(_, Some(_)) => Encoding::BINARY,
|
(_, Some(_)) => Encoding::BINARY,
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,9 @@
|
||||||
use rdkafka_lib as rdkafka;
|
use rdkafka_lib as rdkafka;
|
||||||
|
|
||||||
use super::headers;
|
use crate::binding::kafka::PREFIX;
|
||||||
|
use crate::binding::{
|
||||||
|
attribute_header, kafka::SPEC_VERSION_HEADER, CLOUDEVENTS_JSON_HEADER, CONTENT_TYPE,
|
||||||
|
};
|
||||||
use crate::event::SpecVersion;
|
use crate::event::SpecVersion;
|
||||||
use crate::message::{
|
use crate::message::{
|
||||||
BinaryDeserializer, BinarySerializer, MessageAttributeValue, Result, StructuredSerializer,
|
BinaryDeserializer, BinarySerializer, MessageAttributeValue, Result, StructuredSerializer,
|
||||||
|
|
@ -43,22 +46,19 @@ impl Default for MessageRecord {
|
||||||
|
|
||||||
impl BinarySerializer<MessageRecord> for MessageRecord {
|
impl BinarySerializer<MessageRecord> for MessageRecord {
|
||||||
fn set_spec_version(mut self, spec_version: SpecVersion) -> Result<Self> {
|
fn set_spec_version(mut self, spec_version: SpecVersion) -> Result<Self> {
|
||||||
self.headers = self
|
self.headers = self.headers.add(SPEC_VERSION_HEADER, spec_version.as_str());
|
||||||
.headers
|
|
||||||
.add(headers::SPEC_VERSION_HEADER, spec_version.as_str());
|
|
||||||
|
|
||||||
Ok(self)
|
Ok(self)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn set_attribute(mut self, name: &str, value: MessageAttributeValue) -> Result<Self> {
|
fn set_attribute(mut self, name: &str, value: MessageAttributeValue) -> Result<Self> {
|
||||||
self.headers = self.headers.add(
|
self.headers = self.headers.add(
|
||||||
&headers::ATTRIBUTES_TO_HEADERS
|
&attribute_header(PREFIX, name),
|
||||||
.get(name)
|
// TODO: return Option or Result
|
||||||
.ok_or(crate::message::Error::UnknownAttribute {
|
// .ok_or(crate::message::Error::UnknownAttribute {
|
||||||
name: String::from(name),
|
// name: String::from(name),
|
||||||
})?
|
// })?
|
||||||
.clone()[..],
|
&value.to_string(),
|
||||||
&value.to_string()[..],
|
|
||||||
);
|
);
|
||||||
|
|
||||||
Ok(self)
|
Ok(self)
|
||||||
|
|
@ -67,7 +67,7 @@ impl BinarySerializer<MessageRecord> for MessageRecord {
|
||||||
fn set_extension(mut self, name: &str, value: MessageAttributeValue) -> Result<Self> {
|
fn set_extension(mut self, name: &str, value: MessageAttributeValue) -> Result<Self> {
|
||||||
self.headers = self
|
self.headers = self
|
||||||
.headers
|
.headers
|
||||||
.add(&attribute_name_to_header!(name)[..], &value.to_string()[..]);
|
.add(&attribute_header(PREFIX, name), &value.to_string());
|
||||||
|
|
||||||
Ok(self)
|
Ok(self)
|
||||||
}
|
}
|
||||||
|
|
@ -85,9 +85,7 @@ impl BinarySerializer<MessageRecord> for MessageRecord {
|
||||||
|
|
||||||
impl StructuredSerializer<MessageRecord> for MessageRecord {
|
impl StructuredSerializer<MessageRecord> for MessageRecord {
|
||||||
fn set_structured_event(mut self, bytes: Vec<u8>) -> Result<MessageRecord> {
|
fn set_structured_event(mut self, bytes: Vec<u8>) -> Result<MessageRecord> {
|
||||||
self.headers = self
|
self.headers = self.headers.add(CONTENT_TYPE, CLOUDEVENTS_JSON_HEADER);
|
||||||
.headers
|
|
||||||
.add(headers::CONTENT_TYPE, headers::CLOUDEVENTS_JSON_HEADER);
|
|
||||||
|
|
||||||
self.payload = Some(bytes);
|
self.payload = Some(bytes);
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -51,8 +51,6 @@
|
||||||
|
|
||||||
#![deny(broken_intra_doc_links)]
|
#![deny(broken_intra_doc_links)]
|
||||||
|
|
||||||
#[macro_use]
|
|
||||||
mod headers;
|
|
||||||
mod kafka_consumer_record;
|
mod kafka_consumer_record;
|
||||||
mod kafka_producer_record;
|
mod kafka_producer_record;
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1,13 +1,13 @@
|
||||||
use reqwest_lib as reqwest;
|
use reqwest_lib as reqwest;
|
||||||
|
|
||||||
use super::headers;
|
use crate::binding::http::PREFIX;
|
||||||
|
use crate::binding::{attribute_header, http::SPEC_VERSION_HEADER, CLOUDEVENTS_JSON_HEADER};
|
||||||
use crate::event::SpecVersion;
|
use crate::event::SpecVersion;
|
||||||
use crate::message::{
|
use crate::message::{
|
||||||
BinaryDeserializer, BinarySerializer, MessageAttributeValue, Result, StructuredSerializer,
|
BinaryDeserializer, BinarySerializer, MessageAttributeValue, Result, StructuredSerializer,
|
||||||
};
|
};
|
||||||
use crate::Event;
|
use crate::Event;
|
||||||
use reqwest::RequestBuilder;
|
use reqwest::RequestBuilder;
|
||||||
use std::str::FromStr;
|
|
||||||
|
|
||||||
/// Wrapper for [`RequestBuilder`] that implements [`StructuredSerializer`] & [`BinarySerializer`] traits.
|
/// Wrapper for [`RequestBuilder`] that implements [`StructuredSerializer`] & [`BinarySerializer`] traits.
|
||||||
pub struct RequestSerializer {
|
pub struct RequestSerializer {
|
||||||
|
|
@ -22,24 +22,21 @@ impl RequestSerializer {
|
||||||
|
|
||||||
impl BinarySerializer<RequestBuilder> for RequestSerializer {
|
impl BinarySerializer<RequestBuilder> for RequestSerializer {
|
||||||
fn set_spec_version(mut self, spec_version: SpecVersion) -> Result<Self> {
|
fn set_spec_version(mut self, spec_version: SpecVersion) -> Result<Self> {
|
||||||
self.req = self
|
self.req = self.req.header(SPEC_VERSION_HEADER, spec_version.as_str());
|
||||||
.req
|
|
||||||
.header(headers::SPEC_VERSION_HEADER.clone(), spec_version.as_str());
|
|
||||||
Ok(self)
|
Ok(self)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn set_attribute(mut self, name: &str, value: MessageAttributeValue) -> Result<Self> {
|
fn set_attribute(mut self, name: &str, value: MessageAttributeValue) -> Result<Self> {
|
||||||
self.req = self.req.header(
|
self.req = self
|
||||||
headers::ATTRIBUTES_TO_HEADERS.get(name).unwrap().clone(),
|
.req
|
||||||
value.to_string(),
|
.header(&attribute_header(PREFIX, name), value.to_string());
|
||||||
);
|
|
||||||
Ok(self)
|
Ok(self)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn set_extension(mut self, name: &str, value: MessageAttributeValue) -> Result<Self> {
|
fn set_extension(mut self, name: &str, value: MessageAttributeValue) -> Result<Self> {
|
||||||
self.req = self
|
self.req = self
|
||||||
.req
|
.req
|
||||||
.header(attribute_name_to_header!(name)?, value.to_string());
|
.header(&attribute_header(PREFIX, name), value.to_string());
|
||||||
Ok(self)
|
Ok(self)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -56,10 +53,7 @@ impl StructuredSerializer<RequestBuilder> for RequestSerializer {
|
||||||
fn set_structured_event(self, bytes: Vec<u8>) -> Result<RequestBuilder> {
|
fn set_structured_event(self, bytes: Vec<u8>) -> Result<RequestBuilder> {
|
||||||
Ok(self
|
Ok(self
|
||||||
.req
|
.req
|
||||||
.header(
|
.header(reqwest::header::CONTENT_TYPE, CLOUDEVENTS_JSON_HEADER)
|
||||||
reqwest::header::CONTENT_TYPE,
|
|
||||||
headers::CLOUDEVENTS_JSON_HEADER.clone(),
|
|
||||||
)
|
|
||||||
.body(bytes))
|
.body(bytes))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,15 +1,15 @@
|
||||||
use reqwest_lib as reqwest;
|
use reqwest_lib as reqwest;
|
||||||
|
|
||||||
use super::headers;
|
use crate::binding::http::SPEC_VERSION_HEADER;
|
||||||
use crate::event::SpecVersion;
|
use crate::event::SpecVersion;
|
||||||
use crate::message::{
|
use crate::message::{
|
||||||
BinaryDeserializer, BinarySerializer, Encoding, Error, MessageAttributeValue,
|
BinaryDeserializer, BinarySerializer, Encoding, Error, MessageAttributeValue,
|
||||||
MessageDeserializer, Result, StructuredDeserializer, StructuredSerializer,
|
MessageDeserializer, Result, StructuredDeserializer, StructuredSerializer,
|
||||||
};
|
};
|
||||||
use crate::{message, Event};
|
use crate::{header_value_to_str, message, Event};
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use bytes::Bytes;
|
use bytes::Bytes;
|
||||||
use reqwest::header::{HeaderMap, HeaderName};
|
use reqwest::header::HeaderMap;
|
||||||
use reqwest::Response;
|
use reqwest::Response;
|
||||||
use std::convert::TryFrom;
|
use std::convert::TryFrom;
|
||||||
|
|
||||||
|
|
@ -32,18 +32,20 @@ impl BinaryDeserializer for ResponseDeserializer {
|
||||||
}
|
}
|
||||||
|
|
||||||
let spec_version = SpecVersion::try_from(
|
let spec_version = SpecVersion::try_from(
|
||||||
unwrap_optional_header!(self.headers, headers::SPEC_VERSION_HEADER).unwrap()?,
|
self.headers
|
||||||
|
.get(SPEC_VERSION_HEADER)
|
||||||
|
.map(|a| header_value_to_str!(a))
|
||||||
|
.unwrap()?,
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
visitor = visitor.set_spec_version(spec_version.clone())?;
|
visitor = visitor.set_spec_version(spec_version.clone())?;
|
||||||
|
|
||||||
let attributes = spec_version.attribute_names();
|
let attributes = spec_version.attribute_names();
|
||||||
|
|
||||||
for (hn, hv) in self
|
for (hn, hv) in self.headers.iter().filter(|(hn, _)| {
|
||||||
.headers
|
let key = hn.as_str();
|
||||||
.iter()
|
SPEC_VERSION_HEADER.ne(key) && key.starts_with("ce-")
|
||||||
.filter(|(hn, _)| headers::SPEC_VERSION_HEADER.ne(hn) && hn.as_str().starts_with("ce-"))
|
}) {
|
||||||
{
|
|
||||||
let name = &hn.as_str()["ce-".len()..];
|
let name = &hn.as_str()["ce-".len()..];
|
||||||
|
|
||||||
if attributes.contains(&name) {
|
if attributes.contains(&name) {
|
||||||
|
|
@ -87,12 +89,13 @@ impl MessageDeserializer for ResponseDeserializer {
|
||||||
fn encoding(&self) -> Encoding {
|
fn encoding(&self) -> Encoding {
|
||||||
match (
|
match (
|
||||||
#[allow(clippy::borrow_interior_mutable_const)]
|
#[allow(clippy::borrow_interior_mutable_const)]
|
||||||
unwrap_optional_header!(self.headers, reqwest::header::CONTENT_TYPE)
|
self.headers
|
||||||
|
.get(reqwest::header::CONTENT_TYPE)
|
||||||
|
.map(|a| header_value_to_str!(a))
|
||||||
.map(|r| r.ok())
|
.map(|r| r.ok())
|
||||||
.flatten()
|
.flatten()
|
||||||
.map(|e| e.starts_with("application/cloudevents+json")),
|
.map(|e| e.starts_with("application/cloudevents+json")),
|
||||||
self.headers
|
self.headers.get(SPEC_VERSION_HEADER),
|
||||||
.get::<&'static HeaderName>(&headers::SPEC_VERSION_HEADER),
|
|
||||||
) {
|
) {
|
||||||
(Some(true), _) => Encoding::STRUCTURED,
|
(Some(true), _) => Encoding::STRUCTURED,
|
||||||
(_, Some(_)) => Encoding::BINARY,
|
(_, Some(_)) => Encoding::BINARY,
|
||||||
|
|
|
||||||
|
|
@ -1,63 +0,0 @@
|
||||||
use reqwest_lib as reqwest;
|
|
||||||
|
|
||||||
use crate::event::SpecVersion;
|
|
||||||
use lazy_static::lazy_static;
|
|
||||||
use reqwest::header::{HeaderName, HeaderValue};
|
|
||||||
use std::collections::HashMap;
|
|
||||||
use std::str::FromStr;
|
|
||||||
|
|
||||||
macro_rules! unwrap_optional_header {
|
|
||||||
($headers:expr, $name:expr) => {
|
|
||||||
$headers
|
|
||||||
.get::<&'static reqwest::header::HeaderName>(&$name)
|
|
||||||
.map(|a| header_value_to_str!(a))
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
macro_rules! header_value_to_str {
|
|
||||||
($header_value:expr) => {
|
|
||||||
$header_value
|
|
||||||
.to_str()
|
|
||||||
.map_err(|e| crate::message::Error::Other {
|
|
||||||
source: Box::new(e),
|
|
||||||
})
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
macro_rules! str_name_to_header {
|
|
||||||
($attribute:expr) => {
|
|
||||||
reqwest::header::HeaderName::from_str($attribute).map_err(|e| {
|
|
||||||
crate::message::Error::Other {
|
|
||||||
source: Box::new(e),
|
|
||||||
}
|
|
||||||
})
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
macro_rules! attribute_name_to_header {
|
|
||||||
($attribute:expr) => {
|
|
||||||
str_name_to_header!(&["ce-", $attribute].concat())
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
fn attributes_to_headers(
|
|
||||||
it: impl Iterator<Item = &'static str>,
|
|
||||||
) -> HashMap<&'static str, HeaderName> {
|
|
||||||
it.map(|s| {
|
|
||||||
if s == "datacontenttype" {
|
|
||||||
(s, reqwest::header::CONTENT_TYPE)
|
|
||||||
} else {
|
|
||||||
(s, attribute_name_to_header!(s).unwrap())
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.collect()
|
|
||||||
}
|
|
||||||
|
|
||||||
lazy_static! {
|
|
||||||
pub(crate) static ref ATTRIBUTES_TO_HEADERS: HashMap<&'static str, HeaderName> =
|
|
||||||
attributes_to_headers(SpecVersion::all_attribute_names());
|
|
||||||
pub(crate) static ref SPEC_VERSION_HEADER: HeaderName =
|
|
||||||
HeaderName::from_static("ce-specversion");
|
|
||||||
pub(crate) static ref CLOUDEVENTS_JSON_HEADER: HeaderValue =
|
|
||||||
HeaderValue::from_static("application/cloudevents+json");
|
|
||||||
}
|
|
||||||
|
|
@ -30,8 +30,6 @@
|
||||||
|
|
||||||
#![deny(broken_intra_doc_links)]
|
#![deny(broken_intra_doc_links)]
|
||||||
|
|
||||||
#[macro_use]
|
|
||||||
mod headers;
|
|
||||||
mod client_request;
|
mod client_request;
|
||||||
mod client_response;
|
mod client_response;
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1,63 +0,0 @@
|
||||||
use warp_lib as warp;
|
|
||||||
|
|
||||||
use crate::event::SpecVersion;
|
|
||||||
use http::header::HeaderName;
|
|
||||||
use lazy_static::lazy_static;
|
|
||||||
use warp::http::HeaderValue;
|
|
||||||
|
|
||||||
use std::collections::HashMap;
|
|
||||||
use std::str::FromStr;
|
|
||||||
|
|
||||||
macro_rules! unwrap_optional_header {
|
|
||||||
($headers:expr, $name:expr) => {
|
|
||||||
$headers
|
|
||||||
.get::<&'static HeaderName>(&$name)
|
|
||||||
.map(|a| header_value_to_str!(a))
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
macro_rules! header_value_to_str {
|
|
||||||
($header_value:expr) => {
|
|
||||||
$header_value
|
|
||||||
.to_str()
|
|
||||||
.map_err(|e| crate::message::Error::Other {
|
|
||||||
source: Box::new(e),
|
|
||||||
})
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
macro_rules! str_name_to_header {
|
|
||||||
($attribute:expr) => {
|
|
||||||
HeaderName::from_str($attribute).map_err(|e| crate::message::Error::Other {
|
|
||||||
source: Box::new(e),
|
|
||||||
})
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
macro_rules! attribute_name_to_header {
|
|
||||||
($attribute:expr) => {
|
|
||||||
str_name_to_header!(&["ce-", $attribute].concat())
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
fn attributes_to_headers(
|
|
||||||
it: impl Iterator<Item = &'static str>,
|
|
||||||
) -> HashMap<&'static str, HeaderName> {
|
|
||||||
it.map(|s| {
|
|
||||||
if s == "datacontenttype" {
|
|
||||||
(s, http::header::CONTENT_TYPE)
|
|
||||||
} else {
|
|
||||||
(s, attribute_name_to_header!(s).unwrap())
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.collect()
|
|
||||||
}
|
|
||||||
|
|
||||||
lazy_static! {
|
|
||||||
pub(crate) static ref ATTRIBUTES_TO_HEADERS: HashMap<&'static str, HeaderName> =
|
|
||||||
attributes_to_headers(SpecVersion::all_attribute_names());
|
|
||||||
pub(crate) static ref SPEC_VERSION_HEADER: HeaderName =
|
|
||||||
HeaderName::from_static("ce-specversion");
|
|
||||||
pub(crate) static ref CLOUDEVENTS_JSON_HEADER: HeaderValue =
|
|
||||||
HeaderValue::from_static("application/cloudevents+json");
|
|
||||||
}
|
|
||||||
|
|
@ -61,9 +61,6 @@
|
||||||
//! warp::serve(routes).run(([127, 0, 0, 1], 3030));
|
//! warp::serve(routes).run(([127, 0, 0, 1], 3030));
|
||||||
//! ```
|
//! ```
|
||||||
|
|
||||||
#[macro_use]
|
|
||||||
mod headers;
|
|
||||||
|
|
||||||
mod server_request;
|
mod server_request;
|
||||||
mod server_response;
|
mod server_response;
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1,13 +1,13 @@
|
||||||
use super::headers;
|
|
||||||
use bytes::Bytes;
|
use bytes::Bytes;
|
||||||
use http::{header::HeaderName, HeaderMap};
|
use http::HeaderMap;
|
||||||
|
|
||||||
|
use crate::binding::http::SPEC_VERSION_HEADER;
|
||||||
use crate::event::SpecVersion;
|
use crate::event::SpecVersion;
|
||||||
|
use crate::header_value_to_str;
|
||||||
use crate::message::{
|
use crate::message::{
|
||||||
BinaryDeserializer, BinarySerializer, Encoding, Error, MessageAttributeValue,
|
BinaryDeserializer, BinarySerializer, Encoding, Error, MessageAttributeValue,
|
||||||
MessageDeserializer, Result, StructuredDeserializer, StructuredSerializer,
|
MessageDeserializer, Result, StructuredDeserializer, StructuredSerializer,
|
||||||
};
|
};
|
||||||
|
|
||||||
use crate::{message, Event};
|
use crate::{message, Event};
|
||||||
use std::convert::TryFrom;
|
use std::convert::TryFrom;
|
||||||
|
|
||||||
|
|
@ -29,18 +29,20 @@ impl BinaryDeserializer for RequestDeserializer {
|
||||||
}
|
}
|
||||||
|
|
||||||
let spec_version = SpecVersion::try_from(
|
let spec_version = SpecVersion::try_from(
|
||||||
unwrap_optional_header!(self.headers, headers::SPEC_VERSION_HEADER).unwrap()?,
|
self.headers
|
||||||
|
.get(SPEC_VERSION_HEADER)
|
||||||
|
.map(|a| header_value_to_str!(a))
|
||||||
|
.unwrap()?,
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
visitor = visitor.set_spec_version(spec_version.clone())?;
|
visitor = visitor.set_spec_version(spec_version.clone())?;
|
||||||
|
|
||||||
let attributes = spec_version.attribute_names();
|
let attributes = spec_version.attribute_names();
|
||||||
|
|
||||||
for (hn, hv) in self
|
for (hn, hv) in self.headers.iter().filter(|(hn, _)| {
|
||||||
.headers
|
let key = hn.as_str();
|
||||||
.iter()
|
SPEC_VERSION_HEADER.ne(key) && key.starts_with("ce-")
|
||||||
.filter(|(hn, _)| headers::SPEC_VERSION_HEADER.ne(hn) && hn.as_str().starts_with("ce-"))
|
}) {
|
||||||
{
|
|
||||||
let name = &hn.as_str()["ce-".len()..];
|
let name = &hn.as_str()["ce-".len()..];
|
||||||
|
|
||||||
if attributes.contains(&name) {
|
if attributes.contains(&name) {
|
||||||
|
|
@ -90,11 +92,7 @@ impl MessageDeserializer for RequestDeserializer {
|
||||||
== "application/cloudevents+json"
|
== "application/cloudevents+json"
|
||||||
{
|
{
|
||||||
Encoding::STRUCTURED
|
Encoding::STRUCTURED
|
||||||
} else if self
|
} else if self.headers.contains_key(SPEC_VERSION_HEADER) {
|
||||||
.headers
|
|
||||||
.get::<&'static HeaderName>(&super::headers::SPEC_VERSION_HEADER)
|
|
||||||
.is_some()
|
|
||||||
{
|
|
||||||
Encoding::BINARY
|
Encoding::BINARY
|
||||||
} else {
|
} else {
|
||||||
Encoding::UNKNOWN
|
Encoding::UNKNOWN
|
||||||
|
|
|
||||||
|
|
@ -1,7 +1,7 @@
|
||||||
use warp_lib as warp;
|
use warp_lib as warp;
|
||||||
|
|
||||||
use super::headers;
|
use crate::binding::http::PREFIX;
|
||||||
|
use crate::binding::{attribute_header, http::SPEC_VERSION_HEADER, CLOUDEVENTS_JSON_HEADER};
|
||||||
use crate::event::SpecVersion;
|
use crate::event::SpecVersion;
|
||||||
use crate::message::{
|
use crate::message::{
|
||||||
BinaryDeserializer, BinarySerializer, Error, MessageAttributeValue, Result,
|
BinaryDeserializer, BinarySerializer, Error, MessageAttributeValue, Result,
|
||||||
|
|
@ -13,10 +13,8 @@ use warp::http::HeaderValue;
|
||||||
use warp::hyper::Body;
|
use warp::hyper::Body;
|
||||||
use warp::reply::Response;
|
use warp::reply::Response;
|
||||||
|
|
||||||
use http::header::HeaderName;
|
|
||||||
use http::response::Builder;
|
use http::response::Builder;
|
||||||
|
use std::convert::TryFrom;
|
||||||
use std::{convert::TryFrom, str::FromStr};
|
|
||||||
|
|
||||||
pub struct ResponseSerializer {
|
pub struct ResponseSerializer {
|
||||||
builder: Builder,
|
builder: Builder,
|
||||||
|
|
@ -33,7 +31,7 @@ impl ResponseSerializer {
|
||||||
impl BinarySerializer<Response> for ResponseSerializer {
|
impl BinarySerializer<Response> for ResponseSerializer {
|
||||||
fn set_spec_version(mut self, spec_version: SpecVersion) -> Result<Self> {
|
fn set_spec_version(mut self, spec_version: SpecVersion) -> Result<Self> {
|
||||||
self.builder = self.builder.header(
|
self.builder = self.builder.header(
|
||||||
headers::SPEC_VERSION_HEADER.clone(),
|
SPEC_VERSION_HEADER,
|
||||||
HeaderValue::try_from(spec_version.to_string().as_str()).map_err(|e| {
|
HeaderValue::try_from(spec_version.to_string().as_str()).map_err(|e| {
|
||||||
crate::message::Error::Other {
|
crate::message::Error::Other {
|
||||||
source: Box::new(e),
|
source: Box::new(e),
|
||||||
|
|
@ -45,7 +43,7 @@ impl BinarySerializer<Response> for ResponseSerializer {
|
||||||
|
|
||||||
fn set_attribute(mut self, name: &str, value: MessageAttributeValue) -> Result<Self> {
|
fn set_attribute(mut self, name: &str, value: MessageAttributeValue) -> Result<Self> {
|
||||||
self.builder = self.builder.header(
|
self.builder = self.builder.header(
|
||||||
headers::ATTRIBUTES_TO_HEADERS.get(name).unwrap().clone(),
|
&attribute_header(PREFIX, name),
|
||||||
HeaderValue::try_from(value.to_string().as_str()).map_err(|e| {
|
HeaderValue::try_from(value.to_string().as_str()).map_err(|e| {
|
||||||
crate::message::Error::Other {
|
crate::message::Error::Other {
|
||||||
source: Box::new(e),
|
source: Box::new(e),
|
||||||
|
|
@ -57,7 +55,7 @@ impl BinarySerializer<Response> for ResponseSerializer {
|
||||||
|
|
||||||
fn set_extension(mut self, name: &str, value: MessageAttributeValue) -> Result<Self> {
|
fn set_extension(mut self, name: &str, value: MessageAttributeValue) -> Result<Self> {
|
||||||
self.builder = self.builder.header(
|
self.builder = self.builder.header(
|
||||||
attribute_name_to_header!(name)?,
|
&attribute_header(PREFIX, name),
|
||||||
HeaderValue::try_from(value.to_string().as_str()).map_err(|e| {
|
HeaderValue::try_from(value.to_string().as_str()).map_err(|e| {
|
||||||
crate::message::Error::Other {
|
crate::message::Error::Other {
|
||||||
source: Box::new(e),
|
source: Box::new(e),
|
||||||
|
|
@ -87,10 +85,7 @@ impl BinarySerializer<Response> for ResponseSerializer {
|
||||||
impl StructuredSerializer<Response> for ResponseSerializer {
|
impl StructuredSerializer<Response> for ResponseSerializer {
|
||||||
fn set_structured_event(self, bytes: Vec<u8>) -> Result<Response> {
|
fn set_structured_event(self, bytes: Vec<u8>) -> Result<Response> {
|
||||||
self.builder
|
self.builder
|
||||||
.header(
|
.header(http::header::CONTENT_TYPE, CLOUDEVENTS_JSON_HEADER)
|
||||||
http::header::CONTENT_TYPE,
|
|
||||||
headers::CLOUDEVENTS_JSON_HEADER.clone(),
|
|
||||||
)
|
|
||||||
.body(Body::from(bytes))
|
.body(Body::from(bytes))
|
||||||
.map_err(|e| crate::message::Error::Other {
|
.map_err(|e| crate::message::Error::Other {
|
||||||
source: Box::new(e),
|
source: Box::new(e),
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue