diff --git a/instrumentation/kafka-clients-0.11/javaagent/build.gradle.kts b/instrumentation/kafka-clients-0.11/javaagent/build.gradle.kts
index 7a8e8bfd1d..08c6ca4bce 100644
--- a/instrumentation/kafka-clients-0.11/javaagent/build.gradle.kts
+++ b/instrumentation/kafka-clients-0.11/javaagent/build.gradle.kts
@@ -18,7 +18,6 @@ dependencies {
   testLibrary("org.springframework.kafka:spring-kafka-test:1.3.3.RELEASE")
   testImplementation("javax.xml.bind:jaxb-api:2.2.3")
   testLibrary("org.assertj:assertj-core")
-  testImplementation("org.mockito:mockito-core")

   // Include latest version of kafka itself along with latest version of client libs.
   // This seems to help with jar compatibility hell.
diff --git a/instrumentation/kafka-clients-0.11/javaagent/src/test/groovy/KafkaClientBaseTest.groovy b/instrumentation/kafka-clients-0.11/javaagent/src/test/groovy/KafkaClientBaseTest.groovy
index c52a5832bb..10c87783f9 100644
--- a/instrumentation/kafka-clients-0.11/javaagent/src/test/groovy/KafkaClientBaseTest.groovy
+++ b/instrumentation/kafka-clients-0.11/javaagent/src/test/groovy/KafkaClientBaseTest.groovy
@@ -28,15 +28,33 @@ abstract class KafkaClientBaseTest extends AgentInstrumentationSpecification {
   @Rule
   KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, SHARED_TOPIC)

+  abstract containerProperties()
+
+  Map senderProps() {
+    return KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString())
+  }
+
+  Map consumerProps(String group, String autoCommit) {
+    return KafkaTestUtils.consumerProps(group, autoCommit, embeddedKafka)
+  }
+
+  void waitForAssignment(Object container) {
+    ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic())
+  }
+
+  def stopProducerFactory(DefaultKafkaProducerFactory producerFactory) {
+    producerFactory.stop()
+  }
+
   @Unroll
   def "test kafka client header propagation manual config"() {
     setup:
-    def senderProps = KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString())
+    def senderProps = senderProps()
     def producerFactory = new DefaultKafkaProducerFactory(senderProps)
     def kafkaTemplate = new KafkaTemplate(producerFactory)

     // set up the Kafka consumer properties
-    def consumerProperties = KafkaTestUtils.consumerProps("sender", "false", embeddedKafka)
+    def consumerProperties = consumerProps("sender", "false")

     // create a Kafka consumer factory
     def consumerFactory = new DefaultKafkaConsumerFactory(consumerProperties)
@@ -62,7 +80,7 @@ abstract class KafkaClientBaseTest extends AgentInstrumentationSpecification {
     container.start()

     // wait until the container has the required number of assigned partitions
-    ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic())
+    waitForAssignment(container)

     when:
     String message = "Testing without headers"
@@ -75,7 +93,7 @@ abstract class KafkaClientBaseTest extends AgentInstrumentationSpecification {
     received.headers().iterator().hasNext() == propagationEnabled

     cleanup:
-    producerFactory.stop()
+    stopProducerFactory(producerFactory)
     container?.stop()
   }
 }
diff --git a/instrumentation/kafka-clients-0.11/javaagent/src/test/groovy/KafkaClientPropagationDisabledTest.groovy b/instrumentation/kafka-clients-0.11/javaagent/src/test/groovy/KafkaClientPropagationDisabledTest.groovy
index b48a4b1db3..d923ce8e95 100644
--- a/instrumentation/kafka-clients-0.11/javaagent/src/test/groovy/KafkaClientPropagationDisabledTest.groovy
+++ b/instrumentation/kafka-clients-0.11/javaagent/src/test/groovy/KafkaClientPropagationDisabledTest.groovy
@@ -16,14 +16,12 @@ import org.springframework.kafka.core.DefaultKafkaProducerFactory
 import org.springframework.kafka.core.KafkaTemplate
 import org.springframework.kafka.listener.KafkaMessageListenerContainer
 import org.springframework.kafka.listener.MessageListener
-import org.springframework.kafka.test.utils.ContainerTestUtils
-import org.springframework.kafka.test.utils.KafkaTestUtils

 class KafkaClientPropagationDisabledTest extends KafkaClientBaseTest {

   def "should not read remote context when consuming messages if propagation is disabled"() {
     setup:
-    def senderProps = KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString())
+    def senderProps = senderProps()
     def producerFactory = new DefaultKafkaProducerFactory(senderProps)
     def kafkaTemplate = new KafkaTemplate(producerFactory)
@@ -90,13 +88,13 @@ class KafkaClientPropagationDisabledTest extends KafkaClientBaseTest {
     }

     cleanup:
-    producerFactory.stop()
+    stopProducerFactory(producerFactory)
     container?.stop()
   }

   protected KafkaMessageListenerContainer startConsumer(String groupId, records) {
     // set up the Kafka consumer properties
-    Map consumerProperties = KafkaTestUtils.consumerProps(groupId, "false", embeddedKafka)
+    Map consumerProperties = consumerProps(groupId, "false")
     consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

     // create a Kafka consumer factory
@@ -120,11 +118,11 @@ class KafkaClientPropagationDisabledTest extends KafkaClientBaseTest {
     container.start()

     // wait until the container has the required number of assigned partitions
-    ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic())
+    waitForAssignment(container)
     container
   }
-
+  @Override
   def containerProperties() {
     try {
       // Different class names for test and latestDepTest.
diff --git a/instrumentation/kafka-clients-0.11/javaagent/src/test/groovy/KafkaClientPropagationEnabledTest.groovy b/instrumentation/kafka-clients-0.11/javaagent/src/test/groovy/KafkaClientPropagationEnabledTest.groovy
index d6daff31bd..54b97804bd 100644
--- a/instrumentation/kafka-clients-0.11/javaagent/src/test/groovy/KafkaClientPropagationEnabledTest.groovy
+++ b/instrumentation/kafka-clients-0.11/javaagent/src/test/groovy/KafkaClientPropagationEnabledTest.groovy
@@ -23,18 +23,17 @@ import org.springframework.kafka.core.DefaultKafkaProducerFactory
 import org.springframework.kafka.core.KafkaTemplate
 import org.springframework.kafka.listener.KafkaMessageListenerContainer
 import org.springframework.kafka.listener.MessageListener
-import org.springframework.kafka.test.utils.ContainerTestUtils
 import org.springframework.kafka.test.utils.KafkaTestUtils

 class KafkaClientPropagationEnabledTest extends KafkaClientBaseTest {

   def "test kafka produce and consume"() {
     setup:
-    def senderProps = KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString())
+    def senderProps = senderProps()
     Producer producer = new KafkaProducer<>(senderProps, new StringSerializer(), new StringSerializer())

     // set up the Kafka consumer properties
-    def consumerProperties = KafkaTestUtils.consumerProps("sender", "false", embeddedKafka)
+    def consumerProperties = consumerProps("sender", "false")

     // create a Kafka consumer factory
     def consumerFactory = new DefaultKafkaConsumerFactory(consumerProperties)
@@ -61,7 +60,7 @@ class KafkaClientPropagationEnabledTest extends KafkaClientBaseTest {
     container.start()

     // wait until the container has the required number of assigned partitions
-    ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic())
+    waitForAssignment(container)

     when:
     String greeting = "Hello Spring Kafka Sender!"
@@ -128,12 +127,12 @@ class KafkaClientPropagationEnabledTest extends KafkaClientBaseTest {

   def "test spring kafka template produce and consume"() {
     setup:
-    def senderProps = KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString())
+    def senderProps = senderProps()
     def producerFactory = new DefaultKafkaProducerFactory(senderProps)
     def kafkaTemplate = new KafkaTemplate(producerFactory)

     // set up the Kafka consumer properties
-    def consumerProperties = KafkaTestUtils.consumerProps("sender", "false", embeddedKafka)
+    def consumerProperties = consumerProps("sender", "false")

     // create a Kafka consumer factory
     def consumerFactory = new DefaultKafkaConsumerFactory(consumerProperties)
@@ -159,7 +158,7 @@ class KafkaClientPropagationEnabledTest extends KafkaClientBaseTest {
     container.start()

     // wait until the container has the required number of assigned partitions
-    ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic())
+    waitForAssignment(container)

     when:
     String greeting = "Hello Spring Kafka Sender!"
@@ -218,18 +217,18 @@ class KafkaClientPropagationEnabledTest extends KafkaClientBaseTest {
     }

     cleanup:
-    producerFactory.stop()
+    stopProducerFactory(producerFactory)
     container?.stop()
   }

   def "test pass through tombstone"() {
     setup:
-    def senderProps = KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString())
+    def senderProps = senderProps()
     def producerFactory = new DefaultKafkaProducerFactory(senderProps)
     def kafkaTemplate = new KafkaTemplate(producerFactory)

     // set up the Kafka consumer properties
-    def consumerProperties = KafkaTestUtils.consumerProps("sender", "false", embeddedKafka)
+    def consumerProperties = consumerProps("sender", "false")

     // create a Kafka consumer factory
     def consumerFactory = new DefaultKafkaConsumerFactory(consumerProperties)
@@ -255,7 +254,7 @@ class KafkaClientPropagationEnabledTest extends KafkaClientBaseTest {
     container.start()

     // wait until the container has the required number of assigned partitions
-    ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic())
+    waitForAssignment(container)

     when:
     kafkaTemplate.send(SHARED_TOPIC, null)
@@ -301,7 +300,7 @@ class KafkaClientPropagationEnabledTest extends KafkaClientBaseTest {
     }

     cleanup:
-    producerFactory.stop()
+    stopProducerFactory(producerFactory)
     container?.stop()
   }

@@ -310,11 +309,11 @@ class KafkaClientPropagationEnabledTest extends KafkaClientBaseTest {

     // set up the Kafka consumer properties
     def kafkaPartition = 0
-    def consumerProperties = KafkaTestUtils.consumerProps("sender", "false", embeddedKafka)
+    def consumerProperties = consumerProps("sender", "false")
     consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
     def consumer = new KafkaConsumer(consumerProperties)

-    def senderProps = KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString())
+    def senderProps = senderProps()
     def producer = new KafkaProducer(senderProps)

     consumer.assign(Arrays.asList(new TopicPartition(SHARED_TOPIC, kafkaPartition)))
@@ -378,7 +377,7 @@ class KafkaClientPropagationEnabledTest extends KafkaClientBaseTest {

   protected KafkaMessageListenerContainer startConsumer(String groupId, records) {
     // set up the Kafka consumer properties
-    Map consumerProperties = KafkaTestUtils.consumerProps(groupId, "false", embeddedKafka)
+    Map consumerProperties = consumerProps(groupId, "false")
     consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
     // create a Kafka consumer factory
@@ -402,11 +401,11 @@ class KafkaClientPropagationEnabledTest extends KafkaClientBaseTest {
     container.start()

     // wait until the container has the required number of assigned partitions
-    ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic())
+    waitForAssignment(container)
     container
   }
-
+  @Override
   def containerProperties() {
     try {
       // Different class names for test and latestDepTest.
diff --git a/instrumentation/kafka-clients-0.11/kafka-clients-2.4.0-testing/build.gradle.kts b/instrumentation/kafka-clients-0.11/kafka-clients-2.4.0-testing/build.gradle.kts
new file mode 100644
index 0000000000..c99a4f6093
--- /dev/null
+++ b/instrumentation/kafka-clients-0.11/kafka-clients-2.4.0-testing/build.gradle.kts
@@ -0,0 +1,40 @@
+plugins {
+  id("otel.javaagent-testing")
+}
+
+dependencies {
+  library("org.apache.kafka:kafka-clients:2.4.0")
+
+  testInstrumentation(project(":instrumentation:kafka-clients-0.11:javaagent"))
+
+  testLibrary("org.springframework.kafka:spring-kafka:2.4.0.RELEASE")
+  testLibrary("org.springframework.kafka:spring-kafka-test:2.4.0.RELEASE")
+  testLibrary("org.springframework:spring-core:5.2.9.RELEASE")
+  testImplementation("javax.xml.bind:jaxb-api:2.2.3")
+
+  latestDepTestLibrary("org.apache.kafka:kafka_2.13:+")
+}
+
+tasks {
+  withType<Test>().configureEach {
+    // TODO run tests both with and without experimental span attributes
+    jvmArgs("-Dotel.instrumentation.kafka.experimental-span-attributes=true")
+  }
+
+  val testPropagationDisabled by registering(Test::class) {
+    filter {
+      includeTestsMatching("KafkaClientPropagationDisabledTest")
+      isFailOnNoMatchingTests = false
+    }
+    include("**/KafkaClientPropagationDisabledTest.*")
+    jvmArgs("-Dotel.instrumentation.kafka.client-propagation.enabled=false")
+  }
+
+  named<Test>("test") {
+    dependsOn(testPropagationDisabled)
+    filter {
+      excludeTestsMatching("KafkaClientPropagationDisabledTest")
+      isFailOnNoMatchingTests = false
+    }
+  }
+}
diff --git a/instrumentation/kafka-clients-0.11/kafka-clients-2.4.0-testing/src/test/groovy/KafkaClientBaseTest.groovy b/instrumentation/kafka-clients-0.11/kafka-clients-2.4.0-testing/src/test/groovy/KafkaClientBaseTest.groovy
new file mode 100644
index 0000000000..3e7099ea3f
--- /dev/null
+++ b/instrumentation/kafka-clients-0.11/kafka-clients-2.4.0-testing/src/test/groovy/KafkaClientBaseTest.groovy
@@ -0,0 +1,99 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification
+import java.util.concurrent.LinkedBlockingQueue
+import java.util.concurrent.TimeUnit
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.junit.Rule
+import org.springframework.kafka.core.DefaultKafkaConsumerFactory
+import org.springframework.kafka.core.DefaultKafkaProducerFactory
+import org.springframework.kafka.core.KafkaTemplate
+import org.springframework.kafka.listener.KafkaMessageListenerContainer
+import org.springframework.kafka.listener.MessageListener
+import org.springframework.kafka.test.rule.EmbeddedKafkaRule
+import org.springframework.kafka.test.utils.ContainerTestUtils
+import org.springframework.kafka.test.utils.KafkaTestUtils
+import spock.lang.Unroll
+
+abstract class KafkaClientBaseTest extends AgentInstrumentationSpecification {
+
+  protected static final SHARED_TOPIC = "shared.topic"
+
+  private static final boolean propagationEnabled = Boolean.parseBoolean(
+    System.getProperty("otel.instrumentation.kafka.client-propagation.enabled", "true"))
+
+  @Rule
+  EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, true, SHARED_TOPIC)
+
+  abstract containerProperties()
+
+  Map senderProps() {
+    return KafkaTestUtils.producerProps(embeddedKafka.getEmbeddedKafka().getBrokersAsString())
+  }
+
+  Map consumerProps(String group, String autoCommit) {
+    return KafkaTestUtils.consumerProps(group, autoCommit, embeddedKafka.getEmbeddedKafka())
+  }
+
+  void waitForAssignment(Object container) {
+    ContainerTestUtils.waitForAssignment(container, embeddedKafka.getEmbeddedKafka().getPartitionsPerTopic())
+  }
+
+  def stopProducerFactory(DefaultKafkaProducerFactory producerFactory) {
+    producerFactory.destroy()
+  }
+
+  @Unroll
+  def "test kafka client header propagation manual config"() {
+    setup:
+    def senderProps = senderProps()
+    def producerFactory = new DefaultKafkaProducerFactory(senderProps)
+    def kafkaTemplate = new KafkaTemplate(producerFactory)
+
+    // set up the Kafka consumer properties
+    def consumerProperties = consumerProps("sender", "false")
+
+    // create a Kafka consumer factory
+    def consumerFactory = new DefaultKafkaConsumerFactory(consumerProperties)
+
+    // set the topic that needs to be consumed
+    def containerProperties = containerProperties()
+
+    // create a Kafka MessageListenerContainer
+    def container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties)
+
+    // create a thread safe queue to store the received message
+    def records = new LinkedBlockingQueue<ConsumerRecord<String, String>>()
+
+    // setup a Kafka message listener
+    container.setupMessageListener(new MessageListener() {
+      @Override
+      void onMessage(ConsumerRecord record) {
+        records.add(record)
+      }
+    })
+
+    // start the container and underlying message listener
+    container.start()
+
+    // wait until the container has the required number of assigned partitions
+    waitForAssignment(container)
+
+    when:
+    String message = "Testing without headers"
+    kafkaTemplate.send(SHARED_TOPIC, message)
+
+    then:
+    // check that the message was received
+    def received = records.poll(5, TimeUnit.SECONDS)
+
+    received.headers().iterator().hasNext() == propagationEnabled
+
+    cleanup:
+    stopProducerFactory(producerFactory)
+    container?.stop()
+  }
+}
diff --git a/instrumentation/kafka-clients-0.11/kafka-clients-2.4.0-testing/src/test/groovy/KafkaClientPropagationDisabledTest.groovy b/instrumentation/kafka-clients-0.11/kafka-clients-2.4.0-testing/src/test/groovy/KafkaClientPropagationDisabledTest.groovy
new file mode 100644
index 0000000000..d923ce8e95
--- /dev/null
+++ b/instrumentation/kafka-clients-0.11/kafka-clients-2.4.0-testing/src/test/groovy/KafkaClientPropagationDisabledTest.groovy
@@ -0,0 +1,134 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+import static io.opentelemetry.api.trace.SpanKind.CONSUMER
+import static io.opentelemetry.api.trace.SpanKind.PRODUCER
+
+import io.opentelemetry.semconv.trace.attributes.SemanticAttributes
+import java.util.concurrent.LinkedBlockingQueue
+import java.util.concurrent.TimeUnit
+import org.apache.kafka.clients.consumer.ConsumerConfig
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.springframework.kafka.core.DefaultKafkaConsumerFactory
+import org.springframework.kafka.core.DefaultKafkaProducerFactory
+import org.springframework.kafka.core.KafkaTemplate
+import org.springframework.kafka.listener.KafkaMessageListenerContainer
+import org.springframework.kafka.listener.MessageListener
+
+class KafkaClientPropagationDisabledTest extends KafkaClientBaseTest {
+
+  def "should not read remote context when consuming messages if propagation is disabled"() {
+    setup:
+    def senderProps = senderProps()
+    def producerFactory = new DefaultKafkaProducerFactory(senderProps)
+    def kafkaTemplate = new KafkaTemplate(producerFactory)
+
+    when: "send message"
+    String message = "Testing without headers"
+    kafkaTemplate.send(SHARED_TOPIC, message)
+
+    then: "producer span is created"
+    assertTraces(1) {
+      trace(0, 1) {
+        span(0) {
+          name SHARED_TOPIC + " send"
+          kind PRODUCER
+          hasNoParent()
+          attributes {
+            "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka"
+            "${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC
+            "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic"
+          }
+        }
+      }
+    }
+
+    when: "read message without context propagation"
+    // create a thread safe queue to store the received message
+    def records = new LinkedBlockingQueue<ConsumerRecord<String, String>>()
+    KafkaMessageListenerContainer container = startConsumer("consumer-without-propagation", records)
+
+    then: "independent consumer span is created"
+    // check that the message was received
+    records.poll(5, TimeUnit.SECONDS) != null
+
+    assertTraces(2) {
+      trace(0, 1) {
+        span(0) {
+          name SHARED_TOPIC + " send"
+          kind PRODUCER
+          hasNoParent()
+          attributes {
+            "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka"
+            "${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC
+            "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic"
+          }
+        }
+      }
+      trace(1, 1) {
+        span(0) {
+          name SHARED_TOPIC + " process"
+          kind CONSUMER
+          hasNoParent()
+          attributes {
+            "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka"
+            "${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC
+            "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic"
+            "${SemanticAttributes.MESSAGING_OPERATION.key}" "process"
+            "${SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES.key}" Long
+            "${SemanticAttributes.MESSAGING_KAFKA_PARTITION.key}" { it >= 0 }
+            "kafka.offset" 0
+            "kafka.record.queue_time_ms" { it >= 0 }
+          }
+        }
+      }
+
+    }
+
+    cleanup:
+    stopProducerFactory(producerFactory)
+    container?.stop()
+  }
+
+  protected KafkaMessageListenerContainer startConsumer(String groupId, records) {
+    // set up the Kafka consumer properties
+    Map consumerProperties = consumerProps(groupId, "false")
+    consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
+
+    // create a Kafka consumer factory
+    def consumerFactory = new DefaultKafkaConsumerFactory(consumerProperties)
+
+    // set the topic that needs to be consumed
+    def containerProperties = containerProperties()
+
+    // create a Kafka MessageListenerContainer
+    def container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties)
+
+    // setup a Kafka message listener
+    container.setupMessageListener(new MessageListener() {
+      @Override
+      void onMessage(ConsumerRecord record) {
+        records.add(record)
+      }
+    })
+
+    // start the container and underlying message listener
+    container.start()
+
+    // wait until the container has the required number of assigned partitions
+    waitForAssignment(container)
+    container
+  }
+
+  @Override
+  def containerProperties() {
+    try {
+      // Different class names for test and latestDepTest.
+      return Class.forName("org.springframework.kafka.listener.config.ContainerProperties").newInstance(SHARED_TOPIC)
+    } catch (ClassNotFoundException | NoClassDefFoundError e) {
+      return Class.forName("org.springframework.kafka.listener.ContainerProperties").newInstance(SHARED_TOPIC)
+    }
+  }
+}
diff --git a/instrumentation/kafka-clients-0.11/kafka-clients-2.4.0-testing/src/test/groovy/KafkaClientPropagationEnabledTest.groovy b/instrumentation/kafka-clients-0.11/kafka-clients-2.4.0-testing/src/test/groovy/KafkaClientPropagationEnabledTest.groovy
new file mode 100644
index 0000000000..54b97804bd
--- /dev/null
+++ b/instrumentation/kafka-clients-0.11/kafka-clients-2.4.0-testing/src/test/groovy/KafkaClientPropagationEnabledTest.groovy
@@ -0,0 +1,417 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+import static io.opentelemetry.api.trace.SpanKind.CONSUMER
+import static io.opentelemetry.api.trace.SpanKind.PRODUCER
+
+import io.opentelemetry.api.trace.SpanKind
+import io.opentelemetry.semconv.trace.attributes.SemanticAttributes
+import java.util.concurrent.LinkedBlockingQueue
+import java.util.concurrent.TimeUnit
+import org.apache.kafka.clients.consumer.ConsumerConfig
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.apache.kafka.clients.consumer.KafkaConsumer
+import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.clients.producer.Producer
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.serialization.StringSerializer
+import org.springframework.kafka.core.DefaultKafkaConsumerFactory
+import org.springframework.kafka.core.DefaultKafkaProducerFactory
+import org.springframework.kafka.core.KafkaTemplate
+import org.springframework.kafka.listener.KafkaMessageListenerContainer
+import org.springframework.kafka.listener.MessageListener
+import org.springframework.kafka.test.utils.KafkaTestUtils
+
+class KafkaClientPropagationEnabledTest extends KafkaClientBaseTest {
+
+  def "test kafka produce and consume"() {
+    setup:
+    def senderProps = senderProps()
+    Producer producer = new KafkaProducer<>(senderProps, new StringSerializer(), new StringSerializer())
+
+    // set up the Kafka consumer properties
+    def consumerProperties = consumerProps("sender", "false")
+
+    // create a Kafka consumer factory
+    def consumerFactory = new DefaultKafkaConsumerFactory(consumerProperties)
+
+    // set the topic that needs to be consumed
+    def containerProperties = containerProperties()
+
+    // create a Kafka MessageListenerContainer
+    def container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties)
+
+    // create a thread safe queue to store the received message
+    def records = new LinkedBlockingQueue<ConsumerRecord<String, String>>()
+
+    // setup a Kafka message listener
+    container.setupMessageListener(new MessageListener() {
+      @Override
+      void onMessage(ConsumerRecord record) {
+        waitForTraces(1) // ensure consistent ordering of traces
+        records.add(record)
+      }
+    })
+
+    // start the container and underlying message listener
+    container.start()
+
+    // wait until the container has the required number of assigned partitions
+    waitForAssignment(container)
+
+    when:
+    String greeting = "Hello Spring Kafka Sender!"
+    runWithSpan("parent") {
+      producer.send(new ProducerRecord(SHARED_TOPIC, greeting)) { meta, ex ->
+        if (ex == null) {
+          runWithSpan("producer callback") {}
+        } else {
+          runWithSpan("producer exception: " + ex) {}
+        }
+      }
+    }
+
+    then:
+    // check that the message was received
+    def received = records.poll(5, TimeUnit.SECONDS)
+    received.value() == greeting
+    received.key() == null
+
+    assertTraces(1) {
+      trace(0, 4) {
+        span(0) {
+          name "parent"
+          kind SpanKind.INTERNAL
+          hasNoParent()
+        }
+        span(1) {
+          name SHARED_TOPIC + " send"
+          kind PRODUCER
+          childOf span(0)
+          attributes {
+            "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka"
+            "${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC
+            "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic"
+          }
+        }
+        span(2) {
+          name SHARED_TOPIC + " process"
+          kind CONSUMER
+          childOf span(1)
+          attributes {
+            "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka"
+            "${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC
+            "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic"
+            "${SemanticAttributes.MESSAGING_OPERATION.key}" "process"
+            "${SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES.key}" Long
+            "${SemanticAttributes.MESSAGING_KAFKA_PARTITION.key}" { it >= 0 }
+            "kafka.offset" 0
+            "kafka.record.queue_time_ms" { it >= 0 }
+          }
+        }
+        span(3) {
+          name "producer callback"
+          kind SpanKind.INTERNAL
+          childOf span(0)
+        }
+      }
+    }
+
+    cleanup:
+    producer.close()
+    container?.stop()
+  }
+
+  def "test spring kafka template produce and consume"() {
+    setup:
+    def senderProps = senderProps()
+    def producerFactory = new DefaultKafkaProducerFactory(senderProps)
+    def kafkaTemplate = new KafkaTemplate(producerFactory)
+
+    // set up the Kafka consumer properties
+    def consumerProperties = consumerProps("sender", "false")
+
+    // create a Kafka consumer factory
+    def consumerFactory = new DefaultKafkaConsumerFactory(consumerProperties)
+
+    // set the topic that needs to be consumed
+    def containerProperties = containerProperties()
+
+    // create a Kafka MessageListenerContainer
+    def container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties)
+
+    // create a thread safe queue to store the received message
+    def records = new LinkedBlockingQueue<ConsumerRecord<String, String>>()
+
+    // setup a Kafka message listener
+    container.setupMessageListener(new MessageListener() {
+      @Override
+      void onMessage(ConsumerRecord record) {
+        records.add(record)
+      }
+    })
+
+    // start the container and underlying message listener
+    container.start()
+
+    // wait until the container has the required number of assigned partitions
+    waitForAssignment(container)
+
+    when:
+    String greeting = "Hello Spring Kafka Sender!"
+    runWithSpan("parent") {
+      kafkaTemplate.send(SHARED_TOPIC, greeting).addCallback({
+        runWithSpan("producer callback") {}
+      }, { ex ->
+        runWithSpan("producer exception: " + ex) {}
+      })
+    }
+
+    then:
+    // check that the message was received
+    def received = records.poll(5, TimeUnit.SECONDS)
+    received.value() == greeting
+    received.key() == null
+
+    assertTraces(1) {
+      trace(0, 4) {
+        span(0) {
+          name "parent"
+          kind SpanKind.INTERNAL
+          hasNoParent()
+        }
+        span(1) {
+          name SHARED_TOPIC + " send"
+          kind PRODUCER
+          childOf span(0)
+          attributes {
+            "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka"
+            "${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC
+            "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic"
+          }
+        }
+        span(2) {
+          name SHARED_TOPIC + " process"
+          kind CONSUMER
+          childOf span(1)
+          attributes {
+            "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka"
+            "${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC
+            "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic"
+            "${SemanticAttributes.MESSAGING_OPERATION.key}" "process"
+            "${SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES.key}" Long
+            "${SemanticAttributes.MESSAGING_KAFKA_PARTITION.key}" { it >= 0 }
+            "kafka.offset" 0
+            "kafka.record.queue_time_ms" { it >= 0 }
+          }
+        }
+        span(3) {
+          name "producer callback"
+          kind SpanKind.INTERNAL
+          childOf span(0)
+        }
+      }
+    }
+
+    cleanup:
+    stopProducerFactory(producerFactory)
+    container?.stop()
+  }
+
+  def "test pass through tombstone"() {
+    setup:
+    def senderProps = senderProps()
+    def producerFactory = new DefaultKafkaProducerFactory(senderProps)
+    def kafkaTemplate = new KafkaTemplate(producerFactory)
+
+    // set up the Kafka consumer properties
+    def consumerProperties = consumerProps("sender", "false")
+
+    // create a Kafka consumer factory
+    def consumerFactory = new DefaultKafkaConsumerFactory(consumerProperties)
+
+    // set the topic that needs to be consumed
+    def containerProperties = containerProperties()
+
+    // create a Kafka MessageListenerContainer
+    def container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties)
+
+    // create a thread safe queue to store the received message
+    def records = new LinkedBlockingQueue<ConsumerRecord<String, String>>()
+
+    // setup a Kafka message listener
+    container.setupMessageListener(new MessageListener() {
+      @Override
+      void onMessage(ConsumerRecord record) {
+        records.add(record)
+      }
+    })
+
+    // start the container and underlying message listener
+    container.start()
+
+    // wait until the container has the required number of assigned partitions
+    waitForAssignment(container)
+
+    when:
+    kafkaTemplate.send(SHARED_TOPIC, null)
+
+    then:
+    // check that the message was received
+    def received = records.poll(5, TimeUnit.SECONDS)
+    received.value() == null
+    received.key() == null
+
+    assertTraces(1) {
+      trace(0, 2) {
+        // PRODUCER span 0
+        span(0) {
+          name SHARED_TOPIC + " send"
+          kind PRODUCER
+          hasNoParent()
+          attributes {
+            "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka"
+            "${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC
+            "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic"
+            "${SemanticAttributes.MESSAGING_KAFKA_TOMBSTONE.key}" true
+          }
+        }
+        // CONSUMER span 0
+        span(1) {
+          name SHARED_TOPIC + " process"
+          kind CONSUMER
+          childOf span(0)
+          attributes {
+            "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka"
+            "${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC
+            "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic"
+            "${SemanticAttributes.MESSAGING_OPERATION.key}" "process"
+            "${SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES.key}" Long
+            "${SemanticAttributes.MESSAGING_KAFKA_PARTITION.key}" { it >= 0 }
+            "${SemanticAttributes.MESSAGING_KAFKA_TOMBSTONE.key}" true
+            "kafka.offset" 0
+            "kafka.record.queue_time_ms" { it >= 0 }
+          }
+        }
+      }
+    }
+
+    cleanup:
+    stopProducerFactory(producerFactory)
+    container?.stop()
+  }
+
+  def "test records(TopicPartition) kafka consume"() {
+    setup:
+
+    // set up the Kafka consumer properties
+    def kafkaPartition = 0
+    def consumerProperties = consumerProps("sender", "false")
+    consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
+    def consumer = new KafkaConsumer(consumerProperties)
+
+    def senderProps = senderProps()
+    def producer = new KafkaProducer(senderProps)
+
+    consumer.assign(Arrays.asList(new TopicPartition(SHARED_TOPIC, kafkaPartition)))
+
+    when:
+    def greeting = "Hello from MockConsumer!"
+    producer.send(new ProducerRecord(SHARED_TOPIC, kafkaPartition, null, greeting))
+
+    then:
+    waitForTraces(1)
+    def records = new LinkedBlockingQueue<ConsumerRecord<String, String>>()
+    def pollResult = KafkaTestUtils.getRecords(consumer)
+
+    def recs = pollResult.records(new TopicPartition(SHARED_TOPIC, kafkaPartition)).iterator()
+
+    def first = null
+    if (recs.hasNext()) {
+      first = recs.next()
+    }
+
+    then:
+    recs.hasNext() == false
+    first.value() == greeting
+    first.key() == null
+
+    assertTraces(1) {
+      trace(0, 2) {
+        span(0) {
+          name SHARED_TOPIC + " send"
+          kind PRODUCER
+          hasNoParent()
+          attributes {
+            "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka"
+            "${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC
+            "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic"
+            "${SemanticAttributes.MESSAGING_KAFKA_PARTITION.key}" { it >= 0 }
+          }
+        }
+        span(1) {
+          name SHARED_TOPIC + " process"
+          kind CONSUMER
+          childOf span(0)
+          attributes {
+            "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka"
+            "${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC
+            "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic"
+            "${SemanticAttributes.MESSAGING_OPERATION.key}" "process"
+            "${SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES.key}" Long
+            "${SemanticAttributes.MESSAGING_KAFKA_PARTITION.key}" { it >= 0 }
+            "kafka.offset" 0
+            "kafka.record.queue_time_ms" { it >= 0 }
+          }
+        }
+      }
+    }
+
+    cleanup:
+    consumer.close()
+    producer.close()
+  }
+
+  protected KafkaMessageListenerContainer startConsumer(String groupId, records) {
+    // set up the Kafka consumer properties
+    Map consumerProperties = consumerProps(groupId, "false")
+    consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
+
+    // create a Kafka consumer factory
+    def consumerFactory = new DefaultKafkaConsumerFactory(consumerProperties)
+
+    // set the topic that needs to be consumed
+    def containerProperties = containerProperties()
+
+    // create a Kafka MessageListenerContainer
+    def container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties)
+
+    // setup a Kafka message listener
+    container.setupMessageListener(new MessageListener() {
+      @Override
+      void onMessage(ConsumerRecord record) {
+        records.add(record)
+      }
+    })
+
+    // start the container and underlying message listener
+    container.start()
+
+    // wait until the container has the required number of assigned partitions
+    waitForAssignment(container)
+    container
+  }
+
+  @Override
+  def containerProperties() {
+    try {
+      // Different class names for test and latestDepTest.
+      return Class.forName("org.springframework.kafka.listener.config.ContainerProperties").newInstance(SHARED_TOPIC)
+    } catch (ClassNotFoundException | NoClassDefFoundError e) {
+      return Class.forName("org.springframework.kafka.listener.ContainerProperties").newInstance(SHARED_TOPIC)
+    }
+  }
+}
diff --git a/instrumentation/kafka-streams-0.11/javaagent/build.gradle.kts b/instrumentation/kafka-streams-0.11/javaagent/build.gradle.kts
index c126f9638c..4af5f61dfb 100644
--- a/instrumentation/kafka-streams-0.11/javaagent/build.gradle.kts
+++ b/instrumentation/kafka-streams-0.11/javaagent/build.gradle.kts
@@ -1,5 +1,6 @@
 plugins {
   id("otel.javaagent-instrumentation")
+  id("org.unbroken-dome.test-sets")
 }

 muzzle {
@@ -10,36 +11,42 @@ muzzle {
   }
 }

+testSets {
+  create("latestDepTest")
+}
+
 dependencies {
-  library("org.apache.kafka:kafka-streams:0.11.0.0")
+  compileOnly("org.apache.kafka:kafka-streams:0.11.0.0")

   // Include kafka-clients instrumentation for tests.
   testInstrumentation(project(":instrumentation:kafka-clients-0.11:javaagent"))

-  testLibrary("org.apache.kafka:kafka-clients:0.11.0.0")
-  testLibrary("org.springframework.kafka:spring-kafka:1.3.3.RELEASE")
-  testLibrary("org.springframework.kafka:spring-kafka-test:1.3.3.RELEASE")
+  testImplementation("org.apache.kafka:kafka-streams:0.11.0.0")
+  testImplementation("org.apache.kafka:kafka-clients:0.11.0.0")
+  testImplementation("org.springframework.kafka:spring-kafka:1.3.3.RELEASE")
+  testImplementation("org.springframework.kafka:spring-kafka-test:1.3.3.RELEASE")
   testImplementation("javax.xml.bind:jaxb-api:2.2.3")
-  testImplementation("org.mockito:mockito-core")
-  testLibrary("org.assertj:assertj-core")
+  testImplementation("org.assertj:assertj-core")
-
-  // Include latest version of kafka itself along with latest version of client libs.
-  // This seems to help with jar compatibility hell.
-  latestDepTestLibrary("org.apache.kafka:kafka_2.11:2.3.+")
-  // (Pinning to 2.3.x: 2.4.0 introduces an error when executing compileLatestDepTestGroovy)
-  // Caused by: java.lang.NoClassDefFoundError: org.I0Itec.zkclient.ZkClient
-  latestDepTestLibrary("org.apache.kafka:kafka-clients:2.3.+")
-  latestDepTestLibrary("org.apache.kafka:kafka-streams:2.3.+")
-  latestDepTestLibrary("org.springframework.kafka:spring-kafka:2.2.+")
-  latestDepTestLibrary("org.springframework.kafka:spring-kafka-test:2.2.+")
-  // assertj-core:3.20.0 is incompatible with spring-kafka-test:2.7.2
-  latestDepTestLibrary("org.assertj:assertj-core:3.19.0")
+  add("latestDepTestImplementation", "org.apache.kafka:kafka_2.13:+")
+  add("latestDepTestImplementation", "org.apache.kafka:kafka-clients:+")
+  add("latestDepTestImplementation", "org.apache.kafka:kafka-streams:+")
+  add("latestDepTestImplementation", "org.springframework.kafka:spring-kafka:+")
+  add("latestDepTestImplementation", "org.springframework.kafka:spring-kafka-test:+")
 }

-tasks.withType<Test>().configureEach {
-  // TODO run tests both with and without experimental span attributes
-  jvmArgs("-Dotel.instrumentation.kafka.experimental-span-attributes=true")
+tasks {
+  withType<Test>().configureEach {
+    // TODO run tests both with and without experimental span attributes
+    jvmArgs("-Dotel.instrumentation.kafka.experimental-span-attributes=true")
+  }
+
+  if (findProperty("testLatestDeps") as Boolean) {
+    // latestDepTest is still run
+    named("test") {
+      enabled = false
+    }
+  }
 }

 // Requires old version of AssertJ for baseline
diff --git a/instrumentation/kafka-streams-0.11/javaagent/src/latestDepTest/groovy/KafkaStreamsTest.groovy b/instrumentation/kafka-streams-0.11/javaagent/src/latestDepTest/groovy/KafkaStreamsTest.groovy
new file mode 100644
index 0000000000..dba1f88d44
--- /dev/null
+++ b/instrumentation/kafka-streams-0.11/javaagent/src/latestDepTest/groovy/KafkaStreamsTest.groovy
@@ -0,0 +1,246 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+import static io.opentelemetry.api.trace.SpanKind.CONSUMER
+import static io.opentelemetry.api.trace.SpanKind.PRODUCER
+
+import io.opentelemetry.api.trace.Span
+import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator
+import io.opentelemetry.context.Context
+import io.opentelemetry.context.propagation.TextMapGetter
+import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification
+import io.opentelemetry.semconv.trace.attributes.SemanticAttributes
+import java.util.concurrent.LinkedBlockingQueue
+import java.util.concurrent.TimeUnit
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.apache.kafka.common.serialization.Serdes
+import org.apache.kafka.streams.KafkaStreams
+import org.apache.kafka.streams.StreamsConfig
+import org.apache.kafka.streams.kstream.KStream
+import org.apache.kafka.streams.kstream.ValueMapper
+import org.junit.ClassRule
+import org.springframework.kafka.core.DefaultKafkaConsumerFactory
+import org.springframework.kafka.core.DefaultKafkaProducerFactory
+import org.springframework.kafka.core.KafkaTemplate
+import org.springframework.kafka.listener.KafkaMessageListenerContainer
+import org.springframework.kafka.listener.MessageListener
+import org.springframework.kafka.test.rule.EmbeddedKafkaRule
+import org.springframework.kafka.test.utils.ContainerTestUtils
+import org.springframework.kafka.test.utils.KafkaTestUtils
+import spock.lang.Shared
+
+class KafkaStreamsTest extends AgentInstrumentationSpecification {
+
+  static final STREAM_PENDING = "test.pending"
+  static final STREAM_PROCESSED = "test.processed"
+
+  @Shared
+  @ClassRule
+  EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, true, STREAM_PENDING, STREAM_PROCESSED)
+
+  Map senderProps() {
+    return KafkaTestUtils.producerProps(embeddedKafka.getEmbeddedKafka().getBrokersAsString())
+  }
+
+  Map consumerProps(String group, String autoCommit) {
+    return KafkaTestUtils.consumerProps(group, autoCommit, embeddedKafka.getEmbeddedKafka())
+  }
+
+  void waitForAssignment(Object container) {
+    ContainerTestUtils.waitForAssignment(container, embeddedKafka.getEmbeddedKafka().getPartitionsPerTopic())
+  }
+
+  def stopProducerFactory(DefaultKafkaProducerFactory producerFactory) {
+    producerFactory.destroy()
+  }
+
+  def "test kafka produce and consume with streams in-between"() {
+    setup:
+    def config = new Properties()
+    def senderProps = senderProps()
+    config.putAll(senderProps)
+    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-application")
+    config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName())
+    config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName())
+
+    // CONFIGURE CONSUMER
+    def consumerFactory = new DefaultKafkaConsumerFactory(consumerProps("sender", "false"))
+
+    def containerProperties
+    try {
+      // Different class names for test and latestDepTest.
+      containerProperties = Class.forName("org.springframework.kafka.listener.config.ContainerProperties").newInstance(STREAM_PROCESSED)
+    } catch (ClassNotFoundException | NoClassDefFoundError e) {
+      containerProperties = Class.forName("org.springframework.kafka.listener.ContainerProperties").newInstance(STREAM_PROCESSED)
+    }
+    def consumerContainer = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties)
+
+    // create a thread safe queue to store the processed message
+    def records = new LinkedBlockingQueue<ConsumerRecord<String, String>>()
+
+    // setup a Kafka message listener
+    consumerContainer.setupMessageListener(new MessageListener() {
+      @Override
+      void onMessage(ConsumerRecord record) {
+        Span.current().setAttribute("testing", 123)
+        records.add(record)
+      }
+    })
+
+    // start the container and underlying message listener
+    consumerContainer.start()
+
+    // wait until the container has the required number of assigned partitions
+    waitForAssignment(consumerContainer)
+
+    // CONFIGURE PROCESSOR
+    def builder
+    try {
+      // Different class names for test and latestDepTest.
+      builder = Class.forName("org.apache.kafka.streams.kstream.KStreamBuilder").newInstance()
+    } catch (ClassNotFoundException | NoClassDefFoundError e) {
+      builder = Class.forName("org.apache.kafka.streams.StreamsBuilder").newInstance()
+    }
+    KStream textLines = builder.stream(STREAM_PENDING)
+    def values = textLines
+      .mapValues(new ValueMapper() {
+        @Override
+        String apply(String textLine) {
+          Span.current().setAttribute("asdf", "testing")
+          return textLine.toLowerCase()
+        }
+      })
+
+    KafkaStreams streams
+    try {
+      // Different api for test and latestDepTest.
+      values.to(Serdes.String(), Serdes.String(), STREAM_PROCESSED)
+      streams = new KafkaStreams(builder, config)
+    } catch (MissingMethodException e) {
+      def producer = Class.forName("org.apache.kafka.streams.kstream.Produced")
+        .with(Serdes.String(), Serdes.String())
+      values.to(STREAM_PROCESSED, producer)
+      streams = new KafkaStreams(builder.build(), config)
+    }
+    streams.start()
+
+    // CONFIGURE PRODUCER
+    def producerFactory = new DefaultKafkaProducerFactory(senderProps)
+    def kafkaTemplate = new KafkaTemplate(producerFactory)
+
+    when:
+    String greeting = "TESTING TESTING 123!"
+    kafkaTemplate.send(STREAM_PENDING, greeting)
+
+    then:
+    // check that the message was received
+    def received = records.poll(10, TimeUnit.SECONDS)
+    received.value() == greeting.toLowerCase()
+    received.key() == null
+
+    assertTraces(1) {
+      trace(0, 5) {
+        // PRODUCER span 0
+        span(0) {
+          name STREAM_PENDING + " send"
+          kind PRODUCER
+          hasNoParent()
+          attributes {
+            "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka"
+            "${SemanticAttributes.MESSAGING_DESTINATION.key}" STREAM_PENDING
+            "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic"
+          }
+        }
+        // CONSUMER span 0
+        span(1) {
+          name STREAM_PENDING + " process"
+          kind CONSUMER
+          childOf span(0)
+          attributes {
+            "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka"
+            "${SemanticAttributes.MESSAGING_DESTINATION.key}" STREAM_PENDING
+            "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic"
+            "${SemanticAttributes.MESSAGING_OPERATION.key}" "process"
+            "${SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES.key}" Long
+            "${SemanticAttributes.MESSAGING_KAFKA_PARTITION.key}" { it >= 0 }
+            "kafka.offset" 0
+            "kafka.record.queue_time_ms" { it >= 0 }
+          }
+        }
+        // STREAMING span 1
+        span(2) {
+          name STREAM_PENDING + " process"
+          kind CONSUMER
+          childOf span(0)
+          attributes {
+            "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka"
+            "${SemanticAttributes.MESSAGING_DESTINATION.key}" STREAM_PENDING
+            "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic"
+            "${SemanticAttributes.MESSAGING_OPERATION.key}" "process"
+            "${SemanticAttributes.MESSAGING_KAFKA_PARTITION.key}" { it >= 0 }
+            "kafka.offset" 0
+            "asdf" "testing"
+          }
+        }
+        // STREAMING span 0
+        span(3) {
+          name STREAM_PROCESSED + " send"
+          kind PRODUCER
+          childOf span(2)
+          attributes {
+            "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka"
+            "${SemanticAttributes.MESSAGING_DESTINATION.key}" STREAM_PROCESSED
+            "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic"
+          }
+        }
+        // CONSUMER span 0
+        span(4) {
+          name STREAM_PROCESSED + " process"
+          kind CONSUMER
+          childOf span(3)
+          attributes {
+            "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka"
+            "${SemanticAttributes.MESSAGING_DESTINATION.key}" STREAM_PROCESSED
+            "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic"
+            "${SemanticAttributes.MESSAGING_OPERATION.key}" "process"
+            "${SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES.key}" Long
+            "${SemanticAttributes.MESSAGING_KAFKA_PARTITION.key}" { it >= 0 }
+            "kafka.offset" 0
+            "kafka.record.queue_time_ms" { it >= 0 }
+            "testing" 123
+          }
+        }
+      }
+    }
+
+    def headers = received.headers()
+    headers.iterator().hasNext()
+    def traceparent = new String(headers.headers("traceparent").iterator().next().value())
+    Context context = W3CTraceContextPropagator.instance.extract(Context.root(), "", new TextMapGetter() {
+      @Override
+      Iterable keys(String carrier) {
+        return Collections.singleton("traceparent")
+      }
+
+      @Override
+      String get(String carrier, String key) {
+        if (key == "traceparent") {
+          return traceparent
+        }
+        return null
+      }
+    })
+    def spanContext = Span.fromContext(context).getSpanContext()
+    def streamSendSpan = traces[0][3]
+    spanContext.traceId == streamSendSpan.traceId
+    spanContext.spanId == streamSendSpan.spanId
+
+
+    cleanup:
+    stopProducerFactory(producerFactory)
+    streams?.close()
+    consumerContainer?.stop()
+  }
+}
diff --git a/instrumentation/kafka-streams-0.11/javaagent/src/test/groovy/KafkaStreamsTest.groovy b/instrumentation/kafka-streams-0.11/javaagent/src/test/groovy/KafkaStreamsTest.groovy
index ce2cd59931..670b3c210f 100644
--- a/instrumentation/kafka-streams-0.11/javaagent/src/test/groovy/KafkaStreamsTest.groovy
+++ b/instrumentation/kafka-streams-0.11/javaagent/src/test/groovy/KafkaStreamsTest.groovy
@@ -40,17 +40,33 @@ class KafkaStreamsTest extends AgentInstrumentationSpecification {
   @ClassRule
   KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, STREAM_PENDING, STREAM_PROCESSED)

+  Map senderProps() {
+    return KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString())
+  }
+
+  Map consumerProps(String group, String autoCommit) {
+    return KafkaTestUtils.consumerProps(group, autoCommit, embeddedKafka)
+  }
+
+  void waitForAssignment(Object container) {
+    ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic())
+  }
+
+  def stopProducerFactory(DefaultKafkaProducerFactory producerFactory) {
+    producerFactory.stop()
+  }
+
   def "test kafka produce and consume with streams in-between"() {
     setup:
     def config = new Properties()
-    def senderProps = KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString())
+    def senderProps = senderProps()
     config.putAll(senderProps)
     config.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-application")
     config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName())
     config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName())

     // CONFIGURE CONSUMER
-    def consumerFactory = new DefaultKafkaConsumerFactory(KafkaTestUtils.consumerProps("sender", "false", embeddedKafka))
+    def consumerFactory = new DefaultKafkaConsumerFactory(consumerProps("sender", "false"))

     def containerProperties
     try {
@@ -77,7 +93,7 @@ class KafkaStreamsTest extends AgentInstrumentationSpecification {
     consumerContainer.start()

     // wait until the container has the required number of assigned partitions
-    ContainerTestUtils.waitForAssignment(consumerContainer, embeddedKafka.getPartitionsPerTopic())
+    waitForAssignment(consumerContainer)

     // CONFIGURE PROCESSOR
     def builder
@@ -223,7 +239,7 @@ class KafkaStreamsTest extends AgentInstrumentationSpecification {

     cleanup:
-    producerFactory?.stop()
+    stopProducerFactory(producerFactory)
     streams?.close()
     consumerContainer?.stop()
   }
diff --git a/settings.gradle.kts b/settings.gradle.kts
index d9a1933610..61042d10ec 100644
--- a/settings.gradle.kts
+++ b/settings.gradle.kts
@@ -208,6 +208,7 @@ include(":instrumentation:jsf:mojarra-1.2:javaagent")
 include(":instrumentation:jsf:myfaces-1.2:javaagent")
 include(":instrumentation:jsp-2.3:javaagent")
 include(":instrumentation:kafka-clients-0.11:javaagent")
+include(":instrumentation:kafka-clients-0.11:kafka-clients-2.4.0-testing")
 include(":instrumentation:kafka-streams-0.11:javaagent")
 include(":instrumentation:kotlinx-coroutines:javaagent")
 include(":instrumentation:kubernetes-client-7.0:javaagent")