Experimental option to suppress messaging receive spans (#4187)
* Experimental option to suppress messaging receive spans
* Kafka streams too
* Better conditionals
* Remove oops
* Extract base class for kafka streams tests
* Spotless
parent 0f3d0cb05b
commit 3ce940548c
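In short: this commit adds an off-by-default experimental flag, otel.instrumentation.common.experimental.suppress-messaging-receive-spans, which suppresses the standalone CONSUMER "receive" span so that each "process" span becomes a direct child of the producer's "send" span; the default behavior keeps the separate receive span and ties the consumer side to the producer via span links rather than parent/child. A minimal sketch of how agent-internal code consults the flag (the wrapper class below is illustrative, not part of the commit):

    import io.opentelemetry.instrumentation.api.config.ExperimentalConfig;

    final class ReceiveSpanSuppression {
      private ReceiveSpanSuppression() {}

      // false unless the agent's JVM was started with
      // -Dotel.instrumentation.common.experimental.suppress-messaging-receive-spans=true
      static boolean enabled() {
        return ExperimentalConfig.get().suppressMessagingReceiveSpans();
      }
    }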
@@ -28,4 +28,9 @@ public final class ExperimentalConfig {
   public boolean suppressViewSpans() {
     return config.getBoolean("otel.instrumentation.common.experimental.suppress-view-spans", false);
   }
+
+  public boolean suppressMessagingReceiveSpans() {
+    return config.getBoolean(
+        "otel.instrumentation.common.experimental.suppress-messaging-receive-spans", false);
+  }
 }
@@ -39,10 +39,21 @@ tasks {
     jvmArgs("-Dotel.instrumentation.kafka.client-propagation.enabled=false")
   }
 
-  named<Test>("test") {
+  val testReceiveSpansDisabled by registering(Test::class) {
+    filter {
+      includeTestsMatching("KafkaClientSuppressReceiveSpansTest")
+      isFailOnNoMatchingTests = false
+    }
+    include("**/KafkaClientSuppressReceiveSpansTest.*")
+    jvmArgs("-Dotel.instrumentation.common.experimental.suppress-messaging-receive-spans=true")
+  }
+
+  test {
     dependsOn(testPropagationDisabled)
+    dependsOn(testReceiveSpansDisabled)
     filter {
       excludeTestsMatching("KafkaClientPropagationDisabledTest")
+      excludeTestsMatching("KafkaClientSuppressReceiveSpansTest")
       isFailOnNoMatchingTests = false
     }
   }
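The dedicated testReceiveSpansDisabled task exists because the flag is passed via jvmArgs and read when the agent starts, so the suppression tests need their own JVM; they are included there and excluded from the default test task, mirroring the existing testPropagationDisabled split.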
@@ -6,6 +6,7 @@
 package io.opentelemetry.javaagent.instrumentation.kafkaclients;
 
 import io.opentelemetry.api.GlobalOpenTelemetry;
+import io.opentelemetry.instrumentation.api.config.ExperimentalConfig;
 import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
 import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder;
 import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor;
@@ -52,6 +53,7 @@ public final class KafkaSingletons {
             GlobalOpenTelemetry.get(), INSTRUMENTATION_NAME, spanNameExtractor)
         .addAttributesExtractor(attributesExtractor)
         .setTimeExtractors(ReceivedRecords::startTime, (request, response, error) -> request.now())
+        .setDisabled(ExperimentalConfig.get().suppressMessagingReceiveSpans())
         .newInstrumenter(SpanKindExtractor.alwaysConsumer());
   }
 
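Wiring the flag in via setDisabled(...) means the receive instrumenter is still constructed but never starts spans when suppression is on. A rough sketch of the observable contract (accessor and variable names are hypothetical; this assumes the Instrumenter convention that a disabled instrumenter answers false from shouldStart):

    // with -Dotel.instrumentation.common.experimental.suppress-messaging-receive-spans=true
    Instrumenter<ReceivedRecords, Void> receive = consumerReceiveInstrumenter(); // hypothetical accessor
    boolean starts = receive.shouldStart(Context.current(), receivedRecords);
    // expected: starts == false, so no "<topic> receive" span is created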
@@ -69,12 +71,17 @@ public final class KafkaSingletons {
     if (KafkaConsumerExperimentalAttributesExtractor.isEnabled()) {
       builder.addAttributesExtractor(new KafkaConsumerExperimentalAttributesExtractor());
     }
-    if (KafkaPropagation.isPropagationEnabled()) {
+
+    if (!KafkaPropagation.isPropagationEnabled()) {
+      return builder.newInstrumenter(SpanKindExtractor.alwaysConsumer());
+    } else if (ExperimentalConfig.get().suppressMessagingReceiveSpans()) {
+      return builder.newConsumerInstrumenter(new KafkaHeadersGetter());
+    } else {
       builder.addSpanLinksExtractor(
           SpanLinksExtractor.fromUpstreamRequest(
               GlobalOpenTelemetry.getPropagators(), new KafkaHeadersGetter()));
+      return builder.newInstrumenter(SpanKindExtractor.alwaysConsumer());
     }
-    return builder.newInstrumenter(SpanKindExtractor.alwaysConsumer());
   }
 
   public static Instrumenter<ProducerRecord<?, ?>, Void> producerInstrumenter() {
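The rewritten conditional yields three consumer-side behaviors: with propagation disabled, a plain CONSUMER span and no header access; with the new flag on, newConsumerInstrumenter(new KafkaHeadersGetter()) extracts the upstream context from the record headers so the process span continues the producer's trace directly; otherwise (the default) the consumer side is tied to the upstream send span via a span link. The new KafkaClientSuppressReceiveSpansTest below pins down the suppressed shape: send (PRODUCER) -> process (CONSUMER), with no receive span in between.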
@@ -15,7 +15,7 @@ import static io.opentelemetry.api.trace.SpanKind.CONSUMER
 import static io.opentelemetry.api.trace.SpanKind.INTERNAL
 import static io.opentelemetry.api.trace.SpanKind.PRODUCER
 
-class KafkaClientPropagationEnabledTest extends KafkaClientBaseTest {
+class KafkaClientDefaultTest extends KafkaClientBaseTest {
 
   def "test kafka produce and consume"() {
     when:
@@ -0,0 +1,182 @@
/*
 * Copyright The OpenTelemetry Authors
 * SPDX-License-Identifier: Apache-2.0
 */

import io.opentelemetry.semconv.trace.attributes.SemanticAttributes
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.TopicPartition

import java.time.Duration

import static io.opentelemetry.api.trace.SpanKind.CONSUMER
import static io.opentelemetry.api.trace.SpanKind.INTERNAL
import static io.opentelemetry.api.trace.SpanKind.PRODUCER

class KafkaClientSuppressReceiveSpansTest extends KafkaClientBaseTest {

  def "test kafka produce and consume"() {
    when:
    String greeting = "Hello Kafka!"
    runWithSpan("parent") {
      producer.send(new ProducerRecord(SHARED_TOPIC, greeting)) { meta, ex ->
        if (ex == null) {
          runWithSpan("producer callback") {}
        } else {
          runWithSpan("producer exception: " + ex) {}
        }
      }
    }

    then:
    // check that the message was received
    def records = consumer.poll(Duration.ofSeconds(5).toMillis())
    for (record in records) {
      runWithSpan("processing") {
        assert record.value() == greeting
        assert record.key() == null
      }
    }

    assertTraces(1) {
      trace(0, 5) {
        span(0) {
          name "parent"
          kind INTERNAL
          hasNoParent()
        }
        span(1) {
          name SHARED_TOPIC + " send"
          kind PRODUCER
          childOf span(0)
          attributes {
            "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka"
            "${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC
            "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic"
          }
        }
        span(2) {
          name SHARED_TOPIC + " process"
          kind CONSUMER
          childOf span(1)
          attributes {
            "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka"
            "${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC
            "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic"
            "${SemanticAttributes.MESSAGING_OPERATION.key}" "process"
            "${SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES.key}" Long
            "${SemanticAttributes.MESSAGING_KAFKA_PARTITION.key}" { it >= 0 }
            "kafka.offset" Long
            "kafka.record.queue_time_ms" { it >= 0 }
          }
        }
        span(3) {
          name "processing"
          childOf span(2)
        }
        span(4) {
          name "producer callback"
          kind INTERNAL
          childOf span(0)
        }
      }
    }
  }

  def "test pass through tombstone"() {
    when:
    producer.send(new ProducerRecord<>(SHARED_TOPIC, null))

    then:
    // check that the message was received
    def records = consumer.poll(Duration.ofSeconds(5).toMillis())
    for (record in records) {
      assert record.value() == null
      assert record.key() == null
    }

    assertTraces(1) {
      trace(0, 2) {
        span(0) {
          name SHARED_TOPIC + " send"
          kind PRODUCER
          hasNoParent()
          attributes {
            "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka"
            "${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC
            "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic"
            "${SemanticAttributes.MESSAGING_KAFKA_TOMBSTONE.key}" true
          }
        }
        span(1) {
          name SHARED_TOPIC + " process"
          kind CONSUMER
          childOf span(0)
          attributes {
            "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka"
            "${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC
            "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic"
            "${SemanticAttributes.MESSAGING_OPERATION.key}" "process"
            "${SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES.key}" Long
            "${SemanticAttributes.MESSAGING_KAFKA_PARTITION.key}" { it >= 0 }
            "${SemanticAttributes.MESSAGING_KAFKA_TOMBSTONE.key}" true
            "kafka.offset" Long
            "kafka.record.queue_time_ms" { it >= 0 }
          }
        }
      }
    }
  }

  def "test records(TopicPartition) kafka consume"() {
    setup:
    def partition = 0

    when: "send message"
    def greeting = "Hello from MockConsumer!"
    producer.send(new ProducerRecord<>(SHARED_TOPIC, partition, null, greeting))

    then: "wait for PRODUCER span"
    waitForTraces(1)

    when: "receive messages"
    def consumerRecords = consumer.poll(Duration.ofSeconds(5).toMillis())
    def recordsInPartition = consumerRecords.records(new TopicPartition(SHARED_TOPIC, partition))
    for (record in recordsInPartition) {
      assert record.value() == greeting
      assert record.key() == null
    }

    then:
    assertTraces(1) {
      trace(0, 2) {
        span(0) {
          name SHARED_TOPIC + " send"
          kind PRODUCER
          hasNoParent()
          attributes {
            "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka"
            "${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC
            "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic"
            "${SemanticAttributes.MESSAGING_KAFKA_PARTITION.key}" { it >= 0 }
          }
        }
        span(1) {
          name SHARED_TOPIC + " process"
          kind CONSUMER
          childOf span(0)
          attributes {
            "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka"
            "${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC
            "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic"
            "${SemanticAttributes.MESSAGING_OPERATION.key}" "process"
            "${SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES.key}" Long
            "${SemanticAttributes.MESSAGING_KAFKA_PARTITION.key}" { it >= 0 }
            "kafka.offset" Long
            "kafka.record.queue_time_ms" { it >= 0 }
          }
        }
      }
    }
  }
}
@@ -25,10 +25,27 @@ dependencies {
 }
 
 tasks {
-  test {
+  withType<Test>().configureEach {
     usesService(gradle.sharedServices.registrations["testcontainersBuildService"].service)
 
     // TODO run tests both with and without experimental span attributes
     jvmArgs("-Dotel.instrumentation.kafka.experimental-span-attributes=true")
   }
+
+  val testReceiveSpansDisabled by registering(Test::class) {
+    filter {
+      includeTestsMatching("KafkaStreamsSuppressReceiveSpansTest")
+      isFailOnNoMatchingTests = false
+    }
+    include("**/KafkaStreamsSuppressReceiveSpansTest.*")
+    jvmArgs("-Dotel.instrumentation.common.experimental.suppress-messaging-receive-spans=true")
+  }
+
+  test {
+    dependsOn(testReceiveSpansDisabled)
+    filter {
+      excludeTestsMatching("KafkaStreamsSuppressReceiveSpansTest")
+      isFailOnNoMatchingTests = false
+    }
+  }
 }
@@ -6,6 +6,7 @@
 package io.opentelemetry.javaagent.instrumentation.kafkastreams;
 
 import io.opentelemetry.api.GlobalOpenTelemetry;
+import io.opentelemetry.instrumentation.api.config.ExperimentalConfig;
 import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
 import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder;
 import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor;
@@ -40,12 +41,17 @@ public final class KafkaStreamsSingletons {
     if (KafkaConsumerExperimentalAttributesExtractor.isEnabled()) {
       builder.addAttributesExtractor(new KafkaConsumerExperimentalAttributesExtractor());
     }
-    if (KafkaPropagation.isPropagationEnabled()) {
+
+    if (!KafkaPropagation.isPropagationEnabled()) {
+      return builder.newInstrumenter(SpanKindExtractor.alwaysConsumer());
+    } else if (ExperimentalConfig.get().suppressMessagingReceiveSpans()) {
+      return builder.newConsumerInstrumenter(new KafkaHeadersGetter());
+    } else {
       builder.addSpanLinksExtractor(
           SpanLinksExtractor.fromUpstreamRequest(
               GlobalOpenTelemetry.getPropagators(), new KafkaHeadersGetter()));
+      return builder.newInstrumenter(SpanKindExtractor.alwaysConsumer());
     }
-    return builder.newInstrumenter(SpanKindExtractor.alwaysConsumer());
   }
 
   public static Instrumenter<ConsumerRecord<?, ?>, Void> instrumenter() {
@@ -0,0 +1,111 @@
/*
 * Copyright The OpenTelemetry Authors
 * SPDX-License-Identifier: Apache-2.0
 */

import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification
import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.clients.admin.NewTopic
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.Producer
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.IntegerDeserializer
import org.apache.kafka.common.serialization.IntegerSerializer
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.serialization.StringSerializer
import org.testcontainers.containers.KafkaContainer
import spock.lang.Shared

import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit

class KafkaStreamsBaseTest extends AgentInstrumentationSpecification {

  protected static final STREAM_PENDING = "test.pending"
  protected static final STREAM_PROCESSED = "test.processed"

  @Shared
  static KafkaContainer kafka
  @Shared
  static Producer<Integer, String> producer
  @Shared
  static Consumer<Integer, String> consumer
  @Shared
  static CountDownLatch consumerReady = new CountDownLatch(1)

  def setupSpec() {
    kafka = new KafkaContainer()
    kafka.start()

    // create test topic
    AdminClient.create(["bootstrap.servers": kafka.bootstrapServers]).withCloseable { admin ->
      admin.createTopics([
        new NewTopic(STREAM_PENDING, 1, (short) 1),
        new NewTopic(STREAM_PROCESSED, 1, (short) 1),
      ]).all().get(10, TimeUnit.SECONDS)
    }

    producer = new KafkaProducer<>(producerProps(kafka.bootstrapServers))

    def consumerProps = [
      "bootstrap.servers"      : kafka.bootstrapServers,
      "group.id"               : "test",
      "enable.auto.commit"     : "true",
      "auto.commit.interval.ms": "10",
      "session.timeout.ms"     : "30000",
      "key.deserializer"       : IntegerDeserializer,
      "value.deserializer"     : StringDeserializer
    ]
    consumer = new KafkaConsumer<>(consumerProps)

    consumer.subscribe([STREAM_PROCESSED], new ConsumerRebalanceListener() {
      @Override
      void onPartitionsRevoked(Collection<TopicPartition> collection) {
      }

      @Override
      void onPartitionsAssigned(Collection<TopicPartition> collection) {
        consumerReady.countDown()
      }
    })
  }

  def cleanupSpec() {
    consumer?.close()
    producer?.close()
    kafka.stop()
  }

  static Map<String, Object> producerProps(String servers) {
    // values copied from spring's KafkaTestUtils
    return [
      "bootstrap.servers": servers,
      "retries"          : 0,
      "batch.size"       : "16384",
      "linger.ms"        : 1,
      "buffer.memory"    : "33554432",
      "key.serializer"   : IntegerSerializer,
      "value.serializer" : StringSerializer
    ]
  }

  // Kafka's eventual consistency behavior forces us to do a couple of empty poll() calls until it gets properly assigned a topic partition
  static void awaitUntilConsumerIsReady() {
    if (consumerReady.await(0, TimeUnit.SECONDS)) {
      return
    }
    for (i in 0..<10) {
      consumer.poll(0)
      if (consumerReady.await(1, TimeUnit.SECONDS)) {
        break
      }
    }
    if (consumerReady.getCount() != 0) {
      throw new AssertionError("Consumer wasn't assigned any partitions!")
    }
    consumer.seekToBeginning([])
  }
}
@@ -7,94 +7,22 @@ import io.opentelemetry.api.trace.Span
 import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator
 import io.opentelemetry.context.Context
 import io.opentelemetry.context.propagation.TextMapGetter
-import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification
 import io.opentelemetry.sdk.trace.data.SpanData
 import io.opentelemetry.semconv.trace.attributes.SemanticAttributes
-import org.apache.kafka.clients.admin.AdminClient
-import org.apache.kafka.clients.admin.NewTopic
-import org.apache.kafka.clients.consumer.Consumer
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener
-import org.apache.kafka.clients.consumer.KafkaConsumer
-import org.apache.kafka.clients.producer.KafkaProducer
-import org.apache.kafka.clients.producer.Producer
 import org.apache.kafka.clients.producer.ProducerRecord
-import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.header.Headers
-import org.apache.kafka.common.serialization.IntegerDeserializer
-import org.apache.kafka.common.serialization.IntegerSerializer
 import org.apache.kafka.common.serialization.Serdes
-import org.apache.kafka.common.serialization.StringDeserializer
-import org.apache.kafka.common.serialization.StringSerializer
 import org.apache.kafka.streams.KafkaStreams
 import org.apache.kafka.streams.StreamsConfig
 import org.apache.kafka.streams.kstream.KStream
 import org.apache.kafka.streams.kstream.ValueMapper
-import org.testcontainers.containers.KafkaContainer
-import spock.lang.Shared
 
 import java.time.Duration
-import java.util.concurrent.CountDownLatch
-import java.util.concurrent.TimeUnit
 
 import static io.opentelemetry.api.trace.SpanKind.CONSUMER
 import static io.opentelemetry.api.trace.SpanKind.PRODUCER
 
-class KafkaStreamsTest extends AgentInstrumentationSpecification {
-
-  static final STREAM_PENDING = "test.pending"
-  static final STREAM_PROCESSED = "test.processed"
-
-  @Shared
-  static KafkaContainer kafka
-  @Shared
-  static Producer<Integer, String> producer
-  @Shared
-  static Consumer<Integer, String> consumer
-  @Shared
-  static CountDownLatch consumerReady = new CountDownLatch(1)
-
-  def setupSpec() {
-    kafka = new KafkaContainer()
-    kafka.start()
-
-    // create test topic
-    AdminClient.create(["bootstrap.servers": kafka.bootstrapServers]).withCloseable { admin ->
-      admin.createTopics([
-        new NewTopic(STREAM_PENDING, 1, (short) 1),
-        new NewTopic(STREAM_PROCESSED, 1, (short) 1),
-      ]).all().get(10, TimeUnit.SECONDS)
-    }
-
-    producer = new KafkaProducer<>(producerProps(kafka.bootstrapServers))
-
-    def consumerProps = [
-      "bootstrap.servers"      : kafka.bootstrapServers,
-      "group.id"               : "test",
-      "enable.auto.commit"     : "true",
-      "auto.commit.interval.ms": "10",
-      "session.timeout.ms"     : "30000",
-      "key.deserializer"       : IntegerDeserializer,
-      "value.deserializer"     : StringDeserializer
-    ]
-    consumer = new KafkaConsumer<>(consumerProps)
-
-    consumer.subscribe([STREAM_PROCESSED], new ConsumerRebalanceListener() {
-      @Override
-      void onPartitionsRevoked(Collection<TopicPartition> collection) {
-      }
-
-      @Override
-      void onPartitionsAssigned(Collection<TopicPartition> collection) {
-        consumerReady.countDown()
-      }
-    })
-  }
-
-  def cleanupSpec() {
-    consumer?.close()
-    producer?.close()
-    kafka.stop()
-  }
+class KafkaStreamsDefaultTest extends KafkaStreamsBaseTest {
 
   def "test kafka produce and consume with streams in-between"() {
     setup:
@@ -278,34 +206,4 @@ class KafkaStreamsTest extends AgentInstrumentationSpecification {
     spanContext.traceId == streamSendSpan.traceId
     spanContext.spanId == streamSendSpan.spanId
   }
-
-  private static Map<String, Object> producerProps(String servers) {
-    // values copied from spring's KafkaTestUtils
-    return [
-      "bootstrap.servers": servers,
-      "retries"          : 0,
-      "batch.size"       : "16384",
-      "linger.ms"        : 1,
-      "buffer.memory"    : "33554432",
-      "key.serializer"   : IntegerSerializer,
-      "value.serializer" : StringSerializer
-    ]
-  }
-
-  // Kafka's eventual consistency behavior forces us to do a couple of empty poll() calls until it gets properly assigned a topic partition
-  static void awaitUntilConsumerIsReady() {
-    if (consumerReady.await(0, TimeUnit.SECONDS)) {
-      return
-    }
-    for (i in 0..<10) {
-      consumer.poll(0)
-      if (consumerReady.await(1, TimeUnit.SECONDS)) {
-        break
-      }
-    }
-    if (consumerReady.getCount() != 0) {
-      throw new AssertionError("Consumer wasn't assigned any partitions!")
-    }
-    consumer.seekToBeginning([])
-  }
 }
@@ -0,0 +1,171 @@
/*
 * Copyright The OpenTelemetry Authors
 * SPDX-License-Identifier: Apache-2.0
 */

import io.opentelemetry.api.trace.Span
import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator
import io.opentelemetry.context.Context
import io.opentelemetry.context.propagation.TextMapGetter
import io.opentelemetry.sdk.trace.data.SpanData
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.header.Headers
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.StreamsConfig
import org.apache.kafka.streams.kstream.KStream
import org.apache.kafka.streams.kstream.ValueMapper

import java.time.Duration

import static io.opentelemetry.api.trace.SpanKind.CONSUMER
import static io.opentelemetry.api.trace.SpanKind.PRODUCER

class KafkaStreamsSuppressReceiveSpansTest extends KafkaStreamsBaseTest {

  def "test kafka produce and consume with streams in-between"() {
    setup:
    def config = new Properties()
    config.putAll(producerProps(kafka.bootstrapServers))
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-application")
    config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName())
    config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName())

    // CONFIGURE PROCESSOR
    def builder
    try {
      // Different class names for test and latestDepTest.
      builder = Class.forName("org.apache.kafka.streams.kstream.KStreamBuilder").newInstance()
    } catch (ClassNotFoundException | NoClassDefFoundError e) {
      builder = Class.forName("org.apache.kafka.streams.StreamsBuilder").newInstance()
    }
    KStream<String, String> textLines = builder.stream(STREAM_PENDING)
    def values = textLines
      .mapValues(new ValueMapper<String, String>() {
        @Override
        String apply(String textLine) {
          Span.current().setAttribute("asdf", "testing")
          return textLine.toLowerCase()
        }
      })

    KafkaStreams streams
    try {
      // Different api for test and latestDepTest.
      values.to(Serdes.String(), Serdes.String(), STREAM_PROCESSED)
      streams = new KafkaStreams(builder, config)
    } catch (MissingMethodException e) {
      def producer = Class.forName("org.apache.kafka.streams.kstream.Produced")
        .with(Serdes.String(), Serdes.String())
      values.to(STREAM_PROCESSED, producer)
      streams = new KafkaStreams(builder.build(), config)
    }
    streams.start()

    when:
    String greeting = "TESTING TESTING 123!"
    producer.send(new ProducerRecord<>(STREAM_PENDING, greeting))

    then:
    // check that the message was received
    def records = consumer.poll(Duration.ofSeconds(10).toMillis())
    Headers receivedHeaders = null
    for (record in records) {
      Span.current().setAttribute("testing", 123)

      assert record.value() == greeting.toLowerCase()
      assert record.key() == null

      if (receivedHeaders == null) {
        receivedHeaders = record.headers()
      }
    }

    SpanData streamSendSpan

    assertTraces(1) {
      trace(0, 4) {
        // kafka-clients PRODUCER
        span(0) {
          name STREAM_PENDING + " send"
          kind PRODUCER
          hasNoParent()
          attributes {
            "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka"
            "${SemanticAttributes.MESSAGING_DESTINATION.key}" STREAM_PENDING
            "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic"
          }
        }
        // kafka-stream CONSUMER
        span(1) {
          name STREAM_PENDING + " process"
          kind CONSUMER
          childOf span(0)
          attributes {
            "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka"
            "${SemanticAttributes.MESSAGING_DESTINATION.key}" STREAM_PENDING
            "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic"
            "${SemanticAttributes.MESSAGING_OPERATION.key}" "process"
            "${SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES.key}" Long
            "${SemanticAttributes.MESSAGING_KAFKA_PARTITION.key}" { it >= 0 }
            "kafka.offset" 0
            "kafka.record.queue_time_ms" { it >= 0 }
            "asdf" "testing"
          }
        }

        streamSendSpan = span(2)

        // kafka-clients PRODUCER
        span(2) {
          name STREAM_PROCESSED + " send"
          kind PRODUCER
          childOf span(1)
          attributes {
            "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka"
            "${SemanticAttributes.MESSAGING_DESTINATION.key}" STREAM_PROCESSED
            "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic"
          }
        }
        // kafka-clients CONSUMER process
        span(3) {
          name STREAM_PROCESSED + " process"
          kind CONSUMER
          childOf span(2)
          attributes {
            "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka"
            "${SemanticAttributes.MESSAGING_DESTINATION.key}" STREAM_PROCESSED
            "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic"
            "${SemanticAttributes.MESSAGING_OPERATION.key}" "process"
            "${SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES.key}" Long
            "${SemanticAttributes.MESSAGING_KAFKA_PARTITION.key}" { it >= 0 }
            "kafka.offset" 0
            "kafka.record.queue_time_ms" { it >= 0 }
            "testing" 123
          }
        }
      }
    }

    receivedHeaders.iterator().hasNext()
    def traceparent = new String(receivedHeaders.headers("traceparent").iterator().next().value())
    Context context = W3CTraceContextPropagator.instance.extract(Context.root(), "", new TextMapGetter<String>() {
      @Override
      Iterable<String> keys(String carrier) {
        return Collections.singleton("traceparent")
      }

      @Override
      String get(String carrier, String key) {
        if (key == "traceparent") {
          return traceparent
        }
        return null
      }
    })
    def spanContext = Span.fromContext(context).getSpanContext()
    spanContext.traceId == streamSendSpan.traceId
    spanContext.spanId == streamSendSpan.spanId
  }
}