Merge 695abf4425
into c4a5443d19
This commit is contained in:
commit
85a0f65484
|
@ -30,6 +30,7 @@ warp = ["warp-lib", "bytes", "http", "hyper"]
|
||||||
axum = ["bytes", "http", "hyper", "axum-lib", "http-body", "async-trait"]
|
axum = ["bytes", "http", "hyper", "axum-lib", "http-body", "async-trait"]
|
||||||
poem = ["bytes", "http", "poem-lib", "hyper", "async-trait"]
|
poem = ["bytes", "http", "poem-lib", "hyper", "async-trait"]
|
||||||
nats = ["nats-lib"]
|
nats = ["nats-lib"]
|
||||||
|
fe2o3-amqp = ["fe2o3-amqp-types"]
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
serde = { version = "^1.0", features = ["derive"] }
|
serde = { version = "^1.0", features = ["derive"] }
|
||||||
|
@ -57,6 +58,7 @@ axum-lib = { version = "^0.6", optional = true, package="axum"}
|
||||||
http-body = { version = "^0.4", optional = true }
|
http-body = { version = "^0.4", optional = true }
|
||||||
poem-lib = { version = "=1.2.34", optional = true, package = "poem" }
|
poem-lib = { version = "=1.2.34", optional = true, package = "poem" }
|
||||||
nats-lib = { version = "0.21.0", optional = true, package = "nats" }
|
nats-lib = { version = "0.21.0", optional = true, package = "nats" }
|
||||||
|
fe2o3-amqp-types = { version = "0.5.1", optional = true }
|
||||||
|
|
||||||
[target."cfg(not(target_arch = \"wasm32\"))".dependencies]
|
[target."cfg(not(target_arch = \"wasm32\"))".dependencies]
|
||||||
hostname = "^0.3"
|
hostname = "^0.3"
|
||||||
|
@ -80,3 +82,4 @@ mockito = "0.25.1"
|
||||||
tokio = { version = "^1.0", features = ["full"] }
|
tokio = { version = "^1.0", features = ["full"] }
|
||||||
mime = "0.3"
|
mime = "0.3"
|
||||||
tower = { version = "0.4", features = ["util"] }
|
tower = { version = "0.4", features = ["util"] }
|
||||||
|
fe2o3-amqp = { version = "0.6.1" }
|
||||||
|
|
|
@ -0,0 +1,12 @@
|
||||||
|
[package]
|
||||||
|
name = "fe2o3-amqp-example"
|
||||||
|
version = "0.1.0"
|
||||||
|
edition = "2021"
|
||||||
|
|
||||||
|
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||||
|
|
||||||
|
[dependencies]
|
||||||
|
cloudevents-sdk = { path = "../..", features = ["fe2o3-amqp"] }
|
||||||
|
fe2o3-amqp = "0.5.1"
|
||||||
|
tokio = { version = "1", features = ["macros", "net", "rt", "rt-multi-thread"] }
|
||||||
|
serde_json = "1"
|
|
@ -0,0 +1,109 @@
|
||||||
|
//! AMQP 1.0 binding example
|
||||||
|
//!
|
||||||
|
//! You need a running AMQP 1.0 broker to try out this example.
|
||||||
|
//! With docker: docker run -it --rm -e ARTEMIS_USERNAME=guest -e ARTEMIS_PASSWORD=guest -p 5672:5672 vromero/activemq-artemis
|
||||||
|
|
||||||
|
use cloudevents::{
|
||||||
|
binding::fe2o3_amqp::{EventMessage}, message::MessageDeserializer, Event, EventBuilder,
|
||||||
|
EventBuilderV10, AttributesReader, event::ExtensionValue,
|
||||||
|
};
|
||||||
|
use fe2o3_amqp::{Connection, Receiver, Sender, Session};
|
||||||
|
use serde_json::{json, from_slice, from_str};
|
||||||
|
|
||||||
|
type BoxError = Box<dyn std::error::Error>;
|
||||||
|
type Result<T> = std::result::Result<T, BoxError>;
|
||||||
|
|
||||||
|
const EXAMPLE_TYPE: &str = "example.test";
|
||||||
|
const EXAMPLE_SOURCE: &str = "localhost";
|
||||||
|
const EXTENSION_NAME: &str = "ext-name";
|
||||||
|
const EXTENSION_VALUE: &str = "AMQP";
|
||||||
|
|
||||||
|
async fn send_binary_event(sender: &mut Sender, i: usize, value: serde_json::Value) -> Result<()> {
|
||||||
|
let event = EventBuilderV10::new()
|
||||||
|
.id(i.to_string())
|
||||||
|
.ty(EXAMPLE_TYPE)
|
||||||
|
.source(EXAMPLE_SOURCE)
|
||||||
|
.extension(EXTENSION_NAME, EXTENSION_VALUE)
|
||||||
|
.data("application/json", value)
|
||||||
|
.build()?;
|
||||||
|
let event_message = EventMessage::from_binary_event(event)?;
|
||||||
|
sender.send(event_message).await?.accepted_or("not accepted")?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn send_structured_event(sender: &mut Sender, i: usize, value: serde_json::Value) -> Result<()> {
|
||||||
|
let event = EventBuilderV10::new()
|
||||||
|
.id(i.to_string())
|
||||||
|
.ty(EXAMPLE_TYPE)
|
||||||
|
.source(EXAMPLE_SOURCE)
|
||||||
|
.extension(EXTENSION_NAME, EXTENSION_VALUE)
|
||||||
|
.data("application/json", value)
|
||||||
|
.build()?;
|
||||||
|
let event_message = EventMessage::from_structured_event(event)?;
|
||||||
|
sender.send(event_message).await?.accepted_or("not accepted")?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn recv_event(receiver: &mut Receiver) -> Result<Event> {
|
||||||
|
let delivery = receiver.recv().await?;
|
||||||
|
receiver.accept(&delivery).await?;
|
||||||
|
|
||||||
|
let event_message = EventMessage::from(delivery.into_message());
|
||||||
|
let event = MessageDeserializer::into_event(event_message)?;
|
||||||
|
Ok(event)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn convert_data_into_json_value(data: &cloudevents::Data) -> Result<serde_json::Value> {
|
||||||
|
let value = match data {
|
||||||
|
cloudevents::Data::Binary(bytes) => from_slice(bytes)?,
|
||||||
|
cloudevents::Data::String(s) => from_str(s)?,
|
||||||
|
cloudevents::Data::Json(value) => value.clone(),
|
||||||
|
};
|
||||||
|
Ok(value)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::main]
|
||||||
|
async fn main() {
|
||||||
|
let mut connection =
|
||||||
|
Connection::open("cloudevents-sdk-rust", "amqp://guest:guest@localhost:5672")
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
let mut session = Session::begin(&mut connection).await.unwrap();
|
||||||
|
let mut sender = Sender::attach(&mut session, "sender", "q1").await.unwrap();
|
||||||
|
let mut receiver = Receiver::attach(&mut session, "receiver", "q1")
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let expected = json!({"hello": "world"});
|
||||||
|
|
||||||
|
// Binary content mode
|
||||||
|
send_binary_event(&mut sender, 1, expected.clone()).await.unwrap();
|
||||||
|
let event = recv_event(&mut receiver).await.unwrap();
|
||||||
|
let value = convert_data_into_json_value(event.data().unwrap()).unwrap();
|
||||||
|
assert_eq!(event.id(), "1");
|
||||||
|
assert_eq!(event.ty(), EXAMPLE_TYPE);
|
||||||
|
assert_eq!(event.source(), EXAMPLE_SOURCE);
|
||||||
|
match event.extension(EXTENSION_NAME).unwrap() {
|
||||||
|
ExtensionValue::String(value) => assert_eq!(value, EXTENSION_VALUE),
|
||||||
|
_ => panic!("Expect a String"),
|
||||||
|
}
|
||||||
|
assert_eq!(value, expected);
|
||||||
|
|
||||||
|
// Structured content mode
|
||||||
|
send_structured_event(&mut sender, 2, expected.clone()).await.unwrap();
|
||||||
|
let event = recv_event(&mut receiver).await.unwrap();
|
||||||
|
let value = convert_data_into_json_value(event.data().unwrap()).unwrap();
|
||||||
|
assert_eq!(event.id(), "2");
|
||||||
|
assert_eq!(event.ty(), EXAMPLE_TYPE);
|
||||||
|
assert_eq!(event.source(), EXAMPLE_SOURCE);
|
||||||
|
match event.extension(EXTENSION_NAME).unwrap() {
|
||||||
|
ExtensionValue::String(value) => assert_eq!(value, EXTENSION_VALUE),
|
||||||
|
_ => panic!("Expect a String"),
|
||||||
|
}
|
||||||
|
assert_eq!(value, expected);
|
||||||
|
|
||||||
|
sender.close().await.unwrap();
|
||||||
|
receiver.close().await.unwrap();
|
||||||
|
session.end().await.unwrap();
|
||||||
|
connection.close().await.unwrap();
|
||||||
|
}
|
|
@ -0,0 +1,24 @@
|
||||||
|
// Required
|
||||||
|
pub(super) const ID: &str = "id";
|
||||||
|
pub(super) const SOURCE: &str = "source";
|
||||||
|
pub(super) const SPECVERSION: &str = "specversion";
|
||||||
|
pub(super) const TYPE: &str = "type";
|
||||||
|
|
||||||
|
// Optional
|
||||||
|
pub(super) const DATACONTENTTYPE: &str = "datacontenttype";
|
||||||
|
pub(super) const DATASCHEMA: &str = "dataschema";
|
||||||
|
pub(super) const SUBJECT: &str = "subject";
|
||||||
|
pub(super) const TIME: &str = "time";
|
||||||
|
|
||||||
|
pub(super) mod prefixed {
|
||||||
|
// Required
|
||||||
|
pub const ID: &str = "cloudEvents:id";
|
||||||
|
pub const SOURCE: &str = "cloudEvents:source";
|
||||||
|
pub const SPECVERSION: &str = "cloudEvents:specversion";
|
||||||
|
pub const TYPE: &str = "cloudEvents:type";
|
||||||
|
|
||||||
|
// Optional
|
||||||
|
pub const DATASCHEMA: &str = "cloudEvents:dataschema";
|
||||||
|
pub const SUBJECT: &str = "cloudEvents:subject";
|
||||||
|
pub const TIME: &str = "cloudEvents:time";
|
||||||
|
}
|
|
@ -0,0 +1,104 @@
|
||||||
|
use std::convert::TryFrom;
|
||||||
|
|
||||||
|
use fe2o3_amqp_types::primitives::{SimpleValue, Symbol};
|
||||||
|
|
||||||
|
use crate::{
|
||||||
|
binding::CLOUDEVENTS_JSON_HEADER,
|
||||||
|
event::SpecVersion,
|
||||||
|
message::{
|
||||||
|
BinaryDeserializer, BinarySerializer, Encoding, Error, MessageAttributeValue,
|
||||||
|
MessageDeserializer, Result, StructuredDeserializer, StructuredSerializer,
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
use super::{
|
||||||
|
constants::{prefixed, DATACONTENTTYPE},
|
||||||
|
EventMessage, ATTRIBUTE_PREFIX,
|
||||||
|
};
|
||||||
|
|
||||||
|
impl BinaryDeserializer for EventMessage {
|
||||||
|
fn deserialize_binary<R: Sized, V: BinarySerializer<R>>(
|
||||||
|
mut self,
|
||||||
|
mut serializer: V,
|
||||||
|
) -> Result<R> {
|
||||||
|
use fe2o3_amqp_types::messaging::Body;
|
||||||
|
|
||||||
|
// specversion
|
||||||
|
let spec_version = {
|
||||||
|
let value = self
|
||||||
|
.application_properties
|
||||||
|
.as_mut()
|
||||||
|
.ok_or(Error::WrongEncoding {})?
|
||||||
|
.remove(prefixed::SPECVERSION)
|
||||||
|
.ok_or(Error::WrongEncoding {})
|
||||||
|
.map(|val| match val {
|
||||||
|
SimpleValue::String(s) => Ok(s),
|
||||||
|
_ => Err(Error::WrongEncoding {}),
|
||||||
|
})??;
|
||||||
|
SpecVersion::try_from(&value[..])?
|
||||||
|
};
|
||||||
|
serializer = serializer.set_spec_version(spec_version.clone())?;
|
||||||
|
|
||||||
|
// datacontenttype
|
||||||
|
serializer = match self.content_type {
|
||||||
|
Some(Symbol(content_type)) => serializer
|
||||||
|
.set_attribute(DATACONTENTTYPE, MessageAttributeValue::String(content_type))?,
|
||||||
|
None => serializer,
|
||||||
|
};
|
||||||
|
|
||||||
|
// remaining attributes
|
||||||
|
let attributes = spec_version.attribute_names();
|
||||||
|
|
||||||
|
if let Some(application_properties) = self.application_properties {
|
||||||
|
for (key, value) in application_properties.0.into_iter() {
|
||||||
|
if let Some(key) = key.strip_prefix(ATTRIBUTE_PREFIX) {
|
||||||
|
if attributes.contains(&key) {
|
||||||
|
let value = MessageAttributeValue::try_from((key, value))?;
|
||||||
|
serializer = serializer.set_attribute(key, value)?;
|
||||||
|
} else {
|
||||||
|
let value = MessageAttributeValue::try_from(value)?;
|
||||||
|
serializer = serializer.set_extension(key, value)?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
match self.body {
|
||||||
|
Body::Data(data) => {
|
||||||
|
let bytes = data.0.into_vec();
|
||||||
|
serializer.end_with_data(bytes)
|
||||||
|
}
|
||||||
|
Body::Empty => serializer.end(),
|
||||||
|
Body::Sequence(_) | Body::Value(_) => Err(Error::WrongEncoding {}),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl StructuredDeserializer for EventMessage {
|
||||||
|
fn deserialize_structured<R: Sized, V: StructuredSerializer<R>>(
|
||||||
|
self,
|
||||||
|
serializer: V,
|
||||||
|
) -> Result<R> {
|
||||||
|
use fe2o3_amqp_types::messaging::Body;
|
||||||
|
let bytes = match self.body {
|
||||||
|
Body::Data(data) => data.0.into_vec(),
|
||||||
|
Body::Empty => vec![],
|
||||||
|
Body::Sequence(_) | Body::Value(_) => return Err(Error::WrongEncoding {}),
|
||||||
|
};
|
||||||
|
serializer.set_structured_event(bytes)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl MessageDeserializer for EventMessage {
|
||||||
|
fn encoding(&self) -> Encoding {
|
||||||
|
match self
|
||||||
|
.content_type
|
||||||
|
.as_ref()
|
||||||
|
.map(|s| s.starts_with(CLOUDEVENTS_JSON_HEADER))
|
||||||
|
{
|
||||||
|
Some(true) => Encoding::STRUCTURED,
|
||||||
|
Some(false) => Encoding::BINARY,
|
||||||
|
None => Encoding::UNKNOWN,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,360 @@
|
||||||
|
//! This module integrated the [cloudevents-sdk](https://docs.rs/cloudevents-sdk) with
|
||||||
|
//! [fe2o3-amqp](https://docs.rs/fe2o3-amqp/) to easily send and receive CloudEvents
|
||||||
|
//!
|
||||||
|
//! To send CloudEvents
|
||||||
|
//!
|
||||||
|
//! ```rust
|
||||||
|
//! use serde_json::json;
|
||||||
|
//! use fe2o3_amqp::{Connection, Sender, Session};
|
||||||
|
//! use cloudevents::{
|
||||||
|
//! EventBuilder, EventBuilderV10,
|
||||||
|
//! binding::fe2o3_amqp::{EventMessage, AmqpMessage}
|
||||||
|
//! };
|
||||||
|
//!
|
||||||
|
//! // You need a running AMQP 1.0 broker to try out this example.
|
||||||
|
//! // With docker: docker run -it --rm -e ARTEMIS_USERNAME=guest -e ARTEMIS_PASSWORD=guest -p 5672:5672 vromero/activemq-artemis
|
||||||
|
//!
|
||||||
|
//! # async fn send_event() {
|
||||||
|
//! let mut connection =
|
||||||
|
//! Connection::open("cloudevents-sdk-rust", "amqp://guest:guest@localhost:5672")
|
||||||
|
//! .await
|
||||||
|
//! .unwrap();
|
||||||
|
//! let mut session = Session::begin(&mut connection).await.unwrap();
|
||||||
|
//! let mut sender = Sender::attach(&mut session, "sender", "q1").await.unwrap();
|
||||||
|
//!
|
||||||
|
//! let event = EventBuilderV10::new()
|
||||||
|
//! .id("1")
|
||||||
|
//! .ty("example.test")
|
||||||
|
//! .source("localhost")
|
||||||
|
//! .extension("ext-name", "AMQP")
|
||||||
|
//! .data("application/json", json!({"hello": "world"}))
|
||||||
|
//! .build()
|
||||||
|
//! .unwrap();
|
||||||
|
//!
|
||||||
|
//! let event_message = EventMessage::from_binary_event(event).unwrap();
|
||||||
|
//! let message = AmqpMessage::from(event_message);
|
||||||
|
//! sender.send(message).await.unwrap()
|
||||||
|
//! .accepted_or("not accepted").unwrap();
|
||||||
|
//!
|
||||||
|
//! sender.close().await.unwrap();
|
||||||
|
//! session.end().await.unwrap();
|
||||||
|
//! connection.close().await.unwrap();
|
||||||
|
//! # }
|
||||||
|
//! ```
|
||||||
|
//!
|
||||||
|
//! To receiver CloudEvents
|
||||||
|
//!
|
||||||
|
//! ```rust
|
||||||
|
//! use fe2o3_amqp::{Connection, Receiver, Session};
|
||||||
|
//! use cloudevents::{
|
||||||
|
//! EventBuilderV10, message::MessageDeserializer,
|
||||||
|
//! binding::fe2o3_amqp::{EventMessage, AmqpMessage}
|
||||||
|
//! };
|
||||||
|
//!
|
||||||
|
//! // You need a running AMQP 1.0 broker to try out this example.
|
||||||
|
//! // With docker: docker run -it --rm -e ARTEMIS_USERNAME=guest -e ARTEMIS_PASSWORD=guest -p 5672:5672 vromero/activemq-artemis
|
||||||
|
//!
|
||||||
|
//! # async fn receive_event() {
|
||||||
|
//! let mut connection =
|
||||||
|
//! Connection::open("cloudevents-sdk-rust", "amqp://guest:guest@localhost:5672")
|
||||||
|
//! .await
|
||||||
|
//! .unwrap();
|
||||||
|
//! let mut session = Session::begin(&mut connection).await.unwrap();
|
||||||
|
//! let mut receiver = Receiver::attach(&mut session, "receiver", "q1").await.unwrap();
|
||||||
|
//!
|
||||||
|
//! let delivery = receiver.recv().await.unwrap();
|
||||||
|
//! receiver.accept(&delivery).await.unwrap();
|
||||||
|
//!
|
||||||
|
//! let message: AmqpMessage = delivery.into_message();
|
||||||
|
//! let event_message = EventMessage::from(message);
|
||||||
|
//! let event = MessageDeserializer::into_event(event_message).unwrap();
|
||||||
|
//!
|
||||||
|
//! receiver.close().await.unwrap();
|
||||||
|
//! session.end().await.unwrap();
|
||||||
|
//! connection.close().await.unwrap();
|
||||||
|
//! # }
|
||||||
|
//! ```
|
||||||
|
|
||||||
|
use std::convert::TryFrom;
|
||||||
|
|
||||||
|
use chrono::{TimeZone, Utc};
|
||||||
|
use fe2o3_amqp_types::messaging::{ApplicationProperties, Body, Message, Properties};
|
||||||
|
use fe2o3_amqp_types::primitives::{Binary, SimpleValue, Symbol, Timestamp, Value};
|
||||||
|
|
||||||
|
use crate::event::AttributeValue;
|
||||||
|
use crate::message::{BinaryDeserializer, Error, MessageAttributeValue, StructuredDeserializer};
|
||||||
|
use crate::Event;
|
||||||
|
|
||||||
|
use self::constants::{
|
||||||
|
prefixed, DATACONTENTTYPE, DATASCHEMA, ID, SOURCE, SPECVERSION, SUBJECT, TIME, TYPE,
|
||||||
|
};
|
||||||
|
|
||||||
|
const ATTRIBUTE_PREFIX: &str = "cloudEvents:";
|
||||||
|
|
||||||
|
pub mod deserializer;
|
||||||
|
pub mod serializer;
|
||||||
|
|
||||||
|
mod constants;
|
||||||
|
|
||||||
|
/// Type alias for an AMQP 1.0 message
|
||||||
|
///
|
||||||
|
/// The generic parameter can be anything that implements `Serialize` and `Deserialize` but is of
|
||||||
|
/// no importance because all CloudEvents are using the `Body::Data` as the body section type. For
|
||||||
|
/// convenience, this type alias chooses `Value` as the value of the generic parameter
|
||||||
|
pub type AmqpMessage = Message<Value>;
|
||||||
|
|
||||||
|
/// Type alias for an AMQP 1.0 Body
|
||||||
|
///
|
||||||
|
/// The generic parameter can be anything that implements `Serialize` and `Deserialize` but is of
|
||||||
|
/// no importance because all CloudEvents are using the `Body::Data` as the body section type. For
|
||||||
|
/// convenience, this type alias chooses `Value` as the value of the generic parameter
|
||||||
|
pub type AmqpBody = Body<Value>;
|
||||||
|
|
||||||
|
/// This struct contains the necessary fields required for AMQP 1.0 binding.
|
||||||
|
/// It provides conversion between [`Event`] and [`AmqpMessage`]
|
||||||
|
///
|
||||||
|
/// # Examples
|
||||||
|
///
|
||||||
|
/// ## [`Event`] -> [`AmqpMessage`] in binary content mode
|
||||||
|
///
|
||||||
|
/// ```rust
|
||||||
|
/// use serde_json::json;
|
||||||
|
/// use fe2o3_amqp_types::messaging::Message;
|
||||||
|
/// use cloudevents::{EventBuilder, EventBuilderV10, binding::fe2o3_amqp::EventMessage};
|
||||||
|
///
|
||||||
|
/// let event = EventBuilderV10::new()
|
||||||
|
/// .id("1")
|
||||||
|
/// .ty("example.test")
|
||||||
|
/// .source("localhost")
|
||||||
|
/// .extension("ext-name", "AMQP")
|
||||||
|
/// .data("application/json", json!({"hello": "world"}))
|
||||||
|
/// .build()
|
||||||
|
/// .unwrap();
|
||||||
|
/// let event_message = EventMessage::from_binary_event(event).unwrap();
|
||||||
|
/// let amqp_message = Message::from(event_message);
|
||||||
|
/// ```
|
||||||
|
///
|
||||||
|
/// ## [`Event`] -> [`AmqpMessage`] in structured content mode
|
||||||
|
///
|
||||||
|
/// ```rust
|
||||||
|
/// use serde_json::json;
|
||||||
|
/// use fe2o3_amqp_types::messaging::Message;
|
||||||
|
/// use cloudevents::{EventBuilder, EventBuilderV10, binding::fe2o3_amqp::EventMessage};
|
||||||
|
///
|
||||||
|
/// let event = EventBuilderV10::new()
|
||||||
|
/// .id("1")
|
||||||
|
/// .ty("example.test")
|
||||||
|
/// .source("localhost")
|
||||||
|
/// .extension("ext-name", "AMQP")
|
||||||
|
/// .data("application/json", json!({"hello": "world"}))
|
||||||
|
/// .build()
|
||||||
|
/// .unwrap();
|
||||||
|
/// let event_message = EventMessage::from_structured_event(event).unwrap();
|
||||||
|
/// let amqp_message = Message::from(event_message);
|
||||||
|
/// ```
|
||||||
|
///
|
||||||
|
/// ## [`AmqpMessage`] -> [`Event`]
|
||||||
|
///
|
||||||
|
/// ```rust
|
||||||
|
/// use fe2o3_amqp::Receiver;
|
||||||
|
/// use cloudevents::{
|
||||||
|
/// message::MessageDeserializer,
|
||||||
|
/// binding::fe2o3_amqp::{AmqpMessage, EventMessage}
|
||||||
|
/// };
|
||||||
|
///
|
||||||
|
/// # async fn receive_event(receiver: &mut Receiver) {
|
||||||
|
/// let delivery = receiver.recv().await.unwrap();
|
||||||
|
/// receiver.accept(&delivery).await.unwrap();
|
||||||
|
/// let amqp_message: AmqpMessage = delivery.into_message();
|
||||||
|
/// let event_message = EventMessage::from(amqp_message);
|
||||||
|
/// let event = MessageDeserializer::into_event(event_message).unwrap();
|
||||||
|
/// # }
|
||||||
|
/// ```
|
||||||
|
pub struct EventMessage {
|
||||||
|
pub content_type: Option<Symbol>,
|
||||||
|
pub application_properties: Option<ApplicationProperties>,
|
||||||
|
pub body: AmqpBody,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl EventMessage {
|
||||||
|
fn new() -> Self {
|
||||||
|
Self {
|
||||||
|
content_type: None,
|
||||||
|
application_properties: None,
|
||||||
|
body: Body::Empty,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Create an [`EventMessage`] from an event using a binary serializer
|
||||||
|
pub fn from_binary_event(event: Event) -> Result<Self, Error> {
|
||||||
|
BinaryDeserializer::deserialize_binary(event, Self::new())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Create an [`EventMessage`] from an event using a structured serializer
|
||||||
|
pub fn from_structured_event(event: Event) -> Result<Self, Error> {
|
||||||
|
StructuredDeserializer::deserialize_structured(event, Self::new())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<EventMessage> for AmqpMessage {
|
||||||
|
fn from(event: EventMessage) -> Self {
|
||||||
|
let properties = Properties {
|
||||||
|
content_type: event.content_type,
|
||||||
|
..Default::default()
|
||||||
|
};
|
||||||
|
Message {
|
||||||
|
header: None,
|
||||||
|
delivery_annotations: None,
|
||||||
|
message_annotations: None,
|
||||||
|
properties: Some(properties),
|
||||||
|
application_properties: event.application_properties,
|
||||||
|
body: event.body,
|
||||||
|
footer: None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<AmqpMessage> for EventMessage {
|
||||||
|
fn from(message: AmqpMessage) -> Self {
|
||||||
|
let content_type = message.properties.and_then(|p| p.content_type);
|
||||||
|
Self {
|
||||||
|
content_type,
|
||||||
|
application_properties: message.application_properties,
|
||||||
|
body: message.body,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a> From<AttributeValue<'a>> for SimpleValue {
|
||||||
|
fn from(value: AttributeValue) -> Self {
|
||||||
|
match value {
|
||||||
|
AttributeValue::SpecVersion(spec_ver) => {
|
||||||
|
SimpleValue::String(String::from(spec_ver.as_str()))
|
||||||
|
}
|
||||||
|
AttributeValue::String(s) => SimpleValue::String(String::from(s)),
|
||||||
|
AttributeValue::URI(uri) => SimpleValue::String(String::from(uri.as_str())),
|
||||||
|
AttributeValue::URIRef(uri) => SimpleValue::String(uri.clone()),
|
||||||
|
AttributeValue::Boolean(val) => SimpleValue::Bool(*val),
|
||||||
|
AttributeValue::Integer(val) => SimpleValue::Long(*val),
|
||||||
|
AttributeValue::Time(datetime) => {
|
||||||
|
let millis = datetime.timestamp_millis();
|
||||||
|
let timestamp = Timestamp::from_milliseconds(millis);
|
||||||
|
SimpleValue::Timestamp(timestamp)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a> From<AttributeValue<'a>> for Value {
|
||||||
|
fn from(value: AttributeValue) -> Self {
|
||||||
|
match value {
|
||||||
|
AttributeValue::SpecVersion(spec_ver) => Value::String(String::from(spec_ver.as_str())),
|
||||||
|
AttributeValue::String(s) => Value::String(String::from(s)),
|
||||||
|
AttributeValue::URI(uri) => Value::String(String::from(uri.as_str())),
|
||||||
|
AttributeValue::URIRef(uri) => Value::String(uri.clone()),
|
||||||
|
AttributeValue::Boolean(val) => Value::Bool(*val),
|
||||||
|
AttributeValue::Integer(val) => Value::Long(*val),
|
||||||
|
AttributeValue::Time(datetime) => {
|
||||||
|
let millis = datetime.timestamp_millis();
|
||||||
|
let timestamp = Timestamp::from_milliseconds(millis);
|
||||||
|
Value::Timestamp(timestamp)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<MessageAttributeValue> for SimpleValue {
|
||||||
|
fn from(value: MessageAttributeValue) -> Self {
|
||||||
|
match value {
|
||||||
|
MessageAttributeValue::String(s) => SimpleValue::String(s),
|
||||||
|
MessageAttributeValue::Uri(uri) => SimpleValue::String(String::from(uri.as_str())),
|
||||||
|
MessageAttributeValue::UriRef(uri) => SimpleValue::String(uri),
|
||||||
|
MessageAttributeValue::Boolean(val) => SimpleValue::Bool(val),
|
||||||
|
MessageAttributeValue::Integer(val) => SimpleValue::Long(val),
|
||||||
|
MessageAttributeValue::DateTime(datetime) => {
|
||||||
|
let millis = datetime.timestamp_millis();
|
||||||
|
let timestamp = Timestamp::from_milliseconds(millis);
|
||||||
|
SimpleValue::Timestamp(timestamp)
|
||||||
|
}
|
||||||
|
MessageAttributeValue::Binary(val) => SimpleValue::Binary(Binary::from(val)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<MessageAttributeValue> for Value {
|
||||||
|
fn from(value: MessageAttributeValue) -> Self {
|
||||||
|
match value {
|
||||||
|
MessageAttributeValue::String(s) => Value::String(s),
|
||||||
|
MessageAttributeValue::Uri(uri) => Value::String(String::from(uri.as_str())),
|
||||||
|
MessageAttributeValue::UriRef(uri) => Value::String(uri),
|
||||||
|
MessageAttributeValue::Boolean(val) => Value::Bool(val),
|
||||||
|
MessageAttributeValue::Integer(val) => Value::Long(val),
|
||||||
|
MessageAttributeValue::DateTime(datetime) => {
|
||||||
|
let millis = datetime.timestamp_millis();
|
||||||
|
let timestamp = Timestamp::from_milliseconds(millis);
|
||||||
|
Value::Timestamp(timestamp)
|
||||||
|
}
|
||||||
|
MessageAttributeValue::Binary(val) => Value::Binary(Binary::from(val)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl TryFrom<SimpleValue> for MessageAttributeValue {
|
||||||
|
type Error = Error;
|
||||||
|
|
||||||
|
fn try_from(value: SimpleValue) -> Result<Self, Self::Error> {
|
||||||
|
match value {
|
||||||
|
SimpleValue::Bool(val) => Ok(MessageAttributeValue::Boolean(val)),
|
||||||
|
SimpleValue::Long(val) => Ok(MessageAttributeValue::Integer(val)),
|
||||||
|
SimpleValue::Timestamp(val) => {
|
||||||
|
let datetime = Utc.timestamp_millis(val.into_inner());
|
||||||
|
Ok(MessageAttributeValue::DateTime(datetime))
|
||||||
|
}
|
||||||
|
SimpleValue::Binary(val) => Ok(MessageAttributeValue::Binary(val.into_vec())),
|
||||||
|
SimpleValue::String(val) => Ok(MessageAttributeValue::String(val)),
|
||||||
|
_ => Err(Error::WrongEncoding {}),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a> TryFrom<(&'a str, SimpleValue)> for MessageAttributeValue {
|
||||||
|
type Error = Error;
|
||||||
|
|
||||||
|
fn try_from((key, value): (&'a str, SimpleValue)) -> Result<Self, Self::Error> {
|
||||||
|
match key {
|
||||||
|
// String
|
||||||
|
ID | prefixed::ID
|
||||||
|
// String
|
||||||
|
| SPECVERSION | prefixed::SPECVERSION
|
||||||
|
// String
|
||||||
|
| TYPE | prefixed::TYPE
|
||||||
|
// String
|
||||||
|
| DATACONTENTTYPE
|
||||||
|
// String
|
||||||
|
| SUBJECT | prefixed::SUBJECT => {
|
||||||
|
let val = String::try_from(value).map_err(|_| Error::WrongEncoding {})?;
|
||||||
|
Ok(MessageAttributeValue::String(val))
|
||||||
|
},
|
||||||
|
// URI-reference
|
||||||
|
SOURCE | prefixed::SOURCE => {
|
||||||
|
let val = String::try_from(value).map_err(|_| Error::WrongEncoding {})?;
|
||||||
|
Ok(MessageAttributeValue::UriRef(val))
|
||||||
|
},
|
||||||
|
// URI
|
||||||
|
DATASCHEMA | prefixed::DATASCHEMA => {
|
||||||
|
let val = String::try_from(value).map_err(|_| Error::WrongEncoding { })?;
|
||||||
|
let url_val = url::Url::parse(&val)?;
|
||||||
|
Ok(MessageAttributeValue::Uri(url_val))
|
||||||
|
}
|
||||||
|
// Timestamp
|
||||||
|
TIME | prefixed::TIME => {
|
||||||
|
let val = Timestamp::try_from(value).map_err(|_| Error::WrongEncoding { })?;
|
||||||
|
let datetime = Utc.timestamp_millis(val.into_inner());
|
||||||
|
Ok(MessageAttributeValue::DateTime(datetime))
|
||||||
|
}
|
||||||
|
_ => {
|
||||||
|
MessageAttributeValue::try_from(value)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,88 @@
|
||||||
|
use fe2o3_amqp_types::messaging::{ApplicationProperties, Data as AmqpData};
|
||||||
|
use fe2o3_amqp_types::primitives::{Binary, SimpleValue, Symbol};
|
||||||
|
|
||||||
|
use crate::binding::header_prefix;
|
||||||
|
use crate::message::StructuredSerializer;
|
||||||
|
use crate::{
|
||||||
|
event::SpecVersion,
|
||||||
|
message::{BinarySerializer, Error, MessageAttributeValue},
|
||||||
|
};
|
||||||
|
|
||||||
|
use super::constants::DATACONTENTTYPE;
|
||||||
|
use super::{AmqpBody, EventMessage, ATTRIBUTE_PREFIX};
|
||||||
|
|
||||||
|
impl BinarySerializer<EventMessage> for EventMessage {
|
||||||
|
fn set_spec_version(mut self, spec_version: SpecVersion) -> crate::message::Result<Self> {
|
||||||
|
let key = String::from("cloudEvents:specversion");
|
||||||
|
let value = String::from(spec_version.as_str());
|
||||||
|
self.application_properties
|
||||||
|
.get_or_insert(ApplicationProperties::default())
|
||||||
|
.insert(key, SimpleValue::from(value));
|
||||||
|
Ok(self)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn set_attribute(
|
||||||
|
mut self,
|
||||||
|
name: &str,
|
||||||
|
value: MessageAttributeValue,
|
||||||
|
) -> crate::message::Result<Self> {
|
||||||
|
// For the binary mode, the AMQP content-type property field value maps directly to the
|
||||||
|
// CloudEvents datacontenttype attribute.
|
||||||
|
//
|
||||||
|
// All CloudEvents attributes with exception of datacontenttype MUST be individually mapped
|
||||||
|
// to and from the AMQP application-properties section.
|
||||||
|
if name == DATACONTENTTYPE {
|
||||||
|
self.content_type = match value {
|
||||||
|
MessageAttributeValue::String(s) => Some(Symbol::from(s)),
|
||||||
|
_ => return Err(Error::WrongEncoding {}),
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// CloudEvent attributes are prefixed with "cloudEvents:" for use in the
|
||||||
|
// application-properties section
|
||||||
|
let key = header_prefix(ATTRIBUTE_PREFIX, name);
|
||||||
|
let value = SimpleValue::from(value);
|
||||||
|
self.application_properties
|
||||||
|
.get_or_insert(ApplicationProperties::default())
|
||||||
|
.insert(key, value);
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(self)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Extension attributes are always serialized according to binding rules like standard
|
||||||
|
// attributes. However this specification does not prevent an extension from copying event
|
||||||
|
// attribute values to other parts of a message, in order to interact with non-CloudEvents
|
||||||
|
// systems that also process the message. Extension specifications that do this SHOULD specify
|
||||||
|
// how receivers are to interpret messages if the copied values differ from the cloud-event
|
||||||
|
// serialized values.
|
||||||
|
fn set_extension(
|
||||||
|
mut self,
|
||||||
|
name: &str,
|
||||||
|
value: MessageAttributeValue,
|
||||||
|
) -> crate::message::Result<Self> {
|
||||||
|
let key = header_prefix(ATTRIBUTE_PREFIX, name);
|
||||||
|
let value = SimpleValue::from(value);
|
||||||
|
self.application_properties
|
||||||
|
.get_or_insert(ApplicationProperties::default())
|
||||||
|
.insert(key, value);
|
||||||
|
Ok(self)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn end_with_data(mut self, bytes: Vec<u8>) -> crate::message::Result<Self> {
|
||||||
|
let data = Binary::from(bytes);
|
||||||
|
self.body = AmqpBody::Data(AmqpData(data));
|
||||||
|
Ok(self)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn end(self) -> crate::message::Result<Self> {
|
||||||
|
Ok(self)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl StructuredSerializer<EventMessage> for EventMessage {
|
||||||
|
fn set_structured_event(mut self, bytes: Vec<u8>) -> crate::message::Result<Self> {
|
||||||
|
self.content_type = Some(Symbol::from("application/cloudevents+json; charset=utf-8"));
|
||||||
|
self.body = AmqpBody::Data(AmqpData(Binary::from(bytes)));
|
||||||
|
Ok(self)
|
||||||
|
}
|
||||||
|
}
|
|
@ -6,6 +6,9 @@ pub mod actix;
|
||||||
#[cfg_attr(docsrs, doc(cfg(feature = "axum")))]
|
#[cfg_attr(docsrs, doc(cfg(feature = "axum")))]
|
||||||
#[cfg(feature = "axum")]
|
#[cfg(feature = "axum")]
|
||||||
pub mod axum;
|
pub mod axum;
|
||||||
|
#[cfg_attr(docsrs, doc(cfg(feature = "fe2o3-amqp")))]
|
||||||
|
#[cfg(feature = "fe2o3-amqp")]
|
||||||
|
pub mod fe2o3_amqp;
|
||||||
|
|
||||||
#[cfg_attr(
|
#[cfg_attr(
|
||||||
docsrs,
|
docsrs,
|
||||||
|
|
Loading…
Reference in New Issue