Richard Startin 2020-06-24 16:30:56 +01:00 committed by Trask Stalnaker
parent 4a943c8411
commit 9e28828cb5
4 changed files with 112 additions and 18 deletions


@@ -55,7 +55,7 @@ public final class KafkaConsumerInstrumentation extends Instrumenter.Default {
packageName + ".TextMapExtractAdapter",
packageName + ".TracingIterable",
packageName + ".TracingIterator",
packageName + ".TracingList",
packageName + ".TracingList"
};
}


@@ -93,6 +93,11 @@ public final class KafkaProducerInstrumentation extends Instrumenter.Default {
callback = new ProducerCallback(callback, span);
+boolean isTombstone = record.value() == null && !record.headers().iterator().hasNext();
+if (isTombstone) {
+  span.setAttribute("tombstone", true);
+}
// Do not inject headers for batch versions below 2
// This is how a similar check is done in the Kafka client itself:
// https://github.com/apache/kafka/blob/05fcfde8f69b0349216553f711fdfc3f0259c601/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java#L411-L412
@@ -101,7 +106,9 @@ public final class KafkaProducerInstrumentation extends Instrumenter.Default {
// headers attempt to read messages that were produced by clients > 0.11 and the magic
// value of the broker(s) is >= 2
if (apiVersions.maxUsableProduceMagic() >= RecordBatch.MAGIC_VALUE_V2
&& Config.get().isKafkaClientPropagationEnabled()) {
&& Config.get().isKafkaClientPropagationEnabled()
// Must not interfere with tombstones
&& !isTombstone) {
final Context context = withSpan(span, Context.current());
try {
OpenTelemetry.getPropagators()
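For context on the new !isTombstone guard: Kafka log compaction treats a null-value record as a delete marker, and this instrumentation only flags a record as a tombstone when it also carries no headers, so injecting a propagation header into such a record would defeat the consumer-side inference. A minimal sketch, assuming plain kafka-clients; the class name, topic, key, and the "traceparent" header key are illustrative and not part of this change:

import org.apache.kafka.clients.producer.ProducerRecord;

// Illustrative only: mirrors the isTombstone check used by the instrumentation.
final class TombstoneSketch {
  static boolean isTombstone(ProducerRecord<?, ?> record) {
    // A tombstone is a delete marker for log compaction: null value, no headers.
    return record.value() == null && !record.headers().iterator().hasNext();
  }

  static void demo() {
    ProducerRecord<String, String> tombstone =
        new ProducerRecord<>("some-topic", "key-to-delete", null);
    boolean before = isTombstone(tombstone); // true

    // If the producer instrumentation injected a context header here, the record
    // would no longer look like a tombstone to the consumer-side check.
    tombstone.headers().add("traceparent", new byte[0]);
    boolean after = isTombstone(tombstone); // false
  }
}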


@@ -86,6 +86,11 @@ public class TracingIterator implements Iterator<ConsumerRecord> {
final long startTimeMillis = System.currentTimeMillis();
spanBuilder.setStartTimestamp(TimeUnit.MILLISECONDS.toNanos(startTimeMillis));
final Span span = spanBuilder.startSpan();
+// the tombstone check is done here because it can only be inferred
+// from the record itself
+if (next.value() == null && !next.headers().iterator().hasNext()) {
+  span.setAttribute("tombstone", true);
+}
decorator.afterStart(span);
decorator.onConsume(span, startTimeMillis, next);
currentSpanWithScope = new SpanWithScope(span, currentContextWith(span));
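The same inference sketched on the consumer side against a plain KafkaConsumer poll loop rather than the instrumented iterator; names are illustrative. The point is that no producer-side flag survives the broker, so the consumer can only infer a tombstone from the record itself:

import java.time.Duration;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;

// Illustrative sketch of the per-record check TracingIterator performs.
final class ConsumerTombstoneSketch {
  static void pollOnce(Consumer<String, String> consumer) {
    for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofSeconds(1))) {
      // Null value and no headers: treat the record as a tombstone and tag the span.
      boolean tombstone = record.value() == null && !record.headers().iterator().hasNext();
      if (tombstone) {
        // the instrumentation would call span.setAttribute("tombstone", true) here
      }
    }
  }
}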


@@ -59,13 +59,7 @@ class KafkaClientTest extends AgentTestRunner {
def consumerFactory = new DefaultKafkaConsumerFactory<String, String>(consumerProperties)
// set the topic that needs to be consumed
-def containerProperties
-try {
-  // Different class names for test and latestDepTest.
-  containerProperties = Class.forName("org.springframework.kafka.listener.config.ContainerProperties").newInstance(SHARED_TOPIC)
-} catch (ClassNotFoundException | NoClassDefFoundError e) {
-  containerProperties = Class.forName("org.springframework.kafka.listener.ContainerProperties").newInstance(SHARED_TOPIC)
-}
+def containerProperties = containerProperties()
// create a Kafka MessageListenerContainer
def container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties)
@@ -127,6 +121,92 @@ class KafkaClientTest extends AgentTestRunner {
container?.stop()
}
def "test pass through tombstone"() {
setup:
def senderProps = KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString())
def producerFactory = new DefaultKafkaProducerFactory<String, String>(senderProps)
def kafkaTemplate = new KafkaTemplate<String, String>(producerFactory)
// set up the Kafka consumer properties
def consumerProperties = KafkaTestUtils.consumerProps("sender", "false", embeddedKafka)
// create a Kafka consumer factory
def consumerFactory = new DefaultKafkaConsumerFactory<String, String>(consumerProperties)
// set the topic that needs to be consumed
def containerProperties = containerProperties()
// create a Kafka MessageListenerContainer
def container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties)
// create a thread safe queue to store the received message
def records = new LinkedBlockingQueue<ConsumerRecord<String, String>>()
// setup a Kafka message listener
container.setupMessageListener(new MessageListener<String, String>() {
@Override
void onMessage(ConsumerRecord<String, String> record) {
records.add(record)
}
})
// start the container and underlying message listener
container.start()
// wait until the container has the required number of assigned partitions
ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic())
when:
kafkaTemplate.send(SHARED_TOPIC, null)
then:
// check that the message was received
def received = records.poll(5, TimeUnit.SECONDS)
received.value() == null
received.key() == null
assertTraces(2) {
trace(0, 1) {
// PRODUCER span 0
span(0) {
operationName SHARED_TOPIC
spanKind PRODUCER
errored false
parent()
tags {
"tombstone" true
}
}
}
// when a user consumes a tombstone a new trace is started
// because context can't be propagated safely
trace(1, 1) {
// CONSUMER span 0
span(0) {
operationName SHARED_TOPIC
spanKind CONSUMER
errored false
parent()
tags {
"partition" { it >= 0 }
"offset" 0
"record.queue_time_ms" { it >= 0 }
"tombstone" true
}
}
}
}
def headers = received.headers()
!headers.iterator().hasNext()
cleanup:
producerFactory.stop()
container?.stop()
}
def "test records(TopicPartition) kafka consume"() {
setup:
@@ -207,13 +287,7 @@ class KafkaClientTest extends AgentTestRunner {
def consumerFactory = new DefaultKafkaConsumerFactory<String, String>(consumerProperties)
// set the topic that needs to be consumed
-def containerProperties
-try {
-  // Different class names for test and latestDepTest.
-  containerProperties = Class.forName("org.springframework.kafka.listener.config.ContainerProperties").newInstance(SHARED_TOPIC)
-} catch (ClassNotFoundException | NoClassDefFoundError e) {
-  containerProperties = Class.forName("org.springframework.kafka.listener.ContainerProperties").newInstance(SHARED_TOPIC)
-}
+def containerProperties = containerProperties()
// create a Kafka MessageListenerContainer
def container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties)
@@ -225,7 +299,6 @@ class KafkaClientTest extends AgentTestRunner {
container.setupMessageListener(new MessageListener<String, String>() {
@Override
void onMessage(ConsumerRecord<String, String> record) {
-TEST_WRITER.waitForTraces(1) // ensure consistent ordering of traces
records.add(record)
}
})
@@ -260,5 +333,14 @@ class KafkaClientTest extends AgentTestRunner {
}
+def containerProperties() {
+  try {
+    // Different class names for test and latestDepTest.
+    return Class.forName("org.springframework.kafka.listener.config.ContainerProperties").newInstance(SHARED_TOPIC)
+  } catch (ClassNotFoundException | NoClassDefFoundError e) {
+    return Class.forName("org.springframework.kafka.listener.ContainerProperties").newInstance(SHARED_TOPIC)
+  }
+}
}
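End to end, the new test does the following: produce a null-value record, observe a producer span tagged "tombstone" with no injected headers, and observe a consumer span that starts a new trace because there is no context to extract. A rough equivalent with plain kafka-clients instead of spring-kafka; broker address, topic, and key are placeholders:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

// Placeholder broker/topic; run with the agent attached to see the spans described above.
final class TombstoneProducerSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

    try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
      // Null value, no headers: the producer span gets "tombstone" = true and no
      // context header is injected, so the consumer span cannot be parented and a
      // second trace is started on the consumer side.
      producer.send(new ProducerRecord<>("shared-topic", "key-to-delete", null));
    }
  }
}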