diff --git a/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/ConsumerRecordsInstrumentation.java b/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/ConsumerRecordsInstrumentation.java index 27dc2032e3..d3ad91cc3b 100644 --- a/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/ConsumerRecordsInstrumentation.java +++ b/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/ConsumerRecordsInstrumentation.java @@ -5,6 +5,7 @@ package io.opentelemetry.javaagent.instrumentation.kafkaclients; +import static io.opentelemetry.javaagent.instrumentation.api.Java8BytecodeBridge.currentContext; import static net.bytebuddy.matcher.ElementMatchers.isMethod; import static net.bytebuddy.matcher.ElementMatchers.isPublic; import static net.bytebuddy.matcher.ElementMatchers.named; @@ -12,8 +13,9 @@ import static net.bytebuddy.matcher.ElementMatchers.returns; import static net.bytebuddy.matcher.ElementMatchers.takesArgument; import static net.bytebuddy.matcher.ElementMatchers.takesArguments; -import io.opentelemetry.api.trace.SpanContext; +import io.opentelemetry.context.Context; import io.opentelemetry.instrumentation.api.field.VirtualField; +import io.opentelemetry.instrumentation.api.instrumenter.SpanKey; import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; import java.util.Iterator; @@ -63,10 +65,13 @@ public class ConsumerRecordsInstrumentation implements TypeInstrumentation { public static void wrap( @Advice.This ConsumerRecords records, @Advice.Return(readOnly = false) Iterable> iterable) { - if (iterable != null) { - SpanContext receiveSpanContext = - VirtualField.find(ConsumerRecords.class, SpanContext.class).get(records); - iterable = TracingIterable.wrap(iterable, receiveSpanContext); + // typically, span key suppression should happen inside the Instrumenter, but receiveContext + // is being used as the parent context for the span instead of the current context + if (iterable != null + && SpanKey.CONSUMER_PROCESS.fromContextOrNull(currentContext()) == null) { + Context receiveContext = + VirtualField.find(ConsumerRecords.class, Context.class).get(records); + iterable = TracingIterable.wrap(iterable, receiveContext); } } } @@ -79,9 +84,9 @@ public class ConsumerRecordsInstrumentation implements TypeInstrumentation { @Advice.This ConsumerRecords records, @Advice.Return(readOnly = false) List> list) { if (list != null) { - SpanContext receiveSpanContext = - VirtualField.find(ConsumerRecords.class, SpanContext.class).get(records); - list = TracingList.wrap(list, receiveSpanContext); + Context receiveContext = + VirtualField.find(ConsumerRecords.class, Context.class).get(records); + list = TracingList.wrap(list, receiveContext); } } } @@ -93,10 +98,13 @@ public class ConsumerRecordsInstrumentation implements TypeInstrumentation { public static void wrap( @Advice.This ConsumerRecords records, @Advice.Return(readOnly = false) Iterator> iterator) { - if (iterator != null) { - SpanContext receiveSpanContext = - VirtualField.find(ConsumerRecords.class, SpanContext.class).get(records); - iterator = TracingIterator.wrap(iterator, receiveSpanContext); + // typically, span key suppression should happen inside the Instrumenter, but receiveContext + // is being used as the parent context for the span instead of the current context + if (iterator != null + && SpanKey.CONSUMER_PROCESS.fromContextOrNull(currentContext()) == null) { + Context receiveContext = + VirtualField.find(ConsumerRecords.class, Context.class).get(records); + iterator = TracingIterator.wrap(iterator, receiveContext); } } } diff --git a/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaConsumerInstrumentation.java b/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaConsumerInstrumentation.java index 0f20e21814..e49d30cf2e 100644 --- a/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaConsumerInstrumentation.java +++ b/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaConsumerInstrumentation.java @@ -6,7 +6,6 @@ package io.opentelemetry.javaagent.instrumentation.kafkaclients; import static io.opentelemetry.javaagent.instrumentation.api.Java8BytecodeBridge.currentContext; -import static io.opentelemetry.javaagent.instrumentation.api.Java8BytecodeBridge.spanFromContext; import static io.opentelemetry.javaagent.instrumentation.kafkaclients.KafkaSingletons.consumerReceiveInstrumenter; import static net.bytebuddy.matcher.ElementMatchers.isPublic; import static net.bytebuddy.matcher.ElementMatchers.named; @@ -14,7 +13,6 @@ import static net.bytebuddy.matcher.ElementMatchers.returns; import static net.bytebuddy.matcher.ElementMatchers.takesArgument; import static net.bytebuddy.matcher.ElementMatchers.takesArguments; -import io.opentelemetry.api.trace.SpanContext; import io.opentelemetry.context.Context; import io.opentelemetry.instrumentation.api.field.VirtualField; import io.opentelemetry.instrumentation.kafka.internal.ReceivedRecords; @@ -73,9 +71,9 @@ public class KafkaConsumerInstrumentation implements TypeInstrumentation { // context even though the span has ended // this is the suggested behavior according to the spec batch receive scenario: // https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/messaging.md#batch-receiving - VirtualField consumerRecordsSpan = - VirtualField.find(ConsumerRecords.class, SpanContext.class); - consumerRecordsSpan.set(records, spanFromContext(context).getSpanContext()); + VirtualField consumerRecordsContext = + VirtualField.find(ConsumerRecords.class, Context.class); + consumerRecordsContext.set(records, context); } } } diff --git a/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/TracingIterable.java b/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/TracingIterable.java index 980c43c5d4..4f0478f012 100644 --- a/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/TracingIterable.java +++ b/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/TracingIterable.java @@ -5,7 +5,7 @@ package io.opentelemetry.javaagent.instrumentation.kafkaclients; -import io.opentelemetry.api.trace.SpanContext; +import io.opentelemetry.context.Context; import io.opentelemetry.javaagent.bootstrap.kafka.KafkaClientsConsumerProcessTracing; import io.opentelemetry.javaagent.bootstrap.kafka.KafkaClientsConsumerProcessWrapper; import java.util.Iterator; @@ -16,19 +16,19 @@ public class TracingIterable implements Iterable>, KafkaClientsConsumerProcessWrapper>> { private final Iterable> delegate; - @Nullable private final SpanContext receiveSpanContext; + @Nullable private final Context receiveContext; private boolean firstIterator = true; protected TracingIterable( - Iterable> delegate, @Nullable SpanContext receiveSpanContext) { + Iterable> delegate, @Nullable Context receiveContext) { this.delegate = delegate; - this.receiveSpanContext = receiveSpanContext; + this.receiveContext = receiveContext; } public static Iterable> wrap( - Iterable> delegate, @Nullable SpanContext receiveSpanContext) { + Iterable> delegate, @Nullable Context receiveContext) { if (KafkaClientsConsumerProcessTracing.wrappingEnabled()) { - return new TracingIterable<>(delegate, receiveSpanContext); + return new TracingIterable<>(delegate, receiveContext); } return delegate; } @@ -40,7 +40,7 @@ public class TracingIterable // However, this is not thread-safe, but usually the first (hopefully only) traversal of // ConsumerRecords is performed in the same thread that called poll() if (firstIterator) { - it = TracingIterator.wrap(delegate.iterator(), receiveSpanContext); + it = TracingIterator.wrap(delegate.iterator(), receiveContext); firstIterator = false; } else { it = delegate.iterator(); diff --git a/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/TracingIterator.java b/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/TracingIterator.java index 80c1653297..e00e15bf05 100644 --- a/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/TracingIterator.java +++ b/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/TracingIterator.java @@ -7,8 +7,6 @@ package io.opentelemetry.javaagent.instrumentation.kafkaclients; import static io.opentelemetry.javaagent.instrumentation.kafkaclients.KafkaSingletons.consumerProcessInstrumenter; -import io.opentelemetry.api.trace.Span; -import io.opentelemetry.api.trace.SpanContext; import io.opentelemetry.context.Context; import io.opentelemetry.context.Scope; import io.opentelemetry.javaagent.bootstrap.kafka.KafkaClientsConsumerProcessTracing; @@ -32,21 +30,17 @@ public class TracingIterator @Nullable private Scope currentScope; private TracingIterator( - Iterator> delegateIterator, @Nullable SpanContext receiveSpanContext) { + Iterator> delegateIterator, @Nullable Context receiveContext) { this.delegateIterator = delegateIterator; - // use the receive CONSUMER span as parent if it's available - Context parentContext = Context.current(); - if (receiveSpanContext != null) { - parentContext = parentContext.with(Span.wrap(receiveSpanContext)); - } - this.parentContext = parentContext; + // use the receive CONSUMER as parent if it's available + this.parentContext = receiveContext != null ? receiveContext : Context.current(); } public static Iterator> wrap( - Iterator> delegateIterator, @Nullable SpanContext receiveSpanContext) { + Iterator> delegateIterator, @Nullable Context receiveContext) { if (KafkaClientsConsumerProcessTracing.wrappingEnabled()) { - return new TracingIterator<>(delegateIterator, receiveSpanContext); + return new TracingIterator<>(delegateIterator, receiveContext); } return delegateIterator; } diff --git a/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/TracingList.java b/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/TracingList.java index 33ed10eb58..97684ce895 100644 --- a/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/TracingList.java +++ b/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/TracingList.java @@ -5,7 +5,7 @@ package io.opentelemetry.javaagent.instrumentation.kafkaclients; -import io.opentelemetry.api.trace.SpanContext; +import io.opentelemetry.context.Context; import io.opentelemetry.javaagent.bootstrap.kafka.KafkaClientsConsumerProcessTracing; import java.util.Collection; import java.util.List; @@ -16,16 +16,15 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; public class TracingList extends TracingIterable implements List> { private final List> delegate; - private TracingList( - List> delegate, @Nullable SpanContext receiveSpanContext) { - super(delegate, receiveSpanContext); + private TracingList(List> delegate, @Nullable Context receiveContext) { + super(delegate, receiveContext); this.delegate = delegate; } public static List> wrap( - List> delegate, @Nullable SpanContext receiveSpanContext) { + List> delegate, @Nullable Context receiveContext) { if (KafkaClientsConsumerProcessTracing.wrappingEnabled()) { - return new TracingList<>(delegate, receiveSpanContext); + return new TracingList<>(delegate, receiveContext); } return delegate; } diff --git a/instrumentation/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/PartitionGroupInstrumentation.java b/instrumentation/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/PartitionGroupInstrumentation.java index 7527094bc7..b94104b726 100644 --- a/instrumentation/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/PartitionGroupInstrumentation.java +++ b/instrumentation/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/PartitionGroupInstrumentation.java @@ -6,7 +6,6 @@ package io.opentelemetry.javaagent.instrumentation.kafkastreams; import static io.opentelemetry.javaagent.instrumentation.api.Java8BytecodeBridge.currentContext; -import static io.opentelemetry.javaagent.instrumentation.api.Java8BytecodeBridge.wrapSpan; import static io.opentelemetry.javaagent.instrumentation.kafkastreams.KafkaStreamsSingletons.instrumenter; import static io.opentelemetry.javaagent.instrumentation.kafkastreams.StateHolder.HOLDER; import static net.bytebuddy.matcher.ElementMatchers.isMethod; @@ -14,7 +13,6 @@ import static net.bytebuddy.matcher.ElementMatchers.isPackagePrivate; import static net.bytebuddy.matcher.ElementMatchers.named; import static net.bytebuddy.matcher.ElementMatchers.returns; -import io.opentelemetry.api.trace.SpanContext; import io.opentelemetry.context.Context; import io.opentelemetry.instrumentation.api.field.VirtualField; import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; @@ -58,13 +56,11 @@ public class PartitionGroupInstrumentation implements TypeInstrumentation { return; } + Context receiveContext = + VirtualField.find(ConsumerRecord.class, Context.class).get(record.value); + // use the receive CONSUMER span as parent if it's available - Context parentContext = currentContext(); - SpanContext receiveSpanContext = - VirtualField.find(ConsumerRecord.class, SpanContext.class).get(record.value); - if (receiveSpanContext != null) { - parentContext = parentContext.with(wrapSpan(receiveSpanContext)); - } + Context parentContext = receiveContext != null ? receiveContext : currentContext(); if (!instrumenter().shouldStart(parentContext, record.value)) { return; diff --git a/instrumentation/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/RecordDeserializerInstrumentation.java b/instrumentation/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/RecordDeserializerInstrumentation.java index ae05f9ed71..cada146be8 100644 --- a/instrumentation/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/RecordDeserializerInstrumentation.java +++ b/instrumentation/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/RecordDeserializerInstrumentation.java @@ -13,7 +13,7 @@ import static net.bytebuddy.matcher.ElementMatchers.not; import static net.bytebuddy.matcher.ElementMatchers.returns; import static net.bytebuddy.matcher.ElementMatchers.takesArgument; -import io.opentelemetry.api.trace.SpanContext; +import io.opentelemetry.context.Context; import io.opentelemetry.instrumentation.api.field.VirtualField; import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; @@ -62,9 +62,9 @@ public class RecordDeserializerInstrumentation implements TypeInstrumentation { } // copy the receive CONSUMER span association - VirtualField singleRecordReceiveSpan = - VirtualField.find(ConsumerRecord.class, SpanContext.class); - singleRecordReceiveSpan.set(result, singleRecordReceiveSpan.get(incoming)); + VirtualField singleRecordReceiveContext = + VirtualField.find(ConsumerRecord.class, Context.class); + singleRecordReceiveContext.set(result, singleRecordReceiveContext.get(incoming)); } } } diff --git a/instrumentation/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/SourceNodeRecordDeserializerInstrumentation.java b/instrumentation/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/SourceNodeRecordDeserializerInstrumentation.java index b14fd8fb5c..ec85d954d1 100644 --- a/instrumentation/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/SourceNodeRecordDeserializerInstrumentation.java +++ b/instrumentation/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/SourceNodeRecordDeserializerInstrumentation.java @@ -11,7 +11,7 @@ import static net.bytebuddy.matcher.ElementMatchers.named; import static net.bytebuddy.matcher.ElementMatchers.returns; import static net.bytebuddy.matcher.ElementMatchers.takesArgument; -import io.opentelemetry.api.trace.SpanContext; +import io.opentelemetry.context.Context; import io.opentelemetry.instrumentation.api.field.VirtualField; import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; @@ -57,9 +57,9 @@ public class SourceNodeRecordDeserializerInstrumentation implements TypeInstrume } // copy the receive CONSUMER span association - VirtualField singleRecordReceiveSpan = - VirtualField.find(ConsumerRecord.class, SpanContext.class); - singleRecordReceiveSpan.set(result, singleRecordReceiveSpan.get(incoming)); + VirtualField singleRecordReceiveContext = + VirtualField.find(ConsumerRecord.class, Context.class); + singleRecordReceiveContext.set(result, singleRecordReceiveContext.get(incoming)); } } } diff --git a/instrumentation/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/StreamThreadInstrumentation.java b/instrumentation/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/StreamThreadInstrumentation.java index ebdaaa6d74..65a9f7e9d9 100644 --- a/instrumentation/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/StreamThreadInstrumentation.java +++ b/instrumentation/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/StreamThreadInstrumentation.java @@ -9,7 +9,7 @@ import static net.bytebuddy.matcher.ElementMatchers.isPrivate; import static net.bytebuddy.matcher.ElementMatchers.named; import static net.bytebuddy.matcher.ElementMatchers.returns; -import io.opentelemetry.api.trace.SpanContext; +import io.opentelemetry.context.Context; import io.opentelemetry.instrumentation.api.field.VirtualField; import io.opentelemetry.javaagent.bootstrap.kafka.KafkaClientsConsumerProcessTracing; import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; @@ -47,17 +47,16 @@ public class StreamThreadInstrumentation implements TypeInstrumentation { return; } - SpanContext receiveSpanContext = - VirtualField.find(ConsumerRecords.class, SpanContext.class).get(records); - if (receiveSpanContext == null) { + Context receiveContext = VirtualField.find(ConsumerRecords.class, Context.class).get(records); + if (receiveContext == null) { return; } - VirtualField, SpanContext> singleRecordReceiveSpan = - VirtualField.find(ConsumerRecord.class, SpanContext.class); + VirtualField, Context> singleRecordReceiveContext = + VirtualField.find(ConsumerRecord.class, Context.class); for (ConsumerRecord record : records) { - singleRecordReceiveSpan.set(record, receiveSpanContext); + singleRecordReceiveContext.set(record, receiveContext); } } } diff --git a/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/AbstractMessageListenerContainerInstrumentation.java b/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/AbstractMessageListenerContainerInstrumentation.java index 00a49657d6..ea88ab5d93 100644 --- a/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/AbstractMessageListenerContainerInstrumentation.java +++ b/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/AbstractMessageListenerContainerInstrumentation.java @@ -10,7 +10,7 @@ import static net.bytebuddy.matcher.ElementMatchers.named; import static net.bytebuddy.matcher.ElementMatchers.returns; import static net.bytebuddy.matcher.ElementMatchers.takesArguments; -import io.opentelemetry.api.trace.SpanContext; +import io.opentelemetry.context.Context; import io.opentelemetry.instrumentation.api.field.VirtualField; import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; @@ -43,11 +43,11 @@ public class AbstractMessageListenerContainerInstrumentation implements TypeInst @Advice.OnMethodExit(suppress = Throwable.class) public static void onExit(@Advice.Return(readOnly = false) BatchInterceptor interceptor) { if (!(interceptor instanceof InstrumentedBatchInterceptor)) { - VirtualField receiveSpanVirtualField = - VirtualField.find(ConsumerRecords.class, SpanContext.class); + VirtualField receiveContextVirtualField = + VirtualField.find(ConsumerRecords.class, Context.class); VirtualField stateStore = VirtualField.find(ConsumerRecords.class, State.class); interceptor = - new InstrumentedBatchInterceptor<>(receiveSpanVirtualField, stateStore, interceptor); + new InstrumentedBatchInterceptor<>(receiveContextVirtualField, stateStore, interceptor); } } } diff --git a/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/InstrumentedBatchInterceptor.java b/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/InstrumentedBatchInterceptor.java index 37dbb77bae..dca6ba2d10 100644 --- a/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/InstrumentedBatchInterceptor.java +++ b/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/InstrumentedBatchInterceptor.java @@ -7,8 +7,6 @@ package io.opentelemetry.javaagent.instrumentation.spring.kafka; import static io.opentelemetry.javaagent.instrumentation.spring.kafka.SpringKafkaSingletons.processInstrumenter; -import io.opentelemetry.api.trace.Span; -import io.opentelemetry.api.trace.SpanContext; import io.opentelemetry.context.Context; import io.opentelemetry.context.Scope; import io.opentelemetry.instrumentation.api.field.VirtualField; @@ -18,15 +16,15 @@ import org.apache.kafka.clients.consumer.ConsumerRecords; import org.springframework.kafka.listener.BatchInterceptor; public final class InstrumentedBatchInterceptor implements BatchInterceptor { - private final VirtualField, SpanContext> receiveSpanVirtualField; + private final VirtualField, Context> receiveContextVirtualField; private final VirtualField, State> stateStore; @Nullable private final BatchInterceptor decorated; public InstrumentedBatchInterceptor( - VirtualField, SpanContext> receiveSpanVirtualField, + VirtualField, Context> receiveContextVirtualField, VirtualField, State> stateStore, @Nullable BatchInterceptor decorated) { - this.receiveSpanVirtualField = receiveSpanVirtualField; + this.receiveContextVirtualField = receiveContextVirtualField; this.stateStore = stateStore; this.decorated = decorated; } @@ -45,13 +43,10 @@ public final class InstrumentedBatchInterceptor implements BatchIntercepto } private Context getParentContext(ConsumerRecords records) { - Context parentContext = Context.current(); + Context receiveContext = receiveContextVirtualField.get(records); + // use the receive CONSUMER span as parent if it's available - SpanContext receiveSpanContext = receiveSpanVirtualField.get(records); - if (receiveSpanContext != null) { - parentContext = parentContext.with(Span.wrap(receiveSpanContext)); - } - return parentContext; + return receiveContext != null ? receiveContext : Context.current(); } @Override diff --git a/javaagent-instrumentation-api/src/main/java/io/opentelemetry/javaagent/instrumentation/api/Java8BytecodeBridge.java b/javaagent-instrumentation-api/src/main/java/io/opentelemetry/javaagent/instrumentation/api/Java8BytecodeBridge.java index d884c88db4..52ad825f34 100644 --- a/javaagent-instrumentation-api/src/main/java/io/opentelemetry/javaagent/instrumentation/api/Java8BytecodeBridge.java +++ b/javaagent-instrumentation-api/src/main/java/io/opentelemetry/javaagent/instrumentation/api/Java8BytecodeBridge.java @@ -6,7 +6,6 @@ package io.opentelemetry.javaagent.instrumentation.api; import io.opentelemetry.api.trace.Span; -import io.opentelemetry.api.trace.SpanContext; import io.opentelemetry.context.Context; /** @@ -38,10 +37,5 @@ public final class Java8BytecodeBridge { return Span.fromContext(context); } - /** Calls {@link Span#wrap(SpanContext)}. */ - public static Span wrapSpan(SpanContext spanContext) { - return Span.wrap(spanContext); - } - private Java8BytecodeBridge() {} }