Support kafka streams 3 (#4236)

* Support kafka streams 3

* make the thread-local suppression switch wider so it works on all kafka-streams versions (see the sketch below)

* Move classes used by multiple instrumentations into a bootstrap module to ensure that everybody uses the same copy of them

* spotless

* allow project as muzzle extra dependency

* add comment

* fix merge
Lauri Tulmin 2021-10-05 19:43:00 +03:00 committed by GitHub
parent 9bbd490288
commit 0cfc71c3c6
20 changed files with 152 additions and 121 deletions
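
The gist of the change, condensed into one self-contained Java sketch. These are simplified stand-ins for the classes in the diffs below, not the actual files: the kafka-clients wrapping factories consult a thread-local switch, and the kafka-streams instrumentation turns that switch off for the duration of its run loop so no duplicate CONSUMER spans are created.

// Simplified illustration of the suppression mechanism; class and method names here are
// stand-ins for KafkaClientsConsumerProcessTracing / TracingIterable.wrap() shown below.
import java.util.Iterator;
import java.util.List;

public class WrappingSuppressionSketch {

  // thread-local on/off switch, same idea as KafkaClientsConsumerProcessTracing
  static final ThreadLocal<Boolean> wrappingEnabled = ThreadLocal.withInitial(() -> true);

  // wrap factory, same idea as TracingIterable.wrap(): only wraps while the switch is on
  static <T> Iterable<T> wrap(Iterable<T> delegate) {
    if (wrappingEnabled.get()) {
      return new TracingIterable<>(delegate);
    }
    return delegate;
  }

  // trivial "tracing" wrapper standing in for the real TracingIterable
  static final class TracingIterable<T> implements Iterable<T> {
    private final Iterable<T> delegate;

    TracingIterable(Iterable<T> delegate) {
      this.delegate = delegate;
    }

    @Override
    public Iterator<T> iterator() {
      return delegate.iterator();
    }
  }

  public static void main(String[] args) {
    List<String> records = List.of("r1", "r2");

    // normal path (plain kafka-clients consumer): records get wrapped
    System.out.println(wrap(records).getClass().getSimpleName()); // TracingIterable

    // kafka-streams path: RunLoopAdvice disables wrapping around runLoop()
    wrappingEnabled.set(false);
    try {
      System.out.println(wrap(records).getClass().getSimpleName()); // the original list
    } finally {
      wrappingEnabled.set(true);
    }
  }
}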

View File

@@ -199,7 +199,7 @@ fun addMuzzleTask(muzzleDirective: MuzzleDirective, versionArtifact: Artifact?,
   config.dependencies.add(dep)
   for (additionalDependency in muzzleDirective.additionalDependencies.get()) {
-    val additional = if (additionalDependency.count { it == ':' } < 2) {
+    val additional = if (additionalDependency is String && additionalDependency.count { it == ':' } < 2) {
       // Dependency definition without version, use the artifact's version.
       "${additionalDependency}:${versionArtifact.version}"
     } else {

View File

@@ -18,7 +18,7 @@ abstract class MuzzleDirective {
   abstract val classifier: Property<String>
   abstract val versions: Property<String>
   abstract val skipVersions: SetProperty<String>
-  abstract val additionalDependencies: ListProperty<String>
+  abstract val additionalDependencies: ListProperty<Any>
   abstract val excludedDependencies: ListProperty<String>
   abstract val assertPass: Property<Boolean>
   abstract val assertInverse: Property<Boolean>
@@ -42,11 +42,11 @@ abstract class MuzzleDirective {
   /**
    * Adds extra dependencies to the current muzzle test.
    *
-   * @param compileString An extra dependency in the gradle canonical form:
-   *     '<group_id>:<artifact_id>:<version_id>'.
+   * @param dependencyNotation An extra dependency in the gradle canonical form:
+   *     '<group_id>:<artifact_id>:<version_id>' or a project dependency project(':some-project').
    */
-  fun extraDependency(compileString: String?) {
-    additionalDependencies.add(compileString!!)
+  fun extraDependency(dependencyNotation: Any) {
+    additionalDependencies.add(dependencyNotation)
   }

   /**

View File

@@ -0,0 +1,3 @@
+plugins {
+  id("otel.javaagent-bootstrap")
+}

View File

@@ -0,0 +1,28 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.javaagent.bootstrap.kafka;
+
+// Classes used by multiple instrumentations should be in a bootstrap module to ensure that all
+// instrumentations see the same class. Helper classes are injected into each class loader that
+// contains an instrumentation that uses them, so instrumentations in different class loaders will
+// have separate copies of helper classes.
+public final class KafkaClientsConsumerProcessTracing {
+  private static final ThreadLocal<Boolean> wrappingEnabled = ThreadLocal.withInitial(() -> true);
+
+  private KafkaClientsConsumerProcessTracing() {}
+
+  public static void enableWrapping() {
+    wrappingEnabled.set(true);
+  }
+
+  public static void disableWrapping() {
+    wrappingEnabled.set(false);
+  }
+
+  public static boolean wrappingEnabled() {
+    return wrappingEnabled.get() == true;
+  }
+}
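
Since the switch above is a ThreadLocal, disabling wrapping on a kafka-streams processing thread does not affect consumers polling on other threads. A small demonstration of that property, assuming the bootstrap class is available on the classpath (the demo class name is made up):

import io.opentelemetry.javaagent.bootstrap.kafka.KafkaClientsConsumerProcessTracing;

class WrappingIsPerThreadDemo {
  public static void main(String[] args) throws InterruptedException {
    Thread streamsThread =
        new Thread(
            () -> {
              // what RunLoopAdvice does on the stream thread
              KafkaClientsConsumerProcessTracing.disableWrapping();
              System.out.println(
                  "streams thread wrapping: "
                      + KafkaClientsConsumerProcessTracing.wrappingEnabled()); // false
            });
    streamsThread.start();
    streamsThread.join();

    // other threads (e.g. a plain KafkaConsumer poll loop) are unaffected
    System.out.println(
        "main thread wrapping: " + KafkaClientsConsumerProcessTracing.wrappingEnabled()); // true
  }
}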

View File

@@ -0,0 +1,19 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.javaagent.bootstrap.kafka;
+
+// Classes used by multiple instrumentations should be in a bootstrap module to ensure that all
+// instrumentations see the same class. Helper classes are injected into each class loader that
+// contains an instrumentation that uses them, so instrumentations in different class loaders will
+// have separate copies of helper classes.
+public interface KafkaClientsConsumerProcessWrapper<T> {
+
+  /**
+   * Returns the actual, non-tracing object wrapped by this wrapper. This method is only supposed to
+   * be used by other Kafka consumer instrumentations that want to suppress the kafka-clients one.
+   */
+  T unwrap();
+}
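
For reference, a sketch of how a downstream instrumentation can strip the kafka-clients wrapper through this interface, mirroring the spring-kafka span-links extractor changed at the end of this commit (the helper class and method names are illustrative):

import io.opentelemetry.javaagent.bootstrap.kafka.KafkaClientsConsumerProcessWrapper;
import java.util.Iterator;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

final class UnwrapSketch {
  private UnwrapSketch() {}

  // returns the non-tracing iterator so the caller's own CONSUMER handling is not suppressed
  @SuppressWarnings("unchecked")
  static Iterator<? extends ConsumerRecord<?, ?>> plainIterator(ConsumerRecords<?, ?> records) {
    Iterator<? extends ConsumerRecord<?, ?>> it = records.iterator();
    if (it instanceof KafkaClientsConsumerProcessWrapper) {
      it =
          ((KafkaClientsConsumerProcessWrapper<Iterator<? extends ConsumerRecord<?, ?>>>) it)
              .unwrap();
    }
    return it;
  }
}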

View File

@@ -7,6 +7,7 @@ muzzle {
     group.set("org.apache.kafka")
     module.set("kafka-clients")
     versions.set("[0.11.0.0,)")
+    extraDependency(project(":instrumentation:kafka-clients:kafka-clients-0.11:bootstrap"))
     assertInverse.set(true)
   }
 }
@@ -15,6 +16,7 @@ dependencies {
   compileOnly("com.google.auto.value:auto-value-annotations")
   annotationProcessor("com.google.auto.value:auto-value")

+  compileOnly(project(":instrumentation:kafka-clients:kafka-clients-0.11:bootstrap"))
   implementation(project(":instrumentation:kafka-clients:kafka-clients-common:library"))

   library("org.apache.kafka:kafka-clients:0.11.0.0")

View File

@@ -60,13 +60,13 @@ public class ConsumerRecordsInstrumentation implements TypeInstrumentation {
   public static class IterableAdvice {

     @Advice.OnMethodExit(suppress = Throwable.class)
-    public static void wrap(
+    public static <K, V> void wrap(
         @Advice.This ConsumerRecords<?, ?> records,
-        @Advice.Return(readOnly = false) Iterable<ConsumerRecord<?, ?>> iterable) {
+        @Advice.Return(readOnly = false) Iterable<ConsumerRecord<K, V>> iterable) {
       if (iterable != null) {
         SpanContext receiveSpanContext =
             VirtualField.find(ConsumerRecords.class, SpanContext.class).get(records);
-        iterable = new TracingIterable(iterable, receiveSpanContext);
+        iterable = TracingIterable.wrap(iterable, receiveSpanContext);
       }
     }
   }
@@ -75,13 +75,13 @@ public class ConsumerRecordsInstrumentation implements TypeInstrumentation {
   public static class ListAdvice {

     @Advice.OnMethodExit(suppress = Throwable.class)
-    public static void wrap(
+    public static <K, V> void wrap(
         @Advice.This ConsumerRecords<?, ?> records,
-        @Advice.Return(readOnly = false) List<ConsumerRecord<?, ?>> list) {
+        @Advice.Return(readOnly = false) List<ConsumerRecord<K, V>> list) {
       if (list != null) {
         SpanContext receiveSpanContext =
             VirtualField.find(ConsumerRecords.class, SpanContext.class).get(records);
-        list = new TracingList(list, receiveSpanContext);
+        list = TracingList.wrap(list, receiveSpanContext);
       }
     }
   }
@@ -90,13 +90,13 @@ public class ConsumerRecordsInstrumentation implements TypeInstrumentation {
   public static class IteratorAdvice {

     @Advice.OnMethodExit(suppress = Throwable.class)
-    public static void wrap(
+    public static <K, V> void wrap(
         @Advice.This ConsumerRecords<?, ?> records,
-        @Advice.Return(readOnly = false) Iterator<ConsumerRecord<?, ?>> iterator) {
+        @Advice.Return(readOnly = false) Iterator<ConsumerRecord<K, V>> iterator) {
       if (iterator != null) {
         SpanContext receiveSpanContext =
             VirtualField.find(ConsumerRecords.class, SpanContext.class).get(records);
-        iterator = new TracingIterator(iterator, receiveSpanContext);
+        iterator = TracingIterator.wrap(iterator, receiveSpanContext);
       }
     }
   }

View File

@@ -6,23 +6,33 @@
 package io.opentelemetry.javaagent.instrumentation.kafkaclients;

 import io.opentelemetry.api.trace.SpanContext;
-import io.opentelemetry.instrumentation.kafka.internal.KafkaConsumerIterableWrapper;
+import io.opentelemetry.javaagent.bootstrap.kafka.KafkaClientsConsumerProcessTracing;
+import io.opentelemetry.javaagent.bootstrap.kafka.KafkaClientsConsumerProcessWrapper;
 import java.util.Iterator;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.checkerframework.checker.nullness.qual.Nullable;

 public class TracingIterable<K, V>
-    implements Iterable<ConsumerRecord<K, V>>, KafkaConsumerIterableWrapper<K, V> {
+    implements Iterable<ConsumerRecord<K, V>>,
+        KafkaClientsConsumerProcessWrapper<Iterable<ConsumerRecord<K, V>>> {
   private final Iterable<ConsumerRecord<K, V>> delegate;
   @Nullable private final SpanContext receiveSpanContext;
   private boolean firstIterator = true;

-  public TracingIterable(
+  protected TracingIterable(
       Iterable<ConsumerRecord<K, V>> delegate, @Nullable SpanContext receiveSpanContext) {
     this.delegate = delegate;
     this.receiveSpanContext = receiveSpanContext;
   }

+  public static <K, V> Iterable<ConsumerRecord<K, V>> wrap(
+      Iterable<ConsumerRecord<K, V>> delegate, @Nullable SpanContext receiveSpanContext) {
+    if (KafkaClientsConsumerProcessTracing.wrappingEnabled()) {
+      return new TracingIterable<>(delegate, receiveSpanContext);
+    }
+    return delegate;
+  }
+
   @Override
   public Iterator<ConsumerRecord<K, V>> iterator() {
     Iterator<ConsumerRecord<K, V>> it;
@@ -30,7 +40,7 @@ public class TracingIterable<K, V>
     // However, this is not thread-safe, but usually the first (hopefully only) traversal of
     // ConsumerRecords is performed in the same thread that called poll()
     if (firstIterator) {
-      it = new TracingIterator<>(delegate.iterator(), receiveSpanContext);
+      it = TracingIterator.wrap(delegate.iterator(), receiveSpanContext);
       firstIterator = false;
     } else {
       it = delegate.iterator();

View File

@@ -11,14 +11,15 @@ import io.opentelemetry.api.trace.Span;
 import io.opentelemetry.api.trace.SpanContext;
 import io.opentelemetry.context.Context;
 import io.opentelemetry.context.Scope;
-import io.opentelemetry.instrumentation.kafka.internal.KafkaConsumerIteratorWrapper;
+import io.opentelemetry.javaagent.bootstrap.kafka.KafkaClientsConsumerProcessTracing;
+import io.opentelemetry.javaagent.bootstrap.kafka.KafkaClientsConsumerProcessWrapper;
 import java.util.Iterator;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.checkerframework.checker.nullness.qual.Nullable;

 public class TracingIterator<K, V>
-    implements Iterator<ConsumerRecord<K, V>>, KafkaConsumerIteratorWrapper<K, V> {
+    implements Iterator<ConsumerRecord<K, V>>,
+        KafkaClientsConsumerProcessWrapper<Iterator<ConsumerRecord<K, V>>> {

   private final Iterator<ConsumerRecord<K, V>> delegateIterator;
   private final Context parentContext;
@@ -30,7 +31,7 @@ public class TracingIterator<K, V>
   @Nullable private Context currentContext;
   @Nullable private Scope currentScope;

-  public TracingIterator(
+  private TracingIterator(
       Iterator<ConsumerRecord<K, V>> delegateIterator, @Nullable SpanContext receiveSpanContext) {
     this.delegateIterator = delegateIterator;
@@ -42,6 +43,14 @@ public class TracingIterator<K, V>
     this.parentContext = parentContext;
   }

+  public static <K, V> Iterator<ConsumerRecord<K, V>> wrap(
+      Iterator<ConsumerRecord<K, V>> delegateIterator, @Nullable SpanContext receiveSpanContext) {
+    if (KafkaClientsConsumerProcessTracing.wrappingEnabled()) {
+      return new TracingIterator<>(delegateIterator, receiveSpanContext);
+    }
+    return delegateIterator;
+  }
+
   @Override
   public boolean hasNext() {
     closeScopeAndEndSpan();

View File

@@ -6,6 +6,7 @@
 package io.opentelemetry.javaagent.instrumentation.kafkaclients;

 import io.opentelemetry.api.trace.SpanContext;
+import io.opentelemetry.javaagent.bootstrap.kafka.KafkaClientsConsumerProcessTracing;
 import java.util.Collection;
 import java.util.List;
 import java.util.ListIterator;
@@ -15,12 +16,20 @@ import org.checkerframework.checker.nullness.qual.Nullable;
 public class TracingList<K, V> extends TracingIterable<K, V> implements List<ConsumerRecord<K, V>> {

   private final List<ConsumerRecord<K, V>> delegate;

-  public TracingList(
+  private TracingList(
       List<ConsumerRecord<K, V>> delegate, @Nullable SpanContext receiveSpanContext) {
     super(delegate, receiveSpanContext);
     this.delegate = delegate;
   }

+  public static <K, V> List<ConsumerRecord<K, V>> wrap(
+      List<ConsumerRecord<K, V>> delegate, @Nullable SpanContext receiveSpanContext) {
+    if (KafkaClientsConsumerProcessTracing.wrappingEnabled()) {
+      return new TracingList<>(delegate, receiveSpanContext);
+    }
+    return delegate;
+  }
+
   @Override
   public int size() {
     return delegate.size();

View File

@ -1,17 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.kafka.internal;
import org.apache.kafka.clients.consumer.ConsumerRecord;
public interface KafkaConsumerIterableWrapper<K, V> {
/**
* Returns the actual, non-tracing iterable. This method is only supposed to be used by other
* Kafka consumer instrumentations that want to suppress the kafka-clients one.
*/
Iterable<ConsumerRecord<K, V>> unwrap();
}

View File

@@ -1,18 +0,0 @@
-/*
- * Copyright The OpenTelemetry Authors
- * SPDX-License-Identifier: Apache-2.0
- */
-
-package io.opentelemetry.instrumentation.kafka.internal;
-
-import java.util.Iterator;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-
-public interface KafkaConsumerIteratorWrapper<K, V> {
-
-  /**
-   * Returns the actual, non-tracing iterator. This method is only supposed to be used by other
-   * Kafka consumer instrumentations that want to suppress the kafka-clients one.
-   */
-  Iterator<ConsumerRecord<K, V>> unwrap();
-}

View File

@@ -7,11 +7,12 @@ muzzle {
   pass {
     group.set("org.apache.kafka")
     module.set("kafka-streams")
-    versions.set("[0.11.0.0,3)")
+    versions.set("[0.11.0.0,)")
   }
 }

 dependencies {
+  compileOnly(project(":instrumentation:kafka-clients:kafka-clients-0.11:bootstrap"))
   implementation(project(":instrumentation:kafka-clients:kafka-clients-common:library"))

   library("org.apache.kafka:kafka-streams:0.11.0.0")
@@ -20,8 +21,6 @@ dependencies {
   testInstrumentation(project(":instrumentation:kafka-clients:kafka-clients-0.11:javaagent"))

   testImplementation("org.testcontainers:kafka")
-
-  latestDepTestLibrary("org.apache.kafka:kafka-streams:2.+")
 }

 tasks {

View File

@@ -5,7 +5,6 @@
 package io.opentelemetry.javaagent.instrumentation.kafkastreams;

-import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.hasClassesNamed;
 import static net.bytebuddy.matcher.ElementMatchers.isInterface;
 import static net.bytebuddy.matcher.ElementMatchers.isMethod;
 import static net.bytebuddy.matcher.ElementMatchers.isPackagePrivate;
@@ -22,15 +21,11 @@ import net.bytebuddy.asm.Advice;
 import net.bytebuddy.description.type.TypeDescription;
 import net.bytebuddy.matcher.ElementMatcher;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.header.Header;

-// at some point in time SourceNodeRecordDeserializer was refactored into RecordDeserializer
+// in 1.0.0 SourceNodeRecordDeserializer was refactored into RecordDeserializer
 public class RecordDeserializerInstrumentation implements TypeInstrumentation {

-  @Override
-  public ElementMatcher<ClassLoader> classLoaderOptimization() {
-    return hasClassesNamed("org.apache.kafka.streams.processor.internals.RecordDeserializer");
-  }
-
   @Override
   public ElementMatcher<TypeDescription> typeMatcher() {
     return named("org.apache.kafka.streams.processor.internals.RecordDeserializer")
@@ -55,6 +50,16 @@ public class RecordDeserializerInstrumentation implements TypeInstrumentation {
     public static void onExit(
         @Advice.Argument(1) ConsumerRecord<?, ?> incoming,
         @Advice.Return(readOnly = false) ConsumerRecord<?, ?> result) {
+      if (result == null) {
+        return;
+      }
+
+      // on 1.x we need to copy headers from incoming to result
+      if (!result.headers().iterator().hasNext()) {
+        for (Header header : incoming.headers()) {
+          result.headers().add(header);
+        }
+      }

       // copy the receive CONSUMER span association
       VirtualField<ConsumerRecord, SpanContext> singleRecordReceiveSpan =

View File

@@ -19,7 +19,7 @@ import net.bytebuddy.asm.Advice;
 import net.bytebuddy.description.type.TypeDescription;
 import net.bytebuddy.matcher.ElementMatcher;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.header.Header;

 // This is necessary because SourceNodeRecordDeserializer drops the headers. :-(
 public class SourceNodeRecordDeserializerInstrumentation implements TypeInstrumentation {
@@ -47,20 +47,14 @@ public class SourceNodeRecordDeserializerInstrumentation implements TypeInstrumentation {
     public static void saveHeaders(
         @Advice.Argument(0) ConsumerRecord<?, ?> incoming,
         @Advice.Return(readOnly = false) ConsumerRecord<?, ?> result) {
-      result =
-          new ConsumerRecord<>(
-              result.topic(),
-              result.partition(),
-              result.offset(),
-              result.timestamp(),
-              TimestampType.CREATE_TIME,
-              result.checksum(),
-              result.serializedKeySize(),
-              result.serializedValueSize(),
-              result.key(),
-              result.value(),
-              incoming.headers());
+      if (result == null) {
+        return;
+      }
+
+      // copy headers from incoming to result
+      for (Header header : incoming.headers()) {
+        result.headers().add(header);
+      }

       // copy the receive CONSUMER span association
       VirtualField<ConsumerRecord, SpanContext> singleRecordReceiveSpan =

View File

@@ -9,16 +9,13 @@ import static io.opentelemetry.javaagent.instrumentation.kafkastreams.KafkaStrea
 import static io.opentelemetry.javaagent.instrumentation.kafkastreams.StateHolder.HOLDER;
 import static net.bytebuddy.matcher.ElementMatchers.isPublic;
 import static net.bytebuddy.matcher.ElementMatchers.named;
-import static net.bytebuddy.matcher.ElementMatchers.takesArgument;

 import io.opentelemetry.context.Context;
-import io.opentelemetry.instrumentation.kafka.internal.KafkaConsumerIterableWrapper;
 import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
 import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
 import net.bytebuddy.asm.Advice;
 import net.bytebuddy.description.type.TypeDescription;
 import net.bytebuddy.matcher.ElementMatcher;
-import org.apache.kafka.clients.consumer.ConsumerRecord;

 public class StreamTaskInstrumentation implements TypeInstrumentation {
@@ -32,9 +29,6 @@ public class StreamTaskInstrumentation implements TypeInstrumentation {
     transformer.applyAdviceToMethod(
         named("process").and(isPublic()),
         StreamTaskInstrumentation.class.getName() + "$ProcessAdvice");
-    transformer.applyAdviceToMethod(
-        named("addRecords").and(isPublic()).and(takesArgument(1, Iterable.class)),
-        StreamTaskInstrumentation.class.getName() + "$AddRecordsAdvice");
   }

   // the method decorated by this advice calls PartitionGroup.nextRecord(), which triggers
@@ -61,21 +55,4 @@ public class StreamTaskInstrumentation implements TypeInstrumentation {
       }
     }
   }
-
-  // this advice removes the CONSUMER spans created by the kafka-clients instrumentation
-  @SuppressWarnings("unused")
-  public static class AddRecordsAdvice {
-
-    @Advice.OnMethodEnter(suppress = Throwable.class)
-    public static void onEnter(
-        @Advice.Argument(value = 1, readOnly = false)
-            Iterable<? extends ConsumerRecord<?, ?>> records) {
-      // this will forcefully suppress the kafka-clients CONSUMER instrumentation even though
-      // there's no current CONSUMER span
-      if (records instanceof KafkaConsumerIterableWrapper) {
-        records = ((KafkaConsumerIterableWrapper<?, ?>) records).unwrap();
-      }
-    }
-  }
 }

View File

@@ -11,10 +11,9 @@ import static net.bytebuddy.matcher.ElementMatchers.returns;
 import io.opentelemetry.api.trace.SpanContext;
 import io.opentelemetry.instrumentation.api.field.VirtualField;
-import io.opentelemetry.instrumentation.kafka.internal.KafkaConsumerIteratorWrapper;
+import io.opentelemetry.javaagent.bootstrap.kafka.KafkaClientsConsumerProcessTracing;
 import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
 import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
-import java.util.Iterator;
 import net.bytebuddy.asm.Advice;
 import net.bytebuddy.description.type.TypeDescription;
 import net.bytebuddy.matcher.ElementMatcher;
@@ -37,6 +36,7 @@ public class StreamThreadInstrumentation implements TypeInstrumentation {
             .and(isPrivate())
             .and(returns(named("org.apache.kafka.clients.consumer.ConsumerRecords"))),
         this.getClass().getName() + "$PollRecordsAdvice");
+    transformer.applyAdviceToMethod(named("runLoop"), this.getClass().getName() + "$RunLoopAdvice");
   }

   @SuppressWarnings("unused")
@@ -53,20 +53,26 @@ public class StreamThreadInstrumentation implements TypeInstrumentation {
         return;
       }

-      VirtualField<ConsumerRecord, SpanContext> singleRecordReceiveSpan =
+      VirtualField<ConsumerRecord<?, ?>, SpanContext> singleRecordReceiveSpan =
           VirtualField.find(ConsumerRecord.class, SpanContext.class);

-      Iterator<? extends ConsumerRecord<?, ?>> it = records.iterator();
-      // this will forcefully suppress the kafka-clients CONSUMER instrumentation even though
-      // there's no current CONSUMER span
-      if (it instanceof KafkaConsumerIteratorWrapper) {
-        it = ((KafkaConsumerIteratorWrapper<?, ?>) it).unwrap();
-      }
-      while (it.hasNext()) {
-        ConsumerRecord<?, ?> record = it.next();
+      for (ConsumerRecord<?, ?> record : records) {
         singleRecordReceiveSpan.set(record, receiveSpanContext);
       }
     }
   }
+
+  // this advice suppresses the CONSUMER spans created by the kafka-clients instrumentation
+  @SuppressWarnings("unused")
+  public static class RunLoopAdvice {
+
+    @Advice.OnMethodEnter(suppress = Throwable.class)
+    public static void onEnter() {
+      KafkaClientsConsumerProcessTracing.disableWrapping();
+    }
+
+    @Advice.OnMethodExit(suppress = Throwable.class)
+    public static void onExit() {
+      KafkaClientsConsumerProcessTracing.enableWrapping();
+    }
+  }
 }

View File

@@ -15,6 +15,7 @@ dependencies {
   compileOnly("com.google.auto.value:auto-value-annotations")
   annotationProcessor("com.google.auto.value:auto-value")

+  compileOnly(project(":instrumentation:kafka-clients:kafka-clients-0.11:bootstrap"))
   implementation(project(":instrumentation:kafka-clients:kafka-clients-common:library"))

   library("org.springframework.kafka:spring-kafka:2.7.0")

View File

@@ -9,8 +9,8 @@ import io.opentelemetry.context.Context;
 import io.opentelemetry.context.propagation.ContextPropagators;
 import io.opentelemetry.instrumentation.api.instrumenter.SpanLinksBuilder;
 import io.opentelemetry.instrumentation.api.instrumenter.SpanLinksExtractor;
-import io.opentelemetry.instrumentation.kafka.internal.KafkaConsumerIteratorWrapper;
 import io.opentelemetry.instrumentation.kafka.internal.KafkaConsumerRecordGetter;
+import io.opentelemetry.javaagent.bootstrap.kafka.KafkaClientsConsumerProcessWrapper;
 import java.util.Iterator;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -26,6 +26,7 @@ public class KafkaBatchProcessSpanLinksExtractor
   }

   @Override
+  @SuppressWarnings("unchecked")
   public void extract(
       SpanLinksBuilder spanLinks, Context parentContext, ConsumerRecords<?, ?> records) {
@@ -33,8 +34,10 @@ public class KafkaBatchProcessSpanLinksExtractor
     // this will forcefully suppress the kafka-clients CONSUMER instrumentation even though there's
     // no current CONSUMER span
-    if (it instanceof KafkaConsumerIteratorWrapper) {
-      it = ((KafkaConsumerIteratorWrapper<?, ?>) it).unwrap();
+    if (it instanceof KafkaClientsConsumerProcessWrapper) {
+      it =
+          ((KafkaClientsConsumerProcessWrapper<Iterator<? extends ConsumerRecord<?, ?>>>) it)
+              .unwrap();
     }
     while (it.hasNext()) {

View File

@@ -217,6 +217,7 @@ include(":instrumentation:jsf:jsf-testing-common")
 include(":instrumentation:jsf:mojarra-1.2:javaagent")
 include(":instrumentation:jsf:myfaces-1.2:javaagent")
 include(":instrumentation:jsp-2.3:javaagent")
+include(":instrumentation:kafka-clients:kafka-clients-0.11:bootstrap")
 include(":instrumentation:kafka-clients:kafka-clients-0.11:javaagent")
 include(":instrumentation:kafka-clients:kafka-clients-0.11:testing")
 include(":instrumentation:kafka-clients:kafka-clients-2.6:library")