Merge pull request #98 from skpark-tech/master

Map extensions with prefix for Kafka CloudEvent header
This commit is contained in:
Fabio José 2020-03-04 20:14:27 -03:00 committed by GitHub
commit dd07320779
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 39 additions and 39 deletions

View File

@ -15,6 +15,7 @@
*/
package io.cloudevents.v1.kafka;
import static io.cloudevents.v1.kafka.HeaderMapper.HEADER_PREFIX;
import static java.util.stream.Collectors.toMap;
import java.util.Locale;
@ -37,9 +38,7 @@ import io.cloudevents.v1.ContextAttributes;
public class AttributeMapper {
private AttributeMapper() {}
static final String HEADER_PREFIX = "ce_";
private static final Deserializer<String> DESERIALIZER =
private static final Deserializer<String> DESERIALIZER =
Serdes.String().deserializer();
private static final String NULL_ARG = null;
@ -65,7 +64,6 @@ public class AttributeMapper {
String key = header.getKey();
key = key.substring(HEADER_PREFIX.length());
String val = DESERIALIZER.deserialize(NULL_ARG,
header.getValue());
return new SimpleEntry<>(key, val);

View File

@ -28,7 +28,8 @@ import org.apache.kafka.common.serialization.Serdes;
import io.cloudevents.fun.FormatExtensionMapper;
import io.cloudevents.v1.ContextAttributes;
import io.cloudevents.v1.kafka.AttributeMapper;
import static io.cloudevents.v1.kafka.HeaderMapper.HEADER_PREFIX;
/**
*
@ -40,8 +41,7 @@ private ExtensionMapper() {}
private static final List<String> RESERVED_HEADERS =
ContextAttributes.VALUES.stream()
.map(attribute -> AttributeMapper
.HEADER_PREFIX + attribute)
.map(attribute -> HEADER_PREFIX + attribute)
.collect(Collectors.toList());
static {
RESERVED_HEADERS.add("content-type");
@ -67,6 +67,7 @@ private ExtensionMapper() {}
.filter(header -> null!= header.getValue())
.map(header -> new SimpleEntry<>(header.getKey()
.toLowerCase(Locale.US), header.getValue()))
.filter(header -> header.getKey().startsWith(HEADER_PREFIX))
.filter(header -> {
return !RESERVED_HEADERS.contains(header.getKey());
})
@ -74,6 +75,7 @@ private ExtensionMapper() {}
(byte[])header.getValue()))
.map(header -> {
String key = header.getKey();
key = key.substring(HEADER_PREFIX.length());
String val = DESERIALIZER.deserialize(NULL_ARG,
header.getValue());
return new SimpleEntry<>(key, val);

View File

@ -1,7 +1,5 @@
package io.cloudevents.v1.kafka;
import static io.cloudevents.v1.kafka.AttributeMapper.HEADER_PREFIX;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
@ -19,23 +17,25 @@ import io.cloudevents.v1.AttributesImpl;
import io.cloudevents.v1.ContextAttributes;
/**
*
*
* @author fabiojose
* @version 1.0
*/
public class HeaderMapper {
private HeaderMapper() {}
private static final Serializer<String> SERIALIZER =
static final String HEADER_PREFIX = "ce_";
private HeaderMapper() {}
private static final Serializer<String> SERIALIZER =
Serdes.String().serializer();
private static final String KAFKA_CONTENT_TYPE = "content-type";
/**
* Following the signature of {@link FormatHeaderMapper}
* @param attributes The map of attributes created by
* @param attributes The map of attributes created by
* {@link AttributesImpl#marshal(AttributesImpl)}
* @param extensions The map of extensions created by
* @param extensions The map of extensions created by
* {@link ExtensionFormat#marshal(java.util.Collection)}
* @return The map of Kafka Headers with values as {@code byte[]}
*/
@ -43,39 +43,39 @@ public class HeaderMapper {
Map<String, String> extensions) {
Objects.requireNonNull(attributes);
Objects.requireNonNull(extensions);
Map<String, byte[]> result = attributes.entrySet()
.stream()
.filter(attribute -> null!= attribute.getValue())
.map(attribute ->
.filter(attribute -> null != attribute.getValue())
.map(attribute ->
new SimpleEntry<>(attribute.getKey()
.toLowerCase(Locale.US), attribute.getValue()))
.filter(header -> !header.getKey()
.equals(ContextAttributes.datacontenttype.name()))
.map(attribute ->
new SimpleEntry<>(HEADER_PREFIX+attribute.getKey(),
.map(attribute ->
new SimpleEntry<>(HEADER_PREFIX + attribute.getKey(),
attribute.getValue()))
.map(attribute ->
new SimpleEntry<>(attribute.getKey(),
.map(attribute ->
new SimpleEntry<>(attribute.getKey(),
SERIALIZER.serialize(null, attribute.getValue())))
.collect(Collectors.toMap(Entry::getKey, Entry::getValue));
result.putAll(
extensions.entrySet()
.stream()
.filter(extension -> null!= extension.getValue())
.map(extension ->
new SimpleEntry<>(extension.getKey(),
.filter(extension -> null != extension.getValue())
.map(extension ->
new SimpleEntry<>(HEADER_PREFIX + extension.getKey(),
SERIALIZER.serialize(null, extension.getValue())))
.collect(Collectors.toMap(Entry::getKey, Entry::getValue))
);
Optional.ofNullable(attributes
.get(ContextAttributes.datacontenttype.name()))
.ifPresent((dct) -> {
result.put(KAFKA_CONTENT_TYPE, SERIALIZER.serialize(null, dct));
});
.get(ContextAttributes.datacontenttype.name()))
.ifPresent(dct ->
result.put(KAFKA_CONTENT_TYPE, SERIALIZER.serialize(null, dct))
);
return result;
}
}

View File

@ -94,7 +94,7 @@ public class ExtensionMapperTest {
// setup
String expected = "my-extension";
Map<String, Object> headers = new HashMap<>();
headers.put("myexp", "my-extension".getBytes());
headers.put("ce_myexp", "my-extension".getBytes());
//act
String actual = ExtensionMapper.map(headers).get("myexp");
@ -113,9 +113,9 @@ public class ExtensionMapperTest {
myHeaders.put("ce_type", "br.my".getBytes());
myHeaders.put("ce_time", "2019-09-16T20:49:00Z".getBytes());
myHeaders.put("ce_dataschema", "http://my.br".getBytes());
myHeaders.put("my-ext", "myextension".getBytes());
myHeaders.put("traceparent", "0".getBytes());
myHeaders.put("tracestate", "congo=4".getBytes());
myHeaders.put("ce_my-ext", "myextension".getBytes());
myHeaders.put("ce_traceparent", "0".getBytes());
myHeaders.put("ce_tracestate", "congo=4".getBytes());
myHeaders.put("Content-Type", "application/json".getBytes());
// act

View File

@ -125,7 +125,7 @@ public class HeaderMapperTest {
}
@Test
public void should_map_extension_without_prefix() {
public void should_map_extension_with_prefix() {
// setup
Map<String, String> attributes = new HashMap<>();
attributes.put("type", "mytype");
@ -139,7 +139,7 @@ public class HeaderMapperTest {
Map<String, byte[]> actual = HeaderMapper.map(attributes, extensions);
//assert
assertTrue(actual.containsKey("comexampleextension1"));
assertTrue(actual.containsKey("ce_comexampleextension1"));
}
@Test