diff --git a/instrumentation/kafka-clients-0.11/javaagent/build.gradle.kts b/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/build.gradle.kts similarity index 95% rename from instrumentation/kafka-clients-0.11/javaagent/build.gradle.kts rename to instrumentation/kafka-clients/kafka-clients-0.11/javaagent/build.gradle.kts index 08c6ca4bce..c5bfd9b75c 100644 --- a/instrumentation/kafka-clients-0.11/javaagent/build.gradle.kts +++ b/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/build.gradle.kts @@ -12,6 +12,8 @@ muzzle { } dependencies { + implementation(project(":instrumentation:kafka-clients:kafka-clients-common:javaagent")) + library("org.apache.kafka:kafka-clients:0.11.0.0") testLibrary("org.springframework.kafka:spring-kafka:1.3.3.RELEASE") diff --git a/instrumentation/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaClientsConfig.java b/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaClientsConfig.java similarity index 100% rename from instrumentation/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaClientsConfig.java rename to instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaClientsConfig.java diff --git a/instrumentation/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaClientsInstrumentationModule.java b/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaClientsInstrumentationModule.java similarity index 100% rename from instrumentation/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaClientsInstrumentationModule.java rename to instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaClientsInstrumentationModule.java diff --git a/instrumentation/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaConsumerInstrumentation.java b/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaConsumerInstrumentation.java similarity index 100% rename from instrumentation/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaConsumerInstrumentation.java rename to instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaConsumerInstrumentation.java diff --git a/instrumentation/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaConsumerTracer.java b/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaConsumerTracer.java similarity index 94% rename from instrumentation/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaConsumerTracer.java rename to instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaConsumerTracer.java index 717e50562f..31a5021a4d 100644 --- a/instrumentation/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaConsumerTracer.java +++ b/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaConsumerTracer.java @@ -6,11 +6,11 @@ package io.opentelemetry.javaagent.instrumentation.kafkaclients; import static io.opentelemetry.api.trace.SpanKind.CONSUMER; -import static io.opentelemetry.javaagent.instrumentation.kafkaclients.TextMapExtractAdapter.GETTER; import io.opentelemetry.api.trace.Span; import io.opentelemetry.context.Context; import io.opentelemetry.instrumentation.api.tracer.BaseTracer; +import io.opentelemetry.javaagent.instrumentation.kafka.KafkaHeadersGetter; import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; import java.util.concurrent.TimeUnit; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -18,6 +18,7 @@ import org.apache.kafka.common.record.TimestampType; public class KafkaConsumerTracer extends BaseTracer { private static final KafkaConsumerTracer TRACER = new KafkaConsumerTracer(); + private static final KafkaHeadersGetter GETTER = new KafkaHeadersGetter(); public static KafkaConsumerTracer tracer() { return TRACER; @@ -45,7 +46,7 @@ public class KafkaConsumerTracer extends BaseTracer { private Context extractParent(ConsumerRecord record) { if (KafkaClientsConfig.isPropagationEnabled()) { - return extract(record.headers(), GETTER); + return extract(record, GETTER); } else { return Context.current(); } diff --git a/instrumentation/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaProducerInstrumentation.java b/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaProducerInstrumentation.java similarity index 100% rename from instrumentation/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaProducerInstrumentation.java rename to instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaProducerInstrumentation.java diff --git a/instrumentation/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaProducerTracer.java b/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaProducerTracer.java similarity index 100% rename from instrumentation/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaProducerTracer.java rename to instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaProducerTracer.java diff --git a/instrumentation/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/ProducerCallback.java b/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/ProducerCallback.java similarity index 100% rename from instrumentation/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/ProducerCallback.java rename to instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/ProducerCallback.java diff --git a/instrumentation/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/TextMapInjectAdapter.java b/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/TextMapInjectAdapter.java similarity index 100% rename from instrumentation/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/TextMapInjectAdapter.java rename to instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/TextMapInjectAdapter.java diff --git a/instrumentation/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/TracingIterable.java b/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/TracingIterable.java similarity index 100% rename from instrumentation/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/TracingIterable.java rename to instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/TracingIterable.java diff --git a/instrumentation/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/TracingIterator.java b/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/TracingIterator.java similarity index 100% rename from instrumentation/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/TracingIterator.java rename to instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/TracingIterator.java diff --git a/instrumentation/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/TracingList.java b/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/TracingList.java similarity index 100% rename from instrumentation/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/TracingList.java rename to instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/TracingList.java diff --git a/instrumentation/kafka-clients-0.11/javaagent/src/test/groovy/KafkaClientBaseTest.groovy b/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/test/groovy/KafkaClientBaseTest.groovy similarity index 100% rename from instrumentation/kafka-clients-0.11/javaagent/src/test/groovy/KafkaClientBaseTest.groovy rename to instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/test/groovy/KafkaClientBaseTest.groovy diff --git a/instrumentation/kafka-clients-0.11/javaagent/src/test/groovy/KafkaClientPropagationDisabledTest.groovy b/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/test/groovy/KafkaClientPropagationDisabledTest.groovy similarity index 100% rename from instrumentation/kafka-clients-0.11/javaagent/src/test/groovy/KafkaClientPropagationDisabledTest.groovy rename to instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/test/groovy/KafkaClientPropagationDisabledTest.groovy diff --git a/instrumentation/kafka-clients-0.11/javaagent/src/test/groovy/KafkaClientPropagationEnabledTest.groovy b/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/test/groovy/KafkaClientPropagationEnabledTest.groovy similarity index 100% rename from instrumentation/kafka-clients-0.11/javaagent/src/test/groovy/KafkaClientPropagationEnabledTest.groovy rename to instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/test/groovy/KafkaClientPropagationEnabledTest.groovy diff --git a/instrumentation/kafka-clients-0.11/kafka-clients-2.4.0-testing/build.gradle.kts b/instrumentation/kafka-clients/kafka-clients-2.4.0-testing/build.gradle.kts similarity index 92% rename from instrumentation/kafka-clients-0.11/kafka-clients-2.4.0-testing/build.gradle.kts rename to instrumentation/kafka-clients/kafka-clients-2.4.0-testing/build.gradle.kts index c99a4f6093..6be486bccb 100644 --- a/instrumentation/kafka-clients-0.11/kafka-clients-2.4.0-testing/build.gradle.kts +++ b/instrumentation/kafka-clients/kafka-clients-2.4.0-testing/build.gradle.kts @@ -5,7 +5,7 @@ plugins { dependencies { library("org.apache.kafka:kafka-clients:2.4.0") - testInstrumentation(project(":instrumentation:kafka-clients-0.11:javaagent")) + testInstrumentation(project(":instrumentation:kafka-clients:kafka-clients-0.11:javaagent")) testLibrary("org.springframework.kafka:spring-kafka:2.4.0.RELEASE") testLibrary("org.springframework.kafka:spring-kafka-test:2.4.0.RELEASE") diff --git a/instrumentation/kafka-clients-0.11/kafka-clients-2.4.0-testing/src/test/groovy/KafkaClientBaseTest.groovy b/instrumentation/kafka-clients/kafka-clients-2.4.0-testing/src/test/groovy/KafkaClientBaseTest.groovy similarity index 100% rename from instrumentation/kafka-clients-0.11/kafka-clients-2.4.0-testing/src/test/groovy/KafkaClientBaseTest.groovy rename to instrumentation/kafka-clients/kafka-clients-2.4.0-testing/src/test/groovy/KafkaClientBaseTest.groovy diff --git a/instrumentation/kafka-clients-0.11/kafka-clients-2.4.0-testing/src/test/groovy/KafkaClientPropagationDisabledTest.groovy b/instrumentation/kafka-clients/kafka-clients-2.4.0-testing/src/test/groovy/KafkaClientPropagationDisabledTest.groovy similarity index 100% rename from instrumentation/kafka-clients-0.11/kafka-clients-2.4.0-testing/src/test/groovy/KafkaClientPropagationDisabledTest.groovy rename to instrumentation/kafka-clients/kafka-clients-2.4.0-testing/src/test/groovy/KafkaClientPropagationDisabledTest.groovy diff --git a/instrumentation/kafka-clients-0.11/kafka-clients-2.4.0-testing/src/test/groovy/KafkaClientPropagationEnabledTest.groovy b/instrumentation/kafka-clients/kafka-clients-2.4.0-testing/src/test/groovy/KafkaClientPropagationEnabledTest.groovy similarity index 100% rename from instrumentation/kafka-clients-0.11/kafka-clients-2.4.0-testing/src/test/groovy/KafkaClientPropagationEnabledTest.groovy rename to instrumentation/kafka-clients/kafka-clients-2.4.0-testing/src/test/groovy/KafkaClientPropagationEnabledTest.groovy diff --git a/instrumentation/kafka-clients/kafka-clients-common/javaagent/build.gradle.kts b/instrumentation/kafka-clients/kafka-clients-common/javaagent/build.gradle.kts new file mode 100644 index 0000000000..c27944c219 --- /dev/null +++ b/instrumentation/kafka-clients/kafka-clients-common/javaagent/build.gradle.kts @@ -0,0 +1,7 @@ +plugins { + id("otel.javaagent-instrumentation") +} + +dependencies { + compileOnly("org.apache.kafka:kafka-clients:0.11.0.0") +} diff --git a/instrumentation/kafka-clients/kafka-clients-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafka/KafkaConsumerAdditionalAttributesExtractor.java b/instrumentation/kafka-clients/kafka-clients-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafka/KafkaConsumerAdditionalAttributesExtractor.java new file mode 100644 index 0000000000..d1ec09f07b --- /dev/null +++ b/instrumentation/kafka-clients/kafka-clients-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafka/KafkaConsumerAdditionalAttributesExtractor.java @@ -0,0 +1,30 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.kafka; + +import io.opentelemetry.api.common.AttributesBuilder; +import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor; +import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.checkerframework.checker.nullness.qual.Nullable; + +public final class KafkaConsumerAdditionalAttributesExtractor + extends AttributesExtractor, Void> { + @Override + protected void onStart(AttributesBuilder attributes, ConsumerRecord consumerRecord) { + set( + attributes, + SemanticAttributes.MESSAGING_KAFKA_PARTITION, + (long) consumerRecord.partition()); + if (consumerRecord.value() == null) { + set(attributes, SemanticAttributes.MESSAGING_KAFKA_TOMBSTONE, true); + } + } + + @Override + protected void onEnd( + AttributesBuilder attributes, ConsumerRecord consumerRecord, @Nullable Void unused) {} +} diff --git a/instrumentation/kafka-clients/kafka-clients-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafka/KafkaConsumerAttributesExtractor.java b/instrumentation/kafka-clients/kafka-clients-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafka/KafkaConsumerAttributesExtractor.java new file mode 100644 index 0000000000..d8243dd392 --- /dev/null +++ b/instrumentation/kafka-clients/kafka-clients-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafka/KafkaConsumerAttributesExtractor.java @@ -0,0 +1,75 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.kafka; + +import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation; +import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesExtractor; +import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.checkerframework.checker.nullness.qual.Nullable; + +public final class KafkaConsumerAttributesExtractor + extends MessagingAttributesExtractor, Void> { + @Override + protected String system(ConsumerRecord consumerRecord) { + return "kafka"; + } + + @Override + protected String destinationKind(ConsumerRecord consumerRecord) { + return SemanticAttributes.MessagingDestinationKindValues.TOPIC; + } + + @Override + protected String destination(ConsumerRecord consumerRecord) { + return consumerRecord.topic(); + } + + @Override + protected boolean temporaryDestination(ConsumerRecord consumerRecord) { + return false; + } + + @Override + protected @Nullable String protocol(ConsumerRecord consumerRecord) { + return null; + } + + @Override + protected @Nullable String protocolVersion(ConsumerRecord consumerRecord) { + return null; + } + + @Override + protected @Nullable String url(ConsumerRecord consumerRecord) { + return null; + } + + @Override + protected @Nullable String conversationId(ConsumerRecord consumerRecord) { + return null; + } + + @Override + protected Long messagePayloadSize(ConsumerRecord consumerRecord) { + return (long) consumerRecord.serializedValueSize(); + } + + @Override + protected @Nullable Long messagePayloadCompressedSize(ConsumerRecord consumerRecord) { + return null; + } + + @Override + protected MessageOperation operation(ConsumerRecord consumerRecord) { + return MessageOperation.PROCESS; + } + + @Override + protected @Nullable String messageId(ConsumerRecord consumerRecord, @Nullable Void unused) { + return null; + } +} diff --git a/instrumentation/kafka-clients/kafka-clients-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafka/KafkaConsumerExperimentalAttributesExtractor.java b/instrumentation/kafka-clients/kafka-clients-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafka/KafkaConsumerExperimentalAttributesExtractor.java new file mode 100644 index 0000000000..b6ba7d08c2 --- /dev/null +++ b/instrumentation/kafka-clients/kafka-clients-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafka/KafkaConsumerExperimentalAttributesExtractor.java @@ -0,0 +1,52 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.kafka; + +import static io.opentelemetry.api.common.AttributeKey.longKey; + +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.common.AttributesBuilder; +import io.opentelemetry.instrumentation.api.config.Config; +import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.record.TimestampType; +import org.checkerframework.checker.nullness.qual.Nullable; + +public final class KafkaConsumerExperimentalAttributesExtractor + extends AttributesExtractor, Void> { + + private static final AttributeKey KAFKA_OFFSET = longKey("kafka.offset"); + private static final AttributeKey KAFKA_RECORD_QUEUE_TIME_MS = + longKey("kafka.record.queue_time_ms"); + + private static final boolean ENABLED = + Config.get() + .getBooleanProperty("otel.instrumentation.kafka.experimental-span-attributes", false); + + public static boolean isEnabled() { + return ENABLED; + } + + @Override + protected void onStart(AttributesBuilder attributes, ConsumerRecord consumerRecord) { + set(attributes, KAFKA_OFFSET, consumerRecord.offset()); + + // don't record a duration if the message was sent from an old Kafka client + if (consumerRecord.timestampType() != TimestampType.NO_TIMESTAMP_TYPE) { + long produceTime = consumerRecord.timestamp(); + // this attribute shows how much time elapsed between the producer and the consumer of this + // message, which can be helpful for identifying queue bottlenecks + set( + attributes, + KAFKA_RECORD_QUEUE_TIME_MS, + Math.max(0L, System.currentTimeMillis() - produceTime)); + } + } + + @Override + protected void onEnd( + AttributesBuilder attributes, ConsumerRecord consumerRecord, @Nullable Void unused) {} +} diff --git a/instrumentation/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/TextMapExtractAdapter.java b/instrumentation/kafka-clients/kafka-clients-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafka/KafkaHeadersGetter.java similarity index 52% rename from instrumentation/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/TextMapExtractAdapter.java rename to instrumentation/kafka-clients/kafka-clients-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafka/KafkaHeadersGetter.java index 324550787e..fd57abfaf1 100644 --- a/instrumentation/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/TextMapExtractAdapter.java +++ b/instrumentation/kafka-clients/kafka-clients-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafka/KafkaHeadersGetter.java @@ -3,29 +3,28 @@ * SPDX-License-Identifier: Apache-2.0 */ -package io.opentelemetry.javaagent.instrumentation.kafkaclients; +package io.opentelemetry.javaagent.instrumentation.kafka; import io.opentelemetry.context.propagation.TextMapGetter; import java.nio.charset.StandardCharsets; import java.util.stream.Collectors; import java.util.stream.StreamSupport; +import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.header.Header; -import org.apache.kafka.common.header.Headers; - -public class TextMapExtractAdapter implements TextMapGetter { - - public static final TextMapExtractAdapter GETTER = new TextMapExtractAdapter(); +import org.checkerframework.checker.nullness.qual.Nullable; +public final class KafkaHeadersGetter implements TextMapGetter> { @Override - public Iterable keys(Headers headers) { - return StreamSupport.stream(headers.spliterator(), false) + public Iterable keys(ConsumerRecord carrier) { + return StreamSupport.stream(carrier.headers().spliterator(), false) .map(Header::key) .collect(Collectors.toList()); } + @Nullable @Override - public String get(Headers headers, String key) { - Header header = headers.lastHeader(key); + public String get(@Nullable ConsumerRecord carrier, String key) { + Header header = carrier.headers().lastHeader(key); if (header == null) { return null; } diff --git a/instrumentation/kafka-streams-0.11/javaagent/build.gradle.kts b/instrumentation/kafka-streams-0.11/javaagent/build.gradle.kts index 4af5f61dfb..c6177dbf1e 100644 --- a/instrumentation/kafka-streams-0.11/javaagent/build.gradle.kts +++ b/instrumentation/kafka-streams-0.11/javaagent/build.gradle.kts @@ -19,7 +19,7 @@ dependencies { compileOnly("org.apache.kafka:kafka-streams:0.11.0.0") // Include kafka-clients instrumentation for tests. - testInstrumentation(project(":instrumentation:kafka-clients-0.11:javaagent")) + testInstrumentation(project(":instrumentation:kafka-clients:kafka-clients-0.11:javaagent")) testImplementation("org.apache.kafka:kafka-streams:0.11.0.0") testImplementation("org.apache.kafka:kafka-clients:0.11.0.0") diff --git a/settings.gradle.kts b/settings.gradle.kts index 61042d10ec..a2f64a501e 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -207,8 +207,9 @@ include(":instrumentation:jsf:jsf-testing-common") include(":instrumentation:jsf:mojarra-1.2:javaagent") include(":instrumentation:jsf:myfaces-1.2:javaagent") include(":instrumentation:jsp-2.3:javaagent") -include(":instrumentation:kafka-clients-0.11:javaagent") -include(":instrumentation:kafka-clients-0.11:kafka-clients-2.4.0-testing") +include(":instrumentation:kafka-clients:kafka-clients-0.11:javaagent") +include(":instrumentation:kafka-clients:kafka-clients-2.4.0-testing") +include(":instrumentation:kafka-clients:kafka-clients-common:javaagent") include(":instrumentation:kafka-streams-0.11:javaagent") include(":instrumentation:kotlinx-coroutines:javaagent") include(":instrumentation:kubernetes-client-7.0:javaagent")