Kafka context propagation is configurable for the consumer as well. (#656)

* Kafka context propagation is configurable for the consumer as well.

The `ota.kafka.client.propagation.enabled` configuration property now governs both injecting context on the producer side and reading it on the consumer side (only the consumer side appears in the diff below; a producer-side sketch follows the commit metadata).

* Rename config property

* Revert "Rename config property"

This reverts commit 450ea8ffc7.
Author: Nikita Salnikov-Tarnovski, 2020-07-10 21:46:55 +03:00 (committed by GitHub)
Parent: ce14b394f7
Commit: 8fa37618a0
2 changed files with 158 additions and 6 deletions
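Only the consumer-side read path is changed in the diff below; per the commit message, the same flag now also governs header injection on the producer side. The snippet that follows is a minimal, illustrative sketch of that producer-side gate, not code from this commit: the class, the maybeInject helper, and the "traceparent" header name are placeholders, and only Config.get().isKafkaClientPropagationEnabled() is taken from the actual change.

// Illustrative sketch of gating producer-side header injection on the same flag.
// Not the project's producer instrumentation; names below are placeholders.
import io.opentelemetry.auto.config.Config;
import java.nio.charset.StandardCharsets;
import org.apache.kafka.clients.producer.ProducerRecord;

final class ProducerPropagationSketch {
  private ProducerPropagationSketch() {}

  static void maybeInject(ProducerRecord<?, ?> record, String traceParent) {
    // When the flag is off, the record headers are left untouched,
    // mirroring the consumer-side guard added in this commit.
    if (!Config.get().isKafkaClientPropagationEnabled()) {
      return;
    }
    // "traceparent" is only an example header name for this sketch.
    record.headers().add("traceparent", traceParent.getBytes(StandardCharsets.UTF_8));
  }
}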


@@ -22,6 +22,7 @@ import static io.opentelemetry.auto.instrumentation.kafkaclients.TextMapExtractA
import static io.opentelemetry.trace.Span.Kind.CONSUMER;
import static io.opentelemetry.trace.TracingContextUtils.currentContextWith;
import io.opentelemetry.auto.config.Config;
import io.opentelemetry.auto.instrumentation.api.SpanWithScope;
import io.opentelemetry.trace.Span;
import io.opentelemetry.trace.SpanContext;
@@ -75,6 +76,7 @@ public class TracingIterator implements Iterator<ConsumerRecord> {
    if (consumer) {
      spanBuilder.setSpanKind(CONSUMER);
    }
    if (Config.get().isKafkaClientPropagationEnabled()) {
      final SpanContext spanContext = extract(next.headers(), GETTER);
      if (spanContext.isValid()) {
        if (consumer) {
@@ -83,6 +85,7 @@ public class TracingIterator implements Iterator<ConsumerRecord> {
          spanBuilder.addLink(spanContext);
        }
      }
    }
    final long startTimeMillis = System.currentTimeMillis();
    spanBuilder.setStartTimestamp(TimeUnit.MILLISECONDS.toNanos(startTimeMillis));
    final Span span = spanBuilder.startSpan();
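The new guard delegates to Config, whose implementation is not part of this diff. As a rough sketch of how such a flag lookup resolves, the snippet below mirrors the "ota."-prefixed system property that the test overrides later in this commit; the constant value and the enabled-by-default behavior are assumptions, not code taken from the project.

// Sketch of the flag lookup; not the project's actual Config implementation.
final class PropagationFlagSketch {
  // Assumed to match Config.KAFKA_CLIENT_PROPAGATION_ENABLED, which the test
  // combines with the "ota." prefix when overriding the property.
  static final String KAFKA_CLIENT_PROPAGATION_ENABLED = "kafka.client.propagation.enabled";

  static boolean isKafkaClientPropagationEnabled() {
    String value = System.getProperty("ota." + KAFKA_CLIENT_PROPAGATION_ENABLED);
    return value == null || Boolean.parseBoolean(value); // assumed default: enabled
  }
}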


@@ -16,6 +16,7 @@
import io.opentelemetry.auto.config.Config
import io.opentelemetry.auto.test.AgentTestRunner
import io.opentelemetry.auto.test.utils.ConfigUtils
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.KafkaConsumer
@@ -333,6 +334,154 @@ class KafkaClientTest extends AgentTestRunner {
  }

  def "should not read remote context when consuming messages if propagation is disabled"() {
    setup:
    def senderProps = KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString())
    def producerFactory = new DefaultKafkaProducerFactory<String, String>(senderProps)
    def kafkaTemplate = new KafkaTemplate<String, String>(producerFactory)

    when: "send message"
    String message = "Testing without headers"
    withConfigOverride(Config.KAFKA_CLIENT_PROPAGATION_ENABLED, "true") {
      kafkaTemplate.send(SHARED_TOPIC, message)
    }

    then: "producer span is created"
    assertTraces(1) {
      trace(0, 1) {
        span(0) {
          operationName SHARED_TOPIC
          spanKind PRODUCER
          errored false
          parent()
          attributes {
          }
        }
      }
    }

    when: "read message with context propagation"
    // create a thread safe queue to store the received message
    def records = new LinkedBlockingQueue<ConsumerRecord<String, String>>()
    KafkaMessageListenerContainer<Object, Object> container = startConsumer("consumer-with-propagation", records)

    then: "consumer span is created and is a child of the producer"
    // check that the message was received
    records.poll(5, TimeUnit.SECONDS) != null
    assertTraces(1) {
      trace(0, 2) {
        span(0) {
          operationName SHARED_TOPIC
          spanKind PRODUCER
          errored false
          parent()
          attributes {
          }
        }
        span(1) {
          operationName SHARED_TOPIC
          spanKind CONSUMER
          errored false
          childOf span(0)
          attributes {
            "partition" { it >= 0 }
            "offset" 0
            "record.queue_time_ms" { it >= 0 }
          }
        }
      }
    }
    container.stop()

    when: "read message without context propagation"
    ConfigUtils.updateConfig {
      System.setProperty("ota." + Config.KAFKA_CLIENT_PROPAGATION_ENABLED, "false")
    }
    records.clear()
    container = startConsumer("consumer-without-propagation", records)

    then: "independent consumer span is created"
    // check that the message was received
    records.poll(5, TimeUnit.SECONDS) != null
    assertTraces(2) {
      trace(0, 2) {
        span(0) {
          operationName SHARED_TOPIC
          spanKind PRODUCER
          errored false
          parent()
          attributes {
          }
        }
        span(1) {
          operationName SHARED_TOPIC
          spanKind CONSUMER
          errored false
          childOf span(0)
          attributes {
            "partition" { it >= 0 }
            "offset" 0
            "record.queue_time_ms" { it >= 0 }
          }
        }
      }
      trace(1, 1) {
        span(0) {
          operationName SHARED_TOPIC
          spanKind CONSUMER
          errored false
          parent()
          attributes {
            "partition" { it >= 0 }
            "offset" 0
            "record.queue_time_ms" { it >= 0 }
          }
        }
      }
    }

    cleanup:
    producerFactory.stop()
    container?.stop()
    ConfigUtils.updateConfig {
      System.clearProperty("ota." + Config.KAFKA_CLIENT_PROPAGATION_ENABLED)
    }
  }

  protected KafkaMessageListenerContainer<Object, Object> startConsumer(String groupId, records) {
    // set up the Kafka consumer properties
    Map<String, Object> consumerProperties = KafkaTestUtils.consumerProps(groupId, "false", embeddedKafka)
    consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

    // create a Kafka consumer factory
    def consumerFactory = new DefaultKafkaConsumerFactory<String, String>(consumerProperties)

    // set the topic that needs to be consumed
    def containerProperties = containerProperties()

    // create a Kafka MessageListenerContainer
    def container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties)

    // setup a Kafka message listener
    container.setupMessageListener(new MessageListener<String, String>() {
      @Override
      void onMessage(ConsumerRecord<String, String> record) {
        records.add(record)
      }
    })

    // start the container and underlying message listener
    container.start()

    // wait until the container has the required number of assigned partitions
    ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic())

    container
  }

  def containerProperties() {
    try {