Test latest version of kafka client and streams (#3803)
This commit is contained in:
parent
421fec4cfe
commit
d9080a745b
|
@ -18,7 +18,6 @@ dependencies {
|
|||
testLibrary("org.springframework.kafka:spring-kafka-test:1.3.3.RELEASE")
|
||||
testImplementation("javax.xml.bind:jaxb-api:2.2.3")
|
||||
testLibrary("org.assertj:assertj-core")
|
||||
testImplementation("org.mockito:mockito-core")
|
||||
|
||||
// Include latest version of kafka itself along with latest version of client libs.
|
||||
// This seems to help with jar compatibility hell.
|
||||
|
|
|
@ -28,15 +28,33 @@ abstract class KafkaClientBaseTest extends AgentInstrumentationSpecification {
|
|||
@Rule
|
||||
KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, SHARED_TOPIC)
|
||||
|
||||
abstract containerProperties()
|
||||
|
||||
Map<String, Object> senderProps() {
|
||||
return KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString())
|
||||
}
|
||||
|
||||
Map<String, Object> consumerProps(String group, String autoCommit) {
|
||||
return KafkaTestUtils.consumerProps(group, autoCommit, embeddedKafka)
|
||||
}
|
||||
|
||||
void waitForAssignment(Object container) {
|
||||
ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic())
|
||||
}
|
||||
|
||||
def stopProducerFactory(DefaultKafkaProducerFactory producerFactory) {
|
||||
producerFactory.stop()
|
||||
}
|
||||
|
||||
@Unroll
|
||||
def "test kafka client header propagation manual config"() {
|
||||
setup:
|
||||
def senderProps = KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString())
|
||||
def senderProps = senderProps()
|
||||
def producerFactory = new DefaultKafkaProducerFactory<String, String>(senderProps)
|
||||
def kafkaTemplate = new KafkaTemplate<String, String>(producerFactory)
|
||||
|
||||
// set up the Kafka consumer properties
|
||||
def consumerProperties = KafkaTestUtils.consumerProps("sender", "false", embeddedKafka)
|
||||
def consumerProperties = consumerProps("sender", "false")
|
||||
|
||||
// create a Kafka consumer factory
|
||||
def consumerFactory = new DefaultKafkaConsumerFactory<String, String>(consumerProperties)
|
||||
|
@ -62,7 +80,7 @@ abstract class KafkaClientBaseTest extends AgentInstrumentationSpecification {
|
|||
container.start()
|
||||
|
||||
// wait until the container has the required number of assigned partitions
|
||||
ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic())
|
||||
waitForAssignment(container)
|
||||
|
||||
when:
|
||||
String message = "Testing without headers"
|
||||
|
@ -75,7 +93,7 @@ abstract class KafkaClientBaseTest extends AgentInstrumentationSpecification {
|
|||
received.headers().iterator().hasNext() == propagationEnabled
|
||||
|
||||
cleanup:
|
||||
producerFactory.stop()
|
||||
stopProducerFactory(producerFactory)
|
||||
container?.stop()
|
||||
}
|
||||
}
|
||||
|
|
|
@ -16,14 +16,12 @@ import org.springframework.kafka.core.DefaultKafkaProducerFactory
|
|||
import org.springframework.kafka.core.KafkaTemplate
|
||||
import org.springframework.kafka.listener.KafkaMessageListenerContainer
|
||||
import org.springframework.kafka.listener.MessageListener
|
||||
import org.springframework.kafka.test.utils.ContainerTestUtils
|
||||
import org.springframework.kafka.test.utils.KafkaTestUtils
|
||||
|
||||
class KafkaClientPropagationDisabledTest extends KafkaClientBaseTest {
|
||||
|
||||
def "should not read remote context when consuming messages if propagation is disabled"() {
|
||||
setup:
|
||||
def senderProps = KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString())
|
||||
def senderProps = senderProps()
|
||||
def producerFactory = new DefaultKafkaProducerFactory<String, String>(senderProps)
|
||||
def kafkaTemplate = new KafkaTemplate<String, String>(producerFactory)
|
||||
|
||||
|
@ -90,13 +88,13 @@ class KafkaClientPropagationDisabledTest extends KafkaClientBaseTest {
|
|||
}
|
||||
|
||||
cleanup:
|
||||
producerFactory.stop()
|
||||
stopProducerFactory(producerFactory)
|
||||
container?.stop()
|
||||
}
|
||||
|
||||
protected KafkaMessageListenerContainer<Object, Object> startConsumer(String groupId, records) {
|
||||
// set up the Kafka consumer properties
|
||||
Map<String, Object> consumerProperties = KafkaTestUtils.consumerProps(groupId, "false", embeddedKafka)
|
||||
Map<String, Object> consumerProperties = consumerProps(groupId, "false")
|
||||
consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
|
||||
|
||||
// create a Kafka consumer factory
|
||||
|
@ -120,11 +118,11 @@ class KafkaClientPropagationDisabledTest extends KafkaClientBaseTest {
|
|||
container.start()
|
||||
|
||||
// wait until the container has the required number of assigned partitions
|
||||
ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic())
|
||||
waitForAssignment(container)
|
||||
container
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
def containerProperties() {
|
||||
try {
|
||||
// Different class names for test and latestDepTest.
|
||||
|
|
|
@ -23,18 +23,17 @@ import org.springframework.kafka.core.DefaultKafkaProducerFactory
|
|||
import org.springframework.kafka.core.KafkaTemplate
|
||||
import org.springframework.kafka.listener.KafkaMessageListenerContainer
|
||||
import org.springframework.kafka.listener.MessageListener
|
||||
import org.springframework.kafka.test.utils.ContainerTestUtils
|
||||
import org.springframework.kafka.test.utils.KafkaTestUtils
|
||||
|
||||
class KafkaClientPropagationEnabledTest extends KafkaClientBaseTest {
|
||||
|
||||
def "test kafka produce and consume"() {
|
||||
setup:
|
||||
def senderProps = KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString())
|
||||
def senderProps = senderProps()
|
||||
Producer<String, String> producer = new KafkaProducer<>(senderProps, new StringSerializer(), new StringSerializer())
|
||||
|
||||
// set up the Kafka consumer properties
|
||||
def consumerProperties = KafkaTestUtils.consumerProps("sender", "false", embeddedKafka)
|
||||
def consumerProperties = consumerProps("sender", "false")
|
||||
|
||||
// create a Kafka consumer factory
|
||||
def consumerFactory = new DefaultKafkaConsumerFactory<String, String>(consumerProperties)
|
||||
|
@ -61,7 +60,7 @@ class KafkaClientPropagationEnabledTest extends KafkaClientBaseTest {
|
|||
container.start()
|
||||
|
||||
// wait until the container has the required number of assigned partitions
|
||||
ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic())
|
||||
waitForAssignment(container)
|
||||
|
||||
when:
|
||||
String greeting = "Hello Spring Kafka Sender!"
|
||||
|
@ -128,12 +127,12 @@ class KafkaClientPropagationEnabledTest extends KafkaClientBaseTest {
|
|||
|
||||
def "test spring kafka template produce and consume"() {
|
||||
setup:
|
||||
def senderProps = KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString())
|
||||
def senderProps = senderProps()
|
||||
def producerFactory = new DefaultKafkaProducerFactory<String, String>(senderProps)
|
||||
def kafkaTemplate = new KafkaTemplate<String, String>(producerFactory)
|
||||
|
||||
// set up the Kafka consumer properties
|
||||
def consumerProperties = KafkaTestUtils.consumerProps("sender", "false", embeddedKafka)
|
||||
def consumerProperties = consumerProps("sender", "false")
|
||||
|
||||
// create a Kafka consumer factory
|
||||
def consumerFactory = new DefaultKafkaConsumerFactory<String, String>(consumerProperties)
|
||||
|
@ -159,7 +158,7 @@ class KafkaClientPropagationEnabledTest extends KafkaClientBaseTest {
|
|||
container.start()
|
||||
|
||||
// wait until the container has the required number of assigned partitions
|
||||
ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic())
|
||||
waitForAssignment(container)
|
||||
|
||||
when:
|
||||
String greeting = "Hello Spring Kafka Sender!"
|
||||
|
@ -218,18 +217,18 @@ class KafkaClientPropagationEnabledTest extends KafkaClientBaseTest {
|
|||
}
|
||||
|
||||
cleanup:
|
||||
producerFactory.stop()
|
||||
stopProducerFactory(producerFactory)
|
||||
container?.stop()
|
||||
}
|
||||
|
||||
def "test pass through tombstone"() {
|
||||
setup:
|
||||
def senderProps = KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString())
|
||||
def senderProps = senderProps()
|
||||
def producerFactory = new DefaultKafkaProducerFactory<String, String>(senderProps)
|
||||
def kafkaTemplate = new KafkaTemplate<String, String>(producerFactory)
|
||||
|
||||
// set up the Kafka consumer properties
|
||||
def consumerProperties = KafkaTestUtils.consumerProps("sender", "false", embeddedKafka)
|
||||
def consumerProperties = consumerProps("sender", "false")
|
||||
|
||||
// create a Kafka consumer factory
|
||||
def consumerFactory = new DefaultKafkaConsumerFactory<String, String>(consumerProperties)
|
||||
|
@ -255,7 +254,7 @@ class KafkaClientPropagationEnabledTest extends KafkaClientBaseTest {
|
|||
container.start()
|
||||
|
||||
// wait until the container has the required number of assigned partitions
|
||||
ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic())
|
||||
waitForAssignment(container)
|
||||
|
||||
when:
|
||||
kafkaTemplate.send(SHARED_TOPIC, null)
|
||||
|
@ -301,7 +300,7 @@ class KafkaClientPropagationEnabledTest extends KafkaClientBaseTest {
|
|||
}
|
||||
|
||||
cleanup:
|
||||
producerFactory.stop()
|
||||
stopProducerFactory(producerFactory)
|
||||
container?.stop()
|
||||
}
|
||||
|
||||
|
@ -310,11 +309,11 @@ class KafkaClientPropagationEnabledTest extends KafkaClientBaseTest {
|
|||
|
||||
// set up the Kafka consumer properties
|
||||
def kafkaPartition = 0
|
||||
def consumerProperties = KafkaTestUtils.consumerProps("sender", "false", embeddedKafka)
|
||||
def consumerProperties = consumerProps("sender", "false")
|
||||
consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
|
||||
def consumer = new KafkaConsumer<String, String>(consumerProperties)
|
||||
|
||||
def senderProps = KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString())
|
||||
def senderProps = senderProps()
|
||||
def producer = new KafkaProducer(senderProps)
|
||||
|
||||
consumer.assign(Arrays.asList(new TopicPartition(SHARED_TOPIC, kafkaPartition)))
|
||||
|
@ -378,7 +377,7 @@ class KafkaClientPropagationEnabledTest extends KafkaClientBaseTest {
|
|||
|
||||
protected KafkaMessageListenerContainer<Object, Object> startConsumer(String groupId, records) {
|
||||
// set up the Kafka consumer properties
|
||||
Map<String, Object> consumerProperties = KafkaTestUtils.consumerProps(groupId, "false", embeddedKafka)
|
||||
Map<String, Object> consumerProperties = consumerProps(groupId, "false")
|
||||
consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
|
||||
|
||||
// create a Kafka consumer factory
|
||||
|
@ -402,11 +401,11 @@ class KafkaClientPropagationEnabledTest extends KafkaClientBaseTest {
|
|||
container.start()
|
||||
|
||||
// wait until the container has the required number of assigned partitions
|
||||
ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic())
|
||||
waitForAssignment(container)
|
||||
container
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
def containerProperties() {
|
||||
try {
|
||||
// Different class names for test and latestDepTest.
|
||||
|
|
|
@ -0,0 +1,40 @@
|
|||
plugins {
|
||||
id("otel.javaagent-testing")
|
||||
}
|
||||
|
||||
dependencies {
|
||||
library("org.apache.kafka:kafka-clients:2.4.0")
|
||||
|
||||
testInstrumentation(project(":instrumentation:kafka-clients-0.11:javaagent"))
|
||||
|
||||
testLibrary("org.springframework.kafka:spring-kafka:2.4.0.RELEASE")
|
||||
testLibrary("org.springframework.kafka:spring-kafka-test:2.4.0.RELEASE")
|
||||
testLibrary("org.springframework:spring-core:5.2.9.RELEASE")
|
||||
testImplementation("javax.xml.bind:jaxb-api:2.2.3")
|
||||
|
||||
latestDepTestLibrary("org.apache.kafka:kafka_2.13:+")
|
||||
}
|
||||
|
||||
tasks {
|
||||
withType<Test>().configureEach {
|
||||
// TODO run tests both with and without experimental span attributes
|
||||
jvmArgs("-Dotel.instrumentation.kafka.experimental-span-attributes=true")
|
||||
}
|
||||
|
||||
val testPropagationDisabled by registering(Test::class) {
|
||||
filter {
|
||||
includeTestsMatching("KafkaClientPropagationDisabledTest")
|
||||
isFailOnNoMatchingTests = false
|
||||
}
|
||||
include("**/KafkaClientPropagationDisabledTest.*")
|
||||
jvmArgs("-Dotel.instrumentation.kafka.client-propagation.enabled=false")
|
||||
}
|
||||
|
||||
named<Test>("test") {
|
||||
dependsOn(testPropagationDisabled)
|
||||
filter {
|
||||
excludeTestsMatching("KafkaClientPropagationDisabledTest")
|
||||
isFailOnNoMatchingTests = false
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,99 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification
|
||||
import java.util.concurrent.LinkedBlockingQueue
|
||||
import java.util.concurrent.TimeUnit
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecord
|
||||
import org.junit.Rule
|
||||
import org.springframework.kafka.core.DefaultKafkaConsumerFactory
|
||||
import org.springframework.kafka.core.DefaultKafkaProducerFactory
|
||||
import org.springframework.kafka.core.KafkaTemplate
|
||||
import org.springframework.kafka.listener.KafkaMessageListenerContainer
|
||||
import org.springframework.kafka.listener.MessageListener
|
||||
import org.springframework.kafka.test.rule.EmbeddedKafkaRule
|
||||
import org.springframework.kafka.test.utils.ContainerTestUtils
|
||||
import org.springframework.kafka.test.utils.KafkaTestUtils
|
||||
import spock.lang.Unroll
|
||||
|
||||
abstract class KafkaClientBaseTest extends AgentInstrumentationSpecification {
|
||||
|
||||
protected static final SHARED_TOPIC = "shared.topic"
|
||||
|
||||
private static final boolean propagationEnabled = Boolean.parseBoolean(
|
||||
System.getProperty("otel.instrumentation.kafka.client-propagation.enabled", "true"))
|
||||
|
||||
@Rule
|
||||
EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, true, SHARED_TOPIC)
|
||||
|
||||
abstract containerProperties()
|
||||
|
||||
Map<String, Object> senderProps() {
|
||||
return KafkaTestUtils.producerProps(embeddedKafka.getEmbeddedKafka().getBrokersAsString())
|
||||
}
|
||||
|
||||
Map<String, Object> consumerProps(String group, String autoCommit) {
|
||||
return KafkaTestUtils.consumerProps(group, autoCommit, embeddedKafka.getEmbeddedKafka())
|
||||
}
|
||||
|
||||
void waitForAssignment(Object container) {
|
||||
ContainerTestUtils.waitForAssignment(container, embeddedKafka.getEmbeddedKafka().getPartitionsPerTopic())
|
||||
}
|
||||
|
||||
def stopProducerFactory(DefaultKafkaProducerFactory producerFactory) {
|
||||
producerFactory.destroy()
|
||||
}
|
||||
|
||||
@Unroll
|
||||
def "test kafka client header propagation manual config"() {
|
||||
setup:
|
||||
def senderProps = senderProps()
|
||||
def producerFactory = new DefaultKafkaProducerFactory<String, String>(senderProps)
|
||||
def kafkaTemplate = new KafkaTemplate<String, String>(producerFactory)
|
||||
|
||||
// set up the Kafka consumer properties
|
||||
def consumerProperties = consumerProps("sender", "false")
|
||||
|
||||
// create a Kafka consumer factory
|
||||
def consumerFactory = new DefaultKafkaConsumerFactory<String, String>(consumerProperties)
|
||||
|
||||
// set the topic that needs to be consumed
|
||||
def containerProperties = containerProperties()
|
||||
|
||||
// create a Kafka MessageListenerContainer
|
||||
def container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties)
|
||||
|
||||
// create a thread safe queue to store the received message
|
||||
def records = new LinkedBlockingQueue<ConsumerRecord<String, String>>()
|
||||
|
||||
// setup a Kafka message listener
|
||||
container.setupMessageListener(new MessageListener<String, String>() {
|
||||
@Override
|
||||
void onMessage(ConsumerRecord<String, String> record) {
|
||||
records.add(record)
|
||||
}
|
||||
})
|
||||
|
||||
// start the container and underlying message listener
|
||||
container.start()
|
||||
|
||||
// wait until the container has the required number of assigned partitions
|
||||
waitForAssignment(container)
|
||||
|
||||
when:
|
||||
String message = "Testing without headers"
|
||||
kafkaTemplate.send(SHARED_TOPIC, message)
|
||||
|
||||
then:
|
||||
// check that the message was received
|
||||
def received = records.poll(5, TimeUnit.SECONDS)
|
||||
|
||||
received.headers().iterator().hasNext() == propagationEnabled
|
||||
|
||||
cleanup:
|
||||
stopProducerFactory(producerFactory)
|
||||
container?.stop()
|
||||
}
|
||||
}
|
|
@ -0,0 +1,134 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
import static io.opentelemetry.api.trace.SpanKind.CONSUMER
|
||||
import static io.opentelemetry.api.trace.SpanKind.PRODUCER
|
||||
|
||||
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes
|
||||
import java.util.concurrent.LinkedBlockingQueue
|
||||
import java.util.concurrent.TimeUnit
|
||||
import org.apache.kafka.clients.consumer.ConsumerConfig
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecord
|
||||
import org.springframework.kafka.core.DefaultKafkaConsumerFactory
|
||||
import org.springframework.kafka.core.DefaultKafkaProducerFactory
|
||||
import org.springframework.kafka.core.KafkaTemplate
|
||||
import org.springframework.kafka.listener.KafkaMessageListenerContainer
|
||||
import org.springframework.kafka.listener.MessageListener
|
||||
|
||||
class KafkaClientPropagationDisabledTest extends KafkaClientBaseTest {
|
||||
|
||||
def "should not read remote context when consuming messages if propagation is disabled"() {
|
||||
setup:
|
||||
def senderProps = senderProps()
|
||||
def producerFactory = new DefaultKafkaProducerFactory<String, String>(senderProps)
|
||||
def kafkaTemplate = new KafkaTemplate<String, String>(producerFactory)
|
||||
|
||||
when: "send message"
|
||||
String message = "Testing without headers"
|
||||
kafkaTemplate.send(SHARED_TOPIC, message)
|
||||
|
||||
then: "producer span is created"
|
||||
assertTraces(1) {
|
||||
trace(0, 1) {
|
||||
span(0) {
|
||||
name SHARED_TOPIC + " send"
|
||||
kind PRODUCER
|
||||
hasNoParent()
|
||||
attributes {
|
||||
"${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka"
|
||||
"${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC
|
||||
"${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
when: "read message without context propagation"
|
||||
// create a thread safe queue to store the received message
|
||||
def records = new LinkedBlockingQueue<ConsumerRecord<String, String>>()
|
||||
KafkaMessageListenerContainer<Object, Object> container = startConsumer("consumer-without-propagation", records)
|
||||
|
||||
then: "independent consumer span is created"
|
||||
// check that the message was received
|
||||
records.poll(5, TimeUnit.SECONDS) != null
|
||||
|
||||
assertTraces(2) {
|
||||
trace(0, 1) {
|
||||
span(0) {
|
||||
name SHARED_TOPIC + " send"
|
||||
kind PRODUCER
|
||||
hasNoParent()
|
||||
attributes {
|
||||
"${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka"
|
||||
"${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC
|
||||
"${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic"
|
||||
}
|
||||
}
|
||||
}
|
||||
trace(1, 1) {
|
||||
span(0) {
|
||||
name SHARED_TOPIC + " process"
|
||||
kind CONSUMER
|
||||
hasNoParent()
|
||||
attributes {
|
||||
"${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka"
|
||||
"${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC
|
||||
"${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic"
|
||||
"${SemanticAttributes.MESSAGING_OPERATION.key}" "process"
|
||||
"${SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES.key}" Long
|
||||
"${SemanticAttributes.MESSAGING_KAFKA_PARTITION.key}" { it >= 0 }
|
||||
"kafka.offset" 0
|
||||
"kafka.record.queue_time_ms" { it >= 0 }
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
cleanup:
|
||||
stopProducerFactory(producerFactory)
|
||||
container?.stop()
|
||||
}
|
||||
|
||||
protected KafkaMessageListenerContainer<Object, Object> startConsumer(String groupId, records) {
|
||||
// set up the Kafka consumer properties
|
||||
Map<String, Object> consumerProperties = consumerProps(groupId, "false")
|
||||
consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
|
||||
|
||||
// create a Kafka consumer factory
|
||||
def consumerFactory = new DefaultKafkaConsumerFactory<String, String>(consumerProperties)
|
||||
|
||||
// set the topic that needs to be consumed
|
||||
def containerProperties = containerProperties()
|
||||
|
||||
// create a Kafka MessageListenerContainer
|
||||
def container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties)
|
||||
|
||||
// setup a Kafka message listener
|
||||
container.setupMessageListener(new MessageListener<String, String>() {
|
||||
@Override
|
||||
void onMessage(ConsumerRecord<String, String> record) {
|
||||
records.add(record)
|
||||
}
|
||||
})
|
||||
|
||||
// start the container and underlying message listener
|
||||
container.start()
|
||||
|
||||
// wait until the container has the required number of assigned partitions
|
||||
waitForAssignment(container)
|
||||
container
|
||||
}
|
||||
|
||||
@Override
|
||||
def containerProperties() {
|
||||
try {
|
||||
// Different class names for test and latestDepTest.
|
||||
return Class.forName("org.springframework.kafka.listener.config.ContainerProperties").newInstance(SHARED_TOPIC)
|
||||
} catch (ClassNotFoundException | NoClassDefFoundError e) {
|
||||
return Class.forName("org.springframework.kafka.listener.ContainerProperties").newInstance(SHARED_TOPIC)
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,417 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
import static io.opentelemetry.api.trace.SpanKind.CONSUMER
|
||||
import static io.opentelemetry.api.trace.SpanKind.PRODUCER
|
||||
|
||||
import io.opentelemetry.api.trace.SpanKind
|
||||
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes
|
||||
import java.util.concurrent.LinkedBlockingQueue
|
||||
import java.util.concurrent.TimeUnit
|
||||
import org.apache.kafka.clients.consumer.ConsumerConfig
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecord
|
||||
import org.apache.kafka.clients.consumer.KafkaConsumer
|
||||
import org.apache.kafka.clients.producer.KafkaProducer
|
||||
import org.apache.kafka.clients.producer.Producer
|
||||
import org.apache.kafka.clients.producer.ProducerRecord
|
||||
import org.apache.kafka.common.TopicPartition
|
||||
import org.apache.kafka.common.serialization.StringSerializer
|
||||
import org.springframework.kafka.core.DefaultKafkaConsumerFactory
|
||||
import org.springframework.kafka.core.DefaultKafkaProducerFactory
|
||||
import org.springframework.kafka.core.KafkaTemplate
|
||||
import org.springframework.kafka.listener.KafkaMessageListenerContainer
|
||||
import org.springframework.kafka.listener.MessageListener
|
||||
import org.springframework.kafka.test.utils.KafkaTestUtils
|
||||
|
||||
class KafkaClientPropagationEnabledTest extends KafkaClientBaseTest {
|
||||
|
||||
def "test kafka produce and consume"() {
|
||||
setup:
|
||||
def senderProps = senderProps()
|
||||
Producer<String, String> producer = new KafkaProducer<>(senderProps, new StringSerializer(), new StringSerializer())
|
||||
|
||||
// set up the Kafka consumer properties
|
||||
def consumerProperties = consumerProps("sender", "false")
|
||||
|
||||
// create a Kafka consumer factory
|
||||
def consumerFactory = new DefaultKafkaConsumerFactory<String, String>(consumerProperties)
|
||||
|
||||
// set the topic that needs to be consumed
|
||||
def containerProperties = containerProperties()
|
||||
|
||||
// create a Kafka MessageListenerContainer
|
||||
def container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties)
|
||||
|
||||
// create a thread safe queue to store the received message
|
||||
def records = new LinkedBlockingQueue<ConsumerRecord<String, String>>()
|
||||
|
||||
// setup a Kafka message listener
|
||||
container.setupMessageListener(new MessageListener<String, String>() {
|
||||
@Override
|
||||
void onMessage(ConsumerRecord<String, String> record) {
|
||||
waitForTraces(1) // ensure consistent ordering of traces
|
||||
records.add(record)
|
||||
}
|
||||
})
|
||||
|
||||
// start the container and underlying message listener
|
||||
container.start()
|
||||
|
||||
// wait until the container has the required number of assigned partitions
|
||||
waitForAssignment(container)
|
||||
|
||||
when:
|
||||
String greeting = "Hello Spring Kafka Sender!"
|
||||
runWithSpan("parent") {
|
||||
producer.send(new ProducerRecord(SHARED_TOPIC, greeting)) { meta, ex ->
|
||||
if (ex == null) {
|
||||
runWithSpan("producer callback") {}
|
||||
} else {
|
||||
runWithSpan("producer exception: " + ex) {}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
then:
|
||||
// check that the message was received
|
||||
def received = records.poll(5, TimeUnit.SECONDS)
|
||||
received.value() == greeting
|
||||
received.key() == null
|
||||
|
||||
assertTraces(1) {
|
||||
trace(0, 4) {
|
||||
span(0) {
|
||||
name "parent"
|
||||
kind SpanKind.INTERNAL
|
||||
hasNoParent()
|
||||
}
|
||||
span(1) {
|
||||
name SHARED_TOPIC + " send"
|
||||
kind PRODUCER
|
||||
childOf span(0)
|
||||
attributes {
|
||||
"${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka"
|
||||
"${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC
|
||||
"${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic"
|
||||
}
|
||||
}
|
||||
span(2) {
|
||||
name SHARED_TOPIC + " process"
|
||||
kind CONSUMER
|
||||
childOf span(1)
|
||||
attributes {
|
||||
"${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka"
|
||||
"${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC
|
||||
"${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic"
|
||||
"${SemanticAttributes.MESSAGING_OPERATION.key}" "process"
|
||||
"${SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES.key}" Long
|
||||
"${SemanticAttributes.MESSAGING_KAFKA_PARTITION.key}" { it >= 0 }
|
||||
"kafka.offset" 0
|
||||
"kafka.record.queue_time_ms" { it >= 0 }
|
||||
}
|
||||
}
|
||||
span(3) {
|
||||
name "producer callback"
|
||||
kind SpanKind.INTERNAL
|
||||
childOf span(0)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
cleanup:
|
||||
producer.close()
|
||||
container?.stop()
|
||||
}
|
||||
|
||||
def "test spring kafka template produce and consume"() {
|
||||
setup:
|
||||
def senderProps = senderProps()
|
||||
def producerFactory = new DefaultKafkaProducerFactory<String, String>(senderProps)
|
||||
def kafkaTemplate = new KafkaTemplate<String, String>(producerFactory)
|
||||
|
||||
// set up the Kafka consumer properties
|
||||
def consumerProperties = consumerProps("sender", "false")
|
||||
|
||||
// create a Kafka consumer factory
|
||||
def consumerFactory = new DefaultKafkaConsumerFactory<String, String>(consumerProperties)
|
||||
|
||||
// set the topic that needs to be consumed
|
||||
def containerProperties = containerProperties()
|
||||
|
||||
// create a Kafka MessageListenerContainer
|
||||
def container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties)
|
||||
|
||||
// create a thread safe queue to store the received message
|
||||
def records = new LinkedBlockingQueue<ConsumerRecord<String, String>>()
|
||||
|
||||
// setup a Kafka message listener
|
||||
container.setupMessageListener(new MessageListener<String, String>() {
|
||||
@Override
|
||||
void onMessage(ConsumerRecord<String, String> record) {
|
||||
records.add(record)
|
||||
}
|
||||
})
|
||||
|
||||
// start the container and underlying message listener
|
||||
container.start()
|
||||
|
||||
// wait until the container has the required number of assigned partitions
|
||||
waitForAssignment(container)
|
||||
|
||||
when:
|
||||
String greeting = "Hello Spring Kafka Sender!"
|
||||
runWithSpan("parent") {
|
||||
kafkaTemplate.send(SHARED_TOPIC, greeting).addCallback({
|
||||
runWithSpan("producer callback") {}
|
||||
}, { ex ->
|
||||
runWithSpan("producer exception: " + ex) {}
|
||||
})
|
||||
}
|
||||
|
||||
then:
|
||||
// check that the message was received
|
||||
def received = records.poll(5, TimeUnit.SECONDS)
|
||||
received.value() == greeting
|
||||
received.key() == null
|
||||
|
||||
assertTraces(1) {
|
||||
trace(0, 4) {
|
||||
span(0) {
|
||||
name "parent"
|
||||
kind SpanKind.INTERNAL
|
||||
hasNoParent()
|
||||
}
|
||||
span(1) {
|
||||
name SHARED_TOPIC + " send"
|
||||
kind PRODUCER
|
||||
childOf span(0)
|
||||
attributes {
|
||||
"${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka"
|
||||
"${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC
|
||||
"${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic"
|
||||
}
|
||||
}
|
||||
span(2) {
|
||||
name SHARED_TOPIC + " process"
|
||||
kind CONSUMER
|
||||
childOf span(1)
|
||||
attributes {
|
||||
"${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka"
|
||||
"${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC
|
||||
"${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic"
|
||||
"${SemanticAttributes.MESSAGING_OPERATION.key}" "process"
|
||||
"${SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES.key}" Long
|
||||
"${SemanticAttributes.MESSAGING_KAFKA_PARTITION.key}" { it >= 0 }
|
||||
"kafka.offset" 0
|
||||
"kafka.record.queue_time_ms" { it >= 0 }
|
||||
}
|
||||
}
|
||||
span(3) {
|
||||
name "producer callback"
|
||||
kind SpanKind.INTERNAL
|
||||
childOf span(0)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
cleanup:
|
||||
stopProducerFactory(producerFactory)
|
||||
container?.stop()
|
||||
}
|
||||
|
||||
def "test pass through tombstone"() {
|
||||
setup:
|
||||
def senderProps = senderProps()
|
||||
def producerFactory = new DefaultKafkaProducerFactory<String, String>(senderProps)
|
||||
def kafkaTemplate = new KafkaTemplate<String, String>(producerFactory)
|
||||
|
||||
// set up the Kafka consumer properties
|
||||
def consumerProperties = consumerProps("sender", "false")
|
||||
|
||||
// create a Kafka consumer factory
|
||||
def consumerFactory = new DefaultKafkaConsumerFactory<String, String>(consumerProperties)
|
||||
|
||||
// set the topic that needs to be consumed
|
||||
def containerProperties = containerProperties()
|
||||
|
||||
// create a Kafka MessageListenerContainer
|
||||
def container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties)
|
||||
|
||||
// create a thread safe queue to store the received message
|
||||
def records = new LinkedBlockingQueue<ConsumerRecord<String, String>>()
|
||||
|
||||
// setup a Kafka message listener
|
||||
container.setupMessageListener(new MessageListener<String, String>() {
|
||||
@Override
|
||||
void onMessage(ConsumerRecord<String, String> record) {
|
||||
records.add(record)
|
||||
}
|
||||
})
|
||||
|
||||
// start the container and underlying message listener
|
||||
container.start()
|
||||
|
||||
// wait until the container has the required number of assigned partitions
|
||||
waitForAssignment(container)
|
||||
|
||||
when:
|
||||
kafkaTemplate.send(SHARED_TOPIC, null)
|
||||
|
||||
then:
|
||||
// check that the message was received
|
||||
def received = records.poll(5, TimeUnit.SECONDS)
|
||||
received.value() == null
|
||||
received.key() == null
|
||||
|
||||
assertTraces(1) {
|
||||
trace(0, 2) {
|
||||
// PRODUCER span 0
|
||||
span(0) {
|
||||
name SHARED_TOPIC + " send"
|
||||
kind PRODUCER
|
||||
hasNoParent()
|
||||
attributes {
|
||||
"${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka"
|
||||
"${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC
|
||||
"${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic"
|
||||
"${SemanticAttributes.MESSAGING_KAFKA_TOMBSTONE.key}" true
|
||||
}
|
||||
}
|
||||
// CONSUMER span 0
|
||||
span(1) {
|
||||
name SHARED_TOPIC + " process"
|
||||
kind CONSUMER
|
||||
childOf span(0)
|
||||
attributes {
|
||||
"${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka"
|
||||
"${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC
|
||||
"${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic"
|
||||
"${SemanticAttributes.MESSAGING_OPERATION.key}" "process"
|
||||
"${SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES.key}" Long
|
||||
"${SemanticAttributes.MESSAGING_KAFKA_PARTITION.key}" { it >= 0 }
|
||||
"${SemanticAttributes.MESSAGING_KAFKA_TOMBSTONE.key}" true
|
||||
"kafka.offset" 0
|
||||
"kafka.record.queue_time_ms" { it >= 0 }
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
cleanup:
|
||||
stopProducerFactory(producerFactory)
|
||||
container?.stop()
|
||||
}
|
||||
|
||||
def "test records(TopicPartition) kafka consume"() {
|
||||
setup:
|
||||
|
||||
// set up the Kafka consumer properties
|
||||
def kafkaPartition = 0
|
||||
def consumerProperties = consumerProps("sender", "false")
|
||||
consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
|
||||
def consumer = new KafkaConsumer<String, String>(consumerProperties)
|
||||
|
||||
def senderProps = senderProps()
|
||||
def producer = new KafkaProducer(senderProps)
|
||||
|
||||
consumer.assign(Arrays.asList(new TopicPartition(SHARED_TOPIC, kafkaPartition)))
|
||||
|
||||
when:
|
||||
def greeting = "Hello from MockConsumer!"
|
||||
producer.send(new ProducerRecord<Integer, String>(SHARED_TOPIC, kafkaPartition, null, greeting))
|
||||
|
||||
then:
|
||||
waitForTraces(1)
|
||||
def records = new LinkedBlockingQueue<ConsumerRecord<String, String>>()
|
||||
def pollResult = KafkaTestUtils.getRecords(consumer)
|
||||
|
||||
def recs = pollResult.records(new TopicPartition(SHARED_TOPIC, kafkaPartition)).iterator()
|
||||
|
||||
def first = null
|
||||
if (recs.hasNext()) {
|
||||
first = recs.next()
|
||||
}
|
||||
|
||||
then:
|
||||
recs.hasNext() == false
|
||||
first.value() == greeting
|
||||
first.key() == null
|
||||
|
||||
assertTraces(1) {
|
||||
trace(0, 2) {
|
||||
span(0) {
|
||||
name SHARED_TOPIC + " send"
|
||||
kind PRODUCER
|
||||
hasNoParent()
|
||||
attributes {
|
||||
"${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka"
|
||||
"${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC
|
||||
"${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic"
|
||||
"${SemanticAttributes.MESSAGING_KAFKA_PARTITION.key}" { it >= 0 }
|
||||
}
|
||||
}
|
||||
span(1) {
|
||||
name SHARED_TOPIC + " process"
|
||||
kind CONSUMER
|
||||
childOf span(0)
|
||||
attributes {
|
||||
"${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka"
|
||||
"${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC
|
||||
"${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic"
|
||||
"${SemanticAttributes.MESSAGING_OPERATION.key}" "process"
|
||||
"${SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES.key}" Long
|
||||
"${SemanticAttributes.MESSAGING_KAFKA_PARTITION.key}" { it >= 0 }
|
||||
"kafka.offset" 0
|
||||
"kafka.record.queue_time_ms" { it >= 0 }
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
cleanup:
|
||||
consumer.close()
|
||||
producer.close()
|
||||
}
|
||||
|
||||
protected KafkaMessageListenerContainer<Object, Object> startConsumer(String groupId, records) {
|
||||
// set up the Kafka consumer properties
|
||||
Map<String, Object> consumerProperties = consumerProps(groupId, "false")
|
||||
consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
|
||||
|
||||
// create a Kafka consumer factory
|
||||
def consumerFactory = new DefaultKafkaConsumerFactory<String, String>(consumerProperties)
|
||||
|
||||
// set the topic that needs to be consumed
|
||||
def containerProperties = containerProperties()
|
||||
|
||||
// create a Kafka MessageListenerContainer
|
||||
def container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties)
|
||||
|
||||
// setup a Kafka message listener
|
||||
container.setupMessageListener(new MessageListener<String, String>() {
|
||||
@Override
|
||||
void onMessage(ConsumerRecord<String, String> record) {
|
||||
records.add(record)
|
||||
}
|
||||
})
|
||||
|
||||
// start the container and underlying message listener
|
||||
container.start()
|
||||
|
||||
// wait until the container has the required number of assigned partitions
|
||||
waitForAssignment(container)
|
||||
container
|
||||
}
|
||||
|
||||
@Override
|
||||
def containerProperties() {
|
||||
try {
|
||||
// Different class names for test and latestDepTest.
|
||||
return Class.forName("org.springframework.kafka.listener.config.ContainerProperties").newInstance(SHARED_TOPIC)
|
||||
} catch (ClassNotFoundException | NoClassDefFoundError e) {
|
||||
return Class.forName("org.springframework.kafka.listener.ContainerProperties").newInstance(SHARED_TOPIC)
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,5 +1,6 @@
|
|||
plugins {
|
||||
id("otel.javaagent-instrumentation")
|
||||
id("org.unbroken-dome.test-sets")
|
||||
}
|
||||
|
||||
muzzle {
|
||||
|
@ -10,36 +11,42 @@ muzzle {
|
|||
}
|
||||
}
|
||||
|
||||
testSets {
|
||||
create("latestDepTest")
|
||||
}
|
||||
|
||||
dependencies {
|
||||
library("org.apache.kafka:kafka-streams:0.11.0.0")
|
||||
compileOnly("org.apache.kafka:kafka-streams:0.11.0.0")
|
||||
|
||||
// Include kafka-clients instrumentation for tests.
|
||||
testInstrumentation(project(":instrumentation:kafka-clients-0.11:javaagent"))
|
||||
|
||||
testLibrary("org.apache.kafka:kafka-clients:0.11.0.0")
|
||||
testLibrary("org.springframework.kafka:spring-kafka:1.3.3.RELEASE")
|
||||
testLibrary("org.springframework.kafka:spring-kafka-test:1.3.3.RELEASE")
|
||||
testImplementation("org.apache.kafka:kafka-streams:0.11.0.0")
|
||||
testImplementation("org.apache.kafka:kafka-clients:0.11.0.0")
|
||||
testImplementation("org.springframework.kafka:spring-kafka:1.3.3.RELEASE")
|
||||
testImplementation("org.springframework.kafka:spring-kafka-test:1.3.3.RELEASE")
|
||||
testImplementation("javax.xml.bind:jaxb-api:2.2.3")
|
||||
testImplementation("org.mockito:mockito-core")
|
||||
testLibrary("org.assertj:assertj-core")
|
||||
testImplementation("org.assertj:assertj-core")
|
||||
|
||||
|
||||
// Include latest version of kafka itself along with latest version of client libs.
|
||||
// This seems to help with jar compatibility hell.
|
||||
latestDepTestLibrary("org.apache.kafka:kafka_2.11:2.3.+")
|
||||
// (Pinning to 2.3.x: 2.4.0 introduces an error when executing compileLatestDepTestGroovy)
|
||||
// Caused by: java.lang.NoClassDefFoundError: org.I0Itec.zkclient.ZkClient
|
||||
latestDepTestLibrary("org.apache.kafka:kafka-clients:2.3.+")
|
||||
latestDepTestLibrary("org.apache.kafka:kafka-streams:2.3.+")
|
||||
latestDepTestLibrary("org.springframework.kafka:spring-kafka:2.2.+")
|
||||
latestDepTestLibrary("org.springframework.kafka:spring-kafka-test:2.2.+")
|
||||
// assertj-core:3.20.0 is incompatible with spring-kafka-test:2.7.2
|
||||
latestDepTestLibrary("org.assertj:assertj-core:3.19.0")
|
||||
add("latestDepTestImplementation", "org.apache.kafka:kafka_2.13:+")
|
||||
add("latestDepTestImplementation", "org.apache.kafka:kafka-clients:+")
|
||||
add("latestDepTestImplementation", "org.apache.kafka:kafka-streams:+")
|
||||
add("latestDepTestImplementation", "org.springframework.kafka:spring-kafka:+")
|
||||
add("latestDepTestImplementation", "org.springframework.kafka:spring-kafka-test:+")
|
||||
}
|
||||
|
||||
tasks.withType<Test>().configureEach {
|
||||
// TODO run tests both with and without experimental span attributes
|
||||
jvmArgs("-Dotel.instrumentation.kafka.experimental-span-attributes=true")
|
||||
tasks {
|
||||
withType<Test>().configureEach {
|
||||
// TODO run tests both with and without experimental span attributes
|
||||
jvmArgs("-Dotel.instrumentation.kafka.experimental-span-attributes=true")
|
||||
}
|
||||
|
||||
if (findProperty("testLatestDeps") as Boolean) {
|
||||
// latestDepTest is still run
|
||||
named("test") {
|
||||
enabled = false
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Requires old version of AssertJ for baseline
|
||||
|
|
|
@ -0,0 +1,246 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
import static io.opentelemetry.api.trace.SpanKind.CONSUMER
|
||||
import static io.opentelemetry.api.trace.SpanKind.PRODUCER
|
||||
|
||||
import io.opentelemetry.api.trace.Span
|
||||
import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator
|
||||
import io.opentelemetry.context.Context
|
||||
import io.opentelemetry.context.propagation.TextMapGetter
|
||||
import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification
|
||||
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes
|
||||
import java.util.concurrent.LinkedBlockingQueue
|
||||
import java.util.concurrent.TimeUnit
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecord
|
||||
import org.apache.kafka.common.serialization.Serdes
|
||||
import org.apache.kafka.streams.KafkaStreams
|
||||
import org.apache.kafka.streams.StreamsConfig
|
||||
import org.apache.kafka.streams.kstream.KStream
|
||||
import org.apache.kafka.streams.kstream.ValueMapper
|
||||
import org.junit.ClassRule
|
||||
import org.springframework.kafka.core.DefaultKafkaConsumerFactory
|
||||
import org.springframework.kafka.core.DefaultKafkaProducerFactory
|
||||
import org.springframework.kafka.core.KafkaTemplate
|
||||
import org.springframework.kafka.listener.KafkaMessageListenerContainer
|
||||
import org.springframework.kafka.listener.MessageListener
|
||||
import org.springframework.kafka.test.rule.EmbeddedKafkaRule
|
||||
import org.springframework.kafka.test.utils.ContainerTestUtils
|
||||
import org.springframework.kafka.test.utils.KafkaTestUtils
|
||||
import spock.lang.Shared
|
||||
|
||||
class KafkaStreamsTest extends AgentInstrumentationSpecification {
|
||||
|
||||
static final STREAM_PENDING = "test.pending"
|
||||
static final STREAM_PROCESSED = "test.processed"
|
||||
|
||||
@Shared
|
||||
@ClassRule
|
||||
EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, true, STREAM_PENDING, STREAM_PROCESSED)
|
||||
|
||||
Map<String, Object> senderProps() {
|
||||
return KafkaTestUtils.producerProps(embeddedKafka.getEmbeddedKafka().getBrokersAsString())
|
||||
}
|
||||
|
||||
Map<String, Object> consumerProps(String group, String autoCommit) {
|
||||
return KafkaTestUtils.consumerProps(group, autoCommit, embeddedKafka.getEmbeddedKafka())
|
||||
}
|
||||
|
||||
void waitForAssignment(Object container) {
|
||||
ContainerTestUtils.waitForAssignment(container, embeddedKafka.getEmbeddedKafka().getPartitionsPerTopic())
|
||||
}
|
||||
|
||||
def stopProducerFactory(DefaultKafkaProducerFactory producerFactory) {
|
||||
producerFactory.destroy()
|
||||
}
|
||||
|
||||
def "test kafka produce and consume with streams in-between"() {
|
||||
setup:
|
||||
def config = new Properties()
|
||||
def senderProps = senderProps()
|
||||
config.putAll(senderProps)
|
||||
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-application")
|
||||
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName())
|
||||
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName())
|
||||
|
||||
// CONFIGURE CONSUMER
|
||||
def consumerFactory = new DefaultKafkaConsumerFactory<String, String>(consumerProps("sender", "false"))
|
||||
|
||||
def containerProperties
|
||||
try {
|
||||
// Different class names for test and latestDepTest.
|
||||
containerProperties = Class.forName("org.springframework.kafka.listener.config.ContainerProperties").newInstance(STREAM_PROCESSED)
|
||||
} catch (ClassNotFoundException | NoClassDefFoundError e) {
|
||||
containerProperties = Class.forName("org.springframework.kafka.listener.ContainerProperties").newInstance(STREAM_PROCESSED)
|
||||
}
|
||||
def consumerContainer = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties)
|
||||
|
||||
// create a thread safe queue to store the processed message
|
||||
def records = new LinkedBlockingQueue<ConsumerRecord<String, String>>()
|
||||
|
||||
// setup a Kafka message listener
|
||||
consumerContainer.setupMessageListener(new MessageListener<String, String>() {
|
||||
@Override
|
||||
void onMessage(ConsumerRecord<String, String> record) {
|
||||
Span.current().setAttribute("testing", 123)
|
||||
records.add(record)
|
||||
}
|
||||
})
|
||||
|
||||
// start the container and underlying message listener
|
||||
consumerContainer.start()
|
||||
|
||||
// wait until the container has the required number of assigned partitions
|
||||
waitForAssignment(consumerContainer)
|
||||
|
||||
// CONFIGURE PROCESSOR
|
||||
def builder
|
||||
try {
|
||||
// Different class names for test and latestDepTest.
|
||||
builder = Class.forName("org.apache.kafka.streams.kstream.KStreamBuilder").newInstance()
|
||||
} catch (ClassNotFoundException | NoClassDefFoundError e) {
|
||||
builder = Class.forName("org.apache.kafka.streams.StreamsBuilder").newInstance()
|
||||
}
|
||||
KStream<String, String> textLines = builder.stream(STREAM_PENDING)
|
||||
def values = textLines
|
||||
.mapValues(new ValueMapper<String, String>() {
|
||||
@Override
|
||||
String apply(String textLine) {
|
||||
Span.current().setAttribute("asdf", "testing")
|
||||
return textLine.toLowerCase()
|
||||
}
|
||||
})
|
||||
|
||||
KafkaStreams streams
|
||||
try {
|
||||
// Different api for test and latestDepTest.
|
||||
values.to(Serdes.String(), Serdes.String(), STREAM_PROCESSED)
|
||||
streams = new KafkaStreams(builder, config)
|
||||
} catch (MissingMethodException e) {
|
||||
def producer = Class.forName("org.apache.kafka.streams.kstream.Produced")
|
||||
.with(Serdes.String(), Serdes.String())
|
||||
values.to(STREAM_PROCESSED, producer)
|
||||
streams = new KafkaStreams(builder.build(), config)
|
||||
}
|
||||
streams.start()
|
||||
|
||||
// CONFIGURE PRODUCER
|
||||
def producerFactory = new DefaultKafkaProducerFactory<String, String>(senderProps)
|
||||
def kafkaTemplate = new KafkaTemplate<String, String>(producerFactory)
|
||||
|
||||
when:
|
||||
String greeting = "TESTING TESTING 123!"
|
||||
kafkaTemplate.send(STREAM_PENDING, greeting)
|
||||
|
||||
then:
|
||||
// check that the message was received
|
||||
def received = records.poll(10, TimeUnit.SECONDS)
|
||||
received.value() == greeting.toLowerCase()
|
||||
received.key() == null
|
||||
|
||||
assertTraces(1) {
|
||||
trace(0, 5) {
|
||||
// PRODUCER span 0
|
||||
span(0) {
|
||||
name STREAM_PENDING + " send"
|
||||
kind PRODUCER
|
||||
hasNoParent()
|
||||
attributes {
|
||||
"${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka"
|
||||
"${SemanticAttributes.MESSAGING_DESTINATION.key}" STREAM_PENDING
|
||||
"${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic"
|
||||
}
|
||||
}
|
||||
// CONSUMER span 0
|
||||
span(1) {
|
||||
name STREAM_PENDING + " process"
|
||||
kind CONSUMER
|
||||
childOf span(0)
|
||||
attributes {
|
||||
"${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka"
|
||||
"${SemanticAttributes.MESSAGING_DESTINATION.key}" STREAM_PENDING
|
||||
"${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic"
|
||||
"${SemanticAttributes.MESSAGING_OPERATION.key}" "process"
|
||||
"${SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES.key}" Long
|
||||
"${SemanticAttributes.MESSAGING_KAFKA_PARTITION.key}" { it >= 0 }
|
||||
"kafka.offset" 0
|
||||
"kafka.record.queue_time_ms" { it >= 0 }
|
||||
}
|
||||
}
|
||||
// STREAMING span 1
|
||||
span(2) {
|
||||
name STREAM_PENDING + " process"
|
||||
kind CONSUMER
|
||||
childOf span(0)
|
||||
attributes {
|
||||
"${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka"
|
||||
"${SemanticAttributes.MESSAGING_DESTINATION.key}" STREAM_PENDING
|
||||
"${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic"
|
||||
"${SemanticAttributes.MESSAGING_OPERATION.key}" "process"
|
||||
"${SemanticAttributes.MESSAGING_KAFKA_PARTITION.key}" { it >= 0 }
|
||||
"kafka.offset" 0
|
||||
"asdf" "testing"
|
||||
}
|
||||
}
|
||||
// STREAMING span 0
|
||||
span(3) {
|
||||
name STREAM_PROCESSED + " send"
|
||||
kind PRODUCER
|
||||
childOf span(2)
|
||||
attributes {
|
||||
"${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka"
|
||||
"${SemanticAttributes.MESSAGING_DESTINATION.key}" STREAM_PROCESSED
|
||||
"${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic"
|
||||
}
|
||||
}
|
||||
// CONSUMER span 0
|
||||
span(4) {
|
||||
name STREAM_PROCESSED + " process"
|
||||
kind CONSUMER
|
||||
childOf span(3)
|
||||
attributes {
|
||||
"${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka"
|
||||
"${SemanticAttributes.MESSAGING_DESTINATION.key}" STREAM_PROCESSED
|
||||
"${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic"
|
||||
"${SemanticAttributes.MESSAGING_OPERATION.key}" "process"
|
||||
"${SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES.key}" Long
|
||||
"${SemanticAttributes.MESSAGING_KAFKA_PARTITION.key}" { it >= 0 }
|
||||
"kafka.offset" 0
|
||||
"kafka.record.queue_time_ms" { it >= 0 }
|
||||
"testing" 123
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
def headers = received.headers()
|
||||
headers.iterator().hasNext()
|
||||
def traceparent = new String(headers.headers("traceparent").iterator().next().value())
|
||||
Context context = W3CTraceContextPropagator.instance.extract(Context.root(), "", new TextMapGetter<String>() {
|
||||
@Override
|
||||
Iterable<String> keys(String carrier) {
|
||||
return Collections.singleton("traceparent")
|
||||
}
|
||||
|
||||
@Override
|
||||
String get(String carrier, String key) {
|
||||
if (key == "traceparent") {
|
||||
return traceparent
|
||||
}
|
||||
return null
|
||||
}
|
||||
})
|
||||
def spanContext = Span.fromContext(context).getSpanContext()
|
||||
def streamSendSpan = traces[0][3]
|
||||
spanContext.traceId == streamSendSpan.traceId
|
||||
spanContext.spanId == streamSendSpan.spanId
|
||||
|
||||
|
||||
cleanup:
|
||||
stopProducerFactory(producerFactory)
|
||||
streams?.close()
|
||||
consumerContainer?.stop()
|
||||
}
|
||||
}
|
|
@ -40,17 +40,33 @@ class KafkaStreamsTest extends AgentInstrumentationSpecification {
|
|||
@ClassRule
|
||||
KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, STREAM_PENDING, STREAM_PROCESSED)
|
||||
|
||||
Map<String, Object> senderProps() {
|
||||
return KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString())
|
||||
}
|
||||
|
||||
Map<String, Object> consumerProps(String group, String autoCommit) {
|
||||
return KafkaTestUtils.consumerProps(group, autoCommit, embeddedKafka)
|
||||
}
|
||||
|
||||
void waitForAssignment(Object container) {
|
||||
ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic())
|
||||
}
|
||||
|
||||
def stopProducerFactory(DefaultKafkaProducerFactory producerFactory) {
|
||||
producerFactory.stop()
|
||||
}
|
||||
|
||||
def "test kafka produce and consume with streams in-between"() {
|
||||
setup:
|
||||
def config = new Properties()
|
||||
def senderProps = KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString())
|
||||
def senderProps = senderProps()
|
||||
config.putAll(senderProps)
|
||||
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-application")
|
||||
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName())
|
||||
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName())
|
||||
|
||||
// CONFIGURE CONSUMER
|
||||
def consumerFactory = new DefaultKafkaConsumerFactory<String, String>(KafkaTestUtils.consumerProps("sender", "false", embeddedKafka))
|
||||
def consumerFactory = new DefaultKafkaConsumerFactory<String, String>(consumerProps("sender", "false"))
|
||||
|
||||
def containerProperties
|
||||
try {
|
||||
|
@ -77,7 +93,7 @@ class KafkaStreamsTest extends AgentInstrumentationSpecification {
|
|||
consumerContainer.start()
|
||||
|
||||
// wait until the container has the required number of assigned partitions
|
||||
ContainerTestUtils.waitForAssignment(consumerContainer, embeddedKafka.getPartitionsPerTopic())
|
||||
waitForAssignment(consumerContainer)
|
||||
|
||||
// CONFIGURE PROCESSOR
|
||||
def builder
|
||||
|
@ -223,7 +239,7 @@ class KafkaStreamsTest extends AgentInstrumentationSpecification {
|
|||
|
||||
|
||||
cleanup:
|
||||
producerFactory?.stop()
|
||||
stopProducerFactory(producerFactory)
|
||||
streams?.close()
|
||||
consumerContainer?.stop()
|
||||
}
|
||||
|
|
|
@ -208,6 +208,7 @@ include(":instrumentation:jsf:mojarra-1.2:javaagent")
|
|||
include(":instrumentation:jsf:myfaces-1.2:javaagent")
|
||||
include(":instrumentation:jsp-2.3:javaagent")
|
||||
include(":instrumentation:kafka-clients-0.11:javaagent")
|
||||
include(":instrumentation:kafka-clients-0.11:kafka-clients-2.4.0-testing")
|
||||
include(":instrumentation:kafka-streams-0.11:javaagent")
|
||||
include(":instrumentation:kotlinx-coroutines:javaagent")
|
||||
include(":instrumentation:kubernetes-client-7.0:javaagent")
|
||||
|
|
Loading…
Reference in New Issue