Bump to latest rdkafka 0.29

Signed-off-by: Jim Crossley <jim@crossleys.org>
This commit is contained in:
Jim Crossley 2022-11-03 12:47:30 -04:00
parent c380078bf4
commit 3c122783ac
4 changed files with 24 additions and 13 deletions

View File

@ -46,7 +46,7 @@ uuid = { version = "1", features = ["v4"] }
actix-web = { version = "4", optional = true }
actix-http = { version = "3", optional = true }
reqwest-lib = { version = "^0.11", default-features = false, features = ["rustls-tls"], optional = true, package = "reqwest" }
rdkafka-lib = { version = "^0.28", features = ["cmake-build"], optional = true, package = "rdkafka" }
rdkafka-lib = { version = "^0.29", features = ["cmake-build"], optional = true, package = "rdkafka" }
warp-lib = { version = "^0.3", optional = true, package = "warp" }
async-trait = { version = "^0.1.33", optional = true }
bytes = { version = "^1.0", optional = true }

View File

@ -26,8 +26,8 @@ impl ConsumerRecordDeserializer {
// TODO create an error variant for invalid headers
.ok_or(crate::message::Error::WrongEncoding {})?;
for i in 0..headers.count() {
let header = headers.get(i).unwrap();
hm.insert(header.0.to_string(), Vec::from(header.1));
let header = headers.get(i);
hm.insert(header.key.to_string(), Vec::from(header.value.unwrap()));
}
Ok(hm)
}

View File

@ -9,7 +9,7 @@ use crate::message::{
BinaryDeserializer, BinarySerializer, MessageAttributeValue, Result, StructuredSerializer,
};
use crate::Event;
use rdkafka::message::{OwnedHeaders, ToBytes};
use rdkafka::message::{Header, OwnedHeaders, ToBytes};
use rdkafka::producer::{BaseRecord, FutureRecord};
/// This struct contains a serialized CloudEvent message in the Kafka shape.
@ -46,20 +46,27 @@ impl Default for MessageRecord {
impl BinarySerializer<MessageRecord> for MessageRecord {
fn set_spec_version(mut self, sv: SpecVersion) -> Result<Self> {
self.headers = self.headers.add(SPEC_VERSION_HEADER, &sv.to_string());
let v = sv.to_string();
let header = Header {
key: SPEC_VERSION_HEADER,
value: Some(&v),
};
self.headers = self.headers.insert(header);
Ok(self)
}
fn set_attribute(mut self, name: &str, value: MessageAttributeValue) -> Result<Self> {
let key = &header_prefix(name);
self.headers = self.headers.add(key, &value.to_string());
let v = value.to_string();
let header = Header {
key: &header_prefix(name),
value: Some(&v),
};
self.headers = self.headers.insert(header);
Ok(self)
}
fn set_extension(mut self, name: &str, value: MessageAttributeValue) -> Result<Self> {
let key = &header_prefix(name);
self.headers = self.headers.add(key, &value.to_string());
Ok(self)
fn set_extension(self, name: &str, value: MessageAttributeValue) -> Result<Self> {
self.set_attribute(name, value)
}
fn end_with_data(mut self, bytes: Vec<u8>) -> Result<MessageRecord> {
@ -74,7 +81,11 @@ impl BinarySerializer<MessageRecord> for MessageRecord {
impl StructuredSerializer<MessageRecord> for MessageRecord {
fn set_structured_event(mut self, bytes: Vec<u8>) -> Result<MessageRecord> {
self.headers = self.headers.add(CONTENT_TYPE, CLOUDEVENTS_JSON_HEADER);
let header = Header {
key: CONTENT_TYPE,
value: Some(CLOUDEVENTS_JSON_HEADER),
};
self.headers = self.headers.insert(header);
self.payload = Some(bytes);

View File

@ -33,7 +33,7 @@
//! use futures::StreamExt;
//!
//! # async fn consume(consumer: StreamConsumer<DefaultConsumerContext>) -> Result<(), Box<dyn std::error::Error>> {
//! let mut message_stream = consumer.start();
//! let mut message_stream = consumer.stream();
//!
//! while let Some(message) = message_stream.next().await {
//! match message {