Rewrite kafka-clients and kafka-streams tests to use testcontainers (#4178)
* Rewrite kafka-clients and kafka-streams tests to use testcontainers * codenarc
This commit is contained in:
parent
feebef3bd9
commit
12d60b05d7
|
@ -11,6 +11,8 @@ muzzle {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
val versions: Map<String, String> by project
|
||||||
|
|
||||||
dependencies {
|
dependencies {
|
||||||
compileOnly("com.google.auto.value:auto-value-annotations")
|
compileOnly("com.google.auto.value:auto-value-annotations")
|
||||||
annotationProcessor("com.google.auto.value:auto-value")
|
annotationProcessor("com.google.auto.value:auto-value")
|
||||||
|
@ -19,25 +21,13 @@ dependencies {
|
||||||
|
|
||||||
library("org.apache.kafka:kafka-clients:0.11.0.0")
|
library("org.apache.kafka:kafka-clients:0.11.0.0")
|
||||||
|
|
||||||
testLibrary("org.springframework.kafka:spring-kafka:1.3.3.RELEASE")
|
testImplementation("org.testcontainers:kafka:${versions["org.testcontainers"]}")
|
||||||
testLibrary("org.springframework.kafka:spring-kafka-test:1.3.3.RELEASE")
|
|
||||||
testImplementation("javax.xml.bind:jaxb-api:2.2.3")
|
|
||||||
testLibrary("org.assertj:assertj-core")
|
|
||||||
|
|
||||||
// Include latest version of kafka itself along with latest version of client libs.
|
|
||||||
// This seems to help with jar compatibility hell.
|
|
||||||
latestDepTestLibrary("org.apache.kafka:kafka_2.11:2.3.+")
|
|
||||||
// (Pinning to 2.3.x: 2.4.0 introduces an error when executing compileLatestDepTestGroovy)
|
|
||||||
// Caused by: java.lang.NoClassDefFoundError: org.I0Itec.zkclient.ZkClient
|
|
||||||
latestDepTestLibrary("org.apache.kafka:kafka-clients:2.3.+")
|
|
||||||
latestDepTestLibrary("org.springframework.kafka:spring-kafka:2.2.+")
|
|
||||||
latestDepTestLibrary("org.springframework.kafka:spring-kafka-test:2.2.+")
|
|
||||||
// assertj-core:3.20.0 is incompatible with spring-kafka-test:2.7.2
|
|
||||||
latestDepTestLibrary("org.assertj:assertj-core:3.19.0")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
tasks {
|
tasks {
|
||||||
withType<Test>().configureEach {
|
withType<Test>().configureEach {
|
||||||
|
usesService(gradle.sharedServices.registrations["testcontainersBuildService"].service)
|
||||||
|
|
||||||
// TODO run tests both with and without experimental span attributes
|
// TODO run tests both with and without experimental span attributes
|
||||||
jvmArgs("-Dotel.instrumentation.kafka.experimental-span-attributes=true")
|
jvmArgs("-Dotel.instrumentation.kafka.experimental-span-attributes=true")
|
||||||
}
|
}
|
||||||
|
@ -59,8 +49,3 @@ tasks {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Requires old version of AssertJ for baseline
|
|
||||||
if (!(findProperty("testLatestDeps") as Boolean)) {
|
|
||||||
configurations.testRuntimeClasspath.resolutionStrategy.force("org.assertj:assertj-core:2.9.1")
|
|
||||||
}
|
|
||||||
|
|
|
@ -4,19 +4,23 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification
|
import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification
|
||||||
import org.apache.kafka.clients.consumer.ConsumerRecord
|
import org.apache.kafka.clients.admin.AdminClient
|
||||||
import org.junit.Rule
|
import org.apache.kafka.clients.admin.NewTopic
|
||||||
import org.springframework.kafka.core.DefaultKafkaConsumerFactory
|
import org.apache.kafka.clients.consumer.Consumer
|
||||||
import org.springframework.kafka.core.DefaultKafkaProducerFactory
|
import org.apache.kafka.clients.consumer.KafkaConsumer
|
||||||
import org.springframework.kafka.core.KafkaTemplate
|
import org.apache.kafka.clients.producer.KafkaProducer
|
||||||
import org.springframework.kafka.listener.KafkaMessageListenerContainer
|
import org.apache.kafka.clients.producer.Producer
|
||||||
import org.springframework.kafka.listener.MessageListener
|
import org.apache.kafka.clients.producer.ProducerRecord
|
||||||
import org.springframework.kafka.test.rule.KafkaEmbedded
|
import org.apache.kafka.common.TopicPartition
|
||||||
import org.springframework.kafka.test.utils.ContainerTestUtils
|
import org.apache.kafka.common.serialization.IntegerDeserializer
|
||||||
import org.springframework.kafka.test.utils.KafkaTestUtils
|
import org.apache.kafka.common.serialization.IntegerSerializer
|
||||||
|
import org.apache.kafka.common.serialization.StringDeserializer
|
||||||
|
import org.apache.kafka.common.serialization.StringSerializer
|
||||||
|
import org.testcontainers.containers.KafkaContainer
|
||||||
|
import spock.lang.Shared
|
||||||
import spock.lang.Unroll
|
import spock.lang.Unroll
|
||||||
|
|
||||||
import java.util.concurrent.LinkedBlockingQueue
|
import java.time.Duration
|
||||||
import java.util.concurrent.TimeUnit
|
import java.util.concurrent.TimeUnit
|
||||||
|
|
||||||
abstract class KafkaClientBaseTest extends AgentInstrumentationSpecification {
|
abstract class KafkaClientBaseTest extends AgentInstrumentationSpecification {
|
||||||
|
@ -26,75 +30,67 @@ abstract class KafkaClientBaseTest extends AgentInstrumentationSpecification {
|
||||||
private static final boolean propagationEnabled = Boolean.parseBoolean(
|
private static final boolean propagationEnabled = Boolean.parseBoolean(
|
||||||
System.getProperty("otel.instrumentation.kafka.client-propagation.enabled", "true"))
|
System.getProperty("otel.instrumentation.kafka.client-propagation.enabled", "true"))
|
||||||
|
|
||||||
@Rule
|
@Shared
|
||||||
KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, SHARED_TOPIC)
|
static KafkaContainer kafka
|
||||||
|
@Shared
|
||||||
|
static Producer<Integer, String> producer
|
||||||
|
@Shared
|
||||||
|
static Consumer<Integer, String> consumer
|
||||||
|
|
||||||
abstract containerProperties()
|
def setupSpec() {
|
||||||
|
kafka = new KafkaContainer()
|
||||||
|
kafka.start()
|
||||||
|
|
||||||
Map<String, Object> senderProps() {
|
// create test topic
|
||||||
return KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString())
|
AdminClient.create(["bootstrap.servers": kafka.bootstrapServers]).withCloseable { admin ->
|
||||||
|
admin.createTopics([new NewTopic(SHARED_TOPIC, 1, (short) 1)]).all().get(10, TimeUnit.SECONDS)
|
||||||
|
}
|
||||||
|
|
||||||
|
// values copied from spring's KafkaTestUtils
|
||||||
|
def producerProps = [
|
||||||
|
"bootstrap.servers": kafka.bootstrapServers,
|
||||||
|
"retries" : 0,
|
||||||
|
"batch.size" : "16384",
|
||||||
|
"linger.ms" : 1,
|
||||||
|
"buffer.memory" : "33554432",
|
||||||
|
"key.serializer" : IntegerSerializer,
|
||||||
|
"value.serializer" : StringSerializer
|
||||||
|
]
|
||||||
|
producer = new KafkaProducer<>(producerProps)
|
||||||
|
|
||||||
|
// values copied from spring's KafkaTestUtils
|
||||||
|
def consumerProps = [
|
||||||
|
"bootstrap.servers" : kafka.bootstrapServers,
|
||||||
|
"group.id" : "test",
|
||||||
|
"enable.auto.commit" : "false",
|
||||||
|
"auto.commit.interval.ms": "10",
|
||||||
|
"session.timeout.ms" : "60000",
|
||||||
|
"key.deserializer" : IntegerDeserializer,
|
||||||
|
"value.deserializer" : StringDeserializer
|
||||||
|
]
|
||||||
|
consumer = new KafkaConsumer<>(consumerProps)
|
||||||
|
|
||||||
|
// assign only existing topic partition
|
||||||
|
consumer.assign([new TopicPartition(SHARED_TOPIC, 0)])
|
||||||
}
|
}
|
||||||
|
|
||||||
Map<String, Object> consumerProps(String group, String autoCommit) {
|
def cleanupSpec() {
|
||||||
return KafkaTestUtils.consumerProps(group, autoCommit, embeddedKafka)
|
consumer?.close()
|
||||||
}
|
producer?.close()
|
||||||
|
kafka.stop()
|
||||||
void waitForAssignment(Object container) {
|
|
||||||
ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic())
|
|
||||||
}
|
|
||||||
|
|
||||||
def stopProducerFactory(DefaultKafkaProducerFactory producerFactory) {
|
|
||||||
producerFactory.stop()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Unroll
|
@Unroll
|
||||||
def "test kafka client header propagation manual config"() {
|
def "test kafka client header propagation manual config"() {
|
||||||
setup:
|
|
||||||
def senderProps = senderProps()
|
|
||||||
def producerFactory = new DefaultKafkaProducerFactory<String, String>(senderProps)
|
|
||||||
def kafkaTemplate = new KafkaTemplate<String, String>(producerFactory)
|
|
||||||
|
|
||||||
// set up the Kafka consumer properties
|
|
||||||
def consumerProperties = consumerProps("sender", "false")
|
|
||||||
|
|
||||||
// create a Kafka consumer factory
|
|
||||||
def consumerFactory = new DefaultKafkaConsumerFactory<String, String>(consumerProperties)
|
|
||||||
|
|
||||||
// set the topic that needs to be consumed
|
|
||||||
def containerProperties = containerProperties()
|
|
||||||
|
|
||||||
// create a Kafka MessageListenerContainer
|
|
||||||
def container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties)
|
|
||||||
|
|
||||||
// create a thread safe queue to store the received message
|
|
||||||
def records = new LinkedBlockingQueue<ConsumerRecord<String, String>>()
|
|
||||||
|
|
||||||
// setup a Kafka message listener
|
|
||||||
container.setupMessageListener(new MessageListener<String, String>() {
|
|
||||||
@Override
|
|
||||||
void onMessage(ConsumerRecord<String, String> record) {
|
|
||||||
records.add(record)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
|
|
||||||
// start the container and underlying message listener
|
|
||||||
container.start()
|
|
||||||
|
|
||||||
// wait until the container has the required number of assigned partitions
|
|
||||||
waitForAssignment(container)
|
|
||||||
|
|
||||||
when:
|
when:
|
||||||
String message = "Testing without headers"
|
String message = "Testing without headers"
|
||||||
kafkaTemplate.send(SHARED_TOPIC, message)
|
producer.send(new ProducerRecord<>(SHARED_TOPIC, message))
|
||||||
|
|
||||||
then:
|
then:
|
||||||
// check that the message was received
|
// check that the message was received
|
||||||
def received = records.poll(5, TimeUnit.SECONDS)
|
def records = consumer.poll(Duration.ofSeconds(5).toMillis())
|
||||||
|
for (record in records) {
|
||||||
received.headers().iterator().hasNext() == propagationEnabled
|
assert record.headers().iterator().hasNext() == propagationEnabled
|
||||||
|
}
|
||||||
cleanup:
|
|
||||||
stopProducerFactory(producerFactory)
|
|
||||||
container?.stop()
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -4,16 +4,9 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes
|
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes
|
||||||
import org.apache.kafka.clients.consumer.ConsumerConfig
|
import org.apache.kafka.clients.producer.ProducerRecord
|
||||||
import org.apache.kafka.clients.consumer.ConsumerRecord
|
|
||||||
import org.springframework.kafka.core.DefaultKafkaConsumerFactory
|
|
||||||
import org.springframework.kafka.core.DefaultKafkaProducerFactory
|
|
||||||
import org.springframework.kafka.core.KafkaTemplate
|
|
||||||
import org.springframework.kafka.listener.KafkaMessageListenerContainer
|
|
||||||
import org.springframework.kafka.listener.MessageListener
|
|
||||||
|
|
||||||
import java.util.concurrent.LinkedBlockingQueue
|
import java.time.Duration
|
||||||
import java.util.concurrent.TimeUnit
|
|
||||||
|
|
||||||
import static io.opentelemetry.api.trace.SpanKind.CONSUMER
|
import static io.opentelemetry.api.trace.SpanKind.CONSUMER
|
||||||
import static io.opentelemetry.api.trace.SpanKind.PRODUCER
|
import static io.opentelemetry.api.trace.SpanKind.PRODUCER
|
||||||
|
@ -21,14 +14,9 @@ import static io.opentelemetry.api.trace.SpanKind.PRODUCER
|
||||||
class KafkaClientPropagationDisabledTest extends KafkaClientBaseTest {
|
class KafkaClientPropagationDisabledTest extends KafkaClientBaseTest {
|
||||||
|
|
||||||
def "should not read remote context when consuming messages if propagation is disabled"() {
|
def "should not read remote context when consuming messages if propagation is disabled"() {
|
||||||
setup:
|
|
||||||
def senderProps = senderProps()
|
|
||||||
def producerFactory = new DefaultKafkaProducerFactory<String, String>(senderProps)
|
|
||||||
def kafkaTemplate = new KafkaTemplate<String, String>(producerFactory)
|
|
||||||
|
|
||||||
when: "send message"
|
when: "send message"
|
||||||
String message = "Testing without headers"
|
String message = "Testing without headers"
|
||||||
kafkaTemplate.send(SHARED_TOPIC, message)
|
producer.send(new ProducerRecord<>(SHARED_TOPIC, message))
|
||||||
|
|
||||||
then: "producer span is created"
|
then: "producer span is created"
|
||||||
assertTraces(1) {
|
assertTraces(1) {
|
||||||
|
@ -47,14 +35,12 @@ class KafkaClientPropagationDisabledTest extends KafkaClientBaseTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
when: "read message without context propagation"
|
when: "read message without context propagation"
|
||||||
// create a thread safe queue to store the received message
|
def records = consumer.poll(Duration.ofSeconds(5).toMillis())
|
||||||
def records = new LinkedBlockingQueue<ConsumerRecord<String, String>>()
|
for (record in records) {
|
||||||
KafkaMessageListenerContainer<Object, Object> container = startConsumer("consumer-without-propagation", records)
|
runWithSpan("processing") {}
|
||||||
|
}
|
||||||
then: "independent consumer span is created"
|
|
||||||
// check that the message was received
|
|
||||||
records.poll(5, TimeUnit.SECONDS) != null
|
|
||||||
|
|
||||||
|
then:
|
||||||
assertTraces(2) {
|
assertTraces(2) {
|
||||||
trace(0, 1) {
|
trace(0, 1) {
|
||||||
span(0) {
|
span(0) {
|
||||||
|
@ -68,7 +54,7 @@ class KafkaClientPropagationDisabledTest extends KafkaClientBaseTest {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
trace(1, 2) {
|
trace(1, 3) {
|
||||||
span(0) {
|
span(0) {
|
||||||
name SHARED_TOPIC + " receive"
|
name SHARED_TOPIC + " receive"
|
||||||
kind CONSUMER
|
kind CONSUMER
|
||||||
|
@ -92,55 +78,15 @@ class KafkaClientPropagationDisabledTest extends KafkaClientBaseTest {
|
||||||
"${SemanticAttributes.MESSAGING_OPERATION.key}" "process"
|
"${SemanticAttributes.MESSAGING_OPERATION.key}" "process"
|
||||||
"${SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES.key}" Long
|
"${SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES.key}" Long
|
||||||
"${SemanticAttributes.MESSAGING_KAFKA_PARTITION.key}" { it >= 0 }
|
"${SemanticAttributes.MESSAGING_KAFKA_PARTITION.key}" { it >= 0 }
|
||||||
"kafka.offset" 0
|
"kafka.offset" Long
|
||||||
"kafka.record.queue_time_ms" { it >= 0 }
|
"kafka.record.queue_time_ms" { it >= 0 }
|
||||||
}
|
}
|
||||||
|
span(2) {
|
||||||
|
name "processing"
|
||||||
|
childOf span(1)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
cleanup:
|
|
||||||
stopProducerFactory(producerFactory)
|
|
||||||
container?.stop()
|
|
||||||
}
|
|
||||||
|
|
||||||
protected KafkaMessageListenerContainer<Object, Object> startConsumer(String groupId, records) {
|
|
||||||
// set up the Kafka consumer properties
|
|
||||||
Map<String, Object> consumerProperties = consumerProps(groupId, "false")
|
|
||||||
consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
|
|
||||||
|
|
||||||
// create a Kafka consumer factory
|
|
||||||
def consumerFactory = new DefaultKafkaConsumerFactory<String, String>(consumerProperties)
|
|
||||||
|
|
||||||
// set the topic that needs to be consumed
|
|
||||||
def containerProperties = containerProperties()
|
|
||||||
|
|
||||||
// create a Kafka MessageListenerContainer
|
|
||||||
def container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties)
|
|
||||||
|
|
||||||
// setup a Kafka message listener
|
|
||||||
container.setupMessageListener(new MessageListener<String, String>() {
|
|
||||||
@Override
|
|
||||||
void onMessage(ConsumerRecord<String, String> record) {
|
|
||||||
records.add(record)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
|
|
||||||
// start the container and underlying message listener
|
|
||||||
container.start()
|
|
||||||
|
|
||||||
// wait until the container has the required number of assigned partitions
|
|
||||||
waitForAssignment(container)
|
|
||||||
container
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
def containerProperties() {
|
|
||||||
try {
|
|
||||||
// Different class names for test and latestDepTest.
|
|
||||||
return Class.forName("org.springframework.kafka.listener.config.ContainerProperties").newInstance(SHARED_TOPIC)
|
|
||||||
} catch (ClassNotFoundException | NoClassDefFoundError e) {
|
|
||||||
return Class.forName("org.springframework.kafka.listener.ContainerProperties").newInstance(SHARED_TOPIC)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -5,23 +5,10 @@
|
||||||
|
|
||||||
import io.opentelemetry.sdk.trace.data.SpanData
|
import io.opentelemetry.sdk.trace.data.SpanData
|
||||||
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes
|
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes
|
||||||
import org.apache.kafka.clients.consumer.ConsumerConfig
|
|
||||||
import org.apache.kafka.clients.consumer.ConsumerRecord
|
|
||||||
import org.apache.kafka.clients.consumer.KafkaConsumer
|
|
||||||
import org.apache.kafka.clients.producer.KafkaProducer
|
|
||||||
import org.apache.kafka.clients.producer.Producer
|
|
||||||
import org.apache.kafka.clients.producer.ProducerRecord
|
import org.apache.kafka.clients.producer.ProducerRecord
|
||||||
import org.apache.kafka.common.TopicPartition
|
import org.apache.kafka.common.TopicPartition
|
||||||
import org.apache.kafka.common.serialization.StringSerializer
|
|
||||||
import org.springframework.kafka.core.DefaultKafkaConsumerFactory
|
|
||||||
import org.springframework.kafka.core.DefaultKafkaProducerFactory
|
|
||||||
import org.springframework.kafka.core.KafkaTemplate
|
|
||||||
import org.springframework.kafka.listener.KafkaMessageListenerContainer
|
|
||||||
import org.springframework.kafka.listener.MessageListener
|
|
||||||
import org.springframework.kafka.test.utils.KafkaTestUtils
|
|
||||||
|
|
||||||
import java.util.concurrent.LinkedBlockingQueue
|
import java.time.Duration
|
||||||
import java.util.concurrent.TimeUnit
|
|
||||||
|
|
||||||
import static io.opentelemetry.api.trace.SpanKind.CONSUMER
|
import static io.opentelemetry.api.trace.SpanKind.CONSUMER
|
||||||
import static io.opentelemetry.api.trace.SpanKind.INTERNAL
|
import static io.opentelemetry.api.trace.SpanKind.INTERNAL
|
||||||
|
@ -30,42 +17,8 @@ import static io.opentelemetry.api.trace.SpanKind.PRODUCER
|
||||||
class KafkaClientPropagationEnabledTest extends KafkaClientBaseTest {
|
class KafkaClientPropagationEnabledTest extends KafkaClientBaseTest {
|
||||||
|
|
||||||
def "test kafka produce and consume"() {
|
def "test kafka produce and consume"() {
|
||||||
setup:
|
|
||||||
def senderProps = senderProps()
|
|
||||||
Producer<String, String> producer = new KafkaProducer<>(senderProps, new StringSerializer(), new StringSerializer())
|
|
||||||
|
|
||||||
// set up the Kafka consumer properties
|
|
||||||
def consumerProperties = consumerProps("sender", "false")
|
|
||||||
|
|
||||||
// create a Kafka consumer factory
|
|
||||||
def consumerFactory = new DefaultKafkaConsumerFactory<String, String>(consumerProperties)
|
|
||||||
|
|
||||||
// set the topic that needs to be consumed
|
|
||||||
def containerProperties = containerProperties()
|
|
||||||
|
|
||||||
// create a Kafka MessageListenerContainer
|
|
||||||
def container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties)
|
|
||||||
|
|
||||||
// create a thread safe queue to store the received message
|
|
||||||
def records = new LinkedBlockingQueue<ConsumerRecord<String, String>>()
|
|
||||||
|
|
||||||
// setup a Kafka message listener
|
|
||||||
container.setupMessageListener(new MessageListener<String, String>() {
|
|
||||||
@Override
|
|
||||||
void onMessage(ConsumerRecord<String, String> record) {
|
|
||||||
waitForTraces(1) // ensure consistent ordering of traces
|
|
||||||
records.add(record)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
|
|
||||||
// start the container and underlying message listener
|
|
||||||
container.start()
|
|
||||||
|
|
||||||
// wait until the container has the required number of assigned partitions
|
|
||||||
waitForAssignment(container)
|
|
||||||
|
|
||||||
when:
|
when:
|
||||||
String greeting = "Hello Spring Kafka Sender!"
|
String greeting = "Hello Kafka!"
|
||||||
runWithSpan("parent") {
|
runWithSpan("parent") {
|
||||||
producer.send(new ProducerRecord(SHARED_TOPIC, greeting)) { meta, ex ->
|
producer.send(new ProducerRecord(SHARED_TOPIC, greeting)) { meta, ex ->
|
||||||
if (ex == null) {
|
if (ex == null) {
|
||||||
|
@ -78,9 +31,13 @@ class KafkaClientPropagationEnabledTest extends KafkaClientBaseTest {
|
||||||
|
|
||||||
then:
|
then:
|
||||||
// check that the message was received
|
// check that the message was received
|
||||||
def received = records.poll(5, TimeUnit.SECONDS)
|
def records = consumer.poll(Duration.ofSeconds(5).toMillis())
|
||||||
received.value() == greeting
|
for (record in records) {
|
||||||
received.key() == null
|
runWithSpan("processing") {
|
||||||
|
assert record.value() == greeting
|
||||||
|
assert record.key() == null
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
assertTraces(2) {
|
assertTraces(2) {
|
||||||
traces.sort(orderByRootSpanKind(INTERNAL, CONSUMER))
|
traces.sort(orderByRootSpanKind(INTERNAL, CONSUMER))
|
||||||
|
@ -111,7 +68,7 @@ class KafkaClientPropagationEnabledTest extends KafkaClientBaseTest {
|
||||||
|
|
||||||
producerSpan = span(1)
|
producerSpan = span(1)
|
||||||
}
|
}
|
||||||
trace(1, 2) {
|
trace(1, 3) {
|
||||||
span(0) {
|
span(0) {
|
||||||
name SHARED_TOPIC + " receive"
|
name SHARED_TOPIC + " receive"
|
||||||
kind CONSUMER
|
kind CONSUMER
|
||||||
|
@ -135,177 +92,29 @@ class KafkaClientPropagationEnabledTest extends KafkaClientBaseTest {
|
||||||
"${SemanticAttributes.MESSAGING_OPERATION.key}" "process"
|
"${SemanticAttributes.MESSAGING_OPERATION.key}" "process"
|
||||||
"${SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES.key}" Long
|
"${SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES.key}" Long
|
||||||
"${SemanticAttributes.MESSAGING_KAFKA_PARTITION.key}" { it >= 0 }
|
"${SemanticAttributes.MESSAGING_KAFKA_PARTITION.key}" { it >= 0 }
|
||||||
"kafka.offset" 0
|
"kafka.offset" Long
|
||||||
"kafka.record.queue_time_ms" { it >= 0 }
|
"kafka.record.queue_time_ms" { it >= 0 }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
cleanup:
|
|
||||||
producer.close()
|
|
||||||
container?.stop()
|
|
||||||
}
|
|
||||||
|
|
||||||
def "test spring kafka template produce and consume"() {
|
|
||||||
setup:
|
|
||||||
def senderProps = senderProps()
|
|
||||||
def producerFactory = new DefaultKafkaProducerFactory<String, String>(senderProps)
|
|
||||||
def kafkaTemplate = new KafkaTemplate<String, String>(producerFactory)
|
|
||||||
|
|
||||||
// set up the Kafka consumer properties
|
|
||||||
def consumerProperties = consumerProps("sender", "false")
|
|
||||||
|
|
||||||
// create a Kafka consumer factory
|
|
||||||
def consumerFactory = new DefaultKafkaConsumerFactory<String, String>(consumerProperties)
|
|
||||||
|
|
||||||
// set the topic that needs to be consumed
|
|
||||||
def containerProperties = containerProperties()
|
|
||||||
|
|
||||||
// create a Kafka MessageListenerContainer
|
|
||||||
def container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties)
|
|
||||||
|
|
||||||
// create a thread safe queue to store the received message
|
|
||||||
def records = new LinkedBlockingQueue<ConsumerRecord<String, String>>()
|
|
||||||
|
|
||||||
// setup a Kafka message listener
|
|
||||||
container.setupMessageListener(new MessageListener<String, String>() {
|
|
||||||
@Override
|
|
||||||
void onMessage(ConsumerRecord<String, String> record) {
|
|
||||||
records.add(record)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
|
|
||||||
// start the container and underlying message listener
|
|
||||||
container.start()
|
|
||||||
|
|
||||||
// wait until the container has the required number of assigned partitions
|
|
||||||
waitForAssignment(container)
|
|
||||||
|
|
||||||
when:
|
|
||||||
String greeting = "Hello Spring Kafka Sender!"
|
|
||||||
runWithSpan("parent") {
|
|
||||||
kafkaTemplate.send(SHARED_TOPIC, greeting).addCallback({
|
|
||||||
runWithSpan("producer callback") {}
|
|
||||||
}, { ex ->
|
|
||||||
runWithSpan("producer exception: " + ex) {}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
then:
|
|
||||||
// check that the message was received
|
|
||||||
def received = records.poll(5, TimeUnit.SECONDS)
|
|
||||||
received.value() == greeting
|
|
||||||
received.key() == null
|
|
||||||
|
|
||||||
assertTraces(2) {
|
|
||||||
traces.sort(orderByRootSpanKind(INTERNAL, CONSUMER))
|
|
||||||
|
|
||||||
SpanData producerSpan
|
|
||||||
|
|
||||||
trace(0, 3) {
|
|
||||||
span(0) {
|
|
||||||
name "parent"
|
|
||||||
kind INTERNAL
|
|
||||||
hasNoParent()
|
|
||||||
}
|
|
||||||
span(1) {
|
|
||||||
name SHARED_TOPIC + " send"
|
|
||||||
kind PRODUCER
|
|
||||||
childOf span(0)
|
|
||||||
attributes {
|
|
||||||
"${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka"
|
|
||||||
"${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC
|
|
||||||
"${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
span(2) {
|
span(2) {
|
||||||
name "producer callback"
|
name "processing"
|
||||||
kind INTERNAL
|
childOf span(1)
|
||||||
childOf span(0)
|
|
||||||
}
|
|
||||||
|
|
||||||
producerSpan = span(1)
|
|
||||||
}
|
|
||||||
trace(1, 2) {
|
|
||||||
span(0) {
|
|
||||||
name SHARED_TOPIC + " receive"
|
|
||||||
kind CONSUMER
|
|
||||||
hasNoParent()
|
|
||||||
attributes {
|
|
||||||
"${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka"
|
|
||||||
"${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC
|
|
||||||
"${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic"
|
|
||||||
"${SemanticAttributes.MESSAGING_OPERATION.key}" "receive"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
span(1) {
|
|
||||||
name SHARED_TOPIC + " process"
|
|
||||||
kind CONSUMER
|
|
||||||
childOf span(0)
|
|
||||||
hasLink producerSpan
|
|
||||||
attributes {
|
|
||||||
"${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka"
|
|
||||||
"${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC
|
|
||||||
"${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic"
|
|
||||||
"${SemanticAttributes.MESSAGING_OPERATION.key}" "process"
|
|
||||||
"${SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES.key}" Long
|
|
||||||
"${SemanticAttributes.MESSAGING_KAFKA_PARTITION.key}" { it >= 0 }
|
|
||||||
"kafka.offset" 0
|
|
||||||
"kafka.record.queue_time_ms" { it >= 0 }
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
cleanup:
|
|
||||||
stopProducerFactory(producerFactory)
|
|
||||||
container?.stop()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
def "test pass through tombstone"() {
|
def "test pass through tombstone"() {
|
||||||
setup:
|
|
||||||
def senderProps = senderProps()
|
|
||||||
def producerFactory = new DefaultKafkaProducerFactory<String, String>(senderProps)
|
|
||||||
def kafkaTemplate = new KafkaTemplate<String, String>(producerFactory)
|
|
||||||
|
|
||||||
// set up the Kafka consumer properties
|
|
||||||
def consumerProperties = consumerProps("sender", "false")
|
|
||||||
|
|
||||||
// create a Kafka consumer factory
|
|
||||||
def consumerFactory = new DefaultKafkaConsumerFactory<String, String>(consumerProperties)
|
|
||||||
|
|
||||||
// set the topic that needs to be consumed
|
|
||||||
def containerProperties = containerProperties()
|
|
||||||
|
|
||||||
// create a Kafka MessageListenerContainer
|
|
||||||
def container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties)
|
|
||||||
|
|
||||||
// create a thread safe queue to store the received message
|
|
||||||
def records = new LinkedBlockingQueue<ConsumerRecord<String, String>>()
|
|
||||||
|
|
||||||
// setup a Kafka message listener
|
|
||||||
container.setupMessageListener(new MessageListener<String, String>() {
|
|
||||||
@Override
|
|
||||||
void onMessage(ConsumerRecord<String, String> record) {
|
|
||||||
records.add(record)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
|
|
||||||
// start the container and underlying message listener
|
|
||||||
container.start()
|
|
||||||
|
|
||||||
// wait until the container has the required number of assigned partitions
|
|
||||||
waitForAssignment(container)
|
|
||||||
|
|
||||||
when:
|
when:
|
||||||
kafkaTemplate.send(SHARED_TOPIC, null)
|
producer.send(new ProducerRecord<>(SHARED_TOPIC, null))
|
||||||
|
|
||||||
then:
|
then:
|
||||||
// check that the message was received
|
// check that the message was received
|
||||||
def received = records.poll(5, TimeUnit.SECONDS)
|
def records = consumer.poll(Duration.ofSeconds(5).toMillis())
|
||||||
received.value() == null
|
for (record in records) {
|
||||||
received.key() == null
|
assert record.value() == null
|
||||||
|
assert record.key() == null
|
||||||
|
}
|
||||||
|
|
||||||
assertTraces(2) {
|
assertTraces(2) {
|
||||||
traces.sort(orderByRootSpanKind(PRODUCER, CONSUMER))
|
traces.sort(orderByRootSpanKind(PRODUCER, CONSUMER))
|
||||||
|
@ -352,53 +161,34 @@ class KafkaClientPropagationEnabledTest extends KafkaClientBaseTest {
|
||||||
"${SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES.key}" Long
|
"${SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES.key}" Long
|
||||||
"${SemanticAttributes.MESSAGING_KAFKA_PARTITION.key}" { it >= 0 }
|
"${SemanticAttributes.MESSAGING_KAFKA_PARTITION.key}" { it >= 0 }
|
||||||
"${SemanticAttributes.MESSAGING_KAFKA_TOMBSTONE.key}" true
|
"${SemanticAttributes.MESSAGING_KAFKA_TOMBSTONE.key}" true
|
||||||
"kafka.offset" 0
|
"kafka.offset" Long
|
||||||
"kafka.record.queue_time_ms" { it >= 0 }
|
"kafka.record.queue_time_ms" { it >= 0 }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
cleanup:
|
|
||||||
stopProducerFactory(producerFactory)
|
|
||||||
container?.stop()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
def "test records(TopicPartition) kafka consume"() {
|
def "test records(TopicPartition) kafka consume"() {
|
||||||
setup:
|
setup:
|
||||||
|
def partition = 0
|
||||||
|
|
||||||
// set up the Kafka consumer properties
|
when: "send message"
|
||||||
def kafkaPartition = 0
|
|
||||||
def consumerProperties = consumerProps("sender", "false")
|
|
||||||
consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
|
|
||||||
def consumer = new KafkaConsumer<String, String>(consumerProperties)
|
|
||||||
|
|
||||||
def senderProps = senderProps()
|
|
||||||
def producer = new KafkaProducer(senderProps)
|
|
||||||
|
|
||||||
consumer.assign(Arrays.asList(new TopicPartition(SHARED_TOPIC, kafkaPartition)))
|
|
||||||
|
|
||||||
when:
|
|
||||||
def greeting = "Hello from MockConsumer!"
|
def greeting = "Hello from MockConsumer!"
|
||||||
producer.send(new ProducerRecord<Integer, String>(SHARED_TOPIC, kafkaPartition, null, greeting))
|
producer.send(new ProducerRecord<>(SHARED_TOPIC, partition, null, greeting))
|
||||||
|
|
||||||
then:
|
then: "wait for PRODUCER span"
|
||||||
waitForTraces(1)
|
waitForTraces(1)
|
||||||
def records = new LinkedBlockingQueue<ConsumerRecord<String, String>>()
|
|
||||||
def pollResult = KafkaTestUtils.getRecords(consumer)
|
|
||||||
|
|
||||||
def recs = pollResult.records(new TopicPartition(SHARED_TOPIC, kafkaPartition)).iterator()
|
when: "receive messages"
|
||||||
|
def consumerRecords = consumer.poll(Duration.ofSeconds(5).toMillis())
|
||||||
def first = null
|
def recordsInPartition = consumerRecords.records(new TopicPartition(SHARED_TOPIC, partition))
|
||||||
if (recs.hasNext()) {
|
for (record in recordsInPartition) {
|
||||||
first = recs.next()
|
assert record.value() == greeting
|
||||||
|
assert record.key() == null
|
||||||
}
|
}
|
||||||
|
|
||||||
then:
|
then:
|
||||||
recs.hasNext() == false
|
|
||||||
first.value() == greeting
|
|
||||||
first.key() == null
|
|
||||||
|
|
||||||
assertTraces(2) {
|
assertTraces(2) {
|
||||||
traces.sort(orderByRootSpanKind(PRODUCER, CONSUMER))
|
traces.sort(orderByRootSpanKind(PRODUCER, CONSUMER))
|
||||||
|
|
||||||
|
@ -443,55 +233,11 @@ class KafkaClientPropagationEnabledTest extends KafkaClientBaseTest {
|
||||||
"${SemanticAttributes.MESSAGING_OPERATION.key}" "process"
|
"${SemanticAttributes.MESSAGING_OPERATION.key}" "process"
|
||||||
"${SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES.key}" Long
|
"${SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES.key}" Long
|
||||||
"${SemanticAttributes.MESSAGING_KAFKA_PARTITION.key}" { it >= 0 }
|
"${SemanticAttributes.MESSAGING_KAFKA_PARTITION.key}" { it >= 0 }
|
||||||
"kafka.offset" 0
|
"kafka.offset" Long
|
||||||
"kafka.record.queue_time_ms" { it >= 0 }
|
"kafka.record.queue_time_ms" { it >= 0 }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
cleanup:
|
|
||||||
consumer.close()
|
|
||||||
producer.close()
|
|
||||||
}
|
|
||||||
|
|
||||||
protected KafkaMessageListenerContainer<Object, Object> startConsumer(String groupId, records) {
|
|
||||||
// set up the Kafka consumer properties
|
|
||||||
Map<String, Object> consumerProperties = consumerProps(groupId, "false")
|
|
||||||
consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
|
|
||||||
|
|
||||||
// create a Kafka consumer factory
|
|
||||||
def consumerFactory = new DefaultKafkaConsumerFactory<String, String>(consumerProperties)
|
|
||||||
|
|
||||||
// set the topic that needs to be consumed
|
|
||||||
def containerProperties = containerProperties()
|
|
||||||
|
|
||||||
// create a Kafka MessageListenerContainer
|
|
||||||
def container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties)
|
|
||||||
|
|
||||||
// setup a Kafka message listener
|
|
||||||
container.setupMessageListener(new MessageListener<String, String>() {
|
|
||||||
@Override
|
|
||||||
void onMessage(ConsumerRecord<String, String> record) {
|
|
||||||
records.add(record)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
|
|
||||||
// start the container and underlying message listener
|
|
||||||
container.start()
|
|
||||||
|
|
||||||
// wait until the container has the required number of assigned partitions
|
|
||||||
waitForAssignment(container)
|
|
||||||
container
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
def containerProperties() {
|
|
||||||
try {
|
|
||||||
// Different class names for test and latestDepTest.
|
|
||||||
return Class.forName("org.springframework.kafka.listener.config.ContainerProperties").newInstance(SHARED_TOPIC)
|
|
||||||
} catch (ClassNotFoundException | NoClassDefFoundError e) {
|
|
||||||
return Class.forName("org.springframework.kafka.listener.ContainerProperties").newInstance(SHARED_TOPIC)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,40 +0,0 @@
|
||||||
plugins {
|
|
||||||
id("otel.javaagent-testing")
|
|
||||||
}
|
|
||||||
|
|
||||||
dependencies {
|
|
||||||
library("org.apache.kafka:kafka-clients:2.4.0")
|
|
||||||
|
|
||||||
testInstrumentation(project(":instrumentation:kafka-clients:kafka-clients-0.11:javaagent"))
|
|
||||||
|
|
||||||
testLibrary("org.springframework.kafka:spring-kafka:2.4.0.RELEASE")
|
|
||||||
testLibrary("org.springframework.kafka:spring-kafka-test:2.4.0.RELEASE")
|
|
||||||
testLibrary("org.springframework:spring-core:5.2.9.RELEASE")
|
|
||||||
testImplementation("javax.xml.bind:jaxb-api:2.2.3")
|
|
||||||
|
|
||||||
latestDepTestLibrary("org.apache.kafka:kafka_2.13:+")
|
|
||||||
}
|
|
||||||
|
|
||||||
tasks {
|
|
||||||
withType<Test>().configureEach {
|
|
||||||
// TODO run tests both with and without experimental span attributes
|
|
||||||
jvmArgs("-Dotel.instrumentation.kafka.experimental-span-attributes=true")
|
|
||||||
}
|
|
||||||
|
|
||||||
val testPropagationDisabled by registering(Test::class) {
|
|
||||||
filter {
|
|
||||||
includeTestsMatching("KafkaClientPropagationDisabledTest")
|
|
||||||
isFailOnNoMatchingTests = false
|
|
||||||
}
|
|
||||||
include("**/KafkaClientPropagationDisabledTest.*")
|
|
||||||
jvmArgs("-Dotel.instrumentation.kafka.client-propagation.enabled=false")
|
|
||||||
}
|
|
||||||
|
|
||||||
named<Test>("test") {
|
|
||||||
dependsOn(testPropagationDisabled)
|
|
||||||
filter {
|
|
||||||
excludeTestsMatching("KafkaClientPropagationDisabledTest")
|
|
||||||
isFailOnNoMatchingTests = false
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,100 +0,0 @@
|
||||||
/*
|
|
||||||
* Copyright The OpenTelemetry Authors
|
|
||||||
* SPDX-License-Identifier: Apache-2.0
|
|
||||||
*/
|
|
||||||
|
|
||||||
import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification
|
|
||||||
import org.apache.kafka.clients.consumer.ConsumerRecord
|
|
||||||
import org.junit.Rule
|
|
||||||
import org.springframework.kafka.core.DefaultKafkaConsumerFactory
|
|
||||||
import org.springframework.kafka.core.DefaultKafkaProducerFactory
|
|
||||||
import org.springframework.kafka.core.KafkaTemplate
|
|
||||||
import org.springframework.kafka.listener.KafkaMessageListenerContainer
|
|
||||||
import org.springframework.kafka.listener.MessageListener
|
|
||||||
import org.springframework.kafka.test.rule.EmbeddedKafkaRule
|
|
||||||
import org.springframework.kafka.test.utils.ContainerTestUtils
|
|
||||||
import org.springframework.kafka.test.utils.KafkaTestUtils
|
|
||||||
import spock.lang.Unroll
|
|
||||||
|
|
||||||
import java.util.concurrent.LinkedBlockingQueue
|
|
||||||
import java.util.concurrent.TimeUnit
|
|
||||||
|
|
||||||
abstract class KafkaClientBaseTest extends AgentInstrumentationSpecification {
|
|
||||||
|
|
||||||
protected static final SHARED_TOPIC = "shared.topic"
|
|
||||||
|
|
||||||
private static final boolean propagationEnabled = Boolean.parseBoolean(
|
|
||||||
System.getProperty("otel.instrumentation.kafka.client-propagation.enabled", "true"))
|
|
||||||
|
|
||||||
@Rule
|
|
||||||
EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, true, SHARED_TOPIC)
|
|
||||||
|
|
||||||
abstract containerProperties()
|
|
||||||
|
|
||||||
Map<String, Object> senderProps() {
|
|
||||||
return KafkaTestUtils.producerProps(embeddedKafka.getEmbeddedKafka().getBrokersAsString())
|
|
||||||
}
|
|
||||||
|
|
||||||
Map<String, Object> consumerProps(String group, String autoCommit) {
|
|
||||||
return KafkaTestUtils.consumerProps(group, autoCommit, embeddedKafka.getEmbeddedKafka())
|
|
||||||
}
|
|
||||||
|
|
||||||
void waitForAssignment(Object container) {
|
|
||||||
ContainerTestUtils.waitForAssignment(container, embeddedKafka.getEmbeddedKafka().getPartitionsPerTopic())
|
|
||||||
}
|
|
||||||
|
|
||||||
def stopProducerFactory(DefaultKafkaProducerFactory producerFactory) {
|
|
||||||
producerFactory.destroy()
|
|
||||||
}
|
|
||||||
|
|
||||||
@Unroll
|
|
||||||
def "test kafka client header propagation manual config"() {
|
|
||||||
setup:
|
|
||||||
def senderProps = senderProps()
|
|
||||||
def producerFactory = new DefaultKafkaProducerFactory<String, String>(senderProps)
|
|
||||||
def kafkaTemplate = new KafkaTemplate<String, String>(producerFactory)
|
|
||||||
|
|
||||||
// set up the Kafka consumer properties
|
|
||||||
def consumerProperties = consumerProps("sender", "false")
|
|
||||||
|
|
||||||
// create a Kafka consumer factory
|
|
||||||
def consumerFactory = new DefaultKafkaConsumerFactory<String, String>(consumerProperties)
|
|
||||||
|
|
||||||
// set the topic that needs to be consumed
|
|
||||||
def containerProperties = containerProperties()
|
|
||||||
|
|
||||||
// create a Kafka MessageListenerContainer
|
|
||||||
def container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties)
|
|
||||||
|
|
||||||
// create a thread safe queue to store the received message
|
|
||||||
def records = new LinkedBlockingQueue<ConsumerRecord<String, String>>()
|
|
||||||
|
|
||||||
// setup a Kafka message listener
|
|
||||||
container.setupMessageListener(new MessageListener<String, String>() {
|
|
||||||
@Override
|
|
||||||
void onMessage(ConsumerRecord<String, String> record) {
|
|
||||||
records.add(record)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
|
|
||||||
// start the container and underlying message listener
|
|
||||||
container.start()
|
|
||||||
|
|
||||||
// wait until the container has the required number of assigned partitions
|
|
||||||
waitForAssignment(container)
|
|
||||||
|
|
||||||
when:
|
|
||||||
String message = "Testing without headers"
|
|
||||||
kafkaTemplate.send(SHARED_TOPIC, message)
|
|
||||||
|
|
||||||
then:
|
|
||||||
// check that the message was received
|
|
||||||
def received = records.poll(5, TimeUnit.SECONDS)
|
|
||||||
|
|
||||||
received.headers().iterator().hasNext() == propagationEnabled
|
|
||||||
|
|
||||||
cleanup:
|
|
||||||
stopProducerFactory(producerFactory)
|
|
||||||
container?.stop()
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,147 +0,0 @@
|
||||||
/*
|
|
||||||
* Copyright The OpenTelemetry Authors
|
|
||||||
* SPDX-License-Identifier: Apache-2.0
|
|
||||||
*/
|
|
||||||
|
|
||||||
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes
|
|
||||||
import org.apache.kafka.clients.consumer.ConsumerConfig
|
|
||||||
import org.apache.kafka.clients.consumer.ConsumerRecord
|
|
||||||
import org.springframework.kafka.core.DefaultKafkaConsumerFactory
|
|
||||||
import org.springframework.kafka.core.DefaultKafkaProducerFactory
|
|
||||||
import org.springframework.kafka.core.KafkaTemplate
|
|
||||||
import org.springframework.kafka.listener.KafkaMessageListenerContainer
|
|
||||||
import org.springframework.kafka.listener.MessageListener
|
|
||||||
|
|
||||||
import java.util.concurrent.LinkedBlockingQueue
|
|
||||||
import java.util.concurrent.TimeUnit
|
|
||||||
|
|
||||||
import static io.opentelemetry.api.trace.SpanKind.CONSUMER
|
|
||||||
import static io.opentelemetry.api.trace.SpanKind.PRODUCER
|
|
||||||
|
|
||||||
class KafkaClientPropagationDisabledTest extends KafkaClientBaseTest {
|
|
||||||
|
|
||||||
def "should not read remote context when consuming messages if propagation is disabled"() {
|
|
||||||
setup:
|
|
||||||
def senderProps = senderProps()
|
|
||||||
def producerFactory = new DefaultKafkaProducerFactory<String, String>(senderProps)
|
|
||||||
def kafkaTemplate = new KafkaTemplate<String, String>(producerFactory)
|
|
||||||
|
|
||||||
when: "send message"
|
|
||||||
String message = "Testing without headers"
|
|
||||||
kafkaTemplate.send(SHARED_TOPIC, message)
|
|
||||||
|
|
||||||
then: "producer span is created"
|
|
||||||
assertTraces(1) {
|
|
||||||
trace(0, 1) {
|
|
||||||
span(0) {
|
|
||||||
name SHARED_TOPIC + " send"
|
|
||||||
kind PRODUCER
|
|
||||||
hasNoParent()
|
|
||||||
attributes {
|
|
||||||
"${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka"
|
|
||||||
"${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC
|
|
||||||
"${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
when: "read message without context propagation"
|
|
||||||
// create a thread safe queue to store the received message
|
|
||||||
def records = new LinkedBlockingQueue<ConsumerRecord<String, String>>()
|
|
||||||
KafkaMessageListenerContainer<Object, Object> container = startConsumer("consumer-without-propagation", records)
|
|
||||||
|
|
||||||
then: "independent consumer span is created"
|
|
||||||
// check that the message was received
|
|
||||||
records.poll(5, TimeUnit.SECONDS) != null
|
|
||||||
|
|
||||||
assertTraces(2) {
|
|
||||||
trace(0, 1) {
|
|
||||||
span(0) {
|
|
||||||
name SHARED_TOPIC + " send"
|
|
||||||
kind PRODUCER
|
|
||||||
hasNoParent()
|
|
||||||
attributes {
|
|
||||||
"${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka"
|
|
||||||
"${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC
|
|
||||||
"${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
trace(1, 2) {
|
|
||||||
span(0) {
|
|
||||||
name SHARED_TOPIC + " receive"
|
|
||||||
kind CONSUMER
|
|
||||||
hasNoParent()
|
|
||||||
attributes {
|
|
||||||
"${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka"
|
|
||||||
"${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC
|
|
||||||
"${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic"
|
|
||||||
"${SemanticAttributes.MESSAGING_OPERATION.key}" "receive"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
span(1) {
|
|
||||||
name SHARED_TOPIC + " process"
|
|
||||||
kind CONSUMER
|
|
||||||
childOf span(0)
|
|
||||||
hasNoLinks()
|
|
||||||
attributes {
|
|
||||||
"${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka"
|
|
||||||
"${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC
|
|
||||||
"${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic"
|
|
||||||
"${SemanticAttributes.MESSAGING_OPERATION.key}" "process"
|
|
||||||
"${SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES.key}" Long
|
|
||||||
"${SemanticAttributes.MESSAGING_KAFKA_PARTITION.key}" { it >= 0 }
|
|
||||||
"kafka.offset" 0
|
|
||||||
"kafka.record.queue_time_ms" { it >= 0 }
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
cleanup:
|
|
||||||
stopProducerFactory(producerFactory)
|
|
||||||
container?.stop()
|
|
||||||
}
|
|
||||||
|
|
||||||
protected KafkaMessageListenerContainer<Object, Object> startConsumer(String groupId, records) {
|
|
||||||
// set up the Kafka consumer properties
|
|
||||||
Map<String, Object> consumerProperties = consumerProps(groupId, "false")
|
|
||||||
consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
|
|
||||||
|
|
||||||
// create a Kafka consumer factory
|
|
||||||
def consumerFactory = new DefaultKafkaConsumerFactory<String, String>(consumerProperties)
|
|
||||||
|
|
||||||
// set the topic that needs to be consumed
|
|
||||||
def containerProperties = containerProperties()
|
|
||||||
|
|
||||||
// create a Kafka MessageListenerContainer
|
|
||||||
def container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties)
|
|
||||||
|
|
||||||
// setup a Kafka message listener
|
|
||||||
container.setupMessageListener(new MessageListener<String, String>() {
|
|
||||||
@Override
|
|
||||||
void onMessage(ConsumerRecord<String, String> record) {
|
|
||||||
records.add(record)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
|
|
||||||
// start the container and underlying message listener
|
|
||||||
container.start()
|
|
||||||
|
|
||||||
// wait until the container has the required number of assigned partitions
|
|
||||||
waitForAssignment(container)
|
|
||||||
container
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
def containerProperties() {
|
|
||||||
try {
|
|
||||||
// Different class names for test and latestDepTest.
|
|
||||||
return Class.forName("org.springframework.kafka.listener.config.ContainerProperties").newInstance(SHARED_TOPIC)
|
|
||||||
} catch (ClassNotFoundException | NoClassDefFoundError e) {
|
|
||||||
return Class.forName("org.springframework.kafka.listener.ContainerProperties").newInstance(SHARED_TOPIC)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,497 +0,0 @@
|
||||||
/*
|
|
||||||
* Copyright The OpenTelemetry Authors
|
|
||||||
* SPDX-License-Identifier: Apache-2.0
|
|
||||||
*/
|
|
||||||
|
|
||||||
import io.opentelemetry.sdk.trace.data.SpanData
|
|
||||||
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes
|
|
||||||
import org.apache.kafka.clients.consumer.ConsumerConfig
|
|
||||||
import org.apache.kafka.clients.consumer.ConsumerRecord
|
|
||||||
import org.apache.kafka.clients.consumer.KafkaConsumer
|
|
||||||
import org.apache.kafka.clients.producer.KafkaProducer
|
|
||||||
import org.apache.kafka.clients.producer.Producer
|
|
||||||
import org.apache.kafka.clients.producer.ProducerRecord
|
|
||||||
import org.apache.kafka.common.TopicPartition
|
|
||||||
import org.apache.kafka.common.serialization.StringSerializer
|
|
||||||
import org.springframework.kafka.core.DefaultKafkaConsumerFactory
|
|
||||||
import org.springframework.kafka.core.DefaultKafkaProducerFactory
|
|
||||||
import org.springframework.kafka.core.KafkaTemplate
|
|
||||||
import org.springframework.kafka.listener.KafkaMessageListenerContainer
|
|
||||||
import org.springframework.kafka.listener.MessageListener
|
|
||||||
import org.springframework.kafka.test.utils.KafkaTestUtils
|
|
||||||
|
|
||||||
import java.util.concurrent.LinkedBlockingQueue
|
|
||||||
import java.util.concurrent.TimeUnit
|
|
||||||
|
|
||||||
import static io.opentelemetry.api.trace.SpanKind.CONSUMER
|
|
||||||
import static io.opentelemetry.api.trace.SpanKind.INTERNAL
|
|
||||||
import static io.opentelemetry.api.trace.SpanKind.PRODUCER
|
|
||||||
|
|
||||||
class KafkaClientPropagationEnabledTest extends KafkaClientBaseTest {
|
|
||||||
|
|
||||||
def "test kafka produce and consume"() {
|
|
||||||
setup:
|
|
||||||
def senderProps = senderProps()
|
|
||||||
Producer<String, String> producer = new KafkaProducer<>(senderProps, new StringSerializer(), new StringSerializer())
|
|
||||||
|
|
||||||
// set up the Kafka consumer properties
|
|
||||||
def consumerProperties = consumerProps("sender", "false")
|
|
||||||
|
|
||||||
// create a Kafka consumer factory
|
|
||||||
def consumerFactory = new DefaultKafkaConsumerFactory<String, String>(consumerProperties)
|
|
||||||
|
|
||||||
// set the topic that needs to be consumed
|
|
||||||
def containerProperties = containerProperties()
|
|
||||||
|
|
||||||
// create a Kafka MessageListenerContainer
|
|
||||||
def container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties)
|
|
||||||
|
|
||||||
// create a thread safe queue to store the received message
|
|
||||||
def records = new LinkedBlockingQueue<ConsumerRecord<String, String>>()
|
|
||||||
|
|
||||||
// setup a Kafka message listener
|
|
||||||
container.setupMessageListener(new MessageListener<String, String>() {
|
|
||||||
@Override
|
|
||||||
void onMessage(ConsumerRecord<String, String> record) {
|
|
||||||
waitForTraces(1) // ensure consistent ordering of traces
|
|
||||||
records.add(record)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
|
|
||||||
// start the container and underlying message listener
|
|
||||||
container.start()
|
|
||||||
|
|
||||||
// wait until the container has the required number of assigned partitions
|
|
||||||
waitForAssignment(container)
|
|
||||||
|
|
||||||
when:
|
|
||||||
String greeting = "Hello Spring Kafka Sender!"
|
|
||||||
runWithSpan("parent") {
|
|
||||||
producer.send(new ProducerRecord(SHARED_TOPIC, greeting)) { meta, ex ->
|
|
||||||
if (ex == null) {
|
|
||||||
runWithSpan("producer callback") {}
|
|
||||||
} else {
|
|
||||||
runWithSpan("producer exception: " + ex) {}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
then:
|
|
||||||
// check that the message was received
|
|
||||||
def received = records.poll(5, TimeUnit.SECONDS)
|
|
||||||
received.value() == greeting
|
|
||||||
received.key() == null
|
|
||||||
|
|
||||||
assertTraces(2) {
|
|
||||||
traces.sort(orderByRootSpanKind(INTERNAL, CONSUMER))
|
|
||||||
|
|
||||||
SpanData producerSpan
|
|
||||||
|
|
||||||
trace(0, 3) {
|
|
||||||
span(0) {
|
|
||||||
name "parent"
|
|
||||||
kind INTERNAL
|
|
||||||
hasNoParent()
|
|
||||||
}
|
|
||||||
span(1) {
|
|
||||||
name SHARED_TOPIC + " send"
|
|
||||||
kind PRODUCER
|
|
||||||
childOf span(0)
|
|
||||||
attributes {
|
|
||||||
"${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka"
|
|
||||||
"${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC
|
|
||||||
"${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
span(2) {
|
|
||||||
name "producer callback"
|
|
||||||
kind INTERNAL
|
|
||||||
childOf span(0)
|
|
||||||
}
|
|
||||||
|
|
||||||
producerSpan = span(1)
|
|
||||||
}
|
|
||||||
trace(1, 2) {
|
|
||||||
span(0) {
|
|
||||||
name SHARED_TOPIC + " receive"
|
|
||||||
kind CONSUMER
|
|
||||||
hasNoParent()
|
|
||||||
attributes {
|
|
||||||
"${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka"
|
|
||||||
"${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC
|
|
||||||
"${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic"
|
|
||||||
"${SemanticAttributes.MESSAGING_OPERATION.key}" "receive"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
span(1) {
|
|
||||||
name SHARED_TOPIC + " process"
|
|
||||||
kind CONSUMER
|
|
||||||
childOf span(0)
|
|
||||||
hasLink producerSpan
|
|
||||||
attributes {
|
|
||||||
"${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka"
|
|
||||||
"${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC
|
|
||||||
"${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic"
|
|
||||||
"${SemanticAttributes.MESSAGING_OPERATION.key}" "process"
|
|
||||||
"${SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES.key}" Long
|
|
||||||
"${SemanticAttributes.MESSAGING_KAFKA_PARTITION.key}" { it >= 0 }
|
|
||||||
"kafka.offset" 0
|
|
||||||
"kafka.record.queue_time_ms" { it >= 0 }
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
cleanup:
|
|
||||||
producer.close()
|
|
||||||
container?.stop()
|
|
||||||
}
|
|
||||||
|
|
||||||
def "test spring kafka template produce and consume"() {
|
|
||||||
setup:
|
|
||||||
def senderProps = senderProps()
|
|
||||||
def producerFactory = new DefaultKafkaProducerFactory<String, String>(senderProps)
|
|
||||||
def kafkaTemplate = new KafkaTemplate<String, String>(producerFactory)
|
|
||||||
|
|
||||||
// set up the Kafka consumer properties
|
|
||||||
def consumerProperties = consumerProps("sender", "false")
|
|
||||||
|
|
||||||
// create a Kafka consumer factory
|
|
||||||
def consumerFactory = new DefaultKafkaConsumerFactory<String, String>(consumerProperties)
|
|
||||||
|
|
||||||
// set the topic that needs to be consumed
|
|
||||||
def containerProperties = containerProperties()
|
|
||||||
|
|
||||||
// create a Kafka MessageListenerContainer
|
|
||||||
def container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties)
|
|
||||||
|
|
||||||
// create a thread safe queue to store the received message
|
|
||||||
def records = new LinkedBlockingQueue<ConsumerRecord<String, String>>()
|
|
||||||
|
|
||||||
// setup a Kafka message listener
|
|
||||||
container.setupMessageListener(new MessageListener<String, String>() {
|
|
||||||
@Override
|
|
||||||
void onMessage(ConsumerRecord<String, String> record) {
|
|
||||||
records.add(record)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
|
|
||||||
// start the container and underlying message listener
|
|
||||||
container.start()
|
|
||||||
|
|
||||||
// wait until the container has the required number of assigned partitions
|
|
||||||
waitForAssignment(container)
|
|
||||||
|
|
||||||
when:
|
|
||||||
String greeting = "Hello Spring Kafka Sender!"
|
|
||||||
runWithSpan("parent") {
|
|
||||||
kafkaTemplate.send(SHARED_TOPIC, greeting).addCallback({
|
|
||||||
runWithSpan("producer callback") {}
|
|
||||||
}, { ex ->
|
|
||||||
runWithSpan("producer exception: " + ex) {}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
then:
|
|
||||||
// check that the message was received
|
|
||||||
def received = records.poll(5, TimeUnit.SECONDS)
|
|
||||||
received.value() == greeting
|
|
||||||
received.key() == null
|
|
||||||
|
|
||||||
assertTraces(2) {
|
|
||||||
traces.sort(orderByRootSpanKind(INTERNAL, CONSUMER))
|
|
||||||
|
|
||||||
SpanData producerSpan
|
|
||||||
|
|
||||||
trace(0, 3) {
|
|
||||||
span(0) {
|
|
||||||
name "parent"
|
|
||||||
kind INTERNAL
|
|
||||||
hasNoParent()
|
|
||||||
}
|
|
||||||
span(1) {
|
|
||||||
name SHARED_TOPIC + " send"
|
|
||||||
kind PRODUCER
|
|
||||||
childOf span(0)
|
|
||||||
attributes {
|
|
||||||
"${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka"
|
|
||||||
"${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC
|
|
||||||
"${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
span(2) {
|
|
||||||
name "producer callback"
|
|
||||||
kind INTERNAL
|
|
||||||
childOf span(0)
|
|
||||||
}
|
|
||||||
|
|
||||||
producerSpan = span(1)
|
|
||||||
}
|
|
||||||
trace(1, 2) {
|
|
||||||
span(0) {
|
|
||||||
name SHARED_TOPIC + " receive"
|
|
||||||
kind CONSUMER
|
|
||||||
hasNoParent()
|
|
||||||
attributes {
|
|
||||||
"${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka"
|
|
||||||
"${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC
|
|
||||||
"${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic"
|
|
||||||
"${SemanticAttributes.MESSAGING_OPERATION.key}" "receive"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
span(1) {
|
|
||||||
name SHARED_TOPIC + " process"
|
|
||||||
kind CONSUMER
|
|
||||||
childOf span(0)
|
|
||||||
hasLink producerSpan
|
|
||||||
attributes {
|
|
||||||
"${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka"
|
|
||||||
"${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC
|
|
||||||
"${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic"
|
|
||||||
"${SemanticAttributes.MESSAGING_OPERATION.key}" "process"
|
|
||||||
"${SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES.key}" Long
|
|
||||||
"${SemanticAttributes.MESSAGING_KAFKA_PARTITION.key}" { it >= 0 }
|
|
||||||
"kafka.offset" 0
|
|
||||||
"kafka.record.queue_time_ms" { it >= 0 }
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
cleanup:
|
|
||||||
stopProducerFactory(producerFactory)
|
|
||||||
container?.stop()
|
|
||||||
}
|
|
||||||
|
|
||||||
def "test pass through tombstone"() {
|
|
||||||
setup:
|
|
||||||
def senderProps = senderProps()
|
|
||||||
def producerFactory = new DefaultKafkaProducerFactory<String, String>(senderProps)
|
|
||||||
def kafkaTemplate = new KafkaTemplate<String, String>(producerFactory)
|
|
||||||
|
|
||||||
// set up the Kafka consumer properties
|
|
||||||
def consumerProperties = consumerProps("sender", "false")
|
|
||||||
|
|
||||||
// create a Kafka consumer factory
|
|
||||||
def consumerFactory = new DefaultKafkaConsumerFactory<String, String>(consumerProperties)
|
|
||||||
|
|
||||||
// set the topic that needs to be consumed
|
|
||||||
def containerProperties = containerProperties()
|
|
||||||
|
|
||||||
// create a Kafka MessageListenerContainer
|
|
||||||
def container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties)
|
|
||||||
|
|
||||||
// create a thread safe queue to store the received message
|
|
||||||
def records = new LinkedBlockingQueue<ConsumerRecord<String, String>>()
|
|
||||||
|
|
||||||
// setup a Kafka message listener
|
|
||||||
container.setupMessageListener(new MessageListener<String, String>() {
|
|
||||||
@Override
|
|
||||||
void onMessage(ConsumerRecord<String, String> record) {
|
|
||||||
records.add(record)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
|
|
||||||
// start the container and underlying message listener
|
|
||||||
container.start()
|
|
||||||
|
|
||||||
// wait until the container has the required number of assigned partitions
|
|
||||||
waitForAssignment(container)
|
|
||||||
|
|
||||||
when:
|
|
||||||
kafkaTemplate.send(SHARED_TOPIC, null)
|
|
||||||
|
|
||||||
then:
|
|
||||||
// check that the message was received
|
|
||||||
def received = records.poll(5, TimeUnit.SECONDS)
|
|
||||||
received.value() == null
|
|
||||||
received.key() == null
|
|
||||||
|
|
||||||
assertTraces(2) {
|
|
||||||
traces.sort(orderByRootSpanKind(PRODUCER, CONSUMER))
|
|
||||||
|
|
||||||
SpanData producerSpan
|
|
||||||
|
|
||||||
trace(0, 1) {
|
|
||||||
span(0) {
|
|
||||||
name SHARED_TOPIC + " send"
|
|
||||||
kind PRODUCER
|
|
||||||
hasNoParent()
|
|
||||||
attributes {
|
|
||||||
"${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka"
|
|
||||||
"${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC
|
|
||||||
"${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic"
|
|
||||||
"${SemanticAttributes.MESSAGING_KAFKA_TOMBSTONE.key}" true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
producerSpan = span(0)
|
|
||||||
}
|
|
||||||
trace(1, 2) {
|
|
||||||
span(0) {
|
|
||||||
name SHARED_TOPIC + " receive"
|
|
||||||
kind CONSUMER
|
|
||||||
hasNoParent()
|
|
||||||
attributes {
|
|
||||||
"${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka"
|
|
||||||
"${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC
|
|
||||||
"${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic"
|
|
||||||
"${SemanticAttributes.MESSAGING_OPERATION.key}" "receive"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
span(1) {
|
|
||||||
name SHARED_TOPIC + " process"
|
|
||||||
kind CONSUMER
|
|
||||||
childOf span(0)
|
|
||||||
hasLink producerSpan
|
|
||||||
attributes {
|
|
||||||
"${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka"
|
|
||||||
"${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC
|
|
||||||
"${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic"
|
|
||||||
"${SemanticAttributes.MESSAGING_OPERATION.key}" "process"
|
|
||||||
"${SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES.key}" Long
|
|
||||||
"${SemanticAttributes.MESSAGING_KAFKA_PARTITION.key}" { it >= 0 }
|
|
||||||
"${SemanticAttributes.MESSAGING_KAFKA_TOMBSTONE.key}" true
|
|
||||||
"kafka.offset" 0
|
|
||||||
"kafka.record.queue_time_ms" { it >= 0 }
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
cleanup:
|
|
||||||
stopProducerFactory(producerFactory)
|
|
||||||
container?.stop()
|
|
||||||
}
|
|
||||||
|
|
||||||
def "test records(TopicPartition) kafka consume"() {
|
|
||||||
setup:
|
|
||||||
|
|
||||||
// set up the Kafka consumer properties
|
|
||||||
def kafkaPartition = 0
|
|
||||||
def consumerProperties = consumerProps("sender", "false")
|
|
||||||
consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
|
|
||||||
def consumer = new KafkaConsumer<String, String>(consumerProperties)
|
|
||||||
|
|
||||||
def senderProps = senderProps()
|
|
||||||
def producer = new KafkaProducer(senderProps)
|
|
||||||
|
|
||||||
consumer.assign(Arrays.asList(new TopicPartition(SHARED_TOPIC, kafkaPartition)))
|
|
||||||
|
|
||||||
when:
|
|
||||||
def greeting = "Hello from MockConsumer!"
|
|
||||||
producer.send(new ProducerRecord<Integer, String>(SHARED_TOPIC, kafkaPartition, null, greeting))
|
|
||||||
|
|
||||||
then:
|
|
||||||
waitForTraces(1)
|
|
||||||
def records = new LinkedBlockingQueue<ConsumerRecord<String, String>>()
|
|
||||||
def pollResult = KafkaTestUtils.getRecords(consumer)
|
|
||||||
|
|
||||||
def recs = pollResult.records(new TopicPartition(SHARED_TOPIC, kafkaPartition)).iterator()
|
|
||||||
|
|
||||||
def first = null
|
|
||||||
if (recs.hasNext()) {
|
|
||||||
first = recs.next()
|
|
||||||
}
|
|
||||||
|
|
||||||
then:
|
|
||||||
recs.hasNext() == false
|
|
||||||
first.value() == greeting
|
|
||||||
first.key() == null
|
|
||||||
|
|
||||||
assertTraces(2) {
|
|
||||||
traces.sort(orderByRootSpanKind(PRODUCER, CONSUMER))
|
|
||||||
|
|
||||||
SpanData producerSpan
|
|
||||||
|
|
||||||
trace(0, 1) {
|
|
||||||
span(0) {
|
|
||||||
name SHARED_TOPIC + " send"
|
|
||||||
kind PRODUCER
|
|
||||||
hasNoParent()
|
|
||||||
attributes {
|
|
||||||
"${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka"
|
|
||||||
"${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC
|
|
||||||
"${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic"
|
|
||||||
"${SemanticAttributes.MESSAGING_KAFKA_PARTITION.key}" { it >= 0 }
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
producerSpan = span(0)
|
|
||||||
}
|
|
||||||
trace(1, 2) {
|
|
||||||
span(0) {
|
|
||||||
name SHARED_TOPIC + " receive"
|
|
||||||
kind CONSUMER
|
|
||||||
hasNoParent()
|
|
||||||
attributes {
|
|
||||||
"${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka"
|
|
||||||
"${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC
|
|
||||||
"${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic"
|
|
||||||
"${SemanticAttributes.MESSAGING_OPERATION.key}" "receive"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
span(1) {
|
|
||||||
name SHARED_TOPIC + " process"
|
|
||||||
kind CONSUMER
|
|
||||||
childOf span(0)
|
|
||||||
hasLink producerSpan
|
|
||||||
attributes {
|
|
||||||
"${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka"
|
|
||||||
"${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC
|
|
||||||
"${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic"
|
|
||||||
"${SemanticAttributes.MESSAGING_OPERATION.key}" "process"
|
|
||||||
"${SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES.key}" Long
|
|
||||||
"${SemanticAttributes.MESSAGING_KAFKA_PARTITION.key}" { it >= 0 }
|
|
||||||
"kafka.offset" 0
|
|
||||||
"kafka.record.queue_time_ms" { it >= 0 }
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
cleanup:
|
|
||||||
consumer.close()
|
|
||||||
producer.close()
|
|
||||||
}
|
|
||||||
|
|
||||||
protected KafkaMessageListenerContainer<Object, Object> startConsumer(String groupId, records) {
|
|
||||||
// set up the Kafka consumer properties
|
|
||||||
Map<String, Object> consumerProperties = consumerProps(groupId, "false")
|
|
||||||
consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
|
|
||||||
|
|
||||||
// create a Kafka consumer factory
|
|
||||||
def consumerFactory = new DefaultKafkaConsumerFactory<String, String>(consumerProperties)
|
|
||||||
|
|
||||||
// set the topic that needs to be consumed
|
|
||||||
def containerProperties = containerProperties()
|
|
||||||
|
|
||||||
// create a Kafka MessageListenerContainer
|
|
||||||
def container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties)
|
|
||||||
|
|
||||||
// setup a Kafka message listener
|
|
||||||
container.setupMessageListener(new MessageListener<String, String>() {
|
|
||||||
@Override
|
|
||||||
void onMessage(ConsumerRecord<String, String> record) {
|
|
||||||
records.add(record)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
|
|
||||||
// start the container and underlying message listener
|
|
||||||
container.start()
|
|
||||||
|
|
||||||
// wait until the container has the required number of assigned partitions
|
|
||||||
waitForAssignment(container)
|
|
||||||
container
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
def containerProperties() {
|
|
||||||
try {
|
|
||||||
// Different class names for test and latestDepTest.
|
|
||||||
return Class.forName("org.springframework.kafka.listener.config.ContainerProperties").newInstance(SHARED_TOPIC)
|
|
||||||
} catch (ClassNotFoundException | NoClassDefFoundError e) {
|
|
||||||
return Class.forName("org.springframework.kafka.listener.ContainerProperties").newInstance(SHARED_TOPIC)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -11,47 +11,26 @@ muzzle {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
testSets {
|
val versions: Map<String, String> by project
|
||||||
create("latestDepTest")
|
|
||||||
}
|
|
||||||
|
|
||||||
dependencies {
|
dependencies {
|
||||||
implementation(project(":instrumentation:kafka-clients:kafka-clients-common:javaagent"))
|
implementation(project(":instrumentation:kafka-clients:kafka-clients-common:javaagent"))
|
||||||
|
|
||||||
compileOnly("org.apache.kafka:kafka-streams:0.11.0.0")
|
library("org.apache.kafka:kafka-streams:0.11.0.0")
|
||||||
|
|
||||||
// Include kafka-clients instrumentation for tests.
|
// Include kafka-clients instrumentation for tests.
|
||||||
testInstrumentation(project(":instrumentation:kafka-clients:kafka-clients-0.11:javaagent"))
|
testInstrumentation(project(":instrumentation:kafka-clients:kafka-clients-0.11:javaagent"))
|
||||||
|
|
||||||
testImplementation("org.apache.kafka:kafka-streams:0.11.0.0")
|
testImplementation("org.testcontainers:kafka:${versions["org.testcontainers"]}")
|
||||||
testImplementation("org.apache.kafka:kafka-clients:0.11.0.0")
|
|
||||||
testImplementation("org.springframework.kafka:spring-kafka:1.3.3.RELEASE")
|
|
||||||
testImplementation("org.springframework.kafka:spring-kafka-test:1.3.3.RELEASE")
|
|
||||||
testImplementation("javax.xml.bind:jaxb-api:2.2.3")
|
|
||||||
testImplementation("org.assertj:assertj-core")
|
|
||||||
|
|
||||||
add("latestDepTestImplementation", "org.apache.kafka:kafka_2.13:2.+")
|
latestDepTestLibrary("org.apache.kafka:kafka-streams:2.+")
|
||||||
add("latestDepTestImplementation", "org.apache.kafka:kafka-clients:2.+")
|
|
||||||
add("latestDepTestImplementation", "org.apache.kafka:kafka-streams:2.+")
|
|
||||||
add("latestDepTestImplementation", "org.springframework.kafka:spring-kafka:+")
|
|
||||||
add("latestDepTestImplementation", "org.springframework.kafka:spring-kafka-test:+")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
tasks {
|
tasks {
|
||||||
withType<Test>().configureEach {
|
test {
|
||||||
|
usesService(gradle.sharedServices.registrations["testcontainersBuildService"].service)
|
||||||
|
|
||||||
// TODO run tests both with and without experimental span attributes
|
// TODO run tests both with and without experimental span attributes
|
||||||
jvmArgs("-Dotel.instrumentation.kafka.experimental-span-attributes=true")
|
jvmArgs("-Dotel.instrumentation.kafka.experimental-span-attributes=true")
|
||||||
}
|
}
|
||||||
|
|
||||||
if (findProperty("testLatestDeps") as Boolean) {
|
|
||||||
// latestDepTest is still run
|
|
||||||
named("test") {
|
|
||||||
enabled = false
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Requires old version of AssertJ for baseline
|
|
||||||
if (!(findProperty("testLatestDeps") as Boolean)) {
|
|
||||||
configurations.testRuntimeClasspath.resolutionStrategy.force("org.assertj:assertj-core:2.9.1")
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,276 +0,0 @@
|
||||||
/*
|
|
||||||
* Copyright The OpenTelemetry Authors
|
|
||||||
* SPDX-License-Identifier: Apache-2.0
|
|
||||||
*/
|
|
||||||
|
|
||||||
import io.opentelemetry.api.trace.Span
|
|
||||||
import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator
|
|
||||||
import io.opentelemetry.context.Context
|
|
||||||
import io.opentelemetry.context.propagation.TextMapGetter
|
|
||||||
import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification
|
|
||||||
import io.opentelemetry.sdk.trace.data.SpanData
|
|
||||||
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes
|
|
||||||
import org.apache.kafka.clients.consumer.ConsumerRecord
|
|
||||||
import org.apache.kafka.common.serialization.Serdes
|
|
||||||
import org.apache.kafka.streams.KafkaStreams
|
|
||||||
import org.apache.kafka.streams.StreamsConfig
|
|
||||||
import org.apache.kafka.streams.kstream.KStream
|
|
||||||
import org.apache.kafka.streams.kstream.ValueMapper
|
|
||||||
import org.junit.ClassRule
|
|
||||||
import org.springframework.kafka.core.DefaultKafkaConsumerFactory
|
|
||||||
import org.springframework.kafka.core.DefaultKafkaProducerFactory
|
|
||||||
import org.springframework.kafka.core.KafkaTemplate
|
|
||||||
import org.springframework.kafka.listener.KafkaMessageListenerContainer
|
|
||||||
import org.springframework.kafka.listener.MessageListener
|
|
||||||
import org.springframework.kafka.test.rule.EmbeddedKafkaRule
|
|
||||||
import org.springframework.kafka.test.utils.ContainerTestUtils
|
|
||||||
import org.springframework.kafka.test.utils.KafkaTestUtils
|
|
||||||
import spock.lang.Shared
|
|
||||||
|
|
||||||
import java.util.concurrent.LinkedBlockingQueue
|
|
||||||
import java.util.concurrent.TimeUnit
|
|
||||||
|
|
||||||
import static io.opentelemetry.api.trace.SpanKind.CONSUMER
|
|
||||||
import static io.opentelemetry.api.trace.SpanKind.PRODUCER
|
|
||||||
|
|
||||||
class KafkaStreamsTest extends AgentInstrumentationSpecification {
|
|
||||||
|
|
||||||
static final STREAM_PENDING = "test.pending"
|
|
||||||
static final STREAM_PROCESSED = "test.processed"
|
|
||||||
|
|
||||||
@Shared
|
|
||||||
@ClassRule
|
|
||||||
EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, true, STREAM_PENDING, STREAM_PROCESSED)
|
|
||||||
|
|
||||||
Map<String, Object> senderProps() {
|
|
||||||
return KafkaTestUtils.producerProps(embeddedKafka.getEmbeddedKafka().getBrokersAsString())
|
|
||||||
}
|
|
||||||
|
|
||||||
Map<String, Object> consumerProps(String group, String autoCommit) {
|
|
||||||
return KafkaTestUtils.consumerProps(group, autoCommit, embeddedKafka.getEmbeddedKafka())
|
|
||||||
}
|
|
||||||
|
|
||||||
void waitForAssignment(Object container) {
|
|
||||||
ContainerTestUtils.waitForAssignment(container, embeddedKafka.getEmbeddedKafka().getPartitionsPerTopic())
|
|
||||||
}
|
|
||||||
|
|
||||||
def stopProducerFactory(DefaultKafkaProducerFactory producerFactory) {
|
|
||||||
producerFactory.destroy()
|
|
||||||
}
|
|
||||||
|
|
||||||
def "test kafka produce and consume with streams in-between"() {
|
|
||||||
setup:
|
|
||||||
def config = new Properties()
|
|
||||||
def senderProps = senderProps()
|
|
||||||
config.putAll(senderProps)
|
|
||||||
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-application")
|
|
||||||
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName())
|
|
||||||
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName())
|
|
||||||
|
|
||||||
// CONFIGURE CONSUMER
|
|
||||||
def consumerFactory = new DefaultKafkaConsumerFactory<String, String>(consumerProps("sender", "false"))
|
|
||||||
|
|
||||||
def containerProperties
|
|
||||||
try {
|
|
||||||
// Different class names for test and latestDepTest.
|
|
||||||
containerProperties = Class.forName("org.springframework.kafka.listener.config.ContainerProperties").newInstance(STREAM_PROCESSED)
|
|
||||||
} catch (ClassNotFoundException | NoClassDefFoundError e) {
|
|
||||||
containerProperties = Class.forName("org.springframework.kafka.listener.ContainerProperties").newInstance(STREAM_PROCESSED)
|
|
||||||
}
|
|
||||||
def consumerContainer = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties)
|
|
||||||
|
|
||||||
// create a thread safe queue to store the processed message
|
|
||||||
def records = new LinkedBlockingQueue<ConsumerRecord<String, String>>()
|
|
||||||
|
|
||||||
// setup a Kafka message listener
|
|
||||||
consumerContainer.setupMessageListener(new MessageListener<String, String>() {
|
|
||||||
@Override
|
|
||||||
void onMessage(ConsumerRecord<String, String> record) {
|
|
||||||
Span.current().setAttribute("testing", 123)
|
|
||||||
records.add(record)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
|
|
||||||
// start the container and underlying message listener
|
|
||||||
consumerContainer.start()
|
|
||||||
|
|
||||||
// wait until the container has the required number of assigned partitions
|
|
||||||
waitForAssignment(consumerContainer)
|
|
||||||
|
|
||||||
// CONFIGURE PROCESSOR
|
|
||||||
def builder
|
|
||||||
try {
|
|
||||||
// Different class names for test and latestDepTest.
|
|
||||||
builder = Class.forName("org.apache.kafka.streams.kstream.KStreamBuilder").newInstance()
|
|
||||||
} catch (ClassNotFoundException | NoClassDefFoundError e) {
|
|
||||||
builder = Class.forName("org.apache.kafka.streams.StreamsBuilder").newInstance()
|
|
||||||
}
|
|
||||||
KStream<String, String> textLines = builder.stream(STREAM_PENDING)
|
|
||||||
def values = textLines
|
|
||||||
.mapValues(new ValueMapper<String, String>() {
|
|
||||||
@Override
|
|
||||||
String apply(String textLine) {
|
|
||||||
Span.current().setAttribute("asdf", "testing")
|
|
||||||
return textLine.toLowerCase()
|
|
||||||
}
|
|
||||||
})
|
|
||||||
|
|
||||||
KafkaStreams streams
|
|
||||||
try {
|
|
||||||
// Different api for test and latestDepTest.
|
|
||||||
values.to(Serdes.String(), Serdes.String(), STREAM_PROCESSED)
|
|
||||||
streams = new KafkaStreams(builder, config)
|
|
||||||
} catch (MissingMethodException e) {
|
|
||||||
def producer = Class.forName("org.apache.kafka.streams.kstream.Produced")
|
|
||||||
.with(Serdes.String(), Serdes.String())
|
|
||||||
values.to(STREAM_PROCESSED, producer)
|
|
||||||
streams = new KafkaStreams(builder.build(), config)
|
|
||||||
}
|
|
||||||
streams.start()
|
|
||||||
|
|
||||||
// CONFIGURE PRODUCER
|
|
||||||
def producerFactory = new DefaultKafkaProducerFactory<String, String>(senderProps)
|
|
||||||
def kafkaTemplate = new KafkaTemplate<String, String>(producerFactory)
|
|
||||||
|
|
||||||
when:
|
|
||||||
String greeting = "TESTING TESTING 123!"
|
|
||||||
kafkaTemplate.send(STREAM_PENDING, greeting)
|
|
||||||
|
|
||||||
then:
|
|
||||||
// check that the message was received
|
|
||||||
def received = records.poll(10, TimeUnit.SECONDS)
|
|
||||||
received.value() == greeting.toLowerCase()
|
|
||||||
received.key() == null
|
|
||||||
|
|
||||||
assertTraces(3) {
|
|
||||||
traces.sort(orderByRootSpanName(
|
|
||||||
STREAM_PENDING + " send",
|
|
||||||
STREAM_PENDING + " receive",
|
|
||||||
STREAM_PROCESSED + " receive"))
|
|
||||||
|
|
||||||
SpanData producerPending, producerProcessed
|
|
||||||
|
|
||||||
trace(0, 1) {
|
|
||||||
// kafka-clients PRODUCER
|
|
||||||
span(0) {
|
|
||||||
name STREAM_PENDING + " send"
|
|
||||||
kind PRODUCER
|
|
||||||
hasNoParent()
|
|
||||||
attributes {
|
|
||||||
"${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka"
|
|
||||||
"${SemanticAttributes.MESSAGING_DESTINATION.key}" STREAM_PENDING
|
|
||||||
"${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
producerPending = span(0)
|
|
||||||
}
|
|
||||||
trace(1, 3) {
|
|
||||||
// kafka-clients CONSUMER receive
|
|
||||||
span(0) {
|
|
||||||
name STREAM_PENDING + " receive"
|
|
||||||
kind CONSUMER
|
|
||||||
hasNoParent()
|
|
||||||
attributes {
|
|
||||||
"${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka"
|
|
||||||
"${SemanticAttributes.MESSAGING_DESTINATION.key}" STREAM_PENDING
|
|
||||||
"${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic"
|
|
||||||
"${SemanticAttributes.MESSAGING_OPERATION.key}" "receive"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// kafka-stream CONSUMER
|
|
||||||
span(1) {
|
|
||||||
name STREAM_PENDING + " process"
|
|
||||||
kind CONSUMER
|
|
||||||
childOf span(0)
|
|
||||||
hasLink producerPending
|
|
||||||
attributes {
|
|
||||||
"${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka"
|
|
||||||
"${SemanticAttributes.MESSAGING_DESTINATION.key}" STREAM_PENDING
|
|
||||||
"${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic"
|
|
||||||
"${SemanticAttributes.MESSAGING_OPERATION.key}" "process"
|
|
||||||
"${SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES.key}" Long
|
|
||||||
"${SemanticAttributes.MESSAGING_KAFKA_PARTITION.key}" { it >= 0 }
|
|
||||||
"kafka.offset" 0
|
|
||||||
"kafka.record.queue_time_ms" { it >= 0 }
|
|
||||||
"asdf" "testing"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// kafka-clients PRODUCER
|
|
||||||
span(2) {
|
|
||||||
name STREAM_PROCESSED + " send"
|
|
||||||
kind PRODUCER
|
|
||||||
childOf span(1)
|
|
||||||
attributes {
|
|
||||||
"${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka"
|
|
||||||
"${SemanticAttributes.MESSAGING_DESTINATION.key}" STREAM_PROCESSED
|
|
||||||
"${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
producerProcessed = span(2)
|
|
||||||
}
|
|
||||||
trace(2, 2) {
|
|
||||||
// kafka-clients CONSUMER receive
|
|
||||||
span(0) {
|
|
||||||
name STREAM_PROCESSED + " receive"
|
|
||||||
kind CONSUMER
|
|
||||||
hasNoParent()
|
|
||||||
attributes {
|
|
||||||
"${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka"
|
|
||||||
"${SemanticAttributes.MESSAGING_DESTINATION.key}" STREAM_PROCESSED
|
|
||||||
"${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic"
|
|
||||||
"${SemanticAttributes.MESSAGING_OPERATION.key}" "receive"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// kafka-clients CONSUMER process
|
|
||||||
span(1) {
|
|
||||||
name STREAM_PROCESSED + " process"
|
|
||||||
kind CONSUMER
|
|
||||||
childOf span(0)
|
|
||||||
hasLink producerProcessed
|
|
||||||
attributes {
|
|
||||||
"${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka"
|
|
||||||
"${SemanticAttributes.MESSAGING_DESTINATION.key}" STREAM_PROCESSED
|
|
||||||
"${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic"
|
|
||||||
"${SemanticAttributes.MESSAGING_OPERATION.key}" "process"
|
|
||||||
"${SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES.key}" Long
|
|
||||||
"${SemanticAttributes.MESSAGING_KAFKA_PARTITION.key}" { it >= 0 }
|
|
||||||
"kafka.offset" 0
|
|
||||||
"kafka.record.queue_time_ms" { it >= 0 }
|
|
||||||
"testing" 123
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
def headers = received.headers()
|
|
||||||
headers.iterator().hasNext()
|
|
||||||
def traceparent = new String(headers.headers("traceparent").iterator().next().value())
|
|
||||||
Context context = W3CTraceContextPropagator.instance.extract(Context.root(), "", new TextMapGetter<String>() {
|
|
||||||
@Override
|
|
||||||
Iterable<String> keys(String carrier) {
|
|
||||||
return Collections.singleton("traceparent")
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
String get(String carrier, String key) {
|
|
||||||
if (key == "traceparent") {
|
|
||||||
return traceparent
|
|
||||||
}
|
|
||||||
return null
|
|
||||||
}
|
|
||||||
})
|
|
||||||
def spanContext = Span.fromContext(context).getSpanContext()
|
|
||||||
def streamTrace = traces.find { it.size() == 3 }
|
|
||||||
def streamSendSpan = streamTrace[2]
|
|
||||||
spanContext.traceId == streamSendSpan.traceId
|
|
||||||
spanContext.spanId == streamSendSpan.spanId
|
|
||||||
|
|
||||||
|
|
||||||
cleanup:
|
|
||||||
stopProducerFactory(producerFactory)
|
|
||||||
streams?.close()
|
|
||||||
consumerContainer?.stop()
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -10,24 +10,28 @@ import io.opentelemetry.context.propagation.TextMapGetter
|
||||||
import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification
|
import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification
|
||||||
import io.opentelemetry.sdk.trace.data.SpanData
|
import io.opentelemetry.sdk.trace.data.SpanData
|
||||||
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes
|
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes
|
||||||
import org.apache.kafka.clients.consumer.ConsumerRecord
|
import org.apache.kafka.clients.admin.AdminClient
|
||||||
|
import org.apache.kafka.clients.admin.NewTopic
|
||||||
|
import org.apache.kafka.clients.consumer.Consumer
|
||||||
|
import org.apache.kafka.clients.consumer.KafkaConsumer
|
||||||
|
import org.apache.kafka.clients.producer.KafkaProducer
|
||||||
|
import org.apache.kafka.clients.producer.Producer
|
||||||
|
import org.apache.kafka.clients.producer.ProducerRecord
|
||||||
|
import org.apache.kafka.common.TopicPartition
|
||||||
|
import org.apache.kafka.common.header.Headers
|
||||||
|
import org.apache.kafka.common.serialization.IntegerDeserializer
|
||||||
|
import org.apache.kafka.common.serialization.IntegerSerializer
|
||||||
import org.apache.kafka.common.serialization.Serdes
|
import org.apache.kafka.common.serialization.Serdes
|
||||||
|
import org.apache.kafka.common.serialization.StringDeserializer
|
||||||
|
import org.apache.kafka.common.serialization.StringSerializer
|
||||||
import org.apache.kafka.streams.KafkaStreams
|
import org.apache.kafka.streams.KafkaStreams
|
||||||
import org.apache.kafka.streams.StreamsConfig
|
import org.apache.kafka.streams.StreamsConfig
|
||||||
import org.apache.kafka.streams.kstream.KStream
|
import org.apache.kafka.streams.kstream.KStream
|
||||||
import org.apache.kafka.streams.kstream.ValueMapper
|
import org.apache.kafka.streams.kstream.ValueMapper
|
||||||
import org.junit.ClassRule
|
import org.testcontainers.containers.KafkaContainer
|
||||||
import org.springframework.kafka.core.DefaultKafkaConsumerFactory
|
|
||||||
import org.springframework.kafka.core.DefaultKafkaProducerFactory
|
|
||||||
import org.springframework.kafka.core.KafkaTemplate
|
|
||||||
import org.springframework.kafka.listener.KafkaMessageListenerContainer
|
|
||||||
import org.springframework.kafka.listener.MessageListener
|
|
||||||
import org.springframework.kafka.test.rule.KafkaEmbedded
|
|
||||||
import org.springframework.kafka.test.utils.ContainerTestUtils
|
|
||||||
import org.springframework.kafka.test.utils.KafkaTestUtils
|
|
||||||
import spock.lang.Shared
|
import spock.lang.Shared
|
||||||
|
|
||||||
import java.util.concurrent.LinkedBlockingQueue
|
import java.time.Duration
|
||||||
import java.util.concurrent.TimeUnit
|
import java.util.concurrent.TimeUnit
|
||||||
|
|
||||||
import static io.opentelemetry.api.trace.SpanKind.CONSUMER
|
import static io.opentelemetry.api.trace.SpanKind.CONSUMER
|
||||||
|
@ -39,64 +43,59 @@ class KafkaStreamsTest extends AgentInstrumentationSpecification {
|
||||||
static final STREAM_PROCESSED = "test.processed"
|
static final STREAM_PROCESSED = "test.processed"
|
||||||
|
|
||||||
@Shared
|
@Shared
|
||||||
@ClassRule
|
static KafkaContainer kafka
|
||||||
KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, STREAM_PENDING, STREAM_PROCESSED)
|
@Shared
|
||||||
|
static Producer<Integer, String> producer
|
||||||
|
@Shared
|
||||||
|
static Consumer<Integer, String> consumer
|
||||||
|
|
||||||
Map<String, Object> senderProps() {
|
|
||||||
return KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString())
|
def setupSpec() {
|
||||||
|
kafka = new KafkaContainer()
|
||||||
|
kafka.start()
|
||||||
|
|
||||||
|
// create test topic
|
||||||
|
AdminClient.create(["bootstrap.servers": kafka.bootstrapServers]).withCloseable { admin ->
|
||||||
|
admin.createTopics([
|
||||||
|
new NewTopic(STREAM_PENDING, 1, (short) 1),
|
||||||
|
new NewTopic(STREAM_PROCESSED, 1, (short) 1),
|
||||||
|
]).all().get(10, TimeUnit.SECONDS)
|
||||||
|
}
|
||||||
|
|
||||||
|
producer = new KafkaProducer<>(producerProps(kafka.bootstrapServers))
|
||||||
|
|
||||||
|
// values copied from spring's KafkaTestUtils
|
||||||
|
def consumerProps = [
|
||||||
|
"bootstrap.servers" : kafka.bootstrapServers,
|
||||||
|
"group.id" : "test",
|
||||||
|
"enable.auto.commit" : "false",
|
||||||
|
"auto.commit.interval.ms": "10",
|
||||||
|
"session.timeout.ms" : "60000",
|
||||||
|
"key.deserializer" : IntegerDeserializer,
|
||||||
|
"value.deserializer" : StringDeserializer
|
||||||
|
]
|
||||||
|
consumer = new KafkaConsumer<>(consumerProps)
|
||||||
|
|
||||||
|
// assign topic partitions
|
||||||
|
consumer.assign([
|
||||||
|
new TopicPartition(STREAM_PROCESSED, 0)
|
||||||
|
])
|
||||||
}
|
}
|
||||||
|
|
||||||
Map<String, Object> consumerProps(String group, String autoCommit) {
|
def cleanupSpec() {
|
||||||
return KafkaTestUtils.consumerProps(group, autoCommit, embeddedKafka)
|
consumer?.close()
|
||||||
}
|
producer?.close()
|
||||||
|
kafka.stop()
|
||||||
void waitForAssignment(Object container) {
|
|
||||||
ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic())
|
|
||||||
}
|
|
||||||
|
|
||||||
def stopProducerFactory(DefaultKafkaProducerFactory producerFactory) {
|
|
||||||
producerFactory.stop()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
def "test kafka produce and consume with streams in-between"() {
|
def "test kafka produce and consume with streams in-between"() {
|
||||||
setup:
|
setup:
|
||||||
def config = new Properties()
|
def config = new Properties()
|
||||||
def senderProps = senderProps()
|
config.putAll(producerProps(kafka.bootstrapServers))
|
||||||
config.putAll(senderProps)
|
|
||||||
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-application")
|
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-application")
|
||||||
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName())
|
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName())
|
||||||
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName())
|
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName())
|
||||||
|
|
||||||
// CONFIGURE CONSUMER
|
|
||||||
def consumerFactory = new DefaultKafkaConsumerFactory<String, String>(consumerProps("sender", "false"))
|
|
||||||
|
|
||||||
def containerProperties
|
|
||||||
try {
|
|
||||||
// Different class names for test and latestDepTest.
|
|
||||||
containerProperties = Class.forName("org.springframework.kafka.listener.config.ContainerProperties").newInstance(STREAM_PROCESSED)
|
|
||||||
} catch (ClassNotFoundException | NoClassDefFoundError e) {
|
|
||||||
containerProperties = Class.forName("org.springframework.kafka.listener.ContainerProperties").newInstance(STREAM_PROCESSED)
|
|
||||||
}
|
|
||||||
def consumerContainer = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties)
|
|
||||||
|
|
||||||
// create a thread safe queue to store the processed message
|
|
||||||
def records = new LinkedBlockingQueue<ConsumerRecord<String, String>>()
|
|
||||||
|
|
||||||
// setup a Kafka message listener
|
|
||||||
consumerContainer.setupMessageListener(new MessageListener<String, String>() {
|
|
||||||
@Override
|
|
||||||
void onMessage(ConsumerRecord<String, String> record) {
|
|
||||||
Span.current().setAttribute("testing", 123)
|
|
||||||
records.add(record)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
|
|
||||||
// start the container and underlying message listener
|
|
||||||
consumerContainer.start()
|
|
||||||
|
|
||||||
// wait until the container has the required number of assigned partitions
|
|
||||||
waitForAssignment(consumerContainer)
|
|
||||||
|
|
||||||
// CONFIGURE PROCESSOR
|
// CONFIGURE PROCESSOR
|
||||||
def builder
|
def builder
|
||||||
try {
|
try {
|
||||||
|
@ -128,19 +127,24 @@ class KafkaStreamsTest extends AgentInstrumentationSpecification {
|
||||||
}
|
}
|
||||||
streams.start()
|
streams.start()
|
||||||
|
|
||||||
// CONFIGURE PRODUCER
|
|
||||||
def producerFactory = new DefaultKafkaProducerFactory<String, String>(senderProps)
|
|
||||||
def kafkaTemplate = new KafkaTemplate<String, String>(producerFactory)
|
|
||||||
|
|
||||||
when:
|
when:
|
||||||
String greeting = "TESTING TESTING 123!"
|
String greeting = "TESTING TESTING 123!"
|
||||||
kafkaTemplate.send(STREAM_PENDING, greeting)
|
producer.send(new ProducerRecord<>(STREAM_PENDING, greeting))
|
||||||
|
|
||||||
then:
|
then:
|
||||||
// check that the message was received
|
// check that the message was received
|
||||||
def received = records.poll(10, TimeUnit.SECONDS)
|
def records = consumer.poll(Duration.ofSeconds(10).toMillis())
|
||||||
received.value() == greeting.toLowerCase()
|
Headers receivedHeaders = null
|
||||||
received.key() == null
|
for (record in records) {
|
||||||
|
Span.current().setAttribute("testing", 123)
|
||||||
|
|
||||||
|
assert record.value() == greeting.toLowerCase()
|
||||||
|
assert record.key() == null
|
||||||
|
|
||||||
|
if (receivedHeaders == null) {
|
||||||
|
receivedHeaders = record.headers()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
assertTraces(3) {
|
assertTraces(3) {
|
||||||
traces.sort(orderByRootSpanName(
|
traces.sort(orderByRootSpanName(
|
||||||
|
@ -244,9 +248,8 @@ class KafkaStreamsTest extends AgentInstrumentationSpecification {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
def headers = received.headers()
|
receivedHeaders.iterator().hasNext()
|
||||||
headers.iterator().hasNext()
|
def traceparent = new String(receivedHeaders.headers("traceparent").iterator().next().value())
|
||||||
def traceparent = new String(headers.headers("traceparent").iterator().next().value())
|
|
||||||
Context context = W3CTraceContextPropagator.instance.extract(Context.root(), "", new TextMapGetter<String>() {
|
Context context = W3CTraceContextPropagator.instance.extract(Context.root(), "", new TextMapGetter<String>() {
|
||||||
@Override
|
@Override
|
||||||
Iterable<String> keys(String carrier) {
|
Iterable<String> keys(String carrier) {
|
||||||
|
@ -266,11 +269,18 @@ class KafkaStreamsTest extends AgentInstrumentationSpecification {
|
||||||
def streamSendSpan = streamTrace[2]
|
def streamSendSpan = streamTrace[2]
|
||||||
spanContext.traceId == streamSendSpan.traceId
|
spanContext.traceId == streamSendSpan.traceId
|
||||||
spanContext.spanId == streamSendSpan.spanId
|
spanContext.spanId == streamSendSpan.spanId
|
||||||
|
}
|
||||||
|
|
||||||
|
private static Map<String, Object> producerProps(String servers) {
|
||||||
cleanup:
|
// values copied from spring's KafkaTestUtils
|
||||||
stopProducerFactory(producerFactory)
|
return [
|
||||||
streams?.close()
|
"bootstrap.servers": servers,
|
||||||
consumerContainer?.stop()
|
"retries" : 0,
|
||||||
|
"batch.size" : "16384",
|
||||||
|
"linger.ms" : 1,
|
||||||
|
"buffer.memory" : "33554432",
|
||||||
|
"key.serializer" : IntegerSerializer,
|
||||||
|
"value.serializer" : StringSerializer
|
||||||
|
]
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -211,7 +211,6 @@ include(":instrumentation:jsf:mojarra-1.2:javaagent")
|
||||||
include(":instrumentation:jsf:myfaces-1.2:javaagent")
|
include(":instrumentation:jsf:myfaces-1.2:javaagent")
|
||||||
include(":instrumentation:jsp-2.3:javaagent")
|
include(":instrumentation:jsp-2.3:javaagent")
|
||||||
include(":instrumentation:kafka-clients:kafka-clients-0.11:javaagent")
|
include(":instrumentation:kafka-clients:kafka-clients-0.11:javaagent")
|
||||||
include(":instrumentation:kafka-clients:kafka-clients-2.4.0-testing")
|
|
||||||
include(":instrumentation:kafka-clients:kafka-clients-common:javaagent")
|
include(":instrumentation:kafka-clients:kafka-clients-common:javaagent")
|
||||||
include(":instrumentation:kafka-streams-0.11:javaagent")
|
include(":instrumentation:kafka-streams-0.11:javaagent")
|
||||||
include(":instrumentation:kotlinx-coroutines:javaagent")
|
include(":instrumentation:kotlinx-coroutines:javaagent")
|
||||||
|
|
Loading…
Reference in New Issue