diff --git a/instrumentation/kafka-clients-0.11/src/main/java/io/opentelemetry/auto/instrumentation/kafkaclients/TracingIterator.java b/instrumentation/kafka-clients-0.11/src/main/java/io/opentelemetry/auto/instrumentation/kafkaclients/TracingIterator.java index 218eae8599..75409ce946 100644 --- a/instrumentation/kafka-clients-0.11/src/main/java/io/opentelemetry/auto/instrumentation/kafkaclients/TracingIterator.java +++ b/instrumentation/kafka-clients-0.11/src/main/java/io/opentelemetry/auto/instrumentation/kafkaclients/TracingIterator.java @@ -22,6 +22,7 @@ import static io.opentelemetry.auto.instrumentation.kafkaclients.TextMapExtractA import static io.opentelemetry.trace.Span.Kind.CONSUMER; import static io.opentelemetry.trace.TracingContextUtils.currentContextWith; +import io.opentelemetry.auto.config.Config; import io.opentelemetry.auto.instrumentation.api.SpanWithScope; import io.opentelemetry.trace.Span; import io.opentelemetry.trace.SpanContext; @@ -75,12 +76,14 @@ public class TracingIterator implements Iterator { if (consumer) { spanBuilder.setSpanKind(CONSUMER); } - final SpanContext spanContext = extract(next.headers(), GETTER); - if (spanContext.isValid()) { - if (consumer) { - spanBuilder.setParent(spanContext); - } else { - spanBuilder.addLink(spanContext); + if (Config.get().isKafkaClientPropagationEnabled()) { + final SpanContext spanContext = extract(next.headers(), GETTER); + if (spanContext.isValid()) { + if (consumer) { + spanBuilder.setParent(spanContext); + } else { + spanBuilder.addLink(spanContext); + } } } final long startTimeMillis = System.currentTimeMillis(); diff --git a/instrumentation/kafka-clients-0.11/src/test/groovy/KafkaClientTest.groovy b/instrumentation/kafka-clients-0.11/src/test/groovy/KafkaClientTest.groovy index 140f2ea902..e645b4ba1a 100644 --- a/instrumentation/kafka-clients-0.11/src/test/groovy/KafkaClientTest.groovy +++ b/instrumentation/kafka-clients-0.11/src/test/groovy/KafkaClientTest.groovy @@ -16,6 +16,7 @@ import io.opentelemetry.auto.config.Config import io.opentelemetry.auto.test.AgentTestRunner +import io.opentelemetry.auto.test.utils.ConfigUtils import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.clients.consumer.KafkaConsumer @@ -333,6 +334,154 @@ class KafkaClientTest extends AgentTestRunner { } + def "should not read remote context when consuming messages if propagation is disabled"() { + setup: + def senderProps = KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString()) + def producerFactory = new DefaultKafkaProducerFactory(senderProps) + def kafkaTemplate = new KafkaTemplate(producerFactory) + + when: "send message" + String message = "Testing without headers" + withConfigOverride(Config.KAFKA_CLIENT_PROPAGATION_ENABLED, "true") { + kafkaTemplate.send(SHARED_TOPIC, message) + } + + then: "producer span is created" + assertTraces(1) { + trace(0, 1) { + span(0) { + operationName SHARED_TOPIC + spanKind PRODUCER + errored false + parent() + attributes { + } + } + } + } + + when: "read message with context propagation" + // create a thread safe queue to store the received message + def records = new LinkedBlockingQueue>() + KafkaMessageListenerContainer container = startConsumer("consumer-with-propagation", records) + + then: "consumer span is created and is a child of the producer" + // check that the message was received + records.poll(5, TimeUnit.SECONDS) != null + + assertTraces(1) { + trace(0, 2) { + span(0) { + operationName SHARED_TOPIC + spanKind PRODUCER + errored false + parent() + attributes { + } + } + span(1) { + operationName SHARED_TOPIC + spanKind CONSUMER + errored false + childOf span(0) + attributes { + "partition" { it >= 0 } + "offset" 0 + "record.queue_time_ms" { it >= 0 } + } + } + } + } + container.stop() + + when: "read message without context propagation" + ConfigUtils.updateConfig { + System.setProperty("ota."+Config.KAFKA_CLIENT_PROPAGATION_ENABLED, "false") + } + records.clear() + container = startConsumer("consumer-without-propagation", records) + + then: "independent consumer span is created" + // check that the message was received + records.poll(5, TimeUnit.SECONDS) != null + + assertTraces(2) { + trace(0, 2) { + span(0) { + operationName SHARED_TOPIC + spanKind PRODUCER + errored false + parent() + attributes { + } + } + span(1) { + operationName SHARED_TOPIC + spanKind CONSUMER + errored false + childOf span(0) + attributes { + "partition" { it >= 0 } + "offset" 0 + "record.queue_time_ms" { it >= 0 } + } + } + } + trace(1, 1) { + span(0) { + operationName SHARED_TOPIC + spanKind CONSUMER + errored false + parent() + attributes { + "partition" { it >= 0 } + "offset" 0 + "record.queue_time_ms" { it >= 0 } + } + } + } + + } + + cleanup: + producerFactory.stop() + container?.stop() + ConfigUtils.updateConfig { + System.clearProperty("ota."+Config.KAFKA_CLIENT_PROPAGATION_ENABLED) + } + + } + + protected KafkaMessageListenerContainer startConsumer(String groupId, records) { +// set up the Kafka consumer properties + Map consumerProperties = KafkaTestUtils.consumerProps(groupId, "false", embeddedKafka) + consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") + + // create a Kafka consumer factory + def consumerFactory = new DefaultKafkaConsumerFactory(consumerProperties) + + // set the topic that needs to be consumed + def containerProperties = containerProperties() + + // create a Kafka MessageListenerContainer + def container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties) + + // setup a Kafka message listener + container.setupMessageListener(new MessageListener() { + @Override + void onMessage(ConsumerRecord record) { + records.add(record) + } + }) + + // start the container and underlying message listener + container.start() + + // wait until the container has the required number of assigned partitions + ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic()) + container + } + def containerProperties() { try {