Created kafka example (#194)

* Created kafka example

Signed-off-by: Francesco Guardiani <francescoguard@gmail.com>

* Applied suggestions

Signed-off-by: Francesco Guardiani <francescoguard@gmail.com>
This commit is contained in:
Francesco Guardiani 2020-07-02 09:25:43 +02:00 committed by GitHub
parent f42020333a
commit 862e4a657a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 202 additions and 0 deletions

27
examples/kafka/README.md Normal file
View File

@ -0,0 +1,27 @@
# Kafka + CloudEvents sample
## Build
```shell
mvn package
```
## Prepare a Kafka instance
You can use docker to start a sample kafka instance:
```shell
docker run --rm --net=host -e ADV_HOST=localhost -e SAMPLEDATA=0 lensesio/fast-data-dev
```
## Produce events
```shell
mvn exec:java -Dexec.mainClass="io.cloudevents.examples.kafka.SampleProducer" -Dexec.args="localhost:9092 sample-topic"
```
## Consume events
```shell
mvn exec:java -Dexec.mainClass="io.cloudevents.examples.kafka.SampleConsumer" -Dexec.args="localhost:9092 sample-topic"
```

27
examples/kafka/pom.xml Normal file
View File

@ -0,0 +1,27 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>cloudevents-examples</artifactId>
<groupId>io.cloudevents</groupId>
<version>2.0.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>cloudevents-kafka-example</artifactId>
<dependencies>
<dependency>
<groupId>io.cloudevents</groupId>
<artifactId>cloudevents-kafka</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.cloudevents</groupId>
<artifactId>cloudevents-json-jackson</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,51 @@
package io.cloudevents.examples.kafka;
import io.cloudevents.CloudEvent;
import io.cloudevents.kafka.CloudEventDeserializer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class SampleConsumer {
public static void main(String[] args) {
if (args.length < 2) {
System.out.println("Usage: sample_consumer <bootstrap_server> <topic>");
return;
}
// Basic consumer configuration
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, args[0]);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "sample-cloudevents-consumer");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, CloudEventDeserializer.class);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
// Create the consumer and subscribe to the topic
KafkaConsumer<String, CloudEvent> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList(args[1]));
System.out.println("Consumer started");
while (true) {
ConsumerRecords<String, CloudEvent> consumerRecords = consumer.poll(Duration.ofMillis(100));
consumerRecords.forEach(record -> {
System.out.println(
"New record:\n" +
" Record Key " + record.key() + "\n" +
" Record value " + record.value() + "\n" +
" Record partition " + record.partition() + "\n" +
" Record offset " + record.offset() + "\n"
);
});
}
}
}

View File

@ -0,0 +1,75 @@
package io.cloudevents.examples.kafka;
import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;
import io.cloudevents.core.message.Encoding;
import io.cloudevents.jackson.JsonFormat;
import io.cloudevents.kafka.CloudEventSerializer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import java.net.URI;
import java.util.Properties;
import java.util.UUID;
public class SampleProducer {
public static final int MESSAGE_COUNT = 100;
public static void main(String[] args) {
if (args.length < 2) {
System.out.println("Usage: sample_producer <bootstrap_server> <topic>");
return;
}
// Basic producer configuration
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, args[0]);
props.put(ProducerConfig.CLIENT_ID_CONFIG, "sample-cloudevents-producer");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// Configure the CloudEventSerializer to emit events as json structured events
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CloudEventSerializer.class);
props.put(CloudEventSerializer.ENCODING_CONFIG, Encoding.BINARY);
props.put(CloudEventSerializer.EVENT_FORMAT_CONFIG, JsonFormat.CONTENT_TYPE);
// Create the KafkaProducer
KafkaProducer<String, CloudEvent> producer = new KafkaProducer<>(props);
String topic = args[1];
// Create an event template to set basic CloudEvent attributes
CloudEventBuilder eventTemplate = CloudEventBuilder.v1()
.withSource(URI.create("https://github.com/cloudevents/sdk-java/tree/master/examples/kafka"))
.withType("producer.example");
for (int i = 0; i < MESSAGE_COUNT; i++) {
try {
String id = UUID.randomUUID().toString();
String data = "Event number " + i;
// Create the event starting from the template
CloudEvent event = eventTemplate.newBuilder()
.withId(id)
.withData("text/plain", data.getBytes())
.build();
// Send the record
RecordMetadata metadata = producer
.send(new ProducerRecord<>(topic, id, event))
.get();
System.out.println("Record sent to partition " + metadata.partition() + " with offset " + metadata.offset());
} catch (Exception e) {
System.out.println("Error while trying to send the record");
e.printStackTrace();
return;
}
}
// Flush and close the producer
producer.flush();
producer.close();
}
}

21
examples/pom.xml Normal file
View File

@ -0,0 +1,21 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>cloudevents-parent</artifactId>
<groupId>io.cloudevents</groupId>
<version>2.0.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>cloudevents-examples</artifactId>
<name>Cloudevents - Examples</name>
<packaging>pom</packaging>
<modules>
<module>kafka</module>
</modules>
</project>

View File

@ -113,6 +113,7 @@
</activation>
<modules>
<module>http/restful-ws-integration-tests</module>
<module>examples</module>
</modules>
</profile>