From c11ca15fc49c09d06a8869f47b94e70bfd492284 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fabio=20Jos=C3=A9?= Date: Thu, 19 Sep 2019 21:30:04 -0300 Subject: [PATCH] Kafka header mapper for (un)marshalling MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Fabio José --- .../cloudevents/v03/kafka/HeaderMapper.java | 69 ++++++++ .../v03/kafka/HeaderMapperTest.java | 148 ++++++++++++++++++ 2 files changed, 217 insertions(+) create mode 100644 kafka/src/main/java/io/cloudevents/v03/kafka/HeaderMapper.java create mode 100644 kafka/src/test/java/io/cloudevents/v03/kafka/HeaderMapperTest.java diff --git a/kafka/src/main/java/io/cloudevents/v03/kafka/HeaderMapper.java b/kafka/src/main/java/io/cloudevents/v03/kafka/HeaderMapper.java new file mode 100644 index 00000000..d27c0608 --- /dev/null +++ b/kafka/src/main/java/io/cloudevents/v03/kafka/HeaderMapper.java @@ -0,0 +1,69 @@ +package io.cloudevents.v03.kafka; + +import static io.cloudevents.v03.kafka.AttributeMapper.HEADER_PREFIX; + +import java.util.Locale; +import java.util.Map; +import java.util.Objects; +import java.util.AbstractMap.SimpleEntry; +import java.util.Map.Entry; +import java.util.stream.Collectors; + +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.Serializer; + +import io.cloudevents.extensions.ExtensionFormat; +import io.cloudevents.fun.FormatHeaderMapper; +import io.cloudevents.v03.AttributesImpl; + +/** + * + * @author fabiojose + * @version 0.3 + */ +public class HeaderMapper { + private HeaderMapper() {} + + private static final Serializer SERIALIZER = + Serdes.String().serializer(); + + /** + * Following the signature of {@link FormatHeaderMapper} + * @param attributes The map of attributes created by + * {@link AttributesImpl#marshal(AttributesImpl)} + * @param extensions The map of extensions created by + * {@link ExtensionFormat#marshal(java.util.Collection)} + * @return The map of Kafka Headers with values as {@code byte[]} + */ + public static Map map(Map attributes, + Map extensions) { + Objects.requireNonNull(attributes); + Objects.requireNonNull(extensions); + + Map result = attributes.entrySet() + .stream() + .filter(attribute -> null!= attribute.getValue()) + .map(attribute -> + new SimpleEntry<>(attribute.getKey() + .toLowerCase(Locale.US), attribute.getValue())) + .map(attribute -> + new SimpleEntry<>(HEADER_PREFIX+attribute.getKey(), + attribute.getValue())) + .map(attribute -> + new SimpleEntry<>(attribute.getKey(), + SERIALIZER.serialize(null, attribute.getValue()))) + .collect(Collectors.toMap(Entry::getKey, Entry::getValue)); + + result.putAll( + extensions.entrySet() + .stream() + .filter(extension -> null!= extension.getValue()) + .map(extension -> + new SimpleEntry<>(extension.getKey(), + SERIALIZER.serialize(null, extension.getValue()))) + .collect(Collectors.toMap(Entry::getKey, Entry::getValue)) + ); + + return result; + } +} diff --git a/kafka/src/test/java/io/cloudevents/v03/kafka/HeaderMapperTest.java b/kafka/src/test/java/io/cloudevents/v03/kafka/HeaderMapperTest.java new file mode 100644 index 00000000..ef55375e --- /dev/null +++ b/kafka/src/test/java/io/cloudevents/v03/kafka/HeaderMapperTest.java @@ -0,0 +1,148 @@ +/** + * Copyright 2019 The CloudEvents Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.cloudevents.v03.kafka; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.HashMap; +import java.util.Map; + +import org.junit.jupiter.api.Test; + +import io.cloudevents.v03.kafka.HeaderMapper; + +/** + * + * @author fabiojose + * + */ +public class HeaderMapperTest { + + @Test + public void error_when_attributes_map_isnull() { + // setup + Map extensions = new HashMap<>(); + + assertThrows(NullPointerException.class, () -> { + // act + HeaderMapper.map(null, extensions); + }); + } + + @Test + public void error_when_extensions_map_isnull() { + // setup + Map attributes = new HashMap<>(); + + assertThrows(NullPointerException.class, () -> { + // act + HeaderMapper.map(attributes, null); + }); + } + + @Test + public void should_not_map_null_attribute_value() { + // setup + Map attributes = new HashMap<>(); + attributes.put("type", null); + attributes.put("specversion", "0.3"); + + Map extensions = new HashMap<>(); + + // act + Map actual = HeaderMapper.map(attributes, extensions); + + //assert + assertFalse(actual.containsKey("ce-type")); + } + + @Test + public void should_not_map_null_extension_value() { + // setup + Map attributes = new HashMap<>(); + attributes.put("type", "mytype"); + attributes.put("specversion", "0.3"); + + Map extensions = new HashMap<>(); + extensions.put("null-ext", null); + extensions.put("comexampleextension1", "value"); + + // act + Map actual = HeaderMapper.map(attributes, extensions); + + //assert + assertFalse(actual.containsKey("null-ext")); + } + + @Test + public void should_not_map_absent_contenttype() { + // setup + Map attributes = new HashMap<>(); + attributes.put("type", "mytype"); + attributes.put("specversion", "0.3"); + + Map extensions = new HashMap<>(); + extensions.put("null-ext", "null-value"); + extensions.put("comexampleextension1", "value"); + + // act + Map actual = HeaderMapper.map(attributes, extensions); + + //assert + assertFalse(actual.containsKey("Content-Type")); + } + + @Test + public void should_map_extension_without_prefix() { + // setup + Map attributes = new HashMap<>(); + attributes.put("type", "mytype"); + attributes.put("specversion", "0.3"); + + Map extensions = new HashMap<>(); + extensions.put("null-ext", "null-value"); + extensions.put("comexampleextension1", "value"); + + // act + Map actual = HeaderMapper.map(attributes, extensions); + + //assert + assertTrue(actual.containsKey("comexampleextension1")); + } + + @Test + public void should_all_values_as_byte_array() { + // setup + Map attributes = new HashMap<>(); + attributes.put("type", "mytype"); + attributes.put("specversion", "0.3"); + + Map extensions = new HashMap<>(); + extensions.put("null-ext", "null-value"); + extensions.put("comexampleextension1", "value"); + + // act + Map actuals = HeaderMapper.map(attributes, extensions); + + // assert + actuals.values() + .forEach(actual -> { + assertTrue(actual instanceof byte[]); + }); + } +}