diff --git a/src/binding/actix/headers.rs b/src/binding/actix/headers.rs index 12d4067..83eb8a0 100644 --- a/src/binding/actix/headers.rs +++ b/src/binding/actix/headers.rs @@ -1,68 +1,9 @@ -use crate::event::SpecVersion; -use actix_web::http::header; -use actix_web::http::{HeaderName, HeaderValue}; -use lazy_static::lazy_static; -use std::collections::HashMap; -use std::str::FromStr; - -macro_rules! unwrap_optional_header { - ($headers:expr, $name:expr) => { - $headers - .get::<&'static HeaderName>(&$name) - .map(|a| header_value_to_str!(a)) - }; -} - -macro_rules! header_value_to_str { - ($header_value:expr) => { - $header_value - .to_str() - .map_err(|e| crate::message::Error::Other { - source: Box::new(e), - }) - }; -} - macro_rules! str_to_header_value { ($header_value:expr) => { - HeaderValue::from_str($header_value).map_err(|e| crate::message::Error::Other { - source: Box::new(e), + HeaderValue::from_str($header_value.to_string().as_str()).map_err(|e| { + crate::message::Error::Other { + source: Box::new(e), + } }) }; } - -macro_rules! str_name_to_header { - ($attribute:expr) => { - HeaderName::from_str($attribute).map_err(|e| crate::message::Error::Other { - source: Box::new(e), - }) - }; -} - -macro_rules! attribute_name_to_header { - ($attribute:expr) => { - str_name_to_header!(&["ce-", $attribute].concat()) - }; -} - -fn attributes_to_headers( - it: impl Iterator, -) -> HashMap<&'static str, HeaderName> { - it.map(|s| { - if s == "datacontenttype" { - (s, header::CONTENT_TYPE) - } else { - (s, attribute_name_to_header!(s).unwrap()) - } - }) - .collect() -} - -lazy_static! { - pub(crate) static ref ATTRIBUTES_TO_HEADERS: HashMap<&'static str, HeaderName> = - attributes_to_headers(SpecVersion::all_attribute_names()); - pub(crate) static ref SPEC_VERSION_HEADER: HeaderName = - HeaderName::from_static("ce-specversion"); - pub(crate) static ref CLOUDEVENTS_JSON_HEADER: HeaderValue = - HeaderValue::from_static("application/cloudevents+json"); -} diff --git a/src/binding/actix/server_request.rs b/src/binding/actix/server_request.rs index dc5080b..577bfb2 100644 --- a/src/binding/actix/server_request.rs +++ b/src/binding/actix/server_request.rs @@ -1,11 +1,10 @@ -use super::headers; +use crate::binding::http::SPEC_VERSION_HEADER; use crate::event::SpecVersion; use crate::message::{ BinaryDeserializer, BinarySerializer, Encoding, MessageAttributeValue, MessageDeserializer, Result, StructuredDeserializer, StructuredSerializer, }; -use crate::{message, Event}; -use actix_web::http::HeaderName; +use crate::{header_value_to_str, message, Event}; use actix_web::web::{Bytes, BytesMut}; use actix_web::{web, HttpMessage, HttpRequest}; use async_trait::async_trait; @@ -32,18 +31,21 @@ impl<'a> BinaryDeserializer for HttpRequestDeserializer<'a> { } let spec_version = SpecVersion::try_from( - unwrap_optional_header!(self.req.headers(), headers::SPEC_VERSION_HEADER).unwrap()?, + self.req + .headers() + .get(SPEC_VERSION_HEADER) + .map(|a| header_value_to_str!(a)) + .unwrap()?, )?; visitor = visitor.set_spec_version(spec_version.clone())?; let attributes = spec_version.attribute_names(); - for (hn, hv) in - self.req.headers().iter().filter(|(hn, _)| { - headers::SPEC_VERSION_HEADER.ne(hn) && hn.as_str().starts_with("ce-") - }) - { + for (hn, hv) in self.req.headers().iter().filter(|(hn, _)| { + let key = hn.as_str(); + SPEC_VERSION_HEADER.ne(key) && key.starts_with("ce-") + }) { let name = &hn.as_str()["ce-".len()..]; if attributes.contains(&name) { @@ -87,17 +89,50 @@ impl<'a> MessageDeserializer for HttpRequestDeserializer<'a> { fn encoding(&self) -> Encoding { if self.req.content_type() == "application/cloudevents+json" { Encoding::STRUCTURED - } else if self - .req - .headers() - .get::<&'static HeaderName>(&super::headers::SPEC_VERSION_HEADER) - .is_some() - { + } else if self.req.headers().get(SPEC_VERSION_HEADER).is_some() { Encoding::BINARY } else { Encoding::UNKNOWN } } + + fn into_event(self) -> Result { + match self.encoding() { + Encoding::BINARY => BinaryDeserializer::into_event(self), + Encoding::STRUCTURED => StructuredDeserializer::into_event(self), + _ => Err(message::Error::WrongEncoding {}), + } + } + + fn deserialize_to_binary>(self, serializer: T) -> Result { + if self.encoding() == Encoding::BINARY { + return self.deserialize_binary(serializer); + } + + MessageDeserializer::into_event(self)?.deserialize_binary(serializer) + } + + fn deserialize_to_structured>( + self, + serializer: T, + ) -> Result { + if self.encoding() == Encoding::STRUCTURED { + return self.deserialize_structured(serializer); + } + + MessageDeserializer::into_event(self)?.deserialize_structured(serializer) + } + + fn deserialize_to + StructuredSerializer>( + self, + serializer: T, + ) -> Result { + if self.encoding() == Encoding::STRUCTURED { + self.deserialize_structured(serializer) + } else { + self.deserialize_binary(serializer) + } + } } /// Method to transform an incoming [`HttpRequest`] to [`Event`]. diff --git a/src/binding/actix/server_response.rs b/src/binding/actix/server_response.rs index 5c68561..39217e4 100644 --- a/src/binding/actix/server_response.rs +++ b/src/binding/actix/server_response.rs @@ -1,16 +1,19 @@ -use super::headers; +use crate::binding::{ + attribute_header, + http::{PREFIX, SPEC_VERSION_HEADER}, + CLOUDEVENTS_JSON_HEADER, +}; use crate::event::SpecVersion; use crate::message::{ BinaryDeserializer, BinarySerializer, MessageAttributeValue, Result, StructuredSerializer, }; use crate::Event; use actix_web::dev::HttpResponseBuilder; -use actix_web::http::{HeaderName, HeaderValue, StatusCode}; +use actix_web::http::{HeaderValue, StatusCode}; use actix_web::HttpResponse; use async_trait::async_trait; use futures::future::LocalBoxFuture; use futures::FutureExt; -use std::str::FromStr; /// Wrapper for [`HttpResponseBuilder`] that implements [`StructuredSerializer`] and [`BinarySerializer`]. pub struct HttpResponseSerializer { @@ -25,25 +28,23 @@ impl HttpResponseSerializer { impl BinarySerializer for HttpResponseSerializer { fn set_spec_version(mut self, spec_version: SpecVersion) -> Result { - self.builder.set_header( - headers::SPEC_VERSION_HEADER.clone(), - str_to_header_value!(spec_version.as_str())?, - ); + self.builder + .set_header(SPEC_VERSION_HEADER, str_to_header_value!(spec_version)?); Ok(self) } fn set_attribute(mut self, name: &str, value: MessageAttributeValue) -> Result { self.builder.set_header( - headers::ATTRIBUTES_TO_HEADERS.get(name).unwrap().clone(), - str_to_header_value!(value.to_string().as_str())?, + &attribute_header(PREFIX, name), + str_to_header_value!(value)?, ); Ok(self) } fn set_extension(mut self, name: &str, value: MessageAttributeValue) -> Result { self.builder.set_header( - attribute_name_to_header!(name)?, - str_to_header_value!(value.to_string().as_str())?, + &attribute_header(PREFIX, name), + str_to_header_value!(value)?, ); Ok(self) } @@ -63,7 +64,7 @@ impl StructuredSerializer for HttpResponseSerializer { .builder .set_header( actix_web::http::header::CONTENT_TYPE, - headers::CLOUDEVENTS_JSON_HEADER.clone(), + CLOUDEVENTS_JSON_HEADER, ) .body(bytes)) } diff --git a/src/binding/mod.rs b/src/binding/mod.rs index 1368f15..486907e 100644 --- a/src/binding/mod.rs +++ b/src/binding/mod.rs @@ -8,3 +8,36 @@ pub mod rdkafka; pub mod reqwest; #[cfg(feature = "warp")] pub mod warp; + +#[cfg(feature = "rdkafka")] +pub(crate) mod kafka { + pub static SPEC_VERSION_HEADER: &str = "ce_specversion"; + pub static PREFIX: &str = "ce_"; +} +#[cfg(any(feature = "actix", feature = "warp", feature = "reqwest"))] +pub(crate) mod http { + pub static SPEC_VERSION_HEADER: &str = "ce-specversion"; + pub static PREFIX: &str = "ce-"; +} + +pub(crate) static CLOUDEVENTS_JSON_HEADER: &str = "application/cloudevents+json"; +pub(crate) static CONTENT_TYPE: &str = "content-type"; + +pub(crate) fn attribute_header(prefix: &str, name: &str) -> String { + if name == "datacontenttype" { + CONTENT_TYPE.to_string() + } else { + [prefix, name].concat() + } +} + +#[macro_export] +macro_rules! header_value_to_str { + ($header_value:expr) => { + $header_value + .to_str() + .map_err(|e| crate::message::Error::Other { + source: Box::new(e), + }) + }; +} diff --git a/src/binding/rdkafka/headers.rs b/src/binding/rdkafka/headers.rs deleted file mode 100644 index 6f66238..0000000 --- a/src/binding/rdkafka/headers.rs +++ /dev/null @@ -1,29 +0,0 @@ -use crate::event::SpecVersion; -use lazy_static::lazy_static; -use std::collections::HashMap; - -macro_rules! attribute_name_to_header { - ($attribute:expr) => { - format!("ce_{}", $attribute) - }; -} - -fn attributes_to_headers(it: impl Iterator) -> HashMap<&'static str, String> { - it.map(|s| { - if s == "datacontenttype" { - (s, String::from("content-type")) - } else { - (s, attribute_name_to_header!(s)) - } - }) - .collect() -} - -lazy_static! { - pub(crate) static ref ATTRIBUTES_TO_HEADERS: HashMap<&'static str, String> = - attributes_to_headers(SpecVersion::all_attribute_names()); -} - -pub(crate) static SPEC_VERSION_HEADER: &str = "ce_specversion"; -pub(crate) static CLOUDEVENTS_JSON_HEADER: &str = "application/cloudevents+json"; -pub(crate) static CONTENT_TYPE: &str = "content-type"; diff --git a/src/binding/rdkafka/kafka_consumer_record.rs b/src/binding/rdkafka/kafka_consumer_record.rs index f118d19..15310a3 100644 --- a/src/binding/rdkafka/kafka_consumer_record.rs +++ b/src/binding/rdkafka/kafka_consumer_record.rs @@ -1,6 +1,6 @@ use rdkafka_lib as rdkafka; -use super::headers; +use crate::binding::{kafka::SPEC_VERSION_HEADER, CLOUDEVENTS_JSON_HEADER, CONTENT_TYPE}; use crate::event::SpecVersion; use crate::message::{ BinaryDeserializer, BinarySerializer, Encoding, MessageAttributeValue, MessageDeserializer, @@ -47,17 +47,18 @@ impl BinaryDeserializer for ConsumerRecordDeserializer { } let spec_version = SpecVersion::try_from( - str::from_utf8(&self.headers.remove(headers::SPEC_VERSION_HEADER).unwrap()[..]) - .map_err(|e| crate::message::Error::Other { + str::from_utf8(&self.headers.remove(SPEC_VERSION_HEADER).unwrap()).map_err(|e| { + crate::message::Error::Other { source: Box::new(e), - })?, + } + })?, )?; visitor = visitor.set_spec_version(spec_version.clone())?; let attributes = spec_version.attribute_names(); - if let Some(hv) = self.headers.remove(headers::CONTENT_TYPE) { + if let Some(hv) = self.headers.remove(CONTENT_TYPE) { visitor = visitor.set_attribute( "datacontenttype", MessageAttributeValue::String(String::from_utf8(hv).map_err(|e| { @@ -71,7 +72,7 @@ impl BinaryDeserializer for ConsumerRecordDeserializer { for (hn, hv) in self .headers .into_iter() - .filter(|(hn, _)| headers::SPEC_VERSION_HEADER != *hn && hn.starts_with("ce_")) + .filter(|(hn, _)| SPEC_VERSION_HEADER != *hn && hn.starts_with("ce_")) { let name = &hn["ce_".len()..]; @@ -120,9 +121,9 @@ impl MessageDeserializer for ConsumerRecordDeserializer { .get("content-type") .map(|s| String::from_utf8(s.to_vec()).ok()) .flatten() - .map(|s| s.starts_with(headers::CLOUDEVENTS_JSON_HEADER)) + .map(|s| s.starts_with(CLOUDEVENTS_JSON_HEADER)) .unwrap_or(false), - self.headers.get(headers::SPEC_VERSION_HEADER), + self.headers.get(SPEC_VERSION_HEADER), ) { (true, _) => Encoding::STRUCTURED, (_, Some(_)) => Encoding::BINARY, diff --git a/src/binding/rdkafka/kafka_producer_record.rs b/src/binding/rdkafka/kafka_producer_record.rs index 3fbb27a..2c68e9b 100644 --- a/src/binding/rdkafka/kafka_producer_record.rs +++ b/src/binding/rdkafka/kafka_producer_record.rs @@ -1,6 +1,9 @@ use rdkafka_lib as rdkafka; -use super::headers; +use crate::binding::kafka::PREFIX; +use crate::binding::{ + attribute_header, kafka::SPEC_VERSION_HEADER, CLOUDEVENTS_JSON_HEADER, CONTENT_TYPE, +}; use crate::event::SpecVersion; use crate::message::{ BinaryDeserializer, BinarySerializer, MessageAttributeValue, Result, StructuredSerializer, @@ -43,22 +46,19 @@ impl Default for MessageRecord { impl BinarySerializer for MessageRecord { fn set_spec_version(mut self, spec_version: SpecVersion) -> Result { - self.headers = self - .headers - .add(headers::SPEC_VERSION_HEADER, spec_version.as_str()); + self.headers = self.headers.add(SPEC_VERSION_HEADER, spec_version.as_str()); Ok(self) } fn set_attribute(mut self, name: &str, value: MessageAttributeValue) -> Result { self.headers = self.headers.add( - &headers::ATTRIBUTES_TO_HEADERS - .get(name) - .ok_or(crate::message::Error::UnknownAttribute { - name: String::from(name), - })? - .clone()[..], - &value.to_string()[..], + &attribute_header(PREFIX, name), + // TODO: return Option or Result + // .ok_or(crate::message::Error::UnknownAttribute { + // name: String::from(name), + // })? + &value.to_string(), ); Ok(self) @@ -67,7 +67,7 @@ impl BinarySerializer for MessageRecord { fn set_extension(mut self, name: &str, value: MessageAttributeValue) -> Result { self.headers = self .headers - .add(&attribute_name_to_header!(name)[..], &value.to_string()[..]); + .add(&attribute_header(PREFIX, name), &value.to_string()); Ok(self) } @@ -85,9 +85,7 @@ impl BinarySerializer for MessageRecord { impl StructuredSerializer for MessageRecord { fn set_structured_event(mut self, bytes: Vec) -> Result { - self.headers = self - .headers - .add(headers::CONTENT_TYPE, headers::CLOUDEVENTS_JSON_HEADER); + self.headers = self.headers.add(CONTENT_TYPE, CLOUDEVENTS_JSON_HEADER); self.payload = Some(bytes); diff --git a/src/binding/rdkafka/mod.rs b/src/binding/rdkafka/mod.rs index 676152c..41ce0ed 100644 --- a/src/binding/rdkafka/mod.rs +++ b/src/binding/rdkafka/mod.rs @@ -51,8 +51,6 @@ #![deny(broken_intra_doc_links)] -#[macro_use] -mod headers; mod kafka_consumer_record; mod kafka_producer_record; diff --git a/src/binding/reqwest/client_request.rs b/src/binding/reqwest/client_request.rs index f8e8d58..e590d69 100644 --- a/src/binding/reqwest/client_request.rs +++ b/src/binding/reqwest/client_request.rs @@ -1,13 +1,13 @@ use reqwest_lib as reqwest; -use super::headers; +use crate::binding::http::PREFIX; +use crate::binding::{attribute_header, http::SPEC_VERSION_HEADER, CLOUDEVENTS_JSON_HEADER}; use crate::event::SpecVersion; use crate::message::{ BinaryDeserializer, BinarySerializer, MessageAttributeValue, Result, StructuredSerializer, }; use crate::Event; use reqwest::RequestBuilder; -use std::str::FromStr; /// Wrapper for [`RequestBuilder`] that implements [`StructuredSerializer`] & [`BinarySerializer`] traits. pub struct RequestSerializer { @@ -22,24 +22,21 @@ impl RequestSerializer { impl BinarySerializer for RequestSerializer { fn set_spec_version(mut self, spec_version: SpecVersion) -> Result { - self.req = self - .req - .header(headers::SPEC_VERSION_HEADER.clone(), spec_version.as_str()); + self.req = self.req.header(SPEC_VERSION_HEADER, spec_version.as_str()); Ok(self) } fn set_attribute(mut self, name: &str, value: MessageAttributeValue) -> Result { - self.req = self.req.header( - headers::ATTRIBUTES_TO_HEADERS.get(name).unwrap().clone(), - value.to_string(), - ); + self.req = self + .req + .header(&attribute_header(PREFIX, name), value.to_string()); Ok(self) } fn set_extension(mut self, name: &str, value: MessageAttributeValue) -> Result { self.req = self .req - .header(attribute_name_to_header!(name)?, value.to_string()); + .header(&attribute_header(PREFIX, name), value.to_string()); Ok(self) } @@ -56,10 +53,7 @@ impl StructuredSerializer for RequestSerializer { fn set_structured_event(self, bytes: Vec) -> Result { Ok(self .req - .header( - reqwest::header::CONTENT_TYPE, - headers::CLOUDEVENTS_JSON_HEADER.clone(), - ) + .header(reqwest::header::CONTENT_TYPE, CLOUDEVENTS_JSON_HEADER) .body(bytes)) } } diff --git a/src/binding/reqwest/client_response.rs b/src/binding/reqwest/client_response.rs index a4c0702..c0ad9bf 100644 --- a/src/binding/reqwest/client_response.rs +++ b/src/binding/reqwest/client_response.rs @@ -1,15 +1,15 @@ use reqwest_lib as reqwest; -use super::headers; +use crate::binding::http::SPEC_VERSION_HEADER; use crate::event::SpecVersion; use crate::message::{ BinaryDeserializer, BinarySerializer, Encoding, Error, MessageAttributeValue, MessageDeserializer, Result, StructuredDeserializer, StructuredSerializer, }; -use crate::{message, Event}; +use crate::{header_value_to_str, message, Event}; use async_trait::async_trait; use bytes::Bytes; -use reqwest::header::{HeaderMap, HeaderName}; +use reqwest::header::HeaderMap; use reqwest::Response; use std::convert::TryFrom; @@ -32,18 +32,20 @@ impl BinaryDeserializer for ResponseDeserializer { } let spec_version = SpecVersion::try_from( - unwrap_optional_header!(self.headers, headers::SPEC_VERSION_HEADER).unwrap()?, + self.headers + .get(SPEC_VERSION_HEADER) + .map(|a| header_value_to_str!(a)) + .unwrap()?, )?; visitor = visitor.set_spec_version(spec_version.clone())?; let attributes = spec_version.attribute_names(); - for (hn, hv) in self - .headers - .iter() - .filter(|(hn, _)| headers::SPEC_VERSION_HEADER.ne(hn) && hn.as_str().starts_with("ce-")) - { + for (hn, hv) in self.headers.iter().filter(|(hn, _)| { + let key = hn.as_str(); + SPEC_VERSION_HEADER.ne(key) && key.starts_with("ce-") + }) { let name = &hn.as_str()["ce-".len()..]; if attributes.contains(&name) { @@ -87,12 +89,13 @@ impl MessageDeserializer for ResponseDeserializer { fn encoding(&self) -> Encoding { match ( #[allow(clippy::borrow_interior_mutable_const)] - unwrap_optional_header!(self.headers, reqwest::header::CONTENT_TYPE) + self.headers + .get(reqwest::header::CONTENT_TYPE) + .map(|a| header_value_to_str!(a)) .map(|r| r.ok()) .flatten() .map(|e| e.starts_with("application/cloudevents+json")), - self.headers - .get::<&'static HeaderName>(&headers::SPEC_VERSION_HEADER), + self.headers.get(SPEC_VERSION_HEADER), ) { (Some(true), _) => Encoding::STRUCTURED, (_, Some(_)) => Encoding::BINARY, diff --git a/src/binding/reqwest/headers.rs b/src/binding/reqwest/headers.rs deleted file mode 100644 index 7d97f21..0000000 --- a/src/binding/reqwest/headers.rs +++ /dev/null @@ -1,63 +0,0 @@ -use reqwest_lib as reqwest; - -use crate::event::SpecVersion; -use lazy_static::lazy_static; -use reqwest::header::{HeaderName, HeaderValue}; -use std::collections::HashMap; -use std::str::FromStr; - -macro_rules! unwrap_optional_header { - ($headers:expr, $name:expr) => { - $headers - .get::<&'static reqwest::header::HeaderName>(&$name) - .map(|a| header_value_to_str!(a)) - }; -} - -macro_rules! header_value_to_str { - ($header_value:expr) => { - $header_value - .to_str() - .map_err(|e| crate::message::Error::Other { - source: Box::new(e), - }) - }; -} - -macro_rules! str_name_to_header { - ($attribute:expr) => { - reqwest::header::HeaderName::from_str($attribute).map_err(|e| { - crate::message::Error::Other { - source: Box::new(e), - } - }) - }; -} - -macro_rules! attribute_name_to_header { - ($attribute:expr) => { - str_name_to_header!(&["ce-", $attribute].concat()) - }; -} - -fn attributes_to_headers( - it: impl Iterator, -) -> HashMap<&'static str, HeaderName> { - it.map(|s| { - if s == "datacontenttype" { - (s, reqwest::header::CONTENT_TYPE) - } else { - (s, attribute_name_to_header!(s).unwrap()) - } - }) - .collect() -} - -lazy_static! { - pub(crate) static ref ATTRIBUTES_TO_HEADERS: HashMap<&'static str, HeaderName> = - attributes_to_headers(SpecVersion::all_attribute_names()); - pub(crate) static ref SPEC_VERSION_HEADER: HeaderName = - HeaderName::from_static("ce-specversion"); - pub(crate) static ref CLOUDEVENTS_JSON_HEADER: HeaderValue = - HeaderValue::from_static("application/cloudevents+json"); -} diff --git a/src/binding/reqwest/mod.rs b/src/binding/reqwest/mod.rs index 50844ee..aad3bac 100644 --- a/src/binding/reqwest/mod.rs +++ b/src/binding/reqwest/mod.rs @@ -30,8 +30,6 @@ #![deny(broken_intra_doc_links)] -#[macro_use] -mod headers; mod client_request; mod client_response; diff --git a/src/binding/warp/headers.rs b/src/binding/warp/headers.rs deleted file mode 100644 index 810a336..0000000 --- a/src/binding/warp/headers.rs +++ /dev/null @@ -1,63 +0,0 @@ -use warp_lib as warp; - -use crate::event::SpecVersion; -use http::header::HeaderName; -use lazy_static::lazy_static; -use warp::http::HeaderValue; - -use std::collections::HashMap; -use std::str::FromStr; - -macro_rules! unwrap_optional_header { - ($headers:expr, $name:expr) => { - $headers - .get::<&'static HeaderName>(&$name) - .map(|a| header_value_to_str!(a)) - }; -} - -macro_rules! header_value_to_str { - ($header_value:expr) => { - $header_value - .to_str() - .map_err(|e| crate::message::Error::Other { - source: Box::new(e), - }) - }; -} - -macro_rules! str_name_to_header { - ($attribute:expr) => { - HeaderName::from_str($attribute).map_err(|e| crate::message::Error::Other { - source: Box::new(e), - }) - }; -} - -macro_rules! attribute_name_to_header { - ($attribute:expr) => { - str_name_to_header!(&["ce-", $attribute].concat()) - }; -} - -fn attributes_to_headers( - it: impl Iterator, -) -> HashMap<&'static str, HeaderName> { - it.map(|s| { - if s == "datacontenttype" { - (s, http::header::CONTENT_TYPE) - } else { - (s, attribute_name_to_header!(s).unwrap()) - } - }) - .collect() -} - -lazy_static! { - pub(crate) static ref ATTRIBUTES_TO_HEADERS: HashMap<&'static str, HeaderName> = - attributes_to_headers(SpecVersion::all_attribute_names()); - pub(crate) static ref SPEC_VERSION_HEADER: HeaderName = - HeaderName::from_static("ce-specversion"); - pub(crate) static ref CLOUDEVENTS_JSON_HEADER: HeaderValue = - HeaderValue::from_static("application/cloudevents+json"); -} diff --git a/src/binding/warp/mod.rs b/src/binding/warp/mod.rs index 1c4adec..5e9708c 100644 --- a/src/binding/warp/mod.rs +++ b/src/binding/warp/mod.rs @@ -61,9 +61,6 @@ //! warp::serve(routes).run(([127, 0, 0, 1], 3030)); //! ``` -#[macro_use] -mod headers; - mod server_request; mod server_response; diff --git a/src/binding/warp/server_request.rs b/src/binding/warp/server_request.rs index 2f02b10..694eca7 100644 --- a/src/binding/warp/server_request.rs +++ b/src/binding/warp/server_request.rs @@ -1,13 +1,13 @@ -use super::headers; use bytes::Bytes; -use http::{header::HeaderName, HeaderMap}; +use http::HeaderMap; +use crate::binding::http::SPEC_VERSION_HEADER; use crate::event::SpecVersion; +use crate::header_value_to_str; use crate::message::{ BinaryDeserializer, BinarySerializer, Encoding, Error, MessageAttributeValue, MessageDeserializer, Result, StructuredDeserializer, StructuredSerializer, }; - use crate::{message, Event}; use std::convert::TryFrom; @@ -29,18 +29,20 @@ impl BinaryDeserializer for RequestDeserializer { } let spec_version = SpecVersion::try_from( - unwrap_optional_header!(self.headers, headers::SPEC_VERSION_HEADER).unwrap()?, + self.headers + .get(SPEC_VERSION_HEADER) + .map(|a| header_value_to_str!(a)) + .unwrap()?, )?; visitor = visitor.set_spec_version(spec_version.clone())?; let attributes = spec_version.attribute_names(); - for (hn, hv) in self - .headers - .iter() - .filter(|(hn, _)| headers::SPEC_VERSION_HEADER.ne(hn) && hn.as_str().starts_with("ce-")) - { + for (hn, hv) in self.headers.iter().filter(|(hn, _)| { + let key = hn.as_str(); + SPEC_VERSION_HEADER.ne(key) && key.starts_with("ce-") + }) { let name = &hn.as_str()["ce-".len()..]; if attributes.contains(&name) { @@ -90,11 +92,7 @@ impl MessageDeserializer for RequestDeserializer { == "application/cloudevents+json" { Encoding::STRUCTURED - } else if self - .headers - .get::<&'static HeaderName>(&super::headers::SPEC_VERSION_HEADER) - .is_some() - { + } else if self.headers.contains_key(SPEC_VERSION_HEADER) { Encoding::BINARY } else { Encoding::UNKNOWN diff --git a/src/binding/warp/server_response.rs b/src/binding/warp/server_response.rs index 599277a..030eca3 100644 --- a/src/binding/warp/server_response.rs +++ b/src/binding/warp/server_response.rs @@ -1,7 +1,7 @@ use warp_lib as warp; -use super::headers; - +use crate::binding::http::PREFIX; +use crate::binding::{attribute_header, http::SPEC_VERSION_HEADER, CLOUDEVENTS_JSON_HEADER}; use crate::event::SpecVersion; use crate::message::{ BinaryDeserializer, BinarySerializer, Error, MessageAttributeValue, Result, @@ -13,10 +13,8 @@ use warp::http::HeaderValue; use warp::hyper::Body; use warp::reply::Response; -use http::header::HeaderName; use http::response::Builder; - -use std::{convert::TryFrom, str::FromStr}; +use std::convert::TryFrom; pub struct ResponseSerializer { builder: Builder, @@ -33,7 +31,7 @@ impl ResponseSerializer { impl BinarySerializer for ResponseSerializer { fn set_spec_version(mut self, spec_version: SpecVersion) -> Result { self.builder = self.builder.header( - headers::SPEC_VERSION_HEADER.clone(), + SPEC_VERSION_HEADER, HeaderValue::try_from(spec_version.to_string().as_str()).map_err(|e| { crate::message::Error::Other { source: Box::new(e), @@ -45,7 +43,7 @@ impl BinarySerializer for ResponseSerializer { fn set_attribute(mut self, name: &str, value: MessageAttributeValue) -> Result { self.builder = self.builder.header( - headers::ATTRIBUTES_TO_HEADERS.get(name).unwrap().clone(), + &attribute_header(PREFIX, name), HeaderValue::try_from(value.to_string().as_str()).map_err(|e| { crate::message::Error::Other { source: Box::new(e), @@ -57,7 +55,7 @@ impl BinarySerializer for ResponseSerializer { fn set_extension(mut self, name: &str, value: MessageAttributeValue) -> Result { self.builder = self.builder.header( - attribute_name_to_header!(name)?, + &attribute_header(PREFIX, name), HeaderValue::try_from(value.to_string().as_str()).map_err(|e| { crate::message::Error::Other { source: Box::new(e), @@ -87,10 +85,7 @@ impl BinarySerializer for ResponseSerializer { impl StructuredSerializer for ResponseSerializer { fn set_structured_event(self, bytes: Vec) -> Result { self.builder - .header( - http::header::CONTENT_TYPE, - headers::CLOUDEVENTS_JSON_HEADER.clone(), - ) + .header(http::header::CONTENT_TYPE, CLOUDEVENTS_JSON_HEADER) .body(Body::from(bytes)) .map_err(|e| crate::message::Error::Other { source: Box::new(e),