Refactor redundant header logic to a shared lib (#146)
* Refactor redundant header logic to a shared lib * Remove some unused code * Share macro between actix and warp, eliminating actix header mod Fixes #145 Signed-off-by: Jim Crossley <jim@crossleys.org>
This commit is contained in:
		
							parent
							
								
									2cae3f0b36
								
							
						
					
					
						commit
						211792f0f4
					
				| 
						 | 
					@ -17,10 +17,10 @@ categories = ["web-programming", "encoding", "data-structures"]
 | 
				
			||||||
name = "cloudevents"
 | 
					name = "cloudevents"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
[features]
 | 
					[features]
 | 
				
			||||||
actix = ["actix-web", "async-trait", "lazy_static", "bytes", "futures"]
 | 
					actix = ["actix-web", "async-trait", "bytes", "futures", "http"]
 | 
				
			||||||
reqwest = ["reqwest-lib", "async-trait", "lazy_static", "bytes"]
 | 
					reqwest = ["reqwest-lib", "async-trait", "bytes"]
 | 
				
			||||||
rdkafka = ["rdkafka-lib", "lazy_static", "bytes", "futures"]
 | 
					rdkafka = ["rdkafka-lib", "bytes", "futures"]
 | 
				
			||||||
warp = ["warp-lib", "lazy_static", "bytes", "http", "hyper"]
 | 
					warp = ["warp-lib", "bytes", "http", "hyper"]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
[dependencies]
 | 
					[dependencies]
 | 
				
			||||||
serde = { version = "^1.0", features = ["derive"] }
 | 
					serde = { version = "^1.0", features = ["derive"] }
 | 
				
			||||||
| 
						 | 
					@ -38,7 +38,6 @@ reqwest-lib = { version = "^0.11", default-features = false, features = ["rustls
 | 
				
			||||||
rdkafka-lib = { version = "^0.25", features = ["cmake-build"], optional = true, package = "rdkafka" }
 | 
					rdkafka-lib = { version = "^0.25", features = ["cmake-build"], optional = true, package = "rdkafka" }
 | 
				
			||||||
warp-lib = { version = "^0.3", optional = true, package = "warp" }
 | 
					warp-lib = { version = "^0.3", optional = true, package = "warp" }
 | 
				
			||||||
async-trait = { version = "^0.1.33", optional = true }
 | 
					async-trait = { version = "^0.1.33", optional = true }
 | 
				
			||||||
lazy_static = { version = "1.4.0", optional = true }
 | 
					 | 
				
			||||||
bytes = { version = "^1.0", optional = true }
 | 
					bytes = { version = "^1.0", optional = true }
 | 
				
			||||||
futures = { version = "^0.3", optional = true }
 | 
					futures = { version = "^0.3", optional = true }
 | 
				
			||||||
http = { version = "0.2", optional = true }
 | 
					http = { version = "0.2", optional = true }
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -1,68 +0,0 @@
 | 
				
			||||||
use crate::event::SpecVersion;
 | 
					 | 
				
			||||||
use actix_web::http::header;
 | 
					 | 
				
			||||||
use actix_web::http::{HeaderName, HeaderValue};
 | 
					 | 
				
			||||||
use lazy_static::lazy_static;
 | 
					 | 
				
			||||||
use std::collections::HashMap;
 | 
					 | 
				
			||||||
use std::str::FromStr;
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
macro_rules! unwrap_optional_header {
 | 
					 | 
				
			||||||
    ($headers:expr, $name:expr) => {
 | 
					 | 
				
			||||||
        $headers
 | 
					 | 
				
			||||||
            .get::<&'static HeaderName>(&$name)
 | 
					 | 
				
			||||||
            .map(|a| header_value_to_str!(a))
 | 
					 | 
				
			||||||
    };
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
macro_rules! header_value_to_str {
 | 
					 | 
				
			||||||
    ($header_value:expr) => {
 | 
					 | 
				
			||||||
        $header_value
 | 
					 | 
				
			||||||
            .to_str()
 | 
					 | 
				
			||||||
            .map_err(|e| crate::message::Error::Other {
 | 
					 | 
				
			||||||
                source: Box::new(e),
 | 
					 | 
				
			||||||
            })
 | 
					 | 
				
			||||||
    };
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
macro_rules! str_to_header_value {
 | 
					 | 
				
			||||||
    ($header_value:expr) => {
 | 
					 | 
				
			||||||
        HeaderValue::from_str($header_value).map_err(|e| crate::message::Error::Other {
 | 
					 | 
				
			||||||
            source: Box::new(e),
 | 
					 | 
				
			||||||
        })
 | 
					 | 
				
			||||||
    };
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
macro_rules! str_name_to_header {
 | 
					 | 
				
			||||||
    ($attribute:expr) => {
 | 
					 | 
				
			||||||
        HeaderName::from_str($attribute).map_err(|e| crate::message::Error::Other {
 | 
					 | 
				
			||||||
            source: Box::new(e),
 | 
					 | 
				
			||||||
        })
 | 
					 | 
				
			||||||
    };
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
macro_rules! attribute_name_to_header {
 | 
					 | 
				
			||||||
    ($attribute:expr) => {
 | 
					 | 
				
			||||||
        str_name_to_header!(&["ce-", $attribute].concat())
 | 
					 | 
				
			||||||
    };
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
fn attributes_to_headers(
 | 
					 | 
				
			||||||
    it: impl Iterator<Item = &'static str>,
 | 
					 | 
				
			||||||
) -> HashMap<&'static str, HeaderName> {
 | 
					 | 
				
			||||||
    it.map(|s| {
 | 
					 | 
				
			||||||
        if s == "datacontenttype" {
 | 
					 | 
				
			||||||
            (s, header::CONTENT_TYPE)
 | 
					 | 
				
			||||||
        } else {
 | 
					 | 
				
			||||||
            (s, attribute_name_to_header!(s).unwrap())
 | 
					 | 
				
			||||||
        }
 | 
					 | 
				
			||||||
    })
 | 
					 | 
				
			||||||
    .collect()
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
lazy_static! {
 | 
					 | 
				
			||||||
    pub(crate) static ref ATTRIBUTES_TO_HEADERS: HashMap<&'static str, HeaderName> =
 | 
					 | 
				
			||||||
        attributes_to_headers(SpecVersion::all_attribute_names());
 | 
					 | 
				
			||||||
    pub(crate) static ref SPEC_VERSION_HEADER: HeaderName =
 | 
					 | 
				
			||||||
        HeaderName::from_static("ce-specversion");
 | 
					 | 
				
			||||||
    pub(crate) static ref CLOUDEVENTS_JSON_HEADER: HeaderValue =
 | 
					 | 
				
			||||||
        HeaderValue::from_static("application/cloudevents+json");
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
| 
						 | 
					@ -41,8 +41,6 @@
 | 
				
			||||||
 | 
					
 | 
				
			||||||
#![deny(broken_intra_doc_links)]
 | 
					#![deny(broken_intra_doc_links)]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
#[macro_use]
 | 
					 | 
				
			||||||
mod headers;
 | 
					 | 
				
			||||||
mod server_request;
 | 
					mod server_request;
 | 
				
			||||||
mod server_response;
 | 
					mod server_response;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -1,11 +1,10 @@
 | 
				
			||||||
use super::headers;
 | 
					use crate::binding::http::SPEC_VERSION_HEADER;
 | 
				
			||||||
use crate::event::SpecVersion;
 | 
					use crate::event::SpecVersion;
 | 
				
			||||||
use crate::message::{
 | 
					use crate::message::{
 | 
				
			||||||
    BinaryDeserializer, BinarySerializer, Encoding, MessageAttributeValue, MessageDeserializer,
 | 
					    BinaryDeserializer, BinarySerializer, Encoding, MessageAttributeValue, MessageDeserializer,
 | 
				
			||||||
    Result, StructuredDeserializer, StructuredSerializer,
 | 
					    Result, StructuredDeserializer, StructuredSerializer,
 | 
				
			||||||
};
 | 
					};
 | 
				
			||||||
use crate::{message, Event};
 | 
					use crate::{header_value_to_str, message, Event};
 | 
				
			||||||
use actix_web::http::HeaderName;
 | 
					 | 
				
			||||||
use actix_web::web::{Bytes, BytesMut};
 | 
					use actix_web::web::{Bytes, BytesMut};
 | 
				
			||||||
use actix_web::{web, HttpMessage, HttpRequest};
 | 
					use actix_web::{web, HttpMessage, HttpRequest};
 | 
				
			||||||
use async_trait::async_trait;
 | 
					use async_trait::async_trait;
 | 
				
			||||||
| 
						 | 
					@ -32,18 +31,21 @@ impl<'a> BinaryDeserializer for HttpRequestDeserializer<'a> {
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        let spec_version = SpecVersion::try_from(
 | 
					        let spec_version = SpecVersion::try_from(
 | 
				
			||||||
            unwrap_optional_header!(self.req.headers(), headers::SPEC_VERSION_HEADER).unwrap()?,
 | 
					            self.req
 | 
				
			||||||
 | 
					                .headers()
 | 
				
			||||||
 | 
					                .get(SPEC_VERSION_HEADER)
 | 
				
			||||||
 | 
					                .map(|a| header_value_to_str!(a))
 | 
				
			||||||
 | 
					                .unwrap()?,
 | 
				
			||||||
        )?;
 | 
					        )?;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        visitor = visitor.set_spec_version(spec_version.clone())?;
 | 
					        visitor = visitor.set_spec_version(spec_version.clone())?;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        let attributes = spec_version.attribute_names();
 | 
					        let attributes = spec_version.attribute_names();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        for (hn, hv) in
 | 
					        for (hn, hv) in self.req.headers().iter().filter(|(hn, _)| {
 | 
				
			||||||
            self.req.headers().iter().filter(|(hn, _)| {
 | 
					            let key = hn.as_str();
 | 
				
			||||||
                headers::SPEC_VERSION_HEADER.ne(hn) && hn.as_str().starts_with("ce-")
 | 
					            SPEC_VERSION_HEADER.ne(key) && key.starts_with("ce-")
 | 
				
			||||||
            })
 | 
					        }) {
 | 
				
			||||||
        {
 | 
					 | 
				
			||||||
            let name = &hn.as_str()["ce-".len()..];
 | 
					            let name = &hn.as_str()["ce-".len()..];
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            if attributes.contains(&name) {
 | 
					            if attributes.contains(&name) {
 | 
				
			||||||
| 
						 | 
					@ -87,12 +89,7 @@ impl<'a> MessageDeserializer for HttpRequestDeserializer<'a> {
 | 
				
			||||||
    fn encoding(&self) -> Encoding {
 | 
					    fn encoding(&self) -> Encoding {
 | 
				
			||||||
        if self.req.content_type() == "application/cloudevents+json" {
 | 
					        if self.req.content_type() == "application/cloudevents+json" {
 | 
				
			||||||
            Encoding::STRUCTURED
 | 
					            Encoding::STRUCTURED
 | 
				
			||||||
        } else if self
 | 
					        } else if self.req.headers().get(SPEC_VERSION_HEADER).is_some() {
 | 
				
			||||||
            .req
 | 
					 | 
				
			||||||
            .headers()
 | 
					 | 
				
			||||||
            .get::<&'static HeaderName>(&super::headers::SPEC_VERSION_HEADER)
 | 
					 | 
				
			||||||
            .is_some()
 | 
					 | 
				
			||||||
        {
 | 
					 | 
				
			||||||
            Encoding::BINARY
 | 
					            Encoding::BINARY
 | 
				
			||||||
        } else {
 | 
					        } else {
 | 
				
			||||||
            Encoding::UNKNOWN
 | 
					            Encoding::UNKNOWN
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -1,16 +1,21 @@
 | 
				
			||||||
use super::headers;
 | 
					 | 
				
			||||||
use crate::event::SpecVersion;
 | 
					use crate::event::SpecVersion;
 | 
				
			||||||
use crate::message::{
 | 
					use crate::message::{
 | 
				
			||||||
    BinaryDeserializer, BinarySerializer, MessageAttributeValue, Result, StructuredSerializer,
 | 
					    BinaryDeserializer, BinarySerializer, MessageAttributeValue, Result, StructuredSerializer,
 | 
				
			||||||
};
 | 
					};
 | 
				
			||||||
use crate::Event;
 | 
					use crate::Event;
 | 
				
			||||||
 | 
					use crate::{
 | 
				
			||||||
 | 
					    binding::{
 | 
				
			||||||
 | 
					        http::{header_prefix, SPEC_VERSION_HEADER},
 | 
				
			||||||
 | 
					        CLOUDEVENTS_JSON_HEADER,
 | 
				
			||||||
 | 
					    },
 | 
				
			||||||
 | 
					    str_to_header_value,
 | 
				
			||||||
 | 
					};
 | 
				
			||||||
use actix_web::dev::HttpResponseBuilder;
 | 
					use actix_web::dev::HttpResponseBuilder;
 | 
				
			||||||
use actix_web::http::{HeaderName, HeaderValue, StatusCode};
 | 
					use actix_web::http::StatusCode;
 | 
				
			||||||
use actix_web::HttpResponse;
 | 
					use actix_web::HttpResponse;
 | 
				
			||||||
use async_trait::async_trait;
 | 
					use async_trait::async_trait;
 | 
				
			||||||
use futures::future::LocalBoxFuture;
 | 
					use futures::future::LocalBoxFuture;
 | 
				
			||||||
use futures::FutureExt;
 | 
					use futures::FutureExt;
 | 
				
			||||||
use std::str::FromStr;
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
/// Wrapper for [`HttpResponseBuilder`] that implements [`StructuredSerializer`] and [`BinarySerializer`].
 | 
					/// Wrapper for [`HttpResponseBuilder`] that implements [`StructuredSerializer`] and [`BinarySerializer`].
 | 
				
			||||||
pub struct HttpResponseSerializer {
 | 
					pub struct HttpResponseSerializer {
 | 
				
			||||||
| 
						 | 
					@ -25,26 +30,20 @@ impl HttpResponseSerializer {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
impl BinarySerializer<HttpResponse> for HttpResponseSerializer {
 | 
					impl BinarySerializer<HttpResponse> for HttpResponseSerializer {
 | 
				
			||||||
    fn set_spec_version(mut self, spec_version: SpecVersion) -> Result<Self> {
 | 
					    fn set_spec_version(mut self, spec_version: SpecVersion) -> Result<Self> {
 | 
				
			||||||
        self.builder.set_header(
 | 
					        self.builder
 | 
				
			||||||
            headers::SPEC_VERSION_HEADER.clone(),
 | 
					            .set_header(SPEC_VERSION_HEADER, str_to_header_value!(spec_version)?);
 | 
				
			||||||
            str_to_header_value!(spec_version.as_str())?,
 | 
					 | 
				
			||||||
        );
 | 
					 | 
				
			||||||
        Ok(self)
 | 
					        Ok(self)
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    fn set_attribute(mut self, name: &str, value: MessageAttributeValue) -> Result<Self> {
 | 
					    fn set_attribute(mut self, name: &str, value: MessageAttributeValue) -> Result<Self> {
 | 
				
			||||||
        self.builder.set_header(
 | 
					        self.builder
 | 
				
			||||||
            headers::ATTRIBUTES_TO_HEADERS.get(name).unwrap().clone(),
 | 
					            .set_header(&header_prefix(name), str_to_header_value!(value)?);
 | 
				
			||||||
            str_to_header_value!(value.to_string().as_str())?,
 | 
					 | 
				
			||||||
        );
 | 
					 | 
				
			||||||
        Ok(self)
 | 
					        Ok(self)
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    fn set_extension(mut self, name: &str, value: MessageAttributeValue) -> Result<Self> {
 | 
					    fn set_extension(mut self, name: &str, value: MessageAttributeValue) -> Result<Self> {
 | 
				
			||||||
        self.builder.set_header(
 | 
					        self.builder
 | 
				
			||||||
            attribute_name_to_header!(name)?,
 | 
					            .set_header(&header_prefix(name), str_to_header_value!(value)?);
 | 
				
			||||||
            str_to_header_value!(value.to_string().as_str())?,
 | 
					 | 
				
			||||||
        );
 | 
					 | 
				
			||||||
        Ok(self)
 | 
					        Ok(self)
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -63,7 +62,7 @@ impl StructuredSerializer<HttpResponse> for HttpResponseSerializer {
 | 
				
			||||||
            .builder
 | 
					            .builder
 | 
				
			||||||
            .set_header(
 | 
					            .set_header(
 | 
				
			||||||
                actix_web::http::header::CONTENT_TYPE,
 | 
					                actix_web::http::header::CONTENT_TYPE,
 | 
				
			||||||
                headers::CLOUDEVENTS_JSON_HEADER.clone(),
 | 
					                CLOUDEVENTS_JSON_HEADER,
 | 
				
			||||||
            )
 | 
					            )
 | 
				
			||||||
            .body(bytes))
 | 
					            .body(bytes))
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -8,3 +8,53 @@ pub mod rdkafka;
 | 
				
			||||||
pub mod reqwest;
 | 
					pub mod reqwest;
 | 
				
			||||||
#[cfg(feature = "warp")]
 | 
					#[cfg(feature = "warp")]
 | 
				
			||||||
pub mod warp;
 | 
					pub mod warp;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					#[cfg(feature = "rdkafka")]
 | 
				
			||||||
 | 
					pub(crate) mod kafka {
 | 
				
			||||||
 | 
					    pub static SPEC_VERSION_HEADER: &str = "ce_specversion";
 | 
				
			||||||
 | 
					    pub fn header_prefix(name: &str) -> String {
 | 
				
			||||||
 | 
					        super::header_prefix("ce_", name)
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					#[cfg(any(feature = "actix", feature = "warp", feature = "reqwest"))]
 | 
				
			||||||
 | 
					pub(crate) mod http {
 | 
				
			||||||
 | 
					    pub static SPEC_VERSION_HEADER: &str = "ce-specversion";
 | 
				
			||||||
 | 
					    pub fn header_prefix(name: &str) -> String {
 | 
				
			||||||
 | 
					        super::header_prefix("ce-", name)
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					#[cfg(any(feature = "actix", feature = "warp"))]
 | 
				
			||||||
 | 
					#[macro_export]
 | 
				
			||||||
 | 
					macro_rules! str_to_header_value {
 | 
				
			||||||
 | 
					    ($header_value:expr) => {
 | 
				
			||||||
 | 
					        http::header::HeaderValue::from_str(&$header_value.to_string()).map_err(|e| {
 | 
				
			||||||
 | 
					            crate::message::Error::Other {
 | 
				
			||||||
 | 
					                source: Box::new(e),
 | 
				
			||||||
 | 
					            }
 | 
				
			||||||
 | 
					        })
 | 
				
			||||||
 | 
					    };
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					pub(crate) static CLOUDEVENTS_JSON_HEADER: &str = "application/cloudevents+json";
 | 
				
			||||||
 | 
					pub(crate) static CONTENT_TYPE: &str = "content-type";
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					fn header_prefix(prefix: &str, name: &str) -> String {
 | 
				
			||||||
 | 
					    if name == "datacontenttype" {
 | 
				
			||||||
 | 
					        CONTENT_TYPE.to_string()
 | 
				
			||||||
 | 
					    } else {
 | 
				
			||||||
 | 
					        [prefix, name].concat()
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					#[macro_export]
 | 
				
			||||||
 | 
					macro_rules! header_value_to_str {
 | 
				
			||||||
 | 
					    ($header_value:expr) => {
 | 
				
			||||||
 | 
					        $header_value
 | 
				
			||||||
 | 
					            .to_str()
 | 
				
			||||||
 | 
					            .map_err(|e| crate::message::Error::Other {
 | 
				
			||||||
 | 
					                source: Box::new(e),
 | 
				
			||||||
 | 
					            })
 | 
				
			||||||
 | 
					    };
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -1,29 +0,0 @@
 | 
				
			||||||
use crate::event::SpecVersion;
 | 
					 | 
				
			||||||
use lazy_static::lazy_static;
 | 
					 | 
				
			||||||
use std::collections::HashMap;
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
macro_rules! attribute_name_to_header {
 | 
					 | 
				
			||||||
    ($attribute:expr) => {
 | 
					 | 
				
			||||||
        format!("ce_{}", $attribute)
 | 
					 | 
				
			||||||
    };
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
fn attributes_to_headers(it: impl Iterator<Item = &'static str>) -> HashMap<&'static str, String> {
 | 
					 | 
				
			||||||
    it.map(|s| {
 | 
					 | 
				
			||||||
        if s == "datacontenttype" {
 | 
					 | 
				
			||||||
            (s, String::from("content-type"))
 | 
					 | 
				
			||||||
        } else {
 | 
					 | 
				
			||||||
            (s, attribute_name_to_header!(s))
 | 
					 | 
				
			||||||
        }
 | 
					 | 
				
			||||||
    })
 | 
					 | 
				
			||||||
    .collect()
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
lazy_static! {
 | 
					 | 
				
			||||||
    pub(crate) static ref ATTRIBUTES_TO_HEADERS: HashMap<&'static str, String> =
 | 
					 | 
				
			||||||
        attributes_to_headers(SpecVersion::all_attribute_names());
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
pub(crate) static SPEC_VERSION_HEADER: &str = "ce_specversion";
 | 
					 | 
				
			||||||
pub(crate) static CLOUDEVENTS_JSON_HEADER: &str = "application/cloudevents+json";
 | 
					 | 
				
			||||||
pub(crate) static CONTENT_TYPE: &str = "content-type";
 | 
					 | 
				
			||||||
| 
						 | 
					@ -1,6 +1,6 @@
 | 
				
			||||||
use rdkafka_lib as rdkafka;
 | 
					use rdkafka_lib as rdkafka;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
use super::headers;
 | 
					use crate::binding::{kafka::SPEC_VERSION_HEADER, CLOUDEVENTS_JSON_HEADER, CONTENT_TYPE};
 | 
				
			||||||
use crate::event::SpecVersion;
 | 
					use crate::event::SpecVersion;
 | 
				
			||||||
use crate::message::{
 | 
					use crate::message::{
 | 
				
			||||||
    BinaryDeserializer, BinarySerializer, Encoding, MessageAttributeValue, MessageDeserializer,
 | 
					    BinaryDeserializer, BinarySerializer, Encoding, MessageAttributeValue, MessageDeserializer,
 | 
				
			||||||
| 
						 | 
					@ -47,17 +47,18 @@ impl BinaryDeserializer for ConsumerRecordDeserializer {
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        let spec_version = SpecVersion::try_from(
 | 
					        let spec_version = SpecVersion::try_from(
 | 
				
			||||||
            str::from_utf8(&self.headers.remove(headers::SPEC_VERSION_HEADER).unwrap()[..])
 | 
					            str::from_utf8(&self.headers.remove(SPEC_VERSION_HEADER).unwrap()).map_err(|e| {
 | 
				
			||||||
                .map_err(|e| crate::message::Error::Other {
 | 
					                crate::message::Error::Other {
 | 
				
			||||||
                    source: Box::new(e),
 | 
					                    source: Box::new(e),
 | 
				
			||||||
                })?,
 | 
					                }
 | 
				
			||||||
 | 
					            })?,
 | 
				
			||||||
        )?;
 | 
					        )?;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        visitor = visitor.set_spec_version(spec_version.clone())?;
 | 
					        visitor = visitor.set_spec_version(spec_version.clone())?;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        let attributes = spec_version.attribute_names();
 | 
					        let attributes = spec_version.attribute_names();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        if let Some(hv) = self.headers.remove(headers::CONTENT_TYPE) {
 | 
					        if let Some(hv) = self.headers.remove(CONTENT_TYPE) {
 | 
				
			||||||
            visitor = visitor.set_attribute(
 | 
					            visitor = visitor.set_attribute(
 | 
				
			||||||
                "datacontenttype",
 | 
					                "datacontenttype",
 | 
				
			||||||
                MessageAttributeValue::String(String::from_utf8(hv).map_err(|e| {
 | 
					                MessageAttributeValue::String(String::from_utf8(hv).map_err(|e| {
 | 
				
			||||||
| 
						 | 
					@ -71,7 +72,7 @@ impl BinaryDeserializer for ConsumerRecordDeserializer {
 | 
				
			||||||
        for (hn, hv) in self
 | 
					        for (hn, hv) in self
 | 
				
			||||||
            .headers
 | 
					            .headers
 | 
				
			||||||
            .into_iter()
 | 
					            .into_iter()
 | 
				
			||||||
            .filter(|(hn, _)| headers::SPEC_VERSION_HEADER != *hn && hn.starts_with("ce_"))
 | 
					            .filter(|(hn, _)| SPEC_VERSION_HEADER != *hn && hn.starts_with("ce_"))
 | 
				
			||||||
        {
 | 
					        {
 | 
				
			||||||
            let name = &hn["ce_".len()..];
 | 
					            let name = &hn["ce_".len()..];
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -120,9 +121,9 @@ impl MessageDeserializer for ConsumerRecordDeserializer {
 | 
				
			||||||
                .get("content-type")
 | 
					                .get("content-type")
 | 
				
			||||||
                .map(|s| String::from_utf8(s.to_vec()).ok())
 | 
					                .map(|s| String::from_utf8(s.to_vec()).ok())
 | 
				
			||||||
                .flatten()
 | 
					                .flatten()
 | 
				
			||||||
                .map(|s| s.starts_with(headers::CLOUDEVENTS_JSON_HEADER))
 | 
					                .map(|s| s.starts_with(CLOUDEVENTS_JSON_HEADER))
 | 
				
			||||||
                .unwrap_or(false),
 | 
					                .unwrap_or(false),
 | 
				
			||||||
            self.headers.get(headers::SPEC_VERSION_HEADER),
 | 
					            self.headers.get(SPEC_VERSION_HEADER),
 | 
				
			||||||
        ) {
 | 
					        ) {
 | 
				
			||||||
            (true, _) => Encoding::STRUCTURED,
 | 
					            (true, _) => Encoding::STRUCTURED,
 | 
				
			||||||
            (_, Some(_)) => Encoding::BINARY,
 | 
					            (_, Some(_)) => Encoding::BINARY,
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -1,6 +1,9 @@
 | 
				
			||||||
use rdkafka_lib as rdkafka;
 | 
					use rdkafka_lib as rdkafka;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
use super::headers;
 | 
					use crate::binding::{
 | 
				
			||||||
 | 
					    kafka::{header_prefix, SPEC_VERSION_HEADER},
 | 
				
			||||||
 | 
					    CLOUDEVENTS_JSON_HEADER, CONTENT_TYPE,
 | 
				
			||||||
 | 
					};
 | 
				
			||||||
use crate::event::SpecVersion;
 | 
					use crate::event::SpecVersion;
 | 
				
			||||||
use crate::message::{
 | 
					use crate::message::{
 | 
				
			||||||
    BinaryDeserializer, BinarySerializer, MessageAttributeValue, Result, StructuredSerializer,
 | 
					    BinaryDeserializer, BinarySerializer, MessageAttributeValue, Result, StructuredSerializer,
 | 
				
			||||||
| 
						 | 
					@ -42,39 +45,25 @@ impl Default for MessageRecord {
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
impl BinarySerializer<MessageRecord> for MessageRecord {
 | 
					impl BinarySerializer<MessageRecord> for MessageRecord {
 | 
				
			||||||
    fn set_spec_version(mut self, spec_version: SpecVersion) -> Result<Self> {
 | 
					    fn set_spec_version(mut self, sv: SpecVersion) -> Result<Self> {
 | 
				
			||||||
        self.headers = self
 | 
					        self.headers = self.headers.add(SPEC_VERSION_HEADER, &sv.to_string());
 | 
				
			||||||
            .headers
 | 
					 | 
				
			||||||
            .add(headers::SPEC_VERSION_HEADER, spec_version.as_str());
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        Ok(self)
 | 
					        Ok(self)
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    fn set_attribute(mut self, name: &str, value: MessageAttributeValue) -> Result<Self> {
 | 
					    fn set_attribute(mut self, name: &str, value: MessageAttributeValue) -> Result<Self> {
 | 
				
			||||||
        self.headers = self.headers.add(
 | 
					        let key = &header_prefix(name);
 | 
				
			||||||
            &headers::ATTRIBUTES_TO_HEADERS
 | 
					        self.headers = self.headers.add(key, &value.to_string());
 | 
				
			||||||
                .get(name)
 | 
					 | 
				
			||||||
                .ok_or(crate::message::Error::UnknownAttribute {
 | 
					 | 
				
			||||||
                    name: String::from(name),
 | 
					 | 
				
			||||||
                })?
 | 
					 | 
				
			||||||
                .clone()[..],
 | 
					 | 
				
			||||||
            &value.to_string()[..],
 | 
					 | 
				
			||||||
        );
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        Ok(self)
 | 
					        Ok(self)
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    fn set_extension(mut self, name: &str, value: MessageAttributeValue) -> Result<Self> {
 | 
					    fn set_extension(mut self, name: &str, value: MessageAttributeValue) -> Result<Self> {
 | 
				
			||||||
        self.headers = self
 | 
					        let key = &header_prefix(name);
 | 
				
			||||||
            .headers
 | 
					        self.headers = self.headers.add(key, &value.to_string());
 | 
				
			||||||
            .add(&attribute_name_to_header!(name)[..], &value.to_string()[..]);
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        Ok(self)
 | 
					        Ok(self)
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    fn end_with_data(mut self, bytes: Vec<u8>) -> Result<MessageRecord> {
 | 
					    fn end_with_data(mut self, bytes: Vec<u8>) -> Result<MessageRecord> {
 | 
				
			||||||
        self.payload = Some(bytes);
 | 
					        self.payload = Some(bytes);
 | 
				
			||||||
 | 
					 | 
				
			||||||
        Ok(self)
 | 
					        Ok(self)
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -85,9 +74,7 @@ impl BinarySerializer<MessageRecord> for MessageRecord {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
impl StructuredSerializer<MessageRecord> for MessageRecord {
 | 
					impl StructuredSerializer<MessageRecord> for MessageRecord {
 | 
				
			||||||
    fn set_structured_event(mut self, bytes: Vec<u8>) -> Result<MessageRecord> {
 | 
					    fn set_structured_event(mut self, bytes: Vec<u8>) -> Result<MessageRecord> {
 | 
				
			||||||
        self.headers = self
 | 
					        self.headers = self.headers.add(CONTENT_TYPE, CLOUDEVENTS_JSON_HEADER);
 | 
				
			||||||
            .headers
 | 
					 | 
				
			||||||
            .add(headers::CONTENT_TYPE, headers::CLOUDEVENTS_JSON_HEADER);
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
        self.payload = Some(bytes);
 | 
					        self.payload = Some(bytes);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -51,8 +51,6 @@
 | 
				
			||||||
 | 
					
 | 
				
			||||||
#![deny(broken_intra_doc_links)]
 | 
					#![deny(broken_intra_doc_links)]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
#[macro_use]
 | 
					 | 
				
			||||||
mod headers;
 | 
					 | 
				
			||||||
mod kafka_consumer_record;
 | 
					mod kafka_consumer_record;
 | 
				
			||||||
mod kafka_producer_record;
 | 
					mod kafka_producer_record;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -1,13 +1,15 @@
 | 
				
			||||||
use reqwest_lib as reqwest;
 | 
					use reqwest_lib as reqwest;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
use super::headers;
 | 
					use crate::binding::{
 | 
				
			||||||
 | 
					    http::{header_prefix, SPEC_VERSION_HEADER},
 | 
				
			||||||
 | 
					    CLOUDEVENTS_JSON_HEADER,
 | 
				
			||||||
 | 
					};
 | 
				
			||||||
use crate::event::SpecVersion;
 | 
					use crate::event::SpecVersion;
 | 
				
			||||||
use crate::message::{
 | 
					use crate::message::{
 | 
				
			||||||
    BinaryDeserializer, BinarySerializer, MessageAttributeValue, Result, StructuredSerializer,
 | 
					    BinaryDeserializer, BinarySerializer, MessageAttributeValue, Result, StructuredSerializer,
 | 
				
			||||||
};
 | 
					};
 | 
				
			||||||
use crate::Event;
 | 
					use crate::Event;
 | 
				
			||||||
use reqwest::RequestBuilder;
 | 
					use reqwest::RequestBuilder;
 | 
				
			||||||
use std::str::FromStr;
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
/// Wrapper for [`RequestBuilder`] that implements [`StructuredSerializer`] & [`BinarySerializer`] traits.
 | 
					/// Wrapper for [`RequestBuilder`] that implements [`StructuredSerializer`] & [`BinarySerializer`] traits.
 | 
				
			||||||
pub struct RequestSerializer {
 | 
					pub struct RequestSerializer {
 | 
				
			||||||
| 
						 | 
					@ -21,25 +23,20 @@ impl RequestSerializer {
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
impl BinarySerializer<RequestBuilder> for RequestSerializer {
 | 
					impl BinarySerializer<RequestBuilder> for RequestSerializer {
 | 
				
			||||||
    fn set_spec_version(mut self, spec_version: SpecVersion) -> Result<Self> {
 | 
					    fn set_spec_version(mut self, spec_ver: SpecVersion) -> Result<Self> {
 | 
				
			||||||
        self.req = self
 | 
					        self.req = self.req.header(SPEC_VERSION_HEADER, spec_ver.to_string());
 | 
				
			||||||
            .req
 | 
					 | 
				
			||||||
            .header(headers::SPEC_VERSION_HEADER.clone(), spec_version.as_str());
 | 
					 | 
				
			||||||
        Ok(self)
 | 
					        Ok(self)
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    fn set_attribute(mut self, name: &str, value: MessageAttributeValue) -> Result<Self> {
 | 
					    fn set_attribute(mut self, name: &str, value: MessageAttributeValue) -> Result<Self> {
 | 
				
			||||||
        self.req = self.req.header(
 | 
					        let key = &header_prefix(name);
 | 
				
			||||||
            headers::ATTRIBUTES_TO_HEADERS.get(name).unwrap().clone(),
 | 
					        self.req = self.req.header(key, value.to_string());
 | 
				
			||||||
            value.to_string(),
 | 
					 | 
				
			||||||
        );
 | 
					 | 
				
			||||||
        Ok(self)
 | 
					        Ok(self)
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    fn set_extension(mut self, name: &str, value: MessageAttributeValue) -> Result<Self> {
 | 
					    fn set_extension(mut self, name: &str, value: MessageAttributeValue) -> Result<Self> {
 | 
				
			||||||
        self.req = self
 | 
					        let key = &header_prefix(name);
 | 
				
			||||||
            .req
 | 
					        self.req = self.req.header(key, value.to_string());
 | 
				
			||||||
            .header(attribute_name_to_header!(name)?, value.to_string());
 | 
					 | 
				
			||||||
        Ok(self)
 | 
					        Ok(self)
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -56,10 +53,7 @@ impl StructuredSerializer<RequestBuilder> for RequestSerializer {
 | 
				
			||||||
    fn set_structured_event(self, bytes: Vec<u8>) -> Result<RequestBuilder> {
 | 
					    fn set_structured_event(self, bytes: Vec<u8>) -> Result<RequestBuilder> {
 | 
				
			||||||
        Ok(self
 | 
					        Ok(self
 | 
				
			||||||
            .req
 | 
					            .req
 | 
				
			||||||
            .header(
 | 
					            .header(reqwest::header::CONTENT_TYPE, CLOUDEVENTS_JSON_HEADER)
 | 
				
			||||||
                reqwest::header::CONTENT_TYPE,
 | 
					 | 
				
			||||||
                headers::CLOUDEVENTS_JSON_HEADER.clone(),
 | 
					 | 
				
			||||||
            )
 | 
					 | 
				
			||||||
            .body(bytes))
 | 
					            .body(bytes))
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -1,15 +1,15 @@
 | 
				
			||||||
use reqwest_lib as reqwest;
 | 
					use reqwest_lib as reqwest;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
use super::headers;
 | 
					use crate::binding::http::SPEC_VERSION_HEADER;
 | 
				
			||||||
use crate::event::SpecVersion;
 | 
					use crate::event::SpecVersion;
 | 
				
			||||||
use crate::message::{
 | 
					use crate::message::{
 | 
				
			||||||
    BinaryDeserializer, BinarySerializer, Encoding, Error, MessageAttributeValue,
 | 
					    BinaryDeserializer, BinarySerializer, Encoding, Error, MessageAttributeValue,
 | 
				
			||||||
    MessageDeserializer, Result, StructuredDeserializer, StructuredSerializer,
 | 
					    MessageDeserializer, Result, StructuredDeserializer, StructuredSerializer,
 | 
				
			||||||
};
 | 
					};
 | 
				
			||||||
use crate::{message, Event};
 | 
					use crate::{header_value_to_str, message, Event};
 | 
				
			||||||
use async_trait::async_trait;
 | 
					use async_trait::async_trait;
 | 
				
			||||||
use bytes::Bytes;
 | 
					use bytes::Bytes;
 | 
				
			||||||
use reqwest::header::{HeaderMap, HeaderName};
 | 
					use reqwest::header::HeaderMap;
 | 
				
			||||||
use reqwest::Response;
 | 
					use reqwest::Response;
 | 
				
			||||||
use std::convert::TryFrom;
 | 
					use std::convert::TryFrom;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -32,18 +32,20 @@ impl BinaryDeserializer for ResponseDeserializer {
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        let spec_version = SpecVersion::try_from(
 | 
					        let spec_version = SpecVersion::try_from(
 | 
				
			||||||
            unwrap_optional_header!(self.headers, headers::SPEC_VERSION_HEADER).unwrap()?,
 | 
					            self.headers
 | 
				
			||||||
 | 
					                .get(SPEC_VERSION_HEADER)
 | 
				
			||||||
 | 
					                .map(|a| header_value_to_str!(a))
 | 
				
			||||||
 | 
					                .unwrap()?,
 | 
				
			||||||
        )?;
 | 
					        )?;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        visitor = visitor.set_spec_version(spec_version.clone())?;
 | 
					        visitor = visitor.set_spec_version(spec_version.clone())?;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        let attributes = spec_version.attribute_names();
 | 
					        let attributes = spec_version.attribute_names();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        for (hn, hv) in self
 | 
					        for (hn, hv) in self.headers.iter().filter(|(hn, _)| {
 | 
				
			||||||
            .headers
 | 
					            let key = hn.as_str();
 | 
				
			||||||
            .iter()
 | 
					            SPEC_VERSION_HEADER.ne(key) && key.starts_with("ce-")
 | 
				
			||||||
            .filter(|(hn, _)| headers::SPEC_VERSION_HEADER.ne(hn) && hn.as_str().starts_with("ce-"))
 | 
					        }) {
 | 
				
			||||||
        {
 | 
					 | 
				
			||||||
            let name = &hn.as_str()["ce-".len()..];
 | 
					            let name = &hn.as_str()["ce-".len()..];
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            if attributes.contains(&name) {
 | 
					            if attributes.contains(&name) {
 | 
				
			||||||
| 
						 | 
					@ -87,12 +89,13 @@ impl MessageDeserializer for ResponseDeserializer {
 | 
				
			||||||
    fn encoding(&self) -> Encoding {
 | 
					    fn encoding(&self) -> Encoding {
 | 
				
			||||||
        match (
 | 
					        match (
 | 
				
			||||||
            #[allow(clippy::borrow_interior_mutable_const)]
 | 
					            #[allow(clippy::borrow_interior_mutable_const)]
 | 
				
			||||||
            unwrap_optional_header!(self.headers, reqwest::header::CONTENT_TYPE)
 | 
					            self.headers
 | 
				
			||||||
 | 
					                .get(reqwest::header::CONTENT_TYPE)
 | 
				
			||||||
 | 
					                .map(|a| header_value_to_str!(a))
 | 
				
			||||||
                .map(|r| r.ok())
 | 
					                .map(|r| r.ok())
 | 
				
			||||||
                .flatten()
 | 
					                .flatten()
 | 
				
			||||||
                .map(|e| e.starts_with("application/cloudevents+json")),
 | 
					                .map(|e| e.starts_with("application/cloudevents+json")),
 | 
				
			||||||
            self.headers
 | 
					            self.headers.get(SPEC_VERSION_HEADER),
 | 
				
			||||||
                .get::<&'static HeaderName>(&headers::SPEC_VERSION_HEADER),
 | 
					 | 
				
			||||||
        ) {
 | 
					        ) {
 | 
				
			||||||
            (Some(true), _) => Encoding::STRUCTURED,
 | 
					            (Some(true), _) => Encoding::STRUCTURED,
 | 
				
			||||||
            (_, Some(_)) => Encoding::BINARY,
 | 
					            (_, Some(_)) => Encoding::BINARY,
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -1,63 +0,0 @@
 | 
				
			||||||
use reqwest_lib as reqwest;
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
use crate::event::SpecVersion;
 | 
					 | 
				
			||||||
use lazy_static::lazy_static;
 | 
					 | 
				
			||||||
use reqwest::header::{HeaderName, HeaderValue};
 | 
					 | 
				
			||||||
use std::collections::HashMap;
 | 
					 | 
				
			||||||
use std::str::FromStr;
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
macro_rules! unwrap_optional_header {
 | 
					 | 
				
			||||||
    ($headers:expr, $name:expr) => {
 | 
					 | 
				
			||||||
        $headers
 | 
					 | 
				
			||||||
            .get::<&'static reqwest::header::HeaderName>(&$name)
 | 
					 | 
				
			||||||
            .map(|a| header_value_to_str!(a))
 | 
					 | 
				
			||||||
    };
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
macro_rules! header_value_to_str {
 | 
					 | 
				
			||||||
    ($header_value:expr) => {
 | 
					 | 
				
			||||||
        $header_value
 | 
					 | 
				
			||||||
            .to_str()
 | 
					 | 
				
			||||||
            .map_err(|e| crate::message::Error::Other {
 | 
					 | 
				
			||||||
                source: Box::new(e),
 | 
					 | 
				
			||||||
            })
 | 
					 | 
				
			||||||
    };
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
macro_rules! str_name_to_header {
 | 
					 | 
				
			||||||
    ($attribute:expr) => {
 | 
					 | 
				
			||||||
        reqwest::header::HeaderName::from_str($attribute).map_err(|e| {
 | 
					 | 
				
			||||||
            crate::message::Error::Other {
 | 
					 | 
				
			||||||
                source: Box::new(e),
 | 
					 | 
				
			||||||
            }
 | 
					 | 
				
			||||||
        })
 | 
					 | 
				
			||||||
    };
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
macro_rules! attribute_name_to_header {
 | 
					 | 
				
			||||||
    ($attribute:expr) => {
 | 
					 | 
				
			||||||
        str_name_to_header!(&["ce-", $attribute].concat())
 | 
					 | 
				
			||||||
    };
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
fn attributes_to_headers(
 | 
					 | 
				
			||||||
    it: impl Iterator<Item = &'static str>,
 | 
					 | 
				
			||||||
) -> HashMap<&'static str, HeaderName> {
 | 
					 | 
				
			||||||
    it.map(|s| {
 | 
					 | 
				
			||||||
        if s == "datacontenttype" {
 | 
					 | 
				
			||||||
            (s, reqwest::header::CONTENT_TYPE)
 | 
					 | 
				
			||||||
        } else {
 | 
					 | 
				
			||||||
            (s, attribute_name_to_header!(s).unwrap())
 | 
					 | 
				
			||||||
        }
 | 
					 | 
				
			||||||
    })
 | 
					 | 
				
			||||||
    .collect()
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
lazy_static! {
 | 
					 | 
				
			||||||
    pub(crate) static ref ATTRIBUTES_TO_HEADERS: HashMap<&'static str, HeaderName> =
 | 
					 | 
				
			||||||
        attributes_to_headers(SpecVersion::all_attribute_names());
 | 
					 | 
				
			||||||
    pub(crate) static ref SPEC_VERSION_HEADER: HeaderName =
 | 
					 | 
				
			||||||
        HeaderName::from_static("ce-specversion");
 | 
					 | 
				
			||||||
    pub(crate) static ref CLOUDEVENTS_JSON_HEADER: HeaderValue =
 | 
					 | 
				
			||||||
        HeaderValue::from_static("application/cloudevents+json");
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
| 
						 | 
					@ -30,8 +30,6 @@
 | 
				
			||||||
 | 
					
 | 
				
			||||||
#![deny(broken_intra_doc_links)]
 | 
					#![deny(broken_intra_doc_links)]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
#[macro_use]
 | 
					 | 
				
			||||||
mod headers;
 | 
					 | 
				
			||||||
mod client_request;
 | 
					mod client_request;
 | 
				
			||||||
mod client_response;
 | 
					mod client_response;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -1,63 +0,0 @@
 | 
				
			||||||
use warp_lib as warp;
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
use crate::event::SpecVersion;
 | 
					 | 
				
			||||||
use http::header::HeaderName;
 | 
					 | 
				
			||||||
use lazy_static::lazy_static;
 | 
					 | 
				
			||||||
use warp::http::HeaderValue;
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
use std::collections::HashMap;
 | 
					 | 
				
			||||||
use std::str::FromStr;
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
macro_rules! unwrap_optional_header {
 | 
					 | 
				
			||||||
    ($headers:expr, $name:expr) => {
 | 
					 | 
				
			||||||
        $headers
 | 
					 | 
				
			||||||
            .get::<&'static HeaderName>(&$name)
 | 
					 | 
				
			||||||
            .map(|a| header_value_to_str!(a))
 | 
					 | 
				
			||||||
    };
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
macro_rules! header_value_to_str {
 | 
					 | 
				
			||||||
    ($header_value:expr) => {
 | 
					 | 
				
			||||||
        $header_value
 | 
					 | 
				
			||||||
            .to_str()
 | 
					 | 
				
			||||||
            .map_err(|e| crate::message::Error::Other {
 | 
					 | 
				
			||||||
                source: Box::new(e),
 | 
					 | 
				
			||||||
            })
 | 
					 | 
				
			||||||
    };
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
macro_rules! str_name_to_header {
 | 
					 | 
				
			||||||
    ($attribute:expr) => {
 | 
					 | 
				
			||||||
        HeaderName::from_str($attribute).map_err(|e| crate::message::Error::Other {
 | 
					 | 
				
			||||||
            source: Box::new(e),
 | 
					 | 
				
			||||||
        })
 | 
					 | 
				
			||||||
    };
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
macro_rules! attribute_name_to_header {
 | 
					 | 
				
			||||||
    ($attribute:expr) => {
 | 
					 | 
				
			||||||
        str_name_to_header!(&["ce-", $attribute].concat())
 | 
					 | 
				
			||||||
    };
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
fn attributes_to_headers(
 | 
					 | 
				
			||||||
    it: impl Iterator<Item = &'static str>,
 | 
					 | 
				
			||||||
) -> HashMap<&'static str, HeaderName> {
 | 
					 | 
				
			||||||
    it.map(|s| {
 | 
					 | 
				
			||||||
        if s == "datacontenttype" {
 | 
					 | 
				
			||||||
            (s, http::header::CONTENT_TYPE)
 | 
					 | 
				
			||||||
        } else {
 | 
					 | 
				
			||||||
            (s, attribute_name_to_header!(s).unwrap())
 | 
					 | 
				
			||||||
        }
 | 
					 | 
				
			||||||
    })
 | 
					 | 
				
			||||||
    .collect()
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
lazy_static! {
 | 
					 | 
				
			||||||
    pub(crate) static ref ATTRIBUTES_TO_HEADERS: HashMap<&'static str, HeaderName> =
 | 
					 | 
				
			||||||
        attributes_to_headers(SpecVersion::all_attribute_names());
 | 
					 | 
				
			||||||
    pub(crate) static ref SPEC_VERSION_HEADER: HeaderName =
 | 
					 | 
				
			||||||
        HeaderName::from_static("ce-specversion");
 | 
					 | 
				
			||||||
    pub(crate) static ref CLOUDEVENTS_JSON_HEADER: HeaderValue =
 | 
					 | 
				
			||||||
        HeaderValue::from_static("application/cloudevents+json");
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
| 
						 | 
					@ -61,9 +61,6 @@
 | 
				
			||||||
//! warp::serve(routes).run(([127, 0, 0, 1], 3030));
 | 
					//! warp::serve(routes).run(([127, 0, 0, 1], 3030));
 | 
				
			||||||
//! ```
 | 
					//! ```
 | 
				
			||||||
 | 
					
 | 
				
			||||||
#[macro_use]
 | 
					 | 
				
			||||||
mod headers;
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
mod server_request;
 | 
					mod server_request;
 | 
				
			||||||
mod server_response;
 | 
					mod server_response;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -1,13 +1,13 @@
 | 
				
			||||||
use super::headers;
 | 
					 | 
				
			||||||
use bytes::Bytes;
 | 
					use bytes::Bytes;
 | 
				
			||||||
use http::{header::HeaderName, HeaderMap};
 | 
					use http::HeaderMap;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					use crate::binding::http::SPEC_VERSION_HEADER;
 | 
				
			||||||
use crate::event::SpecVersion;
 | 
					use crate::event::SpecVersion;
 | 
				
			||||||
 | 
					use crate::header_value_to_str;
 | 
				
			||||||
use crate::message::{
 | 
					use crate::message::{
 | 
				
			||||||
    BinaryDeserializer, BinarySerializer, Encoding, Error, MessageAttributeValue,
 | 
					    BinaryDeserializer, BinarySerializer, Encoding, Error, MessageAttributeValue,
 | 
				
			||||||
    MessageDeserializer, Result, StructuredDeserializer, StructuredSerializer,
 | 
					    MessageDeserializer, Result, StructuredDeserializer, StructuredSerializer,
 | 
				
			||||||
};
 | 
					};
 | 
				
			||||||
 | 
					 | 
				
			||||||
use crate::{message, Event};
 | 
					use crate::{message, Event};
 | 
				
			||||||
use std::convert::TryFrom;
 | 
					use std::convert::TryFrom;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -29,18 +29,20 @@ impl BinaryDeserializer for RequestDeserializer {
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        let spec_version = SpecVersion::try_from(
 | 
					        let spec_version = SpecVersion::try_from(
 | 
				
			||||||
            unwrap_optional_header!(self.headers, headers::SPEC_VERSION_HEADER).unwrap()?,
 | 
					            self.headers
 | 
				
			||||||
 | 
					                .get(SPEC_VERSION_HEADER)
 | 
				
			||||||
 | 
					                .map(|a| header_value_to_str!(a))
 | 
				
			||||||
 | 
					                .unwrap()?,
 | 
				
			||||||
        )?;
 | 
					        )?;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        visitor = visitor.set_spec_version(spec_version.clone())?;
 | 
					        visitor = visitor.set_spec_version(spec_version.clone())?;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        let attributes = spec_version.attribute_names();
 | 
					        let attributes = spec_version.attribute_names();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        for (hn, hv) in self
 | 
					        for (hn, hv) in self.headers.iter().filter(|(hn, _)| {
 | 
				
			||||||
            .headers
 | 
					            let key = hn.as_str();
 | 
				
			||||||
            .iter()
 | 
					            SPEC_VERSION_HEADER.ne(key) && key.starts_with("ce-")
 | 
				
			||||||
            .filter(|(hn, _)| headers::SPEC_VERSION_HEADER.ne(hn) && hn.as_str().starts_with("ce-"))
 | 
					        }) {
 | 
				
			||||||
        {
 | 
					 | 
				
			||||||
            let name = &hn.as_str()["ce-".len()..];
 | 
					            let name = &hn.as_str()["ce-".len()..];
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            if attributes.contains(&name) {
 | 
					            if attributes.contains(&name) {
 | 
				
			||||||
| 
						 | 
					@ -90,11 +92,7 @@ impl MessageDeserializer for RequestDeserializer {
 | 
				
			||||||
            == "application/cloudevents+json"
 | 
					            == "application/cloudevents+json"
 | 
				
			||||||
        {
 | 
					        {
 | 
				
			||||||
            Encoding::STRUCTURED
 | 
					            Encoding::STRUCTURED
 | 
				
			||||||
        } else if self
 | 
					        } else if self.headers.contains_key(SPEC_VERSION_HEADER) {
 | 
				
			||||||
            .headers
 | 
					 | 
				
			||||||
            .get::<&'static HeaderName>(&super::headers::SPEC_VERSION_HEADER)
 | 
					 | 
				
			||||||
            .is_some()
 | 
					 | 
				
			||||||
        {
 | 
					 | 
				
			||||||
            Encoding::BINARY
 | 
					            Encoding::BINARY
 | 
				
			||||||
        } else {
 | 
					        } else {
 | 
				
			||||||
            Encoding::UNKNOWN
 | 
					            Encoding::UNKNOWN
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -1,23 +1,21 @@
 | 
				
			||||||
use warp_lib as warp;
 | 
					use warp_lib as warp;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
use super::headers;
 | 
					use crate::binding::{
 | 
				
			||||||
 | 
					    http::{header_prefix, SPEC_VERSION_HEADER},
 | 
				
			||||||
 | 
					    CLOUDEVENTS_JSON_HEADER,
 | 
				
			||||||
 | 
					};
 | 
				
			||||||
use crate::event::SpecVersion;
 | 
					use crate::event::SpecVersion;
 | 
				
			||||||
use crate::message::{
 | 
					use crate::message::{
 | 
				
			||||||
    BinaryDeserializer, BinarySerializer, Error, MessageAttributeValue, Result,
 | 
					    BinaryDeserializer, BinarySerializer, Error, MessageAttributeValue, Result,
 | 
				
			||||||
    StructuredSerializer,
 | 
					    StructuredSerializer,
 | 
				
			||||||
};
 | 
					};
 | 
				
			||||||
use crate::Event;
 | 
					use crate::{str_to_header_value, Event};
 | 
				
			||||||
 | 
					
 | 
				
			||||||
use warp::http::HeaderValue;
 | 
					 | 
				
			||||||
use warp::hyper::Body;
 | 
					use warp::hyper::Body;
 | 
				
			||||||
use warp::reply::Response;
 | 
					use warp::reply::Response;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
use http::header::HeaderName;
 | 
					 | 
				
			||||||
use http::response::Builder;
 | 
					use http::response::Builder;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
use std::{convert::TryFrom, str::FromStr};
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
pub struct ResponseSerializer {
 | 
					pub struct ResponseSerializer {
 | 
				
			||||||
    builder: Builder,
 | 
					    builder: Builder,
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
| 
						 | 
					@ -32,38 +30,23 @@ impl ResponseSerializer {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
impl BinarySerializer<Response> for ResponseSerializer {
 | 
					impl BinarySerializer<Response> for ResponseSerializer {
 | 
				
			||||||
    fn set_spec_version(mut self, spec_version: SpecVersion) -> Result<Self> {
 | 
					    fn set_spec_version(mut self, spec_version: SpecVersion) -> Result<Self> {
 | 
				
			||||||
        self.builder = self.builder.header(
 | 
					        self.builder = self
 | 
				
			||||||
            headers::SPEC_VERSION_HEADER.clone(),
 | 
					            .builder
 | 
				
			||||||
            HeaderValue::try_from(spec_version.to_string().as_str()).map_err(|e| {
 | 
					            .header(SPEC_VERSION_HEADER, str_to_header_value!(spec_version)?);
 | 
				
			||||||
                crate::message::Error::Other {
 | 
					 | 
				
			||||||
                    source: Box::new(e),
 | 
					 | 
				
			||||||
                }
 | 
					 | 
				
			||||||
            })?,
 | 
					 | 
				
			||||||
        );
 | 
					 | 
				
			||||||
        Ok(self)
 | 
					        Ok(self)
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    fn set_attribute(mut self, name: &str, value: MessageAttributeValue) -> Result<Self> {
 | 
					    fn set_attribute(mut self, name: &str, value: MessageAttributeValue) -> Result<Self> {
 | 
				
			||||||
        self.builder = self.builder.header(
 | 
					        self.builder = self
 | 
				
			||||||
            headers::ATTRIBUTES_TO_HEADERS.get(name).unwrap().clone(),
 | 
					            .builder
 | 
				
			||||||
            HeaderValue::try_from(value.to_string().as_str()).map_err(|e| {
 | 
					            .header(&header_prefix(name), str_to_header_value!(value)?);
 | 
				
			||||||
                crate::message::Error::Other {
 | 
					 | 
				
			||||||
                    source: Box::new(e),
 | 
					 | 
				
			||||||
                }
 | 
					 | 
				
			||||||
            })?,
 | 
					 | 
				
			||||||
        );
 | 
					 | 
				
			||||||
        Ok(self)
 | 
					        Ok(self)
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    fn set_extension(mut self, name: &str, value: MessageAttributeValue) -> Result<Self> {
 | 
					    fn set_extension(mut self, name: &str, value: MessageAttributeValue) -> Result<Self> {
 | 
				
			||||||
        self.builder = self.builder.header(
 | 
					        self.builder = self
 | 
				
			||||||
            attribute_name_to_header!(name)?,
 | 
					            .builder
 | 
				
			||||||
            HeaderValue::try_from(value.to_string().as_str()).map_err(|e| {
 | 
					            .header(&header_prefix(name), str_to_header_value!(value)?);
 | 
				
			||||||
                crate::message::Error::Other {
 | 
					 | 
				
			||||||
                    source: Box::new(e),
 | 
					 | 
				
			||||||
                }
 | 
					 | 
				
			||||||
            })?,
 | 
					 | 
				
			||||||
        );
 | 
					 | 
				
			||||||
        Ok(self)
 | 
					        Ok(self)
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -87,10 +70,7 @@ impl BinarySerializer<Response> for ResponseSerializer {
 | 
				
			||||||
impl StructuredSerializer<Response> for ResponseSerializer {
 | 
					impl StructuredSerializer<Response> for ResponseSerializer {
 | 
				
			||||||
    fn set_structured_event(self, bytes: Vec<u8>) -> Result<Response> {
 | 
					    fn set_structured_event(self, bytes: Vec<u8>) -> Result<Response> {
 | 
				
			||||||
        self.builder
 | 
					        self.builder
 | 
				
			||||||
            .header(
 | 
					            .header(http::header::CONTENT_TYPE, CLOUDEVENTS_JSON_HEADER)
 | 
				
			||||||
                http::header::CONTENT_TYPE,
 | 
					 | 
				
			||||||
                headers::CLOUDEVENTS_JSON_HEADER.clone(),
 | 
					 | 
				
			||||||
            )
 | 
					 | 
				
			||||||
            .body(Body::from(bytes))
 | 
					            .body(Body::from(bytes))
 | 
				
			||||||
            .map_err(|e| crate::message::Error::Other {
 | 
					            .map_err(|e| crate::message::Error::Other {
 | 
				
			||||||
                source: Box::new(e),
 | 
					                source: Box::new(e),
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -32,13 +32,6 @@ impl SpecVersion {
 | 
				
			||||||
            SpecVersion::V10 => &v10::ATTRIBUTE_NAMES,
 | 
					            SpecVersion::V10 => &v10::ATTRIBUTE_NAMES,
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
    /// Get all attribute names for all specification versions.
 | 
					 | 
				
			||||||
    /// Note that the result iterator could contain duplicate entries.
 | 
					 | 
				
			||||||
    pub fn all_attribute_names() -> impl Iterator<Item = &'static str> {
 | 
					 | 
				
			||||||
        vec![SpecVersion::V03, SpecVersion::V10]
 | 
					 | 
				
			||||||
            .into_iter()
 | 
					 | 
				
			||||||
            .flat_map(|s| s.attribute_names().to_owned().into_iter())
 | 
					 | 
				
			||||||
    }
 | 
					 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
impl fmt::Display for SpecVersion {
 | 
					impl fmt::Display for SpecVersion {
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
		Reference in New Issue