diff --git a/instrumentation/external-annotations/javaagent/src/test/groovy/ConfiguredTraceAnnotationsTest.groovy b/instrumentation/external-annotations/javaagent/src/test/groovy/ConfiguredTraceAnnotationsTest.groovy index 516c37fea7..f85ba25dbd 100644 --- a/instrumentation/external-annotations/javaagent/src/test/groovy/ConfiguredTraceAnnotationsTest.groovy +++ b/instrumentation/external-annotations/javaagent/src/test/groovy/ConfiguredTraceAnnotationsTest.groovy @@ -14,7 +14,7 @@ class ConfiguredTraceAnnotationsTest extends AgentTestRunner { "package.Class\$Name;${OuterClass.InterestingMethod.name}") } - def specCleanup() { + def cleanupSpec() { ConfigUtils.setConfig(PREVIOUS_CONFIG) } diff --git a/instrumentation/kafka-clients-0.11/javaagent/src/test/groovy/KafkaClientBaseTest.groovy b/instrumentation/kafka-clients-0.11/javaagent/src/test/groovy/KafkaClientBaseTest.groovy new file mode 100644 index 0000000000..1e46872d70 --- /dev/null +++ b/instrumentation/kafka-clients-0.11/javaagent/src/test/groovy/KafkaClientBaseTest.groovy @@ -0,0 +1,82 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +import io.opentelemetry.instrumentation.test.AgentTestRunner +import java.util.concurrent.LinkedBlockingQueue +import java.util.concurrent.TimeUnit +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.junit.Rule +import org.springframework.kafka.core.DefaultKafkaConsumerFactory +import org.springframework.kafka.core.DefaultKafkaProducerFactory +import org.springframework.kafka.core.KafkaTemplate +import org.springframework.kafka.listener.KafkaMessageListenerContainer +import org.springframework.kafka.listener.MessageListener +import org.springframework.kafka.test.rule.KafkaEmbedded +import org.springframework.kafka.test.utils.ContainerTestUtils +import org.springframework.kafka.test.utils.KafkaTestUtils +import spock.lang.Unroll + +abstract class KafkaClientBaseTest extends AgentTestRunner { + + protected static final SHARED_TOPIC = "shared.topic" + + protected isPropagationEnabled() { + return true + } + + @Rule + KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, SHARED_TOPIC) + + @Unroll + def "test kafka client header propagation manual config"() { + setup: + def senderProps = KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString()) + def producerFactory = new DefaultKafkaProducerFactory(senderProps) + def kafkaTemplate = new KafkaTemplate(producerFactory) + + // set up the Kafka consumer properties + def consumerProperties = KafkaTestUtils.consumerProps("sender", "false", embeddedKafka) + + // create a Kafka consumer factory + def consumerFactory = new DefaultKafkaConsumerFactory(consumerProperties) + + // set the topic that needs to be consumed + def containerProperties = containerProperties() + + // create a Kafka MessageListenerContainer + def container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties) + + // create a thread safe queue to store the received message + def records = new LinkedBlockingQueue>() + + // setup a Kafka message listener + container.setupMessageListener(new MessageListener() { + @Override + void onMessage(ConsumerRecord record) { + records.add(record) + } + }) + + // start the container and underlying message listener + container.start() + + // wait until the container has the required number of assigned partitions + ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic()) + + when: + String message = "Testing without headers" + kafkaTemplate.send(SHARED_TOPIC, message) + + then: + // check that the message was received + def received = records.poll(5, TimeUnit.SECONDS) + + received.headers().iterator().hasNext() == isPropagationEnabled() + + cleanup: + producerFactory.stop() + container?.stop() + } +} diff --git a/instrumentation/kafka-clients-0.11/javaagent/src/test/groovy/KafkaClientPropagationDisabledTest.groovy b/instrumentation/kafka-clients-0.11/javaagent/src/test/groovy/KafkaClientPropagationDisabledTest.groovy new file mode 100644 index 0000000000..4f860811f6 --- /dev/null +++ b/instrumentation/kafka-clients-0.11/javaagent/src/test/groovy/KafkaClientPropagationDisabledTest.groovy @@ -0,0 +1,152 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +import static io.opentelemetry.api.trace.Span.Kind.CONSUMER +import static io.opentelemetry.api.trace.Span.Kind.PRODUCER + +import io.opentelemetry.api.trace.attributes.SemanticAttributes +import io.opentelemetry.instrumentation.test.utils.ConfigUtils +import java.util.concurrent.LinkedBlockingQueue +import java.util.concurrent.TimeUnit +import org.apache.kafka.clients.consumer.ConsumerConfig +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.springframework.kafka.core.DefaultKafkaConsumerFactory +import org.springframework.kafka.core.DefaultKafkaProducerFactory +import org.springframework.kafka.core.KafkaTemplate +import org.springframework.kafka.listener.KafkaMessageListenerContainer +import org.springframework.kafka.listener.MessageListener +import org.springframework.kafka.test.utils.ContainerTestUtils +import org.springframework.kafka.test.utils.KafkaTestUtils + +class KafkaClientPropagationDisabledTest extends KafkaClientBaseTest { + static final PREVIOUS_CONFIG = ConfigUtils.updateConfigAndResetInstrumentation { + it.setProperty("otel.instrumentation.kafka.client-propagation", "false") + } + + def cleanupSpec() { + ConfigUtils.setConfig(PREVIOUS_CONFIG) + } + + @Override + protected isPropagationEnabled() { + return false + } + + def "should not read remote context when consuming messages if propagation is disabled"() { + setup: + def senderProps = KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString()) + def producerFactory = new DefaultKafkaProducerFactory(senderProps) + def kafkaTemplate = new KafkaTemplate(producerFactory) + + when: "send message" + String message = "Testing without headers" + kafkaTemplate.send(SHARED_TOPIC, message) + + then: "producer span is created" + assertTraces(1) { + trace(0, 1) { + span(0) { + name SHARED_TOPIC + " send" + kind PRODUCER + errored false + hasNoParent() + attributes { + "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka" + "${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC + "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic" + } + } + } + } + + when: "read message without context propagation" + // create a thread safe queue to store the received message + def records = new LinkedBlockingQueue>() + KafkaMessageListenerContainer container = startConsumer("consumer-without-propagation", records) + + then: "independent consumer span is created" + // check that the message was received + records.poll(5, TimeUnit.SECONDS) != null + + assertTraces(2) { + trace(0, 1) { + span(0) { + name SHARED_TOPIC + " send" + kind PRODUCER + errored false + hasNoParent() + attributes { + "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka" + "${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC + "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic" + } + } + } + trace(1, 1) { + span(0) { + name SHARED_TOPIC + " process" + kind CONSUMER + errored false + hasNoParent() + attributes { + "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka" + "${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC + "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic" + "${SemanticAttributes.MESSAGING_OPERATION.key}" "process" + "${SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES.key}" Long + "partition" { it >= 0 } + "offset" 0 + "record.queue_time_ms" { it >= 0 } + } + } + } + + } + + cleanup: + producerFactory.stop() + container?.stop() + } + + protected KafkaMessageListenerContainer startConsumer(String groupId, records) { + // set up the Kafka consumer properties + Map consumerProperties = KafkaTestUtils.consumerProps(groupId, "false", embeddedKafka) + consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") + + // create a Kafka consumer factory + def consumerFactory = new DefaultKafkaConsumerFactory(consumerProperties) + + // set the topic that needs to be consumed + def containerProperties = containerProperties() + + // create a Kafka MessageListenerContainer + def container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties) + + // setup a Kafka message listener + container.setupMessageListener(new MessageListener() { + @Override + void onMessage(ConsumerRecord record) { + records.add(record) + } + }) + + // start the container and underlying message listener + container.start() + + // wait until the container has the required number of assigned partitions + ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic()) + container + } + + + def containerProperties() { + try { + // Different class names for test and latestDepTest. + return Class.forName("org.springframework.kafka.listener.config.ContainerProperties").newInstance(SHARED_TOPIC) + } catch (ClassNotFoundException | NoClassDefFoundError e) { + return Class.forName("org.springframework.kafka.listener.ContainerProperties").newInstance(SHARED_TOPIC) + } + } +} diff --git a/instrumentation/kafka-clients-0.11/javaagent/src/test/groovy/KafkaClientTest.groovy b/instrumentation/kafka-clients-0.11/javaagent/src/test/groovy/KafkaClientPropagationEnabledTest.groovy similarity index 66% rename from instrumentation/kafka-clients-0.11/javaagent/src/test/groovy/KafkaClientTest.groovy rename to instrumentation/kafka-clients-0.11/javaagent/src/test/groovy/KafkaClientPropagationEnabledTest.groovy index 36629e6436..ffb329c6d1 100644 --- a/instrumentation/kafka-clients-0.11/javaagent/src/test/groovy/KafkaClientTest.groovy +++ b/instrumentation/kafka-clients-0.11/javaagent/src/test/groovy/KafkaClientPropagationEnabledTest.groovy @@ -5,14 +5,10 @@ import static io.opentelemetry.api.trace.Span.Kind.CONSUMER import static io.opentelemetry.api.trace.Span.Kind.PRODUCER -import static io.opentelemetry.instrumentation.test.utils.ConfigUtils.setConfig -import static io.opentelemetry.instrumentation.test.utils.ConfigUtils.updateConfig import static io.opentelemetry.instrumentation.test.utils.TraceUtils.basicSpan import static io.opentelemetry.instrumentation.test.utils.TraceUtils.runUnderTrace import io.opentelemetry.api.trace.attributes.SemanticAttributes -import io.opentelemetry.instrumentation.api.config.Config -import io.opentelemetry.instrumentation.test.AgentTestRunner import java.util.concurrent.LinkedBlockingQueue import java.util.concurrent.TimeUnit import org.apache.kafka.clients.consumer.ConsumerConfig @@ -23,22 +19,15 @@ import org.apache.kafka.clients.producer.Producer import org.apache.kafka.clients.producer.ProducerRecord import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.serialization.StringSerializer -import org.junit.Rule import org.springframework.kafka.core.DefaultKafkaConsumerFactory import org.springframework.kafka.core.DefaultKafkaProducerFactory import org.springframework.kafka.core.KafkaTemplate import org.springframework.kafka.listener.KafkaMessageListenerContainer import org.springframework.kafka.listener.MessageListener -import org.springframework.kafka.test.rule.KafkaEmbedded import org.springframework.kafka.test.utils.ContainerTestUtils import org.springframework.kafka.test.utils.KafkaTestUtils -import spock.lang.Unroll -class KafkaClientTest extends AgentTestRunner { - static final SHARED_TOPIC = "shared.topic" - - @Rule - KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, SHARED_TOPIC) +class KafkaClientPropagationEnabledTest extends KafkaClientBaseTest { def "test kafka produce and consume"() { setup: @@ -380,197 +369,6 @@ class KafkaClientTest extends AgentTestRunner { producer.close() } - @Unroll - def "test kafka client header propagation manual config"() { - setup: - def senderProps = KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString()) - def producerFactory = new DefaultKafkaProducerFactory(senderProps) - def kafkaTemplate = new KafkaTemplate(producerFactory) - - // set up the Kafka consumer properties - def consumerProperties = KafkaTestUtils.consumerProps("sender", "false", embeddedKafka) - - // create a Kafka consumer factory - def consumerFactory = new DefaultKafkaConsumerFactory(consumerProperties) - - // set the topic that needs to be consumed - def containerProperties = containerProperties() - - // create a Kafka MessageListenerContainer - def container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties) - - // create a thread safe queue to store the received message - def records = new LinkedBlockingQueue>() - - // setup a Kafka message listener - container.setupMessageListener(new MessageListener() { - @Override - void onMessage(ConsumerRecord record) { - records.add(record) - } - }) - - // start the container and underlying message listener - container.start() - - // wait until the container has the required number of assigned partitions - ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic()) - - when: - String message = "Testing without headers" - def previousConfig = setPropagation(propagationEnabled) - kafkaTemplate.send(SHARED_TOPIC, message) - setConfig(previousConfig) - - then: - // check that the message was received - def received = records.poll(5, TimeUnit.SECONDS) - - received.headers().iterator().hasNext() == propagationEnabled - - cleanup: - producerFactory.stop() - container?.stop() - - where: - propagationEnabled << [false, true] - } - - def "should not read remote context when consuming messages if propagation is disabled"() { - setup: - def senderProps = KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString()) - def producerFactory = new DefaultKafkaProducerFactory(senderProps) - def kafkaTemplate = new KafkaTemplate(producerFactory) - - when: "send message" - String message = "Testing without headers" - kafkaTemplate.send(SHARED_TOPIC, message) - - then: "producer span is created" - assertTraces(1) { - trace(0, 1) { - span(0) { - name SHARED_TOPIC + " send" - kind PRODUCER - errored false - hasNoParent() - attributes { - "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka" - "${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC - "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic" - } - } - } - } - - when: "read message with context propagation" - // create a thread safe queue to store the received message - def records = new LinkedBlockingQueue>() - KafkaMessageListenerContainer container = startConsumer("consumer-with-propagation", records) - - then: "consumer span is created and is a child of the producer" - // check that the message was received - records.poll(5, TimeUnit.SECONDS) != null - - assertTraces(1) { - trace(0, 2) { - span(0) { - name SHARED_TOPIC + " send" - kind PRODUCER - errored false - hasNoParent() - attributes { - "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka" - "${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC - "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic" - } - } - span(1) { - name SHARED_TOPIC + " process" - kind CONSUMER - errored false - childOf span(0) - attributes { - "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka" - "${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC - "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic" - "${SemanticAttributes.MESSAGING_OPERATION.key}" "process" - "${SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES.key}" Long - "partition" { it >= 0 } - "offset" 0 - "record.queue_time_ms" { it >= 0 } - } - } - } - } - container.stop() - - when: "read message without context propagation" - def previousConfig = setPropagation(false) - records.clear() - container = startConsumer("consumer-without-propagation", records) - - then: "independent consumer span is created" - // check that the message was received - records.poll(5, TimeUnit.SECONDS) != null - - assertTraces(2) { - trace(0, 2) { - span(0) { - name SHARED_TOPIC + " send" - kind PRODUCER - errored false - hasNoParent() - attributes { - "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka" - "${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC - "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic" - } - } - span(1) { - name SHARED_TOPIC + " process" - kind CONSUMER - errored false - childOf span(0) - attributes { - "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka" - "${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC - "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic" - "${SemanticAttributes.MESSAGING_OPERATION.key}" "process" - "${SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES.key}" Long - "partition" { it >= 0 } - "offset" 0 - "record.queue_time_ms" { it >= 0 } - } - } - } - trace(1, 1) { - span(0) { - name SHARED_TOPIC + " process" - kind CONSUMER - errored false - hasNoParent() - attributes { - "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka" - "${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC - "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic" - "${SemanticAttributes.MESSAGING_OPERATION.key}" "process" - "${SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES.key}" Long - "partition" { it >= 0 } - "offset" 0 - "record.queue_time_ms" { it >= 0 } - } - } - } - - } - - cleanup: - producerFactory.stop() - container?.stop() - setConfig(previousConfig) - } - protected KafkaMessageListenerContainer startConsumer(String groupId, records) { // set up the Kafka consumer properties Map consumerProperties = KafkaTestUtils.consumerProps(groupId, "false", embeddedKafka) @@ -610,10 +408,4 @@ class KafkaClientTest extends AgentTestRunner { return Class.forName("org.springframework.kafka.listener.ContainerProperties").newInstance(SHARED_TOPIC) } } - - private static Config setPropagation(boolean propagationEnabled) { - return updateConfig { - it.setProperty("otel.instrumentation.kafka.client-propagation", Boolean.toString(propagationEnabled)) - } - } }