diff --git a/instrumentation/kafka-clients-0.11/src/main/java/io/opentelemetry/auto/instrumentation/kafkaclients/KafkaConsumerInstrumentation.java b/instrumentation/kafka-clients-0.11/src/main/java/io/opentelemetry/auto/instrumentation/kafkaclients/KafkaConsumerInstrumentation.java index 35c297aa8c..0f7a4c340e 100644 --- a/instrumentation/kafka-clients-0.11/src/main/java/io/opentelemetry/auto/instrumentation/kafkaclients/KafkaConsumerInstrumentation.java +++ b/instrumentation/kafka-clients-0.11/src/main/java/io/opentelemetry/auto/instrumentation/kafkaclients/KafkaConsumerInstrumentation.java @@ -55,7 +55,7 @@ public final class KafkaConsumerInstrumentation extends Instrumenter.Default { packageName + ".TextMapExtractAdapter", packageName + ".TracingIterable", packageName + ".TracingIterator", - packageName + ".TracingList", + packageName + ".TracingList" }; } diff --git a/instrumentation/kafka-clients-0.11/src/main/java/io/opentelemetry/auto/instrumentation/kafkaclients/KafkaProducerInstrumentation.java b/instrumentation/kafka-clients-0.11/src/main/java/io/opentelemetry/auto/instrumentation/kafkaclients/KafkaProducerInstrumentation.java index 60dd5dbfea..0c5d668400 100644 --- a/instrumentation/kafka-clients-0.11/src/main/java/io/opentelemetry/auto/instrumentation/kafkaclients/KafkaProducerInstrumentation.java +++ b/instrumentation/kafka-clients-0.11/src/main/java/io/opentelemetry/auto/instrumentation/kafkaclients/KafkaProducerInstrumentation.java @@ -93,6 +93,11 @@ public final class KafkaProducerInstrumentation extends Instrumenter.Default { callback = new ProducerCallback(callback, span); + boolean isTombstone = record.value() == null && !record.headers().iterator().hasNext(); + if (isTombstone) { + span.setAttribute("tombstone", true); + } + // Do not inject headers for batch versions below 2 // This is how similar check is being done in Kafka client itself: // https://github.com/apache/kafka/blob/05fcfde8f69b0349216553f711fdfc3f0259c601/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java#L411-L412 @@ -101,7 +106,9 @@ public final class KafkaProducerInstrumentation extends Instrumenter.Default { // headers attempt to read messages that were produced by clients > 0.11 and the magic // value of the broker(s) is >= 2 if (apiVersions.maxUsableProduceMagic() >= RecordBatch.MAGIC_VALUE_V2 - && Config.get().isKafkaClientPropagationEnabled()) { + && Config.get().isKafkaClientPropagationEnabled() + // Must not interfere with tombstones + && !isTombstone) { final Context context = withSpan(span, Context.current()); try { OpenTelemetry.getPropagators() diff --git a/instrumentation/kafka-clients-0.11/src/main/java/io/opentelemetry/auto/instrumentation/kafkaclients/TracingIterator.java b/instrumentation/kafka-clients-0.11/src/main/java/io/opentelemetry/auto/instrumentation/kafkaclients/TracingIterator.java index dba9ccbab0..218eae8599 100644 --- a/instrumentation/kafka-clients-0.11/src/main/java/io/opentelemetry/auto/instrumentation/kafkaclients/TracingIterator.java +++ b/instrumentation/kafka-clients-0.11/src/main/java/io/opentelemetry/auto/instrumentation/kafkaclients/TracingIterator.java @@ -86,6 +86,11 @@ public class TracingIterator implements Iterator { final long startTimeMillis = System.currentTimeMillis(); spanBuilder.setStartTimestamp(TimeUnit.MILLISECONDS.toNanos(startTimeMillis)); final Span span = spanBuilder.startSpan(); + // tombstone checking logic here because it can only be inferred + // from the record itself + if (next.value() == null && !next.headers().iterator().hasNext()) { + span.setAttribute("tombstone", true); + } decorator.afterStart(span); decorator.onConsume(span, startTimeMillis, next); currentSpanWithScope = new SpanWithScope(span, currentContextWith(span)); diff --git a/instrumentation/kafka-clients-0.11/src/test/groovy/KafkaClientTest.groovy b/instrumentation/kafka-clients-0.11/src/test/groovy/KafkaClientTest.groovy index 943e08aaa4..d461dd48d6 100644 --- a/instrumentation/kafka-clients-0.11/src/test/groovy/KafkaClientTest.groovy +++ b/instrumentation/kafka-clients-0.11/src/test/groovy/KafkaClientTest.groovy @@ -59,13 +59,7 @@ class KafkaClientTest extends AgentTestRunner { def consumerFactory = new DefaultKafkaConsumerFactory(consumerProperties) // set the topic that needs to be consumed - def containerProperties - try { - // Different class names for test and latestDepTest. - containerProperties = Class.forName("org.springframework.kafka.listener.config.ContainerProperties").newInstance(SHARED_TOPIC) - } catch (ClassNotFoundException | NoClassDefFoundError e) { - containerProperties = Class.forName("org.springframework.kafka.listener.ContainerProperties").newInstance(SHARED_TOPIC) - } + def containerProperties = containerProperties() // create a Kafka MessageListenerContainer def container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties) @@ -127,6 +121,92 @@ class KafkaClientTest extends AgentTestRunner { container?.stop() } + + def "test pass through tombstone"() { + setup: + def senderProps = KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString()) + def producerFactory = new DefaultKafkaProducerFactory(senderProps) + def kafkaTemplate = new KafkaTemplate(producerFactory) + + // set up the Kafka consumer properties + def consumerProperties = KafkaTestUtils.consumerProps("sender", "false", embeddedKafka) + + // create a Kafka consumer factory + def consumerFactory = new DefaultKafkaConsumerFactory(consumerProperties) + + // set the topic that needs to be consumed + def containerProperties = containerProperties() + + // create a Kafka MessageListenerContainer + def container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties) + + // create a thread safe queue to store the received message + def records = new LinkedBlockingQueue>() + + // setup a Kafka message listener + container.setupMessageListener(new MessageListener() { + @Override + void onMessage(ConsumerRecord record) { + records.add(record) + } + }) + + // start the container and underlying message listener + container.start() + + // wait until the container has the required number of assigned partitions + ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic()) + + when: + kafkaTemplate.send(SHARED_TOPIC, null) + + + then: + // check that the message was received + def received = records.poll(5, TimeUnit.SECONDS) + received.value() == null + received.key() == null + + assertTraces(2) { + trace(0, 1) { + // PRODUCER span 0 + span(0) { + operationName SHARED_TOPIC + spanKind PRODUCER + errored false + parent() + tags { + "tombstone" true + } + } + } + // when a user consumes a tombstone a new trace is started + // because context can't be propagated safely + trace(1, 1) { + // CONSUMER span 0 + span(0) { + operationName SHARED_TOPIC + spanKind CONSUMER + errored false + parent() + tags { + "partition" { it >= 0 } + "offset" 0 + "record.queue_time_ms" { it >= 0 } + "tombstone" true + } + } + } + } + + def headers = received.headers() + !headers.iterator().hasNext() + + cleanup: + producerFactory.stop() + container?.stop() + } + def "test records(TopicPartition) kafka consume"() { setup: @@ -207,13 +287,7 @@ class KafkaClientTest extends AgentTestRunner { def consumerFactory = new DefaultKafkaConsumerFactory(consumerProperties) // set the topic that needs to be consumed - def containerProperties - try { - // Different class names for test and latestDepTest. - containerProperties = Class.forName("org.springframework.kafka.listener.config.ContainerProperties").newInstance(SHARED_TOPIC) - } catch (ClassNotFoundException | NoClassDefFoundError e) { - containerProperties = Class.forName("org.springframework.kafka.listener.ContainerProperties").newInstance(SHARED_TOPIC) - } + def containerProperties = containerProperties() // create a Kafka MessageListenerContainer def container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties) @@ -225,7 +299,6 @@ class KafkaClientTest extends AgentTestRunner { container.setupMessageListener(new MessageListener() { @Override void onMessage(ConsumerRecord record) { - TEST_WRITER.waitForTraces(1) // ensure consistent ordering of traces records.add(record) } }) @@ -260,5 +333,14 @@ class KafkaClientTest extends AgentTestRunner { } -} + def containerProperties() { + try { + // Different class names for test and latestDepTest. + return Class.forName("org.springframework.kafka.listener.config.ContainerProperties").newInstance(SHARED_TOPIC) + } catch (ClassNotFoundException | NoClassDefFoundError e) { + return Class.forName("org.springframework.kafka.listener.ContainerProperties").newInstance(SHARED_TOPIC) + } + } + +}