Convert kafka-streams to Instrumenter API (#4140)

This commit is contained in:
Mateusz Rzeszutek 2021-09-17 09:41:13 +02:00 committed by GitHub
parent 59439cb464
commit f14eeb0f15
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 109 additions and 141 deletions

View File

@ -1,18 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.kafkaclients;
import io.opentelemetry.context.propagation.TextMapSetter;
import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.header.Headers;
public final class KafkaHeadersSetter implements TextMapSetter<Headers> {
@Override
public void set(Headers headers, String key, String value) {
headers.remove(key).add(key, value.getBytes(StandardCharsets.UTF_8));
}
}

View File

@ -16,6 +16,7 @@ import io.opentelemetry.context.Scope;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import io.opentelemetry.javaagent.instrumentation.api.Java8BytecodeBridge; import io.opentelemetry.javaagent.instrumentation.api.Java8BytecodeBridge;
import io.opentelemetry.javaagent.instrumentation.kafka.KafkaPropagation;
import net.bytebuddy.asm.Advice; import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher; import net.bytebuddy.matcher.ElementMatcher;

View File

@ -17,6 +17,7 @@ import io.opentelemetry.javaagent.instrumentation.kafka.KafkaConsumerAdditionalA
import io.opentelemetry.javaagent.instrumentation.kafka.KafkaConsumerAttributesExtractor; import io.opentelemetry.javaagent.instrumentation.kafka.KafkaConsumerAttributesExtractor;
import io.opentelemetry.javaagent.instrumentation.kafka.KafkaConsumerExperimentalAttributesExtractor; import io.opentelemetry.javaagent.instrumentation.kafka.KafkaConsumerExperimentalAttributesExtractor;
import io.opentelemetry.javaagent.instrumentation.kafka.KafkaHeadersGetter; import io.opentelemetry.javaagent.instrumentation.kafka.KafkaHeadersGetter;
import io.opentelemetry.javaagent.instrumentation.kafka.KafkaPropagation;
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.ProducerRecord;

View File

@ -14,6 +14,7 @@ import org.apache.kafka.common.header.Header;
import org.checkerframework.checker.nullness.qual.Nullable; import org.checkerframework.checker.nullness.qual.Nullable;
public final class KafkaHeadersGetter implements TextMapGetter<ConsumerRecord<?, ?>> { public final class KafkaHeadersGetter implements TextMapGetter<ConsumerRecord<?, ?>> {
@Override @Override
public Iterable<String> keys(ConsumerRecord<?, ?> carrier) { public Iterable<String> keys(ConsumerRecord<?, ?> carrier) {
return StreamSupport.stream(carrier.headers().spliterator(), false) return StreamSupport.stream(carrier.headers().spliterator(), false)

View File

@ -0,0 +1,18 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.kafka;
import io.opentelemetry.context.propagation.TextMapSetter;
import java.nio.charset.StandardCharsets;
import org.apache.kafka.clients.producer.ProducerRecord;
public final class KafkaHeadersSetter implements TextMapSetter<ProducerRecord<?, ?>> {
@Override
public void set(ProducerRecord<?, ?> carrier, String key, String value) {
carrier.headers().remove(key).add(key, value.getBytes(StandardCharsets.UTF_8));
}
}

View File

@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0 * SPDX-License-Identifier: Apache-2.0
*/ */
package io.opentelemetry.javaagent.instrumentation.kafkaclients; package io.opentelemetry.javaagent.instrumentation.kafka;
import io.opentelemetry.api.GlobalOpenTelemetry; import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.context.Context; import io.opentelemetry.context.Context;
@ -56,9 +56,7 @@ public final class KafkaPropagation {
} }
private static <K, V> void inject(Context context, ProducerRecord<K, V> record) { private static <K, V> void inject(Context context, ProducerRecord<K, V> record) {
GlobalOpenTelemetry.getPropagators() GlobalOpenTelemetry.getPropagators().getTextMapPropagator().inject(context, record, SETTER);
.getTextMapPropagator()
.inject(context, record.headers(), SETTER);
} }
private KafkaPropagation() {} private KafkaPropagation() {}

View File

@ -16,6 +16,8 @@ testSets {
} }
dependencies { dependencies {
implementation(project(":instrumentation:kafka-clients:kafka-clients-common:javaagent"))
compileOnly("org.apache.kafka:kafka-streams:0.11.0.0") compileOnly("org.apache.kafka:kafka-streams:0.11.0.0")
// Include kafka-clients instrumentation for tests. // Include kafka-clients instrumentation for tests.

View File

@ -172,8 +172,10 @@ class KafkaStreamsTest extends AgentInstrumentationSpecification {
"${SemanticAttributes.MESSAGING_DESTINATION.key}" STREAM_PENDING "${SemanticAttributes.MESSAGING_DESTINATION.key}" STREAM_PENDING
"${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic" "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic"
"${SemanticAttributes.MESSAGING_OPERATION.key}" "process" "${SemanticAttributes.MESSAGING_OPERATION.key}" "process"
"${SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES.key}" Long
"${SemanticAttributes.MESSAGING_KAFKA_PARTITION.key}" { it >= 0 } "${SemanticAttributes.MESSAGING_KAFKA_PARTITION.key}" { it >= 0 }
"kafka.offset" 0 "kafka.offset" 0
"kafka.record.queue_time_ms" { it >= 0 }
"asdf" "testing" "asdf" "testing"
} }
} }

View File

@ -0,0 +1,53 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.kafkastreams;
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder;
import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.SpanNameExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingSpanNameExtractor;
import io.opentelemetry.javaagent.instrumentation.kafka.KafkaConsumerAdditionalAttributesExtractor;
import io.opentelemetry.javaagent.instrumentation.kafka.KafkaConsumerAttributesExtractor;
import io.opentelemetry.javaagent.instrumentation.kafka.KafkaConsumerExperimentalAttributesExtractor;
import io.opentelemetry.javaagent.instrumentation.kafka.KafkaHeadersGetter;
import io.opentelemetry.javaagent.instrumentation.kafka.KafkaPropagation;
import org.apache.kafka.clients.consumer.ConsumerRecord;
public final class KafkaStreamsSingletons {
private static final String INSTRUMENTATION_NAME = "io.opentelemetry.kafka-streams-0.11";
private static final Instrumenter<ConsumerRecord<?, ?>, Void> INSTRUMENTER = buildInstrumenter();
private static Instrumenter<ConsumerRecord<?, ?>, Void> buildInstrumenter() {
KafkaConsumerAttributesExtractor attributesExtractor =
new KafkaConsumerAttributesExtractor(MessageOperation.PROCESS);
SpanNameExtractor<ConsumerRecord<?, ?>> spanNameExtractor =
MessagingSpanNameExtractor.create(attributesExtractor);
InstrumenterBuilder<ConsumerRecord<?, ?>, Void> builder =
Instrumenter.<ConsumerRecord<?, ?>, Void>newBuilder(
GlobalOpenTelemetry.get(), INSTRUMENTATION_NAME, spanNameExtractor)
.addAttributesExtractor(attributesExtractor)
.addAttributesExtractor(new KafkaConsumerAdditionalAttributesExtractor());
if (KafkaConsumerExperimentalAttributesExtractor.isEnabled()) {
builder.addAttributesExtractor(new KafkaConsumerExperimentalAttributesExtractor());
}
// TODO: use the local receive span as parent, keep the producer in a link
return KafkaPropagation.isPropagationEnabled()
? builder.newConsumerInstrumenter(new KafkaHeadersGetter())
: builder.newInstrumenter(SpanKindExtractor.alwaysConsumer());
}
public static Instrumenter<ConsumerRecord<?, ?>, Void> instrumenter() {
return INSTRUMENTER;
}
private KafkaStreamsSingletons() {}
}

View File

@ -1,61 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.kafkastreams;
import static io.opentelemetry.api.trace.SpanKind.CONSUMER;
import static io.opentelemetry.javaagent.instrumentation.kafkastreams.TextMapExtractAdapter.GETTER;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.config.Config;
import io.opentelemetry.instrumentation.api.tracer.BaseTracer;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
import org.apache.kafka.streams.processor.internals.StampedRecord;
public class KafkaStreamsTracer extends BaseTracer {
private static final KafkaStreamsTracer TRACER = new KafkaStreamsTracer();
private static final boolean CAPTURE_EXPERIMENTAL_SPAN_ATTRIBUTES =
Config.get().getBoolean("otel.instrumentation.kafka.experimental-span-attributes", false);
public static KafkaStreamsTracer tracer() {
return TRACER;
}
public Context startSpan(StampedRecord record) {
Context parentContext = extract(record.value.headers(), GETTER);
Span span =
spanBuilder(parentContext, spanNameForConsume(record), CONSUMER)
.setAttribute(SemanticAttributes.MESSAGING_SYSTEM, "kafka")
.setAttribute(SemanticAttributes.MESSAGING_DESTINATION, record.topic())
.setAttribute(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic")
.setAttribute(SemanticAttributes.MESSAGING_OPERATION, "process")
.startSpan();
onConsume(span, record);
return withConsumerSpan(parentContext, span);
}
public String spanNameForConsume(StampedRecord record) {
if (record == null) {
return null;
}
return record.topic() + " process";
}
public void onConsume(Span span, StampedRecord record) {
if (record != null) {
span.setAttribute(SemanticAttributes.MESSAGING_KAFKA_PARTITION, record.partition());
if (CAPTURE_EXPERIMENTAL_SPAN_ATTRIBUTES) {
span.setAttribute("kafka.offset", record.offset());
}
}
}
@Override
protected String getInstrumentationName() {
return "io.opentelemetry.kafka-streams-0.11";
}
}

View File

@ -7,10 +7,12 @@ package io.opentelemetry.javaagent.instrumentation.kafkastreams;
import io.opentelemetry.context.Context; import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope; import io.opentelemetry.context.Scope;
import org.apache.kafka.clients.consumer.ConsumerRecord;
public class ContextScopeHolder { public final class StateHolder {
public static final ThreadLocal<ContextScopeHolder> HOLDER = new ThreadLocal<>(); public static final ThreadLocal<StateHolder> HOLDER = new ThreadLocal<>();
private ConsumerRecord<?, ?> record;
private Context context; private Context context;
private Scope scope; private Scope scope;
@ -18,11 +20,16 @@ public class ContextScopeHolder {
scope.close(); scope.close();
} }
public ConsumerRecord<?, ?> getRecord() {
return record;
}
public Context getContext() { public Context getContext() {
return context; return context;
} }
public void set(Context context, Scope scope) { public void set(ConsumerRecord<?, ?> record, Context context, Scope scope) {
this.record = record;
this.context = context; this.context = context;
this.scope = scope; this.scope = scope;
} }

View File

@ -5,8 +5,8 @@
package io.opentelemetry.javaagent.instrumentation.kafkastreams; package io.opentelemetry.javaagent.instrumentation.kafkastreams;
import static io.opentelemetry.javaagent.instrumentation.kafkastreams.ContextScopeHolder.HOLDER; import static io.opentelemetry.javaagent.instrumentation.kafkastreams.KafkaStreamsSingletons.instrumenter;
import static io.opentelemetry.javaagent.instrumentation.kafkastreams.KafkaStreamsTracer.tracer; import static io.opentelemetry.javaagent.instrumentation.kafkastreams.StateHolder.HOLDER;
import static net.bytebuddy.matcher.ElementMatchers.isMethod; import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.isPackagePrivate; import static net.bytebuddy.matcher.ElementMatchers.isPackagePrivate;
import static net.bytebuddy.matcher.ElementMatchers.named; import static net.bytebuddy.matcher.ElementMatchers.named;
@ -15,6 +15,7 @@ import static net.bytebuddy.matcher.ElementMatchers.returns;
import io.opentelemetry.context.Context; import io.opentelemetry.context.Context;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import io.opentelemetry.javaagent.instrumentation.api.Java8BytecodeBridge;
import net.bytebuddy.asm.Advice; import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher; import net.bytebuddy.matcher.ElementMatcher;
@ -46,15 +47,18 @@ public class StreamTaskStartInstrumentation implements TypeInstrumentation {
return; return;
} }
ContextScopeHolder holder = HOLDER.get(); StateHolder holder = HOLDER.get();
if (holder == null) { if (holder == null) {
// somehow nextRecord() was called outside of process() // somehow nextRecord() was called outside of process()
return; return;
} }
Context context = tracer().startSpan(record); Context parentContext = Java8BytecodeBridge.currentContext();
if (!instrumenter().shouldStart(parentContext, record.value)) {
holder.set(context, context.makeCurrent()); return;
}
Context context = instrumenter().start(parentContext, record.value);
holder.set(record.value, context, context.makeCurrent());
} }
} }
} }

View File

@ -5,8 +5,8 @@
package io.opentelemetry.javaagent.instrumentation.kafkastreams; package io.opentelemetry.javaagent.instrumentation.kafkastreams;
import static io.opentelemetry.javaagent.instrumentation.kafkastreams.ContextScopeHolder.HOLDER; import static io.opentelemetry.javaagent.instrumentation.kafkastreams.KafkaStreamsSingletons.instrumenter;
import static io.opentelemetry.javaagent.instrumentation.kafkastreams.KafkaStreamsTracer.tracer; import static io.opentelemetry.javaagent.instrumentation.kafkastreams.StateHolder.HOLDER;
import static net.bytebuddy.matcher.ElementMatchers.isMethod; import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.isPublic; import static net.bytebuddy.matcher.ElementMatchers.isPublic;
import static net.bytebuddy.matcher.ElementMatchers.named; import static net.bytebuddy.matcher.ElementMatchers.named;
@ -36,25 +36,21 @@ public class StreamTaskStopInstrumentation implements TypeInstrumentation {
public static class StopSpanAdvice { public static class StopSpanAdvice {
@Advice.OnMethodEnter @Advice.OnMethodEnter
public static ContextScopeHolder onEnter() { public static StateHolder onEnter() {
ContextScopeHolder holder = new ContextScopeHolder(); StateHolder holder = new StateHolder();
HOLDER.set(holder); HOLDER.set(holder);
return holder; return holder;
} }
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void stopSpan( public static void stopSpan(
@Advice.Enter ContextScopeHolder holder, @Advice.Thrown Throwable throwable) { @Advice.Enter StateHolder holder, @Advice.Thrown Throwable throwable) {
HOLDER.remove(); HOLDER.remove();
Context context = holder.getContext(); Context context = holder.getContext();
if (context != null) { if (context != null) {
holder.closeScope(); holder.closeScope();
instrumenter().end(context, holder.getRecord(), null, throwable);
if (throwable != null) {
tracer().endExceptionally(context, throwable);
} else {
tracer().end(context);
}
} }
} }
} }

View File

@ -1,38 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.kafkastreams;
import io.opentelemetry.context.propagation.TextMapGetter;
import java.nio.charset.StandardCharsets;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
public class TextMapExtractAdapter implements TextMapGetter<Headers> {
public static final TextMapExtractAdapter GETTER = new TextMapExtractAdapter();
@Override
public Iterable<String> keys(Headers headers) {
return StreamSupport.stream(headers.spliterator(), false)
.map(Header::key)
.collect(Collectors.toList());
}
@Override
public String get(Headers headers, String key) {
Header header = headers.lastHeader(key);
if (header == null) {
return null;
}
byte[] value = header.value();
if (value == null) {
return null;
}
return new String(value, StandardCharsets.UTF_8);
}
}

View File

@ -172,8 +172,10 @@ class KafkaStreamsTest extends AgentInstrumentationSpecification {
"${SemanticAttributes.MESSAGING_DESTINATION.key}" STREAM_PENDING "${SemanticAttributes.MESSAGING_DESTINATION.key}" STREAM_PENDING
"${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic" "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic"
"${SemanticAttributes.MESSAGING_OPERATION.key}" "process" "${SemanticAttributes.MESSAGING_OPERATION.key}" "process"
"${SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES.key}" Long
"${SemanticAttributes.MESSAGING_KAFKA_PARTITION.key}" { it >= 0 } "${SemanticAttributes.MESSAGING_KAFKA_PARTITION.key}" { it >= 0 }
"kafka.offset" 0 "kafka.offset" 0
"kafka.record.queue_time_ms" { it >= 0 }
"asdf" "testing" "asdf" "testing"
} }
} }