From f14eeb0f1584e6d6c98d7fafb84d479cfbbcb7c5 Mon Sep 17 00:00:00 2001 From: Mateusz Rzeszutek Date: Fri, 17 Sep 2021 09:41:13 +0200 Subject: [PATCH] Convert kafka-streams to Instrumenter API (#4140) --- .../kafkaclients/KafkaHeadersSetter.java | 18 ------ .../KafkaProducerInstrumentation.java | 1 + .../kafkaclients/KafkaSingletons.java | 1 + .../kafka/KafkaHeadersGetter.java | 1 + .../kafka/KafkaHeadersSetter.java | 18 ++++++ .../kafka}/KafkaPropagation.java | 6 +- .../javaagent/build.gradle.kts | 2 + .../groovy/KafkaStreamsTest.groovy | 2 + .../kafkastreams/KafkaStreamsSingletons.java | 53 ++++++++++++++++ .../kafkastreams/KafkaStreamsTracer.java | 61 ------------------- ...ntextScopeHolder.java => StateHolder.java} | 13 +++- .../StreamTaskStartInstrumentation.java | 16 +++-- .../StreamTaskStopInstrumentation.java | 18 +++--- .../kafkastreams/TextMapExtractAdapter.java | 38 ------------ .../src/test/groovy/KafkaStreamsTest.groovy | 2 + 15 files changed, 109 insertions(+), 141 deletions(-) delete mode 100644 instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaHeadersSetter.java create mode 100644 instrumentation/kafka-clients/kafka-clients-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafka/KafkaHeadersSetter.java rename instrumentation/kafka-clients/{kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients => kafka-clients-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafka}/KafkaPropagation.java (92%) create mode 100644 instrumentation/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/KafkaStreamsSingletons.java delete mode 100644 instrumentation/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/KafkaStreamsTracer.java rename instrumentation/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/{ContextScopeHolder.java => StateHolder.java} (54%) delete mode 100644 instrumentation/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/TextMapExtractAdapter.java diff --git a/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaHeadersSetter.java b/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaHeadersSetter.java deleted file mode 100644 index c23f1c314e..0000000000 --- a/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaHeadersSetter.java +++ /dev/null @@ -1,18 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.javaagent.instrumentation.kafkaclients; - -import io.opentelemetry.context.propagation.TextMapSetter; -import java.nio.charset.StandardCharsets; -import org.apache.kafka.common.header.Headers; - -public final class KafkaHeadersSetter implements TextMapSetter { - - @Override - public void set(Headers headers, String key, String value) { - headers.remove(key).add(key, value.getBytes(StandardCharsets.UTF_8)); - } -} diff --git a/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaProducerInstrumentation.java b/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaProducerInstrumentation.java index c599b5f450..2d6e724de3 100644 --- a/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaProducerInstrumentation.java +++ b/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaProducerInstrumentation.java @@ -16,6 +16,7 @@ import io.opentelemetry.context.Scope; import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; import io.opentelemetry.javaagent.instrumentation.api.Java8BytecodeBridge; +import io.opentelemetry.javaagent.instrumentation.kafka.KafkaPropagation; import net.bytebuddy.asm.Advice; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; diff --git a/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaSingletons.java b/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaSingletons.java index 72d18222a3..f0d33e7bbd 100644 --- a/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaSingletons.java +++ b/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaSingletons.java @@ -17,6 +17,7 @@ import io.opentelemetry.javaagent.instrumentation.kafka.KafkaConsumerAdditionalA import io.opentelemetry.javaagent.instrumentation.kafka.KafkaConsumerAttributesExtractor; import io.opentelemetry.javaagent.instrumentation.kafka.KafkaConsumerExperimentalAttributesExtractor; import io.opentelemetry.javaagent.instrumentation.kafka.KafkaHeadersGetter; +import io.opentelemetry.javaagent.instrumentation.kafka.KafkaPropagation; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.producer.ProducerRecord; diff --git a/instrumentation/kafka-clients/kafka-clients-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafka/KafkaHeadersGetter.java b/instrumentation/kafka-clients/kafka-clients-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafka/KafkaHeadersGetter.java index fd57abfaf1..b114a26718 100644 --- a/instrumentation/kafka-clients/kafka-clients-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafka/KafkaHeadersGetter.java +++ b/instrumentation/kafka-clients/kafka-clients-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafka/KafkaHeadersGetter.java @@ -14,6 +14,7 @@ import org.apache.kafka.common.header.Header; import org.checkerframework.checker.nullness.qual.Nullable; public final class KafkaHeadersGetter implements TextMapGetter> { + @Override public Iterable keys(ConsumerRecord carrier) { return StreamSupport.stream(carrier.headers().spliterator(), false) diff --git a/instrumentation/kafka-clients/kafka-clients-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafka/KafkaHeadersSetter.java b/instrumentation/kafka-clients/kafka-clients-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafka/KafkaHeadersSetter.java new file mode 100644 index 0000000000..1da267e8e7 --- /dev/null +++ b/instrumentation/kafka-clients/kafka-clients-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafka/KafkaHeadersSetter.java @@ -0,0 +1,18 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.kafka; + +import io.opentelemetry.context.propagation.TextMapSetter; +import java.nio.charset.StandardCharsets; +import org.apache.kafka.clients.producer.ProducerRecord; + +public final class KafkaHeadersSetter implements TextMapSetter> { + + @Override + public void set(ProducerRecord carrier, String key, String value) { + carrier.headers().remove(key).add(key, value.getBytes(StandardCharsets.UTF_8)); + } +} diff --git a/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaPropagation.java b/instrumentation/kafka-clients/kafka-clients-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafka/KafkaPropagation.java similarity index 92% rename from instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaPropagation.java rename to instrumentation/kafka-clients/kafka-clients-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafka/KafkaPropagation.java index 6b16337451..1037ca4a18 100644 --- a/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaPropagation.java +++ b/instrumentation/kafka-clients/kafka-clients-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafka/KafkaPropagation.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package io.opentelemetry.javaagent.instrumentation.kafkaclients; +package io.opentelemetry.javaagent.instrumentation.kafka; import io.opentelemetry.api.GlobalOpenTelemetry; import io.opentelemetry.context.Context; @@ -56,9 +56,7 @@ public final class KafkaPropagation { } private static void inject(Context context, ProducerRecord record) { - GlobalOpenTelemetry.getPropagators() - .getTextMapPropagator() - .inject(context, record.headers(), SETTER); + GlobalOpenTelemetry.getPropagators().getTextMapPropagator().inject(context, record, SETTER); } private KafkaPropagation() {} diff --git a/instrumentation/kafka-streams-0.11/javaagent/build.gradle.kts b/instrumentation/kafka-streams-0.11/javaagent/build.gradle.kts index 1c8139ba11..96169b96d9 100644 --- a/instrumentation/kafka-streams-0.11/javaagent/build.gradle.kts +++ b/instrumentation/kafka-streams-0.11/javaagent/build.gradle.kts @@ -16,6 +16,8 @@ testSets { } dependencies { + implementation(project(":instrumentation:kafka-clients:kafka-clients-common:javaagent")) + compileOnly("org.apache.kafka:kafka-streams:0.11.0.0") // Include kafka-clients instrumentation for tests. diff --git a/instrumentation/kafka-streams-0.11/javaagent/src/latestDepTest/groovy/KafkaStreamsTest.groovy b/instrumentation/kafka-streams-0.11/javaagent/src/latestDepTest/groovy/KafkaStreamsTest.groovy index b8fc7a46d6..9327d28179 100644 --- a/instrumentation/kafka-streams-0.11/javaagent/src/latestDepTest/groovy/KafkaStreamsTest.groovy +++ b/instrumentation/kafka-streams-0.11/javaagent/src/latestDepTest/groovy/KafkaStreamsTest.groovy @@ -172,8 +172,10 @@ class KafkaStreamsTest extends AgentInstrumentationSpecification { "${SemanticAttributes.MESSAGING_DESTINATION.key}" STREAM_PENDING "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic" "${SemanticAttributes.MESSAGING_OPERATION.key}" "process" + "${SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES.key}" Long "${SemanticAttributes.MESSAGING_KAFKA_PARTITION.key}" { it >= 0 } "kafka.offset" 0 + "kafka.record.queue_time_ms" { it >= 0 } "asdf" "testing" } } diff --git a/instrumentation/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/KafkaStreamsSingletons.java b/instrumentation/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/KafkaStreamsSingletons.java new file mode 100644 index 0000000000..fffd520ec4 --- /dev/null +++ b/instrumentation/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/KafkaStreamsSingletons.java @@ -0,0 +1,53 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.kafkastreams; + +import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; +import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder; +import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor; +import io.opentelemetry.instrumentation.api.instrumenter.SpanNameExtractor; +import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation; +import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingSpanNameExtractor; +import io.opentelemetry.javaagent.instrumentation.kafka.KafkaConsumerAdditionalAttributesExtractor; +import io.opentelemetry.javaagent.instrumentation.kafka.KafkaConsumerAttributesExtractor; +import io.opentelemetry.javaagent.instrumentation.kafka.KafkaConsumerExperimentalAttributesExtractor; +import io.opentelemetry.javaagent.instrumentation.kafka.KafkaHeadersGetter; +import io.opentelemetry.javaagent.instrumentation.kafka.KafkaPropagation; +import org.apache.kafka.clients.consumer.ConsumerRecord; + +public final class KafkaStreamsSingletons { + + private static final String INSTRUMENTATION_NAME = "io.opentelemetry.kafka-streams-0.11"; + + private static final Instrumenter, Void> INSTRUMENTER = buildInstrumenter(); + + private static Instrumenter, Void> buildInstrumenter() { + KafkaConsumerAttributesExtractor attributesExtractor = + new KafkaConsumerAttributesExtractor(MessageOperation.PROCESS); + SpanNameExtractor> spanNameExtractor = + MessagingSpanNameExtractor.create(attributesExtractor); + + InstrumenterBuilder, Void> builder = + Instrumenter., Void>newBuilder( + GlobalOpenTelemetry.get(), INSTRUMENTATION_NAME, spanNameExtractor) + .addAttributesExtractor(attributesExtractor) + .addAttributesExtractor(new KafkaConsumerAdditionalAttributesExtractor()); + if (KafkaConsumerExperimentalAttributesExtractor.isEnabled()) { + builder.addAttributesExtractor(new KafkaConsumerExperimentalAttributesExtractor()); + } + // TODO: use the local receive span as parent, keep the producer in a link + return KafkaPropagation.isPropagationEnabled() + ? builder.newConsumerInstrumenter(new KafkaHeadersGetter()) + : builder.newInstrumenter(SpanKindExtractor.alwaysConsumer()); + } + + public static Instrumenter, Void> instrumenter() { + return INSTRUMENTER; + } + + private KafkaStreamsSingletons() {} +} diff --git a/instrumentation/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/KafkaStreamsTracer.java b/instrumentation/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/KafkaStreamsTracer.java deleted file mode 100644 index c889c436c6..0000000000 --- a/instrumentation/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/KafkaStreamsTracer.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.javaagent.instrumentation.kafkastreams; - -import static io.opentelemetry.api.trace.SpanKind.CONSUMER; -import static io.opentelemetry.javaagent.instrumentation.kafkastreams.TextMapExtractAdapter.GETTER; - -import io.opentelemetry.api.trace.Span; -import io.opentelemetry.context.Context; -import io.opentelemetry.instrumentation.api.config.Config; -import io.opentelemetry.instrumentation.api.tracer.BaseTracer; -import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; -import org.apache.kafka.streams.processor.internals.StampedRecord; - -public class KafkaStreamsTracer extends BaseTracer { - private static final KafkaStreamsTracer TRACER = new KafkaStreamsTracer(); - - private static final boolean CAPTURE_EXPERIMENTAL_SPAN_ATTRIBUTES = - Config.get().getBoolean("otel.instrumentation.kafka.experimental-span-attributes", false); - - public static KafkaStreamsTracer tracer() { - return TRACER; - } - - public Context startSpan(StampedRecord record) { - Context parentContext = extract(record.value.headers(), GETTER); - Span span = - spanBuilder(parentContext, spanNameForConsume(record), CONSUMER) - .setAttribute(SemanticAttributes.MESSAGING_SYSTEM, "kafka") - .setAttribute(SemanticAttributes.MESSAGING_DESTINATION, record.topic()) - .setAttribute(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic") - .setAttribute(SemanticAttributes.MESSAGING_OPERATION, "process") - .startSpan(); - onConsume(span, record); - return withConsumerSpan(parentContext, span); - } - - public String spanNameForConsume(StampedRecord record) { - if (record == null) { - return null; - } - return record.topic() + " process"; - } - - public void onConsume(Span span, StampedRecord record) { - if (record != null) { - span.setAttribute(SemanticAttributes.MESSAGING_KAFKA_PARTITION, record.partition()); - if (CAPTURE_EXPERIMENTAL_SPAN_ATTRIBUTES) { - span.setAttribute("kafka.offset", record.offset()); - } - } - } - - @Override - protected String getInstrumentationName() { - return "io.opentelemetry.kafka-streams-0.11"; - } -} diff --git a/instrumentation/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/ContextScopeHolder.java b/instrumentation/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/StateHolder.java similarity index 54% rename from instrumentation/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/ContextScopeHolder.java rename to instrumentation/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/StateHolder.java index f95fda44aa..b75c133b07 100644 --- a/instrumentation/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/ContextScopeHolder.java +++ b/instrumentation/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/StateHolder.java @@ -7,10 +7,12 @@ package io.opentelemetry.javaagent.instrumentation.kafkastreams; import io.opentelemetry.context.Context; import io.opentelemetry.context.Scope; +import org.apache.kafka.clients.consumer.ConsumerRecord; -public class ContextScopeHolder { - public static final ThreadLocal HOLDER = new ThreadLocal<>(); +public final class StateHolder { + public static final ThreadLocal HOLDER = new ThreadLocal<>(); + private ConsumerRecord record; private Context context; private Scope scope; @@ -18,11 +20,16 @@ public class ContextScopeHolder { scope.close(); } + public ConsumerRecord getRecord() { + return record; + } + public Context getContext() { return context; } - public void set(Context context, Scope scope) { + public void set(ConsumerRecord record, Context context, Scope scope) { + this.record = record; this.context = context; this.scope = scope; } diff --git a/instrumentation/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/StreamTaskStartInstrumentation.java b/instrumentation/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/StreamTaskStartInstrumentation.java index cf5327f97c..e0e3353800 100644 --- a/instrumentation/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/StreamTaskStartInstrumentation.java +++ b/instrumentation/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/StreamTaskStartInstrumentation.java @@ -5,8 +5,8 @@ package io.opentelemetry.javaagent.instrumentation.kafkastreams; -import static io.opentelemetry.javaagent.instrumentation.kafkastreams.ContextScopeHolder.HOLDER; -import static io.opentelemetry.javaagent.instrumentation.kafkastreams.KafkaStreamsTracer.tracer; +import static io.opentelemetry.javaagent.instrumentation.kafkastreams.KafkaStreamsSingletons.instrumenter; +import static io.opentelemetry.javaagent.instrumentation.kafkastreams.StateHolder.HOLDER; import static net.bytebuddy.matcher.ElementMatchers.isMethod; import static net.bytebuddy.matcher.ElementMatchers.isPackagePrivate; import static net.bytebuddy.matcher.ElementMatchers.named; @@ -15,6 +15,7 @@ import static net.bytebuddy.matcher.ElementMatchers.returns; import io.opentelemetry.context.Context; import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; +import io.opentelemetry.javaagent.instrumentation.api.Java8BytecodeBridge; import net.bytebuddy.asm.Advice; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; @@ -46,15 +47,18 @@ public class StreamTaskStartInstrumentation implements TypeInstrumentation { return; } - ContextScopeHolder holder = HOLDER.get(); + StateHolder holder = HOLDER.get(); if (holder == null) { // somehow nextRecord() was called outside of process() return; } - Context context = tracer().startSpan(record); - - holder.set(context, context.makeCurrent()); + Context parentContext = Java8BytecodeBridge.currentContext(); + if (!instrumenter().shouldStart(parentContext, record.value)) { + return; + } + Context context = instrumenter().start(parentContext, record.value); + holder.set(record.value, context, context.makeCurrent()); } } } diff --git a/instrumentation/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/StreamTaskStopInstrumentation.java b/instrumentation/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/StreamTaskStopInstrumentation.java index 60084cf568..96a46398a4 100644 --- a/instrumentation/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/StreamTaskStopInstrumentation.java +++ b/instrumentation/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/StreamTaskStopInstrumentation.java @@ -5,8 +5,8 @@ package io.opentelemetry.javaagent.instrumentation.kafkastreams; -import static io.opentelemetry.javaagent.instrumentation.kafkastreams.ContextScopeHolder.HOLDER; -import static io.opentelemetry.javaagent.instrumentation.kafkastreams.KafkaStreamsTracer.tracer; +import static io.opentelemetry.javaagent.instrumentation.kafkastreams.KafkaStreamsSingletons.instrumenter; +import static io.opentelemetry.javaagent.instrumentation.kafkastreams.StateHolder.HOLDER; import static net.bytebuddy.matcher.ElementMatchers.isMethod; import static net.bytebuddy.matcher.ElementMatchers.isPublic; import static net.bytebuddy.matcher.ElementMatchers.named; @@ -36,25 +36,21 @@ public class StreamTaskStopInstrumentation implements TypeInstrumentation { public static class StopSpanAdvice { @Advice.OnMethodEnter - public static ContextScopeHolder onEnter() { - ContextScopeHolder holder = new ContextScopeHolder(); + public static StateHolder onEnter() { + StateHolder holder = new StateHolder(); HOLDER.set(holder); return holder; } @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) public static void stopSpan( - @Advice.Enter ContextScopeHolder holder, @Advice.Thrown Throwable throwable) { + @Advice.Enter StateHolder holder, @Advice.Thrown Throwable throwable) { HOLDER.remove(); + Context context = holder.getContext(); if (context != null) { holder.closeScope(); - - if (throwable != null) { - tracer().endExceptionally(context, throwable); - } else { - tracer().end(context); - } + instrumenter().end(context, holder.getRecord(), null, throwable); } } } diff --git a/instrumentation/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/TextMapExtractAdapter.java b/instrumentation/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/TextMapExtractAdapter.java deleted file mode 100644 index 9cc353618b..0000000000 --- a/instrumentation/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/TextMapExtractAdapter.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.javaagent.instrumentation.kafkastreams; - -import io.opentelemetry.context.propagation.TextMapGetter; -import java.nio.charset.StandardCharsets; -import java.util.stream.Collectors; -import java.util.stream.StreamSupport; -import org.apache.kafka.common.header.Header; -import org.apache.kafka.common.header.Headers; - -public class TextMapExtractAdapter implements TextMapGetter { - - public static final TextMapExtractAdapter GETTER = new TextMapExtractAdapter(); - - @Override - public Iterable keys(Headers headers) { - return StreamSupport.stream(headers.spliterator(), false) - .map(Header::key) - .collect(Collectors.toList()); - } - - @Override - public String get(Headers headers, String key) { - Header header = headers.lastHeader(key); - if (header == null) { - return null; - } - byte[] value = header.value(); - if (value == null) { - return null; - } - return new String(value, StandardCharsets.UTF_8); - } -} diff --git a/instrumentation/kafka-streams-0.11/javaagent/src/test/groovy/KafkaStreamsTest.groovy b/instrumentation/kafka-streams-0.11/javaagent/src/test/groovy/KafkaStreamsTest.groovy index e1d5cab2a2..cab08efa3d 100644 --- a/instrumentation/kafka-streams-0.11/javaagent/src/test/groovy/KafkaStreamsTest.groovy +++ b/instrumentation/kafka-streams-0.11/javaagent/src/test/groovy/KafkaStreamsTest.groovy @@ -172,8 +172,10 @@ class KafkaStreamsTest extends AgentInstrumentationSpecification { "${SemanticAttributes.MESSAGING_DESTINATION.key}" STREAM_PENDING "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic" "${SemanticAttributes.MESSAGING_OPERATION.key}" "process" + "${SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES.key}" Long "${SemanticAttributes.MESSAGING_KAFKA_PARTITION.key}" { it >= 0 } "kafka.offset" 0 + "kafka.record.queue_time_ms" { it >= 0 } "asdf" "testing" } }