treat missing cloutEvents prefix as extension

Signed-off-by: minghuaw <michael.wu1107@gmail.com>
This commit is contained in:
minghuaw 2022-08-18 03:35:58 -07:00
parent b193012953
commit 55c5c8f81b
2 changed files with 14 additions and 6 deletions

View File

@ -3,8 +3,11 @@
//! You need a running AMQP 1.0 broker to try out this example.
//! With docker: docker run -it --rm -e ARTEMIS_USERNAME=guest -e ARTEMIS_PASSWORD=guest -p 5672:5672 vromero/activemq-artemis
use cloudevents::{binding::fe2o3_amqp::EventMessage, Event, EventBuilderV10, EventBuilder, message::MessageDeserializer};
use fe2o3_amqp::{Connection, Sender, Receiver, types::messaging::Message, Session};
use cloudevents::{
binding::fe2o3_amqp::EventMessage, message::MessageDeserializer, Event, EventBuilder,
EventBuilderV10,
};
use fe2o3_amqp::{types::messaging::Message, Connection, Receiver, Sender, Session};
use serde_json::json;
type BoxError = Box<dyn std::error::Error>;
@ -15,12 +18,12 @@ async fn send_event(sender: &mut Sender, i: usize) -> Result<()> {
.id(i.to_string())
.ty("example.test")
.source("localhost")
.extension("ext-name", "AMQP")
.data("application/json", json!({"hello": "world"}))
.build()?;
let event_message = EventMessage::from_structured_event(event)?;
let event_message = EventMessage::from_binary_event(event)?;
let message = Message::from(event_message);
sender.send(message).await?
.accepted_or("not accepted")?;
sender.send(message).await?.accepted_or("not accepted")?;
Ok(())
}
@ -43,7 +46,9 @@ async fn main() {
.unwrap();
let mut session = Session::begin(&mut connection).await.unwrap();
let mut sender = Sender::attach(&mut session, "sender", "q1").await.unwrap();
let mut receiver = Receiver::attach(&mut session, "receiver", "q1").await.unwrap();
let mut receiver = Receiver::attach(&mut session, "receiver", "q1")
.await
.unwrap();
send_event(&mut sender, 1).await.unwrap();
let event = recv_event(&mut receiver).await.unwrap();

View File

@ -59,6 +59,9 @@ impl BinaryDeserializer for EventMessage {
let value = MessageAttributeValue::try_from(value)?;
serializer = serializer.set_extension(key, value)?;
}
} else {
let value = MessageAttributeValue::try_from(value)?;
serializer = serializer.set_extension(&key, value)?;
}
}
}