Spring-kafka single record instrumentation (#5904)

Mateusz Rzeszutek 2022-04-21 23:42:49 +02:00 committed by GitHub
parent ceeaa1b660
commit 28825724db
20 changed files with 519 additions and 241 deletions

View File

@ -14,15 +14,13 @@ public final class KafkaClientsConsumerProcessTracing {
private KafkaClientsConsumerProcessTracing() {}
public static void enableWrapping() {
wrappingEnabled.set(true);
}
public static void disableWrapping() {
wrappingEnabled.set(false);
public static boolean setEnabled(boolean enabled) {
boolean previous = wrappingEnabled.get();
wrappingEnabled.set(enabled);
return previous;
}
public static boolean wrappingEnabled() {
return wrappingEnabled.get() == true;
return wrappingEnabled.get();
}
}
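
The enableWrapping()/disableWrapping() pair above is replaced with a single setEnabled(boolean) that returns the previous value, so callers can restore whatever state they found instead of unconditionally re-enabling wrapping. A minimal usage sketch of that save-and-restore shape (the advice classes later in this commit follow the same pattern):

boolean previous = KafkaClientsConsumerProcessTracing.setEnabled(false);
try {
  // code that must not trigger the kafka-clients per-record wrapping
} finally {
  // restore the flag to whatever the caller had, rather than forcing it back on
  KafkaClientsConsumerProcessTracing.setEnabled(previous);
}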

View File

@ -1,19 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.bootstrap.kafka;
// Classes used by multiple instrumentations should be in a bootstrap module to ensure that all
// instrumentations see the same class. Helper classes are injected into each class loader that
// contains an instrumentation that uses them, so instrumentations in different class loaders will
// have separate copies of helper classes.
public interface KafkaClientsConsumerProcessWrapper<T> {
/**
* Returns the actual, non-tracing object wrapped by this wrapper. This method is only supposed to
* be used by other Kafka consumer instrumentations that want to suppress the kafka-clients one.
*/
T unwrap();
}

View File

@ -17,12 +17,14 @@ import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.field.VirtualField;
import io.opentelemetry.instrumentation.kafka.internal.ReceivedRecords;
import io.opentelemetry.instrumentation.kafka.internal.Timer;
import io.opentelemetry.javaagent.bootstrap.kafka.KafkaClientsConsumerProcessTracing;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import java.time.Duration;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
public class KafkaConsumerInstrumentation implements TypeInstrumentation {
@ -74,6 +76,18 @@ public class KafkaConsumerInstrumentation implements TypeInstrumentation {
VirtualField<ConsumerRecords<?, ?>, Context> consumerRecordsContext =
VirtualField.find(ConsumerRecords.class, Context.class);
consumerRecordsContext.set(records, context);
// disable process tracing and store the receive span for each individual record too
boolean previousValue = KafkaClientsConsumerProcessTracing.setEnabled(false);
try {
VirtualField<ConsumerRecord<?, ?>, Context> consumerRecordContext =
VirtualField.find(ConsumerRecord.class, Context.class);
for (ConsumerRecord<?, ?> record : records) {
consumerRecordContext.set(record, context);
}
} finally {
KafkaClientsConsumerProcessTracing.setEnabled(previousValue);
}
}
}
}
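
With the receive context now stored on every individual ConsumerRecord, other instrumentations can look it up through the same VirtualField and use it as the parent for their process spans. A minimal sketch of the read side, mirroring getParentContext() in the spring-kafka interceptors below:

Context receiveContext =
    VirtualField.find(ConsumerRecord.class, Context.class).get(record);
// fall back to the current context when no receive span was recorded for this record
Context parentContext = receiveContext != null ? receiveContext : Context.current();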

View File

@ -7,14 +7,11 @@ package io.opentelemetry.javaagent.instrumentation.kafkaclients;
import io.opentelemetry.context.Context;
import io.opentelemetry.javaagent.bootstrap.kafka.KafkaClientsConsumerProcessTracing;
import io.opentelemetry.javaagent.bootstrap.kafka.KafkaClientsConsumerProcessWrapper;
import java.util.Iterator;
import javax.annotation.Nullable;
import org.apache.kafka.clients.consumer.ConsumerRecord;
public class TracingIterable<K, V>
implements Iterable<ConsumerRecord<K, V>>,
KafkaClientsConsumerProcessWrapper<Iterable<ConsumerRecord<K, V>>> {
public class TracingIterable<K, V> implements Iterable<ConsumerRecord<K, V>> {
private final Iterable<ConsumerRecord<K, V>> delegate;
@Nullable private final Context receiveContext;
private boolean firstIterator = true;
@ -48,9 +45,4 @@ public class TracingIterable<K, V>
return it;
}
@Override
public Iterable<ConsumerRecord<K, V>> unwrap() {
return delegate;
}
}

View File

@ -10,14 +10,11 @@ import static io.opentelemetry.javaagent.instrumentation.kafkaclients.KafkaSingl
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.javaagent.bootstrap.kafka.KafkaClientsConsumerProcessTracing;
import io.opentelemetry.javaagent.bootstrap.kafka.KafkaClientsConsumerProcessWrapper;
import java.util.Iterator;
import javax.annotation.Nullable;
import org.apache.kafka.clients.consumer.ConsumerRecord;
public class TracingIterator<K, V>
implements Iterator<ConsumerRecord<K, V>>,
KafkaClientsConsumerProcessWrapper<Iterator<ConsumerRecord<K, V>>> {
public class TracingIterator<K, V> implements Iterator<ConsumerRecord<K, V>> {
private final Iterator<ConsumerRecord<K, V>> delegateIterator;
private final Context parentContext;
@ -79,9 +76,4 @@ public class TracingIterator<K, V>
public void remove() {
delegateIterator.remove();
}
@Override
public Iterator<ConsumerRecord<K, V>> unwrap() {
return delegateIterator;
}
}

View File

@ -9,6 +9,7 @@ import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.instrumentation.api.config.ExperimentalConfig;
import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.ErrorCauseExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder;
import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor;
@ -81,7 +82,18 @@ public final class KafkaInstrumenterFactory {
instrumentationName,
GlobalOpenTelemetry.get(),
MessageOperation.PROCESS,
Collections.emptyList());
Collections.emptyList(),
ErrorCauseExtractor.jdk());
}
public static Instrumenter<ConsumerRecord<?, ?>, Void> createConsumerProcessInstrumenter(
String instrumentationName, ErrorCauseExtractor errorCauseExtractor) {
return createConsumerOperationInstrumenter(
instrumentationName,
GlobalOpenTelemetry.get(),
MessageOperation.PROCESS,
Collections.emptyList(),
errorCauseExtractor);
}
public static Instrumenter<ConsumerRecord<?, ?>, Void> createConsumerOperationInstrumenter(
@ -89,6 +101,16 @@ public final class KafkaInstrumenterFactory {
OpenTelemetry openTelemetry,
MessageOperation operation,
Iterable<AttributesExtractor<ConsumerRecord<?, ?>, Void>> extractors) {
return createConsumerOperationInstrumenter(
instrumentationName, openTelemetry, operation, extractors, ErrorCauseExtractor.jdk());
}
private static Instrumenter<ConsumerRecord<?, ?>, Void> createConsumerOperationInstrumenter(
String instrumentationName,
OpenTelemetry openTelemetry,
MessageOperation operation,
Iterable<AttributesExtractor<ConsumerRecord<?, ?>, Void>> extractors,
ErrorCauseExtractor errorCauseExtractor) {
KafkaConsumerAttributesGetter getter = KafkaConsumerAttributesGetter.INSTANCE;
@ -99,7 +121,8 @@ public final class KafkaInstrumenterFactory {
MessagingSpanNameExtractor.create(getter, operation))
.addAttributesExtractor(MessagingAttributesExtractor.create(getter, operation))
.addAttributesExtractor(new KafkaConsumerAdditionalAttributesExtractor())
.addAttributesExtractors(extractors);
.addAttributesExtractors(extractors)
.setErrorCauseExtractor(errorCauseExtractor);
if (KafkaConsumerExperimentalAttributesExtractor.isEnabled()) {
builder.addAttributesExtractor(new KafkaConsumerExperimentalAttributesExtractor());
}
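
The net effect of the factory changes is that consumer process instrumenters can now carry a caller-supplied ErrorCauseExtractor while reusing the shared kafka attributes setup. The spring-kafka module later in this commit wires it up like so:

private static final Instrumenter<ConsumerRecord<?, ?>, Void> PROCESS_INSTRUMENTER =
    KafkaInstrumenterFactory.createConsumerProcessInstrumenter(
        "io.opentelemetry.spring-kafka-2.7", SpringKafkaErrorCauseExtractor.INSTANCE);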

View File

@ -5,23 +5,15 @@
package io.opentelemetry.javaagent.instrumentation.kafkastreams;
import static net.bytebuddy.matcher.ElementMatchers.isPrivate;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.returns;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.field.VirtualField;
import io.opentelemetry.javaagent.bootstrap.kafka.KafkaClientsConsumerProcessTracing;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
// This instrumentation copies the receive CONSUMER span context from the ConsumerRecords aggregate
// object to each individual record
public class StreamThreadInstrumentation implements TypeInstrumentation {
@Override
@ -31,47 +23,20 @@ public class StreamThreadInstrumentation implements TypeInstrumentation {
@Override
public void transform(TypeTransformer transformer) {
transformer.applyAdviceToMethod(
named("pollRequests")
.and(isPrivate())
.and(returns(named("org.apache.kafka.clients.consumer.ConsumerRecords"))),
this.getClass().getName() + "$PollRecordsAdvice");
transformer.applyAdviceToMethod(named("runLoop"), this.getClass().getName() + "$RunLoopAdvice");
}
@SuppressWarnings("unused")
public static class PollRecordsAdvice {
@Advice.OnMethodExit(suppress = Throwable.class)
public static void onExit(@Advice.Return ConsumerRecords<?, ?> records) {
if (records.isEmpty()) {
return;
}
Context receiveContext = VirtualField.find(ConsumerRecords.class, Context.class).get(records);
if (receiveContext == null) {
return;
}
VirtualField<ConsumerRecord<?, ?>, Context> singleRecordReceiveContext =
VirtualField.find(ConsumerRecord.class, Context.class);
for (ConsumerRecord<?, ?> record : records) {
singleRecordReceiveContext.set(record, receiveContext);
}
}
}
// this advice suppresses the CONSUMER spans created by the kafka-clients instrumentation
@SuppressWarnings("unused")
public static class RunLoopAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter() {
KafkaClientsConsumerProcessTracing.disableWrapping();
public static boolean onEnter() {
return KafkaClientsConsumerProcessTracing.setEnabled(false);
}
@Advice.OnMethodExit(suppress = Throwable.class)
public static void onExit() {
KafkaClientsConsumerProcessTracing.enableWrapping();
public static void onExit(@Advice.Enter boolean previousValue) {
KafkaClientsConsumerProcessTracing.setEnabled(previousValue);
}
}
}
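
Carrying the previous flag from onEnter to onExit via @Advice.Enter makes the suppression nesting-safe: the old disableWrapping()/enableWrapping() pair would re-enable wrapping on exit even when an outer caller had it disabled. A small illustrative sequence (hypothetical nesting, not actual kafka-streams code):

boolean outer = KafkaClientsConsumerProcessTracing.setEnabled(false); // outer advice; previous == true
boolean inner = KafkaClientsConsumerProcessTracing.setEnabled(false); // nested advice; previous == false
KafkaClientsConsumerProcessTracing.setEnabled(inner); // nested exit: wrapping stays disabled
KafkaClientsConsumerProcessTracing.setEnabled(outer); // outer exit: restored to enabled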

View File

@ -17,10 +17,13 @@ import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.springframework.kafka.listener.BatchInterceptor;
import org.springframework.kafka.listener.RecordInterceptor;
public class AbstractMessageListenerContainerInstrumentation implements TypeInstrumentation {
@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return named("org.springframework.kafka.listener.AbstractMessageListenerContainer");
@ -36,20 +39,48 @@ public class AbstractMessageListenerContainerInstrumentation implements TypeInst
.and(takesArguments(0))
.and(returns(named("org.springframework.kafka.listener.BatchInterceptor"))),
this.getClass().getName() + "$GetBatchInterceptorAdvice");
// getRecordInterceptor() is called internally by AbstractMessageListenerContainer
// implementations
transformer.applyAdviceToMethod(
named("getRecordInterceptor")
.and(isProtected())
.and(takesArguments(0))
.and(returns(named("org.springframework.kafka.listener.RecordInterceptor"))),
this.getClass().getName() + "$GetRecordInterceptorAdvice");
}
@SuppressWarnings("unused")
public static class GetBatchInterceptorAdvice {
@Advice.OnMethodExit(suppress = Throwable.class)
public static <K, V> void onExit(
@Advice.Return(readOnly = false) BatchInterceptor<K, V> interceptor) {
if (!(interceptor instanceof InstrumentedBatchInterceptor)) {
VirtualField<ConsumerRecords<K, V>, Context> receiveContextVirtualField =
VirtualField<ConsumerRecords<K, V>, Context> receiveContextField =
VirtualField.find(ConsumerRecords.class, Context.class);
VirtualField<ConsumerRecords<K, V>, State<K, V>> stateStore =
VirtualField<ConsumerRecords<K, V>, State<ConsumerRecords<K, V>>> stateField =
VirtualField.find(ConsumerRecords.class, State.class);
interceptor =
new InstrumentedBatchInterceptor<>(receiveContextVirtualField, stateStore, interceptor);
new InstrumentedBatchInterceptor<>(receiveContextField, stateField, interceptor);
}
}
}
@SuppressWarnings("unused")
public static class GetRecordInterceptorAdvice {
@Advice.OnMethodExit(suppress = Throwable.class)
public static <K, V> void onExit(
@Advice.Return(readOnly = false) RecordInterceptor<K, V> interceptor) {
if (!(interceptor instanceof InstrumentedRecordInterceptor)) {
VirtualField<ConsumerRecord<K, V>, Context> receiveContextField =
VirtualField.find(ConsumerRecord.class, Context.class);
VirtualField<ConsumerRecord<K, V>, State<ConsumerRecord<K, V>>> stateField =
VirtualField.find(ConsumerRecord.class, State.class);
interceptor =
new InstrumentedRecordInterceptor<>(receiveContextField, stateField, interceptor);
}
}
}

View File

@ -5,7 +5,7 @@
package io.opentelemetry.javaagent.instrumentation.spring.kafka;
import static io.opentelemetry.javaagent.instrumentation.spring.kafka.SpringKafkaSingletons.processInstrumenter;
import static io.opentelemetry.javaagent.instrumentation.spring.kafka.SpringKafkaSingletons.batchProcessInstrumenter;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
@ -16,16 +16,17 @@ import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.springframework.kafka.listener.BatchInterceptor;
public final class InstrumentedBatchInterceptor<K, V> implements BatchInterceptor<K, V> {
private final VirtualField<ConsumerRecords<K, V>, Context> receiveContextVirtualField;
private final VirtualField<ConsumerRecords<K, V>, State<K, V>> stateStore;
private final VirtualField<ConsumerRecords<K, V>, Context> receiveContextField;
private final VirtualField<ConsumerRecords<K, V>, State<ConsumerRecords<K, V>>> stateField;
@Nullable private final BatchInterceptor<K, V> decorated;
public InstrumentedBatchInterceptor(
VirtualField<ConsumerRecords<K, V>, Context> receiveContextVirtualField,
VirtualField<ConsumerRecords<K, V>, State<K, V>> stateStore,
VirtualField<ConsumerRecords<K, V>, Context> receiveContextField,
VirtualField<ConsumerRecords<K, V>, State<ConsumerRecords<K, V>>> stateField,
@Nullable BatchInterceptor<K, V> decorated) {
this.receiveContextVirtualField = receiveContextVirtualField;
this.stateStore = stateStore;
this.receiveContextField = receiveContextField;
this.stateField = stateField;
this.decorated = decorated;
}
@ -33,17 +34,17 @@ public final class InstrumentedBatchInterceptor<K, V> implements BatchIntercepto
public ConsumerRecords<K, V> intercept(ConsumerRecords<K, V> records, Consumer<K, V> consumer) {
Context parentContext = getParentContext(records);
if (processInstrumenter().shouldStart(parentContext, records)) {
Context context = processInstrumenter().start(parentContext, records);
if (batchProcessInstrumenter().shouldStart(parentContext, records)) {
Context context = batchProcessInstrumenter().start(parentContext, records);
Scope scope = context.makeCurrent();
stateStore.set(records, State.create(records, context, scope));
stateField.set(records, State.create(records, context, scope));
}
return decorated == null ? records : decorated.intercept(records, consumer);
}
private Context getParentContext(ConsumerRecords<K, V> records) {
Context receiveContext = receiveContextVirtualField.get(records);
Context receiveContext = receiveContextField.get(records);
// use the receive CONSUMER span as parent if it's available
return receiveContext != null ? receiveContext : Context.current();
@ -66,11 +67,11 @@ public final class InstrumentedBatchInterceptor<K, V> implements BatchIntercepto
}
private void end(ConsumerRecords<K, V> records, @Nullable Throwable error) {
State<K, V> state = stateStore.get(records);
stateStore.set(records, null);
State<ConsumerRecords<K, V>> state = stateField.get(records);
stateField.set(records, null);
if (state != null) {
state.scope().close();
processInstrumenter().end(state.context(), state.request(), null, error);
batchProcessInstrumenter().end(state.context(), state.request(), null, error);
}
}
}
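
The State virtual field is what carries the span between interceptor callbacks: intercept() opens the span and scope and stashes them on the ConsumerRecords instance, and success()/failure() funnel into end(), which detaches the state before closing so the span cannot be ended twice. A condensed sketch of that handoff, assuming the same records object reaches both callbacks:

// in intercept(): remember the started span and scope on the batch
stateField.set(records, State.create(records, context, scope));
// in end(): take the state off the batch first, then close in reverse order
State<ConsumerRecords<K, V>> state = stateField.get(records);
stateField.set(records, null);
if (state != null) {
  state.scope().close();
  batchProcessInstrumenter().end(state.context(), state.request(), null, error);
}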

View File

@ -0,0 +1,87 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.spring.kafka;
import static io.opentelemetry.javaagent.instrumentation.spring.kafka.SpringKafkaSingletons.processInstrumenter;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.instrumentation.api.field.VirtualField;
import javax.annotation.Nullable;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.listener.RecordInterceptor;
public final class InstrumentedRecordInterceptor<K, V> implements RecordInterceptor<K, V> {
private final VirtualField<ConsumerRecord<K, V>, Context> receiveContextField;
private final VirtualField<ConsumerRecord<K, V>, State<ConsumerRecord<K, V>>> stateField;
@Nullable private final RecordInterceptor<K, V> decorated;
public InstrumentedRecordInterceptor(
VirtualField<ConsumerRecord<K, V>, Context> receiveContextField,
VirtualField<ConsumerRecord<K, V>, State<ConsumerRecord<K, V>>> stateField,
@Nullable RecordInterceptor<K, V> decorated) {
this.receiveContextField = receiveContextField;
this.stateField = stateField;
this.decorated = decorated;
}
@SuppressWarnings("deprecation") // implementing deprecated method for better compatibility
@Override
public ConsumerRecord<K, V> intercept(ConsumerRecord<K, V> record) {
start(record);
return decorated == null ? record : decorated.intercept(record);
}
@Override
public ConsumerRecord<K, V> intercept(ConsumerRecord<K, V> record, Consumer<K, V> consumer) {
start(record);
return decorated == null ? record : decorated.intercept(record, consumer);
}
private void start(ConsumerRecord<K, V> record) {
Context parentContext = getParentContext(record);
if (processInstrumenter().shouldStart(parentContext, record)) {
Context context = processInstrumenter().start(parentContext, record);
Scope scope = context.makeCurrent();
stateField.set(record, State.create(record, context, scope));
}
}
private Context getParentContext(ConsumerRecord<K, V> records) {
Context receiveContext = receiveContextField.get(records);
// use the receive CONSUMER span as parent if it's available
return receiveContext != null ? receiveContext : Context.current();
}
@Override
public void success(ConsumerRecord<K, V> record, Consumer<K, V> consumer) {
end(record, null);
if (decorated != null) {
decorated.success(record, consumer);
}
}
@Override
public void failure(ConsumerRecord<K, V> record, Exception exception, Consumer<K, V> consumer) {
end(record, exception);
if (decorated != null) {
decorated.failure(record, exception, consumer);
}
}
private void end(ConsumerRecord<K, V> record, @Nullable Throwable error) {
State<ConsumerRecord<K, V>> state = stateField.get(record);
stateField.set(record, null);
if (state != null) {
state.scope().close();
processInstrumenter().end(state.context(), state.request(), null, error);
}
}
}
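
The start/end pairing above leans on spring-kafka's RecordInterceptor contract: the listener container invokes intercept() before the listener and then one of success() or failure() afterwards. A rough sketch of how a container drives the interceptor (simplified pseudocode, not the actual spring-kafka implementation):

ConsumerRecord<K, V> intercepted = interceptor.intercept(record, consumer);
try {
  // the user's @KafkaListener method runs while the process span is current
  listener.onMessage(intercepted);
  interceptor.success(intercepted, consumer);
} catch (Exception e) {
  interceptor.failure(intercepted, e, consumer);
  throw e;
}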

View File

@ -10,39 +10,25 @@ import io.opentelemetry.context.propagation.ContextPropagators;
import io.opentelemetry.instrumentation.api.instrumenter.SpanLinksBuilder;
import io.opentelemetry.instrumentation.api.instrumenter.SpanLinksExtractor;
import io.opentelemetry.instrumentation.kafka.internal.KafkaConsumerRecordGetter;
import io.opentelemetry.javaagent.bootstrap.kafka.KafkaClientsConsumerProcessWrapper;
import java.util.Iterator;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
public class KafkaBatchProcessSpanLinksExtractor
final class KafkaBatchProcessSpanLinksExtractor
implements SpanLinksExtractor<ConsumerRecords<?, ?>> {
private final SpanLinksExtractor<ConsumerRecord<?, ?>> singleRecordLinkExtractor;
public KafkaBatchProcessSpanLinksExtractor(ContextPropagators contextPropagators) {
KafkaBatchProcessSpanLinksExtractor(ContextPropagators contextPropagators) {
this.singleRecordLinkExtractor =
SpanLinksExtractor.fromUpstreamRequest(
contextPropagators, KafkaConsumerRecordGetter.INSTANCE);
}
@Override
@SuppressWarnings("unchecked")
public void extract(
SpanLinksBuilder spanLinks, Context parentContext, ConsumerRecords<?, ?> records) {
Iterator<? extends ConsumerRecord<?, ?>> it = records.iterator();
// this will forcefully suppress the kafka-clients CONSUMER instrumentation even though there's
// no current CONSUMER span
if (it instanceof KafkaClientsConsumerProcessWrapper) {
it =
((KafkaClientsConsumerProcessWrapper<Iterator<? extends ConsumerRecord<?, ?>>>) it)
.unwrap();
}
while (it.hasNext()) {
ConsumerRecord<?, ?> record = it.next();
for (ConsumerRecord<?, ?> record : records) {
// explicitly passing root to avoid situation where context propagation is turned off and the
// parent (CONSUMER receive) span is linked
singleRecordLinkExtractor.extract(spanLinks, Context.root(), record);

View File

@ -8,14 +8,14 @@ package io.opentelemetry.javaagent.instrumentation.spring.kafka;
import io.opentelemetry.instrumentation.api.instrumenter.ErrorCauseExtractor;
import org.springframework.kafka.listener.ListenerExecutionFailedException;
public final class KafkaBatchErrorCauseExtractor implements ErrorCauseExtractor {
private final ErrorCauseExtractor delegate = ErrorCauseExtractor.jdk();
enum SpringKafkaErrorCauseExtractor implements ErrorCauseExtractor {
INSTANCE;
@Override
public Throwable extractCause(Throwable error) {
if (error instanceof ListenerExecutionFailedException && error.getCause() != null) {
error = error.getCause();
}
return delegate.extractCause(error);
return ErrorCauseExtractor.jdk().extractCause(error);
}
}
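
The extractor peels one ListenerExecutionFailedException wrapper off before delegating to the default JDK unwrapping, so the recorded error points at the listener's actual exception. A small worked example, assuming a listener that throws IllegalArgumentException:

Throwable wrapped = new ListenerExecutionFailedException(
    "Listener method threw exception", new IllegalArgumentException("boom"));
Throwable cause = SpringKafkaErrorCauseExtractor.INSTANCE.extractCause(wrapped);
// cause is the IllegalArgumentException("boom"), matching the errorEvent assertions in the tests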

View File

@ -11,6 +11,7 @@ import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingSpanNameExtractor;
import io.opentelemetry.instrumentation.kafka.internal.KafkaInstrumenterFactory;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
@ -18,10 +19,13 @@ public final class SpringKafkaSingletons {
private static final String INSTRUMENTATION_NAME = "io.opentelemetry.spring-kafka-2.7";
private static final Instrumenter<ConsumerRecords<?, ?>, Void> PROCESS_INSTRUMENTER =
buildProcessInstrumenter();
private static final Instrumenter<ConsumerRecords<?, ?>, Void> BATCH_PROCESS_INSTRUMENTER =
buildBatchProcessInstrumenter();
private static final Instrumenter<ConsumerRecord<?, ?>, Void> PROCESS_INSTRUMENTER =
KafkaInstrumenterFactory.createConsumerProcessInstrumenter(
INSTRUMENTATION_NAME, SpringKafkaErrorCauseExtractor.INSTANCE);
private static Instrumenter<ConsumerRecords<?, ?>, Void> buildProcessInstrumenter() {
private static Instrumenter<ConsumerRecords<?, ?>, Void> buildBatchProcessInstrumenter() {
KafkaBatchProcessAttributesGetter getter = KafkaBatchProcessAttributesGetter.INSTANCE;
MessageOperation operation = MessageOperation.PROCESS;
@ -32,15 +36,15 @@ public final class SpringKafkaSingletons {
.addAttributesExtractor(MessagingAttributesExtractor.create(getter, operation))
.addSpanLinksExtractor(
new KafkaBatchProcessSpanLinksExtractor(GlobalOpenTelemetry.getPropagators()))
.setErrorCauseExtractor(new KafkaBatchErrorCauseExtractor())
.setErrorCauseExtractor(SpringKafkaErrorCauseExtractor.INSTANCE)
.newInstrumenter(SpanKindExtractor.alwaysConsumer());
}
public static Instrumenter<ConsumerRecord<?, ?>, Void> receiveInstrumenter() {
return null;
public static Instrumenter<ConsumerRecords<?, ?>, Void> batchProcessInstrumenter() {
return BATCH_PROCESS_INSTRUMENTER;
}
public static Instrumenter<ConsumerRecords<?, ?>, Void> processInstrumenter() {
public static Instrumenter<ConsumerRecord<?, ?>, Void> processInstrumenter() {
return PROCESS_INSTRUMENTER;
}

View File

@ -8,17 +8,15 @@ package io.opentelemetry.javaagent.instrumentation.spring.kafka;
import com.google.auto.value.AutoValue;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import org.apache.kafka.clients.consumer.ConsumerRecords;
@AutoValue
public abstract class State<K, V> {
public abstract class State<REQUEST> {
public static <K, V> State<K, V> create(
ConsumerRecords<K, V> request, Context context, Scope scope) {
public static <REQUEST> State<REQUEST> create(REQUEST request, Context context, Scope scope) {
return new AutoValue_State<>(request, context, scope);
}
public abstract ConsumerRecords<K, V> request();
public abstract REQUEST request();
public abstract Context context();
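
Making State generic over the whole request type, rather than over the ConsumerRecords key/value types, lets the batch and single-record interceptors share one class. A usage sketch under that assumption:

State<ConsumerRecords<String, String>> batchState = State.create(records, context, scope);
State<ConsumerRecord<String, String>> recordState = State.create(record, context, scope);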

View File

@ -31,13 +31,13 @@ public class SuppressingKafkaClientsInstrumentation implements TypeInstrumentati
@SuppressWarnings("unused")
public static class RunLoopAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter() {
KafkaClientsConsumerProcessTracing.disableWrapping();
public static boolean onEnter() {
return KafkaClientsConsumerProcessTracing.setEnabled(false);
}
@Advice.OnMethodExit(suppress = Throwable.class)
public static void onExit() {
KafkaClientsConsumerProcessTracing.enableWrapping();
public static void onExit(@Advice.Enter boolean previousValue) {
KafkaClientsConsumerProcessTracing.setEnabled(previousValue);
}
}
}

View File

@ -0,0 +1,45 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
import io.opentelemetry.instrumentation.testing.GlobalTraceUtil
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.springframework.kafka.annotation.KafkaListener
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
class BatchRecordListener {
static AtomicInteger lastBatchSize = new AtomicInteger()
static CountDownLatch messageReceived = new CountDownLatch(2)
@KafkaListener(id = "testBatchListener", topics = "testBatchTopic", containerFactory = "batchFactory")
void listener(List<ConsumerRecord<String, String>> records) {
lastBatchSize.set(records.size())
records.size().times {
messageReceived.countDown()
}
GlobalTraceUtil.runWithSpan("consumer") {}
records.forEach({ record ->
if (record.value() == "error") {
throw new IllegalArgumentException("boom")
}
})
}
static void reset() {
messageReceived = new CountDownLatch(2)
lastBatchSize.set(0)
}
static void waitForMessages() {
messageReceived.await(30, TimeUnit.SECONDS)
}
static int getLastBatchSize() {
return lastBatchSize.get()
}
}

View File

@ -0,0 +1,67 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
import org.apache.kafka.clients.admin.NewTopic
import org.springframework.boot.SpringBootConfiguration
import org.springframework.boot.autoconfigure.EnableAutoConfiguration
import org.springframework.context.annotation.Bean
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
import org.springframework.kafka.config.TopicBuilder
import org.springframework.kafka.core.ConsumerFactory
@SpringBootConfiguration
@EnableAutoConfiguration
class ConsumerConfig {
@Bean
NewTopic batchTopic() {
return TopicBuilder.name("testBatchTopic")
.partitions(1)
.replicas(1)
.build()
}
@Bean
NewTopic singleTopic() {
return TopicBuilder.name("testSingleTopic")
.partitions(1)
.replicas(1)
.build()
}
@Bean
BatchRecordListener batchRecordListener() {
return new BatchRecordListener()
}
@Bean
SingleRecordListener singleRecordListener() {
return new SingleRecordListener()
}
@Bean
ConcurrentKafkaListenerContainerFactory<String, String> batchFactory(
ConsumerFactory<String, String> consumerFactory) {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>()
// do not retry failed records
factory.setBatchErrorHandler(new DoNothingBatchErrorHandler())
factory.setConsumerFactory(consumerFactory)
factory.setBatchListener(true)
factory.setAutoStartup(true)
factory
}
@Bean
ConcurrentKafkaListenerContainerFactory<String, String> singleFactory(
ConsumerFactory<String, String> consumerFactory) {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>()
// do not retry failed records
factory.setErrorHandler(new DoNothingErrorHandler())
factory.setConsumerFactory(consumerFactory)
factory.setBatchListener(false)
factory.setAutoStartup(true)
factory
}
}

View File

@ -0,0 +1,13 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.springframework.kafka.listener.ErrorHandler
class DoNothingErrorHandler implements ErrorHandler {
@Override
void handle(Exception thrownException, ConsumerRecord<?, ?> data) {
}
}

View File

@ -0,0 +1,18 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
import io.opentelemetry.instrumentation.testing.GlobalTraceUtil
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.springframework.kafka.annotation.KafkaListener
class SingleRecordListener {
@KafkaListener(id = "testSingleListener", topics = "testSingleTopic", containerFactory = "singleFactory")
void listener(ConsumerRecord<String, String> record) {
GlobalTraceUtil.runWithSpan("consumer") {}
if (record.value() == "error") {
throw new IllegalArgumentException("boom")
}
}
}

View File

@ -4,29 +4,17 @@
*/
import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification
import io.opentelemetry.instrumentation.testing.GlobalTraceUtil
import io.opentelemetry.sdk.trace.data.SpanData
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes
import java.time.Duration
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
import org.apache.kafka.clients.admin.NewTopic
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.springframework.boot.SpringApplication
import org.springframework.boot.SpringBootConfiguration
import org.springframework.boot.autoconfigure.EnableAutoConfiguration
import org.springframework.context.ConfigurableApplicationContext
import org.springframework.context.annotation.Bean
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
import org.springframework.kafka.config.TopicBuilder
import org.springframework.kafka.core.ConsumerFactory
import org.springframework.kafka.core.KafkaTemplate
import org.testcontainers.containers.KafkaContainer
import org.testcontainers.containers.wait.strategy.Wait
import spock.lang.Shared
import java.time.Duration
import static io.opentelemetry.api.trace.SpanKind.CONSUMER
import static io.opentelemetry.api.trace.SpanKind.PRODUCER
import static io.opentelemetry.api.trace.StatusCode.ERROR
@ -73,17 +61,17 @@ class SpringKafkaInstrumentationTest extends AgentInstrumentationSpecification {
// a batch.
def maxAttempts = 5
for (i in 1..maxAttempts) {
Listener.reset()
BatchRecordListener.reset()
runWithSpan("producer") {
kafkaTemplate.executeInTransaction({ ops ->
ops.send("testTopic", "10", "testSpan1")
ops.send("testTopic", "20", "testSpan2")
ops.send("testBatchTopic", "10", "testSpan1")
ops.send("testBatchTopic", "20", "testSpan2")
})
}
Listener.waitForMessages()
if (Listener.getLastBatchSize() == 2) {
BatchRecordListener.waitForMessages()
if (BatchRecordListener.getLastBatchSize() == 2) {
break
} else if (i < maxAttempts) {
ignoreTracesAndClear(2)
@ -93,7 +81,7 @@ class SpringKafkaInstrumentationTest extends AgentInstrumentationSpecification {
then:
assertTraces(2) {
traces.sort(orderByRootSpanName("producer", "testTopic receive", "testTopic process"))
traces.sort(orderByRootSpanName("producer", "testBatchTopic receive", "testBatchTopic process"))
SpanData producer1, producer2
@ -102,22 +90,22 @@ class SpringKafkaInstrumentationTest extends AgentInstrumentationSpecification {
name "producer"
}
span(1) {
name "testTopic send"
name "testBatchTopic send"
kind PRODUCER
childOf span(0)
attributes {
"$SemanticAttributes.MESSAGING_SYSTEM" "kafka"
"$SemanticAttributes.MESSAGING_DESTINATION" "testTopic"
"$SemanticAttributes.MESSAGING_DESTINATION" "testBatchTopic"
"$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic"
}
}
span(2) {
name "testTopic send"
name "testBatchTopic send"
kind PRODUCER
childOf span(0)
attributes {
"$SemanticAttributes.MESSAGING_SYSTEM" "kafka"
"$SemanticAttributes.MESSAGING_DESTINATION" "testTopic"
"$SemanticAttributes.MESSAGING_DESTINATION" "testBatchTopic"
"$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic"
}
}
@ -127,25 +115,25 @@ class SpringKafkaInstrumentationTest extends AgentInstrumentationSpecification {
}
trace(1, 3) {
span(0) {
name "testTopic receive"
name "testBatchTopic receive"
kind CONSUMER
hasNoParent()
attributes {
"$SemanticAttributes.MESSAGING_SYSTEM" "kafka"
"$SemanticAttributes.MESSAGING_DESTINATION" "testTopic"
"$SemanticAttributes.MESSAGING_DESTINATION" "testBatchTopic"
"$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic"
"$SemanticAttributes.MESSAGING_OPERATION" "receive"
}
}
span(1) {
name "testTopic process"
name "testBatchTopic process"
kind CONSUMER
childOf span(0)
hasLink producer1
hasLink producer2
attributes {
"$SemanticAttributes.MESSAGING_SYSTEM" "kafka"
"$SemanticAttributes.MESSAGING_DESTINATION" "testTopic"
"$SemanticAttributes.MESSAGING_DESTINATION" "testBatchTopic"
"$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic"
"$SemanticAttributes.MESSAGING_OPERATION" "process"
}
@ -158,6 +146,144 @@ class SpringKafkaInstrumentationTest extends AgentInstrumentationSpecification {
}
}
def "should handle failure in Kafka batch listener"() {
given:
def kafkaTemplate = applicationContext.getBean("kafkaTemplate", KafkaTemplate)
when:
runWithSpan("producer") {
kafkaTemplate.executeInTransaction({ ops ->
ops.send("testBatchTopic", "10", "error")
})
}
then:
assertTraces(2) {
traces.sort(orderByRootSpanName("producer", "testBatchTopic receive"))
SpanData producer
trace(0, 2) {
span(0) {
name "producer"
}
span(1) {
name "testBatchTopic send"
kind PRODUCER
childOf span(0)
attributes {
"$SemanticAttributes.MESSAGING_SYSTEM" "kafka"
"$SemanticAttributes.MESSAGING_DESTINATION" "testBatchTopic"
"$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic"
}
}
producer = span(1)
}
trace(1, 3) {
span(0) {
name "testBatchTopic receive"
kind CONSUMER
hasNoParent()
attributes {
"$SemanticAttributes.MESSAGING_SYSTEM" "kafka"
"$SemanticAttributes.MESSAGING_DESTINATION" "testBatchTopic"
"$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic"
"$SemanticAttributes.MESSAGING_OPERATION" "receive"
}
}
span(1) {
name "testBatchTopic process"
kind CONSUMER
childOf span(0)
hasLink producer
status ERROR
errorEvent IllegalArgumentException, "boom"
attributes {
"$SemanticAttributes.MESSAGING_SYSTEM" "kafka"
"$SemanticAttributes.MESSAGING_DESTINATION" "testBatchTopic"
"$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic"
"$SemanticAttributes.MESSAGING_OPERATION" "process"
}
}
span(2) {
name "consumer"
childOf span(1)
}
}
}
}
def "should create spans for single record receive+process"() {
given:
def kafkaTemplate = applicationContext.getBean("kafkaTemplate", KafkaTemplate)
when:
runWithSpan("producer") {
kafkaTemplate.executeInTransaction({ ops ->
ops.send("testSingleTopic", "10", "testSpan")
})
}
then:
assertTraces(2) {
traces.sort(orderByRootSpanName("producer", "testSingleTopic receive"))
SpanData producer
trace(0, 2) {
span(0) {
name "producer"
}
span(1) {
name "testSingleTopic send"
kind PRODUCER
childOf span(0)
attributes {
"$SemanticAttributes.MESSAGING_SYSTEM" "kafka"
"$SemanticAttributes.MESSAGING_DESTINATION" "testSingleTopic"
"$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic"
}
}
producer = span(1)
}
trace(1, 3) {
span(0) {
name "testSingleTopic receive"
kind CONSUMER
hasNoParent()
attributes {
"$SemanticAttributes.MESSAGING_SYSTEM" "kafka"
"$SemanticAttributes.MESSAGING_DESTINATION" "testSingleTopic"
"$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic"
"$SemanticAttributes.MESSAGING_OPERATION" "receive"
}
}
span(1) {
name "testSingleTopic process"
kind CONSUMER
childOf span(0)
hasLink producer
attributes {
"$SemanticAttributes.MESSAGING_SYSTEM" "kafka"
"$SemanticAttributes.MESSAGING_DESTINATION" "testSingleTopic"
"$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic"
"$SemanticAttributes.MESSAGING_OPERATION" "process"
"$SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES" Long
"$SemanticAttributes.MESSAGING_KAFKA_PARTITION" { it >= 0 }
"kafka.offset" Long
"kafka.record.queue_time_ms" { it >= 0 }
}
}
span(2) {
name "consumer"
childOf span(1)
}
}
}
}
def "should handle failure in Kafka listener"() {
given:
def kafkaTemplate = applicationContext.getBean("kafkaTemplate", KafkaTemplate)
@ -165,13 +291,13 @@ class SpringKafkaInstrumentationTest extends AgentInstrumentationSpecification {
when:
runWithSpan("producer") {
kafkaTemplate.executeInTransaction({ ops ->
ops.send("testTopic", "10", "error")
ops.send("testSingleTopic", "10", "error")
})
}
then:
assertTraces(2) {
traces.sort(orderByRootSpanName("producer", "testTopic receive"))
traces.sort(orderByRootSpanName("producer", "testSingleTopic receive"))
SpanData producer
@ -180,12 +306,12 @@ class SpringKafkaInstrumentationTest extends AgentInstrumentationSpecification {
name "producer"
}
span(1) {
name "testTopic send"
name "testSingleTopic send"
kind PRODUCER
childOf span(0)
attributes {
"$SemanticAttributes.MESSAGING_SYSTEM" "kafka"
"$SemanticAttributes.MESSAGING_DESTINATION" "testTopic"
"$SemanticAttributes.MESSAGING_DESTINATION" "testSingleTopic"
"$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic"
}
}
@ -194,18 +320,18 @@ class SpringKafkaInstrumentationTest extends AgentInstrumentationSpecification {
}
trace(1, 3) {
span(0) {
name "testTopic receive"
name "testSingleTopic receive"
kind CONSUMER
hasNoParent()
attributes {
"$SemanticAttributes.MESSAGING_SYSTEM" "kafka"
"$SemanticAttributes.MESSAGING_DESTINATION" "testTopic"
"$SemanticAttributes.MESSAGING_DESTINATION" "testSingleTopic"
"$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic"
"$SemanticAttributes.MESSAGING_OPERATION" "receive"
}
}
span(1) {
name "testTopic process"
name "testSingleTopic process"
kind CONSUMER
childOf span(0)
hasLink producer
@ -213,9 +339,13 @@ class SpringKafkaInstrumentationTest extends AgentInstrumentationSpecification {
errorEvent IllegalArgumentException, "boom"
attributes {
"$SemanticAttributes.MESSAGING_SYSTEM" "kafka"
"$SemanticAttributes.MESSAGING_DESTINATION" "testTopic"
"$SemanticAttributes.MESSAGING_DESTINATION" "testSingleTopic"
"$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic"
"$SemanticAttributes.MESSAGING_OPERATION" "process"
"$SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES" Long
"$SemanticAttributes.MESSAGING_KAFKA_PARTITION" { it >= 0 }
"kafka.offset" Long
"kafka.record.queue_time_ms" { it >= 0 }
}
}
span(2) {
@ -225,71 +355,4 @@ class SpringKafkaInstrumentationTest extends AgentInstrumentationSpecification {
}
}
}
@SpringBootConfiguration
@EnableAutoConfiguration
static class ConsumerConfig {
@Bean
NewTopic topic() {
return TopicBuilder.name("testTopic")
.partitions(1)
.replicas(1)
.build()
}
@Bean
Listener listener() {
return new Listener()
}
@Bean
ConcurrentKafkaListenerContainerFactory<String, String> batchFactory(
ConsumerFactory<String, String> consumerFactory) {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>()
// do not retry failed records
factory.setBatchErrorHandler(new DoNothingBatchErrorHandler())
factory.setConsumerFactory(consumerFactory)
factory.setBatchListener(true)
factory.setAutoStartup(true)
// setting interceptBeforeTx to true eliminates kafka-clients noise - otherwise spans would be created on every ConsumerRecords#iterator() call
factory.setContainerCustomizer({ container ->
container.setInterceptBeforeTx(true)
})
factory
}
}
static class Listener {
static AtomicInteger lastBatchSize = new AtomicInteger()
static CountDownLatch messageReceived = new CountDownLatch(2)
@KafkaListener(id = "testListener", topics = "testTopic", containerFactory = "batchFactory")
void listener(List<ConsumerRecord<String, String>> records) {
lastBatchSize.set(records.size())
records.size().times {
messageReceived.countDown()
}
GlobalTraceUtil.runWithSpan("consumer") {}
records.forEach({ record ->
if (record.value() == "error") {
throw new IllegalArgumentException("boom")
}
})
}
static void reset() {
messageReceived = new CountDownLatch(2)
lastBatchSize.set(0)
}
static void waitForMessages() {
messageReceived.await(30, TimeUnit.SECONDS)
}
static int getLastBatchSize() {
return lastBatchSize.get()
}
}
}