Merge pull request #924 from gygabyte/master

Kafka-clients: records(TopicPartition): unit tests + some (potential) improvements
commit ace9b53013 (Tyler Benson, 2019-07-19 09:22:09 -07:00, committed by GitHub)
3 changed files with 106 additions and 17 deletions
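
For context: on the Kafka consumer API, ConsumerRecords.records(String topic) returns an Iterable<ConsumerRecord>, while ConsumerRecords.records(TopicPartition) returns a List<ConsumerRecord>, which is why this PR touches both TracingIterable and TracingList. The sketch below shows the consumer-side call path that the new test at the bottom of this diff exercises. It is illustrative only: the class name, broker address, group id, and topic are placeholders, not part of this change.

import java.util.Collections;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class RecordsByPartitionExample {
  public static void main(final String[] args) {
    final Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
    props.put("group.id", "example-group");           // placeholder group id
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
      final TopicPartition partition = new TopicPartition("shared.topic", 0);
      consumer.assign(Collections.singletonList(partition));

      // poll(long) keeps this compatible with older kafka-clients; newer clients also offer poll(Duration)
      final ConsumerRecords<String, String> polled = consumer.poll(1000);

      // records(TopicPartition) hands back a List, the shape TracingList implements;
      // records(String) hands back an Iterable, the shape TracingIterable implements.
      final List<ConsumerRecord<String, String>> perPartition = polled.records(partition);
      for (final ConsumerRecord<String, String> record : perPartition) {
        System.out.printf("offset=%d key=%s value=%s%n", record.offset(), record.key(), record.value());
      }
    }
  }
}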

TracingIterable.java

@@ -7,6 +7,7 @@ public class TracingIterable implements Iterable<ConsumerRecord> {
   private final Iterable<ConsumerRecord> delegate;
   private final String operationName;
   private final KafkaDecorator decorator;
+  private boolean firstIterator = true;
 
   public TracingIterable(
       final Iterable<ConsumerRecord> delegate,
@@ -19,6 +20,17 @@ public class TracingIterable implements Iterable<ConsumerRecord> {
 
   @Override
   public Iterator<ConsumerRecord> iterator() {
-    return new TracingIterator(delegate.iterator(), operationName, decorator);
+    Iterator<ConsumerRecord> it;
+    // We should only return one iterator with tracing.
+    // However, this is not thread-safe, but usually the first (hopefully only) traversal of
+    // ConsumerRecords is performed in the same thread that called poll()
+    if (this.firstIterator) {
+      it = new TracingIterator(delegate.iterator(), operationName, decorator);
+      firstIterator = false;
+    } else {
+      it = delegate.iterator();
+    }
+
+    return it;
   }
 }
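
One consequence of the firstIterator guard worth spelling out: only the first iterator() call on a given TracingIterable returns a TracingIterator, so a second traversal of the same batch falls through to the plain delegate iterator. A minimal sketch of that behavior follows; the class and method names are hypothetical, and 'records' stands in for an iterable the agent has wrapped as a TracingIterable.

import org.apache.kafka.clients.consumer.ConsumerRecord;

public class DoubleTraversalSketch {
  // 'records' stands in for the Iterable returned by ConsumerRecords.records(topic)
  // after the agent has wrapped it (per this PR, a TracingIterable).
  static void traverseTwice(final Iterable<ConsumerRecord<String, String>> records) {
    for (final ConsumerRecord<String, String> record : records) {
      // first traversal: backed by TracingIterator, so tracing is applied on this pass
      System.out.println("processing offset " + record.offset());
    }
    for (final ConsumerRecord<String, String> record : records) {
      // second traversal: plain delegate iterator, no additional tracing
      System.out.println("re-reading offset " + record.offset());
    }
  }
}

As the inline comment in the diff notes, the guard is intentionally unsynchronized: the expected pattern is a single traversal, performed on the same thread that called poll().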

TracingList.java

@@ -1,23 +1,19 @@
 package datadog.trace.instrumentation.kafka_clients;
 
 import java.util.Collection;
-import java.util.Iterator;
 import java.util.List;
 import java.util.ListIterator;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 
-public class TracingList implements List<ConsumerRecord> {
+public class TracingList extends TracingIterable implements List<ConsumerRecord> {
   private final List<ConsumerRecord> delegate;
-  private final String operationName;
-  private final KafkaDecorator decorator;
 
   public TracingList(
       final List<ConsumerRecord> delegate,
       final String operationName,
       final KafkaDecorator decorator) {
+    super(delegate, operationName, decorator);
     this.delegate = delegate;
-    this.operationName = operationName;
-    this.decorator = decorator;
   }
 
   @Override
@@ -35,11 +31,6 @@ public class TracingList implements List<ConsumerRecord> {
     return delegate.contains(o);
   }
 
-  @Override
-  public Iterator<ConsumerRecord> iterator() {
-    return new TracingIterator(delegate.iterator(), operationName, decorator);
-  }
-
   @Override
   public Object[] toArray() {
     return delegate.toArray();
@@ -137,6 +128,10 @@ public class TracingList implements List<ConsumerRecord> {
 
   @Override
   public List<ConsumerRecord> subList(final int fromIndex, final int toIndex) {
-    return new TracingList(delegate.subList(fromIndex, toIndex), operationName, decorator);
+    // TODO: the API for subList is not really good to instrument it in context of Kafka
+    // Consumer so we will not do that for now
+    // Kafka is essentially a sequential commit log. We should only enable tracing when traversing
+    // sequentially with an iterator
+    return delegate.subList(fromIndex, toIndex);
   }
 }
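
With TracingList now extending TracingIterable, sequential iteration over the list returned by records(TopicPartition) keeps the traced-iterator behavior, while subList views are handed back untouched. A small sketch of what that means for caller code, assuming the agent wraps that list in a TracingList (which is what the new test below suggests); the class and method names here are hypothetical.

import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class SubListSketch {
  // 'batch' stands in for the List returned by ConsumerRecords.records(TopicPartition)
  // when the agent has wrapped it in a TracingList.
  static void consume(final List<ConsumerRecord<String, String>> batch) {
    // Sequential traversal of the list itself goes through the inherited
    // TracingIterable.iterator(), so tracing applies to this pass.
    for (final ConsumerRecord<String, String> record : batch) {
      System.out.println("traced pass, offset " + record.offset());
    }

    // subList now returns the delegate's view directly, so iterating it is untraced.
    final List<ConsumerRecord<String, String>> head = batch.subList(0, Math.min(10, batch.size()));
    for (final ConsumerRecord<String, String> record : head) {
      System.out.println("untraced sub-view, offset " + record.offset());
    }
  }
}

The test added below exercises exactly the traced path: it produces one record, reads it back through records(TopicPartition).iterator(), and asserts that a consumer span is created as a child of the producer span.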

KafkaClientTest.groovy

@@ -1,6 +1,11 @@
 import datadog.trace.agent.test.AgentTestRunner
+import org.apache.kafka.clients.consumer.ConsumerConfig
 import org.apache.kafka.clients.consumer.ConsumerRecord
-import org.junit.ClassRule
+import org.apache.kafka.clients.consumer.KafkaConsumer
+import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.common.TopicPartition
+import org.junit.Rule
 import org.springframework.kafka.core.DefaultKafkaConsumerFactory
 import org.springframework.kafka.core.DefaultKafkaProducerFactory
 import org.springframework.kafka.core.KafkaTemplate
@@ -9,7 +14,6 @@ import org.springframework.kafka.listener.MessageListener
 import org.springframework.kafka.test.rule.KafkaEmbedded
 import org.springframework.kafka.test.utils.ContainerTestUtils
 import org.springframework.kafka.test.utils.KafkaTestUtils
-import spock.lang.Shared
 
 import java.util.concurrent.LinkedBlockingQueue
 import java.util.concurrent.TimeUnit
@@ -17,8 +21,7 @@ import java.util.concurrent.TimeUnit
 class KafkaClientTest extends AgentTestRunner {
   static final SHARED_TOPIC = "shared.topic"
 
-  @Shared
-  @ClassRule
+  @Rule
   KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, SHARED_TOPIC)
 
   def "test kafka produce and consume"() {
@@ -121,4 +124,83 @@ class KafkaClientTest extends AgentTestRunner {
     container?.stop()
   }
 
+  def "test records(TopicPartition) kafka consume"() {
+    setup:
+    // set up the Kafka consumer properties
+    def kafkaPartition = 0
+    def consumerProperties = KafkaTestUtils.consumerProps("sender", "false", embeddedKafka)
+    consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
+    def consumer = new KafkaConsumer<String,String>(consumerProperties)
+
+    def senderProps = KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString())
+    def producer = new KafkaProducer(senderProps)
+
+    consumer.assign(Arrays.asList(new TopicPartition(SHARED_TOPIC, kafkaPartition)))
+
+    when:
+    def greeting = "Hello from MockConsumer!"
+    producer.send(new ProducerRecord<Integer, String>(SHARED_TOPIC, kafkaPartition, null, greeting))
+
+    then:
+    TEST_WRITER.waitForTraces(1)
+    def records = new LinkedBlockingQueue<ConsumerRecord<String, String>>()
+    def pollResult = KafkaTestUtils.getRecords(consumer)
+
+    def recs = pollResult.records(new TopicPartition(SHARED_TOPIC, kafkaPartition)).iterator()
+
+    def first = null
+    if (recs.hasNext()) {
+      first = recs.next()
+    }
+
+    then:
+    recs.hasNext() == false
+    first.value() == greeting
+    first.key() == null
+
+    assertTraces(2) {
+      trace(0, 1) {
+        // PRODUCER span 0
+        span(0) {
+          serviceName "kafka"
+          operationName "kafka.produce"
+          resourceName "Produce Topic $SHARED_TOPIC"
+          spanType "queue"
+          errored false
+          parent()
+          tags {
+            "component" "java-kafka"
+            "span.kind" "producer"
+            "kafka.partition" { it >= 0 }
+            defaultTags(true)
+          }
+        }
+      }
+      trace(1, 1) {
+        // CONSUMER span 0
+        span(0) {
+          serviceName "kafka"
+          operationName "kafka.consume"
+          resourceName "Consume Topic $SHARED_TOPIC"
+          spanType "queue"
+          errored false
+          childOf TEST_WRITER[0][0]
+          tags {
+            "component" "java-kafka"
+            "span.kind" "consumer"
+            "partition" { it >= 0 }
+            "offset" 0
+            defaultTags(true)
+          }
+        }
+      }
+    }
+
+    cleanup:
+    consumer.close()
+    producer.close()
+  }
 }