Bump to latest rdkafka 0.29 (#192)

* Bump to latest rdkafka 0.29

* Updated example

Apps will be required to bump their dep to 0.29, too.

* Using the new iter() fn for message headers
This commit is contained in:
Jim Crossley 2022-11-04 10:32:45 -04:00 committed by GitHub
parent c380078bf4
commit 7c2ff41960
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 29 additions and 21 deletions

View File

@ -46,7 +46,7 @@ uuid = { version = "1", features = ["v4"] }
actix-web = { version = "4", optional = true }
actix-http = { version = "3", optional = true }
reqwest-lib = { version = "^0.11", default-features = false, features = ["rustls-tls"], optional = true, package = "reqwest" }
rdkafka-lib = { version = "^0.28", features = ["cmake-build"], optional = true, package = "rdkafka" }
rdkafka-lib = { version = "^0.29", features = ["cmake-build"], optional = true, package = "rdkafka" }
warp-lib = { version = "^0.3", optional = true, package = "warp" }
async-trait = { version = "^0.1.33", optional = true }
bytes = { version = "^1.0", optional = true }

View File

@ -16,4 +16,4 @@ serde_json = "^1.0"
futures = "^0.3"
tokio = { version = "^1.0", features = ["full"] }
clap = "2.33.1"
rdkafka = { version = "^0.28", features = ["cmake-build"] }
rdkafka = { version = "^0.29", features = ["cmake-build"] }

View File

@ -20,16 +20,13 @@ pub struct ConsumerRecordDeserializer {
impl ConsumerRecordDeserializer {
fn get_kafka_headers(message: &impl Message) -> Result<HashMap<String, Vec<u8>>> {
let mut hm = HashMap::new();
let headers = message
.headers()
// TODO create an error variant for invalid headers
.ok_or(crate::message::Error::WrongEncoding {})?;
for i in 0..headers.count() {
let header = headers.get(i).unwrap();
hm.insert(header.0.to_string(), Vec::from(header.1));
match message.headers() {
None => Err(crate::message::Error::WrongEncoding {}),
Some(headers) => Ok(headers
.iter()
.map(|h| (h.key.to_string(), Vec::from(h.value.unwrap())))
.collect()),
}
Ok(hm)
}
pub fn new(message: &impl Message) -> Result<ConsumerRecordDeserializer> {

View File

@ -9,7 +9,7 @@ use crate::message::{
BinaryDeserializer, BinarySerializer, MessageAttributeValue, Result, StructuredSerializer,
};
use crate::Event;
use rdkafka::message::{OwnedHeaders, ToBytes};
use rdkafka::message::{Header, OwnedHeaders, ToBytes};
use rdkafka::producer::{BaseRecord, FutureRecord};
/// This struct contains a serialized CloudEvent message in the Kafka shape.
@ -46,20 +46,27 @@ impl Default for MessageRecord {
impl BinarySerializer<MessageRecord> for MessageRecord {
fn set_spec_version(mut self, sv: SpecVersion) -> Result<Self> {
self.headers = self.headers.add(SPEC_VERSION_HEADER, &sv.to_string());
let v = sv.to_string();
let header = Header {
key: SPEC_VERSION_HEADER,
value: Some(&v),
};
self.headers = self.headers.insert(header);
Ok(self)
}
fn set_attribute(mut self, name: &str, value: MessageAttributeValue) -> Result<Self> {
let key = &header_prefix(name);
self.headers = self.headers.add(key, &value.to_string());
let v = value.to_string();
let header = Header {
key: &header_prefix(name),
value: Some(&v),
};
self.headers = self.headers.insert(header);
Ok(self)
}
fn set_extension(mut self, name: &str, value: MessageAttributeValue) -> Result<Self> {
let key = &header_prefix(name);
self.headers = self.headers.add(key, &value.to_string());
Ok(self)
fn set_extension(self, name: &str, value: MessageAttributeValue) -> Result<Self> {
self.set_attribute(name, value)
}
fn end_with_data(mut self, bytes: Vec<u8>) -> Result<MessageRecord> {
@ -74,7 +81,11 @@ impl BinarySerializer<MessageRecord> for MessageRecord {
impl StructuredSerializer<MessageRecord> for MessageRecord {
fn set_structured_event(mut self, bytes: Vec<u8>) -> Result<MessageRecord> {
self.headers = self.headers.add(CONTENT_TYPE, CLOUDEVENTS_JSON_HEADER);
let header = Header {
key: CONTENT_TYPE,
value: Some(CLOUDEVENTS_JSON_HEADER),
};
self.headers = self.headers.insert(header);
self.payload = Some(bytes);

View File

@ -33,7 +33,7 @@
//! use futures::StreamExt;
//!
//! # async fn consume(consumer: StreamConsumer<DefaultConsumerContext>) -> Result<(), Box<dyn std::error::Error>> {
//! let mut message_stream = consumer.start();
//! let mut message_stream = consumer.stream();
//!
//! while let Some(message) = message_stream.next().await {
//! match message {