Add support for Kafka consumer and producer interceptors. (#4065)

* Add support for Kafka consumer and producer interceptors, move common Kafka code to library module.

* Apply feedback.

* Apply feedback, #3.

* Apply feedback, #4.

* Add producer / consumer wrappers.

* Move to kafka-clients-2.6.

* Apply feedback, #5.

Co-authored-by: Trask Stalnaker <trask.stalnaker@gmail.com>
Aleš Justin 2021-10-02 20:27:57 +02:00 committed by GitHub
parent 033c20a3d9
commit ff0bf0a8f3
46 changed files with 1204 additions and 211 deletions

@@ -15,11 +15,12 @@ dependencies {
compileOnly("com.google.auto.value:auto-value-annotations")
annotationProcessor("com.google.auto.value:auto-value")
implementation(project(":instrumentation:kafka-clients:kafka-clients-common:javaagent"))
implementation(project(":instrumentation:kafka-clients:kafka-clients-common:library"))
library("org.apache.kafka:kafka-clients:0.11.0.0")
testImplementation("org.testcontainers:kafka")
testImplementation(project(":instrumentation:kafka-clients:kafka-clients-0.11:testing"))
}
tasks {

@@ -17,6 +17,8 @@ import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.field.VirtualField;
import io.opentelemetry.instrumentation.kafka.ReceivedRecords;
import io.opentelemetry.instrumentation.kafka.Timer;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import java.time.Duration;

@@ -13,10 +13,10 @@ import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.instrumentation.kafka.KafkaPropagation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import io.opentelemetry.javaagent.instrumentation.api.Java8BytecodeBridge;
import io.opentelemetry.javaagent.instrumentation.kafka.KafkaPropagation;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;

@@ -5,20 +5,9 @@
package io.opentelemetry.javaagent.instrumentation.kafkaclients;
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.instrumentation.api.config.ExperimentalConfig;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder;
import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.SpanLinksExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.SpanNameExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingSpanNameExtractor;
import io.opentelemetry.javaagent.instrumentation.kafka.KafkaConsumerAdditionalAttributesExtractor;
import io.opentelemetry.javaagent.instrumentation.kafka.KafkaConsumerAttributesExtractor;
import io.opentelemetry.javaagent.instrumentation.kafka.KafkaConsumerExperimentalAttributesExtractor;
import io.opentelemetry.javaagent.instrumentation.kafka.KafkaHeadersGetter;
import io.opentelemetry.javaagent.instrumentation.kafka.KafkaPropagation;
import io.opentelemetry.instrumentation.kafka.KafkaInstrumenterBuilder;
import io.opentelemetry.instrumentation.kafka.ReceivedRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
@@ -26,63 +15,11 @@ public final class KafkaSingletons {
private static final String INSTRUMENTATION_NAME = "io.opentelemetry.kafka-clients-0.11";
private static final Instrumenter<ProducerRecord<?, ?>, Void> PRODUCER_INSTRUMENTER =
buildProducerInstrumenter();
KafkaInstrumenterBuilder.buildProducerInstrumenter(INSTRUMENTATION_NAME);
private static final Instrumenter<ReceivedRecords, Void> CONSUMER_RECEIVE_INSTRUMENTER =
buildConsumerReceiveInstrumenter();
KafkaInstrumenterBuilder.buildConsumerReceiveInstrumenter(INSTRUMENTATION_NAME);
private static final Instrumenter<ConsumerRecord<?, ?>, Void> CONSUMER_PROCESS_INSTRUMENTER =
buildConsumerProcessInstrumenter();
private static Instrumenter<ProducerRecord<?, ?>, Void> buildProducerInstrumenter() {
KafkaProducerAttributesExtractor attributesExtractor = new KafkaProducerAttributesExtractor();
SpanNameExtractor<ProducerRecord<?, ?>> spanNameExtractor =
MessagingSpanNameExtractor.create(attributesExtractor);
return Instrumenter.<ProducerRecord<?, ?>, Void>newBuilder(
GlobalOpenTelemetry.get(), INSTRUMENTATION_NAME, spanNameExtractor)
.addAttributesExtractor(attributesExtractor)
.addAttributesExtractor(new KafkaProducerAdditionalAttributesExtractor())
.newInstrumenter(SpanKindExtractor.alwaysProducer());
}
private static Instrumenter<ReceivedRecords, Void> buildConsumerReceiveInstrumenter() {
KafkaReceiveAttributesExtractor attributesExtractor = new KafkaReceiveAttributesExtractor();
SpanNameExtractor<ReceivedRecords> spanNameExtractor =
MessagingSpanNameExtractor.create(attributesExtractor);
return Instrumenter.<ReceivedRecords, Void>newBuilder(
GlobalOpenTelemetry.get(), INSTRUMENTATION_NAME, spanNameExtractor)
.addAttributesExtractor(attributesExtractor)
.setTimeExtractors(ReceivedRecords::startTime, (request, response, error) -> request.now())
.setDisabled(ExperimentalConfig.get().suppressMessagingReceiveSpans())
.newInstrumenter(SpanKindExtractor.alwaysConsumer());
}
private static Instrumenter<ConsumerRecord<?, ?>, Void> buildConsumerProcessInstrumenter() {
KafkaConsumerAttributesExtractor attributesExtractor =
new KafkaConsumerAttributesExtractor(MessageOperation.PROCESS);
SpanNameExtractor<ConsumerRecord<?, ?>> spanNameExtractor =
MessagingSpanNameExtractor.create(attributesExtractor);
InstrumenterBuilder<ConsumerRecord<?, ?>, Void> builder =
Instrumenter.<ConsumerRecord<?, ?>, Void>newBuilder(
GlobalOpenTelemetry.get(), INSTRUMENTATION_NAME, spanNameExtractor)
.addAttributesExtractor(attributesExtractor)
.addAttributesExtractor(new KafkaConsumerAdditionalAttributesExtractor());
if (KafkaConsumerExperimentalAttributesExtractor.isEnabled()) {
builder.addAttributesExtractor(new KafkaConsumerExperimentalAttributesExtractor());
}
if (!KafkaPropagation.isPropagationEnabled()) {
return builder.newInstrumenter(SpanKindExtractor.alwaysConsumer());
} else if (ExperimentalConfig.get().suppressMessagingReceiveSpans()) {
return builder.newConsumerInstrumenter(new KafkaHeadersGetter());
} else {
builder.addSpanLinksExtractor(
SpanLinksExtractor.fromUpstreamRequest(
GlobalOpenTelemetry.getPropagators(), new KafkaHeadersGetter()));
return builder.newInstrumenter(SpanKindExtractor.alwaysConsumer());
}
}
KafkaInstrumenterBuilder.buildConsumerProcessInstrumenter(INSTRUMENTATION_NAME);
public static Instrumenter<ProducerRecord<?, ?>, Void> producerInstrumenter() {
return PRODUCER_INSTRUMENTER;

@@ -6,7 +6,7 @@
package io.opentelemetry.javaagent.instrumentation.kafkaclients;
import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.javaagent.instrumentation.kafka.KafkaConsumerIterableWrapper;
import io.opentelemetry.instrumentation.kafka.KafkaConsumerIterableWrapper;
import java.util.Iterator;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.checkerframework.checker.nullness.qual.Nullable;

@@ -11,7 +11,7 @@ import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.javaagent.instrumentation.kafka.KafkaConsumerIteratorWrapper;
import io.opentelemetry.instrumentation.kafka.KafkaConsumerIteratorWrapper;
import java.util.Iterator;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.checkerframework.checker.nullness.qual.Nullable;

@@ -3,10 +3,11 @@
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.kafkaclients
import io.opentelemetry.sdk.trace.data.SpanData
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.TopicPartition
import java.time.Duration
import java.util.concurrent.TimeUnit
@@ -15,7 +16,7 @@ import static io.opentelemetry.api.trace.SpanKind.CONSUMER
import static io.opentelemetry.api.trace.SpanKind.INTERNAL
import static io.opentelemetry.api.trace.SpanKind.PRODUCER
class KafkaClientDefaultTest extends KafkaClientBaseTest {
class KafkaClientDefaultTest extends KafkaClientPropagationBaseTest {
def "test kafka produce and consume"() {
when:
@@ -190,7 +191,7 @@ class KafkaClientDefaultTest extends KafkaClientBaseTest {
when: "receive messages"
awaitUntilConsumerIsReady()
def consumerRecords = consumer.poll(Duration.ofSeconds(5).toMillis())
def recordsInPartition = consumerRecords.records(new TopicPartition(SHARED_TOPIC, partition))
def recordsInPartition = consumerRecords.records(topicPartition)
recordsInPartition.size() == 1
// iterate over records to generate spans

@@ -3,6 +3,8 @@
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.kafkaclients
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes
import org.apache.kafka.clients.producer.ProducerRecord
@@ -11,7 +13,7 @@ import java.time.Duration
import static io.opentelemetry.api.trace.SpanKind.CONSUMER
import static io.opentelemetry.api.trace.SpanKind.PRODUCER
class KafkaClientPropagationDisabledTest extends KafkaClientBaseTest {
class KafkaClientPropagationDisabledTest extends KafkaClientPropagationBaseTest {
def "should not read remote context when consuming messages if propagation is disabled"() {
when: "send message"

@@ -3,6 +3,8 @@
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.kafkaclients
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.TopicPartition
@@ -13,7 +15,7 @@ import static io.opentelemetry.api.trace.SpanKind.CONSUMER
import static io.opentelemetry.api.trace.SpanKind.INTERNAL
import static io.opentelemetry.api.trace.SpanKind.PRODUCER
class KafkaClientSuppressReceiveSpansTest extends KafkaClientBaseTest {
class KafkaClientSuppressReceiveSpansTest extends KafkaClientPropagationBaseTest {
def "test kafka produce and consume"() {
when:
@@ -29,6 +31,7 @@ class KafkaClientSuppressReceiveSpansTest extends KafkaClientBaseTest {
}
then:
awaitUntilConsumerIsReady()
// check that the message was received
def records = consumer.poll(Duration.ofSeconds(5).toMillis())
for (record in records) {
@@ -88,6 +91,7 @@ class KafkaClientSuppressReceiveSpansTest extends KafkaClientBaseTest {
producer.send(new ProducerRecord<>(SHARED_TOPIC, null))
then:
awaitUntilConsumerIsReady()
// check that the message was received
def records = consumer.poll(Duration.ofSeconds(5).toMillis())
for (record in records) {
@@ -138,6 +142,7 @@ class KafkaClientSuppressReceiveSpansTest extends KafkaClientBaseTest {
then: "wait for PRODUCER span"
waitForTraces(1)
awaitUntilConsumerIsReady()
when: "receive messages"
def consumerRecords = consumer.poll(Duration.ofSeconds(5).toMillis())

@@ -0,0 +1,13 @@
plugins {
id("otel.java-conventions")
}
dependencies {
api(project(":testing-common"))
implementation("org.apache.kafka:kafka-clients:0.11.0.0")
implementation(project(":instrumentation:kafka-clients:kafka-clients-common:library"))
implementation("org.testcontainers:kafka")
}

@@ -3,7 +3,9 @@
* SPDX-License-Identifier: Apache-2.0
*/
import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification
package io.opentelemetry.instrumentation.kafkaclients
import io.opentelemetry.instrumentation.test.InstrumentationSpecification
import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.clients.admin.NewTopic
import org.apache.kafka.clients.consumer.Consumer
@@ -11,7 +13,6 @@ import org.apache.kafka.clients.consumer.ConsumerRebalanceListener
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.Producer
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.IntegerDeserializer
import org.apache.kafka.common.serialization.IntegerSerializer
@@ -19,19 +20,14 @@ import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.serialization.StringSerializer
import org.testcontainers.containers.KafkaContainer
import spock.lang.Shared
import spock.lang.Unroll
import java.time.Duration
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
abstract class KafkaClientBaseTest extends AgentInstrumentationSpecification {
abstract class KafkaClientBaseTest extends InstrumentationSpecification {
protected static final SHARED_TOPIC = "shared.topic"
private static final boolean propagationEnabled = Boolean.parseBoolean(
System.getProperty("otel.instrumentation.kafka.client-propagation.enabled", "true"))
@Shared
static KafkaContainer kafka
@Shared
@@ -41,6 +37,8 @@ abstract class KafkaClientBaseTest extends AgentInstrumentationSpecification {
@Shared
static CountDownLatch consumerReady = new CountDownLatch(1)
static TopicPartition topicPartition = new TopicPartition(SHARED_TOPIC, 0)
def setupSpec() {
kafka = new KafkaContainer()
kafka.start()
@@ -50,28 +48,9 @@ abstract class KafkaClientBaseTest extends AgentInstrumentationSpecification {
admin.createTopics([new NewTopic(SHARED_TOPIC, 1, (short) 1)]).all().get(10, TimeUnit.SECONDS)
}
// values copied from spring's KafkaTestUtils
def producerProps = [
"bootstrap.servers": kafka.bootstrapServers,
"retries" : 0,
"batch.size" : "16384",
"linger.ms" : 1,
"buffer.memory" : "33554432",
"key.serializer" : IntegerSerializer,
"value.serializer" : StringSerializer
]
producer = new KafkaProducer<>(producerProps)
producer = new KafkaProducer<>(producerProps())
def consumerProps = [
"bootstrap.servers" : kafka.bootstrapServers,
"group.id" : "test",
"enable.auto.commit" : "true",
"auto.commit.interval.ms": "10",
"session.timeout.ms" : "30000",
"key.deserializer" : IntegerDeserializer,
"value.deserializer" : StringDeserializer
]
consumer = new KafkaConsumer<>(consumerProps)
consumer = new KafkaConsumer<>(consumerProps())
consumer.subscribe([SHARED_TOPIC], new ConsumerRebalanceListener() {
@Override
@@ -85,29 +64,38 @@ abstract class KafkaClientBaseTest extends AgentInstrumentationSpecification {
})
}
Map<String, ?> producerProps() {
// values copied from spring's KafkaTestUtils
return [
"bootstrap.servers": kafka.bootstrapServers,
"retries" : 0,
"batch.size" : "16384",
"linger.ms" : 1,
"buffer.memory" : "33554432",
"key.serializer" : IntegerSerializer,
"value.serializer" : StringSerializer
]
}
Map<String, ?> consumerProps() {
// values copied from spring's KafkaTestUtils
return [
"bootstrap.servers" : kafka.bootstrapServers,
"group.id" : "test",
"enable.auto.commit" : "true",
"auto.commit.interval.ms": "10",
"session.timeout.ms" : "30000",
"key.deserializer" : IntegerDeserializer,
"value.deserializer" : StringDeserializer
]
}
def cleanupSpec() {
consumer?.close()
producer?.close()
kafka.stop()
}
@Unroll
def "test kafka client header propagation manual config"() {
when:
String message = "Testing without headers"
producer.send(new ProducerRecord<>(SHARED_TOPIC, message))
.get(5, TimeUnit.SECONDS)
then:
awaitUntilConsumerIsReady()
def records = consumer.poll(Duration.ofSeconds(1).toMillis())
records.count() == 1
for (record in records) {
assert record.headers().iterator().hasNext() == propagationEnabled
}
}
// Kafka's eventual consistency behavior forces us to do a couple of empty poll() calls until the consumer is properly assigned its topic partition
static void awaitUntilConsumerIsReady() {
if (consumerReady.await(0, TimeUnit.SECONDS)) {

@@ -0,0 +1,34 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.kafkaclients
import io.opentelemetry.instrumentation.test.AgentTestTrait
import org.apache.kafka.clients.producer.ProducerRecord
import spock.lang.Unroll
import java.time.Duration
abstract class KafkaClientPropagationBaseTest extends KafkaClientBaseTest implements AgentTestTrait {
private static final boolean propagationEnabled = Boolean.parseBoolean(
System.getProperty("otel.instrumentation.kafka.client-propagation.enabled", "true"))
@Unroll
def "test kafka client header propagation manual config"() {
when:
String message = "Testing without headers"
producer.send(new ProducerRecord<>(SHARED_TOPIC, message))
then:
awaitUntilConsumerIsReady()
// check that the message was received
def records = consumer.poll(Duration.ofSeconds(5).toMillis())
records.count() == 1
for (record in records) {
assert record.headers().iterator().hasNext() == propagationEnabled
}
}
}

@@ -0,0 +1,22 @@
plugins {
id("otel.library-instrumentation")
}
dependencies {
implementation(project(":instrumentation:kafka-clients:kafka-clients-common:library"))
library("org.apache.kafka:kafka-clients:2.6.0")
testImplementation(project(":instrumentation:kafka-clients:kafka-clients-0.11:testing"))
testImplementation("com.fasterxml.jackson.core:jackson-databind:2.10.2")
testImplementation("org.testcontainers:kafka")
}
tasks {
withType<Test>().configureEach {
usesService(gradle.sharedServices.registrations["testcontainersBuildService"].service)
// TODO run tests both with and without experimental span attributes
jvmArgs("-Dotel.instrumentation.kafka.experimental-span-attributes=true")
}
}

@@ -0,0 +1,164 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.kafkaclients;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.context.propagation.TextMapGetter;
import io.opentelemetry.context.propagation.TextMapPropagator;
import io.opentelemetry.context.propagation.TextMapSetter;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.kafka.KafkaConsumerRecordGetter;
import io.opentelemetry.instrumentation.kafka.KafkaHeadersSetter;
import java.util.concurrent.Future;
import java.util.function.BiFunction;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.header.Headers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public final class KafkaTracing {
private static final Logger logger = LoggerFactory.getLogger(KafkaTracing.class);
private static final TextMapGetter<ConsumerRecord<?, ?>> GETTER = new KafkaConsumerRecordGetter();
private static final TextMapSetter<Headers> SETTER = new KafkaHeadersSetter();
private final OpenTelemetry openTelemetry;
private final Instrumenter<ProducerRecord<?, ?>, Void> producerInstrumenter;
private final Instrumenter<ConsumerRecord<?, ?>, Void> consumerProcessInstrumenter;
KafkaTracing(
OpenTelemetry openTelemetry,
Instrumenter<ProducerRecord<?, ?>, Void> producerInstrumenter,
Instrumenter<ConsumerRecord<?, ?>, Void> consumerProcessInstrumenter) {
this.openTelemetry = openTelemetry;
this.producerInstrumenter = producerInstrumenter;
this.consumerProcessInstrumenter = consumerProcessInstrumenter;
}
/** Returns a new {@link KafkaTracing} configured with the given {@link OpenTelemetry}. */
public static KafkaTracing create(OpenTelemetry openTelemetry) {
return newBuilder(openTelemetry).build();
}
/** Returns a new {@link KafkaTracingBuilder} configured with the given {@link OpenTelemetry}. */
public static KafkaTracingBuilder newBuilder(OpenTelemetry openTelemetry) {
return new KafkaTracingBuilder(openTelemetry);
}
private TextMapPropagator propagator() {
return openTelemetry.getPropagators().getTextMapPropagator();
}
/** Returns a decorated {@link Producer} that emits spans for each sent message. */
public <K, V> Producer<K, V> wrap(Producer<K, V> producer) {
return new TracingProducer<>(producer, this);
}
/** Returns a decorated {@link Consumer} that emits spans for each received message. */
public <K, V> Consumer<K, V> wrap(Consumer<K, V> consumer) {
return new TracingConsumer<>(consumer, this);
}
/**
* Build and inject span into record.
*
* @param record the producer record to inject span info into
*/
<K, V> void buildAndInjectSpan(ProducerRecord<K, V> record) {
Context currentContext = Context.current();
if (!producerInstrumenter.shouldStart(currentContext, record)) {
return;
}
Context current = producerInstrumenter.start(currentContext, record);
try (Scope ignored = current.makeCurrent()) {
try {
propagator().inject(current, record.headers(), SETTER);
} catch (Throwable t) {
// this can happen if the headers are read-only (e.g. when the record is sent a second time)
logger.error("failed to inject span context. sending record second time?", t);
}
}
producerInstrumenter.end(current, record, null, null);
}
/**
* Build and inject span into record.
*
* @param record the producer record to inject span info into
* @param callback the producer send callback
* @param sendFn the producer send function that actually sends the record
* @return the send function's result
*/
<K, V> Future<RecordMetadata> buildAndInjectSpan(
ProducerRecord<K, V> record,
Callback callback,
BiFunction<ProducerRecord<K, V>, Callback, Future<RecordMetadata>> sendFn) {
Context parentContext = Context.current();
if (!producerInstrumenter.shouldStart(parentContext, record)) {
return sendFn.apply(record, callback);
}
Context context = producerInstrumenter.start(parentContext, record);
try (Scope ignored = context.makeCurrent()) {
propagator().inject(context, record.headers(), SETTER);
callback = new ProducerCallback(callback, parentContext, context, record);
return sendFn.apply(record, callback);
}
}
<K, V> void buildAndFinishSpan(ConsumerRecords<K, V> records) {
Context currentContext = Context.current();
for (ConsumerRecord<K, V> record : records) {
Context linkedContext = propagator().extract(currentContext, record, GETTER);
Context newContext = currentContext.with(Span.fromContext(linkedContext));
if (!consumerProcessInstrumenter.shouldStart(newContext, record)) {
continue;
}
Context current = consumerProcessInstrumenter.start(newContext, record);
consumerProcessInstrumenter.end(current, record, null, null);
}
}
private class ProducerCallback implements Callback {
private final Callback callback;
private final Context parentContext;
private final Context context;
private final ProducerRecord<?, ?> request;
public ProducerCallback(
Callback callback, Context parentContext, Context context, ProducerRecord<?, ?> request) {
this.callback = callback;
this.parentContext = parentContext;
this.context = context;
this.request = request;
}
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
producerInstrumenter.end(context, request, null, exception);
if (callback != null) {
try (Scope ignored = parentContext.makeCurrent()) {
callback.onCompletion(metadata, exception);
}
}
}
}
}
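A minimal usage sketch of the wrap() API introduced above (not part of this diff; rawProducer and rawConsumer stand in for ordinary Kafka clients created elsewhere):

// Decorate existing Kafka clients; each send() and poll() then produces spans.
KafkaTracing tracing = KafkaTracing.create(GlobalOpenTelemetry.get());
Producer<Integer, String> tracedProducer = tracing.wrap(rawProducer); // PRODUCER span per sent record
Consumer<Integer, String> tracedConsumer = tracing.wrap(rawConsumer); // CONSUMER span per polled record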

@@ -0,0 +1,54 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.kafkaclients;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation;
import io.opentelemetry.instrumentation.kafka.KafkaInstrumenterBuilder;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
public final class KafkaTracingBuilder {
private static final String INSTRUMENTATION_NAME = "io.opentelemetry.kafka-clients-0.11";
private final OpenTelemetry openTelemetry;
private final List<AttributesExtractor<ProducerRecord<?, ?>, Void>> producerAttributesExtractors =
new ArrayList<>();
private final List<AttributesExtractor<ConsumerRecord<?, ?>, Void>> consumerAttributesExtractors =
new ArrayList<>();
KafkaTracingBuilder(OpenTelemetry openTelemetry) {
this.openTelemetry = Objects.requireNonNull(openTelemetry);
}
public KafkaTracingBuilder addProducerAttributesExtractors(
AttributesExtractor<ProducerRecord<?, ?>, Void> extractor) {
producerAttributesExtractors.add(extractor);
return this;
}
public KafkaTracingBuilder addConsumerAttributesExtractors(
AttributesExtractor<ConsumerRecord<?, ?>, Void> extractor) {
consumerAttributesExtractors.add(extractor);
return this;
}
public KafkaTracing build() {
return new KafkaTracing(
openTelemetry,
KafkaInstrumenterBuilder.buildProducerInstrumenter(
INSTRUMENTATION_NAME, openTelemetry, producerAttributesExtractors),
KafkaInstrumenterBuilder.buildConsumerOperationInstrumenter(
INSTRUMENTATION_NAME,
openTelemetry,
MessageOperation.RECEIVE,
consumerAttributesExtractors));
}
}
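Callers that need extra attributes go through the builder; a short sketch of that flow (the two extractor variables are hypothetical placeholders, not part of this change):

// Configure KafkaTracing with additional attributes extractors before building.
KafkaTracing tracing = KafkaTracing.newBuilder(openTelemetry)
    .addProducerAttributesExtractors(myProducerExtractor) // hypothetical AttributesExtractor<ProducerRecord<?, ?>, Void>
    .addConsumerAttributesExtractors(myConsumerExtractor) // hypothetical AttributesExtractor<ConsumerRecord<?, ?>, Void>
    .build();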

@@ -0,0 +1,280 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.kafkaclients;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
class TracingConsumer<K, V> implements Consumer<K, V> {
private final Consumer<K, V> consumer;
private final KafkaTracing tracing;
TracingConsumer(Consumer<K, V> consumer, KafkaTracing tracing) {
this.consumer = consumer;
this.tracing = tracing;
}
@Override
public Set<TopicPartition> assignment() {
return consumer.assignment();
}
@Override
public Set<String> subscription() {
return consumer.subscription();
}
@Override
public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) {
consumer.subscribe(topics, listener);
}
@Override
public void subscribe(Collection<String> topics) {
consumer.subscribe(topics);
}
@Override
public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) {
consumer.subscribe(pattern, listener);
}
@Override
public void subscribe(Pattern pattern) {
consumer.subscribe(pattern);
}
@Override
public void unsubscribe() {
consumer.unsubscribe();
}
@Override
public void assign(Collection<TopicPartition> partitions) {
consumer.assign(partitions);
}
@Override
@Deprecated
public ConsumerRecords<K, V> poll(long timeout) {
return poll(Duration.ofMillis(timeout));
}
@Override
public ConsumerRecords<K, V> poll(Duration duration) {
ConsumerRecords<K, V> records = consumer.poll(duration);
tracing.buildAndFinishSpan(records);
return records;
}
@Override
public void commitSync() {
consumer.commitSync();
}
@Override
public void commitSync(Duration duration) {
consumer.commitSync(duration);
}
@Override
public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets) {
consumer.commitSync(offsets);
}
@Override
public void commitSync(Map<TopicPartition, OffsetAndMetadata> map, Duration duration) {
consumer.commitSync(map, duration);
}
@Override
public void commitAsync() {
consumer.commitAsync();
}
@Override
public void commitAsync(OffsetCommitCallback callback) {
consumer.commitAsync(callback);
}
@Override
public void commitAsync(
Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) {
consumer.commitAsync(offsets, callback);
}
@Override
public void seek(TopicPartition partition, long offset) {
consumer.seek(partition, offset);
}
@Override
public void seek(TopicPartition partition, OffsetAndMetadata offsetAndMetadata) {
consumer.seek(partition, offsetAndMetadata);
}
@Override
public void seekToBeginning(Collection<TopicPartition> partitions) {
consumer.seekToBeginning(partitions);
}
@Override
public void seekToEnd(Collection<TopicPartition> partitions) {
consumer.seekToEnd(partitions);
}
@Override
public long position(TopicPartition partition) {
return consumer.position(partition);
}
@Override
public long position(TopicPartition topicPartition, Duration duration) {
return consumer.position(topicPartition, duration);
}
@Override
public OffsetAndMetadata committed(TopicPartition partition) {
return consumer.committed(partition);
}
@Override
public OffsetAndMetadata committed(TopicPartition topicPartition, Duration duration) {
return consumer.committed(topicPartition, duration);
}
@Override
public Map<TopicPartition, OffsetAndMetadata> committed(Set<TopicPartition> partitions) {
return consumer.committed(partitions);
}
@Override
public Map<TopicPartition, OffsetAndMetadata> committed(
Set<TopicPartition> partitions, final Duration timeout) {
return consumer.committed(partitions, timeout);
}
@Override
public Map<MetricName, ? extends Metric> metrics() {
return consumer.metrics();
}
@Override
public List<PartitionInfo> partitionsFor(String topic) {
return consumer.partitionsFor(topic);
}
@Override
public List<PartitionInfo> partitionsFor(String s, Duration duration) {
return consumer.partitionsFor(s, duration);
}
@Override
public Map<String, List<PartitionInfo>> listTopics() {
return consumer.listTopics();
}
@Override
public Map<String, List<PartitionInfo>> listTopics(Duration duration) {
return consumer.listTopics(duration);
}
@Override
public void pause(Collection<TopicPartition> partitions) {
consumer.pause(partitions);
}
@Override
public void resume(Collection<TopicPartition> partitions) {
consumer.resume(partitions);
}
@Override
public Set<TopicPartition> paused() {
return consumer.paused();
}
@Override
public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(
Map<TopicPartition, Long> timestampsToSearch) {
return consumer.offsetsForTimes(timestampsToSearch);
}
@Override
public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(
Map<TopicPartition, Long> map, Duration duration) {
return consumer.offsetsForTimes(map, duration);
}
@Override
public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions) {
return consumer.beginningOffsets(partitions);
}
@Override
public Map<TopicPartition, Long> beginningOffsets(
Collection<TopicPartition> collection, Duration duration) {
return consumer.beginningOffsets(collection, duration);
}
@Override
public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions) {
return consumer.endOffsets(partitions);
}
@Override
public Map<TopicPartition, Long> endOffsets(
Collection<TopicPartition> collection, Duration duration) {
return consumer.endOffsets(collection, duration);
}
@Override
public ConsumerGroupMetadata groupMetadata() {
return consumer.groupMetadata();
}
@Override
public void enforceRebalance() {
consumer.enforceRebalance();
}
@Override
public void close() {
consumer.close();
}
@Override
@Deprecated
public void close(long l, TimeUnit timeUnit) {
consumer.close(l, timeUnit);
}
@Override
public void close(Duration duration) {
consumer.close(duration);
}
@Override
public void wakeup() {
consumer.wakeup();
}
}

@@ -0,0 +1,38 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.kafkaclients;
import io.opentelemetry.api.GlobalOpenTelemetry;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
/**
* A ConsumerInterceptor that adds tracing capability. Add this interceptor's class name or class
* to your consumer's ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG property to have it instantiated
* and used. See the ConsumerInterceptor Javadoc for more usage details.
*/
public class TracingConsumerInterceptor<K, V> implements ConsumerInterceptor<K, V> {
private static final KafkaTracing tracing = KafkaTracing.create(GlobalOpenTelemetry.get());
@Override
public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records) {
tracing.buildAndFinishSpan(records);
return records;
}
@Override
public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {}
@Override
public void close() {}
@Override
public void configure(Map<String, ?> configs) {}
}
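A minimal configuration sketch for the interceptor above; only the interceptor property comes from this class, the rest are the usual consumer settings. Note that the interceptor resolves its tracer from GlobalOpenTelemetry, so the global SDK must be configured before the consumer is created:

// Register the tracing interceptor through standard Kafka consumer configuration.
Properties props = new Properties();
props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingConsumerInterceptor.class.getName());
// ...plus the usual bootstrap.servers, group.id and deserializer settings...
Consumer<Integer, String> consumer = new KafkaConsumer<>(props);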

@@ -0,0 +1,98 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.kafkaclients;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
class TracingProducer<K, V> implements Producer<K, V> {
private final Producer<K, V> producer;
private final KafkaTracing tracing;
TracingProducer(Producer<K, V> producer, KafkaTracing tracing) {
this.producer = producer;
this.tracing = tracing;
}
@Override
public void initTransactions() {
producer.initTransactions();
}
@Override
public void beginTransaction() {
producer.beginTransaction();
}
@Override
public void sendOffsetsToTransaction(
Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) {
producer.sendOffsetsToTransaction(offsets, consumerGroupId);
}
@Override
public void sendOffsetsToTransaction(
Map<TopicPartition, OffsetAndMetadata> offsets, ConsumerGroupMetadata groupMetadata) {
producer.sendOffsetsToTransaction(offsets, groupMetadata);
}
@Override
public void commitTransaction() {
producer.commitTransaction();
}
@Override
public void abortTransaction() {
producer.abortTransaction();
}
@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
return send(record, null);
}
@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
return tracing.buildAndInjectSpan(record, callback, producer::send);
}
@Override
public void flush() {
producer.flush();
}
@Override
public List<PartitionInfo> partitionsFor(String topic) {
return producer.partitionsFor(topic);
}
@Override
public Map<MetricName, ? extends Metric> metrics() {
return producer.metrics();
}
@Override
public void close() {
producer.close();
}
@Override
public void close(Duration duration) {
producer.close(duration);
}
}

@@ -0,0 +1,37 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.kafkaclients;
import io.opentelemetry.api.GlobalOpenTelemetry;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
/**
* A ProducerInterceptor that adds tracing capability. Add this interceptor's class name or class
* to your producer's ProducerConfig.INTERCEPTOR_CLASSES_CONFIG property to have it instantiated
* and used. See the ProducerInterceptor Javadoc for more usage details.
*/
public class TracingProducerInterceptor<K, V> implements ProducerInterceptor<K, V> {
private static final KafkaTracing tracing = KafkaTracing.create(GlobalOpenTelemetry.get());
@Override
public ProducerRecord<K, V> onSend(ProducerRecord<K, V> producerRecord) {
tracing.buildAndInjectSpan(producerRecord);
return producerRecord;
}
@Override
public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {}
@Override
public void close() {}
@Override
public void configure(Map<String, ?> map) {}
}
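The producer-side counterpart, again a sketch in which only the interceptor property is mandated by this class:

// Register the tracing interceptor through standard Kafka producer configuration.
Properties props = new Properties();
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingProducerInterceptor.class.getName());
// ...plus the usual bootstrap.servers and serializer settings...
Producer<Integer, String> producer = new KafkaProducer<>(props);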

@@ -0,0 +1,106 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.kafkaclients
import io.opentelemetry.instrumentation.test.LibraryTestTrait
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import spock.lang.Unroll
import java.time.Duration
import static io.opentelemetry.api.trace.SpanKind.CONSUMER
import static io.opentelemetry.api.trace.SpanKind.INTERNAL
import static io.opentelemetry.api.trace.SpanKind.PRODUCER
class InterceptorsTest extends KafkaClientBaseTest implements LibraryTestTrait {
@Override
Map<String, ?> producerProps() {
def props = super.producerProps()
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingProducerInterceptor.getName())
return props
}
@Override
Map<String, ?> consumerProps() {
def props = super.consumerProps()
props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingConsumerInterceptor.getName())
return props
}
@Unroll
def "test interceptors"() throws Exception {
when:
String greeting = "Hello Kafka!"
runWithSpan("parent") {
producer.send(new ProducerRecord(SHARED_TOPIC, greeting)) { meta, ex ->
if (ex == null) {
runWithSpan("producer callback") {}
} else {
runWithSpan("producer exception: " + ex) {}
}
}
}
then:
awaitUntilConsumerIsReady()
// check that the message was received
def records = consumer.poll(Duration.ofSeconds(5).toMillis())
records.count() == 1
for (record in records) {
assert record.value() == greeting
assert record.key() == null
}
assertTraces(2) {
traces.sort(orderByRootSpanKind(INTERNAL, PRODUCER, CONSUMER))
trace(0, 3) {
span(0) {
name "parent"
kind INTERNAL
hasNoParent()
}
span(1) {
name SHARED_TOPIC + " send"
kind PRODUCER
childOf span(0)
attributes {
"${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka"
"${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC
"${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic"
}
}
span(2) {
name SHARED_TOPIC + " receive"
kind CONSUMER
childOf span(1)
hasLink(span(1))
attributes {
"${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka"
"${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC
"${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic"
"${SemanticAttributes.MESSAGING_OPERATION.key}" "receive"
"${SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES.key}" Long
"${SemanticAttributes.MESSAGING_KAFKA_PARTITION.key}" { it >= 0 }
"kafka.offset" Long
"kafka.record.queue_time_ms" { it >= 0 }
}
}
}
trace(1, 1) {
span(0) {
name "producer callback"
kind INTERNAL
hasNoParent()
}
}
}
}
}

@@ -0,0 +1,92 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.kafkaclients
import io.opentelemetry.instrumentation.test.LibraryTestTrait
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes
import org.apache.kafka.clients.producer.ProducerRecord
import spock.lang.Unroll
import java.time.Duration
import static io.opentelemetry.api.trace.SpanKind.CONSUMER
import static io.opentelemetry.api.trace.SpanKind.INTERNAL
import static io.opentelemetry.api.trace.SpanKind.PRODUCER
class WrappersTest extends KafkaClientBaseTest implements LibraryTestTrait {
@Unroll
def "test wrappers"() throws Exception {
KafkaTracing tracing = KafkaTracing.create(getOpenTelemetry())
when:
String greeting = "Hello Kafka!"
def wrappedProducer = tracing.wrap(producer)
runWithSpan("parent") {
wrappedProducer.send(new ProducerRecord(SHARED_TOPIC, greeting)) { meta, ex ->
if (ex == null) {
runWithSpan("producer callback") {}
} else {
runWithSpan("producer exception: " + ex) {}
}
}
}
then:
awaitUntilConsumerIsReady()
// check that the message was received
def wrappedConsumer = tracing.wrap(consumer)
def records = wrappedConsumer.poll(Duration.ofSeconds(5).toMillis())
records.count() == 1
for (record in records) {
assert record.value() == greeting
assert record.key() == null
}
assertTraces(1) {
traces.sort(orderByRootSpanKind(INTERNAL, PRODUCER, CONSUMER))
trace(0, 4) {
span(0) {
name "parent"
kind INTERNAL
hasNoParent()
}
span(1) {
name SHARED_TOPIC + " send"
kind PRODUCER
childOf span(0)
attributes {
"${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka"
"${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC
"${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic"
}
}
span(2) {
name SHARED_TOPIC + " receive"
kind CONSUMER
childOf span(1)
hasLink(span(1))
attributes {
"${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka"
"${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC
"${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic"
"${SemanticAttributes.MESSAGING_OPERATION.key}" "receive"
"${SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES.key}" Long
"${SemanticAttributes.MESSAGING_KAFKA_PARTITION.key}" { it >= 0 }
"kafka.offset" Long
"kafka.record.queue_time_ms" { it >= 0 }
}
}
span(3) {
name "producer callback"
kind INTERNAL
childOf span(0)
}
}
}
}
}

@@ -1,7 +0,0 @@
plugins {
id("otel.javaagent-instrumentation")
}
dependencies {
compileOnly("org.apache.kafka:kafka-clients:0.11.0.0")
}

@@ -1,18 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.kafka;
import io.opentelemetry.context.propagation.TextMapSetter;
import java.nio.charset.StandardCharsets;
import org.apache.kafka.clients.producer.ProducerRecord;
public final class KafkaHeadersSetter implements TextMapSetter<ProducerRecord<?, ?>> {
@Override
public void set(ProducerRecord<?, ?> carrier, String key, String value) {
carrier.headers().remove(key).add(key, value.getBytes(StandardCharsets.UTF_8));
}
}

@@ -0,0 +1,9 @@
plugins {
id("otel.library-instrumentation")
}
dependencies {
compileOnly("org.apache.kafka:kafka-clients:0.11.0.0")
compileOnly("com.google.auto.value:auto-value-annotations")
annotationProcessor("com.google.auto.value:auto-value")
}

@@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.kafka;
package io.opentelemetry.instrumentation.kafka;
import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor;

@@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.kafka;
package io.opentelemetry.instrumentation.kafka;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesExtractor;

@@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.kafka;
package io.opentelemetry.instrumentation.kafka;
import static io.opentelemetry.api.common.AttributeKey.longKey;

@@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.kafka;
package io.opentelemetry.instrumentation.kafka;
import org.apache.kafka.clients.consumer.ConsumerRecord;

@@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.kafka;
package io.opentelemetry.instrumentation.kafka;
import java.util.Iterator;
import org.apache.kafka.clients.consumer.ConsumerRecord;

@@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.kafka;
package io.opentelemetry.instrumentation.kafka;
import io.opentelemetry.context.propagation.TextMapGetter;
import java.nio.charset.StandardCharsets;
@@ -13,8 +13,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;
import org.checkerframework.checker.nullness.qual.Nullable;
public final class KafkaHeadersGetter implements TextMapGetter<ConsumerRecord<?, ?>> {
public final class KafkaConsumerRecordGetter implements TextMapGetter<ConsumerRecord<?, ?>> {
@Override
public Iterable<String> keys(ConsumerRecord<?, ?> carrier) {
return StreamSupport.stream(carrier.headers().spliterator(), false)

@@ -0,0 +1,37 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.kafka;
import io.opentelemetry.context.propagation.TextMapGetter;
import java.nio.charset.StandardCharsets;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.checkerframework.checker.nullness.qual.Nullable;
public final class KafkaHeadersGetter implements TextMapGetter<Headers> {
@Override
public Iterable<String> keys(Headers carrier) {
return StreamSupport.stream(carrier.spliterator(), false)
.map(Header::key)
.collect(Collectors.toList());
}
@Nullable
@Override
public String get(@Nullable Headers carrier, String key) {
Header header = carrier.lastHeader(key);
if (header == null) {
return null;
}
byte[] value = header.value();
if (value == null) {
return null;
}
return new String(value, StandardCharsets.UTF_8);
}
}

@@ -0,0 +1,18 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.kafka;
import io.opentelemetry.context.propagation.TextMapSetter;
import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.header.Headers;
public final class KafkaHeadersSetter implements TextMapSetter<Headers> {
@Override
public void set(Headers headers, String key, String value) {
headers.remove(key).add(key, value.getBytes(StandardCharsets.UTF_8));
}
}
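For orientation, how these two carriers plug into a propagator, mirroring the inject/extract calls in KafkaTracing above (producerRecord and consumerRecord are hypothetical records):

// The setter writes into mutable Headers; the getter reads from a ConsumerRecord.
TextMapPropagator propagator = GlobalOpenTelemetry.getPropagators().getTextMapPropagator();
propagator.inject(Context.current(), producerRecord.headers(), new KafkaHeadersSetter());
Context extracted = propagator.extract(Context.current(), consumerRecord, new KafkaConsumerRecordGetter());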

@@ -0,0 +1,112 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.kafka;
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.instrumentation.api.config.ExperimentalConfig;
import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder;
import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.SpanLinksExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.SpanNameExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingSpanNameExtractor;
import java.util.Collections;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
public final class KafkaInstrumenterBuilder {
public static Instrumenter<ProducerRecord<?, ?>, Void> buildProducerInstrumenter(
String instrumentationName) {
return buildProducerInstrumenter(
instrumentationName, GlobalOpenTelemetry.get(), Collections.emptyList());
}
public static Instrumenter<ProducerRecord<?, ?>, Void> buildProducerInstrumenter(
String instrumentationName,
OpenTelemetry openTelemetry,
Iterable<AttributesExtractor<ProducerRecord<?, ?>, Void>> extractors) {
KafkaProducerAttributesExtractor attributesExtractor = new KafkaProducerAttributesExtractor();
SpanNameExtractor<ProducerRecord<?, ?>> spanNameExtractor =
MessagingSpanNameExtractor.create(attributesExtractor);
return Instrumenter.<ProducerRecord<?, ?>, Void>newBuilder(
openTelemetry, instrumentationName, spanNameExtractor)
.addAttributesExtractor(attributesExtractor)
.addAttributesExtractors(extractors)
.addAttributesExtractor(new KafkaProducerAdditionalAttributesExtractor())
.newInstrumenter(SpanKindExtractor.alwaysProducer());
}
public static Instrumenter<ReceivedRecords, Void> buildConsumerReceiveInstrumenter(
String instrumentationName) {
return buildConsumerReceiveInstrumenter(
instrumentationName, GlobalOpenTelemetry.get(), Collections.emptyList());
}
public static Instrumenter<ReceivedRecords, Void> buildConsumerReceiveInstrumenter(
String instrumentationName,
OpenTelemetry openTelemetry,
Iterable<AttributesExtractor<ReceivedRecords, Void>> extractors) {
KafkaReceiveAttributesExtractor attributesExtractor = new KafkaReceiveAttributesExtractor();
SpanNameExtractor<ReceivedRecords> spanNameExtractor =
MessagingSpanNameExtractor.create(attributesExtractor);
return Instrumenter.<ReceivedRecords, Void>newBuilder(
openTelemetry, instrumentationName, spanNameExtractor)
.addAttributesExtractor(attributesExtractor)
.addAttributesExtractors(extractors)
.setTimeExtractors(ReceivedRecords::startTime, (request, response, error) -> request.now())
.setDisabled(ExperimentalConfig.get().suppressMessagingReceiveSpans())
.newInstrumenter(SpanKindExtractor.alwaysConsumer());
}
public static Instrumenter<ConsumerRecord<?, ?>, Void> buildConsumerProcessInstrumenter(
String instrumentationName) {
return buildConsumerOperationInstrumenter(
instrumentationName,
GlobalOpenTelemetry.get(),
MessageOperation.PROCESS,
Collections.emptyList());
}
public static Instrumenter<ConsumerRecord<?, ?>, Void> buildConsumerOperationInstrumenter(
String instrumentationName,
OpenTelemetry openTelemetry,
MessageOperation operation,
Iterable<AttributesExtractor<ConsumerRecord<?, ?>, Void>> extractors) {
KafkaConsumerAttributesExtractor attributesExtractor =
new KafkaConsumerAttributesExtractor(operation);
SpanNameExtractor<ConsumerRecord<?, ?>> spanNameExtractor =
MessagingSpanNameExtractor.create(attributesExtractor);
InstrumenterBuilder<ConsumerRecord<?, ?>, Void> builder =
Instrumenter.<ConsumerRecord<?, ?>, Void>newBuilder(
openTelemetry, instrumentationName, spanNameExtractor)
.addAttributesExtractor(attributesExtractor)
.addAttributesExtractor(new KafkaConsumerAdditionalAttributesExtractor())
.addAttributesExtractors(extractors);
if (KafkaConsumerExperimentalAttributesExtractor.isEnabled()) {
builder.addAttributesExtractor(new KafkaConsumerExperimentalAttributesExtractor());
}
if (!KafkaPropagation.isPropagationEnabled()) {
return builder.newInstrumenter(SpanKindExtractor.alwaysConsumer());
} else if (ExperimentalConfig.get().suppressMessagingReceiveSpans()) {
return builder.newConsumerInstrumenter(new KafkaConsumerRecordGetter());
} else {
builder.addSpanLinksExtractor(
SpanLinksExtractor.fromUpstreamRequest(
GlobalOpenTelemetry.getPropagators(), new KafkaConsumerRecordGetter()));
return builder.newInstrumenter(SpanKindExtractor.alwaysConsumer());
}
}
private KafkaInstrumenterBuilder() {}
}

@@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.kafkaclients;
package io.opentelemetry.instrumentation.kafka;
import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor;

@@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.kafkaclients;
package io.opentelemetry.instrumentation.kafka;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesExtractor;

@@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.kafka;
package io.opentelemetry.instrumentation.kafka;
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.context.Context;
@@ -56,7 +56,9 @@ public final class KafkaPropagation {
}
private static <K, V> void inject(Context context, ProducerRecord<K, V> record) {
GlobalOpenTelemetry.getPropagators().getTextMapPropagator().inject(context, record, SETTER);
GlobalOpenTelemetry.getPropagators()
.getTextMapPropagator()
.inject(context, record.headers(), SETTER);
}
private KafkaPropagation() {}

@@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.kafkaclients;
package io.opentelemetry.instrumentation.kafka;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesExtractor;

@@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.kafkaclients;
package io.opentelemetry.instrumentation.kafka;
import com.google.auto.value.AutoValue;
import java.time.Instant;

@@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.kafkaclients;
package io.opentelemetry.instrumentation.kafka;
import java.time.Instant;

@@ -12,7 +12,7 @@ muzzle {
}
dependencies {
implementation(project(":instrumentation:kafka-clients:kafka-clients-common:javaagent"))
implementation(project(":instrumentation:kafka-clients:kafka-clients-common:library"))
library("org.apache.kafka:kafka-streams:0.11.0.0")

@@ -5,20 +5,8 @@
package io.opentelemetry.javaagent.instrumentation.kafkastreams;
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.instrumentation.api.config.ExperimentalConfig;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder;
import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.SpanLinksExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.SpanNameExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingSpanNameExtractor;
import io.opentelemetry.javaagent.instrumentation.kafka.KafkaConsumerAdditionalAttributesExtractor;
import io.opentelemetry.javaagent.instrumentation.kafka.KafkaConsumerAttributesExtractor;
import io.opentelemetry.javaagent.instrumentation.kafka.KafkaConsumerExperimentalAttributesExtractor;
import io.opentelemetry.javaagent.instrumentation.kafka.KafkaHeadersGetter;
import io.opentelemetry.javaagent.instrumentation.kafka.KafkaPropagation;
import io.opentelemetry.instrumentation.kafka.KafkaInstrumenterBuilder;
import org.apache.kafka.clients.consumer.ConsumerRecord;
public final class KafkaStreamsSingletons {
@@ -28,30 +16,7 @@ public final class KafkaStreamsSingletons {
private static final Instrumenter<ConsumerRecord<?, ?>, Void> INSTRUMENTER = buildInstrumenter();
private static Instrumenter<ConsumerRecord<?, ?>, Void> buildInstrumenter() {
KafkaConsumerAttributesExtractor attributesExtractor =
new KafkaConsumerAttributesExtractor(MessageOperation.PROCESS);
SpanNameExtractor<ConsumerRecord<?, ?>> spanNameExtractor =
MessagingSpanNameExtractor.create(attributesExtractor);
InstrumenterBuilder<ConsumerRecord<?, ?>, Void> builder =
Instrumenter.<ConsumerRecord<?, ?>, Void>newBuilder(
GlobalOpenTelemetry.get(), INSTRUMENTATION_NAME, spanNameExtractor)
.addAttributesExtractor(attributesExtractor)
.addAttributesExtractor(new KafkaConsumerAdditionalAttributesExtractor());
if (KafkaConsumerExperimentalAttributesExtractor.isEnabled()) {
builder.addAttributesExtractor(new KafkaConsumerExperimentalAttributesExtractor());
}
if (!KafkaPropagation.isPropagationEnabled()) {
return builder.newInstrumenter(SpanKindExtractor.alwaysConsumer());
} else if (ExperimentalConfig.get().suppressMessagingReceiveSpans()) {
return builder.newConsumerInstrumenter(new KafkaHeadersGetter());
} else {
builder.addSpanLinksExtractor(
SpanLinksExtractor.fromUpstreamRequest(
GlobalOpenTelemetry.getPropagators(), new KafkaHeadersGetter()));
return builder.newInstrumenter(SpanKindExtractor.alwaysConsumer());
}
return KafkaInstrumenterBuilder.buildConsumerProcessInstrumenter(INSTRUMENTATION_NAME);
}
public static Instrumenter<ConsumerRecord<?, ?>, Void> instrumenter() {

@@ -12,9 +12,9 @@ import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.kafka.KafkaConsumerIterableWrapper;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import io.opentelemetry.javaagent.instrumentation.kafka.KafkaConsumerIterableWrapper;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;

@@ -11,9 +11,9 @@ import static net.bytebuddy.matcher.ElementMatchers.returns;
import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.instrumentation.api.field.VirtualField;
import io.opentelemetry.instrumentation.kafka.KafkaConsumerIteratorWrapper;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import io.opentelemetry.javaagent.instrumentation.kafka.KafkaConsumerIteratorWrapper;
import java.util.Iterator;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;

@@ -15,7 +15,7 @@ dependencies {
compileOnly("com.google.auto.value:auto-value-annotations")
annotationProcessor("com.google.auto.value:auto-value")
implementation(project(":instrumentation:kafka-clients:kafka-clients-common:javaagent"))
implementation(project(":instrumentation:kafka-clients:kafka-clients-common:library"))
library("org.springframework.kafka:spring-kafka:2.7.0")

@@ -9,8 +9,8 @@ import io.opentelemetry.context.Context;
import io.opentelemetry.context.propagation.ContextPropagators;
import io.opentelemetry.instrumentation.api.instrumenter.SpanLinksBuilder;
import io.opentelemetry.instrumentation.api.instrumenter.SpanLinksExtractor;
import io.opentelemetry.javaagent.instrumentation.kafka.KafkaConsumerIteratorWrapper;
import io.opentelemetry.javaagent.instrumentation.kafka.KafkaHeadersGetter;
import io.opentelemetry.instrumentation.kafka.KafkaConsumerIteratorWrapper;
import io.opentelemetry.instrumentation.kafka.KafkaConsumerRecordGetter;
import java.util.Iterator;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -22,7 +22,7 @@ public class KafkaBatchProcessSpanLinksExtractor
public KafkaBatchProcessSpanLinksExtractor(ContextPropagators contextPropagators) {
this.singleRecordLinkExtractor =
SpanLinksExtractor.fromUpstreamRequest(contextPropagators, new KafkaHeadersGetter());
SpanLinksExtractor.fromUpstreamRequest(contextPropagators, new KafkaConsumerRecordGetter());
}
@Override

@@ -218,7 +218,9 @@ include(":instrumentation:jsf:mojarra-1.2:javaagent")
include(":instrumentation:jsf:myfaces-1.2:javaagent")
include(":instrumentation:jsp-2.3:javaagent")
include(":instrumentation:kafka-clients:kafka-clients-0.11:javaagent")
include(":instrumentation:kafka-clients:kafka-clients-common:javaagent")
include(":instrumentation:kafka-clients:kafka-clients-0.11:testing")
include(":instrumentation:kafka-clients:kafka-clients-2.6:library")
include(":instrumentation:kafka-clients:kafka-clients-common:library")
include(":instrumentation:kafka-streams-0.11:javaagent")
include(":instrumentation:kotlinx-coroutines:javaagent")
include(":instrumentation:kubernetes-client-7.0:javaagent")