---
title: CloudEvents Kafka
nav_order: 5
---
# CloudEvents Kafka
[](http://www.javadoc.io/doc/io.cloudevents/cloudevents-kafka)
Implementation of Kafka Protocol Binding to send and receive CloudEvents.
For Maven based projects, use the following to configure the
[Kafka Protocol Binding](https://github.com/cloudevents/spec/blob/main/kafka-protocol-binding.md):
```xml
io.cloudevents
cloudevents-kafka
3.0.0
```
### Producing CloudEvents
To produce CloudEvents in Kafka, configure the KafkaProducer to use the provided
`CloudEventSerializer`:
```java
import java.util.Properties;
import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;
import io.cloudevents.kafka.CloudEventSerializer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
public class CloudEventProducer {
public static void main(String[] args) {
Properties props = new Properties();
// Other config props
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CloudEventSerializer.class);
try (KafkaProducer producer = new KafkaProducer<>(props)) {
// Build an event
CloudEvent event = CloudEventBuilder.v1()
.withId("hello")
.withType("example.kafka")
.withSource(URI.create("http://localhost"))
.build();
// Produce the event
producer.send(new ProducerRecord<>("your.topic", event));
}
}
}
```
You can configure the Encoding and EventFormat to use to emit the event.
Check out the
[`CloudEventSerializer`](https://github.com/cloudevents/sdk-java/tree/main/kafka/src/main/java/io/cloudevents/kafka/CloudEventSerializer.java)
javadoc for more info.
### Partition key extension
If you want your producer to use the `partitionkey` extension, you can use the
PartitionKeyExtensionInterceptor.
```java
producerProps.put(
ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
io.cloudevents.kafka.PartitionKeyExtensionInterceptor.class
);
```
When using in your producer, this interceptor will pick the `partitionkey`
extension from the event and will set it as record key, regardless of the input record key.
Check out the [`PartitionKeyExtensionInterceptor`](https://github.com/cloudevents/sdk-java/tree/main/kafka/src/main/java/io/cloudevents/kafka/PartitionKeyExtensionInterceptor.java)
javadoc for more info.
## Consuming CloudEvents
To consume CloudEvents in Kafka, configure the KafkaConsumer to use the provided
`CloudEventDeserializer`:
```java
import java.time.Duration;
import java.util.Properties;
import io.cloudevents.CloudEvent;
import io.cloudevents.kafka.CloudEventDeserializer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
public class CloudEventConsumer {
public static void main(String[] args) {
Properties props = new Properties();
// Other config props
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, CloudEventDeserializer.class);
try (KafkaConsumer consumer = new KafkaConsumer<>(props)) {
ConsumerRecords records = consumer.poll(Duration.ofSeconds(10));
records.forEach(rec -> {
System.out.println(rec.value().toString());
});
}
}
}
```