sdk-java/kafka
Francesco Guardiani 34408236db
Moved CloudEventUtils from impl to io.cloudevents.core (#261)
Renamed CloudEventUtils#toVisitable to CloudEventUtils#toReader
Added CloudEventUtils#toEvent

Signed-off-by: Francesco Guardiani <francescoguard@gmail.com>
2020-11-13 10:00:49 +01:00
..
src Moved CloudEventUtils from impl to io.cloudevents.core (#261) 2020-11-13 10:00:49 +01:00
README.md PartitionKey Kafka Interceptor (#260) 2020-11-13 09:53:55 +01:00
pom.xml Bump to 2.0.0-SNAPSHOT (#255) 2020-11-09 14:13:56 +01:00

README.md

Kafka Protocol Binding

Javadocs

For Maven-based projects, use the following to configure the Kafka Protocol Binding:

<dependency>
    <groupId>io.cloudevents</groupId>
    <artifactId>cloudevents-kafka</artifactId>
    <version>2.0.0-milestone4</version>
</dependency>

Producing CloudEvents

To produce CloudEvents in Kafka, configure the KafkaProducer to use the provided CloudEventSerializer:

import java.net.URI;
import java.util.Properties;

import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;
import io.cloudevents.kafka.CloudEventSerializer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

/**
 * Minimal example that publishes a single CloudEvent to a Kafka topic.
 *
 * <p>Record keys are serialized with {@code StringSerializer} and record values
 * (the events themselves) with {@code CloudEventSerializer}.
 */
public class CloudEventProducer {

    /**
     * Configures a producer, builds one event and sends it to {@code your.topic}.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        Properties props = new Properties();

        // Other config props

        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CloudEventSerializer.class);

        // try-with-resources closes the producer when the block exits
        try (KafkaProducer<String, CloudEvent> producer = new KafkaProducer<>(props)) {

            // Build an event
            CloudEvent event = CloudEventBuilder.v1()
                .withId("hello")
                .withType("example.kafka")
                .withSource(URI.create("http://localhost"))
                .build();

            // Produce the event; only topic and value are given, so the record has no key
            producer.send(new ProducerRecord<>("your.topic", event));
        }
    }

}

You can configure the Encoding and EventFormat used to emit the event. Check out the CloudEventSerializer javadoc for more info.

Partition key extension

If you want your producer to use the partitionkey extension, you can use the PartitionKeyExtensionInterceptor.

// interceptor.classes is a list-typed setting: pass the class name as a String,
// since a bare Class object is not accepted as a list value when the config is parsed.
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, io.cloudevents.kafka.PartitionKeyExtensionInterceptor.class.getName());

When used in your producer, this interceptor picks the partitionkey extension from the event and sets it as the record key. Check out the PartitionKeyExtensionInterceptor javadoc for more info.

Consuming CloudEvents

To consume CloudEvents in Kafka, configure the KafkaConsumer to use the provided CloudEventDeserializer:

import java.time.Duration;
import java.util.Properties;

import io.cloudevents.CloudEvent;
import io.cloudevents.kafka.CloudEventDeserializer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

/**
 * Minimal example that polls Kafka once and prints every CloudEvent received.
 *
 * <p>Record keys are deserialized with {@code StringDeserializer} and record
 * values with {@code CloudEventDeserializer}.
 */
public class CloudEventConsumer {

    /**
     * Creates a consumer, performs a single poll with a ten-second timeout and
     * writes each record's event to standard output.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        // try-with-resources closes the consumer when the block exits
        try (KafkaConsumer<String, CloudEvent> kafkaConsumer = new KafkaConsumer<>(consumerConfig())) {
            Duration pollTimeout = Duration.ofSeconds(10);
            ConsumerRecords<String, CloudEvent> polled = kafkaConsumer.poll(pollTimeout);

            polled.forEach(consumerRecord -> System.out.println(consumerRecord.value().toString()));
        }
    }

    /** Builds the consumer configuration with the key and value deserializers set. */
    private static Properties consumerConfig() {
        Properties config = new Properties();

        // Other config props

        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, CloudEventDeserializer.class);
        return config;
    }

}