Merge pull request #23 from cloudevents/refactor_serde

Refactored serialization/deserialization code
This commit is contained in:
Francesco Guardiani 2020-04-06 16:23:22 +02:00 committed by GitHub
commit ad7ec80cde
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 333 additions and 156 deletions

20
Cargo.lock generated
View File

@ -50,6 +50,7 @@ dependencies = [
"hostname",
"rstest",
"serde",
"serde-value",
"serde_json",
"uuid",
]
@ -117,6 +118,15 @@ dependencies = [
"autocfg",
]
[[package]]
name = "ordered-float"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "18869315e81473c951eb56ad5558bbc56978562d3ecfb87abb7a1e944cea4518"
dependencies = [
"num-traits",
]
[[package]]
name = "ppv-lite86"
version = "0.2.6"
@ -240,6 +250,16 @@ dependencies = [
"serde_derive",
]
[[package]]
name = "serde-value"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5a65a7291a8a568adcae4c10a677ebcedbc6c9cec91c054dee2ce40b0e3290eb"
dependencies = [
"ordered-float",
"serde",
]
[[package]]
name = "serde_derive"
version = "1.0.104"

View File

@ -13,6 +13,7 @@ repository = "https://github.com/cloudevents/sdk-rust"
[dependencies]
serde = { version = "^1.0", features = ["derive"] }
serde_json = "^1.0"
serde-value = "^0.6"
chrono = { version = "^0.4", features = ["serde"] }
delegate = "^0.4"
uuid = { version = "^0.8", features = ["serde", "v4"] }

View File

@ -2,10 +2,19 @@
Work in progress SDK for [CloudEvents](https://github.com/cloudevents/spec)
## Status
## Spec support
This SDK current supports the following versions of CloudEvents:
- TBD
| | [v0.3](https://github.com/cloudevents/spec/tree/v0.3) | [v1.0](https://github.com/cloudevents/spec/tree/v1.0) |
| :---------------------------: | :----------------------------------------------------------------------------: | :---------------------------------------------------------------------------------: |
| CloudEvents Core | :x: | :heavy_check_mark: |
| AMQP Protocol Binding | :x: | :x: |
| AVRO Event Format | :x: | :x: |
| HTTP Protocol Binding | :x: | :x: |
| JSON Event Format | :x: | :heavy_check_mark: |
| Kafka Protocol Binding | :x: | :x: |
| MQTT Protocol Binding | :x: | :x: |
| NATS Protocol Binding | :x: | :x: |
| Web hook | :x: | :x: |
## Development & Contributing

View File

@ -1,7 +1,6 @@
use super::SpecVersion;
use crate::event::{AttributesV10, ExtensionValue};
use crate::event::AttributesV10;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
/// Trait to get [CloudEvents Context attributes](https://github.com/cloudevents/spec/blob/master/spec.md#context-attributes).
pub trait AttributesReader {
@ -21,10 +20,6 @@ pub trait AttributesReader {
fn get_subject(&self) -> Option<&str>;
/// Get the [time](https://github.com/cloudevents/spec/blob/master/spec.md#time).
fn get_time(&self) -> Option<&DateTime<Utc>>;
/// Get the [extension](https://github.com/cloudevents/spec/blob/master/spec.md#extension-context-attributes) named `extension_name`
fn get_extension(&self, extension_name: &str) -> Option<&ExtensionValue>;
/// Get all the [extensions](https://github.com/cloudevents/spec/blob/master/spec.md#extension-context-attributes)
fn get_extensions(&self) -> Vec<(&str, &ExtensionValue)>;
}
pub trait AttributesWriter {
@ -33,15 +28,6 @@ pub trait AttributesWriter {
fn set_type(&mut self, ty: impl Into<String>);
fn set_subject(&mut self, subject: Option<impl Into<String>>);
fn set_time(&mut self, time: Option<impl Into<DateTime<Utc>>>);
fn set_extension<'name, 'event: 'name>(
&'event mut self,
extension_name: &'name str,
extension_value: impl Into<ExtensionValue>,
);
fn remove_extension<'name, 'event: 'name>(
&'event mut self,
extension_name: &'name str,
) -> Option<ExtensionValue>;
}
pub(crate) trait DataAttributesWriter {
@ -49,10 +35,8 @@ pub(crate) trait DataAttributesWriter {
fn set_dataschema(&mut self, dataschema: Option<impl Into<String>>);
}
#[derive(PartialEq, Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "specversion")]
#[derive(PartialEq, Debug, Clone)]
pub enum Attributes {
#[serde(rename = "1.0")]
V10(AttributesV10),
}
@ -104,18 +88,6 @@ impl AttributesReader for Attributes {
Attributes::V10(a) => a.get_time(),
}
}
fn get_extension(&self, extension_name: &str) -> Option<&ExtensionValue> {
match self {
Attributes::V10(a) => a.get_extension(extension_name),
}
}
fn get_extensions(&self) -> Vec<(&str, &ExtensionValue)> {
match self {
Attributes::V10(a) => a.get_extensions(),
}
}
}
impl AttributesWriter for Attributes {
@ -148,25 +120,6 @@ impl AttributesWriter for Attributes {
Attributes::V10(a) => a.set_time(time),
}
}
fn set_extension<'name, 'event: 'name>(
&'event mut self,
extension_name: &'name str,
extension_value: impl Into<ExtensionValue>,
) {
match self {
Attributes::V10(a) => a.set_extension(extension_name, extension_value),
}
}
fn remove_extension<'name, 'event: 'name>(
&'event mut self,
extension_name: &'name str,
) -> Option<ExtensionValue> {
match self {
Attributes::V10(a) => a.remove_extension(extension_name),
}
}
}
impl DataAttributesWriter for Attributes {

View File

@ -1,17 +1,10 @@
use serde::de::Visitor;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use std::convert::{Into, TryFrom};
use std::fmt::{self, Formatter};
/// Event [data attribute](https://github.com/cloudevents/spec/blob/master/spec.md#event-data) representation
///
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
#[derive(Debug, PartialEq, Clone)]
pub enum Data {
#[serde(rename = "data_base64")]
#[serde(serialize_with = "serialize_base64")]
#[serde(deserialize_with = "deserialize_base64")]
Binary(Vec<u8>),
#[serde(rename = "data")]
Json(serde_json::Value),
}
@ -37,37 +30,6 @@ impl Data {
}
}
fn serialize_base64<S>(data: &Vec<u8>, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
serializer.serialize_str(&base64::encode(&data))
}
struct Base64Visitor;
impl<'de> Visitor<'de> for Base64Visitor {
type Value = Vec<u8>;
fn expecting(&self, formatter: &mut Formatter) -> fmt::Result {
formatter.write_str("a Base64 encoded string")
}
fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
where
E: serde::de::Error,
{
base64::decode(v).map_err(|e| serde::de::Error::custom(e.to_string()))
}
}
fn deserialize_base64<'de, D>(deserializer: D) -> Result<Vec<u8>, D::Error>
where
D: Deserializer<'de>,
{
deserializer.deserialize_str(Base64Visitor)
}
impl Into<Data> for serde_json::Value {
fn into(self) -> Data {
Data::Json(self)

View File

@ -5,7 +5,7 @@ use super::{
use crate::event::attributes::DataAttributesWriter;
use chrono::{DateTime, Utc};
use delegate::delegate;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::convert::TryFrom;
/// Data structure that represents a [CloudEvent](https://github.com/cloudevents/spec/blob/master/spec.md).
@ -32,15 +32,11 @@ use std::convert::TryFrom;
/// let data: serde_json::Value = e.try_get_data().unwrap().unwrap();
/// println!("Event data: {}", data)
/// ```
#[derive(PartialEq, Debug, Clone, Serialize, Deserialize)]
#[derive(PartialEq, Debug, Clone)]
pub struct Event {
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(alias = "data_base64")]
#[serde(alias = "data")]
#[serde(flatten)]
pub data: Option<Data>,
#[serde(flatten)]
pub attributes: Attributes,
pub data: Option<Data>,
pub extensions: HashMap<String, ExtensionValue>,
}
impl AttributesReader for Event {
@ -54,8 +50,6 @@ impl AttributesReader for Event {
fn get_dataschema(&self) -> Option<&str>;
fn get_subject(&self) -> Option<&str>;
fn get_time(&self) -> Option<&DateTime<Utc>>;
fn get_extension(&self, extension_name: &str) -> Option<&ExtensionValue>;
fn get_extensions(&self) -> Vec<(&str, &ExtensionValue)>;
}
}
}
@ -68,15 +62,6 @@ impl AttributesWriter for Event {
fn set_type(&mut self, ty: impl Into<String>);
fn set_subject(&mut self, subject: Option<impl Into<String>>);
fn set_time(&mut self, time: Option<impl Into<DateTime<Utc>>>);
fn set_extension<'name, 'event: 'name>(
&'event mut self,
extension_name: &'name str,
extension_value: impl Into<ExtensionValue>,
);
fn remove_extension<'name, 'event: 'name>(
&'event mut self,
extension_name: &'name str,
) -> Option<ExtensionValue>;
}
}
}
@ -86,6 +71,7 @@ impl Default for Event {
Event {
attributes: Attributes::V10(AttributesV10::default()),
data: None,
extensions: HashMap::default(),
}
}
}
@ -156,6 +142,35 @@ impl Event {
}
.transpose()
}
/// Get the [extension](https://github.com/cloudevents/spec/blob/master/spec.md#extension-context-attributes) named `extension_name`
pub fn get_extension(&self, extension_name: &str) -> Option<&ExtensionValue> {
self.extensions.get(extension_name)
}
/// Get all the [extensions](https://github.com/cloudevents/spec/blob/master/spec.md#extension-context-attributes)
pub fn get_extensions(&self) -> Vec<(&str, &ExtensionValue)> {
self.extensions
.iter()
.map(|(k, v)| (k.as_str(), v))
.collect()
}
pub fn set_extension<'name, 'event: 'name>(
&'event mut self,
extension_name: &'name str,
extension_value: impl Into<ExtensionValue>,
) {
self.extensions
.insert(extension_name.to_owned(), extension_value.into());
}
pub fn remove_extension<'name, 'event: 'name>(
&'event mut self,
extension_name: &'name str,
) -> Option<ExtensionValue> {
self.extensions.remove(extension_name)
}
}
#[cfg(test)]

View File

@ -3,6 +3,8 @@ mod builder;
mod data;
mod event;
mod extensions;
#[macro_use]
mod serde;
mod spec_version;
pub use attributes::Attributes;
@ -17,3 +19,5 @@ mod v10;
pub use v10::Attributes as AttributesV10;
pub use v10::EventBuilder as EventBuilderV10;
pub(crate) use v10::EventDeserializer as EventDeserializerV10;
pub(crate) use v10::EventSerializer as EventSerializerV10;

133
src/event/serde.rs Normal file
View File

@ -0,0 +1,133 @@
use super::{Attributes, Data, Event, EventDeserializerV10, EventSerializerV10};
use crate::event::ExtensionValue;
use serde::de::{Error, IntoDeserializer, Unexpected};
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use serde_value::Value;
use std::collections::{BTreeMap, HashMap};
const SPEC_VERSIONS: [&'static str; 1] = ["1.0"];
macro_rules! parse_optional_field {
($map:ident, $name:literal, $value_variant:ident, $error:ty) => {
$map.remove($name)
.map(|val| match val {
Value::$value_variant(v) => Ok(v),
other => Err(<$error>::invalid_type(
crate::event::serde::value_to_unexpected(&other),
&stringify!($value_variant),
)),
})
.transpose()
};
}
macro_rules! parse_field {
($map:ident, $name:literal, $value_variant:ident, $error:ty) => {
parse_optional_field!($map, $name, $value_variant, $error)?
.ok_or_else(|| <$error>::missing_field($name))
};
}
pub(crate) trait EventDeserializer {
fn deserialize_attributes<E: serde::de::Error>(
&self,
map: &mut BTreeMap<String, Value>,
) -> Result<Attributes, E>;
fn deserialize_data<E: serde::de::Error>(
&self,
map: &mut BTreeMap<String, Value>,
) -> Result<Option<Data>, E>;
}
pub(crate) trait EventSerializer<S: Serializer, A: Sized> {
fn serialize(
attributes: &A,
data: &Option<Data>,
extensions: &HashMap<String, ExtensionValue>,
serializer: S,
) -> Result<<S as Serializer>::Ok, <S as Serializer>::Error>;
}
impl<'de> Deserialize<'de> for Event {
fn deserialize<D>(deserializer: D) -> Result<Self, <D as Deserializer<'de>>::Error>
where
D: Deserializer<'de>,
{
let map = match Value::deserialize(deserializer)? {
Value::Map(m) => Ok(m),
v => Err(Error::invalid_type(value_to_unexpected(&v), &"a map")),
}?;
let mut map: BTreeMap<String, Value> = map
.into_iter()
.map(|(k, v)| match k {
Value::String(s) => Ok((s, v)),
k => Err(Error::invalid_type(value_to_unexpected(&k), &"a string")),
})
.collect::<Result<BTreeMap<String, Value>, <D as Deserializer<'de>>::Error>>()?;
let event_deserializer =
match parse_field!(map, "specversion", String, <D as Deserializer<'de>>::Error)?
.as_str()
{
"1.0" => Ok(EventDeserializerV10 {}),
s => Err(<D as Deserializer<'de>>::Error::unknown_variant(
s,
&SPEC_VERSIONS,
)),
}?;
let attributes = event_deserializer.deserialize_attributes(&mut map)?;
let data = event_deserializer.deserialize_data(&mut map)?;
let extensions = map
.into_iter()
.map(|(k, v)| Ok((k, ExtensionValue::deserialize(v.into_deserializer())?)))
.collect::<Result<HashMap<String, ExtensionValue>, serde_value::DeserializerError>>()
.map_err(|e| <D as Deserializer<'de>>::Error::custom(e))?;
Ok(Event {
attributes,
data,
extensions,
})
}
}
impl Serialize for Event {
fn serialize<S>(&self, serializer: S) -> Result<<S as Serializer>::Ok, <S as Serializer>::Error>
where
S: Serializer,
{
match &self.attributes {
Attributes::V10(a) => {
EventSerializerV10::serialize(a, &self.data, &self.extensions, serializer)
}
}
}
}
// This should be provided by the Value package itself
pub(crate) fn value_to_unexpected(v: &Value) -> Unexpected {
match v {
Value::Bool(b) => serde::de::Unexpected::Bool(*b),
Value::U8(n) => serde::de::Unexpected::Unsigned(*n as u64),
Value::U16(n) => serde::de::Unexpected::Unsigned(*n as u64),
Value::U32(n) => serde::de::Unexpected::Unsigned(*n as u64),
Value::U64(n) => serde::de::Unexpected::Unsigned(*n),
Value::I8(n) => serde::de::Unexpected::Signed(*n as i64),
Value::I16(n) => serde::de::Unexpected::Signed(*n as i64),
Value::I32(n) => serde::de::Unexpected::Signed(*n as i64),
Value::I64(n) => serde::de::Unexpected::Signed(*n),
Value::F32(n) => serde::de::Unexpected::Float(*n as f64),
Value::F64(n) => serde::de::Unexpected::Float(*n),
Value::Char(c) => serde::de::Unexpected::Char(*c),
Value::String(s) => serde::de::Unexpected::Str(s),
Value::Unit => serde::de::Unexpected::Unit,
Value::Option(_) => serde::de::Unexpected::Option,
Value::Newtype(_) => serde::de::Unexpected::NewtypeStruct,
Value::Seq(_) => serde::de::Unexpected::Seq,
Value::Map(_) => serde::de::Unexpected::Map,
Value::Bytes(b) => serde::de::Unexpected::Bytes(b),
}
}

View File

@ -1,27 +1,18 @@
use crate::event::attributes::DataAttributesWriter;
use crate::event::{AttributesReader, AttributesWriter, ExtensionValue, SpecVersion};
use crate::event::{AttributesReader, AttributesWriter, SpecVersion};
use chrono::{DateTime, Utc};
use hostname::get_hostname;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use uuid::Uuid;
#[derive(PartialEq, Debug, Clone, Serialize, Deserialize)]
#[derive(PartialEq, Debug, Clone)]
pub struct Attributes {
id: String,
#[serde(rename = "type")]
ty: String,
source: String,
#[serde(skip_serializing_if = "Option::is_none")]
datacontenttype: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
dataschema: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
subject: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
time: Option<DateTime<Utc>>,
#[serde(flatten)]
extensions: HashMap<String, ExtensionValue>,
pub(crate) id: String,
pub(crate) ty: String,
pub(crate) source: String,
pub(crate) datacontenttype: Option<String>,
pub(crate) dataschema: Option<String>,
pub(crate) subject: Option<String>,
pub(crate) time: Option<DateTime<Utc>>,
}
impl AttributesReader for Attributes {
@ -65,17 +56,6 @@ impl AttributesReader for Attributes {
fn get_time(&self) -> Option<&DateTime<Utc>> {
self.time.as_ref()
}
fn get_extension(&self, extension_name: &str) -> Option<&ExtensionValue> {
self.extensions.get(extension_name)
}
fn get_extensions(&self) -> Vec<(&str, &ExtensionValue)> {
self.extensions
.iter()
.map(|(k, v)| (k.as_str(), v))
.collect()
}
}
impl AttributesWriter for Attributes {
@ -98,22 +78,6 @@ impl AttributesWriter for Attributes {
fn set_time(&mut self, time: Option<impl Into<DateTime<Utc>>>) {
self.time = time.map(Into::into)
}
fn set_extension<'name, 'event: 'name>(
&'event mut self,
extension_name: &'name str,
extension_value: impl Into<ExtensionValue>,
) {
self.extensions
.insert(extension_name.to_owned(), extension_value.into());
}
fn remove_extension<'name, 'event: 'name>(
&'event mut self,
extension_name: &'name str,
) -> Option<ExtensionValue> {
self.extensions.remove(extension_name)
}
}
impl DataAttributesWriter for Attributes {
@ -136,7 +100,6 @@ impl Default for Attributes {
dataschema: None,
subject: None,
time: None,
extensions: HashMap::new(),
}
}
}

View File

@ -1,6 +1,7 @@
use super::Attributes as AttributesV10;
use crate::event::{Attributes, AttributesWriter, Data, Event, ExtensionValue};
use chrono::{DateTime, Utc};
use std::collections::HashMap;
pub struct EventBuilder {
event: Event,
@ -17,6 +18,7 @@ impl EventBuilder {
event: Event {
attributes: Attributes::V10(AttributesV10::default()),
data: None,
extensions: HashMap::new(),
},
}
}

View File

@ -1,5 +1,8 @@
mod attributes;
mod builder;
mod serde;
pub(crate) use crate::event::v10::serde::EventDeserializer;
pub(crate) use crate::event::v10::serde::EventSerializer;
pub use attributes::Attributes;
pub use builder::EventBuilder;

111
src/event/v10/serde.rs Normal file
View File

@ -0,0 +1,111 @@
use super::Attributes;
use crate::event::{Data, ExtensionValue};
use chrono::{DateTime, Utc};
use serde::de::{IntoDeserializer, Unexpected};
use serde::ser::SerializeMap;
use serde::{Deserialize, Serializer};
use serde_value::Value;
use std::collections::{BTreeMap, HashMap};
pub(crate) struct EventDeserializer {}
impl crate::event::serde::EventDeserializer for EventDeserializer {
fn deserialize_attributes<E: serde::de::Error>(
&self,
map: &mut BTreeMap<String, Value>,
) -> Result<crate::event::Attributes, E> {
Ok(crate::event::Attributes::V10(Attributes {
id: parse_field!(map, "id", String, E)?,
ty: parse_field!(map, "type", String, E)?,
source: parse_field!(map, "source", String, E)?,
datacontenttype: parse_optional_field!(map, "datacontenttype", String, E)?,
dataschema: parse_optional_field!(map, "dataschema", String, E)?,
subject: parse_optional_field!(map, "subject", String, E)?,
time: parse_optional_field!(map, "time", String, E)?
.map(|s| match DateTime::parse_from_rfc3339(&s) {
Ok(d) => Ok(DateTime::<Utc>::from(d)),
Err(e) => Err(E::invalid_value(
Unexpected::Str(&s),
&e.to_string().as_str(),
)),
})
.transpose()?,
}))
}
fn deserialize_data<E: serde::de::Error>(
&self,
map: &mut BTreeMap<String, Value>,
) -> Result<Option<Data>, E> {
let data = map.remove("data");
let data_base64 = map.remove("data_base64");
match (data, data_base64) {
(Some(d), None) => Ok(Some(Data::Json(
serde_json::Value::deserialize(d.into_deserializer()).map_err(|e| E::custom(e))?,
))),
(None, Some(d)) => match d {
Value::String(s) => Ok(Some(Data::from_base64(s.clone()).map_err(|e| {
E::invalid_value(Unexpected::Str(&s), &e.to_string().as_str())
})?)),
other => Err(E::invalid_type(
crate::event::serde::value_to_unexpected(&other),
&"a string",
)),
},
(Some(_), Some(_)) => Err(E::custom("Cannot have both data and data_base64 field")),
(None, None) => Ok(None),
}
}
}
pub(crate) struct EventSerializer {}
impl<S: serde::Serializer> crate::event::serde::EventSerializer<S, Attributes> for EventSerializer {
fn serialize(
attributes: &Attributes,
data: &Option<Data>,
extensions: &HashMap<String, ExtensionValue>,
serializer: S,
) -> Result<<S as Serializer>::Ok, <S as Serializer>::Error> {
let num =
3 + if attributes.datacontenttype.is_some() {
1
} else {
0
} + if attributes.dataschema.is_some() {
1
} else {
0
} + if attributes.subject.is_some() { 1 } else { 0 }
+ if attributes.time.is_some() { 1 } else { 0 }
+ if data.is_some() { 1 } else { 0 }
+ extensions.len();
let mut state = serializer.serialize_map(Some(num))?;
state.serialize_entry("specversion", "1.0")?;
state.serialize_entry("id", &attributes.id)?;
state.serialize_entry("type", &attributes.ty)?;
state.serialize_entry("source", &attributes.source)?;
if let Some(datacontenttype) = &attributes.datacontenttype {
state.serialize_entry("datacontenttype", datacontenttype)?;
}
if let Some(dataschema) = &attributes.dataschema {
state.serialize_entry("dataschema", dataschema)?;
}
if let Some(subject) = &attributes.subject {
state.serialize_entry("subject", subject)?;
}
if let Some(time) = &attributes.time {
state.serialize_entry("time", time)?;
}
match data {
Some(Data::Json(j)) => state.serialize_entry("data", j)?,
Some(Data::Binary(v)) => state.serialize_entry("data_base64", &base64::encode(v))?,
_ => (),
};
for (k, v) in extensions {
state.serialize_entry(k, v)?;
}
state.end()
}
}

View File

@ -1,5 +1,6 @@
extern crate serde;
extern crate serde_json;
extern crate serde_value;
pub mod event;