Use the new iter() fn for message headers

Signed-off-by: Jim Crossley <jim@crossleys.org>
This commit is contained in:
Jim Crossley 2022-11-04 09:40:02 -04:00
parent f4c85cd335
commit 991cdbd01f
1 changed files with 6 additions and 9 deletions

View File

@ -20,16 +20,13 @@ pub struct ConsumerRecordDeserializer {
impl ConsumerRecordDeserializer {
fn get_kafka_headers(message: &impl Message) -> Result<HashMap<String, Vec<u8>>> {
let mut hm = HashMap::new();
let headers = message
.headers()
// TODO create an error variant for invalid headers
.ok_or(crate::message::Error::WrongEncoding {})?;
for i in 0..headers.count() {
let header = headers.get(i);
hm.insert(header.key.to_string(), Vec::from(header.value.unwrap()));
match message.headers() {
None => Err(crate::message::Error::WrongEncoding {}),
Some(headers) => Ok(headers
.iter()
.map(|h| (h.key.to_string(), Vec::from(h.value.unwrap())))
.collect()),
}
Ok(hm)
}
pub fn new(message: &impl Message) -> Result<ConsumerRecordDeserializer> {