PartitionKey Kafka Interceptor (#260)
* PartitionKey Kafka Interceptor Signed-off-by: Francesco Guardiani <francescoguard@gmail.com> * Bound Signed-off-by: Francesco Guardiani <francescoguard@gmail.com>
This commit is contained in:
parent
c7baada605
commit
db745fd309
|
@ -57,6 +57,18 @@ You can configure the Encoding and EventFormat to use to emit the event.
|
||||||
Check out the [`CloudEventSerializer`](src/main/java/io/cloudevents/kafka/CloudEventSerializer.java)
|
Check out the [`CloudEventSerializer`](src/main/java/io/cloudevents/kafka/CloudEventSerializer.java)
|
||||||
javadoc for more info.
|
javadoc for more info.
|
||||||
|
|
||||||
|
### Partition key extension
|
||||||
|
|
||||||
|
If you want your producer to use the `partitionkey` extension, you can use the [`PartitionKeyExtensionInterceptor`](src/main/java/io/cloudevents/kafka/PartitionKeyExtensionInterceptor.java).
|
||||||
|
|
||||||
|
```java
|
||||||
|
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, io.cloudevents.kafka.PartitionKeyExtensionInterceptor.class);
|
||||||
|
```
|
||||||
|
|
||||||
|
When using in your producer, this interceptor will pick the `partitionkey` extension from the event and will set it as record key.
|
||||||
|
Check out the [`PartitionKeyExtensionInterceptor`](src/main/java/io/cloudevents/kafka/PartitionKeyExtensionInterceptor.java)
|
||||||
|
javadoc for more info.
|
||||||
|
|
||||||
## Consuming CloudEvents
|
## Consuming CloudEvents
|
||||||
|
|
||||||
To consume CloudEvents in Kafka, configure the KafkaConsumer to use the provided [`CloudEventDeserializer`](src/main/java/io/cloudevents/kafka/CloudEventDeserializer.java):
|
To consume CloudEvents in Kafka, configure the KafkaConsumer to use the provided [`CloudEventDeserializer`](src/main/java/io/cloudevents/kafka/CloudEventDeserializer.java):
|
||||||
|
|
|
@ -18,6 +18,7 @@
|
||||||
package io.cloudevents.kafka;
|
package io.cloudevents.kafka;
|
||||||
|
|
||||||
import io.cloudevents.CloudEvent;
|
import io.cloudevents.CloudEvent;
|
||||||
|
import io.cloudevents.CloudEventData;
|
||||||
import io.cloudevents.core.message.MessageReader;
|
import io.cloudevents.core.message.MessageReader;
|
||||||
import io.cloudevents.rw.CloudEventDataMapper;
|
import io.cloudevents.rw.CloudEventDataMapper;
|
||||||
import org.apache.kafka.common.header.Headers;
|
import org.apache.kafka.common.header.Headers;
|
||||||
|
@ -35,13 +36,13 @@ public class CloudEventDeserializer implements Deserializer<CloudEvent> {
|
||||||
|
|
||||||
public final static String MAPPER_CONFIG = "cloudevents.datamapper";
|
public final static String MAPPER_CONFIG = "cloudevents.datamapper";
|
||||||
|
|
||||||
private CloudEventDataMapper mapper = null;
|
private CloudEventDataMapper<? extends CloudEventData> mapper = null;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void configure(Map<String, ?> configs, boolean isKey) {
|
public void configure(Map<String, ?> configs, boolean isKey) {
|
||||||
Object mapperConfig = configs.get(MAPPER_CONFIG);
|
Object mapperConfig = configs.get(MAPPER_CONFIG);
|
||||||
if (mapperConfig instanceof CloudEventDataMapper) {
|
if (mapperConfig instanceof CloudEventDataMapper) {
|
||||||
this.mapper = (CloudEventDataMapper) mapperConfig;
|
this.mapper = (CloudEventDataMapper<? extends CloudEventData>) mapperConfig;
|
||||||
} else if (mapperConfig != null) {
|
} else if (mapperConfig != null) {
|
||||||
throw new IllegalArgumentException(MAPPER_CONFIG + " can be of type String or " + CloudEventDataMapper.class.getCanonicalName());
|
throw new IllegalArgumentException(MAPPER_CONFIG + " can be of type String or " + CloudEventDataMapper.class.getCanonicalName());
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,45 @@
|
||||||
|
package io.cloudevents.kafka;
|
||||||
|
|
||||||
|
import io.cloudevents.CloudEvent;
|
||||||
|
import org.apache.kafka.clients.producer.ProducerInterceptor;
|
||||||
|
import org.apache.kafka.clients.producer.ProducerRecord;
|
||||||
|
import org.apache.kafka.clients.producer.RecordMetadata;
|
||||||
|
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This {@link ProducerInterceptor} implements the partitioning extension,
|
||||||
|
* as described in the <a href="https://github.com/cloudevents/spec/blob/master/kafka-protocol-binding.md#31-key-mapping">CloudEvents Kafka specification</a>.
|
||||||
|
* <p>
|
||||||
|
* When using in your producer, it will pick the {@code partitionkey} extension from the event and will set it as record key.
|
||||||
|
* If the extension is missing, It won't replace the key from the original record.
|
||||||
|
*/
|
||||||
|
public class PartitionKeyExtensionInterceptor implements ProducerInterceptor<Object, CloudEvent> {
|
||||||
|
|
||||||
|
public static final String PARTITION_KEY_EXTENSION = "partitionkey";
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ProducerRecord<Object, CloudEvent> onSend(ProducerRecord<Object, CloudEvent> record) {
|
||||||
|
if (record.value() == null) {
|
||||||
|
return record;
|
||||||
|
}
|
||||||
|
Object partitionKey = record.value().getExtension(PARTITION_KEY_EXTENSION);
|
||||||
|
if (partitionKey == null) {
|
||||||
|
return record;
|
||||||
|
}
|
||||||
|
|
||||||
|
return new ProducerRecord<>(record.topic(), record.partition(), record.timestamp(), partitionKey, record.value(), record.headers());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() {
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void configure(Map<String, ?> configs) {
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,59 @@
|
||||||
|
package io.cloudevents.kafka;
|
||||||
|
|
||||||
|
import io.cloudevents.CloudEvent;
|
||||||
|
import io.cloudevents.core.builder.CloudEventBuilder;
|
||||||
|
import io.cloudevents.core.test.Data;
|
||||||
|
import org.apache.kafka.clients.producer.ProducerRecord;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
|
import static org.assertj.core.api.Assertions.assertThat;
|
||||||
|
|
||||||
|
class PartitionKeyExtensionInterceptorTest {
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testNoPartitionKeyAndNoOriginalKey() {
|
||||||
|
assertKey(
|
||||||
|
new ProducerRecord<>("aaa", Data.V1_MIN),
|
||||||
|
null
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testNoPartitionKey() {
|
||||||
|
assertKey(
|
||||||
|
new ProducerRecord<>("aaa", "blabla", Data.V1_MIN),
|
||||||
|
"blabla"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testPartitionKeyAndNoOriginalKey() {
|
||||||
|
assertKey(
|
||||||
|
new ProducerRecord<>("aaa", CloudEventBuilder
|
||||||
|
.v1(Data.V1_MIN)
|
||||||
|
.withExtension(PartitionKeyExtensionInterceptor.PARTITION_KEY_EXTENSION, "albalb")
|
||||||
|
.build()
|
||||||
|
),
|
||||||
|
"albalb"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testPartitionKey() {
|
||||||
|
assertKey(
|
||||||
|
new ProducerRecord<>("aaa", "blabla", CloudEventBuilder
|
||||||
|
.v1(Data.V1_MIN)
|
||||||
|
.withExtension(PartitionKeyExtensionInterceptor.PARTITION_KEY_EXTENSION, "albalb")
|
||||||
|
.build()
|
||||||
|
),
|
||||||
|
"albalb"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void assertKey(ProducerRecord<Object, CloudEvent> record, Object expectedKey) {
|
||||||
|
PartitionKeyExtensionInterceptor interceptor = new PartitionKeyExtensionInterceptor();
|
||||||
|
assertThat(interceptor.onSend(record).key())
|
||||||
|
.isEqualTo(expectedKey);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
Loading…
Reference in New Issue