diff --git a/examples/kafka/README.md b/examples/kafka/README.md new file mode 100644 index 00000000..8d5144d0 --- /dev/null +++ b/examples/kafka/README.md @@ -0,0 +1,27 @@ +# Kafka + CloudEvents sample + +## Build + +```shell +mvn package +``` + +## Prepare a Kafka instance + +You can use docker to start a sample kafka instance: + +```shell +docker run --rm --net=host -e ADV_HOST=localhost -e SAMPLEDATA=0 lensesio/fast-data-dev +``` + +## Produce events + +```shell +mvn exec:java -Dexec.mainClass="io.cloudevents.examples.kafka.SampleProducer" -Dexec.args="localhost:9092 sample-topic" +``` + +## Consume events + +```shell +mvn exec:java -Dexec.mainClass="io.cloudevents.examples.kafka.SampleConsumer" -Dexec.args="localhost:9092 sample-topic" +``` diff --git a/examples/kafka/pom.xml b/examples/kafka/pom.xml new file mode 100644 index 00000000..ac485b49 --- /dev/null +++ b/examples/kafka/pom.xml @@ -0,0 +1,27 @@ + + + + cloudevents-examples + io.cloudevents + 2.0.0-SNAPSHOT + + 4.0.0 + + cloudevents-kafka-example + + + + io.cloudevents + cloudevents-kafka + ${project.version} + + + io.cloudevents + cloudevents-json-jackson + ${project.version} + + + + diff --git a/examples/kafka/src/main/java/io/cloudevents/examples/kafka/SampleConsumer.java b/examples/kafka/src/main/java/io/cloudevents/examples/kafka/SampleConsumer.java new file mode 100644 index 00000000..213bb133 --- /dev/null +++ b/examples/kafka/src/main/java/io/cloudevents/examples/kafka/SampleConsumer.java @@ -0,0 +1,51 @@ +package io.cloudevents.examples.kafka; + +import io.cloudevents.CloudEvent; +import io.cloudevents.kafka.CloudEventDeserializer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.serialization.StringDeserializer; + +import java.time.Duration; +import java.util.Collections; +import java.util.Properties; + +public class SampleConsumer { + + public static void main(String[] args) { + if (args.length < 2) { + System.out.println("Usage: sample_consumer "); + return; + } + + // Basic consumer configuration + Properties props = new Properties(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, args[0]); + props.put(ConsumerConfig.GROUP_ID_CONFIG, "sample-cloudevents-consumer"); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, CloudEventDeserializer.class); + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); + props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); + + // Create the consumer and subscribe to the topic + KafkaConsumer consumer = new KafkaConsumer<>(props); + consumer.subscribe(Collections.singletonList(args[1])); + + System.out.println("Consumer started"); + + while (true) { + ConsumerRecords consumerRecords = consumer.poll(Duration.ofMillis(100)); + consumerRecords.forEach(record -> { + System.out.println( + "New record:\n" + + " Record Key " + record.key() + "\n" + + " Record value " + record.value() + "\n" + + " Record partition " + record.partition() + "\n" + + " Record offset " + record.offset() + "\n" + ); + }); + } + + } +} diff --git a/examples/kafka/src/main/java/io/cloudevents/examples/kafka/SampleProducer.java b/examples/kafka/src/main/java/io/cloudevents/examples/kafka/SampleProducer.java new file mode 100644 index 00000000..ec1b4cd2 --- /dev/null +++ b/examples/kafka/src/main/java/io/cloudevents/examples/kafka/SampleProducer.java @@ -0,0 +1,75 @@ +package io.cloudevents.examples.kafka; + +import io.cloudevents.CloudEvent; +import io.cloudevents.core.builder.CloudEventBuilder; +import io.cloudevents.core.message.Encoding; +import io.cloudevents.jackson.JsonFormat; +import io.cloudevents.kafka.CloudEventSerializer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.serialization.StringSerializer; + +import java.net.URI; +import java.util.Properties; +import java.util.UUID; + +public class SampleProducer { + + public static final int MESSAGE_COUNT = 100; + + public static void main(String[] args) { + if (args.length < 2) { + System.out.println("Usage: sample_producer "); + return; + } + + // Basic producer configuration + Properties props = new Properties(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, args[0]); + props.put(ProducerConfig.CLIENT_ID_CONFIG, "sample-cloudevents-producer"); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + + // Configure the CloudEventSerializer to emit events as json structured events + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CloudEventSerializer.class); + props.put(CloudEventSerializer.ENCODING_CONFIG, Encoding.BINARY); + props.put(CloudEventSerializer.EVENT_FORMAT_CONFIG, JsonFormat.CONTENT_TYPE); + + // Create the KafkaProducer + KafkaProducer producer = new KafkaProducer<>(props); + String topic = args[1]; + + // Create an event template to set basic CloudEvent attributes + CloudEventBuilder eventTemplate = CloudEventBuilder.v1() + .withSource(URI.create("https://github.com/cloudevents/sdk-java/tree/master/examples/kafka")) + .withType("producer.example"); + + for (int i = 0; i < MESSAGE_COUNT; i++) { + try { + String id = UUID.randomUUID().toString(); + String data = "Event number " + i; + + // Create the event starting from the template + CloudEvent event = eventTemplate.newBuilder() + .withId(id) + .withData("text/plain", data.getBytes()) + .build(); + + // Send the record + RecordMetadata metadata = producer + .send(new ProducerRecord<>(topic, id, event)) + .get(); + System.out.println("Record sent to partition " + metadata.partition() + " with offset " + metadata.offset()); + } catch (Exception e) { + System.out.println("Error while trying to send the record"); + e.printStackTrace(); + return; + } + } + + // Flush and close the producer + producer.flush(); + producer.close(); + } +} diff --git a/examples/pom.xml b/examples/pom.xml new file mode 100644 index 00000000..a17879c0 --- /dev/null +++ b/examples/pom.xml @@ -0,0 +1,21 @@ + + + + cloudevents-parent + io.cloudevents + 2.0.0-SNAPSHOT + + 4.0.0 + + cloudevents-examples + Cloudevents - Examples + pom + + + kafka + + + + diff --git a/pom.xml b/pom.xml index db531279..ac7cd4d1 100644 --- a/pom.xml +++ b/pom.xml @@ -113,6 +113,7 @@ http/restful-ws-integration-tests + examples