Bring back Kafka Streams, RabbitMQ, and AWS Java SDK 2.2 instrumentation (#109)
* Bring back Kafka Streams instrumentation * Bring back AWS Java SDK 2.2 instrumentation * Bring back RabbitMQ instrumentation * Make kafka streams instrumentation more future proof
This commit is contained in:
parent
bec7775d56
commit
54cb6dc538
|
@ -16,7 +16,8 @@ public abstract class AbstractAwsClientInstrumentation extends Instrumenter.Defa
|
|||
"io.opentelemetry.auto.decorator.ClientDecorator",
|
||||
"io.opentelemetry.auto.decorator.HttpClientDecorator",
|
||||
packageName + ".AwsSdkClientDecorator",
|
||||
packageName + ".TracingExecutionInterceptor"
|
||||
packageName + ".TracingExecutionInterceptor",
|
||||
packageName + ".TracingExecutionInterceptor$ScopeHolder"
|
||||
};
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
package io.opentelemetry.auto.instrumentation.aws.v2;
|
||||
|
||||
import static io.opentelemetry.auto.instrumentation.api.AgentTracer.activeScope;
|
||||
import static io.opentelemetry.auto.instrumentation.aws.v2.TracingExecutionInterceptor.ScopeHolder.CURRENT;
|
||||
import static io.opentelemetry.auto.tooling.ByteBuddyElementMatchers.safeHasSuperType;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.isInterface;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
|
||||
|
@ -9,8 +9,8 @@ import static net.bytebuddy.matcher.ElementMatchers.named;
|
|||
import static net.bytebuddy.matcher.ElementMatchers.not;
|
||||
|
||||
import com.google.auto.service.AutoService;
|
||||
import io.opentelemetry.auto.instrumentation.api.AgentScope;
|
||||
import io.opentelemetry.auto.tooling.Instrumenter;
|
||||
import io.opentelemetry.context.Scope;
|
||||
import java.util.Collections;
|
||||
import java.util.Map;
|
||||
import net.bytebuddy.asm.Advice;
|
||||
|
@ -58,8 +58,9 @@ public final class AwsHttpClientInstrumentation extends AbstractAwsClientInstrum
|
|||
@Advice.OnMethodEnter(suppress = Throwable.class)
|
||||
public static boolean methodEnter(@Advice.This final Object thiz) {
|
||||
if (thiz instanceof MakeAsyncHttpRequestStage) {
|
||||
final AgentScope scope = activeScope();
|
||||
final Scope scope = CURRENT.get();
|
||||
if (scope != null) {
|
||||
CURRENT.set(null);
|
||||
scope.close();
|
||||
return true;
|
||||
}
|
||||
|
@ -70,8 +71,9 @@ public final class AwsHttpClientInstrumentation extends AbstractAwsClientInstrum
|
|||
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
|
||||
public static void methodExit(@Advice.Enter final boolean scopeAlreadyClosed) {
|
||||
if (!scopeAlreadyClosed) {
|
||||
final AgentScope scope = activeScope();
|
||||
final Scope scope = CURRENT.get();
|
||||
if (scope != null) {
|
||||
CURRENT.set(null);
|
||||
scope.close();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,8 +1,10 @@
|
|||
package io.opentelemetry.auto.instrumentation.aws.v2;
|
||||
|
||||
import io.opentelemetry.auto.api.MoreTags;
|
||||
import io.opentelemetry.OpenTelemetry;
|
||||
import io.opentelemetry.auto.instrumentation.api.MoreTags;
|
||||
import io.opentelemetry.auto.decorator.HttpClientDecorator;
|
||||
import io.opentelemetry.auto.instrumentation.api.AgentSpan;
|
||||
import io.opentelemetry.trace.Span;
|
||||
import io.opentelemetry.trace.Tracer;
|
||||
import java.net.URI;
|
||||
import software.amazon.awssdk.awscore.AwsResponse;
|
||||
import software.amazon.awssdk.core.SdkRequest;
|
||||
|
@ -15,9 +17,11 @@ import software.amazon.awssdk.http.SdkHttpResponse;
|
|||
public class AwsSdkClientDecorator extends HttpClientDecorator<SdkHttpRequest, SdkHttpResponse> {
|
||||
public static final AwsSdkClientDecorator DECORATE = new AwsSdkClientDecorator();
|
||||
|
||||
public static final Tracer TRACER = OpenTelemetry.getTracerFactory().get("io.opentelemetry.auto");
|
||||
|
||||
static final String COMPONENT_NAME = "java-aws-sdk";
|
||||
|
||||
public AgentSpan onSdkRequest(final AgentSpan span, final SdkRequest request) {
|
||||
public Span onSdkRequest(final Span span, final SdkRequest request) {
|
||||
// S3
|
||||
request
|
||||
.getValueForField("Bucket", String.class)
|
||||
|
@ -40,7 +44,7 @@ public class AwsSdkClientDecorator extends HttpClientDecorator<SdkHttpRequest, S
|
|||
return span;
|
||||
}
|
||||
|
||||
public AgentSpan onAttributes(final AgentSpan span, final ExecutionAttributes attributes) {
|
||||
public Span onAttributes(final Span span, final ExecutionAttributes attributes) {
|
||||
|
||||
final String awsServiceName = attributes.getAttribute(SdkExecutionAttribute.SERVICE_NAME);
|
||||
final String awsOperation = attributes.getAttribute(SdkExecutionAttribute.OPERATION_NAME);
|
||||
|
@ -56,7 +60,7 @@ public class AwsSdkClientDecorator extends HttpClientDecorator<SdkHttpRequest, S
|
|||
}
|
||||
|
||||
// Not overriding the super. Should call both with each type of response.
|
||||
public AgentSpan onResponse(final AgentSpan span, final SdkResponse response) {
|
||||
public Span onResponse(final Span span, final SdkResponse response) {
|
||||
if (response instanceof AwsResponse) {
|
||||
span.setAttribute("aws.requestId", ((AwsResponse) response).responseMetadata().requestId());
|
||||
}
|
||||
|
@ -69,12 +73,7 @@ public class AwsSdkClientDecorator extends HttpClientDecorator<SdkHttpRequest, S
|
|||
}
|
||||
|
||||
@Override
|
||||
protected String[] instrumentationNames() {
|
||||
return new String[] {"aws-sdk"};
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String component() {
|
||||
protected String getComponentName() {
|
||||
return COMPONENT_NAME;
|
||||
}
|
||||
|
||||
|
|
|
@ -1,11 +1,10 @@
|
|||
package io.opentelemetry.auto.instrumentation.aws.v2;
|
||||
|
||||
import static io.opentelemetry.auto.instrumentation.api.AgentTracer.activateSpan;
|
||||
import static io.opentelemetry.auto.instrumentation.api.AgentTracer.startSpan;
|
||||
import static io.opentelemetry.auto.instrumentation.aws.v2.AwsSdkClientDecorator.DECORATE;
|
||||
import static io.opentelemetry.auto.instrumentation.aws.v2.AwsSdkClientDecorator.TRACER;
|
||||
|
||||
import io.opentelemetry.auto.instrumentation.api.AgentScope;
|
||||
import io.opentelemetry.auto.instrumentation.api.AgentSpan;
|
||||
import io.opentelemetry.context.Scope;
|
||||
import io.opentelemetry.trace.Span;
|
||||
import java.util.function.Consumer;
|
||||
import software.amazon.awssdk.core.client.builder.SdkClientBuilder;
|
||||
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
|
||||
|
@ -17,20 +16,24 @@ import software.amazon.awssdk.core.interceptor.ExecutionInterceptor;
|
|||
/** AWS request execution interceptor */
|
||||
public class TracingExecutionInterceptor implements ExecutionInterceptor {
|
||||
|
||||
public static class ScopeHolder {
|
||||
public static final ThreadLocal<Scope> CURRENT = new ThreadLocal<>();
|
||||
}
|
||||
|
||||
// Note: it looks like this lambda doesn't get generated as a separate class file so we do not
|
||||
// need to inject helper for it.
|
||||
private static final Consumer<ClientOverrideConfiguration.Builder>
|
||||
OVERRIDE_CONFIGURATION_CONSUMER =
|
||||
builder -> builder.addExecutionInterceptor(new TracingExecutionInterceptor());
|
||||
|
||||
private static final ExecutionAttribute<AgentSpan> SPAN_ATTRIBUTE =
|
||||
private static final ExecutionAttribute<Span> SPAN_ATTRIBUTE =
|
||||
new ExecutionAttribute<>("io.opentelemetry.auto.Span");
|
||||
|
||||
@Override
|
||||
public void beforeExecution(
|
||||
final Context.BeforeExecution context, final ExecutionAttributes executionAttributes) {
|
||||
final AgentSpan span = startSpan("aws.http");
|
||||
try (final AgentScope scope = activateSpan(span, false)) {
|
||||
final Span span = TRACER.spanBuilder("aws.http").startSpan();
|
||||
try (final Scope scope = TRACER.withSpan(span)) {
|
||||
DECORATE.afterStart(span);
|
||||
executionAttributes.putAttribute(SPAN_ATTRIBUTE, span);
|
||||
}
|
||||
|
@ -39,7 +42,7 @@ public class TracingExecutionInterceptor implements ExecutionInterceptor {
|
|||
@Override
|
||||
public void afterMarshalling(
|
||||
final Context.AfterMarshalling context, final ExecutionAttributes executionAttributes) {
|
||||
final AgentSpan span = executionAttributes.getAttribute(SPAN_ATTRIBUTE);
|
||||
final Span span = executionAttributes.getAttribute(SPAN_ATTRIBUTE);
|
||||
|
||||
DECORATE.onRequest(span, context.httpRequest());
|
||||
DECORATE.onSdkRequest(span, context.request());
|
||||
|
@ -49,36 +52,36 @@ public class TracingExecutionInterceptor implements ExecutionInterceptor {
|
|||
@Override
|
||||
public void beforeTransmission(
|
||||
final Context.BeforeTransmission context, final ExecutionAttributes executionAttributes) {
|
||||
final AgentSpan span = executionAttributes.getAttribute(SPAN_ATTRIBUTE);
|
||||
final Span span = executionAttributes.getAttribute(SPAN_ATTRIBUTE);
|
||||
|
||||
// This scope will be closed by AwsHttpClientInstrumentation since ExecutionInterceptor API
|
||||
// doesn't provide a way to run code in the same thread after transmission has been scheduled.
|
||||
activateSpan(span, false);
|
||||
ScopeHolder.CURRENT.set(TRACER.withSpan(span));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterExecution(
|
||||
final Context.AfterExecution context, final ExecutionAttributes executionAttributes) {
|
||||
final AgentSpan span = executionAttributes.getAttribute(SPAN_ATTRIBUTE);
|
||||
final Span span = executionAttributes.getAttribute(SPAN_ATTRIBUTE);
|
||||
if (span != null) {
|
||||
executionAttributes.putAttribute(SPAN_ATTRIBUTE, null);
|
||||
// Call onResponse on both types of responses:
|
||||
DECORATE.onResponse(span, context.response());
|
||||
DECORATE.onResponse(span, context.httpResponse());
|
||||
DECORATE.beforeFinish(span);
|
||||
span.finish();
|
||||
span.end();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onExecutionFailure(
|
||||
final Context.FailedExecution context, final ExecutionAttributes executionAttributes) {
|
||||
final AgentSpan span = executionAttributes.getAttribute(SPAN_ATTRIBUTE);
|
||||
final Span span = executionAttributes.getAttribute(SPAN_ATTRIBUTE);
|
||||
if (span != null) {
|
||||
executionAttributes.putAttribute(SPAN_ATTRIBUTE, null);
|
||||
DECORATE.onError(span, context.exception());
|
||||
DECORATE.beforeFinish(span);
|
||||
span.finish();
|
||||
span.end();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
import io.opentelemetry.auto.api.MoreTags
|
||||
import io.opentelemetry.auto.api.SpanTypes
|
||||
import io.opentelemetry.auto.instrumentation.api.MoreTags
|
||||
import io.opentelemetry.auto.instrumentation.api.SpanTypes
|
||||
import io.opentelemetry.auto.instrumentation.api.Tags
|
||||
import io.opentelemetry.auto.test.AgentTestRunner
|
||||
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials
|
||||
|
@ -178,13 +178,6 @@ class AwsClientTest extends AgentTestRunner {
|
|||
expect:
|
||||
response != null
|
||||
|
||||
// Order is not guaranteed in these traces, so reorder them if needed to put aws trace first
|
||||
if (TEST_WRITER[0][0].attributes[MoreTags.SERVICE_NAME].stringValue != "java-aws-sdk") {
|
||||
def tmp = TEST_WRITER[0]
|
||||
TEST_WRITER[0] = TEST_WRITER[1]
|
||||
TEST_WRITER[1] = tmp
|
||||
}
|
||||
|
||||
assertTraces(2) {
|
||||
trace(0, 1) {
|
||||
span(0) {
|
||||
|
|
|
@ -1,19 +1,18 @@
|
|||
package io.opentelemetry.auto.instrumentation.kafka_streams;
|
||||
|
||||
import io.opentelemetry.auto.api.MoreTags;
|
||||
import io.opentelemetry.auto.api.SpanTypes;
|
||||
import io.opentelemetry.OpenTelemetry;
|
||||
import io.opentelemetry.auto.decorator.ClientDecorator;
|
||||
import io.opentelemetry.auto.instrumentation.api.AgentSpan;
|
||||
import io.opentelemetry.auto.instrumentation.api.MoreTags;
|
||||
import io.opentelemetry.auto.instrumentation.api.SpanTypes;
|
||||
import io.opentelemetry.auto.instrumentation.api.Tags;
|
||||
import io.opentelemetry.trace.Span;
|
||||
import io.opentelemetry.trace.Tracer;
|
||||
import org.apache.kafka.streams.processor.internals.StampedRecord;
|
||||
|
||||
public class KafkaStreamsDecorator extends ClientDecorator {
|
||||
public static final KafkaStreamsDecorator CONSUMER_DECORATE = new KafkaStreamsDecorator();
|
||||
|
||||
@Override
|
||||
protected String[] instrumentationNames() {
|
||||
return new String[] {"kafka", "kafka-streams"};
|
||||
}
|
||||
public static final Tracer TRACER = OpenTelemetry.getTracerFactory().get("io.opentelemetry.auto");
|
||||
|
||||
@Override
|
||||
protected String service() {
|
||||
|
@ -21,7 +20,7 @@ public class KafkaStreamsDecorator extends ClientDecorator {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected String component() {
|
||||
protected String getComponentName() {
|
||||
return "java-kafka";
|
||||
}
|
||||
|
||||
|
@ -31,11 +30,11 @@ public class KafkaStreamsDecorator extends ClientDecorator {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected String spanType() {
|
||||
protected String getSpanType() {
|
||||
return SpanTypes.MESSAGE_CONSUMER;
|
||||
}
|
||||
|
||||
public void onConsume(final AgentSpan span, final StampedRecord record) {
|
||||
public void onConsume(final Span span, final StampedRecord record) {
|
||||
if (record != null) {
|
||||
final String topic = record.topic() == null ? "kafka" : record.topic();
|
||||
span.setAttribute(MoreTags.RESOURCE_NAME, "Consume Topic " + topic);
|
||||
|
|
|
@ -1,11 +1,8 @@
|
|||
package io.opentelemetry.auto.instrumentation.kafka_streams;
|
||||
|
||||
import static io.opentelemetry.auto.instrumentation.api.AgentTracer.activateSpan;
|
||||
import static io.opentelemetry.auto.instrumentation.api.AgentTracer.activeScope;
|
||||
import static io.opentelemetry.auto.instrumentation.api.AgentTracer.activeSpan;
|
||||
import static io.opentelemetry.auto.instrumentation.api.AgentTracer.propagate;
|
||||
import static io.opentelemetry.auto.instrumentation.api.AgentTracer.startSpan;
|
||||
import static io.opentelemetry.auto.instrumentation.kafka_streams.KafkaStreamsDecorator.CONSUMER_DECORATE;
|
||||
import static io.opentelemetry.auto.instrumentation.kafka_streams.KafkaStreamsDecorator.TRACER;
|
||||
import static io.opentelemetry.auto.instrumentation.kafka_streams.KafkaStreamsProcessorInstrumentation.SpanScopeHolder.HOLDER;
|
||||
import static io.opentelemetry.auto.instrumentation.kafka_streams.TextMapExtractAdapter.GETTER;
|
||||
import static java.util.Collections.singletonMap;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
|
||||
|
@ -16,10 +13,10 @@ import static net.bytebuddy.matcher.ElementMatchers.returns;
|
|||
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
|
||||
|
||||
import com.google.auto.service.AutoService;
|
||||
import io.opentelemetry.auto.instrumentation.api.AgentScope;
|
||||
import io.opentelemetry.auto.instrumentation.api.AgentSpan;
|
||||
import io.opentelemetry.auto.instrumentation.api.AgentSpan.Context;
|
||||
import io.opentelemetry.auto.instrumentation.api.SpanScopePair;
|
||||
import io.opentelemetry.auto.tooling.Instrumenter;
|
||||
import io.opentelemetry.trace.Span;
|
||||
import io.opentelemetry.trace.SpanContext;
|
||||
import java.util.Map;
|
||||
import net.bytebuddy.asm.Advice;
|
||||
import net.bytebuddy.description.method.MethodDescription;
|
||||
|
@ -30,10 +27,20 @@ import org.apache.kafka.streams.processor.internals.StampedRecord;
|
|||
public class KafkaStreamsProcessorInstrumentation {
|
||||
// These two instrumentations work together to apply StreamTask.process.
|
||||
// The combination of these is needed because there's no good instrumentation point.
|
||||
// FIXME: this instrumentation takes somewhat strange approach. It looks like Kafka Streams
|
||||
// defines notions of 'processor', 'source' and 'sink'. There is no 'consumer' as such.
|
||||
// Also this instrumentation doesn't define 'producer' making it 'asymmetric' - resulting
|
||||
// in awkward tests and traces. We may want to revisit this in the future.
|
||||
|
||||
public static class SpanScopeHolder {
|
||||
public static final ThreadLocal<SpanScopeHolder> HOLDER = new ThreadLocal<>();
|
||||
|
||||
private SpanScopePair spanScopePair;
|
||||
|
||||
public SpanScopePair getSpanScopePair() {
|
||||
return spanScopePair;
|
||||
}
|
||||
|
||||
public void setSpanScopePair(final SpanScopePair spanScopePair) {
|
||||
this.spanScopePair = spanScopePair;
|
||||
}
|
||||
}
|
||||
|
||||
@AutoService(Instrumenter.class)
|
||||
public static class StartInstrumentation extends Instrumenter.Default {
|
||||
|
@ -53,7 +60,8 @@ public class KafkaStreamsProcessorInstrumentation {
|
|||
"io.opentelemetry.auto.decorator.BaseDecorator",
|
||||
"io.opentelemetry.auto.decorator.ClientDecorator",
|
||||
packageName + ".KafkaStreamsDecorator",
|
||||
packageName + ".TextMapExtractAdapter"
|
||||
packageName + ".TextMapExtractAdapter",
|
||||
KafkaStreamsProcessorInstrumentation.class.getName() + "$SpanScopeHolder"
|
||||
};
|
||||
}
|
||||
|
||||
|
@ -75,13 +83,26 @@ public class KafkaStreamsProcessorInstrumentation {
|
|||
return;
|
||||
}
|
||||
|
||||
final Context extractedContext = propagate().extract(record.value.headers(), GETTER);
|
||||
final SpanScopeHolder holder = HOLDER.get();
|
||||
if (holder == null) {
|
||||
// somehow nextRecord() was called outside of process()
|
||||
return;
|
||||
}
|
||||
|
||||
final AgentSpan span = startSpan("kafka.consume", extractedContext);
|
||||
final Span.Builder spanBuilder = TRACER.spanBuilder("kafka.consume");
|
||||
try {
|
||||
final SpanContext extractedContext =
|
||||
TRACER.getHttpTextFormat().extract(record.value.headers(), GETTER);
|
||||
spanBuilder.setParent(extractedContext);
|
||||
} catch (final IllegalArgumentException e) {
|
||||
// Couldn't extract a context. We should treat this as a root span.
|
||||
spanBuilder.setNoParent();
|
||||
}
|
||||
final Span span = spanBuilder.startSpan();
|
||||
CONSUMER_DECORATE.afterStart(span);
|
||||
CONSUMER_DECORATE.onConsume(span, record);
|
||||
|
||||
activateSpan(span, true);
|
||||
holder.setSpanScopePair(new SpanScopePair(span, TRACER.withSpan(span)));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -104,7 +125,8 @@ public class KafkaStreamsProcessorInstrumentation {
|
|||
"io.opentelemetry.auto.decorator.BaseDecorator",
|
||||
"io.opentelemetry.auto.decorator.ClientDecorator",
|
||||
packageName + ".KafkaStreamsDecorator",
|
||||
packageName + ".TextMapExtractAdapter"
|
||||
packageName + ".TextMapExtractAdapter",
|
||||
KafkaStreamsProcessorInstrumentation.class.getName() + "$SpanScopeHolder"
|
||||
};
|
||||
}
|
||||
|
||||
|
@ -117,17 +139,24 @@ public class KafkaStreamsProcessorInstrumentation {
|
|||
|
||||
public static class StopSpanAdvice {
|
||||
|
||||
@Advice.OnMethodEnter
|
||||
public static SpanScopeHolder onEnter() {
|
||||
final SpanScopeHolder holder = new SpanScopeHolder();
|
||||
HOLDER.set(holder);
|
||||
return holder;
|
||||
}
|
||||
|
||||
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
|
||||
public static void stopSpan(@Advice.Thrown final Throwable throwable) {
|
||||
// This is dangerous... we assume the span/scope is the one we expect, but it may not be.
|
||||
final AgentSpan span = activeSpan();
|
||||
if (span != null) {
|
||||
public static void stopSpan(
|
||||
@Advice.Enter final SpanScopeHolder holder, @Advice.Thrown final Throwable throwable) {
|
||||
HOLDER.remove();
|
||||
final SpanScopePair spanScopePair = holder.getSpanScopePair();
|
||||
if (spanScopePair != null) {
|
||||
final Span span = spanScopePair.getSpan();
|
||||
CONSUMER_DECORATE.onError(span, throwable);
|
||||
CONSUMER_DECORATE.beforeFinish(span);
|
||||
}
|
||||
final AgentScope scope = activeScope();
|
||||
if (scope != null) {
|
||||
scope.close();
|
||||
span.end();
|
||||
spanScopePair.getScope().close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,25 +1,14 @@
|
|||
package io.opentelemetry.auto.instrumentation.kafka_streams;
|
||||
|
||||
import io.opentelemetry.auto.instrumentation.api.AgentPropagation;
|
||||
import io.opentelemetry.context.propagation.HttpTextFormat;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import org.apache.kafka.common.header.Header;
|
||||
import org.apache.kafka.common.header.Headers;
|
||||
|
||||
public class TextMapExtractAdapter implements AgentPropagation.Getter<Headers> {
|
||||
public class TextMapExtractAdapter implements HttpTextFormat.Getter<Headers> {
|
||||
|
||||
public static final TextMapExtractAdapter GETTER = new TextMapExtractAdapter();
|
||||
|
||||
@Override
|
||||
public Iterable<String> keys(final Headers headers) {
|
||||
final List<String> keys = new ArrayList<>();
|
||||
for (final Header header : headers) {
|
||||
keys.add(header.key());
|
||||
}
|
||||
return keys;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String get(final Headers headers, final String key) {
|
||||
final Header header = headers.lastHeader(key);
|
||||
|
|
|
@ -1,9 +1,9 @@
|
|||
import io.opentelemetry.auto.api.MoreTags
|
||||
import io.opentelemetry.auto.instrumentation.api.MoreTags
|
||||
import io.opentelemetry.auto.instrumentation.api.Tags
|
||||
import io.opentelemetry.auto.shaded.io.opentelemetry.context.propagation.HttpTextFormat
|
||||
import io.opentelemetry.auto.shaded.io.opentelemetry.trace.SpanContext
|
||||
import io.opentelemetry.auto.shaded.io.opentelemetry.trace.propagation.HttpTraceContext
|
||||
import io.opentelemetry.auto.test.AgentTestRunner
|
||||
import io.opentelemetry.context.propagation.HttpTextFormat
|
||||
import io.opentelemetry.trace.SpanContext
|
||||
import io.opentelemetry.trace.propagation.HttpTraceContext
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecord
|
||||
import org.apache.kafka.common.serialization.Serdes
|
||||
import org.apache.kafka.streams.KafkaStreams
|
||||
|
@ -60,10 +60,7 @@ class KafkaStreamsTest extends AgentTestRunner {
|
|||
consumerContainer.setupMessageListener(new MessageListener<String, String>() {
|
||||
@Override
|
||||
void onMessage(ConsumerRecord<String, String> record) {
|
||||
// ensure consistent ordering of traces
|
||||
// this is the last processing step so we should see 2 traces here
|
||||
TEST_WRITER.waitForTraces(3)
|
||||
getTestTracer().activeSpan().setAttribute("testing", 123)
|
||||
getTestTracer().getCurrentSpan().setAttribute("testing", 123)
|
||||
records.add(record)
|
||||
}
|
||||
})
|
||||
|
@ -87,8 +84,7 @@ class KafkaStreamsTest extends AgentTestRunner {
|
|||
.mapValues(new ValueMapper<String, String>() {
|
||||
@Override
|
||||
String apply(String textLine) {
|
||||
TEST_WRITER.waitForTraces(2) // ensure consistent ordering of traces
|
||||
getTestTracer().activeSpan().setAttribute("asdf", "testing")
|
||||
getTestTracer().getCurrentSpan().setAttribute("asdf", "testing")
|
||||
return textLine.toLowerCase()
|
||||
}
|
||||
})
|
||||
|
@ -120,15 +116,8 @@ class KafkaStreamsTest extends AgentTestRunner {
|
|||
received.value() == greeting.toLowerCase()
|
||||
received.key() == null
|
||||
|
||||
if (TEST_WRITER[1][0].name == "kafka.produce") {
|
||||
// Make sure that order of first two traces is predetermined.
|
||||
// Unfortunately it looks like we cannot really control it in a better way through the code
|
||||
def tmp = TEST_WRITER[1][0]
|
||||
TEST_WRITER[1][0] = TEST_WRITER[0][0]
|
||||
TEST_WRITER[0][0] = tmp
|
||||
}
|
||||
assertTraces(4) {
|
||||
trace(0, 1) {
|
||||
assertTraces(1) {
|
||||
trace(0, 5) {
|
||||
// PRODUCER span 0
|
||||
span(0) {
|
||||
operationName "kafka.produce"
|
||||
|
@ -142,13 +131,11 @@ class KafkaStreamsTest extends AgentTestRunner {
|
|||
"$Tags.SPAN_KIND" Tags.SPAN_KIND_PRODUCER
|
||||
}
|
||||
}
|
||||
}
|
||||
trace(1, 1) {
|
||||
// CONSUMER span 0
|
||||
span(0) {
|
||||
span(1) {
|
||||
operationName "kafka.consume"
|
||||
errored false
|
||||
childOf TEST_WRITER[0][0]
|
||||
childOf span(0)
|
||||
tags {
|
||||
"$MoreTags.SERVICE_NAME" "kafka"
|
||||
"$MoreTags.RESOURCE_NAME" "Consume Topic $STREAM_PENDING"
|
||||
|
@ -159,29 +146,11 @@ class KafkaStreamsTest extends AgentTestRunner {
|
|||
"offset" 0
|
||||
}
|
||||
}
|
||||
}
|
||||
trace(2, 2) {
|
||||
|
||||
// STREAMING span 0
|
||||
span(0) {
|
||||
operationName "kafka.produce"
|
||||
errored false
|
||||
childOf span(1)
|
||||
|
||||
tags {
|
||||
"$MoreTags.SERVICE_NAME" "kafka"
|
||||
"$MoreTags.RESOURCE_NAME" "Produce Topic $STREAM_PROCESSED"
|
||||
"$MoreTags.SPAN_TYPE" "queue"
|
||||
"$Tags.COMPONENT" "java-kafka"
|
||||
"$Tags.SPAN_KIND" Tags.SPAN_KIND_PRODUCER
|
||||
}
|
||||
}
|
||||
|
||||
// STREAMING span 1
|
||||
span(1) {
|
||||
span(2) {
|
||||
operationName "kafka.consume"
|
||||
errored false
|
||||
childOf TEST_WRITER[0][0]
|
||||
childOf span(0)
|
||||
|
||||
tags {
|
||||
"$MoreTags.SERVICE_NAME" "kafka"
|
||||
|
@ -194,13 +163,25 @@ class KafkaStreamsTest extends AgentTestRunner {
|
|||
"asdf" "testing"
|
||||
}
|
||||
}
|
||||
}
|
||||
trace(3, 1) {
|
||||
// STREAMING span 0
|
||||
span(3) {
|
||||
operationName "kafka.produce"
|
||||
errored false
|
||||
childOf span(2)
|
||||
|
||||
tags {
|
||||
"$MoreTags.SERVICE_NAME" "kafka"
|
||||
"$MoreTags.RESOURCE_NAME" "Produce Topic $STREAM_PROCESSED"
|
||||
"$MoreTags.SPAN_TYPE" "queue"
|
||||
"$Tags.COMPONENT" "java-kafka"
|
||||
"$Tags.SPAN_KIND" Tags.SPAN_KIND_PRODUCER
|
||||
}
|
||||
}
|
||||
// CONSUMER span 0
|
||||
span(0) {
|
||||
span(4) {
|
||||
operationName "kafka.consume"
|
||||
errored false
|
||||
childOf TEST_WRITER[2][0]
|
||||
childOf span(3)
|
||||
tags {
|
||||
"$MoreTags.SERVICE_NAME" "kafka"
|
||||
"$MoreTags.RESOURCE_NAME" "Consume Topic $STREAM_PROCESSED"
|
||||
|
@ -218,14 +199,17 @@ class KafkaStreamsTest extends AgentTestRunner {
|
|||
def headers = received.headers()
|
||||
headers.iterator().hasNext()
|
||||
def traceparent = new String(headers.headers("traceparent").iterator().next().value())
|
||||
SpanContext spanContext = new HttpTraceContext().extract(traceparent, new HttpTextFormat.Getter<String>() {
|
||||
SpanContext spanContext = new HttpTraceContext().extract("", new HttpTextFormat.Getter<String>() {
|
||||
@Override
|
||||
String get(String carrier, String key) {
|
||||
return carrier
|
||||
if (key == "traceparent") {
|
||||
return traceparent
|
||||
}
|
||||
return null
|
||||
}
|
||||
})
|
||||
spanContext.traceId == TEST_WRITER[2][0].traceId
|
||||
spanContext.spanId == TEST_WRITER[2][0].spanId
|
||||
spanContext.traceId == TEST_WRITER.traces[0][3].traceId
|
||||
spanContext.spanId == TEST_WRITER.traces[0][3].spanId
|
||||
|
||||
|
||||
cleanup:
|
||||
|
|
|
@ -1,7 +1,6 @@
|
|||
package io.opentelemetry.auto.instrumentation.rabbitmq.amqp;
|
||||
|
||||
import static io.opentelemetry.auto.instrumentation.api.AgentTracer.activateSpan;
|
||||
import static io.opentelemetry.auto.instrumentation.api.AgentTracer.noopSpan;
|
||||
import static io.opentelemetry.auto.instrumentation.rabbitmq.amqp.RabbitCommandInstrumentation.SpanHolder.CURRENT_RABBIT_SPAN;
|
||||
import static io.opentelemetry.auto.instrumentation.rabbitmq.amqp.RabbitDecorator.CONSUMER_DECORATE;
|
||||
import static io.opentelemetry.auto.instrumentation.rabbitmq.amqp.RabbitDecorator.DECORATE;
|
||||
import static io.opentelemetry.auto.instrumentation.rabbitmq.amqp.RabbitDecorator.PRODUCER_DECORATE;
|
||||
|
@ -28,13 +27,13 @@ import com.rabbitmq.client.Connection;
|
|||
import com.rabbitmq.client.Consumer;
|
||||
import com.rabbitmq.client.GetResponse;
|
||||
import com.rabbitmq.client.MessageProperties;
|
||||
import io.opentelemetry.auto.api.MoreTags;
|
||||
import io.opentelemetry.auto.bootstrap.CallDepthThreadLocalMap;
|
||||
import io.opentelemetry.auto.instrumentation.api.AgentScope;
|
||||
import io.opentelemetry.auto.instrumentation.api.MoreTags;
|
||||
import io.opentelemetry.auto.instrumentation.api.SpanScopePair;
|
||||
import io.opentelemetry.auto.instrumentation.api.Tags;
|
||||
import io.opentelemetry.auto.tooling.Instrumenter;
|
||||
import io.opentelemetry.context.Scope;
|
||||
import io.opentelemetry.trace.DefaultSpan;
|
||||
import io.opentelemetry.trace.Span;
|
||||
import io.opentelemetry.trace.SpanContext;
|
||||
import java.io.IOException;
|
||||
|
@ -70,6 +69,7 @@ public class RabbitChannelInstrumentation extends Instrumenter.Default {
|
|||
packageName + ".TextMapInjectAdapter",
|
||||
packageName + ".TextMapExtractAdapter",
|
||||
packageName + ".TracedDelegatingConsumer",
|
||||
RabbitCommandInstrumentation.class.getName() + "$SpanHolder",
|
||||
};
|
||||
}
|
||||
|
||||
|
@ -125,6 +125,7 @@ public class RabbitChannelInstrumentation extends Instrumenter.Default {
|
|||
span.setAttribute(Tags.PEER_PORT, connection.getPort());
|
||||
DECORATE.afterStart(span);
|
||||
DECORATE.onPeerConnection(span, connection.getAddress());
|
||||
CURRENT_RABBIT_SPAN.set(span);
|
||||
return new SpanScopePair(span, TRACER.withSpan(span));
|
||||
}
|
||||
|
||||
|
@ -134,6 +135,7 @@ public class RabbitChannelInstrumentation extends Instrumenter.Default {
|
|||
if (spanScopePair == null) {
|
||||
return;
|
||||
}
|
||||
CURRENT_RABBIT_SPAN.remove();
|
||||
final Span span = spanScopePair.getSpan();
|
||||
DECORATE.onError(span, throwable);
|
||||
DECORATE.beforeFinish(span);
|
||||
|
@ -195,12 +197,12 @@ public class RabbitChannelInstrumentation extends Instrumenter.Default {
|
|||
public static class ChannelGetAdvice {
|
||||
@Advice.OnMethodEnter
|
||||
public static long takeTimestamp(
|
||||
@Advice.Local("placeholderScope") AgentScope placeholderScope,
|
||||
@Advice.Local("placeholderScope") Scope placeholderScope,
|
||||
@Advice.Local("callDepth") int callDepth) {
|
||||
|
||||
callDepth = CallDepthThreadLocalMap.incrementCallDepth(Channel.class);
|
||||
// Don't want RabbitCommandInstrumentation to mess up our actual parent span.
|
||||
placeholderScope = activateSpan(noopSpan(), false);
|
||||
placeholderScope = TRACER.withSpan(DefaultSpan.getInvalid());
|
||||
return System.currentTimeMillis();
|
||||
}
|
||||
|
||||
|
@ -209,7 +211,7 @@ public class RabbitChannelInstrumentation extends Instrumenter.Default {
|
|||
@Advice.This final Channel channel,
|
||||
@Advice.Argument(0) final String queue,
|
||||
@Advice.Enter final long startTime,
|
||||
@Advice.Local("placeholderScope") final AgentScope placeholderScope,
|
||||
@Advice.Local("placeholderScope") final Scope placeholderScope,
|
||||
@Advice.Local("callDepth") final int callDepth,
|
||||
@Advice.Return final GetResponse response,
|
||||
@Advice.Thrown final Throwable throwable) {
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
package io.opentelemetry.auto.instrumentation.rabbitmq.amqp;
|
||||
|
||||
import static io.opentelemetry.auto.instrumentation.rabbitmq.amqp.RabbitCommandInstrumentation.SpanHolder.CURRENT_RABBIT_SPAN;
|
||||
import static io.opentelemetry.auto.instrumentation.rabbitmq.amqp.RabbitDecorator.DECORATE;
|
||||
import static io.opentelemetry.auto.instrumentation.rabbitmq.amqp.RabbitDecorator.TRACER;
|
||||
import static io.opentelemetry.auto.tooling.ByteBuddyElementMatchers.safeHasSuperType;
|
||||
import static java.util.Collections.singletonMap;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.isConstructor;
|
||||
|
@ -42,6 +42,7 @@ public class RabbitCommandInstrumentation extends Instrumenter.Default {
|
|||
// These are only used by muzzleCheck:
|
||||
packageName + ".TextMapExtractAdapter",
|
||||
packageName + ".TracedDelegatingConsumer",
|
||||
RabbitCommandInstrumentation.class.getName() + "$SpanHolder"
|
||||
};
|
||||
}
|
||||
|
||||
|
@ -52,15 +53,17 @@ public class RabbitCommandInstrumentation extends Instrumenter.Default {
|
|||
RabbitCommandInstrumentation.class.getName() + "$CommandConstructorAdvice");
|
||||
}
|
||||
|
||||
public static class SpanHolder {
|
||||
public static final ThreadLocal<Span> CURRENT_RABBIT_SPAN = new ThreadLocal<>();
|
||||
}
|
||||
|
||||
public static class CommandConstructorAdvice {
|
||||
@Advice.OnMethodExit
|
||||
public static void setResourceNameAddHeaders(@Advice.This final Command command) {
|
||||
final Span span = TRACER.getCurrentSpan();
|
||||
|
||||
if (span.getContext().isValid() && command.getMethod() != null) {
|
||||
if (span.getSpanName().equals("amqp.command")) {
|
||||
DECORATE.onCommand(span, command);
|
||||
}
|
||||
final Span span = CURRENT_RABBIT_SPAN.get();
|
||||
if (span != null && command.getMethod() != null) {
|
||||
DECORATE.onCommand(span, command);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -3,9 +3,9 @@ package io.opentelemetry.auto.instrumentation.rabbitmq.amqp;
|
|||
import com.rabbitmq.client.Command;
|
||||
import com.rabbitmq.client.Envelope;
|
||||
import io.opentelemetry.OpenTelemetry;
|
||||
import io.opentelemetry.auto.api.MoreTags;
|
||||
import io.opentelemetry.auto.api.SpanTypes;
|
||||
import io.opentelemetry.auto.decorator.ClientDecorator;
|
||||
import io.opentelemetry.auto.instrumentation.api.MoreTags;
|
||||
import io.opentelemetry.auto.instrumentation.api.SpanTypes;
|
||||
import io.opentelemetry.auto.instrumentation.api.Tags;
|
||||
import io.opentelemetry.trace.Span;
|
||||
import io.opentelemetry.trace.Tracer;
|
||||
|
@ -22,7 +22,7 @@ public class RabbitDecorator extends ClientDecorator {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected String spanType() {
|
||||
protected String getSpanType() {
|
||||
return SpanTypes.MESSAGE_PRODUCER;
|
||||
}
|
||||
};
|
||||
|
@ -35,25 +35,20 @@ public class RabbitDecorator extends ClientDecorator {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected String spanType() {
|
||||
protected String getSpanType() {
|
||||
return SpanTypes.MESSAGE_CONSUMER;
|
||||
}
|
||||
};
|
||||
|
||||
public static final Tracer TRACER = OpenTelemetry.getTracerFactory().get("io.opentelemetry.auto");
|
||||
|
||||
@Override
|
||||
protected String[] instrumentationNames() {
|
||||
return new String[] {"amqp", "rabbitmq"};
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String service() {
|
||||
return "rabbitmq";
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String component() {
|
||||
protected String getComponentName() {
|
||||
return "rabbitmq-amqp";
|
||||
}
|
||||
|
||||
|
@ -63,7 +58,7 @@ public class RabbitDecorator extends ClientDecorator {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected String spanType() {
|
||||
protected String getSpanType() {
|
||||
return SpanTypes.MESSAGE_CLIENT;
|
||||
}
|
||||
|
||||
|
|
|
@ -8,8 +8,8 @@ import com.rabbitmq.client.DefaultConsumer
|
|||
import com.rabbitmq.client.Envelope
|
||||
import com.rabbitmq.client.GetResponse
|
||||
import com.rabbitmq.client.ShutdownSignalException
|
||||
import io.opentelemetry.auto.api.MoreTags
|
||||
import io.opentelemetry.auto.api.SpanTypes
|
||||
import io.opentelemetry.auto.instrumentation.api.MoreTags
|
||||
import io.opentelemetry.auto.instrumentation.api.SpanTypes
|
||||
import io.opentelemetry.auto.instrumentation.api.Tags
|
||||
import io.opentelemetry.auto.test.AgentTestRunner
|
||||
import io.opentelemetry.auto.test.asserts.TraceAssert
|
||||
|
|
|
@ -42,8 +42,7 @@ include ':instrumentation:akka-http-10.0'
|
|||
include ':instrumentation:apache-httpasyncclient-4'
|
||||
include ':instrumentation:apache-httpclient-4'
|
||||
include ':instrumentation:aws-java-sdk-1.11.0'
|
||||
// FIXME this instrumentation relied on activeScope
|
||||
// include ':instrumentation:aws-java-sdk-2.2'
|
||||
include ':instrumentation:aws-java-sdk-2.2'
|
||||
include ':instrumentation:cdi-1.2'
|
||||
include ':instrumentation:couchbase-2.0'
|
||||
include ':instrumentation:couchbase-2.6'
|
||||
|
@ -89,8 +88,7 @@ include ':instrumentation:jetty-8'
|
|||
include ':instrumentation:jms'
|
||||
include ':instrumentation:jsp-2.3'
|
||||
include ':instrumentation:kafka-clients-0.11'
|
||||
// FIXME this instrumentation relied on activeScope
|
||||
// include ':instrumentation:kafka-streams-0.11'
|
||||
include ':instrumentation:kafka-streams-0.11'
|
||||
include ':instrumentation:lettuce-5'
|
||||
// FIXME this instrumentation relied on scope listener
|
||||
// include ':instrumentation:log4j1'
|
||||
|
@ -108,8 +106,7 @@ include ':instrumentation:play-2.6'
|
|||
include ':instrumentation:play-ws-1'
|
||||
include ':instrumentation:play-ws-2'
|
||||
include ':instrumentation:play-ws-2.1'
|
||||
// FIXME this instrumentation relied on getting current span name
|
||||
// include ':instrumentation:rabbitmq-amqp-2.7'
|
||||
include ':instrumentation:rabbitmq-amqp-2.7'
|
||||
include ':instrumentation:ratpack-1.4'
|
||||
include ':instrumentation:reactor-core-3.1'
|
||||
include ':instrumentation:rmi'
|
||||
|
|
Loading…
Reference in New Issue