diff --git a/instrumentation/aws-java-sdk-2.2/src/main/java/io/opentelemetry/auto/instrumentation/aws/v2/AbstractAwsClientInstrumentation.java b/instrumentation/aws-java-sdk-2.2/src/main/java/io/opentelemetry/auto/instrumentation/aws/v2/AbstractAwsClientInstrumentation.java index 654d91dbe9..63e22057e0 100644 --- a/instrumentation/aws-java-sdk-2.2/src/main/java/io/opentelemetry/auto/instrumentation/aws/v2/AbstractAwsClientInstrumentation.java +++ b/instrumentation/aws-java-sdk-2.2/src/main/java/io/opentelemetry/auto/instrumentation/aws/v2/AbstractAwsClientInstrumentation.java @@ -16,7 +16,8 @@ public abstract class AbstractAwsClientInstrumentation extends Instrumenter.Defa "io.opentelemetry.auto.decorator.ClientDecorator", "io.opentelemetry.auto.decorator.HttpClientDecorator", packageName + ".AwsSdkClientDecorator", - packageName + ".TracingExecutionInterceptor" + packageName + ".TracingExecutionInterceptor", + packageName + ".TracingExecutionInterceptor$ScopeHolder" }; } } diff --git a/instrumentation/aws-java-sdk-2.2/src/main/java/io/opentelemetry/auto/instrumentation/aws/v2/AwsHttpClientInstrumentation.java b/instrumentation/aws-java-sdk-2.2/src/main/java/io/opentelemetry/auto/instrumentation/aws/v2/AwsHttpClientInstrumentation.java index 431d58f60d..047161e9f3 100644 --- a/instrumentation/aws-java-sdk-2.2/src/main/java/io/opentelemetry/auto/instrumentation/aws/v2/AwsHttpClientInstrumentation.java +++ b/instrumentation/aws-java-sdk-2.2/src/main/java/io/opentelemetry/auto/instrumentation/aws/v2/AwsHttpClientInstrumentation.java @@ -1,6 +1,6 @@ package io.opentelemetry.auto.instrumentation.aws.v2; -import static io.opentelemetry.auto.instrumentation.api.AgentTracer.activeScope; +import static io.opentelemetry.auto.instrumentation.aws.v2.TracingExecutionInterceptor.ScopeHolder.CURRENT; import static io.opentelemetry.auto.tooling.ByteBuddyElementMatchers.safeHasSuperType; import static net.bytebuddy.matcher.ElementMatchers.isInterface; import static net.bytebuddy.matcher.ElementMatchers.isMethod; @@ -9,8 +9,8 @@ import static net.bytebuddy.matcher.ElementMatchers.named; import static net.bytebuddy.matcher.ElementMatchers.not; import com.google.auto.service.AutoService; -import io.opentelemetry.auto.instrumentation.api.AgentScope; import io.opentelemetry.auto.tooling.Instrumenter; +import io.opentelemetry.context.Scope; import java.util.Collections; import java.util.Map; import net.bytebuddy.asm.Advice; @@ -58,8 +58,9 @@ public final class AwsHttpClientInstrumentation extends AbstractAwsClientInstrum @Advice.OnMethodEnter(suppress = Throwable.class) public static boolean methodEnter(@Advice.This final Object thiz) { if (thiz instanceof MakeAsyncHttpRequestStage) { - final AgentScope scope = activeScope(); + final Scope scope = CURRENT.get(); if (scope != null) { + CURRENT.set(null); scope.close(); return true; } @@ -70,8 +71,9 @@ public final class AwsHttpClientInstrumentation extends AbstractAwsClientInstrum @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) public static void methodExit(@Advice.Enter final boolean scopeAlreadyClosed) { if (!scopeAlreadyClosed) { - final AgentScope scope = activeScope(); + final Scope scope = CURRENT.get(); if (scope != null) { + CURRENT.set(null); scope.close(); } } diff --git a/instrumentation/aws-java-sdk-2.2/src/main/java8/io/opentelemetry/auto/instrumentation/aws/v2/AwsSdkClientDecorator.java b/instrumentation/aws-java-sdk-2.2/src/main/java8/io/opentelemetry/auto/instrumentation/aws/v2/AwsSdkClientDecorator.java index 9f33559fe9..91c0c0f698 100644 --- a/instrumentation/aws-java-sdk-2.2/src/main/java8/io/opentelemetry/auto/instrumentation/aws/v2/AwsSdkClientDecorator.java +++ b/instrumentation/aws-java-sdk-2.2/src/main/java8/io/opentelemetry/auto/instrumentation/aws/v2/AwsSdkClientDecorator.java @@ -1,8 +1,10 @@ package io.opentelemetry.auto.instrumentation.aws.v2; -import io.opentelemetry.auto.api.MoreTags; +import io.opentelemetry.OpenTelemetry; +import io.opentelemetry.auto.instrumentation.api.MoreTags; import io.opentelemetry.auto.decorator.HttpClientDecorator; -import io.opentelemetry.auto.instrumentation.api.AgentSpan; +import io.opentelemetry.trace.Span; +import io.opentelemetry.trace.Tracer; import java.net.URI; import software.amazon.awssdk.awscore.AwsResponse; import software.amazon.awssdk.core.SdkRequest; @@ -15,9 +17,11 @@ import software.amazon.awssdk.http.SdkHttpResponse; public class AwsSdkClientDecorator extends HttpClientDecorator { public static final AwsSdkClientDecorator DECORATE = new AwsSdkClientDecorator(); + public static final Tracer TRACER = OpenTelemetry.getTracerFactory().get("io.opentelemetry.auto"); + static final String COMPONENT_NAME = "java-aws-sdk"; - public AgentSpan onSdkRequest(final AgentSpan span, final SdkRequest request) { + public Span onSdkRequest(final Span span, final SdkRequest request) { // S3 request .getValueForField("Bucket", String.class) @@ -40,7 +44,7 @@ public class AwsSdkClientDecorator extends HttpClientDecorator CURRENT = new ThreadLocal<>(); + } + // Note: it looks like this lambda doesn't get generated as a separate class file so we do not // need to inject helper for it. private static final Consumer OVERRIDE_CONFIGURATION_CONSUMER = builder -> builder.addExecutionInterceptor(new TracingExecutionInterceptor()); - private static final ExecutionAttribute SPAN_ATTRIBUTE = + private static final ExecutionAttribute SPAN_ATTRIBUTE = new ExecutionAttribute<>("io.opentelemetry.auto.Span"); @Override public void beforeExecution( final Context.BeforeExecution context, final ExecutionAttributes executionAttributes) { - final AgentSpan span = startSpan("aws.http"); - try (final AgentScope scope = activateSpan(span, false)) { + final Span span = TRACER.spanBuilder("aws.http").startSpan(); + try (final Scope scope = TRACER.withSpan(span)) { DECORATE.afterStart(span); executionAttributes.putAttribute(SPAN_ATTRIBUTE, span); } @@ -39,7 +42,7 @@ public class TracingExecutionInterceptor implements ExecutionInterceptor { @Override public void afterMarshalling( final Context.AfterMarshalling context, final ExecutionAttributes executionAttributes) { - final AgentSpan span = executionAttributes.getAttribute(SPAN_ATTRIBUTE); + final Span span = executionAttributes.getAttribute(SPAN_ATTRIBUTE); DECORATE.onRequest(span, context.httpRequest()); DECORATE.onSdkRequest(span, context.request()); @@ -49,36 +52,36 @@ public class TracingExecutionInterceptor implements ExecutionInterceptor { @Override public void beforeTransmission( final Context.BeforeTransmission context, final ExecutionAttributes executionAttributes) { - final AgentSpan span = executionAttributes.getAttribute(SPAN_ATTRIBUTE); + final Span span = executionAttributes.getAttribute(SPAN_ATTRIBUTE); // This scope will be closed by AwsHttpClientInstrumentation since ExecutionInterceptor API // doesn't provide a way to run code in the same thread after transmission has been scheduled. - activateSpan(span, false); + ScopeHolder.CURRENT.set(TRACER.withSpan(span)); } @Override public void afterExecution( final Context.AfterExecution context, final ExecutionAttributes executionAttributes) { - final AgentSpan span = executionAttributes.getAttribute(SPAN_ATTRIBUTE); + final Span span = executionAttributes.getAttribute(SPAN_ATTRIBUTE); if (span != null) { executionAttributes.putAttribute(SPAN_ATTRIBUTE, null); // Call onResponse on both types of responses: DECORATE.onResponse(span, context.response()); DECORATE.onResponse(span, context.httpResponse()); DECORATE.beforeFinish(span); - span.finish(); + span.end(); } } @Override public void onExecutionFailure( final Context.FailedExecution context, final ExecutionAttributes executionAttributes) { - final AgentSpan span = executionAttributes.getAttribute(SPAN_ATTRIBUTE); + final Span span = executionAttributes.getAttribute(SPAN_ATTRIBUTE); if (span != null) { executionAttributes.putAttribute(SPAN_ATTRIBUTE, null); DECORATE.onError(span, context.exception()); DECORATE.beforeFinish(span); - span.finish(); + span.end(); } } diff --git a/instrumentation/aws-java-sdk-2.2/src/test/groovy/AwsClientTest.groovy b/instrumentation/aws-java-sdk-2.2/src/test/groovy/AwsClientTest.groovy index c70e60c434..57eb0a6333 100644 --- a/instrumentation/aws-java-sdk-2.2/src/test/groovy/AwsClientTest.groovy +++ b/instrumentation/aws-java-sdk-2.2/src/test/groovy/AwsClientTest.groovy @@ -1,5 +1,5 @@ -import io.opentelemetry.auto.api.MoreTags -import io.opentelemetry.auto.api.SpanTypes +import io.opentelemetry.auto.instrumentation.api.MoreTags +import io.opentelemetry.auto.instrumentation.api.SpanTypes import io.opentelemetry.auto.instrumentation.api.Tags import io.opentelemetry.auto.test.AgentTestRunner import software.amazon.awssdk.auth.credentials.AwsBasicCredentials @@ -178,13 +178,6 @@ class AwsClientTest extends AgentTestRunner { expect: response != null - // Order is not guaranteed in these traces, so reorder them if needed to put aws trace first - if (TEST_WRITER[0][0].attributes[MoreTags.SERVICE_NAME].stringValue != "java-aws-sdk") { - def tmp = TEST_WRITER[0] - TEST_WRITER[0] = TEST_WRITER[1] - TEST_WRITER[1] = tmp - } - assertTraces(2) { trace(0, 1) { span(0) { diff --git a/instrumentation/kafka-streams-0.11/src/main/java/io/opentelemetry/auto/instrumentation/kafka_streams/KafkaStreamsDecorator.java b/instrumentation/kafka-streams-0.11/src/main/java/io/opentelemetry/auto/instrumentation/kafka_streams/KafkaStreamsDecorator.java index e0dbf9729c..f45ac98b4b 100644 --- a/instrumentation/kafka-streams-0.11/src/main/java/io/opentelemetry/auto/instrumentation/kafka_streams/KafkaStreamsDecorator.java +++ b/instrumentation/kafka-streams-0.11/src/main/java/io/opentelemetry/auto/instrumentation/kafka_streams/KafkaStreamsDecorator.java @@ -1,19 +1,18 @@ package io.opentelemetry.auto.instrumentation.kafka_streams; -import io.opentelemetry.auto.api.MoreTags; -import io.opentelemetry.auto.api.SpanTypes; +import io.opentelemetry.OpenTelemetry; import io.opentelemetry.auto.decorator.ClientDecorator; -import io.opentelemetry.auto.instrumentation.api.AgentSpan; +import io.opentelemetry.auto.instrumentation.api.MoreTags; +import io.opentelemetry.auto.instrumentation.api.SpanTypes; import io.opentelemetry.auto.instrumentation.api.Tags; +import io.opentelemetry.trace.Span; +import io.opentelemetry.trace.Tracer; import org.apache.kafka.streams.processor.internals.StampedRecord; public class KafkaStreamsDecorator extends ClientDecorator { public static final KafkaStreamsDecorator CONSUMER_DECORATE = new KafkaStreamsDecorator(); - @Override - protected String[] instrumentationNames() { - return new String[] {"kafka", "kafka-streams"}; - } + public static final Tracer TRACER = OpenTelemetry.getTracerFactory().get("io.opentelemetry.auto"); @Override protected String service() { @@ -21,7 +20,7 @@ public class KafkaStreamsDecorator extends ClientDecorator { } @Override - protected String component() { + protected String getComponentName() { return "java-kafka"; } @@ -31,11 +30,11 @@ public class KafkaStreamsDecorator extends ClientDecorator { } @Override - protected String spanType() { + protected String getSpanType() { return SpanTypes.MESSAGE_CONSUMER; } - public void onConsume(final AgentSpan span, final StampedRecord record) { + public void onConsume(final Span span, final StampedRecord record) { if (record != null) { final String topic = record.topic() == null ? "kafka" : record.topic(); span.setAttribute(MoreTags.RESOURCE_NAME, "Consume Topic " + topic); diff --git a/instrumentation/kafka-streams-0.11/src/main/java/io/opentelemetry/auto/instrumentation/kafka_streams/KafkaStreamsProcessorInstrumentation.java b/instrumentation/kafka-streams-0.11/src/main/java/io/opentelemetry/auto/instrumentation/kafka_streams/KafkaStreamsProcessorInstrumentation.java index cc5c3bf005..d26e8fb850 100644 --- a/instrumentation/kafka-streams-0.11/src/main/java/io/opentelemetry/auto/instrumentation/kafka_streams/KafkaStreamsProcessorInstrumentation.java +++ b/instrumentation/kafka-streams-0.11/src/main/java/io/opentelemetry/auto/instrumentation/kafka_streams/KafkaStreamsProcessorInstrumentation.java @@ -1,11 +1,8 @@ package io.opentelemetry.auto.instrumentation.kafka_streams; -import static io.opentelemetry.auto.instrumentation.api.AgentTracer.activateSpan; -import static io.opentelemetry.auto.instrumentation.api.AgentTracer.activeScope; -import static io.opentelemetry.auto.instrumentation.api.AgentTracer.activeSpan; -import static io.opentelemetry.auto.instrumentation.api.AgentTracer.propagate; -import static io.opentelemetry.auto.instrumentation.api.AgentTracer.startSpan; import static io.opentelemetry.auto.instrumentation.kafka_streams.KafkaStreamsDecorator.CONSUMER_DECORATE; +import static io.opentelemetry.auto.instrumentation.kafka_streams.KafkaStreamsDecorator.TRACER; +import static io.opentelemetry.auto.instrumentation.kafka_streams.KafkaStreamsProcessorInstrumentation.SpanScopeHolder.HOLDER; import static io.opentelemetry.auto.instrumentation.kafka_streams.TextMapExtractAdapter.GETTER; import static java.util.Collections.singletonMap; import static net.bytebuddy.matcher.ElementMatchers.isMethod; @@ -16,10 +13,10 @@ import static net.bytebuddy.matcher.ElementMatchers.returns; import static net.bytebuddy.matcher.ElementMatchers.takesArguments; import com.google.auto.service.AutoService; -import io.opentelemetry.auto.instrumentation.api.AgentScope; -import io.opentelemetry.auto.instrumentation.api.AgentSpan; -import io.opentelemetry.auto.instrumentation.api.AgentSpan.Context; +import io.opentelemetry.auto.instrumentation.api.SpanScopePair; import io.opentelemetry.auto.tooling.Instrumenter; +import io.opentelemetry.trace.Span; +import io.opentelemetry.trace.SpanContext; import java.util.Map; import net.bytebuddy.asm.Advice; import net.bytebuddy.description.method.MethodDescription; @@ -30,10 +27,20 @@ import org.apache.kafka.streams.processor.internals.StampedRecord; public class KafkaStreamsProcessorInstrumentation { // These two instrumentations work together to apply StreamTask.process. // The combination of these is needed because there's no good instrumentation point. - // FIXME: this instrumentation takes somewhat strange approach. It looks like Kafka Streams - // defines notions of 'processor', 'source' and 'sink'. There is no 'consumer' as such. - // Also this instrumentation doesn't define 'producer' making it 'asymmetric' - resulting - // in awkward tests and traces. We may want to revisit this in the future. + + public static class SpanScopeHolder { + public static final ThreadLocal HOLDER = new ThreadLocal<>(); + + private SpanScopePair spanScopePair; + + public SpanScopePair getSpanScopePair() { + return spanScopePair; + } + + public void setSpanScopePair(final SpanScopePair spanScopePair) { + this.spanScopePair = spanScopePair; + } + } @AutoService(Instrumenter.class) public static class StartInstrumentation extends Instrumenter.Default { @@ -53,7 +60,8 @@ public class KafkaStreamsProcessorInstrumentation { "io.opentelemetry.auto.decorator.BaseDecorator", "io.opentelemetry.auto.decorator.ClientDecorator", packageName + ".KafkaStreamsDecorator", - packageName + ".TextMapExtractAdapter" + packageName + ".TextMapExtractAdapter", + KafkaStreamsProcessorInstrumentation.class.getName() + "$SpanScopeHolder" }; } @@ -75,13 +83,26 @@ public class KafkaStreamsProcessorInstrumentation { return; } - final Context extractedContext = propagate().extract(record.value.headers(), GETTER); + final SpanScopeHolder holder = HOLDER.get(); + if (holder == null) { + // somehow nextRecord() was called outside of process() + return; + } - final AgentSpan span = startSpan("kafka.consume", extractedContext); + final Span.Builder spanBuilder = TRACER.spanBuilder("kafka.consume"); + try { + final SpanContext extractedContext = + TRACER.getHttpTextFormat().extract(record.value.headers(), GETTER); + spanBuilder.setParent(extractedContext); + } catch (final IllegalArgumentException e) { + // Couldn't extract a context. We should treat this as a root span. + spanBuilder.setNoParent(); + } + final Span span = spanBuilder.startSpan(); CONSUMER_DECORATE.afterStart(span); CONSUMER_DECORATE.onConsume(span, record); - activateSpan(span, true); + holder.setSpanScopePair(new SpanScopePair(span, TRACER.withSpan(span))); } } } @@ -104,7 +125,8 @@ public class KafkaStreamsProcessorInstrumentation { "io.opentelemetry.auto.decorator.BaseDecorator", "io.opentelemetry.auto.decorator.ClientDecorator", packageName + ".KafkaStreamsDecorator", - packageName + ".TextMapExtractAdapter" + packageName + ".TextMapExtractAdapter", + KafkaStreamsProcessorInstrumentation.class.getName() + "$SpanScopeHolder" }; } @@ -117,17 +139,24 @@ public class KafkaStreamsProcessorInstrumentation { public static class StopSpanAdvice { + @Advice.OnMethodEnter + public static SpanScopeHolder onEnter() { + final SpanScopeHolder holder = new SpanScopeHolder(); + HOLDER.set(holder); + return holder; + } + @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) - public static void stopSpan(@Advice.Thrown final Throwable throwable) { - // This is dangerous... we assume the span/scope is the one we expect, but it may not be. - final AgentSpan span = activeSpan(); - if (span != null) { + public static void stopSpan( + @Advice.Enter final SpanScopeHolder holder, @Advice.Thrown final Throwable throwable) { + HOLDER.remove(); + final SpanScopePair spanScopePair = holder.getSpanScopePair(); + if (spanScopePair != null) { + final Span span = spanScopePair.getSpan(); CONSUMER_DECORATE.onError(span, throwable); CONSUMER_DECORATE.beforeFinish(span); - } - final AgentScope scope = activeScope(); - if (scope != null) { - scope.close(); + span.end(); + spanScopePair.getScope().close(); } } } diff --git a/instrumentation/kafka-streams-0.11/src/main/java/io/opentelemetry/auto/instrumentation/kafka_streams/TextMapExtractAdapter.java b/instrumentation/kafka-streams-0.11/src/main/java/io/opentelemetry/auto/instrumentation/kafka_streams/TextMapExtractAdapter.java index d431072722..e0f04888e5 100644 --- a/instrumentation/kafka-streams-0.11/src/main/java/io/opentelemetry/auto/instrumentation/kafka_streams/TextMapExtractAdapter.java +++ b/instrumentation/kafka-streams-0.11/src/main/java/io/opentelemetry/auto/instrumentation/kafka_streams/TextMapExtractAdapter.java @@ -1,25 +1,14 @@ package io.opentelemetry.auto.instrumentation.kafka_streams; -import io.opentelemetry.auto.instrumentation.api.AgentPropagation; +import io.opentelemetry.context.propagation.HttpTextFormat; import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.List; import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.Headers; -public class TextMapExtractAdapter implements AgentPropagation.Getter { +public class TextMapExtractAdapter implements HttpTextFormat.Getter { public static final TextMapExtractAdapter GETTER = new TextMapExtractAdapter(); - @Override - public Iterable keys(final Headers headers) { - final List keys = new ArrayList<>(); - for (final Header header : headers) { - keys.add(header.key()); - } - return keys; - } - @Override public String get(final Headers headers, final String key) { final Header header = headers.lastHeader(key); diff --git a/instrumentation/kafka-streams-0.11/src/test/groovy/KafkaStreamsTest.groovy b/instrumentation/kafka-streams-0.11/src/test/groovy/KafkaStreamsTest.groovy index 8ea57afdfa..2279298763 100644 --- a/instrumentation/kafka-streams-0.11/src/test/groovy/KafkaStreamsTest.groovy +++ b/instrumentation/kafka-streams-0.11/src/test/groovy/KafkaStreamsTest.groovy @@ -1,9 +1,9 @@ -import io.opentelemetry.auto.api.MoreTags +import io.opentelemetry.auto.instrumentation.api.MoreTags import io.opentelemetry.auto.instrumentation.api.Tags -import io.opentelemetry.auto.shaded.io.opentelemetry.context.propagation.HttpTextFormat -import io.opentelemetry.auto.shaded.io.opentelemetry.trace.SpanContext -import io.opentelemetry.auto.shaded.io.opentelemetry.trace.propagation.HttpTraceContext import io.opentelemetry.auto.test.AgentTestRunner +import io.opentelemetry.context.propagation.HttpTextFormat +import io.opentelemetry.trace.SpanContext +import io.opentelemetry.trace.propagation.HttpTraceContext import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.common.serialization.Serdes import org.apache.kafka.streams.KafkaStreams @@ -60,10 +60,7 @@ class KafkaStreamsTest extends AgentTestRunner { consumerContainer.setupMessageListener(new MessageListener() { @Override void onMessage(ConsumerRecord record) { - // ensure consistent ordering of traces - // this is the last processing step so we should see 2 traces here - TEST_WRITER.waitForTraces(3) - getTestTracer().activeSpan().setAttribute("testing", 123) + getTestTracer().getCurrentSpan().setAttribute("testing", 123) records.add(record) } }) @@ -87,8 +84,7 @@ class KafkaStreamsTest extends AgentTestRunner { .mapValues(new ValueMapper() { @Override String apply(String textLine) { - TEST_WRITER.waitForTraces(2) // ensure consistent ordering of traces - getTestTracer().activeSpan().setAttribute("asdf", "testing") + getTestTracer().getCurrentSpan().setAttribute("asdf", "testing") return textLine.toLowerCase() } }) @@ -120,15 +116,8 @@ class KafkaStreamsTest extends AgentTestRunner { received.value() == greeting.toLowerCase() received.key() == null - if (TEST_WRITER[1][0].name == "kafka.produce") { - // Make sure that order of first two traces is predetermined. - // Unfortunately it looks like we cannot really control it in a better way through the code - def tmp = TEST_WRITER[1][0] - TEST_WRITER[1][0] = TEST_WRITER[0][0] - TEST_WRITER[0][0] = tmp - } - assertTraces(4) { - trace(0, 1) { + assertTraces(1) { + trace(0, 5) { // PRODUCER span 0 span(0) { operationName "kafka.produce" @@ -142,13 +131,11 @@ class KafkaStreamsTest extends AgentTestRunner { "$Tags.SPAN_KIND" Tags.SPAN_KIND_PRODUCER } } - } - trace(1, 1) { // CONSUMER span 0 - span(0) { + span(1) { operationName "kafka.consume" errored false - childOf TEST_WRITER[0][0] + childOf span(0) tags { "$MoreTags.SERVICE_NAME" "kafka" "$MoreTags.RESOURCE_NAME" "Consume Topic $STREAM_PENDING" @@ -159,29 +146,11 @@ class KafkaStreamsTest extends AgentTestRunner { "offset" 0 } } - } - trace(2, 2) { - - // STREAMING span 0 - span(0) { - operationName "kafka.produce" - errored false - childOf span(1) - - tags { - "$MoreTags.SERVICE_NAME" "kafka" - "$MoreTags.RESOURCE_NAME" "Produce Topic $STREAM_PROCESSED" - "$MoreTags.SPAN_TYPE" "queue" - "$Tags.COMPONENT" "java-kafka" - "$Tags.SPAN_KIND" Tags.SPAN_KIND_PRODUCER - } - } - // STREAMING span 1 - span(1) { + span(2) { operationName "kafka.consume" errored false - childOf TEST_WRITER[0][0] + childOf span(0) tags { "$MoreTags.SERVICE_NAME" "kafka" @@ -194,13 +163,25 @@ class KafkaStreamsTest extends AgentTestRunner { "asdf" "testing" } } - } - trace(3, 1) { + // STREAMING span 0 + span(3) { + operationName "kafka.produce" + errored false + childOf span(2) + + tags { + "$MoreTags.SERVICE_NAME" "kafka" + "$MoreTags.RESOURCE_NAME" "Produce Topic $STREAM_PROCESSED" + "$MoreTags.SPAN_TYPE" "queue" + "$Tags.COMPONENT" "java-kafka" + "$Tags.SPAN_KIND" Tags.SPAN_KIND_PRODUCER + } + } // CONSUMER span 0 - span(0) { + span(4) { operationName "kafka.consume" errored false - childOf TEST_WRITER[2][0] + childOf span(3) tags { "$MoreTags.SERVICE_NAME" "kafka" "$MoreTags.RESOURCE_NAME" "Consume Topic $STREAM_PROCESSED" @@ -218,14 +199,17 @@ class KafkaStreamsTest extends AgentTestRunner { def headers = received.headers() headers.iterator().hasNext() def traceparent = new String(headers.headers("traceparent").iterator().next().value()) - SpanContext spanContext = new HttpTraceContext().extract(traceparent, new HttpTextFormat.Getter() { + SpanContext spanContext = new HttpTraceContext().extract("", new HttpTextFormat.Getter() { @Override String get(String carrier, String key) { - return carrier + if (key == "traceparent") { + return traceparent + } + return null } }) - spanContext.traceId == TEST_WRITER[2][0].traceId - spanContext.spanId == TEST_WRITER[2][0].spanId + spanContext.traceId == TEST_WRITER.traces[0][3].traceId + spanContext.spanId == TEST_WRITER.traces[0][3].spanId cleanup: diff --git a/instrumentation/rabbitmq-amqp-2.7/src/main/java/io/opentelemetry/auto/instrumentation/rabbitmq/amqp/RabbitChannelInstrumentation.java b/instrumentation/rabbitmq-amqp-2.7/src/main/java/io/opentelemetry/auto/instrumentation/rabbitmq/amqp/RabbitChannelInstrumentation.java index 639f4828cf..9f9a0d6efa 100644 --- a/instrumentation/rabbitmq-amqp-2.7/src/main/java/io/opentelemetry/auto/instrumentation/rabbitmq/amqp/RabbitChannelInstrumentation.java +++ b/instrumentation/rabbitmq-amqp-2.7/src/main/java/io/opentelemetry/auto/instrumentation/rabbitmq/amqp/RabbitChannelInstrumentation.java @@ -1,7 +1,6 @@ package io.opentelemetry.auto.instrumentation.rabbitmq.amqp; -import static io.opentelemetry.auto.instrumentation.api.AgentTracer.activateSpan; -import static io.opentelemetry.auto.instrumentation.api.AgentTracer.noopSpan; +import static io.opentelemetry.auto.instrumentation.rabbitmq.amqp.RabbitCommandInstrumentation.SpanHolder.CURRENT_RABBIT_SPAN; import static io.opentelemetry.auto.instrumentation.rabbitmq.amqp.RabbitDecorator.CONSUMER_DECORATE; import static io.opentelemetry.auto.instrumentation.rabbitmq.amqp.RabbitDecorator.DECORATE; import static io.opentelemetry.auto.instrumentation.rabbitmq.amqp.RabbitDecorator.PRODUCER_DECORATE; @@ -28,13 +27,13 @@ import com.rabbitmq.client.Connection; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.GetResponse; import com.rabbitmq.client.MessageProperties; -import io.opentelemetry.auto.api.MoreTags; import io.opentelemetry.auto.bootstrap.CallDepthThreadLocalMap; -import io.opentelemetry.auto.instrumentation.api.AgentScope; +import io.opentelemetry.auto.instrumentation.api.MoreTags; import io.opentelemetry.auto.instrumentation.api.SpanScopePair; import io.opentelemetry.auto.instrumentation.api.Tags; import io.opentelemetry.auto.tooling.Instrumenter; import io.opentelemetry.context.Scope; +import io.opentelemetry.trace.DefaultSpan; import io.opentelemetry.trace.Span; import io.opentelemetry.trace.SpanContext; import java.io.IOException; @@ -70,6 +69,7 @@ public class RabbitChannelInstrumentation extends Instrumenter.Default { packageName + ".TextMapInjectAdapter", packageName + ".TextMapExtractAdapter", packageName + ".TracedDelegatingConsumer", + RabbitCommandInstrumentation.class.getName() + "$SpanHolder", }; } @@ -125,6 +125,7 @@ public class RabbitChannelInstrumentation extends Instrumenter.Default { span.setAttribute(Tags.PEER_PORT, connection.getPort()); DECORATE.afterStart(span); DECORATE.onPeerConnection(span, connection.getAddress()); + CURRENT_RABBIT_SPAN.set(span); return new SpanScopePair(span, TRACER.withSpan(span)); } @@ -134,6 +135,7 @@ public class RabbitChannelInstrumentation extends Instrumenter.Default { if (spanScopePair == null) { return; } + CURRENT_RABBIT_SPAN.remove(); final Span span = spanScopePair.getSpan(); DECORATE.onError(span, throwable); DECORATE.beforeFinish(span); @@ -195,12 +197,12 @@ public class RabbitChannelInstrumentation extends Instrumenter.Default { public static class ChannelGetAdvice { @Advice.OnMethodEnter public static long takeTimestamp( - @Advice.Local("placeholderScope") AgentScope placeholderScope, + @Advice.Local("placeholderScope") Scope placeholderScope, @Advice.Local("callDepth") int callDepth) { callDepth = CallDepthThreadLocalMap.incrementCallDepth(Channel.class); // Don't want RabbitCommandInstrumentation to mess up our actual parent span. - placeholderScope = activateSpan(noopSpan(), false); + placeholderScope = TRACER.withSpan(DefaultSpan.getInvalid()); return System.currentTimeMillis(); } @@ -209,7 +211,7 @@ public class RabbitChannelInstrumentation extends Instrumenter.Default { @Advice.This final Channel channel, @Advice.Argument(0) final String queue, @Advice.Enter final long startTime, - @Advice.Local("placeholderScope") final AgentScope placeholderScope, + @Advice.Local("placeholderScope") final Scope placeholderScope, @Advice.Local("callDepth") final int callDepth, @Advice.Return final GetResponse response, @Advice.Thrown final Throwable throwable) { diff --git a/instrumentation/rabbitmq-amqp-2.7/src/main/java/io/opentelemetry/auto/instrumentation/rabbitmq/amqp/RabbitCommandInstrumentation.java b/instrumentation/rabbitmq-amqp-2.7/src/main/java/io/opentelemetry/auto/instrumentation/rabbitmq/amqp/RabbitCommandInstrumentation.java index f9cd1fdd46..07865261bc 100644 --- a/instrumentation/rabbitmq-amqp-2.7/src/main/java/io/opentelemetry/auto/instrumentation/rabbitmq/amqp/RabbitCommandInstrumentation.java +++ b/instrumentation/rabbitmq-amqp-2.7/src/main/java/io/opentelemetry/auto/instrumentation/rabbitmq/amqp/RabbitCommandInstrumentation.java @@ -1,7 +1,7 @@ package io.opentelemetry.auto.instrumentation.rabbitmq.amqp; +import static io.opentelemetry.auto.instrumentation.rabbitmq.amqp.RabbitCommandInstrumentation.SpanHolder.CURRENT_RABBIT_SPAN; import static io.opentelemetry.auto.instrumentation.rabbitmq.amqp.RabbitDecorator.DECORATE; -import static io.opentelemetry.auto.instrumentation.rabbitmq.amqp.RabbitDecorator.TRACER; import static io.opentelemetry.auto.tooling.ByteBuddyElementMatchers.safeHasSuperType; import static java.util.Collections.singletonMap; import static net.bytebuddy.matcher.ElementMatchers.isConstructor; @@ -42,6 +42,7 @@ public class RabbitCommandInstrumentation extends Instrumenter.Default { // These are only used by muzzleCheck: packageName + ".TextMapExtractAdapter", packageName + ".TracedDelegatingConsumer", + RabbitCommandInstrumentation.class.getName() + "$SpanHolder" }; } @@ -52,15 +53,17 @@ public class RabbitCommandInstrumentation extends Instrumenter.Default { RabbitCommandInstrumentation.class.getName() + "$CommandConstructorAdvice"); } + public static class SpanHolder { + public static final ThreadLocal CURRENT_RABBIT_SPAN = new ThreadLocal<>(); + } + public static class CommandConstructorAdvice { @Advice.OnMethodExit public static void setResourceNameAddHeaders(@Advice.This final Command command) { - final Span span = TRACER.getCurrentSpan(); - if (span.getContext().isValid() && command.getMethod() != null) { - if (span.getSpanName().equals("amqp.command")) { - DECORATE.onCommand(span, command); - } + final Span span = CURRENT_RABBIT_SPAN.get(); + if (span != null && command.getMethod() != null) { + DECORATE.onCommand(span, command); } } diff --git a/instrumentation/rabbitmq-amqp-2.7/src/main/java/io/opentelemetry/auto/instrumentation/rabbitmq/amqp/RabbitDecorator.java b/instrumentation/rabbitmq-amqp-2.7/src/main/java/io/opentelemetry/auto/instrumentation/rabbitmq/amqp/RabbitDecorator.java index 423218ed91..6ed3c5c7c7 100644 --- a/instrumentation/rabbitmq-amqp-2.7/src/main/java/io/opentelemetry/auto/instrumentation/rabbitmq/amqp/RabbitDecorator.java +++ b/instrumentation/rabbitmq-amqp-2.7/src/main/java/io/opentelemetry/auto/instrumentation/rabbitmq/amqp/RabbitDecorator.java @@ -3,9 +3,9 @@ package io.opentelemetry.auto.instrumentation.rabbitmq.amqp; import com.rabbitmq.client.Command; import com.rabbitmq.client.Envelope; import io.opentelemetry.OpenTelemetry; -import io.opentelemetry.auto.api.MoreTags; -import io.opentelemetry.auto.api.SpanTypes; import io.opentelemetry.auto.decorator.ClientDecorator; +import io.opentelemetry.auto.instrumentation.api.MoreTags; +import io.opentelemetry.auto.instrumentation.api.SpanTypes; import io.opentelemetry.auto.instrumentation.api.Tags; import io.opentelemetry.trace.Span; import io.opentelemetry.trace.Tracer; @@ -22,7 +22,7 @@ public class RabbitDecorator extends ClientDecorator { } @Override - protected String spanType() { + protected String getSpanType() { return SpanTypes.MESSAGE_PRODUCER; } }; @@ -35,25 +35,20 @@ public class RabbitDecorator extends ClientDecorator { } @Override - protected String spanType() { + protected String getSpanType() { return SpanTypes.MESSAGE_CONSUMER; } }; public static final Tracer TRACER = OpenTelemetry.getTracerFactory().get("io.opentelemetry.auto"); - @Override - protected String[] instrumentationNames() { - return new String[] {"amqp", "rabbitmq"}; - } - @Override protected String service() { return "rabbitmq"; } @Override - protected String component() { + protected String getComponentName() { return "rabbitmq-amqp"; } @@ -63,7 +58,7 @@ public class RabbitDecorator extends ClientDecorator { } @Override - protected String spanType() { + protected String getSpanType() { return SpanTypes.MESSAGE_CLIENT; } diff --git a/instrumentation/rabbitmq-amqp-2.7/src/test/groovy/RabbitMQTest.groovy b/instrumentation/rabbitmq-amqp-2.7/src/test/groovy/RabbitMQTest.groovy index 89f74c15df..40644b43d0 100644 --- a/instrumentation/rabbitmq-amqp-2.7/src/test/groovy/RabbitMQTest.groovy +++ b/instrumentation/rabbitmq-amqp-2.7/src/test/groovy/RabbitMQTest.groovy @@ -8,8 +8,8 @@ import com.rabbitmq.client.DefaultConsumer import com.rabbitmq.client.Envelope import com.rabbitmq.client.GetResponse import com.rabbitmq.client.ShutdownSignalException -import io.opentelemetry.auto.api.MoreTags -import io.opentelemetry.auto.api.SpanTypes +import io.opentelemetry.auto.instrumentation.api.MoreTags +import io.opentelemetry.auto.instrumentation.api.SpanTypes import io.opentelemetry.auto.instrumentation.api.Tags import io.opentelemetry.auto.test.AgentTestRunner import io.opentelemetry.auto.test.asserts.TraceAssert diff --git a/settings.gradle b/settings.gradle index 2052cd5a87..5c9f6977cd 100644 --- a/settings.gradle +++ b/settings.gradle @@ -42,8 +42,7 @@ include ':instrumentation:akka-http-10.0' include ':instrumentation:apache-httpasyncclient-4' include ':instrumentation:apache-httpclient-4' include ':instrumentation:aws-java-sdk-1.11.0' -// FIXME this instrumentation relied on activeScope -// include ':instrumentation:aws-java-sdk-2.2' +include ':instrumentation:aws-java-sdk-2.2' include ':instrumentation:cdi-1.2' include ':instrumentation:couchbase-2.0' include ':instrumentation:couchbase-2.6' @@ -89,8 +88,7 @@ include ':instrumentation:jetty-8' include ':instrumentation:jms' include ':instrumentation:jsp-2.3' include ':instrumentation:kafka-clients-0.11' -// FIXME this instrumentation relied on activeScope -// include ':instrumentation:kafka-streams-0.11' +include ':instrumentation:kafka-streams-0.11' include ':instrumentation:lettuce-5' // FIXME this instrumentation relied on scope listener // include ':instrumentation:log4j1' @@ -108,8 +106,7 @@ include ':instrumentation:play-2.6' include ':instrumentation:play-ws-1' include ':instrumentation:play-ws-2' include ':instrumentation:play-ws-2.1' -// FIXME this instrumentation relied on getting current span name -// include ':instrumentation:rabbitmq-amqp-2.7' +include ':instrumentation:rabbitmq-amqp-2.7' include ':instrumentation:ratpack-1.4' include ':instrumentation:reactor-core-3.1' include ':instrumentation:rmi'