diff --git a/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/build.gradle.kts b/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/build.gradle.kts index acf4944cfe..1a8e6f19da 100644 --- a/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/build.gradle.kts +++ b/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/build.gradle.kts @@ -11,6 +11,8 @@ muzzle { } } +val versions: Map by project + dependencies { compileOnly("com.google.auto.value:auto-value-annotations") annotationProcessor("com.google.auto.value:auto-value") @@ -19,25 +21,13 @@ dependencies { library("org.apache.kafka:kafka-clients:0.11.0.0") - testLibrary("org.springframework.kafka:spring-kafka:1.3.3.RELEASE") - testLibrary("org.springframework.kafka:spring-kafka-test:1.3.3.RELEASE") - testImplementation("javax.xml.bind:jaxb-api:2.2.3") - testLibrary("org.assertj:assertj-core") - - // Include latest version of kafka itself along with latest version of client libs. - // This seems to help with jar compatibility hell. - latestDepTestLibrary("org.apache.kafka:kafka_2.11:2.3.+") - // (Pinning to 2.3.x: 2.4.0 introduces an error when executing compileLatestDepTestGroovy) - // Caused by: java.lang.NoClassDefFoundError: org.I0Itec.zkclient.ZkClient - latestDepTestLibrary("org.apache.kafka:kafka-clients:2.3.+") - latestDepTestLibrary("org.springframework.kafka:spring-kafka:2.2.+") - latestDepTestLibrary("org.springframework.kafka:spring-kafka-test:2.2.+") - // assertj-core:3.20.0 is incompatible with spring-kafka-test:2.7.2 - latestDepTestLibrary("org.assertj:assertj-core:3.19.0") + testImplementation("org.testcontainers:kafka:${versions["org.testcontainers"]}") } tasks { withType().configureEach { + usesService(gradle.sharedServices.registrations["testcontainersBuildService"].service) + // TODO run tests both with and without experimental span attributes jvmArgs("-Dotel.instrumentation.kafka.experimental-span-attributes=true") } @@ -59,8 +49,3 @@ tasks { } } } - -// Requires old version of AssertJ for baseline -if (!(findProperty("testLatestDeps") as Boolean)) { - configurations.testRuntimeClasspath.resolutionStrategy.force("org.assertj:assertj-core:2.9.1") -} diff --git a/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/test/groovy/KafkaClientBaseTest.groovy b/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/test/groovy/KafkaClientBaseTest.groovy index 0367b07026..a2c6012922 100644 --- a/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/test/groovy/KafkaClientBaseTest.groovy +++ b/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/test/groovy/KafkaClientBaseTest.groovy @@ -4,19 +4,23 @@ */ import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification -import org.apache.kafka.clients.consumer.ConsumerRecord -import org.junit.Rule -import org.springframework.kafka.core.DefaultKafkaConsumerFactory -import org.springframework.kafka.core.DefaultKafkaProducerFactory -import org.springframework.kafka.core.KafkaTemplate -import org.springframework.kafka.listener.KafkaMessageListenerContainer -import org.springframework.kafka.listener.MessageListener -import org.springframework.kafka.test.rule.KafkaEmbedded -import org.springframework.kafka.test.utils.ContainerTestUtils -import org.springframework.kafka.test.utils.KafkaTestUtils +import org.apache.kafka.clients.admin.AdminClient +import org.apache.kafka.clients.admin.NewTopic +import org.apache.kafka.clients.consumer.Consumer +import org.apache.kafka.clients.consumer.KafkaConsumer +import org.apache.kafka.clients.producer.KafkaProducer +import org.apache.kafka.clients.producer.Producer +import org.apache.kafka.clients.producer.ProducerRecord +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.serialization.IntegerDeserializer +import org.apache.kafka.common.serialization.IntegerSerializer +import org.apache.kafka.common.serialization.StringDeserializer +import org.apache.kafka.common.serialization.StringSerializer +import org.testcontainers.containers.KafkaContainer +import spock.lang.Shared import spock.lang.Unroll -import java.util.concurrent.LinkedBlockingQueue +import java.time.Duration import java.util.concurrent.TimeUnit abstract class KafkaClientBaseTest extends AgentInstrumentationSpecification { @@ -26,75 +30,67 @@ abstract class KafkaClientBaseTest extends AgentInstrumentationSpecification { private static final boolean propagationEnabled = Boolean.parseBoolean( System.getProperty("otel.instrumentation.kafka.client-propagation.enabled", "true")) - @Rule - KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, SHARED_TOPIC) + @Shared + static KafkaContainer kafka + @Shared + static Producer producer + @Shared + static Consumer consumer - abstract containerProperties() + def setupSpec() { + kafka = new KafkaContainer() + kafka.start() - Map senderProps() { - return KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString()) + // create test topic + AdminClient.create(["bootstrap.servers": kafka.bootstrapServers]).withCloseable { admin -> + admin.createTopics([new NewTopic(SHARED_TOPIC, 1, (short) 1)]).all().get(10, TimeUnit.SECONDS) + } + + // values copied from spring's KafkaTestUtils + def producerProps = [ + "bootstrap.servers": kafka.bootstrapServers, + "retries" : 0, + "batch.size" : "16384", + "linger.ms" : 1, + "buffer.memory" : "33554432", + "key.serializer" : IntegerSerializer, + "value.serializer" : StringSerializer + ] + producer = new KafkaProducer<>(producerProps) + + // values copied from spring's KafkaTestUtils + def consumerProps = [ + "bootstrap.servers" : kafka.bootstrapServers, + "group.id" : "test", + "enable.auto.commit" : "false", + "auto.commit.interval.ms": "10", + "session.timeout.ms" : "60000", + "key.deserializer" : IntegerDeserializer, + "value.deserializer" : StringDeserializer + ] + consumer = new KafkaConsumer<>(consumerProps) + + // assign only existing topic partition + consumer.assign([new TopicPartition(SHARED_TOPIC, 0)]) } - Map consumerProps(String group, String autoCommit) { - return KafkaTestUtils.consumerProps(group, autoCommit, embeddedKafka) - } - - void waitForAssignment(Object container) { - ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic()) - } - - def stopProducerFactory(DefaultKafkaProducerFactory producerFactory) { - producerFactory.stop() + def cleanupSpec() { + consumer?.close() + producer?.close() + kafka.stop() } @Unroll def "test kafka client header propagation manual config"() { - setup: - def senderProps = senderProps() - def producerFactory = new DefaultKafkaProducerFactory(senderProps) - def kafkaTemplate = new KafkaTemplate(producerFactory) - - // set up the Kafka consumer properties - def consumerProperties = consumerProps("sender", "false") - - // create a Kafka consumer factory - def consumerFactory = new DefaultKafkaConsumerFactory(consumerProperties) - - // set the topic that needs to be consumed - def containerProperties = containerProperties() - - // create a Kafka MessageListenerContainer - def container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties) - - // create a thread safe queue to store the received message - def records = new LinkedBlockingQueue>() - - // setup a Kafka message listener - container.setupMessageListener(new MessageListener() { - @Override - void onMessage(ConsumerRecord record) { - records.add(record) - } - }) - - // start the container and underlying message listener - container.start() - - // wait until the container has the required number of assigned partitions - waitForAssignment(container) - when: String message = "Testing without headers" - kafkaTemplate.send(SHARED_TOPIC, message) + producer.send(new ProducerRecord<>(SHARED_TOPIC, message)) then: // check that the message was received - def received = records.poll(5, TimeUnit.SECONDS) - - received.headers().iterator().hasNext() == propagationEnabled - - cleanup: - stopProducerFactory(producerFactory) - container?.stop() + def records = consumer.poll(Duration.ofSeconds(5).toMillis()) + for (record in records) { + assert record.headers().iterator().hasNext() == propagationEnabled + } } } diff --git a/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/test/groovy/KafkaClientPropagationDisabledTest.groovy b/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/test/groovy/KafkaClientPropagationDisabledTest.groovy index 30f5e258b6..24ee3f71ca 100644 --- a/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/test/groovy/KafkaClientPropagationDisabledTest.groovy +++ b/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/test/groovy/KafkaClientPropagationDisabledTest.groovy @@ -4,16 +4,9 @@ */ import io.opentelemetry.semconv.trace.attributes.SemanticAttributes -import org.apache.kafka.clients.consumer.ConsumerConfig -import org.apache.kafka.clients.consumer.ConsumerRecord -import org.springframework.kafka.core.DefaultKafkaConsumerFactory -import org.springframework.kafka.core.DefaultKafkaProducerFactory -import org.springframework.kafka.core.KafkaTemplate -import org.springframework.kafka.listener.KafkaMessageListenerContainer -import org.springframework.kafka.listener.MessageListener +import org.apache.kafka.clients.producer.ProducerRecord -import java.util.concurrent.LinkedBlockingQueue -import java.util.concurrent.TimeUnit +import java.time.Duration import static io.opentelemetry.api.trace.SpanKind.CONSUMER import static io.opentelemetry.api.trace.SpanKind.PRODUCER @@ -21,14 +14,9 @@ import static io.opentelemetry.api.trace.SpanKind.PRODUCER class KafkaClientPropagationDisabledTest extends KafkaClientBaseTest { def "should not read remote context when consuming messages if propagation is disabled"() { - setup: - def senderProps = senderProps() - def producerFactory = new DefaultKafkaProducerFactory(senderProps) - def kafkaTemplate = new KafkaTemplate(producerFactory) - when: "send message" String message = "Testing without headers" - kafkaTemplate.send(SHARED_TOPIC, message) + producer.send(new ProducerRecord<>(SHARED_TOPIC, message)) then: "producer span is created" assertTraces(1) { @@ -47,14 +35,12 @@ class KafkaClientPropagationDisabledTest extends KafkaClientBaseTest { } when: "read message without context propagation" - // create a thread safe queue to store the received message - def records = new LinkedBlockingQueue>() - KafkaMessageListenerContainer container = startConsumer("consumer-without-propagation", records) - - then: "independent consumer span is created" - // check that the message was received - records.poll(5, TimeUnit.SECONDS) != null + def records = consumer.poll(Duration.ofSeconds(5).toMillis()) + for (record in records) { + runWithSpan("processing") {} + } + then: assertTraces(2) { trace(0, 1) { span(0) { @@ -68,7 +54,7 @@ class KafkaClientPropagationDisabledTest extends KafkaClientBaseTest { } } } - trace(1, 2) { + trace(1, 3) { span(0) { name SHARED_TOPIC + " receive" kind CONSUMER @@ -92,55 +78,15 @@ class KafkaClientPropagationDisabledTest extends KafkaClientBaseTest { "${SemanticAttributes.MESSAGING_OPERATION.key}" "process" "${SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES.key}" Long "${SemanticAttributes.MESSAGING_KAFKA_PARTITION.key}" { it >= 0 } - "kafka.offset" 0 + "kafka.offset" Long "kafka.record.queue_time_ms" { it >= 0 } } + span(2) { + name "processing" + childOf span(1) + } } } } - - cleanup: - stopProducerFactory(producerFactory) - container?.stop() - } - - protected KafkaMessageListenerContainer startConsumer(String groupId, records) { - // set up the Kafka consumer properties - Map consumerProperties = consumerProps(groupId, "false") - consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") - - // create a Kafka consumer factory - def consumerFactory = new DefaultKafkaConsumerFactory(consumerProperties) - - // set the topic that needs to be consumed - def containerProperties = containerProperties() - - // create a Kafka MessageListenerContainer - def container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties) - - // setup a Kafka message listener - container.setupMessageListener(new MessageListener() { - @Override - void onMessage(ConsumerRecord record) { - records.add(record) - } - }) - - // start the container and underlying message listener - container.start() - - // wait until the container has the required number of assigned partitions - waitForAssignment(container) - container - } - - @Override - def containerProperties() { - try { - // Different class names for test and latestDepTest. - return Class.forName("org.springframework.kafka.listener.config.ContainerProperties").newInstance(SHARED_TOPIC) - } catch (ClassNotFoundException | NoClassDefFoundError e) { - return Class.forName("org.springframework.kafka.listener.ContainerProperties").newInstance(SHARED_TOPIC) - } } } diff --git a/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/test/groovy/KafkaClientPropagationEnabledTest.groovy b/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/test/groovy/KafkaClientPropagationEnabledTest.groovy index c09ec892c0..5b5121daa8 100644 --- a/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/test/groovy/KafkaClientPropagationEnabledTest.groovy +++ b/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/test/groovy/KafkaClientPropagationEnabledTest.groovy @@ -5,23 +5,10 @@ import io.opentelemetry.sdk.trace.data.SpanData import io.opentelemetry.semconv.trace.attributes.SemanticAttributes -import org.apache.kafka.clients.consumer.ConsumerConfig -import org.apache.kafka.clients.consumer.ConsumerRecord -import org.apache.kafka.clients.consumer.KafkaConsumer -import org.apache.kafka.clients.producer.KafkaProducer -import org.apache.kafka.clients.producer.Producer import org.apache.kafka.clients.producer.ProducerRecord import org.apache.kafka.common.TopicPartition -import org.apache.kafka.common.serialization.StringSerializer -import org.springframework.kafka.core.DefaultKafkaConsumerFactory -import org.springframework.kafka.core.DefaultKafkaProducerFactory -import org.springframework.kafka.core.KafkaTemplate -import org.springframework.kafka.listener.KafkaMessageListenerContainer -import org.springframework.kafka.listener.MessageListener -import org.springframework.kafka.test.utils.KafkaTestUtils -import java.util.concurrent.LinkedBlockingQueue -import java.util.concurrent.TimeUnit +import java.time.Duration import static io.opentelemetry.api.trace.SpanKind.CONSUMER import static io.opentelemetry.api.trace.SpanKind.INTERNAL @@ -30,42 +17,8 @@ import static io.opentelemetry.api.trace.SpanKind.PRODUCER class KafkaClientPropagationEnabledTest extends KafkaClientBaseTest { def "test kafka produce and consume"() { - setup: - def senderProps = senderProps() - Producer producer = new KafkaProducer<>(senderProps, new StringSerializer(), new StringSerializer()) - - // set up the Kafka consumer properties - def consumerProperties = consumerProps("sender", "false") - - // create a Kafka consumer factory - def consumerFactory = new DefaultKafkaConsumerFactory(consumerProperties) - - // set the topic that needs to be consumed - def containerProperties = containerProperties() - - // create a Kafka MessageListenerContainer - def container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties) - - // create a thread safe queue to store the received message - def records = new LinkedBlockingQueue>() - - // setup a Kafka message listener - container.setupMessageListener(new MessageListener() { - @Override - void onMessage(ConsumerRecord record) { - waitForTraces(1) // ensure consistent ordering of traces - records.add(record) - } - }) - - // start the container and underlying message listener - container.start() - - // wait until the container has the required number of assigned partitions - waitForAssignment(container) - when: - String greeting = "Hello Spring Kafka Sender!" + String greeting = "Hello Kafka!" runWithSpan("parent") { producer.send(new ProducerRecord(SHARED_TOPIC, greeting)) { meta, ex -> if (ex == null) { @@ -78,9 +31,13 @@ class KafkaClientPropagationEnabledTest extends KafkaClientBaseTest { then: // check that the message was received - def received = records.poll(5, TimeUnit.SECONDS) - received.value() == greeting - received.key() == null + def records = consumer.poll(Duration.ofSeconds(5).toMillis()) + for (record in records) { + runWithSpan("processing") { + assert record.value() == greeting + assert record.key() == null + } + } assertTraces(2) { traces.sort(orderByRootSpanKind(INTERNAL, CONSUMER)) @@ -111,7 +68,7 @@ class KafkaClientPropagationEnabledTest extends KafkaClientBaseTest { producerSpan = span(1) } - trace(1, 2) { + trace(1, 3) { span(0) { name SHARED_TOPIC + " receive" kind CONSUMER @@ -135,177 +92,29 @@ class KafkaClientPropagationEnabledTest extends KafkaClientBaseTest { "${SemanticAttributes.MESSAGING_OPERATION.key}" "process" "${SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES.key}" Long "${SemanticAttributes.MESSAGING_KAFKA_PARTITION.key}" { it >= 0 } - "kafka.offset" 0 + "kafka.offset" Long "kafka.record.queue_time_ms" { it >= 0 } } } - } - } - - cleanup: - producer.close() - container?.stop() - } - - def "test spring kafka template produce and consume"() { - setup: - def senderProps = senderProps() - def producerFactory = new DefaultKafkaProducerFactory(senderProps) - def kafkaTemplate = new KafkaTemplate(producerFactory) - - // set up the Kafka consumer properties - def consumerProperties = consumerProps("sender", "false") - - // create a Kafka consumer factory - def consumerFactory = new DefaultKafkaConsumerFactory(consumerProperties) - - // set the topic that needs to be consumed - def containerProperties = containerProperties() - - // create a Kafka MessageListenerContainer - def container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties) - - // create a thread safe queue to store the received message - def records = new LinkedBlockingQueue>() - - // setup a Kafka message listener - container.setupMessageListener(new MessageListener() { - @Override - void onMessage(ConsumerRecord record) { - records.add(record) - } - }) - - // start the container and underlying message listener - container.start() - - // wait until the container has the required number of assigned partitions - waitForAssignment(container) - - when: - String greeting = "Hello Spring Kafka Sender!" - runWithSpan("parent") { - kafkaTemplate.send(SHARED_TOPIC, greeting).addCallback({ - runWithSpan("producer callback") {} - }, { ex -> - runWithSpan("producer exception: " + ex) {} - }) - } - - then: - // check that the message was received - def received = records.poll(5, TimeUnit.SECONDS) - received.value() == greeting - received.key() == null - - assertTraces(2) { - traces.sort(orderByRootSpanKind(INTERNAL, CONSUMER)) - - SpanData producerSpan - - trace(0, 3) { - span(0) { - name "parent" - kind INTERNAL - hasNoParent() - } - span(1) { - name SHARED_TOPIC + " send" - kind PRODUCER - childOf span(0) - attributes { - "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka" - "${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC - "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic" - } - } span(2) { - name "producer callback" - kind INTERNAL - childOf span(0) - } - - producerSpan = span(1) - } - trace(1, 2) { - span(0) { - name SHARED_TOPIC + " receive" - kind CONSUMER - hasNoParent() - attributes { - "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka" - "${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC - "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic" - "${SemanticAttributes.MESSAGING_OPERATION.key}" "receive" - } - } - span(1) { - name SHARED_TOPIC + " process" - kind CONSUMER - childOf span(0) - hasLink producerSpan - attributes { - "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka" - "${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC - "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic" - "${SemanticAttributes.MESSAGING_OPERATION.key}" "process" - "${SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES.key}" Long - "${SemanticAttributes.MESSAGING_KAFKA_PARTITION.key}" { it >= 0 } - "kafka.offset" 0 - "kafka.record.queue_time_ms" { it >= 0 } - } + name "processing" + childOf span(1) } } } - - cleanup: - stopProducerFactory(producerFactory) - container?.stop() } def "test pass through tombstone"() { - setup: - def senderProps = senderProps() - def producerFactory = new DefaultKafkaProducerFactory(senderProps) - def kafkaTemplate = new KafkaTemplate(producerFactory) - - // set up the Kafka consumer properties - def consumerProperties = consumerProps("sender", "false") - - // create a Kafka consumer factory - def consumerFactory = new DefaultKafkaConsumerFactory(consumerProperties) - - // set the topic that needs to be consumed - def containerProperties = containerProperties() - - // create a Kafka MessageListenerContainer - def container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties) - - // create a thread safe queue to store the received message - def records = new LinkedBlockingQueue>() - - // setup a Kafka message listener - container.setupMessageListener(new MessageListener() { - @Override - void onMessage(ConsumerRecord record) { - records.add(record) - } - }) - - // start the container and underlying message listener - container.start() - - // wait until the container has the required number of assigned partitions - waitForAssignment(container) - when: - kafkaTemplate.send(SHARED_TOPIC, null) + producer.send(new ProducerRecord<>(SHARED_TOPIC, null)) then: // check that the message was received - def received = records.poll(5, TimeUnit.SECONDS) - received.value() == null - received.key() == null + def records = consumer.poll(Duration.ofSeconds(5).toMillis()) + for (record in records) { + assert record.value() == null + assert record.key() == null + } assertTraces(2) { traces.sort(orderByRootSpanKind(PRODUCER, CONSUMER)) @@ -352,53 +161,34 @@ class KafkaClientPropagationEnabledTest extends KafkaClientBaseTest { "${SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES.key}" Long "${SemanticAttributes.MESSAGING_KAFKA_PARTITION.key}" { it >= 0 } "${SemanticAttributes.MESSAGING_KAFKA_TOMBSTONE.key}" true - "kafka.offset" 0 + "kafka.offset" Long "kafka.record.queue_time_ms" { it >= 0 } } } } } - - cleanup: - stopProducerFactory(producerFactory) - container?.stop() } def "test records(TopicPartition) kafka consume"() { setup: + def partition = 0 - // set up the Kafka consumer properties - def kafkaPartition = 0 - def consumerProperties = consumerProps("sender", "false") - consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") - def consumer = new KafkaConsumer(consumerProperties) - - def senderProps = senderProps() - def producer = new KafkaProducer(senderProps) - - consumer.assign(Arrays.asList(new TopicPartition(SHARED_TOPIC, kafkaPartition))) - - when: + when: "send message" def greeting = "Hello from MockConsumer!" - producer.send(new ProducerRecord(SHARED_TOPIC, kafkaPartition, null, greeting)) + producer.send(new ProducerRecord<>(SHARED_TOPIC, partition, null, greeting)) - then: + then: "wait for PRODUCER span" waitForTraces(1) - def records = new LinkedBlockingQueue>() - def pollResult = KafkaTestUtils.getRecords(consumer) - def recs = pollResult.records(new TopicPartition(SHARED_TOPIC, kafkaPartition)).iterator() - - def first = null - if (recs.hasNext()) { - first = recs.next() + when: "receive messages" + def consumerRecords = consumer.poll(Duration.ofSeconds(5).toMillis()) + def recordsInPartition = consumerRecords.records(new TopicPartition(SHARED_TOPIC, partition)) + for (record in recordsInPartition) { + assert record.value() == greeting + assert record.key() == null } then: - recs.hasNext() == false - first.value() == greeting - first.key() == null - assertTraces(2) { traces.sort(orderByRootSpanKind(PRODUCER, CONSUMER)) @@ -443,55 +233,11 @@ class KafkaClientPropagationEnabledTest extends KafkaClientBaseTest { "${SemanticAttributes.MESSAGING_OPERATION.key}" "process" "${SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES.key}" Long "${SemanticAttributes.MESSAGING_KAFKA_PARTITION.key}" { it >= 0 } - "kafka.offset" 0 + "kafka.offset" Long "kafka.record.queue_time_ms" { it >= 0 } } } } } - - cleanup: - consumer.close() - producer.close() - } - - protected KafkaMessageListenerContainer startConsumer(String groupId, records) { - // set up the Kafka consumer properties - Map consumerProperties = consumerProps(groupId, "false") - consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") - - // create a Kafka consumer factory - def consumerFactory = new DefaultKafkaConsumerFactory(consumerProperties) - - // set the topic that needs to be consumed - def containerProperties = containerProperties() - - // create a Kafka MessageListenerContainer - def container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties) - - // setup a Kafka message listener - container.setupMessageListener(new MessageListener() { - @Override - void onMessage(ConsumerRecord record) { - records.add(record) - } - }) - - // start the container and underlying message listener - container.start() - - // wait until the container has the required number of assigned partitions - waitForAssignment(container) - container - } - - @Override - def containerProperties() { - try { - // Different class names for test and latestDepTest. - return Class.forName("org.springframework.kafka.listener.config.ContainerProperties").newInstance(SHARED_TOPIC) - } catch (ClassNotFoundException | NoClassDefFoundError e) { - return Class.forName("org.springframework.kafka.listener.ContainerProperties").newInstance(SHARED_TOPIC) - } } } diff --git a/instrumentation/kafka-clients/kafka-clients-2.4.0-testing/build.gradle.kts b/instrumentation/kafka-clients/kafka-clients-2.4.0-testing/build.gradle.kts deleted file mode 100644 index 6be486bccb..0000000000 --- a/instrumentation/kafka-clients/kafka-clients-2.4.0-testing/build.gradle.kts +++ /dev/null @@ -1,40 +0,0 @@ -plugins { - id("otel.javaagent-testing") -} - -dependencies { - library("org.apache.kafka:kafka-clients:2.4.0") - - testInstrumentation(project(":instrumentation:kafka-clients:kafka-clients-0.11:javaagent")) - - testLibrary("org.springframework.kafka:spring-kafka:2.4.0.RELEASE") - testLibrary("org.springframework.kafka:spring-kafka-test:2.4.0.RELEASE") - testLibrary("org.springframework:spring-core:5.2.9.RELEASE") - testImplementation("javax.xml.bind:jaxb-api:2.2.3") - - latestDepTestLibrary("org.apache.kafka:kafka_2.13:+") -} - -tasks { - withType().configureEach { - // TODO run tests both with and without experimental span attributes - jvmArgs("-Dotel.instrumentation.kafka.experimental-span-attributes=true") - } - - val testPropagationDisabled by registering(Test::class) { - filter { - includeTestsMatching("KafkaClientPropagationDisabledTest") - isFailOnNoMatchingTests = false - } - include("**/KafkaClientPropagationDisabledTest.*") - jvmArgs("-Dotel.instrumentation.kafka.client-propagation.enabled=false") - } - - named("test") { - dependsOn(testPropagationDisabled) - filter { - excludeTestsMatching("KafkaClientPropagationDisabledTest") - isFailOnNoMatchingTests = false - } - } -} diff --git a/instrumentation/kafka-clients/kafka-clients-2.4.0-testing/src/test/groovy/KafkaClientBaseTest.groovy b/instrumentation/kafka-clients/kafka-clients-2.4.0-testing/src/test/groovy/KafkaClientBaseTest.groovy deleted file mode 100644 index 3ea67994b4..0000000000 --- a/instrumentation/kafka-clients/kafka-clients-2.4.0-testing/src/test/groovy/KafkaClientBaseTest.groovy +++ /dev/null @@ -1,100 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification -import org.apache.kafka.clients.consumer.ConsumerRecord -import org.junit.Rule -import org.springframework.kafka.core.DefaultKafkaConsumerFactory -import org.springframework.kafka.core.DefaultKafkaProducerFactory -import org.springframework.kafka.core.KafkaTemplate -import org.springframework.kafka.listener.KafkaMessageListenerContainer -import org.springframework.kafka.listener.MessageListener -import org.springframework.kafka.test.rule.EmbeddedKafkaRule -import org.springframework.kafka.test.utils.ContainerTestUtils -import org.springframework.kafka.test.utils.KafkaTestUtils -import spock.lang.Unroll - -import java.util.concurrent.LinkedBlockingQueue -import java.util.concurrent.TimeUnit - -abstract class KafkaClientBaseTest extends AgentInstrumentationSpecification { - - protected static final SHARED_TOPIC = "shared.topic" - - private static final boolean propagationEnabled = Boolean.parseBoolean( - System.getProperty("otel.instrumentation.kafka.client-propagation.enabled", "true")) - - @Rule - EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, true, SHARED_TOPIC) - - abstract containerProperties() - - Map senderProps() { - return KafkaTestUtils.producerProps(embeddedKafka.getEmbeddedKafka().getBrokersAsString()) - } - - Map consumerProps(String group, String autoCommit) { - return KafkaTestUtils.consumerProps(group, autoCommit, embeddedKafka.getEmbeddedKafka()) - } - - void waitForAssignment(Object container) { - ContainerTestUtils.waitForAssignment(container, embeddedKafka.getEmbeddedKafka().getPartitionsPerTopic()) - } - - def stopProducerFactory(DefaultKafkaProducerFactory producerFactory) { - producerFactory.destroy() - } - - @Unroll - def "test kafka client header propagation manual config"() { - setup: - def senderProps = senderProps() - def producerFactory = new DefaultKafkaProducerFactory(senderProps) - def kafkaTemplate = new KafkaTemplate(producerFactory) - - // set up the Kafka consumer properties - def consumerProperties = consumerProps("sender", "false") - - // create a Kafka consumer factory - def consumerFactory = new DefaultKafkaConsumerFactory(consumerProperties) - - // set the topic that needs to be consumed - def containerProperties = containerProperties() - - // create a Kafka MessageListenerContainer - def container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties) - - // create a thread safe queue to store the received message - def records = new LinkedBlockingQueue>() - - // setup a Kafka message listener - container.setupMessageListener(new MessageListener() { - @Override - void onMessage(ConsumerRecord record) { - records.add(record) - } - }) - - // start the container and underlying message listener - container.start() - - // wait until the container has the required number of assigned partitions - waitForAssignment(container) - - when: - String message = "Testing without headers" - kafkaTemplate.send(SHARED_TOPIC, message) - - then: - // check that the message was received - def received = records.poll(5, TimeUnit.SECONDS) - - received.headers().iterator().hasNext() == propagationEnabled - - cleanup: - stopProducerFactory(producerFactory) - container?.stop() - } -} diff --git a/instrumentation/kafka-clients/kafka-clients-2.4.0-testing/src/test/groovy/KafkaClientPropagationDisabledTest.groovy b/instrumentation/kafka-clients/kafka-clients-2.4.0-testing/src/test/groovy/KafkaClientPropagationDisabledTest.groovy deleted file mode 100644 index a77989e3d8..0000000000 --- a/instrumentation/kafka-clients/kafka-clients-2.4.0-testing/src/test/groovy/KafkaClientPropagationDisabledTest.groovy +++ /dev/null @@ -1,147 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -import io.opentelemetry.semconv.trace.attributes.SemanticAttributes -import org.apache.kafka.clients.consumer.ConsumerConfig -import org.apache.kafka.clients.consumer.ConsumerRecord -import org.springframework.kafka.core.DefaultKafkaConsumerFactory -import org.springframework.kafka.core.DefaultKafkaProducerFactory -import org.springframework.kafka.core.KafkaTemplate -import org.springframework.kafka.listener.KafkaMessageListenerContainer -import org.springframework.kafka.listener.MessageListener - -import java.util.concurrent.LinkedBlockingQueue -import java.util.concurrent.TimeUnit - -import static io.opentelemetry.api.trace.SpanKind.CONSUMER -import static io.opentelemetry.api.trace.SpanKind.PRODUCER - -class KafkaClientPropagationDisabledTest extends KafkaClientBaseTest { - - def "should not read remote context when consuming messages if propagation is disabled"() { - setup: - def senderProps = senderProps() - def producerFactory = new DefaultKafkaProducerFactory(senderProps) - def kafkaTemplate = new KafkaTemplate(producerFactory) - - when: "send message" - String message = "Testing without headers" - kafkaTemplate.send(SHARED_TOPIC, message) - - then: "producer span is created" - assertTraces(1) { - trace(0, 1) { - span(0) { - name SHARED_TOPIC + " send" - kind PRODUCER - hasNoParent() - attributes { - "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka" - "${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC - "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic" - } - } - } - } - - when: "read message without context propagation" - // create a thread safe queue to store the received message - def records = new LinkedBlockingQueue>() - KafkaMessageListenerContainer container = startConsumer("consumer-without-propagation", records) - - then: "independent consumer span is created" - // check that the message was received - records.poll(5, TimeUnit.SECONDS) != null - - assertTraces(2) { - trace(0, 1) { - span(0) { - name SHARED_TOPIC + " send" - kind PRODUCER - hasNoParent() - attributes { - "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka" - "${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC - "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic" - } - } - } - trace(1, 2) { - span(0) { - name SHARED_TOPIC + " receive" - kind CONSUMER - hasNoParent() - attributes { - "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka" - "${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC - "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic" - "${SemanticAttributes.MESSAGING_OPERATION.key}" "receive" - } - } - span(1) { - name SHARED_TOPIC + " process" - kind CONSUMER - childOf span(0) - hasNoLinks() - attributes { - "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka" - "${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC - "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic" - "${SemanticAttributes.MESSAGING_OPERATION.key}" "process" - "${SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES.key}" Long - "${SemanticAttributes.MESSAGING_KAFKA_PARTITION.key}" { it >= 0 } - "kafka.offset" 0 - "kafka.record.queue_time_ms" { it >= 0 } - } - } - } - - } - - cleanup: - stopProducerFactory(producerFactory) - container?.stop() - } - - protected KafkaMessageListenerContainer startConsumer(String groupId, records) { - // set up the Kafka consumer properties - Map consumerProperties = consumerProps(groupId, "false") - consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") - - // create a Kafka consumer factory - def consumerFactory = new DefaultKafkaConsumerFactory(consumerProperties) - - // set the topic that needs to be consumed - def containerProperties = containerProperties() - - // create a Kafka MessageListenerContainer - def container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties) - - // setup a Kafka message listener - container.setupMessageListener(new MessageListener() { - @Override - void onMessage(ConsumerRecord record) { - records.add(record) - } - }) - - // start the container and underlying message listener - container.start() - - // wait until the container has the required number of assigned partitions - waitForAssignment(container) - container - } - - @Override - def containerProperties() { - try { - // Different class names for test and latestDepTest. - return Class.forName("org.springframework.kafka.listener.config.ContainerProperties").newInstance(SHARED_TOPIC) - } catch (ClassNotFoundException | NoClassDefFoundError e) { - return Class.forName("org.springframework.kafka.listener.ContainerProperties").newInstance(SHARED_TOPIC) - } - } -} diff --git a/instrumentation/kafka-clients/kafka-clients-2.4.0-testing/src/test/groovy/KafkaClientPropagationEnabledTest.groovy b/instrumentation/kafka-clients/kafka-clients-2.4.0-testing/src/test/groovy/KafkaClientPropagationEnabledTest.groovy deleted file mode 100644 index c09ec892c0..0000000000 --- a/instrumentation/kafka-clients/kafka-clients-2.4.0-testing/src/test/groovy/KafkaClientPropagationEnabledTest.groovy +++ /dev/null @@ -1,497 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -import io.opentelemetry.sdk.trace.data.SpanData -import io.opentelemetry.semconv.trace.attributes.SemanticAttributes -import org.apache.kafka.clients.consumer.ConsumerConfig -import org.apache.kafka.clients.consumer.ConsumerRecord -import org.apache.kafka.clients.consumer.KafkaConsumer -import org.apache.kafka.clients.producer.KafkaProducer -import org.apache.kafka.clients.producer.Producer -import org.apache.kafka.clients.producer.ProducerRecord -import org.apache.kafka.common.TopicPartition -import org.apache.kafka.common.serialization.StringSerializer -import org.springframework.kafka.core.DefaultKafkaConsumerFactory -import org.springframework.kafka.core.DefaultKafkaProducerFactory -import org.springframework.kafka.core.KafkaTemplate -import org.springframework.kafka.listener.KafkaMessageListenerContainer -import org.springframework.kafka.listener.MessageListener -import org.springframework.kafka.test.utils.KafkaTestUtils - -import java.util.concurrent.LinkedBlockingQueue -import java.util.concurrent.TimeUnit - -import static io.opentelemetry.api.trace.SpanKind.CONSUMER -import static io.opentelemetry.api.trace.SpanKind.INTERNAL -import static io.opentelemetry.api.trace.SpanKind.PRODUCER - -class KafkaClientPropagationEnabledTest extends KafkaClientBaseTest { - - def "test kafka produce and consume"() { - setup: - def senderProps = senderProps() - Producer producer = new KafkaProducer<>(senderProps, new StringSerializer(), new StringSerializer()) - - // set up the Kafka consumer properties - def consumerProperties = consumerProps("sender", "false") - - // create a Kafka consumer factory - def consumerFactory = new DefaultKafkaConsumerFactory(consumerProperties) - - // set the topic that needs to be consumed - def containerProperties = containerProperties() - - // create a Kafka MessageListenerContainer - def container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties) - - // create a thread safe queue to store the received message - def records = new LinkedBlockingQueue>() - - // setup a Kafka message listener - container.setupMessageListener(new MessageListener() { - @Override - void onMessage(ConsumerRecord record) { - waitForTraces(1) // ensure consistent ordering of traces - records.add(record) - } - }) - - // start the container and underlying message listener - container.start() - - // wait until the container has the required number of assigned partitions - waitForAssignment(container) - - when: - String greeting = "Hello Spring Kafka Sender!" - runWithSpan("parent") { - producer.send(new ProducerRecord(SHARED_TOPIC, greeting)) { meta, ex -> - if (ex == null) { - runWithSpan("producer callback") {} - } else { - runWithSpan("producer exception: " + ex) {} - } - } - } - - then: - // check that the message was received - def received = records.poll(5, TimeUnit.SECONDS) - received.value() == greeting - received.key() == null - - assertTraces(2) { - traces.sort(orderByRootSpanKind(INTERNAL, CONSUMER)) - - SpanData producerSpan - - trace(0, 3) { - span(0) { - name "parent" - kind INTERNAL - hasNoParent() - } - span(1) { - name SHARED_TOPIC + " send" - kind PRODUCER - childOf span(0) - attributes { - "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka" - "${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC - "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic" - } - } - span(2) { - name "producer callback" - kind INTERNAL - childOf span(0) - } - - producerSpan = span(1) - } - trace(1, 2) { - span(0) { - name SHARED_TOPIC + " receive" - kind CONSUMER - hasNoParent() - attributes { - "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka" - "${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC - "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic" - "${SemanticAttributes.MESSAGING_OPERATION.key}" "receive" - } - } - span(1) { - name SHARED_TOPIC + " process" - kind CONSUMER - childOf span(0) - hasLink producerSpan - attributes { - "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka" - "${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC - "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic" - "${SemanticAttributes.MESSAGING_OPERATION.key}" "process" - "${SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES.key}" Long - "${SemanticAttributes.MESSAGING_KAFKA_PARTITION.key}" { it >= 0 } - "kafka.offset" 0 - "kafka.record.queue_time_ms" { it >= 0 } - } - } - } - } - - cleanup: - producer.close() - container?.stop() - } - - def "test spring kafka template produce and consume"() { - setup: - def senderProps = senderProps() - def producerFactory = new DefaultKafkaProducerFactory(senderProps) - def kafkaTemplate = new KafkaTemplate(producerFactory) - - // set up the Kafka consumer properties - def consumerProperties = consumerProps("sender", "false") - - // create a Kafka consumer factory - def consumerFactory = new DefaultKafkaConsumerFactory(consumerProperties) - - // set the topic that needs to be consumed - def containerProperties = containerProperties() - - // create a Kafka MessageListenerContainer - def container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties) - - // create a thread safe queue to store the received message - def records = new LinkedBlockingQueue>() - - // setup a Kafka message listener - container.setupMessageListener(new MessageListener() { - @Override - void onMessage(ConsumerRecord record) { - records.add(record) - } - }) - - // start the container and underlying message listener - container.start() - - // wait until the container has the required number of assigned partitions - waitForAssignment(container) - - when: - String greeting = "Hello Spring Kafka Sender!" - runWithSpan("parent") { - kafkaTemplate.send(SHARED_TOPIC, greeting).addCallback({ - runWithSpan("producer callback") {} - }, { ex -> - runWithSpan("producer exception: " + ex) {} - }) - } - - then: - // check that the message was received - def received = records.poll(5, TimeUnit.SECONDS) - received.value() == greeting - received.key() == null - - assertTraces(2) { - traces.sort(orderByRootSpanKind(INTERNAL, CONSUMER)) - - SpanData producerSpan - - trace(0, 3) { - span(0) { - name "parent" - kind INTERNAL - hasNoParent() - } - span(1) { - name SHARED_TOPIC + " send" - kind PRODUCER - childOf span(0) - attributes { - "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka" - "${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC - "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic" - } - } - span(2) { - name "producer callback" - kind INTERNAL - childOf span(0) - } - - producerSpan = span(1) - } - trace(1, 2) { - span(0) { - name SHARED_TOPIC + " receive" - kind CONSUMER - hasNoParent() - attributes { - "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka" - "${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC - "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic" - "${SemanticAttributes.MESSAGING_OPERATION.key}" "receive" - } - } - span(1) { - name SHARED_TOPIC + " process" - kind CONSUMER - childOf span(0) - hasLink producerSpan - attributes { - "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka" - "${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC - "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic" - "${SemanticAttributes.MESSAGING_OPERATION.key}" "process" - "${SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES.key}" Long - "${SemanticAttributes.MESSAGING_KAFKA_PARTITION.key}" { it >= 0 } - "kafka.offset" 0 - "kafka.record.queue_time_ms" { it >= 0 } - } - } - } - } - - cleanup: - stopProducerFactory(producerFactory) - container?.stop() - } - - def "test pass through tombstone"() { - setup: - def senderProps = senderProps() - def producerFactory = new DefaultKafkaProducerFactory(senderProps) - def kafkaTemplate = new KafkaTemplate(producerFactory) - - // set up the Kafka consumer properties - def consumerProperties = consumerProps("sender", "false") - - // create a Kafka consumer factory - def consumerFactory = new DefaultKafkaConsumerFactory(consumerProperties) - - // set the topic that needs to be consumed - def containerProperties = containerProperties() - - // create a Kafka MessageListenerContainer - def container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties) - - // create a thread safe queue to store the received message - def records = new LinkedBlockingQueue>() - - // setup a Kafka message listener - container.setupMessageListener(new MessageListener() { - @Override - void onMessage(ConsumerRecord record) { - records.add(record) - } - }) - - // start the container and underlying message listener - container.start() - - // wait until the container has the required number of assigned partitions - waitForAssignment(container) - - when: - kafkaTemplate.send(SHARED_TOPIC, null) - - then: - // check that the message was received - def received = records.poll(5, TimeUnit.SECONDS) - received.value() == null - received.key() == null - - assertTraces(2) { - traces.sort(orderByRootSpanKind(PRODUCER, CONSUMER)) - - SpanData producerSpan - - trace(0, 1) { - span(0) { - name SHARED_TOPIC + " send" - kind PRODUCER - hasNoParent() - attributes { - "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka" - "${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC - "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic" - "${SemanticAttributes.MESSAGING_KAFKA_TOMBSTONE.key}" true - } - } - - producerSpan = span(0) - } - trace(1, 2) { - span(0) { - name SHARED_TOPIC + " receive" - kind CONSUMER - hasNoParent() - attributes { - "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka" - "${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC - "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic" - "${SemanticAttributes.MESSAGING_OPERATION.key}" "receive" - } - } - span(1) { - name SHARED_TOPIC + " process" - kind CONSUMER - childOf span(0) - hasLink producerSpan - attributes { - "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka" - "${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC - "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic" - "${SemanticAttributes.MESSAGING_OPERATION.key}" "process" - "${SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES.key}" Long - "${SemanticAttributes.MESSAGING_KAFKA_PARTITION.key}" { it >= 0 } - "${SemanticAttributes.MESSAGING_KAFKA_TOMBSTONE.key}" true - "kafka.offset" 0 - "kafka.record.queue_time_ms" { it >= 0 } - } - } - } - } - - cleanup: - stopProducerFactory(producerFactory) - container?.stop() - } - - def "test records(TopicPartition) kafka consume"() { - setup: - - // set up the Kafka consumer properties - def kafkaPartition = 0 - def consumerProperties = consumerProps("sender", "false") - consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") - def consumer = new KafkaConsumer(consumerProperties) - - def senderProps = senderProps() - def producer = new KafkaProducer(senderProps) - - consumer.assign(Arrays.asList(new TopicPartition(SHARED_TOPIC, kafkaPartition))) - - when: - def greeting = "Hello from MockConsumer!" - producer.send(new ProducerRecord(SHARED_TOPIC, kafkaPartition, null, greeting)) - - then: - waitForTraces(1) - def records = new LinkedBlockingQueue>() - def pollResult = KafkaTestUtils.getRecords(consumer) - - def recs = pollResult.records(new TopicPartition(SHARED_TOPIC, kafkaPartition)).iterator() - - def first = null - if (recs.hasNext()) { - first = recs.next() - } - - then: - recs.hasNext() == false - first.value() == greeting - first.key() == null - - assertTraces(2) { - traces.sort(orderByRootSpanKind(PRODUCER, CONSUMER)) - - SpanData producerSpan - - trace(0, 1) { - span(0) { - name SHARED_TOPIC + " send" - kind PRODUCER - hasNoParent() - attributes { - "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka" - "${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC - "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic" - "${SemanticAttributes.MESSAGING_KAFKA_PARTITION.key}" { it >= 0 } - } - } - - producerSpan = span(0) - } - trace(1, 2) { - span(0) { - name SHARED_TOPIC + " receive" - kind CONSUMER - hasNoParent() - attributes { - "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka" - "${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC - "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic" - "${SemanticAttributes.MESSAGING_OPERATION.key}" "receive" - } - } - span(1) { - name SHARED_TOPIC + " process" - kind CONSUMER - childOf span(0) - hasLink producerSpan - attributes { - "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka" - "${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC - "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic" - "${SemanticAttributes.MESSAGING_OPERATION.key}" "process" - "${SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES.key}" Long - "${SemanticAttributes.MESSAGING_KAFKA_PARTITION.key}" { it >= 0 } - "kafka.offset" 0 - "kafka.record.queue_time_ms" { it >= 0 } - } - } - } - } - - cleanup: - consumer.close() - producer.close() - } - - protected KafkaMessageListenerContainer startConsumer(String groupId, records) { - // set up the Kafka consumer properties - Map consumerProperties = consumerProps(groupId, "false") - consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") - - // create a Kafka consumer factory - def consumerFactory = new DefaultKafkaConsumerFactory(consumerProperties) - - // set the topic that needs to be consumed - def containerProperties = containerProperties() - - // create a Kafka MessageListenerContainer - def container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties) - - // setup a Kafka message listener - container.setupMessageListener(new MessageListener() { - @Override - void onMessage(ConsumerRecord record) { - records.add(record) - } - }) - - // start the container and underlying message listener - container.start() - - // wait until the container has the required number of assigned partitions - waitForAssignment(container) - container - } - - @Override - def containerProperties() { - try { - // Different class names for test and latestDepTest. - return Class.forName("org.springframework.kafka.listener.config.ContainerProperties").newInstance(SHARED_TOPIC) - } catch (ClassNotFoundException | NoClassDefFoundError e) { - return Class.forName("org.springframework.kafka.listener.ContainerProperties").newInstance(SHARED_TOPIC) - } - } -} diff --git a/instrumentation/kafka-streams-0.11/javaagent/build.gradle.kts b/instrumentation/kafka-streams-0.11/javaagent/build.gradle.kts index 47b7a6b1c5..60a7cdaa85 100644 --- a/instrumentation/kafka-streams-0.11/javaagent/build.gradle.kts +++ b/instrumentation/kafka-streams-0.11/javaagent/build.gradle.kts @@ -11,47 +11,26 @@ muzzle { } } -testSets { - create("latestDepTest") -} +val versions: Map by project dependencies { implementation(project(":instrumentation:kafka-clients:kafka-clients-common:javaagent")) - compileOnly("org.apache.kafka:kafka-streams:0.11.0.0") + library("org.apache.kafka:kafka-streams:0.11.0.0") // Include kafka-clients instrumentation for tests. testInstrumentation(project(":instrumentation:kafka-clients:kafka-clients-0.11:javaagent")) - testImplementation("org.apache.kafka:kafka-streams:0.11.0.0") - testImplementation("org.apache.kafka:kafka-clients:0.11.0.0") - testImplementation("org.springframework.kafka:spring-kafka:1.3.3.RELEASE") - testImplementation("org.springframework.kafka:spring-kafka-test:1.3.3.RELEASE") - testImplementation("javax.xml.bind:jaxb-api:2.2.3") - testImplementation("org.assertj:assertj-core") + testImplementation("org.testcontainers:kafka:${versions["org.testcontainers"]}") - add("latestDepTestImplementation", "org.apache.kafka:kafka_2.13:2.+") - add("latestDepTestImplementation", "org.apache.kafka:kafka-clients:2.+") - add("latestDepTestImplementation", "org.apache.kafka:kafka-streams:2.+") - add("latestDepTestImplementation", "org.springframework.kafka:spring-kafka:+") - add("latestDepTestImplementation", "org.springframework.kafka:spring-kafka-test:+") + latestDepTestLibrary("org.apache.kafka:kafka-streams:2.+") } tasks { - withType().configureEach { + test { + usesService(gradle.sharedServices.registrations["testcontainersBuildService"].service) + // TODO run tests both with and without experimental span attributes jvmArgs("-Dotel.instrumentation.kafka.experimental-span-attributes=true") } - - if (findProperty("testLatestDeps") as Boolean) { - // latestDepTest is still run - named("test") { - enabled = false - } - } -} - -// Requires old version of AssertJ for baseline -if (!(findProperty("testLatestDeps") as Boolean)) { - configurations.testRuntimeClasspath.resolutionStrategy.force("org.assertj:assertj-core:2.9.1") } diff --git a/instrumentation/kafka-streams-0.11/javaagent/src/latestDepTest/groovy/KafkaStreamsTest.groovy b/instrumentation/kafka-streams-0.11/javaagent/src/latestDepTest/groovy/KafkaStreamsTest.groovy deleted file mode 100644 index ced88a36a9..0000000000 --- a/instrumentation/kafka-streams-0.11/javaagent/src/latestDepTest/groovy/KafkaStreamsTest.groovy +++ /dev/null @@ -1,276 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -import io.opentelemetry.api.trace.Span -import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator -import io.opentelemetry.context.Context -import io.opentelemetry.context.propagation.TextMapGetter -import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification -import io.opentelemetry.sdk.trace.data.SpanData -import io.opentelemetry.semconv.trace.attributes.SemanticAttributes -import org.apache.kafka.clients.consumer.ConsumerRecord -import org.apache.kafka.common.serialization.Serdes -import org.apache.kafka.streams.KafkaStreams -import org.apache.kafka.streams.StreamsConfig -import org.apache.kafka.streams.kstream.KStream -import org.apache.kafka.streams.kstream.ValueMapper -import org.junit.ClassRule -import org.springframework.kafka.core.DefaultKafkaConsumerFactory -import org.springframework.kafka.core.DefaultKafkaProducerFactory -import org.springframework.kafka.core.KafkaTemplate -import org.springframework.kafka.listener.KafkaMessageListenerContainer -import org.springframework.kafka.listener.MessageListener -import org.springframework.kafka.test.rule.EmbeddedKafkaRule -import org.springframework.kafka.test.utils.ContainerTestUtils -import org.springframework.kafka.test.utils.KafkaTestUtils -import spock.lang.Shared - -import java.util.concurrent.LinkedBlockingQueue -import java.util.concurrent.TimeUnit - -import static io.opentelemetry.api.trace.SpanKind.CONSUMER -import static io.opentelemetry.api.trace.SpanKind.PRODUCER - -class KafkaStreamsTest extends AgentInstrumentationSpecification { - - static final STREAM_PENDING = "test.pending" - static final STREAM_PROCESSED = "test.processed" - - @Shared - @ClassRule - EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, true, STREAM_PENDING, STREAM_PROCESSED) - - Map senderProps() { - return KafkaTestUtils.producerProps(embeddedKafka.getEmbeddedKafka().getBrokersAsString()) - } - - Map consumerProps(String group, String autoCommit) { - return KafkaTestUtils.consumerProps(group, autoCommit, embeddedKafka.getEmbeddedKafka()) - } - - void waitForAssignment(Object container) { - ContainerTestUtils.waitForAssignment(container, embeddedKafka.getEmbeddedKafka().getPartitionsPerTopic()) - } - - def stopProducerFactory(DefaultKafkaProducerFactory producerFactory) { - producerFactory.destroy() - } - - def "test kafka produce and consume with streams in-between"() { - setup: - def config = new Properties() - def senderProps = senderProps() - config.putAll(senderProps) - config.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-application") - config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()) - config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()) - - // CONFIGURE CONSUMER - def consumerFactory = new DefaultKafkaConsumerFactory(consumerProps("sender", "false")) - - def containerProperties - try { - // Different class names for test and latestDepTest. - containerProperties = Class.forName("org.springframework.kafka.listener.config.ContainerProperties").newInstance(STREAM_PROCESSED) - } catch (ClassNotFoundException | NoClassDefFoundError e) { - containerProperties = Class.forName("org.springframework.kafka.listener.ContainerProperties").newInstance(STREAM_PROCESSED) - } - def consumerContainer = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties) - - // create a thread safe queue to store the processed message - def records = new LinkedBlockingQueue>() - - // setup a Kafka message listener - consumerContainer.setupMessageListener(new MessageListener() { - @Override - void onMessage(ConsumerRecord record) { - Span.current().setAttribute("testing", 123) - records.add(record) - } - }) - - // start the container and underlying message listener - consumerContainer.start() - - // wait until the container has the required number of assigned partitions - waitForAssignment(consumerContainer) - - // CONFIGURE PROCESSOR - def builder - try { - // Different class names for test and latestDepTest. - builder = Class.forName("org.apache.kafka.streams.kstream.KStreamBuilder").newInstance() - } catch (ClassNotFoundException | NoClassDefFoundError e) { - builder = Class.forName("org.apache.kafka.streams.StreamsBuilder").newInstance() - } - KStream textLines = builder.stream(STREAM_PENDING) - def values = textLines - .mapValues(new ValueMapper() { - @Override - String apply(String textLine) { - Span.current().setAttribute("asdf", "testing") - return textLine.toLowerCase() - } - }) - - KafkaStreams streams - try { - // Different api for test and latestDepTest. - values.to(Serdes.String(), Serdes.String(), STREAM_PROCESSED) - streams = new KafkaStreams(builder, config) - } catch (MissingMethodException e) { - def producer = Class.forName("org.apache.kafka.streams.kstream.Produced") - .with(Serdes.String(), Serdes.String()) - values.to(STREAM_PROCESSED, producer) - streams = new KafkaStreams(builder.build(), config) - } - streams.start() - - // CONFIGURE PRODUCER - def producerFactory = new DefaultKafkaProducerFactory(senderProps) - def kafkaTemplate = new KafkaTemplate(producerFactory) - - when: - String greeting = "TESTING TESTING 123!" - kafkaTemplate.send(STREAM_PENDING, greeting) - - then: - // check that the message was received - def received = records.poll(10, TimeUnit.SECONDS) - received.value() == greeting.toLowerCase() - received.key() == null - - assertTraces(3) { - traces.sort(orderByRootSpanName( - STREAM_PENDING + " send", - STREAM_PENDING + " receive", - STREAM_PROCESSED + " receive")) - - SpanData producerPending, producerProcessed - - trace(0, 1) { - // kafka-clients PRODUCER - span(0) { - name STREAM_PENDING + " send" - kind PRODUCER - hasNoParent() - attributes { - "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka" - "${SemanticAttributes.MESSAGING_DESTINATION.key}" STREAM_PENDING - "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic" - } - } - - producerPending = span(0) - } - trace(1, 3) { - // kafka-clients CONSUMER receive - span(0) { - name STREAM_PENDING + " receive" - kind CONSUMER - hasNoParent() - attributes { - "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka" - "${SemanticAttributes.MESSAGING_DESTINATION.key}" STREAM_PENDING - "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic" - "${SemanticAttributes.MESSAGING_OPERATION.key}" "receive" - } - } - // kafka-stream CONSUMER - span(1) { - name STREAM_PENDING + " process" - kind CONSUMER - childOf span(0) - hasLink producerPending - attributes { - "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka" - "${SemanticAttributes.MESSAGING_DESTINATION.key}" STREAM_PENDING - "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic" - "${SemanticAttributes.MESSAGING_OPERATION.key}" "process" - "${SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES.key}" Long - "${SemanticAttributes.MESSAGING_KAFKA_PARTITION.key}" { it >= 0 } - "kafka.offset" 0 - "kafka.record.queue_time_ms" { it >= 0 } - "asdf" "testing" - } - } - // kafka-clients PRODUCER - span(2) { - name STREAM_PROCESSED + " send" - kind PRODUCER - childOf span(1) - attributes { - "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka" - "${SemanticAttributes.MESSAGING_DESTINATION.key}" STREAM_PROCESSED - "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic" - } - } - - producerProcessed = span(2) - } - trace(2, 2) { - // kafka-clients CONSUMER receive - span(0) { - name STREAM_PROCESSED + " receive" - kind CONSUMER - hasNoParent() - attributes { - "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka" - "${SemanticAttributes.MESSAGING_DESTINATION.key}" STREAM_PROCESSED - "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic" - "${SemanticAttributes.MESSAGING_OPERATION.key}" "receive" - } - } - // kafka-clients CONSUMER process - span(1) { - name STREAM_PROCESSED + " process" - kind CONSUMER - childOf span(0) - hasLink producerProcessed - attributes { - "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka" - "${SemanticAttributes.MESSAGING_DESTINATION.key}" STREAM_PROCESSED - "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic" - "${SemanticAttributes.MESSAGING_OPERATION.key}" "process" - "${SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES.key}" Long - "${SemanticAttributes.MESSAGING_KAFKA_PARTITION.key}" { it >= 0 } - "kafka.offset" 0 - "kafka.record.queue_time_ms" { it >= 0 } - "testing" 123 - } - } - } - } - - def headers = received.headers() - headers.iterator().hasNext() - def traceparent = new String(headers.headers("traceparent").iterator().next().value()) - Context context = W3CTraceContextPropagator.instance.extract(Context.root(), "", new TextMapGetter() { - @Override - Iterable keys(String carrier) { - return Collections.singleton("traceparent") - } - - @Override - String get(String carrier, String key) { - if (key == "traceparent") { - return traceparent - } - return null - } - }) - def spanContext = Span.fromContext(context).getSpanContext() - def streamTrace = traces.find { it.size() == 3 } - def streamSendSpan = streamTrace[2] - spanContext.traceId == streamSendSpan.traceId - spanContext.spanId == streamSendSpan.spanId - - - cleanup: - stopProducerFactory(producerFactory) - streams?.close() - consumerContainer?.stop() - } -} diff --git a/instrumentation/kafka-streams-0.11/javaagent/src/test/groovy/KafkaStreamsTest.groovy b/instrumentation/kafka-streams-0.11/javaagent/src/test/groovy/KafkaStreamsTest.groovy index f29c7592df..7dce086d74 100644 --- a/instrumentation/kafka-streams-0.11/javaagent/src/test/groovy/KafkaStreamsTest.groovy +++ b/instrumentation/kafka-streams-0.11/javaagent/src/test/groovy/KafkaStreamsTest.groovy @@ -10,24 +10,28 @@ import io.opentelemetry.context.propagation.TextMapGetter import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification import io.opentelemetry.sdk.trace.data.SpanData import io.opentelemetry.semconv.trace.attributes.SemanticAttributes -import org.apache.kafka.clients.consumer.ConsumerRecord +import org.apache.kafka.clients.admin.AdminClient +import org.apache.kafka.clients.admin.NewTopic +import org.apache.kafka.clients.consumer.Consumer +import org.apache.kafka.clients.consumer.KafkaConsumer +import org.apache.kafka.clients.producer.KafkaProducer +import org.apache.kafka.clients.producer.Producer +import org.apache.kafka.clients.producer.ProducerRecord +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.header.Headers +import org.apache.kafka.common.serialization.IntegerDeserializer +import org.apache.kafka.common.serialization.IntegerSerializer import org.apache.kafka.common.serialization.Serdes +import org.apache.kafka.common.serialization.StringDeserializer +import org.apache.kafka.common.serialization.StringSerializer import org.apache.kafka.streams.KafkaStreams import org.apache.kafka.streams.StreamsConfig import org.apache.kafka.streams.kstream.KStream import org.apache.kafka.streams.kstream.ValueMapper -import org.junit.ClassRule -import org.springframework.kafka.core.DefaultKafkaConsumerFactory -import org.springframework.kafka.core.DefaultKafkaProducerFactory -import org.springframework.kafka.core.KafkaTemplate -import org.springframework.kafka.listener.KafkaMessageListenerContainer -import org.springframework.kafka.listener.MessageListener -import org.springframework.kafka.test.rule.KafkaEmbedded -import org.springframework.kafka.test.utils.ContainerTestUtils -import org.springframework.kafka.test.utils.KafkaTestUtils +import org.testcontainers.containers.KafkaContainer import spock.lang.Shared -import java.util.concurrent.LinkedBlockingQueue +import java.time.Duration import java.util.concurrent.TimeUnit import static io.opentelemetry.api.trace.SpanKind.CONSUMER @@ -39,64 +43,59 @@ class KafkaStreamsTest extends AgentInstrumentationSpecification { static final STREAM_PROCESSED = "test.processed" @Shared - @ClassRule - KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, STREAM_PENDING, STREAM_PROCESSED) + static KafkaContainer kafka + @Shared + static Producer producer + @Shared + static Consumer consumer - Map senderProps() { - return KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString()) + + def setupSpec() { + kafka = new KafkaContainer() + kafka.start() + + // create test topic + AdminClient.create(["bootstrap.servers": kafka.bootstrapServers]).withCloseable { admin -> + admin.createTopics([ + new NewTopic(STREAM_PENDING, 1, (short) 1), + new NewTopic(STREAM_PROCESSED, 1, (short) 1), + ]).all().get(10, TimeUnit.SECONDS) + } + + producer = new KafkaProducer<>(producerProps(kafka.bootstrapServers)) + + // values copied from spring's KafkaTestUtils + def consumerProps = [ + "bootstrap.servers" : kafka.bootstrapServers, + "group.id" : "test", + "enable.auto.commit" : "false", + "auto.commit.interval.ms": "10", + "session.timeout.ms" : "60000", + "key.deserializer" : IntegerDeserializer, + "value.deserializer" : StringDeserializer + ] + consumer = new KafkaConsumer<>(consumerProps) + + // assign topic partitions + consumer.assign([ + new TopicPartition(STREAM_PROCESSED, 0) + ]) } - Map consumerProps(String group, String autoCommit) { - return KafkaTestUtils.consumerProps(group, autoCommit, embeddedKafka) - } - - void waitForAssignment(Object container) { - ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic()) - } - - def stopProducerFactory(DefaultKafkaProducerFactory producerFactory) { - producerFactory.stop() + def cleanupSpec() { + consumer?.close() + producer?.close() + kafka.stop() } def "test kafka produce and consume with streams in-between"() { setup: def config = new Properties() - def senderProps = senderProps() - config.putAll(senderProps) + config.putAll(producerProps(kafka.bootstrapServers)) config.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-application") - config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()) + config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName()) config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()) - // CONFIGURE CONSUMER - def consumerFactory = new DefaultKafkaConsumerFactory(consumerProps("sender", "false")) - - def containerProperties - try { - // Different class names for test and latestDepTest. - containerProperties = Class.forName("org.springframework.kafka.listener.config.ContainerProperties").newInstance(STREAM_PROCESSED) - } catch (ClassNotFoundException | NoClassDefFoundError e) { - containerProperties = Class.forName("org.springframework.kafka.listener.ContainerProperties").newInstance(STREAM_PROCESSED) - } - def consumerContainer = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties) - - // create a thread safe queue to store the processed message - def records = new LinkedBlockingQueue>() - - // setup a Kafka message listener - consumerContainer.setupMessageListener(new MessageListener() { - @Override - void onMessage(ConsumerRecord record) { - Span.current().setAttribute("testing", 123) - records.add(record) - } - }) - - // start the container and underlying message listener - consumerContainer.start() - - // wait until the container has the required number of assigned partitions - waitForAssignment(consumerContainer) - // CONFIGURE PROCESSOR def builder try { @@ -128,19 +127,24 @@ class KafkaStreamsTest extends AgentInstrumentationSpecification { } streams.start() - // CONFIGURE PRODUCER - def producerFactory = new DefaultKafkaProducerFactory(senderProps) - def kafkaTemplate = new KafkaTemplate(producerFactory) - when: String greeting = "TESTING TESTING 123!" - kafkaTemplate.send(STREAM_PENDING, greeting) + producer.send(new ProducerRecord<>(STREAM_PENDING, greeting)) then: // check that the message was received - def received = records.poll(10, TimeUnit.SECONDS) - received.value() == greeting.toLowerCase() - received.key() == null + def records = consumer.poll(Duration.ofSeconds(10).toMillis()) + Headers receivedHeaders = null + for (record in records) { + Span.current().setAttribute("testing", 123) + + assert record.value() == greeting.toLowerCase() + assert record.key() == null + + if (receivedHeaders == null) { + receivedHeaders = record.headers() + } + } assertTraces(3) { traces.sort(orderByRootSpanName( @@ -244,9 +248,8 @@ class KafkaStreamsTest extends AgentInstrumentationSpecification { } } - def headers = received.headers() - headers.iterator().hasNext() - def traceparent = new String(headers.headers("traceparent").iterator().next().value()) + receivedHeaders.iterator().hasNext() + def traceparent = new String(receivedHeaders.headers("traceparent").iterator().next().value()) Context context = W3CTraceContextPropagator.instance.extract(Context.root(), "", new TextMapGetter() { @Override Iterable keys(String carrier) { @@ -266,11 +269,18 @@ class KafkaStreamsTest extends AgentInstrumentationSpecification { def streamSendSpan = streamTrace[2] spanContext.traceId == streamSendSpan.traceId spanContext.spanId == streamSendSpan.spanId + } - - cleanup: - stopProducerFactory(producerFactory) - streams?.close() - consumerContainer?.stop() + private static Map producerProps(String servers) { + // values copied from spring's KafkaTestUtils + return [ + "bootstrap.servers": servers, + "retries" : 0, + "batch.size" : "16384", + "linger.ms" : 1, + "buffer.memory" : "33554432", + "key.serializer" : IntegerSerializer, + "value.serializer" : StringSerializer + ] } } diff --git a/settings.gradle.kts b/settings.gradle.kts index 4e949c36cf..7fa519388f 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -211,7 +211,6 @@ include(":instrumentation:jsf:mojarra-1.2:javaagent") include(":instrumentation:jsf:myfaces-1.2:javaagent") include(":instrumentation:jsp-2.3:javaagent") include(":instrumentation:kafka-clients:kafka-clients-0.11:javaagent") -include(":instrumentation:kafka-clients:kafka-clients-2.4.0-testing") include(":instrumentation:kafka-clients:kafka-clients-common:javaagent") include(":instrumentation:kafka-streams-0.11:javaagent") include(":instrumentation:kotlinx-coroutines:javaagent")