From 28825724db7d8708fad28467e5da900463591fff Mon Sep 17 00:00:00 2001
From: Mateusz Rzeszutek
Date: Thu, 21 Apr 2022 23:42:49 +0200
Subject: [PATCH] Spring-kafka single record instrumentation (#5904)

---
 .../KafkaClientsConsumerProcessTracing.java    |  12 +-
 .../KafkaClientsConsumerProcessWrapper.java    |  19 --
 .../KafkaConsumerInstrumentation.java          |  14 +
 .../kafkaclients/TracingIterable.java          |  10 +-
 .../kafkaclients/TracingIterator.java          |  10 +-
 .../internal/KafkaInstrumenterFactory.java     |  27 +-
 .../StreamThreadInstrumentation.java           |  43 +--
 ...ssageListenerContainerInstrumentation.java  |  37 ++-
 .../kafka/InstrumentedBatchInterceptor.java    |  29 +-
 .../kafka/InstrumentedRecordInterceptor.java   |  87 ++++++
 .../KafkaBatchProcessSpanLinksExtractor.java   |  20 +-
 ...va => SpringKafkaErrorCauseExtractor.java}  |   6 +-
 .../spring/kafka/SpringKafkaSingletons.java    |  18 +-
 .../instrumentation/spring/kafka/State.java    |   8 +-
 ...uppressingKafkaClientsInstrumentation.java  |   8 +-
 .../test/groovy/BatchRecordListener.groovy     |  45 +++
 .../src/test/groovy/ConsumerConfig.groovy      |  67 +++++
 .../test/groovy/DoNothingErrorHandler.groovy   |  13 +
 .../test/groovy/SingleRecordListener.groovy    |  18 ++
 .../SpringKafkaInstrumentationTest.groovy      | 269 +++++++++++-------
 20 files changed, 519 insertions(+), 241 deletions(-)
 delete mode 100644 instrumentation/kafka/kafka-clients/kafka-clients-0.11/bootstrap/src/main/java/io/opentelemetry/javaagent/bootstrap/kafka/KafkaClientsConsumerProcessWrapper.java
 create mode 100644 instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/InstrumentedRecordInterceptor.java
 rename instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/{KafkaBatchErrorCauseExtractor.java => SpringKafkaErrorCauseExtractor.java} (71%)
 create mode 100644 instrumentation/spring/spring-kafka-2.7/javaagent/src/test/groovy/BatchRecordListener.groovy
 create mode 100644 instrumentation/spring/spring-kafka-2.7/javaagent/src/test/groovy/ConsumerConfig.groovy
 create mode 100644 instrumentation/spring/spring-kafka-2.7/javaagent/src/test/groovy/DoNothingErrorHandler.groovy
 create mode 100644 instrumentation/spring/spring-kafka-2.7/javaagent/src/test/groovy/SingleRecordListener.groovy

diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/bootstrap/src/main/java/io/opentelemetry/javaagent/bootstrap/kafka/KafkaClientsConsumerProcessTracing.java b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/bootstrap/src/main/java/io/opentelemetry/javaagent/bootstrap/kafka/KafkaClientsConsumerProcessTracing.java
index bf89932636..65e4c36c76 100644
--- a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/bootstrap/src/main/java/io/opentelemetry/javaagent/bootstrap/kafka/KafkaClientsConsumerProcessTracing.java
+++ b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/bootstrap/src/main/java/io/opentelemetry/javaagent/bootstrap/kafka/KafkaClientsConsumerProcessTracing.java
@@ -14,15 +14,13 @@ public final class KafkaClientsConsumerProcessTracing {
 
   private KafkaClientsConsumerProcessTracing() {}
 
-  public static void enableWrapping() {
-    wrappingEnabled.set(true);
-  }
-
-  public static void disableWrapping() {
-    wrappingEnabled.set(false);
+  public static boolean setEnabled(boolean enabled) {
+    boolean previous = wrappingEnabled.get();
+    wrappingEnabled.set(enabled);
+    return previous;
   }
 
   public static boolean wrappingEnabled() {
-    return wrappingEnabled.get() == true;
+    return wrappingEnabled.get();
   }
 }
diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/bootstrap/src/main/java/io/opentelemetry/javaagent/bootstrap/kafka/KafkaClientsConsumerProcessWrapper.java b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/bootstrap/src/main/java/io/opentelemetry/javaagent/bootstrap/kafka/KafkaClientsConsumerProcessWrapper.java
deleted file mode 100644
index 73c806ceb9..0000000000
--- a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/bootstrap/src/main/java/io/opentelemetry/javaagent/bootstrap/kafka/KafkaClientsConsumerProcessWrapper.java
+++ /dev/null
@@ -1,19 +0,0 @@
-/*
- * Copyright The OpenTelemetry Authors
- * SPDX-License-Identifier: Apache-2.0
- */
-
-package io.opentelemetry.javaagent.bootstrap.kafka;
-
-// Classes used by multiple instrumentations should be in a bootstrap module to ensure that all
-// instrumentations see the same class. Helper classes are injected into each class loader that
-// contains an instrumentation that uses them, so instrumentations in different class loaders will
-// have separate copies of helper classes.
-public interface KafkaClientsConsumerProcessWrapper<T> {
-
-  /**
-   * Returns the actual, non-tracing object wrapped by this wrapper. This method is only supposed to
-   * be used by other Kafka consumer instrumentations that want to suppress the kafka-clients one.
-   */
-  T unwrap();
-}
diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaConsumerInstrumentation.java b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaConsumerInstrumentation.java
index 37d14b751e..12bab9899e 100644
--- a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaConsumerInstrumentation.java
+++ b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaConsumerInstrumentation.java
@@ -17,12 +17,14 @@ import io.opentelemetry.context.Context;
 import io.opentelemetry.instrumentation.api.field.VirtualField;
 import io.opentelemetry.instrumentation.kafka.internal.ReceivedRecords;
 import io.opentelemetry.instrumentation.kafka.internal.Timer;
+import io.opentelemetry.javaagent.bootstrap.kafka.KafkaClientsConsumerProcessTracing;
 import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
 import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
 import java.time.Duration;
 import net.bytebuddy.asm.Advice;
 import net.bytebuddy.description.type.TypeDescription;
 import net.bytebuddy.matcher.ElementMatcher;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 
 public class KafkaConsumerInstrumentation implements TypeInstrumentation {
@@ -74,6 +76,18 @@ public class KafkaConsumerInstrumentation implements TypeInstrumentation {
       VirtualField<ConsumerRecords<?, ?>, Context> consumerRecordsContext =
           VirtualField.find(ConsumerRecords.class, Context.class);
       consumerRecordsContext.set(records, context);
+
+      // disable process tracing and store the receive span for each individual record too
+      boolean previousValue = KafkaClientsConsumerProcessTracing.setEnabled(false);
+      try {
+        VirtualField<ConsumerRecord<?, ?>, Context> consumerRecordContext =
+            VirtualField.find(ConsumerRecord.class, Context.class);
+        for (ConsumerRecord<?, ?> record : records) {
consumerRecordContext.set(record, context); + } + } finally { + KafkaClientsConsumerProcessTracing.setEnabled(previousValue); + } } } } diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/TracingIterable.java b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/TracingIterable.java index 4f0478f012..c2455cbf71 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/TracingIterable.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/TracingIterable.java @@ -7,14 +7,11 @@ package io.opentelemetry.javaagent.instrumentation.kafkaclients; import io.opentelemetry.context.Context; import io.opentelemetry.javaagent.bootstrap.kafka.KafkaClientsConsumerProcessTracing; -import io.opentelemetry.javaagent.bootstrap.kafka.KafkaClientsConsumerProcessWrapper; import java.util.Iterator; import javax.annotation.Nullable; import org.apache.kafka.clients.consumer.ConsumerRecord; -public class TracingIterable - implements Iterable>, - KafkaClientsConsumerProcessWrapper>> { +public class TracingIterable implements Iterable> { private final Iterable> delegate; @Nullable private final Context receiveContext; private boolean firstIterator = true; @@ -48,9 +45,4 @@ public class TracingIterable return it; } - - @Override - public Iterable> unwrap() { - return delegate; - } } diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/TracingIterator.java b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/TracingIterator.java index e00e15bf05..12d764c5bc 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/TracingIterator.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/TracingIterator.java @@ -10,14 +10,11 @@ import static io.opentelemetry.javaagent.instrumentation.kafkaclients.KafkaSingl import io.opentelemetry.context.Context; import io.opentelemetry.context.Scope; import io.opentelemetry.javaagent.bootstrap.kafka.KafkaClientsConsumerProcessTracing; -import io.opentelemetry.javaagent.bootstrap.kafka.KafkaClientsConsumerProcessWrapper; import java.util.Iterator; import javax.annotation.Nullable; import org.apache.kafka.clients.consumer.ConsumerRecord; -public class TracingIterator - implements Iterator>, - KafkaClientsConsumerProcessWrapper>> { +public class TracingIterator implements Iterator> { private final Iterator> delegateIterator; private final Context parentContext; @@ -79,9 +76,4 @@ public class TracingIterator public void remove() { delegateIterator.remove(); } - - @Override - public Iterator> unwrap() { - return delegateIterator; - } } diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaInstrumenterFactory.java b/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaInstrumenterFactory.java index 581352e449..1b2019828a 
100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaInstrumenterFactory.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaInstrumenterFactory.java @@ -9,6 +9,7 @@ import io.opentelemetry.api.GlobalOpenTelemetry; import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.instrumentation.api.config.ExperimentalConfig; import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor; +import io.opentelemetry.instrumentation.api.instrumenter.ErrorCauseExtractor; import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder; import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor; @@ -81,7 +82,18 @@ public final class KafkaInstrumenterFactory { instrumentationName, GlobalOpenTelemetry.get(), MessageOperation.PROCESS, - Collections.emptyList()); + Collections.emptyList(), + ErrorCauseExtractor.jdk()); + } + + public static Instrumenter, Void> createConsumerProcessInstrumenter( + String instrumentationName, ErrorCauseExtractor errorCauseExtractor) { + return createConsumerOperationInstrumenter( + instrumentationName, + GlobalOpenTelemetry.get(), + MessageOperation.PROCESS, + Collections.emptyList(), + errorCauseExtractor); } public static Instrumenter, Void> createConsumerOperationInstrumenter( @@ -89,6 +101,16 @@ public final class KafkaInstrumenterFactory { OpenTelemetry openTelemetry, MessageOperation operation, Iterable, Void>> extractors) { + return createConsumerOperationInstrumenter( + instrumentationName, openTelemetry, operation, extractors, ErrorCauseExtractor.jdk()); + } + + private static Instrumenter, Void> createConsumerOperationInstrumenter( + String instrumentationName, + OpenTelemetry openTelemetry, + MessageOperation operation, + Iterable, Void>> extractors, + ErrorCauseExtractor errorCauseExtractor) { KafkaConsumerAttributesGetter getter = KafkaConsumerAttributesGetter.INSTANCE; @@ -99,7 +121,8 @@ public final class KafkaInstrumenterFactory { MessagingSpanNameExtractor.create(getter, operation)) .addAttributesExtractor(MessagingAttributesExtractor.create(getter, operation)) .addAttributesExtractor(new KafkaConsumerAdditionalAttributesExtractor()) - .addAttributesExtractors(extractors); + .addAttributesExtractors(extractors) + .setErrorCauseExtractor(errorCauseExtractor); if (KafkaConsumerExperimentalAttributesExtractor.isEnabled()) { builder.addAttributesExtractor(new KafkaConsumerExperimentalAttributesExtractor()); } diff --git a/instrumentation/kafka/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/StreamThreadInstrumentation.java b/instrumentation/kafka/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/StreamThreadInstrumentation.java index 65a9f7e9d9..6c94c88241 100644 --- a/instrumentation/kafka/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/StreamThreadInstrumentation.java +++ b/instrumentation/kafka/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/StreamThreadInstrumentation.java @@ -5,23 +5,15 @@ package io.opentelemetry.javaagent.instrumentation.kafkastreams; -import static net.bytebuddy.matcher.ElementMatchers.isPrivate; import static 
net.bytebuddy.matcher.ElementMatchers.named; -import static net.bytebuddy.matcher.ElementMatchers.returns; -import io.opentelemetry.context.Context; -import io.opentelemetry.instrumentation.api.field.VirtualField; import io.opentelemetry.javaagent.bootstrap.kafka.KafkaClientsConsumerProcessTracing; import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; import net.bytebuddy.asm.Advice; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; -// This instrumentation copies the receive CONSUMER span context from the ConsumerRecords aggregate -// object to each individual record public class StreamThreadInstrumentation implements TypeInstrumentation { @Override @@ -31,47 +23,20 @@ public class StreamThreadInstrumentation implements TypeInstrumentation { @Override public void transform(TypeTransformer transformer) { - transformer.applyAdviceToMethod( - named("pollRequests") - .and(isPrivate()) - .and(returns(named("org.apache.kafka.clients.consumer.ConsumerRecords"))), - this.getClass().getName() + "$PollRecordsAdvice"); transformer.applyAdviceToMethod(named("runLoop"), this.getClass().getName() + "$RunLoopAdvice"); } - @SuppressWarnings("unused") - public static class PollRecordsAdvice { - @Advice.OnMethodExit(suppress = Throwable.class) - public static void onExit(@Advice.Return ConsumerRecords records) { - if (records.isEmpty()) { - return; - } - - Context receiveContext = VirtualField.find(ConsumerRecords.class, Context.class).get(records); - if (receiveContext == null) { - return; - } - - VirtualField, Context> singleRecordReceiveContext = - VirtualField.find(ConsumerRecord.class, Context.class); - - for (ConsumerRecord record : records) { - singleRecordReceiveContext.set(record, receiveContext); - } - } - } - // this advice suppresses the CONSUMER spans created by the kafka-clients instrumentation @SuppressWarnings("unused") public static class RunLoopAdvice { @Advice.OnMethodEnter(suppress = Throwable.class) - public static void onEnter() { - KafkaClientsConsumerProcessTracing.disableWrapping(); + public static boolean onEnter() { + return KafkaClientsConsumerProcessTracing.setEnabled(false); } @Advice.OnMethodExit(suppress = Throwable.class) - public static void onExit() { - KafkaClientsConsumerProcessTracing.enableWrapping(); + public static void onExit(@Advice.Enter boolean previousValue) { + KafkaClientsConsumerProcessTracing.setEnabled(previousValue); } } } diff --git a/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/AbstractMessageListenerContainerInstrumentation.java b/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/AbstractMessageListenerContainerInstrumentation.java index 39066d7fa0..2c2420ecb3 100644 --- a/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/AbstractMessageListenerContainerInstrumentation.java +++ b/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/AbstractMessageListenerContainerInstrumentation.java @@ -17,10 +17,13 @@ import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; import 
net.bytebuddy.asm.Advice; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; +import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.springframework.kafka.listener.BatchInterceptor; +import org.springframework.kafka.listener.RecordInterceptor; public class AbstractMessageListenerContainerInstrumentation implements TypeInstrumentation { + @Override public ElementMatcher typeMatcher() { return named("org.springframework.kafka.listener.AbstractMessageListenerContainer"); @@ -36,20 +39,48 @@ public class AbstractMessageListenerContainerInstrumentation implements TypeInst .and(takesArguments(0)) .and(returns(named("org.springframework.kafka.listener.BatchInterceptor"))), this.getClass().getName() + "$GetBatchInterceptorAdvice"); + // getRecordInterceptor() is called internally by AbstractMessageListenerContainer + // implementations + transformer.applyAdviceToMethod( + named("getRecordInterceptor") + .and(isProtected()) + .and(takesArguments(0)) + .and(returns(named("org.springframework.kafka.listener.RecordInterceptor"))), + this.getClass().getName() + "$GetRecordInterceptorAdvice"); } @SuppressWarnings("unused") public static class GetBatchInterceptorAdvice { + @Advice.OnMethodExit(suppress = Throwable.class) public static void onExit( @Advice.Return(readOnly = false) BatchInterceptor interceptor) { + if (!(interceptor instanceof InstrumentedBatchInterceptor)) { - VirtualField, Context> receiveContextVirtualField = + VirtualField, Context> receiveContextField = VirtualField.find(ConsumerRecords.class, Context.class); - VirtualField, State> stateStore = + VirtualField, State>> stateField = VirtualField.find(ConsumerRecords.class, State.class); interceptor = - new InstrumentedBatchInterceptor<>(receiveContextVirtualField, stateStore, interceptor); + new InstrumentedBatchInterceptor<>(receiveContextField, stateField, interceptor); + } + } + } + + @SuppressWarnings("unused") + public static class GetRecordInterceptorAdvice { + + @Advice.OnMethodExit(suppress = Throwable.class) + public static void onExit( + @Advice.Return(readOnly = false) RecordInterceptor interceptor) { + + if (!(interceptor instanceof InstrumentedRecordInterceptor)) { + VirtualField, Context> receiveContextField = + VirtualField.find(ConsumerRecord.class, Context.class); + VirtualField, State>> stateField = + VirtualField.find(ConsumerRecord.class, State.class); + interceptor = + new InstrumentedRecordInterceptor<>(receiveContextField, stateField, interceptor); } } } diff --git a/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/InstrumentedBatchInterceptor.java b/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/InstrumentedBatchInterceptor.java index dca6ba2d10..a7d2719d1e 100644 --- a/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/InstrumentedBatchInterceptor.java +++ b/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/InstrumentedBatchInterceptor.java @@ -5,7 +5,7 @@ package io.opentelemetry.javaagent.instrumentation.spring.kafka; -import static io.opentelemetry.javaagent.instrumentation.spring.kafka.SpringKafkaSingletons.processInstrumenter; +import static 
io.opentelemetry.javaagent.instrumentation.spring.kafka.SpringKafkaSingletons.batchProcessInstrumenter; import io.opentelemetry.context.Context; import io.opentelemetry.context.Scope; @@ -16,16 +16,17 @@ import org.apache.kafka.clients.consumer.ConsumerRecords; import org.springframework.kafka.listener.BatchInterceptor; public final class InstrumentedBatchInterceptor implements BatchInterceptor { - private final VirtualField, Context> receiveContextVirtualField; - private final VirtualField, State> stateStore; + + private final VirtualField, Context> receiveContextField; + private final VirtualField, State>> stateField; @Nullable private final BatchInterceptor decorated; public InstrumentedBatchInterceptor( - VirtualField, Context> receiveContextVirtualField, - VirtualField, State> stateStore, + VirtualField, Context> receiveContextField, + VirtualField, State>> stateField, @Nullable BatchInterceptor decorated) { - this.receiveContextVirtualField = receiveContextVirtualField; - this.stateStore = stateStore; + this.receiveContextField = receiveContextField; + this.stateField = stateField; this.decorated = decorated; } @@ -33,17 +34,17 @@ public final class InstrumentedBatchInterceptor implements BatchIntercepto public ConsumerRecords intercept(ConsumerRecords records, Consumer consumer) { Context parentContext = getParentContext(records); - if (processInstrumenter().shouldStart(parentContext, records)) { - Context context = processInstrumenter().start(parentContext, records); + if (batchProcessInstrumenter().shouldStart(parentContext, records)) { + Context context = batchProcessInstrumenter().start(parentContext, records); Scope scope = context.makeCurrent(); - stateStore.set(records, State.create(records, context, scope)); + stateField.set(records, State.create(records, context, scope)); } return decorated == null ? records : decorated.intercept(records, consumer); } private Context getParentContext(ConsumerRecords records) { - Context receiveContext = receiveContextVirtualField.get(records); + Context receiveContext = receiveContextField.get(records); // use the receive CONSUMER span as parent if it's available return receiveContext != null ? 
receiveContext : Context.current(); @@ -66,11 +67,11 @@ public final class InstrumentedBatchInterceptor implements BatchIntercepto } private void end(ConsumerRecords records, @Nullable Throwable error) { - State state = stateStore.get(records); - stateStore.set(records, null); + State> state = stateField.get(records); + stateField.set(records, null); if (state != null) { state.scope().close(); - processInstrumenter().end(state.context(), state.request(), null, error); + batchProcessInstrumenter().end(state.context(), state.request(), null, error); } } } diff --git a/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/InstrumentedRecordInterceptor.java b/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/InstrumentedRecordInterceptor.java new file mode 100644 index 0000000000..13c31fe979 --- /dev/null +++ b/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/InstrumentedRecordInterceptor.java @@ -0,0 +1,87 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.spring.kafka; + +import static io.opentelemetry.javaagent.instrumentation.spring.kafka.SpringKafkaSingletons.processInstrumenter; + +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; +import io.opentelemetry.instrumentation.api.field.VirtualField; +import javax.annotation.Nullable; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.springframework.kafka.listener.RecordInterceptor; + +public final class InstrumentedRecordInterceptor implements RecordInterceptor { + + private final VirtualField, Context> receiveContextField; + private final VirtualField, State>> stateField; + @Nullable private final RecordInterceptor decorated; + + public InstrumentedRecordInterceptor( + VirtualField, Context> receiveContextField, + VirtualField, State>> stateField, + @Nullable RecordInterceptor decorated) { + this.receiveContextField = receiveContextField; + this.stateField = stateField; + this.decorated = decorated; + } + + @SuppressWarnings("deprecation") // implementing deprecated method for better compatibility + @Override + public ConsumerRecord intercept(ConsumerRecord record) { + start(record); + return decorated == null ? record : decorated.intercept(record); + } + + @Override + public ConsumerRecord intercept(ConsumerRecord record, Consumer consumer) { + start(record); + return decorated == null ? record : decorated.intercept(record, consumer); + } + + private void start(ConsumerRecord record) { + Context parentContext = getParentContext(record); + + if (processInstrumenter().shouldStart(parentContext, record)) { + Context context = processInstrumenter().start(parentContext, record); + Scope scope = context.makeCurrent(); + stateField.set(record, State.create(record, context, scope)); + } + } + + private Context getParentContext(ConsumerRecord records) { + Context receiveContext = receiveContextField.get(records); + + // use the receive CONSUMER span as parent if it's available + return receiveContext != null ? 
receiveContext : Context.current(); + } + + @Override + public void success(ConsumerRecord record, Consumer consumer) { + end(record, null); + if (decorated != null) { + decorated.success(record, consumer); + } + } + + @Override + public void failure(ConsumerRecord record, Exception exception, Consumer consumer) { + end(record, exception); + if (decorated != null) { + decorated.failure(record, exception, consumer); + } + } + + private void end(ConsumerRecord record, @Nullable Throwable error) { + State> state = stateField.get(record); + stateField.set(record, null); + if (state != null) { + state.scope().close(); + processInstrumenter().end(state.context(), state.request(), null, error); + } + } +} diff --git a/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/KafkaBatchProcessSpanLinksExtractor.java b/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/KafkaBatchProcessSpanLinksExtractor.java index 3f54ddecef..fb793e0930 100644 --- a/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/KafkaBatchProcessSpanLinksExtractor.java +++ b/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/KafkaBatchProcessSpanLinksExtractor.java @@ -10,39 +10,25 @@ import io.opentelemetry.context.propagation.ContextPropagators; import io.opentelemetry.instrumentation.api.instrumenter.SpanLinksBuilder; import io.opentelemetry.instrumentation.api.instrumenter.SpanLinksExtractor; import io.opentelemetry.instrumentation.kafka.internal.KafkaConsumerRecordGetter; -import io.opentelemetry.javaagent.bootstrap.kafka.KafkaClientsConsumerProcessWrapper; -import java.util.Iterator; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; -public class KafkaBatchProcessSpanLinksExtractor +final class KafkaBatchProcessSpanLinksExtractor implements SpanLinksExtractor> { private final SpanLinksExtractor> singleRecordLinkExtractor; - public KafkaBatchProcessSpanLinksExtractor(ContextPropagators contextPropagators) { + KafkaBatchProcessSpanLinksExtractor(ContextPropagators contextPropagators) { this.singleRecordLinkExtractor = SpanLinksExtractor.fromUpstreamRequest( contextPropagators, KafkaConsumerRecordGetter.INSTANCE); } @Override - @SuppressWarnings("unchecked") public void extract( SpanLinksBuilder spanLinks, Context parentContext, ConsumerRecords records) { - Iterator> it = records.iterator(); - - // this will forcefully suppress the kafka-clients CONSUMER instrumentation even though there's - // no current CONSUMER span - if (it instanceof KafkaClientsConsumerProcessWrapper) { - it = - ((KafkaClientsConsumerProcessWrapper>>) it) - .unwrap(); - } - - while (it.hasNext()) { - ConsumerRecord record = it.next(); + for (ConsumerRecord record : records) { // explicitly passing root to avoid situation where context propagation is turned off and the // parent (CONSUMER receive) span is linked singleRecordLinkExtractor.extract(spanLinks, Context.root(), record); diff --git a/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/KafkaBatchErrorCauseExtractor.java b/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/SpringKafkaErrorCauseExtractor.java similarity index 71% 
rename from instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/KafkaBatchErrorCauseExtractor.java rename to instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/SpringKafkaErrorCauseExtractor.java index d43954afc8..f1b0c9b08a 100644 --- a/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/KafkaBatchErrorCauseExtractor.java +++ b/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/SpringKafkaErrorCauseExtractor.java @@ -8,14 +8,14 @@ package io.opentelemetry.javaagent.instrumentation.spring.kafka; import io.opentelemetry.instrumentation.api.instrumenter.ErrorCauseExtractor; import org.springframework.kafka.listener.ListenerExecutionFailedException; -public final class KafkaBatchErrorCauseExtractor implements ErrorCauseExtractor { - private final ErrorCauseExtractor delegate = ErrorCauseExtractor.jdk(); +enum SpringKafkaErrorCauseExtractor implements ErrorCauseExtractor { + INSTANCE; @Override public Throwable extractCause(Throwable error) { if (error instanceof ListenerExecutionFailedException && error.getCause() != null) { error = error.getCause(); } - return delegate.extractCause(error); + return ErrorCauseExtractor.jdk().extractCause(error); } } diff --git a/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/SpringKafkaSingletons.java b/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/SpringKafkaSingletons.java index d5c0597cb6..996f3a24b0 100644 --- a/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/SpringKafkaSingletons.java +++ b/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/SpringKafkaSingletons.java @@ -11,6 +11,7 @@ import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor; import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation; import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesExtractor; import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingSpanNameExtractor; +import io.opentelemetry.instrumentation.kafka.internal.KafkaInstrumenterFactory; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; @@ -18,10 +19,13 @@ public final class SpringKafkaSingletons { private static final String INSTRUMENTATION_NAME = "io.opentelemetry.spring-kafka-2.7"; - private static final Instrumenter, Void> PROCESS_INSTRUMENTER = - buildProcessInstrumenter(); + private static final Instrumenter, Void> BATCH_PROCESS_INSTRUMENTER = + buildBatchProcessInstrumenter(); + private static final Instrumenter, Void> PROCESS_INSTRUMENTER = + KafkaInstrumenterFactory.createConsumerProcessInstrumenter( + INSTRUMENTATION_NAME, SpringKafkaErrorCauseExtractor.INSTANCE); - private static Instrumenter, Void> buildProcessInstrumenter() { + private static Instrumenter, Void> buildBatchProcessInstrumenter() { KafkaBatchProcessAttributesGetter getter = KafkaBatchProcessAttributesGetter.INSTANCE; MessageOperation operation = MessageOperation.PROCESS; @@ -32,15 +36,15 @@ public final class SpringKafkaSingletons { 
.addAttributesExtractor(MessagingAttributesExtractor.create(getter, operation)) .addSpanLinksExtractor( new KafkaBatchProcessSpanLinksExtractor(GlobalOpenTelemetry.getPropagators())) - .setErrorCauseExtractor(new KafkaBatchErrorCauseExtractor()) + .setErrorCauseExtractor(SpringKafkaErrorCauseExtractor.INSTANCE) .newInstrumenter(SpanKindExtractor.alwaysConsumer()); } - public static Instrumenter, Void> receiveInstrumenter() { - return null; + public static Instrumenter, Void> batchProcessInstrumenter() { + return BATCH_PROCESS_INSTRUMENTER; } - public static Instrumenter, Void> processInstrumenter() { + public static Instrumenter, Void> processInstrumenter() { return PROCESS_INSTRUMENTER; } diff --git a/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/State.java b/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/State.java index 589526180e..f09bbe61c4 100644 --- a/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/State.java +++ b/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/State.java @@ -8,17 +8,15 @@ package io.opentelemetry.javaagent.instrumentation.spring.kafka; import com.google.auto.value.AutoValue; import io.opentelemetry.context.Context; import io.opentelemetry.context.Scope; -import org.apache.kafka.clients.consumer.ConsumerRecords; @AutoValue -public abstract class State { +public abstract class State { - public static State create( - ConsumerRecords request, Context context, Scope scope) { + public static State create(REQUEST request, Context context, Scope scope) { return new AutoValue_State<>(request, context, scope); } - public abstract ConsumerRecords request(); + public abstract REQUEST request(); public abstract Context context(); diff --git a/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/SuppressingKafkaClientsInstrumentation.java b/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/SuppressingKafkaClientsInstrumentation.java index f864359938..cf8d85b2bb 100644 --- a/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/SuppressingKafkaClientsInstrumentation.java +++ b/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/SuppressingKafkaClientsInstrumentation.java @@ -31,13 +31,13 @@ public class SuppressingKafkaClientsInstrumentation implements TypeInstrumentati @SuppressWarnings("unused") public static class RunLoopAdvice { @Advice.OnMethodEnter(suppress = Throwable.class) - public static void onEnter() { - KafkaClientsConsumerProcessTracing.disableWrapping(); + public static boolean onEnter() { + return KafkaClientsConsumerProcessTracing.setEnabled(false); } @Advice.OnMethodExit(suppress = Throwable.class) - public static void onExit() { - KafkaClientsConsumerProcessTracing.enableWrapping(); + public static void onExit(@Advice.Enter boolean previousValue) { + KafkaClientsConsumerProcessTracing.setEnabled(previousValue); } } } diff --git a/instrumentation/spring/spring-kafka-2.7/javaagent/src/test/groovy/BatchRecordListener.groovy 
b/instrumentation/spring/spring-kafka-2.7/javaagent/src/test/groovy/BatchRecordListener.groovy new file mode 100644 index 0000000000..e64e5f96bd --- /dev/null +++ b/instrumentation/spring/spring-kafka-2.7/javaagent/src/test/groovy/BatchRecordListener.groovy @@ -0,0 +1,45 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +import io.opentelemetry.instrumentation.testing.GlobalTraceUtil +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.springframework.kafka.annotation.KafkaListener + +import java.util.concurrent.CountDownLatch +import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicInteger + +class BatchRecordListener { + static AtomicInteger lastBatchSize = new AtomicInteger() + static CountDownLatch messageReceived = new CountDownLatch(2) + + @KafkaListener(id = "testBatchListener", topics = "testBatchTopic", containerFactory = "batchFactory") + void listener(List> records) { + lastBatchSize.set(records.size()) + records.size().times { + messageReceived.countDown() + } + + GlobalTraceUtil.runWithSpan("consumer") {} + records.forEach({ record -> + if (record.value() == "error") { + throw new IllegalArgumentException("boom") + } + }) + } + + static void reset() { + messageReceived = new CountDownLatch(2) + lastBatchSize.set(0) + } + + static void waitForMessages() { + messageReceived.await(30, TimeUnit.SECONDS) + } + + static int getLastBatchSize() { + return lastBatchSize.get() + } +} diff --git a/instrumentation/spring/spring-kafka-2.7/javaagent/src/test/groovy/ConsumerConfig.groovy b/instrumentation/spring/spring-kafka-2.7/javaagent/src/test/groovy/ConsumerConfig.groovy new file mode 100644 index 0000000000..6c13d3c21a --- /dev/null +++ b/instrumentation/spring/spring-kafka-2.7/javaagent/src/test/groovy/ConsumerConfig.groovy @@ -0,0 +1,67 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +import org.apache.kafka.clients.admin.NewTopic +import org.springframework.boot.SpringBootConfiguration +import org.springframework.boot.autoconfigure.EnableAutoConfiguration +import org.springframework.context.annotation.Bean +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory +import org.springframework.kafka.config.TopicBuilder +import org.springframework.kafka.core.ConsumerFactory + +@SpringBootConfiguration +@EnableAutoConfiguration +class ConsumerConfig { + + @Bean + NewTopic batchTopic() { + return TopicBuilder.name("testBatchTopic") + .partitions(1) + .replicas(1) + .build() + } + + @Bean + NewTopic singleTopic() { + return TopicBuilder.name("testSingleTopic") + .partitions(1) + .replicas(1) + .build() + } + + @Bean + BatchRecordListener batchRecordListener() { + return new BatchRecordListener() + } + + @Bean + SingleRecordListener singleRecordListener() { + return new SingleRecordListener() + } + + @Bean + ConcurrentKafkaListenerContainerFactory batchFactory( + ConsumerFactory consumerFactory) { + ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>() + // do not retry failed records + factory.setBatchErrorHandler(new DoNothingBatchErrorHandler()) + factory.setConsumerFactory(consumerFactory) + factory.setBatchListener(true) + factory.setAutoStartup(true) + factory + } + + @Bean + ConcurrentKafkaListenerContainerFactory singleFactory( + ConsumerFactory consumerFactory) { + ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>() + // do not retry failed 
records + factory.setErrorHandler(new DoNothingErrorHandler()) + factory.setConsumerFactory(consumerFactory) + factory.setBatchListener(false) + factory.setAutoStartup(true) + factory + } +} diff --git a/instrumentation/spring/spring-kafka-2.7/javaagent/src/test/groovy/DoNothingErrorHandler.groovy b/instrumentation/spring/spring-kafka-2.7/javaagent/src/test/groovy/DoNothingErrorHandler.groovy new file mode 100644 index 0000000000..f97b025e15 --- /dev/null +++ b/instrumentation/spring/spring-kafka-2.7/javaagent/src/test/groovy/DoNothingErrorHandler.groovy @@ -0,0 +1,13 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.springframework.kafka.listener.ErrorHandler + +class DoNothingErrorHandler implements ErrorHandler { + @Override + void handle(Exception thrownException, ConsumerRecord data) { + } +} diff --git a/instrumentation/spring/spring-kafka-2.7/javaagent/src/test/groovy/SingleRecordListener.groovy b/instrumentation/spring/spring-kafka-2.7/javaagent/src/test/groovy/SingleRecordListener.groovy new file mode 100644 index 0000000000..a613f89a51 --- /dev/null +++ b/instrumentation/spring/spring-kafka-2.7/javaagent/src/test/groovy/SingleRecordListener.groovy @@ -0,0 +1,18 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +import io.opentelemetry.instrumentation.testing.GlobalTraceUtil +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.springframework.kafka.annotation.KafkaListener + +class SingleRecordListener { + @KafkaListener(id = "testSingleListener", topics = "testSingleTopic", containerFactory = "singleFactory") + void listener(ConsumerRecord record) { + GlobalTraceUtil.runWithSpan("consumer") {} + if (record.value() == "error") { + throw new IllegalArgumentException("boom") + } + } +} diff --git a/instrumentation/spring/spring-kafka-2.7/javaagent/src/test/groovy/SpringKafkaInstrumentationTest.groovy b/instrumentation/spring/spring-kafka-2.7/javaagent/src/test/groovy/SpringKafkaInstrumentationTest.groovy index 3cef9e6bbc..cd3b79b3af 100644 --- a/instrumentation/spring/spring-kafka-2.7/javaagent/src/test/groovy/SpringKafkaInstrumentationTest.groovy +++ b/instrumentation/spring/spring-kafka-2.7/javaagent/src/test/groovy/SpringKafkaInstrumentationTest.groovy @@ -4,29 +4,17 @@ */ import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification -import io.opentelemetry.instrumentation.testing.GlobalTraceUtil import io.opentelemetry.sdk.trace.data.SpanData import io.opentelemetry.semconv.trace.attributes.SemanticAttributes -import java.time.Duration -import java.util.concurrent.CountDownLatch -import java.util.concurrent.TimeUnit -import java.util.concurrent.atomic.AtomicInteger -import org.apache.kafka.clients.admin.NewTopic -import org.apache.kafka.clients.consumer.ConsumerRecord import org.springframework.boot.SpringApplication -import org.springframework.boot.SpringBootConfiguration -import org.springframework.boot.autoconfigure.EnableAutoConfiguration import org.springframework.context.ConfigurableApplicationContext -import org.springframework.context.annotation.Bean -import org.springframework.kafka.annotation.KafkaListener -import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory -import org.springframework.kafka.config.TopicBuilder -import org.springframework.kafka.core.ConsumerFactory import org.springframework.kafka.core.KafkaTemplate import 
org.testcontainers.containers.KafkaContainer import org.testcontainers.containers.wait.strategy.Wait import spock.lang.Shared +import java.time.Duration + import static io.opentelemetry.api.trace.SpanKind.CONSUMER import static io.opentelemetry.api.trace.SpanKind.PRODUCER import static io.opentelemetry.api.trace.StatusCode.ERROR @@ -73,17 +61,17 @@ class SpringKafkaInstrumentationTest extends AgentInstrumentationSpecification { // a batch. def maxAttempts = 5 for (i in 1..maxAttempts) { - Listener.reset() + BatchRecordListener.reset() runWithSpan("producer") { kafkaTemplate.executeInTransaction({ ops -> - ops.send("testTopic", "10", "testSpan1") - ops.send("testTopic", "20", "testSpan2") + ops.send("testBatchTopic", "10", "testSpan1") + ops.send("testBatchTopic", "20", "testSpan2") }) } - Listener.waitForMessages() - if (Listener.getLastBatchSize() == 2) { + BatchRecordListener.waitForMessages() + if (BatchRecordListener.getLastBatchSize() == 2) { break } else if (i < maxAttempts) { ignoreTracesAndClear(2) @@ -93,7 +81,7 @@ class SpringKafkaInstrumentationTest extends AgentInstrumentationSpecification { then: assertTraces(2) { - traces.sort(orderByRootSpanName("producer", "testTopic receive", "testTopic process")) + traces.sort(orderByRootSpanName("producer", "testBatchTopic receive", "testBatchTopic process")) SpanData producer1, producer2 @@ -102,22 +90,22 @@ class SpringKafkaInstrumentationTest extends AgentInstrumentationSpecification { name "producer" } span(1) { - name "testTopic send" + name "testBatchTopic send" kind PRODUCER childOf span(0) attributes { "$SemanticAttributes.MESSAGING_SYSTEM" "kafka" - "$SemanticAttributes.MESSAGING_DESTINATION" "testTopic" + "$SemanticAttributes.MESSAGING_DESTINATION" "testBatchTopic" "$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic" } } span(2) { - name "testTopic send" + name "testBatchTopic send" kind PRODUCER childOf span(0) attributes { "$SemanticAttributes.MESSAGING_SYSTEM" "kafka" - "$SemanticAttributes.MESSAGING_DESTINATION" "testTopic" + "$SemanticAttributes.MESSAGING_DESTINATION" "testBatchTopic" "$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic" } } @@ -127,25 +115,25 @@ class SpringKafkaInstrumentationTest extends AgentInstrumentationSpecification { } trace(1, 3) { span(0) { - name "testTopic receive" + name "testBatchTopic receive" kind CONSUMER hasNoParent() attributes { "$SemanticAttributes.MESSAGING_SYSTEM" "kafka" - "$SemanticAttributes.MESSAGING_DESTINATION" "testTopic" + "$SemanticAttributes.MESSAGING_DESTINATION" "testBatchTopic" "$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic" "$SemanticAttributes.MESSAGING_OPERATION" "receive" } } span(1) { - name "testTopic process" + name "testBatchTopic process" kind CONSUMER childOf span(0) hasLink producer1 hasLink producer2 attributes { "$SemanticAttributes.MESSAGING_SYSTEM" "kafka" - "$SemanticAttributes.MESSAGING_DESTINATION" "testTopic" + "$SemanticAttributes.MESSAGING_DESTINATION" "testBatchTopic" "$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic" "$SemanticAttributes.MESSAGING_OPERATION" "process" } @@ -158,6 +146,144 @@ class SpringKafkaInstrumentationTest extends AgentInstrumentationSpecification { } } + def "should handle failure in Kafka batch listener"() { + given: + def kafkaTemplate = applicationContext.getBean("kafkaTemplate", KafkaTemplate) + + when: + runWithSpan("producer") { + kafkaTemplate.executeInTransaction({ ops -> + ops.send("testBatchTopic", "10", "error") + }) + } + + then: + assertTraces(2) { + 
traces.sort(orderByRootSpanName("producer", "testBatchTopic receive")) + + SpanData producer + + trace(0, 2) { + span(0) { + name "producer" + } + span(1) { + name "testBatchTopic send" + kind PRODUCER + childOf span(0) + attributes { + "$SemanticAttributes.MESSAGING_SYSTEM" "kafka" + "$SemanticAttributes.MESSAGING_DESTINATION" "testBatchTopic" + "$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic" + } + } + + producer = span(1) + } + trace(1, 3) { + span(0) { + name "testBatchTopic receive" + kind CONSUMER + hasNoParent() + attributes { + "$SemanticAttributes.MESSAGING_SYSTEM" "kafka" + "$SemanticAttributes.MESSAGING_DESTINATION" "testBatchTopic" + "$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic" + "$SemanticAttributes.MESSAGING_OPERATION" "receive" + } + } + span(1) { + name "testBatchTopic process" + kind CONSUMER + childOf span(0) + hasLink producer + status ERROR + errorEvent IllegalArgumentException, "boom" + attributes { + "$SemanticAttributes.MESSAGING_SYSTEM" "kafka" + "$SemanticAttributes.MESSAGING_DESTINATION" "testBatchTopic" + "$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic" + "$SemanticAttributes.MESSAGING_OPERATION" "process" + } + } + span(2) { + name "consumer" + childOf span(1) + } + } + } + } + + def "should create spans for single record receive+process"() { + given: + def kafkaTemplate = applicationContext.getBean("kafkaTemplate", KafkaTemplate) + + when: + runWithSpan("producer") { + kafkaTemplate.executeInTransaction({ ops -> + ops.send("testSingleTopic", "10", "testSpan") + }) + } + + then: + assertTraces(2) { + traces.sort(orderByRootSpanName("producer", "testSingleTopic receive")) + + SpanData producer + + trace(0, 2) { + span(0) { + name "producer" + } + span(1) { + name "testSingleTopic send" + kind PRODUCER + childOf span(0) + attributes { + "$SemanticAttributes.MESSAGING_SYSTEM" "kafka" + "$SemanticAttributes.MESSAGING_DESTINATION" "testSingleTopic" + "$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic" + } + } + + producer = span(1) + } + trace(1, 3) { + span(0) { + name "testSingleTopic receive" + kind CONSUMER + hasNoParent() + attributes { + "$SemanticAttributes.MESSAGING_SYSTEM" "kafka" + "$SemanticAttributes.MESSAGING_DESTINATION" "testSingleTopic" + "$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic" + "$SemanticAttributes.MESSAGING_OPERATION" "receive" + } + } + span(1) { + name "testSingleTopic process" + kind CONSUMER + childOf span(0) + hasLink producer + attributes { + "$SemanticAttributes.MESSAGING_SYSTEM" "kafka" + "$SemanticAttributes.MESSAGING_DESTINATION" "testSingleTopic" + "$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic" + "$SemanticAttributes.MESSAGING_OPERATION" "process" + "$SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES" Long + "$SemanticAttributes.MESSAGING_KAFKA_PARTITION" { it >= 0 } + "kafka.offset" Long + "kafka.record.queue_time_ms" { it >= 0 } + } + } + span(2) { + name "consumer" + childOf span(1) + } + } + } + } + def "should handle failure in Kafka listener"() { given: def kafkaTemplate = applicationContext.getBean("kafkaTemplate", KafkaTemplate) @@ -165,13 +291,13 @@ class SpringKafkaInstrumentationTest extends AgentInstrumentationSpecification { when: runWithSpan("producer") { kafkaTemplate.executeInTransaction({ ops -> - ops.send("testTopic", "10", "error") + ops.send("testSingleTopic", "10", "error") }) } then: assertTraces(2) { - traces.sort(orderByRootSpanName("producer", "testTopic receive")) + traces.sort(orderByRootSpanName("producer", "testSingleTopic receive")) SpanData 
producer @@ -180,12 +306,12 @@ class SpringKafkaInstrumentationTest extends AgentInstrumentationSpecification { name "producer" } span(1) { - name "testTopic send" + name "testSingleTopic send" kind PRODUCER childOf span(0) attributes { "$SemanticAttributes.MESSAGING_SYSTEM" "kafka" - "$SemanticAttributes.MESSAGING_DESTINATION" "testTopic" + "$SemanticAttributes.MESSAGING_DESTINATION" "testSingleTopic" "$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic" } } @@ -194,18 +320,18 @@ class SpringKafkaInstrumentationTest extends AgentInstrumentationSpecification { } trace(1, 3) { span(0) { - name "testTopic receive" + name "testSingleTopic receive" kind CONSUMER hasNoParent() attributes { "$SemanticAttributes.MESSAGING_SYSTEM" "kafka" - "$SemanticAttributes.MESSAGING_DESTINATION" "testTopic" + "$SemanticAttributes.MESSAGING_DESTINATION" "testSingleTopic" "$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic" "$SemanticAttributes.MESSAGING_OPERATION" "receive" } } span(1) { - name "testTopic process" + name "testSingleTopic process" kind CONSUMER childOf span(0) hasLink producer @@ -213,9 +339,13 @@ class SpringKafkaInstrumentationTest extends AgentInstrumentationSpecification { errorEvent IllegalArgumentException, "boom" attributes { "$SemanticAttributes.MESSAGING_SYSTEM" "kafka" - "$SemanticAttributes.MESSAGING_DESTINATION" "testTopic" + "$SemanticAttributes.MESSAGING_DESTINATION" "testSingleTopic" "$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic" "$SemanticAttributes.MESSAGING_OPERATION" "process" + "$SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES" Long + "$SemanticAttributes.MESSAGING_KAFKA_PARTITION" { it >= 0 } + "kafka.offset" Long + "kafka.record.queue_time_ms" { it >= 0 } } } span(2) { @@ -225,71 +355,4 @@ class SpringKafkaInstrumentationTest extends AgentInstrumentationSpecification { } } } - - @SpringBootConfiguration - @EnableAutoConfiguration - static class ConsumerConfig { - - @Bean - NewTopic topic() { - return TopicBuilder.name("testTopic") - .partitions(1) - .replicas(1) - .build() - } - - @Bean - Listener listener() { - return new Listener() - } - - @Bean - ConcurrentKafkaListenerContainerFactory batchFactory( - ConsumerFactory consumerFactory) { - ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>() - // do not retry failed records - factory.setBatchErrorHandler(new DoNothingBatchErrorHandler()) - factory.setConsumerFactory(consumerFactory) - factory.setBatchListener(true) - factory.setAutoStartup(true) - // setting interceptBeforeTx to true eliminates kafka-clients noise - otherwise spans would be created on every ConsumerRecords#iterator() call - factory.setContainerCustomizer({ container -> - container.setInterceptBeforeTx(true) - }) - factory - } - } - - static class Listener { - static AtomicInteger lastBatchSize = new AtomicInteger() - static CountDownLatch messageReceived = new CountDownLatch(2) - - @KafkaListener(id = "testListener", topics = "testTopic", containerFactory = "batchFactory") - void listener(List> records) { - lastBatchSize.set(records.size()) - records.size().times { - messageReceived.countDown() - } - - GlobalTraceUtil.runWithSpan("consumer") {} - records.forEach({ record -> - if (record.value() == "error") { - throw new IllegalArgumentException("boom") - } - }) - } - - static void reset() { - messageReceived = new CountDownLatch(2) - lastBatchSize.set(0) - } - - static void waitForMessages() { - messageReceived.await(30, TimeUnit.SECONDS) - } - - static int getLastBatchSize() { - return 
lastBatchSize.get()
-    }
-  }
 }
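
The patch collapses the enableWrapping()/disableWrapping() pair into a single setEnabled(boolean) that returns the previous flag value, so an advice that suppresses the kafka-clients iterator wrapping can restore whatever state it found instead of unconditionally re-enabling it (this is the pattern used by both RunLoopAdvice classes above). A minimal sketch of that save-and-restore discipline, using only the bootstrap class from the diff; the helper class and method below are illustrative and not part of the patch:

import io.opentelemetry.javaagent.bootstrap.kafka.KafkaClientsConsumerProcessTracing;

// Illustrative only: shows the save/restore discipline the advices in this patch follow.
final class SuppressionExample {

  private SuppressionExample() {}

  static void runWithKafkaClientsTracingDisabled(Runnable task) {
    // remember whatever the flag was before we touched it...
    boolean previousValue = KafkaClientsConsumerProcessTracing.setEnabled(false);
    try {
      task.run();
    } finally {
      // ...and restore it, rather than blindly re-enabling wrapping
      KafkaClientsConsumerProcessTracing.setEnabled(previousValue);
    }
  }
}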
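
KafkaConsumerInstrumentation now stores the receive CONSUMER span context on every individual ConsumerRecord in addition to the ConsumerRecords batch; this is what lets the new spring-kafka record interceptor parent its process span on the receive span, and it replaces the per-record copying that used to live in the kafka-streams PollRecordsAdvice removed by this patch. A condensed sketch of that step, assuming the VirtualField API imported in the diff; the class and method names are illustrative:

import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.field.VirtualField;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

// Illustrative sketch of the propagation performed at the end of the poll() advice.
final class ReceiveContextPropagation {

  private ReceiveContextPropagation() {}

  static void attachReceiveContext(ConsumerRecords<?, ?> records, Context receiveContext) {
    // the whole batch keeps the receive context (used by the batch interceptor)
    VirtualField<ConsumerRecords<?, ?>, Context> batchField =
        VirtualField.find(ConsumerRecords.class, Context.class);
    batchField.set(records, receiveContext);

    // ...and so does every record (used by the new single-record interceptor);
    // the real advice wraps this loop in setEnabled(false)/restore so that iterating
    // here does not trigger the kafka-clients iterator instrumentation
    VirtualField<ConsumerRecord<?, ?>, Context> recordField =
        VirtualField.find(ConsumerRecord.class, Context.class);
    for (ConsumerRecord<?, ?> record : records) {
      recordField.set(record, receiveContext);
    }
  }
}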
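
On the spring-kafka side, GetRecordInterceptorAdvice wraps whatever RecordInterceptor the listener container returns in an InstrumentedRecordInterceptor, which starts the process span in intercept(..) and ends it in success(..) or failure(..), delegating to the original interceptor at every step. A user-registered interceptor therefore keeps working. The sketch below shows such an interceptor and its registration, assuming the spring-kafka 2.7 APIs used elsewhere in the patch; the class names and bean wiring are illustrative:

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.RecordInterceptor;

// Illustrative user code: the agent decorates this interceptor rather than replacing it.
class LoggingRecordInterceptor implements RecordInterceptor<String, String> {

  @Override
  public ConsumerRecord<String, String> intercept(ConsumerRecord<String, String> record) {
    System.out.println("about to process offset " + record.offset());
    return record; // returning null would skip the listener
  }

  @Override
  public void success(ConsumerRecord<String, String> record, Consumer<String, String> consumer) {
    System.out.println("processed offset " + record.offset());
  }
}

class ListenerFactoryConfig {

  ConcurrentKafkaListenerContainerFactory<String, String> singleRecordFactory(
      ConsumerFactory<String, String> consumerFactory) {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    factory.setBatchListener(false);
    // the agent's InstrumentedRecordInterceptor will wrap this interceptor
    factory.setRecordInterceptor(new LoggingRecordInterceptor());
    return factory;
  }
}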
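
SpringKafkaErrorCauseExtractor, renamed from KafkaBatchErrorCauseExtractor and turned into an enum singleton, is now shared by the batch and single-record instrumenters. It strips the ListenerExecutionFailedException wrapper that spring-kafka adds around listener failures before delegating to the default extractor, so the process span records the listener's own exception, the IllegalArgumentException("boom") asserted in the tests. A small standalone illustration of that unwrapping; the example class is not part of the patch:

import org.springframework.kafka.listener.ListenerExecutionFailedException;

// Illustrative: mirrors what SpringKafkaErrorCauseExtractor does before handing off to the
// default ErrorCauseExtractor.
final class ErrorUnwrappingExample {

  private ErrorUnwrappingExample() {}

  static Throwable unwrapListenerException(Throwable error) {
    if (error instanceof ListenerExecutionFailedException && error.getCause() != null) {
      error = error.getCause();
    }
    return error;
  }

  public static void main(String[] args) {
    Throwable wrapped =
        new ListenerExecutionFailedException("listener failed", new IllegalArgumentException("boom"));
    // prints "boom" - the cause recorded on the process span, not the spring-kafka wrapper
    System.out.println(unwrapListenerException(wrapped).getMessage());
  }
}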
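
The test changes split the old inline Listener into BatchRecordListener and SingleRecordListener on separate topics, wired through the new ConsumerConfig. The single-record test asserts a producer trace containing the "testSingleTopic send" PRODUCER span, and a consumer trace in which the "testSingleTopic receive" span parents the "testSingleTopic process" span (linked to the producer span), which in turn parents the span created inside the listener. For reference, a Java counterpart of the single-record listener is sketched below; it assumes the same "testSingleTopic" topic and "singleFactory" container factory from the tests and must be registered as a bean, as ConsumerConfig does for the Groovy version:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;

// Illustrative Java counterpart of the SingleRecordListener Groovy test class.
class SingleRecordListenerExample {

  @KafkaListener(id = "exampleSingleListener", topics = "testSingleTopic", containerFactory = "singleFactory")
  void listen(ConsumerRecord<String, String> record) {
    // work done here runs inside the "testSingleTopic process" CONSUMER span,
    // so any span created here becomes its child
    if ("error".equals(record.value())) {
      throw new IllegalArgumentException("boom"); // recorded as the span's error event
    }
  }
}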
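
With the KafkaClientsConsumerProcessWrapper interface gone, KafkaBatchProcessSpanLinksExtractor can iterate the ConsumerRecords directly (the poll advice now suppresses the iterator wrapping itself) and adds each record's upstream producer span as a link on the batch process span. The sketch below shows how such a linked span context can be recovered from a record's headers with the configured propagators; it illustrates the underlying mechanism and is not the extractor's actual code, which goes through SpanLinksExtractor.fromUpstreamRequest(..) as shown in the diff:

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.propagation.TextMapGetter;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;

// Illustrative: recovering the producer span context carried in a record's headers.
final class RecordLinkExample {

  private static final TextMapGetter<ConsumerRecord<?, ?>> GETTER =
      new TextMapGetter<ConsumerRecord<?, ?>>() {
        @Override
        public Iterable<String> keys(ConsumerRecord<?, ?> record) {
          List<String> keys = new ArrayList<>();
          for (Header header : record.headers()) {
            keys.add(header.key());
          }
          return keys;
        }

        @Override
        public String get(ConsumerRecord<?, ?> record, String key) {
          if (record == null || record.headers().lastHeader(key) == null) {
            return null;
          }
          return new String(record.headers().lastHeader(key).value(), StandardCharsets.UTF_8);
        }
      };

  private RecordLinkExample() {}

  static SpanContext linkedProducerContext(ConsumerRecord<?, ?> record) {
    // extract against the root context so an unrelated current span is never picked up
    Context extracted =
        GlobalOpenTelemetry.getPropagators()
            .getTextMapPropagator()
            .extract(Context.root(), record, GETTER);
    return Span.fromContext(extracted).getSpanContext();
  }
}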