diff --git a/Cargo.lock b/Cargo.lock index 40f12fb..f40b1b7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -50,6 +50,8 @@ dependencies = [ "hostname", "rstest", "serde", + "serde-transcode", + "serde-value", "serde_json", "uuid", ] @@ -117,6 +119,15 @@ dependencies = [ "autocfg", ] +[[package]] +name = "ordered-float" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "18869315e81473c951eb56ad5558bbc56978562d3ecfb87abb7a1e944cea4518" +dependencies = [ + "num-traits", +] + [[package]] name = "ppv-lite86" version = "0.2.6" @@ -240,6 +251,25 @@ dependencies = [ "serde_derive", ] +[[package]] +name = "serde-transcode" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97528f0dfcf8ce2d51d995cb513a103b9cd301dc3f387a9cae5ef974381d4e1c" +dependencies = [ + "serde", +] + +[[package]] +name = "serde-value" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a65a7291a8a568adcae4c10a677ebcedbc6c9cec91c054dee2ce40b0e3290eb" +dependencies = [ + "ordered-float", + "serde", +] + [[package]] name = "serde_derive" version = "1.0.104" diff --git a/Cargo.toml b/Cargo.toml index f7d244a..b4c5a5a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,6 +13,8 @@ repository = "https://github.com/cloudevents/sdk-rust" [dependencies] serde = { version = "^1.0", features = ["derive"] } serde_json = "^1.0" +serde-value = "^0.6" +serde-transcode = "^1.1" chrono = { version = "^0.4", features = ["serde"] } delegate = "^0.4" uuid = { version = "^0.8", features = ["serde", "v4"] } diff --git a/README.md b/README.md index 9abde53..e3fca18 100644 --- a/README.md +++ b/README.md @@ -2,10 +2,19 @@ Work in progress SDK for [CloudEvents](https://github.com/cloudevents/spec) -## Status +## Spec support -This SDK current supports the following versions of CloudEvents: -- TBD +| | [v0.3](https://github.com/cloudevents/spec/tree/v0.3) | [v1.0](https://github.com/cloudevents/spec/tree/v1.0) | +| :---------------------------: | :----------------------------------------------------------------------------: | :---------------------------------------------------------------------------------: | +| CloudEvents Core | :heavy_check_mark: | :heavy_check_mark: | +| AMQP Protocol Binding | :x: | :x: | +| AVRO Event Format | :x: | :x: | +| HTTP Protocol Binding | :x: | :x: | +| JSON Event Format | :heavy_check_mark: | :heavy_check_mark: | +| Kafka Protocol Binding | :x: | :x: | +| MQTT Protocol Binding | :x: | :x: | +| NATS Protocol Binding | :x: | :x: | +| Web hook | :x: | :x: | ## Development & Contributing diff --git a/src/event/attributes.rs b/src/event/attributes.rs index df6916c..1c2a8d5 100644 --- a/src/event/attributes.rs +++ b/src/event/attributes.rs @@ -1,7 +1,7 @@ use super::SpecVersion; -use crate::event::{AttributesV10, ExtensionValue}; +use crate::event::AttributesV10; use chrono::{DateTime, Utc}; -use serde::{Deserialize, Serialize}; +use serde::Serialize; /// Trait to get [CloudEvents Context attributes](https://github.com/cloudevents/spec/blob/master/spec.md#context-attributes). pub trait AttributesReader { @@ -21,10 +21,6 @@ pub trait AttributesReader { fn get_subject(&self) -> Option<&str>; /// Get the [time](https://github.com/cloudevents/spec/blob/master/spec.md#time). fn get_time(&self) -> Option<&DateTime>; - /// Get the [extension](https://github.com/cloudevents/spec/blob/master/spec.md#extension-context-attributes) named `extension_name` - fn get_extension(&self, extension_name: &str) -> Option<&ExtensionValue>; - /// Get all the [extensions](https://github.com/cloudevents/spec/blob/master/spec.md#extension-context-attributes) - fn get_extensions(&self) -> Vec<(&str, &ExtensionValue)>; } pub trait AttributesWriter { @@ -33,15 +29,6 @@ pub trait AttributesWriter { fn set_type(&mut self, ty: impl Into); fn set_subject(&mut self, subject: Option>); fn set_time(&mut self, time: Option>>); - fn set_extension<'name, 'event: 'name>( - &'event mut self, - extension_name: &'name str, - extension_value: impl Into, - ); - fn remove_extension<'name, 'event: 'name>( - &'event mut self, - extension_name: &'name str, - ) -> Option; } pub(crate) trait DataAttributesWriter { @@ -49,7 +36,7 @@ pub(crate) trait DataAttributesWriter { fn set_dataschema(&mut self, dataschema: Option>); } -#[derive(PartialEq, Debug, Clone, Serialize, Deserialize)] +#[derive(PartialEq, Debug, Clone, Serialize)] #[serde(tag = "specversion")] pub enum Attributes { #[serde(rename = "1.0")] @@ -104,18 +91,6 @@ impl AttributesReader for Attributes { Attributes::V10(a) => a.get_time(), } } - - fn get_extension(&self, extension_name: &str) -> Option<&ExtensionValue> { - match self { - Attributes::V10(a) => a.get_extension(extension_name), - } - } - - fn get_extensions(&self) -> Vec<(&str, &ExtensionValue)> { - match self { - Attributes::V10(a) => a.get_extensions(), - } - } } impl AttributesWriter for Attributes { @@ -148,25 +123,6 @@ impl AttributesWriter for Attributes { Attributes::V10(a) => a.set_time(time), } } - - fn set_extension<'name, 'event: 'name>( - &'event mut self, - extension_name: &'name str, - extension_value: impl Into, - ) { - match self { - Attributes::V10(a) => a.set_extension(extension_name, extension_value), - } - } - - fn remove_extension<'name, 'event: 'name>( - &'event mut self, - extension_name: &'name str, - ) -> Option { - match self { - Attributes::V10(a) => a.remove_extension(extension_name), - } - } } impl DataAttributesWriter for Attributes { diff --git a/src/event/data.rs b/src/event/data.rs index 96aed58..72649be 100644 --- a/src/event/data.rs +++ b/src/event/data.rs @@ -1,15 +1,14 @@ use serde::de::Visitor; -use serde::{Deserialize, Deserializer, Serialize, Serializer}; +use serde::{Serialize, Serializer}; use std::convert::{Into, TryFrom}; use std::fmt::{self, Formatter}; /// Event [data attribute](https://github.com/cloudevents/spec/blob/master/spec.md#event-data) representation /// -#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)] +#[derive(Debug, PartialEq, Clone, Serialize)] pub enum Data { #[serde(rename = "data_base64")] #[serde(serialize_with = "serialize_base64")] - #[serde(deserialize_with = "deserialize_base64")] Binary(Vec), #[serde(rename = "data")] Json(serde_json::Value), @@ -44,30 +43,6 @@ where serializer.serialize_str(&base64::encode(&data)) } -struct Base64Visitor; - -impl<'de> Visitor<'de> for Base64Visitor { - type Value = Vec; - - fn expecting(&self, formatter: &mut Formatter) -> fmt::Result { - formatter.write_str("a Base64 encoded string") - } - - fn visit_str(self, v: &str) -> Result - where - E: serde::de::Error, - { - base64::decode(v).map_err(|e| serde::de::Error::custom(e.to_string())) - } -} - -fn deserialize_base64<'de, D>(deserializer: D) -> Result, D::Error> -where - D: Deserializer<'de>, -{ - deserializer.deserialize_str(Base64Visitor) -} - impl Into for serde_json::Value { fn into(self) -> Data { Data::Json(self) diff --git a/src/event/event.rs b/src/event/event.rs index 4d6a063..6061788 100644 --- a/src/event/event.rs +++ b/src/event/event.rs @@ -5,7 +5,8 @@ use super::{ use crate::event::attributes::DataAttributesWriter; use chrono::{DateTime, Utc}; use delegate::delegate; -use serde::{Deserialize, Serialize}; +use serde::Serialize; +use std::collections::HashMap; use std::convert::TryFrom; /// Data structure that represents a [CloudEvent](https://github.com/cloudevents/spec/blob/master/spec.md). @@ -32,15 +33,15 @@ use std::convert::TryFrom; /// let data: serde_json::Value = e.try_get_data().unwrap().unwrap(); /// println!("Event data: {}", data) /// ``` -#[derive(PartialEq, Debug, Clone, Serialize, Deserialize)] +#[derive(PartialEq, Debug, Clone, Serialize)] pub struct Event { + #[serde(flatten)] + pub attributes: Attributes, #[serde(skip_serializing_if = "Option::is_none")] - #[serde(alias = "data_base64")] - #[serde(alias = "data")] #[serde(flatten)] pub data: Option, #[serde(flatten)] - pub attributes: Attributes, + pub extensions: HashMap, } impl AttributesReader for Event { @@ -54,8 +55,6 @@ impl AttributesReader for Event { fn get_dataschema(&self) -> Option<&str>; fn get_subject(&self) -> Option<&str>; fn get_time(&self) -> Option<&DateTime>; - fn get_extension(&self, extension_name: &str) -> Option<&ExtensionValue>; - fn get_extensions(&self) -> Vec<(&str, &ExtensionValue)>; } } } @@ -68,15 +67,6 @@ impl AttributesWriter for Event { fn set_type(&mut self, ty: impl Into); fn set_subject(&mut self, subject: Option>); fn set_time(&mut self, time: Option>>); - fn set_extension<'name, 'event: 'name>( - &'event mut self, - extension_name: &'name str, - extension_value: impl Into, - ); - fn remove_extension<'name, 'event: 'name>( - &'event mut self, - extension_name: &'name str, - ) -> Option; } } } @@ -86,6 +76,7 @@ impl Default for Event { Event { attributes: Attributes::V10(AttributesV10::default()), data: None, + extensions: HashMap::default(), } } } @@ -156,6 +147,35 @@ impl Event { } .transpose() } + + /// Get the [extension](https://github.com/cloudevents/spec/blob/master/spec.md#extension-context-attributes) named `extension_name` + pub fn get_extension(&self, extension_name: &str) -> Option<&ExtensionValue> { + self.extensions.get(extension_name) + } + + /// Get all the [extensions](https://github.com/cloudevents/spec/blob/master/spec.md#extension-context-attributes) + pub fn get_extensions(&self) -> Vec<(&str, &ExtensionValue)> { + self.extensions + .iter() + .map(|(k, v)| (k.as_str(), v)) + .collect() + } + + pub fn set_extension<'name, 'event: 'name>( + &'event mut self, + extension_name: &'name str, + extension_value: impl Into, + ) { + self.extensions + .insert(extension_name.to_owned(), extension_value.into()); + } + + pub fn remove_extension<'name, 'event: 'name>( + &'event mut self, + extension_name: &'name str, + ) -> Option { + self.extensions.remove(extension_name) + } } #[cfg(test)] diff --git a/src/event/mod.rs b/src/event/mod.rs index 888b50c..6dd090c 100644 --- a/src/event/mod.rs +++ b/src/event/mod.rs @@ -3,6 +3,8 @@ mod builder; mod data; mod event; mod extensions; +#[macro_use] +mod serde; mod spec_version; pub use attributes::Attributes; @@ -17,3 +19,4 @@ mod v10; pub use v10::Attributes as AttributesV10; pub use v10::EventBuilder as EventBuilderV10; +pub(crate) use v10::EventDeserializer as EventDeseriazerV10; diff --git a/src/event/serde.rs b/src/event/serde.rs new file mode 100644 index 0000000..7b5d5ef --- /dev/null +++ b/src/event/serde.rs @@ -0,0 +1,110 @@ +use super::{Attributes, Data, Event, EventDeseriazerV10}; +use serde::de::{Error, Unexpected, IntoDeserializer}; +use serde::{Deserialize, Deserializer}; +use serde_value::Value; +use std::collections::{BTreeMap, HashMap}; +use crate::event::ExtensionValue; + +const SPEC_VERSIONS: [&'static str; 1] = ["1.0"]; + +macro_rules! parse_optional_field { + ($map:ident, $name:literal, $value_variant:ident, $error:ty) => { + $map.remove($name) + .map(|val| match val { + Value::$value_variant(v) => Ok(v), + other => Err(<$error>::invalid_type( + crate::event::serde::value_to_unexpected(&other), + &stringify!($value_variant), + )), + }) + .transpose() + }; +} + +macro_rules! parse_field { + ($map:ident, $name:literal, $value_variant:ident, $error:ty) => { + parse_optional_field!($map, $name, $value_variant, $error)? + .ok_or_else(|| <$error>::missing_field($name)) + }; +} + +pub(crate) trait EventDeserializer { + fn deserialize_attributes( + &self, + map: &mut BTreeMap, + ) -> Result; + + fn deserialize_data( + &self, + map: &mut BTreeMap, + ) -> Result, E>; +} + +impl<'de> Deserialize<'de> for Event { + fn deserialize(deserializer: D) -> Result>::Error> + where + D: Deserializer<'de>, + { + let map = match Value::deserialize(deserializer)? { + Value::Map(m) => Ok(m), + v => Err(Error::invalid_type(value_to_unexpected(&v), &"a map")), + }?; + + let mut map: BTreeMap = map + .into_iter() + .map(|(k, v)| match k { + Value::String(s) => Ok((s, v)), + k => Err(Error::invalid_type(value_to_unexpected(&k), &"a string")), + }) + .collect::, >::Error>>()?; + + let event_deserializer = + match parse_field!(map, "specversion", String, >::Error)? + .as_str() + { + "1.0" => Ok(EventDeseriazerV10 {}), + s => Err(>::Error::unknown_variant( + s, + &SPEC_VERSIONS, + )), + }?; + + let attributes = event_deserializer.deserialize_attributes(&mut map)?; + let data = event_deserializer.deserialize_data(&mut map)?; + let extensions = map.into_iter() + .map(|(k, v)| Ok((k, ExtensionValue::deserialize(v.into_deserializer())?))) + .collect::, serde_value::DeserializerError>>() + .map_err(|e| >::Error::custom(e))?; + + Ok(Event { + attributes, + data, + extensions, + }) + } +} + +// This should be provided by the Value package itself +pub(crate) fn value_to_unexpected(v: &Value) -> Unexpected { + match v { + Value::Bool(b) => serde::de::Unexpected::Bool(*b), + Value::U8(n) => serde::de::Unexpected::Unsigned(*n as u64), + Value::U16(n) => serde::de::Unexpected::Unsigned(*n as u64), + Value::U32(n) => serde::de::Unexpected::Unsigned(*n as u64), + Value::U64(n) => serde::de::Unexpected::Unsigned(*n), + Value::I8(n) => serde::de::Unexpected::Signed(*n as i64), + Value::I16(n) => serde::de::Unexpected::Signed(*n as i64), + Value::I32(n) => serde::de::Unexpected::Signed(*n as i64), + Value::I64(n) => serde::de::Unexpected::Signed(*n), + Value::F32(n) => serde::de::Unexpected::Float(*n as f64), + Value::F64(n) => serde::de::Unexpected::Float(*n), + Value::Char(c) => serde::de::Unexpected::Char(*c), + Value::String(s) => serde::de::Unexpected::Str(s), + Value::Unit => serde::de::Unexpected::Unit, + Value::Option(_) => serde::de::Unexpected::Option, + Value::Newtype(_) => serde::de::Unexpected::NewtypeStruct, + Value::Seq(_) => serde::de::Unexpected::Seq, + Value::Map(_) => serde::de::Unexpected::Map, + Value::Bytes(b) => serde::de::Unexpected::Bytes(b), + } +} diff --git a/src/event/v10/attributes.rs b/src/event/v10/attributes.rs index 6aa9d0d..292824c 100644 --- a/src/event/v10/attributes.rs +++ b/src/event/v10/attributes.rs @@ -1,27 +1,24 @@ use crate::event::attributes::DataAttributesWriter; -use crate::event::{AttributesReader, AttributesWriter, ExtensionValue, SpecVersion}; +use crate::event::{AttributesReader, AttributesWriter, SpecVersion}; use chrono::{DateTime, Utc}; use hostname::get_hostname; -use serde::{Deserialize, Serialize}; -use std::collections::HashMap; +use serde::Serialize; use uuid::Uuid; -#[derive(PartialEq, Debug, Clone, Serialize, Deserialize)] +#[derive(PartialEq, Debug, Clone, Serialize)] pub struct Attributes { - id: String, + pub(crate) id: String, #[serde(rename = "type")] - ty: String, - source: String, + pub(crate) ty: String, + pub(crate) source: String, #[serde(skip_serializing_if = "Option::is_none")] - datacontenttype: Option, + pub(crate) datacontenttype: Option, #[serde(skip_serializing_if = "Option::is_none")] - dataschema: Option, + pub(crate) dataschema: Option, #[serde(skip_serializing_if = "Option::is_none")] - subject: Option, + pub(crate) subject: Option, #[serde(skip_serializing_if = "Option::is_none")] - time: Option>, - #[serde(flatten)] - extensions: HashMap, + pub(crate) time: Option>, } impl AttributesReader for Attributes { @@ -65,17 +62,6 @@ impl AttributesReader for Attributes { fn get_time(&self) -> Option<&DateTime> { self.time.as_ref() } - - fn get_extension(&self, extension_name: &str) -> Option<&ExtensionValue> { - self.extensions.get(extension_name) - } - - fn get_extensions(&self) -> Vec<(&str, &ExtensionValue)> { - self.extensions - .iter() - .map(|(k, v)| (k.as_str(), v)) - .collect() - } } impl AttributesWriter for Attributes { @@ -98,22 +84,6 @@ impl AttributesWriter for Attributes { fn set_time(&mut self, time: Option>>) { self.time = time.map(Into::into) } - - fn set_extension<'name, 'event: 'name>( - &'event mut self, - extension_name: &'name str, - extension_value: impl Into, - ) { - self.extensions - .insert(extension_name.to_owned(), extension_value.into()); - } - - fn remove_extension<'name, 'event: 'name>( - &'event mut self, - extension_name: &'name str, - ) -> Option { - self.extensions.remove(extension_name) - } } impl DataAttributesWriter for Attributes { @@ -136,7 +106,6 @@ impl Default for Attributes { dataschema: None, subject: None, time: None, - extensions: HashMap::new(), } } } diff --git a/src/event/v10/builder.rs b/src/event/v10/builder.rs index 662bace..3b3ed94 100644 --- a/src/event/v10/builder.rs +++ b/src/event/v10/builder.rs @@ -1,6 +1,7 @@ use super::Attributes as AttributesV10; use crate::event::{Attributes, AttributesWriter, Data, Event, ExtensionValue}; use chrono::{DateTime, Utc}; +use std::collections::HashMap; pub struct EventBuilder { event: Event, @@ -17,6 +18,7 @@ impl EventBuilder { event: Event { attributes: Attributes::V10(AttributesV10::default()), data: None, + extensions: HashMap::new(), }, } } diff --git a/src/event/v10/mod.rs b/src/event/v10/mod.rs index 14b945f..3a7c015 100644 --- a/src/event/v10/mod.rs +++ b/src/event/v10/mod.rs @@ -1,5 +1,7 @@ mod attributes; mod builder; +mod serde; +pub(crate) use crate::event::v10::serde::EventDeserializer; pub use attributes::Attributes; pub use builder::EventBuilder; diff --git a/src/event/v10/serde.rs b/src/event/v10/serde.rs new file mode 100644 index 0000000..ae2e588 --- /dev/null +++ b/src/event/v10/serde.rs @@ -0,0 +1,59 @@ +use super::Attributes; +use crate::event::Data; +use chrono::{DateTime, Utc}; +use serde::de::{IntoDeserializer, Unexpected}; +use serde::Deserialize; +use serde_value::Value; +use std::collections::BTreeMap; + +pub(crate) struct EventDeserializer {} + +impl crate::event::serde::EventDeserializer for EventDeserializer { + fn deserialize_attributes( + &self, + map: &mut BTreeMap, + ) -> Result { + Ok(crate::event::Attributes::V10(Attributes { + id: parse_field!(map, "id", String, E)?, + ty: parse_field!(map, "type", String, E)?, + source: parse_field!(map, "source", String, E)?, + datacontenttype: parse_optional_field!(map, "datacontenttype", String, E)?, + dataschema: parse_optional_field!(map, "dataschema", String, E)?, + subject: parse_optional_field!(map, "subject", String, E)?, + time: parse_optional_field!(map, "time", String, E)? + .map(|s| match DateTime::parse_from_rfc3339(&s) { + Ok(d) => Ok(DateTime::::from(d)), + Err(e) => Err(E::invalid_value( + Unexpected::Str(&s), + &e.to_string().as_str(), + )), + }) + .transpose()?, + })) + } + + fn deserialize_data( + &self, + map: &mut BTreeMap, + ) -> Result, E> { + let data = map.remove("data"); + let data_base64 = map.remove("data_base64"); + + match (data, data_base64) { + (Some(d), None) => Ok(Some(Data::Json( + serde_json::Value::deserialize(d.into_deserializer()).map_err(|e| E::custom(e))?, + ))), + (None, Some(d)) => match d { + Value::String(s) => Ok(Some(Data::from_base64(s.clone()).map_err(|e| { + E::invalid_value(Unexpected::Str(&s), &e.to_string().as_str()) + })?)), + other => Err(E::invalid_type( + crate::event::serde::value_to_unexpected(&other), + &"a string", + )), + }, + (Some(_), Some(_)) => Err(E::custom("Cannot have both data and data_base64 field")), + (None, None) => Ok(None), + } + } +} diff --git a/src/lib.rs b/src/lib.rs index 8c9840b..6c04514 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,5 +1,6 @@ extern crate serde; extern crate serde_json; +extern crate serde_value; pub mod event;