From 1188f17aea27c0e2c8889a80f7a4b284ffe942f5 Mon Sep 17 00:00:00 2001 From: minghuaw Date: Thu, 18 Aug 2022 10:08:21 -0700 Subject: [PATCH] added assertions in example Signed-off-by: minghuaw --- .../fe2o3-amqp-example/src/main.rs | 67 ++++++++++++++++--- 1 file changed, 56 insertions(+), 11 deletions(-) diff --git a/example-projects/fe2o3-amqp-example/src/main.rs b/example-projects/fe2o3-amqp-example/src/main.rs index a639b29..286397c 100644 --- a/example-projects/fe2o3-amqp-example/src/main.rs +++ b/example-projects/fe2o3-amqp-example/src/main.rs @@ -5,7 +5,7 @@ use cloudevents::{ binding::fe2o3_amqp::EventMessage, message::MessageDeserializer, Event, EventBuilder, - EventBuilderV10, + EventBuilderV10, AttributesReader, event::ExtensionValue, }; use fe2o3_amqp::{types::messaging::Message, Connection, Receiver, Sender, Session}; use serde_json::{json, from_slice, from_str}; @@ -13,7 +13,26 @@ use serde_json::{json, from_slice, from_str}; type BoxError = Box; type Result = std::result::Result; -async fn send_event(sender: &mut Sender, i: usize, value: serde_json::Value) -> Result<()> { +const EXAMPLE_TYPE: &str = "example.test"; +const EXAMPLE_SOURCE: &str = "localhost"; +const EXTENSION_NAME: &str = "ext-name"; +const EXTENSION_VALUE: &str = "AMQP"; + +async fn send_binary_event(sender: &mut Sender, i: usize, value: serde_json::Value) -> Result<()> { + let event = EventBuilderV10::new() + .id(i.to_string()) + .ty(EXAMPLE_TYPE) + .source(EXAMPLE_SOURCE) + .extension(EXTENSION_NAME, EXTENSION_VALUE) + .data("application/json", value) + .build()?; + let event_message = EventMessage::from_binary_event(event)?; + let message = Message::from(event_message); + sender.send(message).await?.accepted_or("not accepted")?; + Ok(()) +} + +async fn send_structured_event(sender: &mut Sender, i: usize, value: serde_json::Value) -> Result<()> { let event = EventBuilderV10::new() .id(i.to_string()) .ty("example.test") @@ -21,7 +40,7 @@ async fn send_event(sender: &mut Sender, i: usize, value: serde_json::Value) -> .extension("ext-name", "AMQP") .data("application/json", value) .build()?; - let event_message = EventMessage::from_binary_event(event)?; + let event_message = EventMessage::from_structured_event(event)?; let message = Message::from(event_message); sender.send(message).await?.accepted_or("not accepted")?; Ok(()) @@ -36,6 +55,15 @@ async fn recv_event(receiver: &mut Receiver) -> Result { Ok(event) } +fn convert_data_into_json_value(data: &cloudevents::Data) -> Result { + let value = match data { + cloudevents::Data::Binary(bytes) => from_slice(bytes)?, + cloudevents::Data::String(s) => from_str(s)?, + cloudevents::Data::Json(value) => value.clone(), + }; + Ok(value) +} + #[tokio::main] async fn main() { let mut connection = @@ -49,15 +77,32 @@ async fn main() { .unwrap(); let expected = json!({"hello": "world"}); - send_event(&mut sender, 1, expected.clone()).await.unwrap(); - let event = recv_event(&mut receiver).await.unwrap(); - let data: serde_json::Value = match event.data().unwrap() { - cloudevents::Data::Binary(bytes) => from_slice(bytes).unwrap(), - cloudevents::Data::String(s) => from_str(s).unwrap(), - cloudevents::Data::Json(value) => value.clone(), - }; - assert_eq!(data, expected); + // Binary content mode + send_binary_event(&mut sender, 1, expected.clone()).await.unwrap(); + let event = recv_event(&mut receiver).await.unwrap(); + let value = convert_data_into_json_value(event.data().unwrap()).unwrap(); + assert_eq!(event.id(), "1"); + assert_eq!(event.ty(), EXAMPLE_TYPE); + assert_eq!(event.source(), EXAMPLE_SOURCE); + match event.extension(EXTENSION_NAME).unwrap() { + ExtensionValue::String(value) => assert_eq!(value, EXTENSION_VALUE), + _ => panic!("Expect a String"), + } + assert_eq!(value, expected); + + // Structured content mode + send_structured_event(&mut sender, 2, expected.clone()).await.unwrap(); + let event = recv_event(&mut receiver).await.unwrap(); + let value = convert_data_into_json_value(event.data().unwrap()).unwrap(); + assert_eq!(event.id(), "2"); + assert_eq!(event.ty(), EXAMPLE_TYPE); + assert_eq!(event.source(), EXAMPLE_SOURCE); + match event.extension(EXTENSION_NAME).unwrap() { + ExtensionValue::String(value) => assert_eq!(value, EXTENSION_VALUE), + _ => panic!("Expect a String"), + } + assert_eq!(value, expected); sender.close().await.unwrap(); receiver.close().await.unwrap();