Merge pull request #666 from DataDog/mar-kolya/aws-sdk2
Initial AWS2 SDK instrumentation
This commit is contained in:
commit
b3ad700b34
|
@ -930,7 +930,7 @@ public class FieldBackedProvider implements InstrumentationContextProvider {
|
|||
}
|
||||
|
||||
private static String getContextFieldName(final String keyClassName) {
|
||||
return "__datadogContext" + Utils.converToInnerClassName(keyClassName);
|
||||
return "__datadogContext$" + Utils.converToInnerClassName(keyClassName);
|
||||
}
|
||||
|
||||
private static String getContextGetterName(final String keyClassName) {
|
||||
|
|
|
@ -18,7 +18,7 @@ import com.amazonaws.Request;
|
|||
import com.amazonaws.Response;
|
||||
import com.amazonaws.handlers.HandlerContextKey;
|
||||
import com.amazonaws.handlers.RequestHandler2;
|
||||
import io.opentracing.Span;
|
||||
import io.opentracing.Scope;
|
||||
import io.opentracing.SpanContext;
|
||||
import io.opentracing.Tracer;
|
||||
import io.opentracing.propagation.Format;
|
||||
|
@ -28,7 +28,11 @@ import io.opentracing.tag.Tags;
|
|||
/** Tracing Request Handler */
|
||||
public class TracingRequestHandler extends RequestHandler2 {
|
||||
|
||||
private final HandlerContextKey<Span> contextKey = new HandlerContextKey<>("span");
|
||||
// Note: aws1.x sdk doesn't have any truly async clients so we can store scope in request context
|
||||
// safely.
|
||||
private static final HandlerContextKey<Scope> SCOPE_CONTEXT_KEY =
|
||||
new HandlerContextKey<>("DatadogScope");
|
||||
|
||||
private final SpanContext parentContext; // for Async Client
|
||||
private final Tracer tracer;
|
||||
|
||||
|
@ -64,30 +68,32 @@ public class TracingRequestHandler extends RequestHandler2 {
|
|||
spanBuilder.asChildOf(parentContext);
|
||||
}
|
||||
|
||||
final Span span = spanBuilder.start();
|
||||
SpanDecorator.onRequest(request, span);
|
||||
final Scope scope = spanBuilder.startActive(true);
|
||||
SpanDecorator.onRequest(request, scope.span());
|
||||
|
||||
// We inject headers at aws-client level because aws requests may be signed and adding headers
|
||||
// on http-client level may break signature.
|
||||
tracer.inject(
|
||||
span.context(),
|
||||
scope.span().context(),
|
||||
Format.Builtin.HTTP_HEADERS,
|
||||
new TextMapInjectAdapter(request.getHeaders()));
|
||||
|
||||
request.addHandlerContext(contextKey, span);
|
||||
request.addHandlerContext(SCOPE_CONTEXT_KEY, scope);
|
||||
}
|
||||
|
||||
/** {@inheritDoc} */
|
||||
@Override
|
||||
public void afterResponse(final Request<?> request, final Response<?> response) {
|
||||
final Span span = request.getHandlerContext(contextKey);
|
||||
SpanDecorator.onResponse(response, span);
|
||||
span.finish();
|
||||
final Scope scope = request.getHandlerContext(SCOPE_CONTEXT_KEY);
|
||||
SpanDecorator.onResponse(response, scope.span());
|
||||
scope.close();
|
||||
}
|
||||
|
||||
/** {@inheritDoc} */
|
||||
@Override
|
||||
public void afterError(final Request<?> request, final Response<?> response, final Exception e) {
|
||||
final Span span = request.getHandlerContext(contextKey);
|
||||
SpanDecorator.onError(e, span);
|
||||
span.finish();
|
||||
final Scope scope = request.getHandlerContext(SCOPE_CONTEXT_KEY);
|
||||
SpanDecorator.onError(e, scope.span());
|
||||
scope.close();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -73,27 +73,8 @@ class AWSClientTest extends AgentTestRunner {
|
|||
client.requestHandler2s.size() == handlerCount
|
||||
client.requestHandler2s.get(0).getClass().getSimpleName() == "TracingRequestHandler"
|
||||
|
||||
assertTraces(2) {
|
||||
trace(0, 1) {
|
||||
span(0) {
|
||||
operationName "http.request"
|
||||
resourceName "$method /$url"
|
||||
errored false
|
||||
parent() // FIXME: This should be a child of the aws.http call.
|
||||
tags {
|
||||
"$Tags.COMPONENT.key" "apache-httpclient"
|
||||
"$Tags.HTTP_STATUS.key" 200
|
||||
"$Tags.HTTP_URL.key" "$server.address/$url"
|
||||
"$Tags.PEER_HOSTNAME.key" "localhost"
|
||||
"$Tags.PEER_PORT.key" server.address.port
|
||||
"$Tags.HTTP_METHOD.key" "$method"
|
||||
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT
|
||||
"$DDTags.SPAN_TYPE" DDSpanTypes.HTTP_CLIENT
|
||||
defaultTags()
|
||||
}
|
||||
}
|
||||
}
|
||||
trace(1, 1) {
|
||||
assertTraces(1) {
|
||||
trace(0, 2) {
|
||||
span(0) {
|
||||
serviceName "java-aws-sdk"
|
||||
operationName "aws.http"
|
||||
|
@ -107,18 +88,34 @@ class AWSClientTest extends AgentTestRunner {
|
|||
"$Tags.HTTP_METHOD.key" "$method"
|
||||
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT
|
||||
"$DDTags.SPAN_TYPE" DDSpanTypes.HTTP_CLIENT
|
||||
"aws.service" String
|
||||
"aws.service" { it.contains(service) }
|
||||
"aws.endpoint" "$server.address"
|
||||
"aws.operation" "${operation}Request"
|
||||
"aws.agent" "java-aws-sdk"
|
||||
defaultTags()
|
||||
}
|
||||
}
|
||||
span(1) {
|
||||
operationName "http.request"
|
||||
resourceName "$method /$url"
|
||||
errored false
|
||||
childOf(span(0))
|
||||
tags {
|
||||
"$Tags.COMPONENT.key" "apache-httpclient"
|
||||
"$Tags.HTTP_STATUS.key" 200
|
||||
"$Tags.HTTP_URL.key" "$server.address/$url"
|
||||
"$Tags.PEER_HOSTNAME.key" "localhost"
|
||||
"$Tags.PEER_PORT.key" server.address.port
|
||||
"$Tags.HTTP_METHOD.key" "$method"
|
||||
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT
|
||||
"$DDTags.SPAN_TYPE" DDSpanTypes.HTTP_CLIENT
|
||||
defaultTags()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
// Not sure why these are children of the aws.http span:
|
||||
server.lastRequest.headers.get("x-datadog-trace-id") == TEST_WRITER[1][0].traceId
|
||||
server.lastRequest.headers.get("x-datadog-parent-id") == TEST_WRITER[1][0].spanId
|
||||
server.lastRequest.headers.get("x-datadog-trace-id") == TEST_WRITER[0][0].traceId
|
||||
server.lastRequest.headers.get("x-datadog-parent-id") == TEST_WRITER[0][0].spanId
|
||||
|
||||
where:
|
||||
service | operation | method | url | handlerCount | call | body | client
|
||||
|
|
|
@ -102,27 +102,8 @@ class AWSClientTest extends AgentTestRunner {
|
|||
client.requestHandler2s.size() == handlerCount
|
||||
client.requestHandler2s.get(0).getClass().getSimpleName() == "TracingRequestHandler"
|
||||
|
||||
assertTraces(2) {
|
||||
trace(0, 1) {
|
||||
span(0) {
|
||||
operationName "http.request"
|
||||
resourceName "$method /$url"
|
||||
errored false
|
||||
parent() // FIXME: This should be a child of the aws.http call.
|
||||
tags {
|
||||
"$Tags.COMPONENT.key" "apache-httpclient"
|
||||
"$Tags.HTTP_STATUS.key" 200
|
||||
"$Tags.HTTP_URL.key" "$server.address/$url"
|
||||
"$Tags.PEER_HOSTNAME.key" "localhost"
|
||||
"$Tags.PEER_PORT.key" server.address.port
|
||||
"$Tags.HTTP_METHOD.key" "$method"
|
||||
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT
|
||||
"$DDTags.SPAN_TYPE" DDSpanTypes.HTTP_CLIENT
|
||||
defaultTags()
|
||||
}
|
||||
}
|
||||
}
|
||||
trace(1, 1) {
|
||||
assertTraces(1) {
|
||||
trace(0, 2) {
|
||||
span(0) {
|
||||
serviceName "java-aws-sdk"
|
||||
operationName "aws.http"
|
||||
|
@ -136,18 +117,34 @@ class AWSClientTest extends AgentTestRunner {
|
|||
"$Tags.HTTP_METHOD.key" "$method"
|
||||
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT
|
||||
"$DDTags.SPAN_TYPE" DDSpanTypes.HTTP_CLIENT
|
||||
"aws.service" String
|
||||
"aws.service" { it.contains(service) }
|
||||
"aws.endpoint" "$server.address"
|
||||
"aws.operation" "${operation}Request"
|
||||
"aws.agent" "java-aws-sdk"
|
||||
defaultTags()
|
||||
}
|
||||
}
|
||||
span(1) {
|
||||
operationName "http.request"
|
||||
resourceName "$method /$url"
|
||||
errored false
|
||||
childOf(span(0))
|
||||
tags {
|
||||
"$Tags.COMPONENT.key" "apache-httpclient"
|
||||
"$Tags.HTTP_STATUS.key" 200
|
||||
"$Tags.HTTP_URL.key" "$server.address/$url"
|
||||
"$Tags.PEER_HOSTNAME.key" "localhost"
|
||||
"$Tags.PEER_PORT.key" server.address.port
|
||||
"$Tags.HTTP_METHOD.key" "$method"
|
||||
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT
|
||||
"$DDTags.SPAN_TYPE" DDSpanTypes.HTTP_CLIENT
|
||||
defaultTags()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
// Not sure why these are children of the aws.http span:
|
||||
server.lastRequest.headers.get("x-datadog-trace-id") == TEST_WRITER[1][0].traceId
|
||||
server.lastRequest.headers.get("x-datadog-parent-id") == TEST_WRITER[1][0].spanId
|
||||
server.lastRequest.headers.get("x-datadog-trace-id") == TEST_WRITER[0][0].traceId
|
||||
server.lastRequest.headers.get("x-datadog-parent-id") == TEST_WRITER[0][0].spanId
|
||||
|
||||
where:
|
||||
service | operation | method | url | handlerCount | call | body | client
|
||||
|
|
|
@ -0,0 +1,65 @@
|
|||
ext {
|
||||
minJavaVersionForTests = JavaVersion.VERSION_1_8
|
||||
}
|
||||
|
||||
muzzle {
|
||||
pass {
|
||||
group = "software.amazon.awssdk"
|
||||
module = "aws-core"
|
||||
versions = "[2.2.0,)"
|
||||
assertInverse = true
|
||||
}
|
||||
}
|
||||
|
||||
apply from: "${rootDir}/gradle/java.gradle"
|
||||
|
||||
apply plugin: 'org.unbroken-dome.test-sets'
|
||||
|
||||
testSets {
|
||||
latestDepTest
|
||||
}
|
||||
|
||||
sourceSets {
|
||||
main_java8 {
|
||||
java.srcDirs "${project.projectDir}/src/main/java8"
|
||||
}
|
||||
}
|
||||
|
||||
compileMain_java8Java {
|
||||
sourceCompatibility = 1.8
|
||||
targetCompatibility = 1.8
|
||||
}
|
||||
|
||||
dependencies {
|
||||
main_java8CompileOnly group: 'software.amazon.awssdk', name: 'aws-core', version: '2.2.0'
|
||||
|
||||
main_java8Compile project(':dd-java-agent:agent-tooling')
|
||||
|
||||
main_java8Compile deps.bytebuddy
|
||||
main_java8Compile deps.opentracing
|
||||
|
||||
compileOnly sourceSets.main_java8.compileClasspath
|
||||
compile sourceSets.main_java8.output
|
||||
|
||||
annotationProcessor deps.autoservice
|
||||
implementation deps.autoservice
|
||||
|
||||
testCompile project(':dd-java-agent:testing')
|
||||
// Include httpclient instrumentation for testing because it is a dependency for aws-sdk.
|
||||
testCompile project(':dd-java-agent:instrumentation:apache-httpclient-4')
|
||||
// Also include netty instrumentation because it is used by aws async client
|
||||
testCompile project(':dd-java-agent:instrumentation:netty-4.1')
|
||||
// Needed by netty async instrumentation
|
||||
testCompile project(':dd-java-agent:instrumentation:java-concurrent')
|
||||
testCompile group: 'software.amazon.awssdk', name: 's3', version: '2.2.0'
|
||||
testCompile group: 'software.amazon.awssdk', name: 'rds', version: '2.2.0'
|
||||
testCompile group: 'software.amazon.awssdk', name: 'ec2', version: '2.2.0'
|
||||
|
||||
latestDepTestCompile project(':dd-java-agent:testing')
|
||||
latestDepTestCompile project(':dd-java-agent:instrumentation:apache-httpclient-4')
|
||||
latestDepTestCompile project(':dd-java-agent:instrumentation:netty-4.1')
|
||||
latestDepTestCompile project(':dd-java-agent:instrumentation:java-concurrent')
|
||||
latestDepTestCompile group: 'software.amazon.awssdk', name: 's3', version: '+'
|
||||
latestDepTestCompile group: 'software.amazon.awssdk', name: 'rds', version: '+'
|
||||
latestDepTestCompile group: 'software.amazon.awssdk', name: 'ec2', version: '+'
|
||||
}
|
|
@ -0,0 +1,20 @@
|
|||
package datadog.trace.instrumentation.aws.v2;
|
||||
|
||||
import datadog.trace.agent.tooling.Instrumenter;
|
||||
|
||||
public abstract class AbstractAwsClientInstrumentation extends Instrumenter.Default {
|
||||
private static final String INSTRUMENTATION_NAME = "aws-sdk";
|
||||
|
||||
public AbstractAwsClientInstrumentation() {
|
||||
super(INSTRUMENTATION_NAME);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String[] helperClassNames() {
|
||||
return new String[] {
|
||||
AwsClientInstrumentation.class.getPackage().getName() + ".TracingExecutionInterceptor",
|
||||
AwsClientInstrumentation.class.getPackage().getName()
|
||||
+ ".TracingExecutionInterceptor$InjectAdapter"
|
||||
};
|
||||
}
|
||||
}
|
|
@ -0,0 +1,43 @@
|
|||
package datadog.trace.instrumentation.aws.v2;
|
||||
|
||||
import static datadog.trace.agent.tooling.ByteBuddyElementMatchers.safeHasSuperType;
|
||||
import static java.util.Collections.singletonMap;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.isInterface;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.named;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.not;
|
||||
|
||||
import com.google.auto.service.AutoService;
|
||||
import datadog.trace.agent.tooling.Instrumenter;
|
||||
import java.util.Map;
|
||||
import net.bytebuddy.asm.Advice;
|
||||
import net.bytebuddy.description.method.MethodDescription;
|
||||
import net.bytebuddy.description.type.TypeDescription;
|
||||
import net.bytebuddy.matcher.ElementMatcher;
|
||||
import software.amazon.awssdk.core.client.builder.SdkClientBuilder;
|
||||
|
||||
/** AWS SDK v2 instrumentation */
|
||||
@AutoService(Instrumenter.class)
|
||||
public final class AwsClientInstrumentation extends AbstractAwsClientInstrumentation {
|
||||
|
||||
@Override
|
||||
public ElementMatcher<TypeDescription> typeMatcher() {
|
||||
return safeHasSuperType(named("software.amazon.awssdk.core.client.builder.SdkClientBuilder"))
|
||||
.and(not(isInterface()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<? extends ElementMatcher<? super MethodDescription>, String> transformers() {
|
||||
return singletonMap(
|
||||
isMethod().and(isPublic()).and(named("build")), AwsBuilderAdvice.class.getName());
|
||||
}
|
||||
|
||||
public static class AwsBuilderAdvice {
|
||||
|
||||
@Advice.OnMethodEnter(suppress = Throwable.class)
|
||||
public static void methodEnter(@Advice.This final SdkClientBuilder thiz) {
|
||||
thiz.overrideConfiguration(TracingExecutionInterceptor.getOverrideConfigurationConsumer());
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,86 @@
|
|||
package datadog.trace.instrumentation.aws.v2;
|
||||
|
||||
import static datadog.trace.agent.tooling.ByteBuddyElementMatchers.safeHasSuperType;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.isInterface;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.named;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.not;
|
||||
|
||||
import com.google.auto.service.AutoService;
|
||||
import datadog.trace.agent.tooling.Instrumenter;
|
||||
import io.opentracing.Scope;
|
||||
import io.opentracing.util.GlobalTracer;
|
||||
import java.util.Collections;
|
||||
import java.util.Map;
|
||||
import net.bytebuddy.asm.Advice;
|
||||
import net.bytebuddy.description.method.MethodDescription;
|
||||
import net.bytebuddy.description.type.TypeDescription;
|
||||
import net.bytebuddy.matcher.ElementMatcher;
|
||||
import software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage;
|
||||
|
||||
/**
|
||||
* Separate instrumentation class to close aws request scope right after request has been submitted
|
||||
* for execution for Sync clients.
|
||||
*/
|
||||
@AutoService(Instrumenter.class)
|
||||
public final class AwsHttpClientInstrumentation extends AbstractAwsClientInstrumentation {
|
||||
|
||||
@Override
|
||||
public ElementMatcher<TypeDescription> typeMatcher() {
|
||||
return safeHasSuperType(
|
||||
named("software.amazon.awssdk.core.internal.http.pipeline.stages.MakeHttpRequestStage")
|
||||
.or(
|
||||
named(
|
||||
"software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage")))
|
||||
.and(not(isInterface()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<? extends ElementMatcher<? super MethodDescription>, String> transformers() {
|
||||
return Collections.singletonMap(
|
||||
isMethod().and(isPublic()).and(named("execute")), AwsHttpClientAdvice.class.getName());
|
||||
}
|
||||
|
||||
public static class AwsHttpClientAdvice {
|
||||
|
||||
/**
|
||||
* FIXME: This is a hack to prevent netty instrumentation from messing things up.
|
||||
*
|
||||
* <p>Currently netty instrumentation cannot handle way AWS SDK makes http requests. If AWS SDK
|
||||
* make a netty call with active scope then continuation will be created that would never be
|
||||
* closed preventing whole trace from reporting. This happens because netty switches channels
|
||||
* between connection and request stages and netty instrumentation cannot find continuation
|
||||
* stored in channel attributes.
|
||||
*/
|
||||
@Advice.OnMethodEnter(suppress = Throwable.class)
|
||||
public static boolean methodEnter(@Advice.This final Object thiz) {
|
||||
if (thiz instanceof MakeAsyncHttpRequestStage) {
|
||||
final Scope scope = GlobalTracer.get().scopeManager().active();
|
||||
if (scope != null) {
|
||||
scope.close();
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
|
||||
public static void methodExit(@Advice.Enter final boolean scopeAlreadyClosed) {
|
||||
if (!scopeAlreadyClosed) {
|
||||
final Scope scope = GlobalTracer.get().scopeManager().active();
|
||||
if (scope != null) {
|
||||
scope.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This is to make muzzle think we need TracingExecutionInterceptor to make sure we do not apply
|
||||
* this instrumentation when TracingExecutionInterceptor would not work.
|
||||
*/
|
||||
public static void muzzleCheck() {
|
||||
TracingExecutionInterceptor.getOverrideConfigurationConsumer();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,183 @@
|
|||
/*
|
||||
* Copyright 2017-2018 The OpenTracing Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
|
||||
* in compliance with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License
|
||||
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
|
||||
* or implied. See the License for the specific language governing permissions and limitations under
|
||||
* the License.
|
||||
*/
|
||||
package datadog.trace.instrumentation.aws.v2;
|
||||
|
||||
import static io.opentracing.log.Fields.ERROR_OBJECT;
|
||||
|
||||
import datadog.trace.api.DDSpanTypes;
|
||||
import datadog.trace.api.DDTags;
|
||||
import datadog.trace.context.TraceScope;
|
||||
import io.opentracing.Scope;
|
||||
import io.opentracing.Span;
|
||||
import io.opentracing.Tracer;
|
||||
import io.opentracing.propagation.Format;
|
||||
import io.opentracing.propagation.TextMap;
|
||||
import io.opentracing.tag.Tags;
|
||||
import io.opentracing.util.GlobalTracer;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.util.Collections;
|
||||
import java.util.Iterator;
|
||||
import java.util.Map;
|
||||
import java.util.function.Consumer;
|
||||
import software.amazon.awssdk.awscore.AwsResponse;
|
||||
import software.amazon.awssdk.core.SdkResponse;
|
||||
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
|
||||
import software.amazon.awssdk.core.interceptor.Context;
|
||||
import software.amazon.awssdk.core.interceptor.ExecutionAttribute;
|
||||
import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
|
||||
import software.amazon.awssdk.core.interceptor.ExecutionInterceptor;
|
||||
import software.amazon.awssdk.core.interceptor.SdkExecutionAttribute;
|
||||
import software.amazon.awssdk.http.SdkHttpRequest;
|
||||
|
||||
/** AWS request execution interceptor */
|
||||
public class TracingExecutionInterceptor implements ExecutionInterceptor {
|
||||
|
||||
private static final TracingExecutionInterceptor INSTANCE = new TracingExecutionInterceptor();
|
||||
// Note: it looks like this lambda doesn't get generated as a separate class file so we do not
|
||||
// need to inject helper for it.
|
||||
private static final Consumer<ClientOverrideConfiguration.Builder>
|
||||
OVERRIDE_CONFIGURATION_CONSUMER = builder -> builder.addExecutionInterceptor(INSTANCE);
|
||||
|
||||
static final String COMPONENT_NAME = "java-aws-sdk";
|
||||
|
||||
private static final ExecutionAttribute<Span> SPAN_ATTRIBUTE =
|
||||
new ExecutionAttribute<>("DatadogSpan");
|
||||
|
||||
@Override
|
||||
public void beforeExecution(
|
||||
final Context.BeforeExecution context, final ExecutionAttributes executionAttributes) {
|
||||
final Tracer tracer = GlobalTracer.get();
|
||||
|
||||
final Tracer.SpanBuilder builder =
|
||||
tracer
|
||||
.buildSpan("aws.command")
|
||||
.withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_CLIENT)
|
||||
.withTag(Tags.COMPONENT.getKey(), COMPONENT_NAME)
|
||||
.withTag(DDTags.SERVICE_NAME, COMPONENT_NAME)
|
||||
.withTag(DDTags.SPAN_TYPE, DDSpanTypes.HTTP_CLIENT);
|
||||
executionAttributes.putAttribute(SPAN_ATTRIBUTE, builder.start());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterMarshalling(
|
||||
final Context.AfterMarshalling context, final ExecutionAttributes executionAttributes) {
|
||||
final Span span = executionAttributes.getAttribute(SPAN_ATTRIBUTE);
|
||||
final SdkHttpRequest httpRequest = context.httpRequest();
|
||||
|
||||
Tags.HTTP_METHOD.set(span, httpRequest.method().name());
|
||||
|
||||
try {
|
||||
final URI requestUri = httpRequest.getUri();
|
||||
final String uri =
|
||||
new URI(
|
||||
requestUri.getScheme(),
|
||||
null,
|
||||
requestUri.getHost(),
|
||||
requestUri.getPort(),
|
||||
requestUri.getPath(),
|
||||
null,
|
||||
null)
|
||||
.toString();
|
||||
Tags.HTTP_URL.set(span, uri);
|
||||
} catch (final URISyntaxException e) {
|
||||
Tags.HTTP_URL.set(span, "failed-to-parse");
|
||||
}
|
||||
final String awsServiceName =
|
||||
executionAttributes.getAttribute(SdkExecutionAttribute.SERVICE_NAME);
|
||||
final String awsOperation =
|
||||
executionAttributes.getAttribute(SdkExecutionAttribute.OPERATION_NAME);
|
||||
|
||||
// Resource Name has to be set after the HTTP_URL because otherwise decorators overwrite it
|
||||
span.setTag(DDTags.RESOURCE_NAME, awsServiceName + "." + awsOperation);
|
||||
|
||||
span.setTag("aws.agent", COMPONENT_NAME);
|
||||
span.setTag("aws.service", awsServiceName);
|
||||
span.setTag("aws.operation", awsOperation);
|
||||
}
|
||||
|
||||
@Override
|
||||
public SdkHttpRequest modifyHttpRequest(
|
||||
final Context.ModifyHttpRequest context, final ExecutionAttributes executionAttributes) {
|
||||
final Tracer tracer = GlobalTracer.get();
|
||||
final Span span = executionAttributes.getAttribute(SPAN_ATTRIBUTE);
|
||||
final SdkHttpRequest.Builder builder = context.httpRequest().toBuilder();
|
||||
tracer.inject(span.context(), Format.Builtin.HTTP_HEADERS, new InjectAdapter(builder));
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void beforeTransmission(
|
||||
final Context.BeforeTransmission context, final ExecutionAttributes executionAttributes) {
|
||||
final Span span = executionAttributes.getAttribute(SPAN_ATTRIBUTE);
|
||||
|
||||
// This scope will be closed by AwsHttpClientInstrumentation since ExecutionInterceptor API
|
||||
// doesn't provide a way to run code in the same thread after transmission has been scheduled.
|
||||
final Scope scope = GlobalTracer.get().scopeManager().activate(span, false);
|
||||
((TraceScope) scope).setAsyncPropagation(true);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterExecution(
|
||||
final Context.AfterExecution context, final ExecutionAttributes executionAttributes) {
|
||||
final Span span = executionAttributes.getAttribute(SPAN_ATTRIBUTE);
|
||||
try {
|
||||
Tags.HTTP_STATUS.set(span, context.httpResponse().statusCode());
|
||||
final SdkResponse response = context.response();
|
||||
if (response instanceof AwsResponse) {
|
||||
span.setTag("aws.requestId", ((AwsResponse) response).responseMetadata().requestId());
|
||||
}
|
||||
} finally {
|
||||
span.finish();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onExecutionFailure(
|
||||
final Context.FailedExecution context, final ExecutionAttributes executionAttributes) {
|
||||
final Span span = executionAttributes.getAttribute(SPAN_ATTRIBUTE);
|
||||
Tags.ERROR.set(span, Boolean.TRUE);
|
||||
span.log(Collections.singletonMap(ERROR_OBJECT, context.exception()));
|
||||
}
|
||||
|
||||
public static Consumer<ClientOverrideConfiguration.Builder> getOverrideConfigurationConsumer() {
|
||||
return OVERRIDE_CONFIGURATION_CONSUMER;
|
||||
}
|
||||
|
||||
/**
|
||||
* Inject headers into the request builder.
|
||||
*
|
||||
* <p>Note: we inject headers at aws-client level because aws requests may be signed and adding
|
||||
* headers on http-client level may break signature.
|
||||
*/
|
||||
public static class InjectAdapter implements TextMap {
|
||||
|
||||
private final SdkHttpRequest.Builder builder;
|
||||
|
||||
public InjectAdapter(final SdkHttpRequest.Builder builder) {
|
||||
this.builder = builder;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Iterator<Map.Entry<String, String>> iterator() {
|
||||
throw new UnsupportedOperationException(
|
||||
"This class should be used only with Tracer.extract()!");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void put(final String key, final String value) {
|
||||
builder.putHeader(key, value);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,225 @@
|
|||
import datadog.trace.agent.test.AgentTestRunner
|
||||
import datadog.trace.api.DDSpanTypes
|
||||
import datadog.trace.api.DDTags
|
||||
import io.opentracing.tag.Tags
|
||||
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials
|
||||
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider
|
||||
import software.amazon.awssdk.core.async.AsyncResponseTransformer
|
||||
import software.amazon.awssdk.regions.Region
|
||||
import software.amazon.awssdk.services.ec2.Ec2AsyncClient
|
||||
import software.amazon.awssdk.services.ec2.Ec2Client
|
||||
import software.amazon.awssdk.services.rds.RdsAsyncClient
|
||||
import software.amazon.awssdk.services.rds.RdsClient
|
||||
import software.amazon.awssdk.services.rds.model.DeleteOptionGroupRequest
|
||||
import software.amazon.awssdk.services.s3.S3AsyncClient
|
||||
import software.amazon.awssdk.services.s3.S3Client
|
||||
import software.amazon.awssdk.services.s3.model.CreateBucketRequest
|
||||
import software.amazon.awssdk.services.s3.model.GetObjectRequest
|
||||
import spock.lang.AutoCleanup
|
||||
import spock.lang.Shared
|
||||
|
||||
import java.util.concurrent.Future
|
||||
import java.util.concurrent.atomic.AtomicReference
|
||||
|
||||
import static datadog.trace.agent.test.server.http.TestHttpServer.httpServer
|
||||
|
||||
class AwsClientTest extends AgentTestRunner {
|
||||
|
||||
private static final StaticCredentialsProvider CREDENTIALS_PROVIDER = StaticCredentialsProvider
|
||||
.create(AwsBasicCredentials.create("my-access-key", "my-secret-key"))
|
||||
|
||||
@Shared
|
||||
def responseBody = new AtomicReference<String>()
|
||||
|
||||
@AutoCleanup
|
||||
@Shared
|
||||
def server = httpServer {
|
||||
handlers {
|
||||
all {
|
||||
response.status(200).send(responseBody.get())
|
||||
}
|
||||
}
|
||||
}
|
||||
@Shared
|
||||
def serverUrl = new URI("http://localhost:$server.address.port")
|
||||
|
||||
def "send #operation request with builder {#builder.class.getName()} mocked response"() {
|
||||
setup:
|
||||
def client = builder
|
||||
.endpointOverride(serverUrl)
|
||||
.region(Region.AP_NORTHEAST_1)
|
||||
.credentialsProvider(CREDENTIALS_PROVIDER)
|
||||
.build()
|
||||
responseBody.set(body)
|
||||
def response = call.call(client)
|
||||
|
||||
if (response instanceof Future) {
|
||||
response = response.get()
|
||||
}
|
||||
|
||||
expect:
|
||||
response != null
|
||||
|
||||
// It looks like url doesn't contain trailing slash on empty path for some reason
|
||||
def expectedUrl = path == "/" ? "${server.address}" : "${server.address}${path}"
|
||||
|
||||
assertTraces(1) {
|
||||
trace(0, 2) {
|
||||
span(0) {
|
||||
serviceName "java-aws-sdk"
|
||||
operationName "aws.http"
|
||||
resourceName "$service.$operation"
|
||||
errored false
|
||||
parent()
|
||||
tags {
|
||||
"$Tags.COMPONENT.key" "java-aws-sdk"
|
||||
"$Tags.HTTP_STATUS.key" 200
|
||||
"$Tags.HTTP_URL.key" expectedUrl
|
||||
"$Tags.HTTP_METHOD.key" "$method"
|
||||
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT
|
||||
"$DDTags.SPAN_TYPE" DDSpanTypes.HTTP_CLIENT
|
||||
"aws.service" "$service"
|
||||
"aws.operation" "${operation}"
|
||||
"aws.agent" "java-aws-sdk"
|
||||
"aws.requestId" "$requestId"
|
||||
defaultTags()
|
||||
}
|
||||
}
|
||||
span(1) {
|
||||
operationName "http.request"
|
||||
resourceName "$method $path"
|
||||
errored false
|
||||
childOf(span(0))
|
||||
tags {
|
||||
"$Tags.COMPONENT.key" "apache-httpclient"
|
||||
"$Tags.HTTP_STATUS.key" 200
|
||||
"$Tags.HTTP_URL.key" expectedUrl
|
||||
"$Tags.PEER_HOSTNAME.key" "localhost"
|
||||
"$Tags.PEER_PORT.key" server.address.port
|
||||
"$Tags.HTTP_METHOD.key" "$method"
|
||||
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT
|
||||
"$DDTags.SPAN_TYPE" DDSpanTypes.HTTP_CLIENT
|
||||
defaultTags()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
server.lastRequest.headers.get("x-datadog-trace-id") == TEST_WRITER[0][0].traceId
|
||||
server.lastRequest.headers.get("x-datadog-parent-id") == TEST_WRITER[0][0].spanId
|
||||
|
||||
where:
|
||||
service | operation | method | path | requestId | call | body | builder
|
||||
"S3" | "CreateBucket" | "PUT" | "/testbucket" | "UNKNOWN" | { c -> c.createBucket(CreateBucketRequest.builder().bucket("testbucket").build()) } | "" | S3Client.builder()
|
||||
"S3" | "GetObject" | "GET" | "/someBucket/someKey" | "UNKNOWN" | { c -> c.getObject(GetObjectRequest.builder().bucket("someBucket").key("someKey").build()) } | "" | S3Client.builder()
|
||||
"Ec2" | "AllocateAddress" | "POST" | "/" | "59dbff89-35bd-4eac-99ed-be587EXAMPLE" | { c -> c.allocateAddress() } | """
|
||||
<AllocateAddressResponse xmlns="http://ec2.amazonaws.com/doc/2016-11-15/">
|
||||
<requestId>59dbff89-35bd-4eac-99ed-be587EXAMPLE</requestId>
|
||||
<publicIp>192.0.2.1</publicIp>
|
||||
<domain>standard</domain>
|
||||
</AllocateAddressResponse>
|
||||
""" | Ec2Client.builder()
|
||||
"Rds" | "DeleteOptionGroup" | "POST" | "/" | "0ac9cda2-bbf4-11d3-f92b-31fa5e8dbc99" | { c -> c.deleteOptionGroup(DeleteOptionGroupRequest.builder().build()) } | """
|
||||
<DeleteOptionGroupResponse xmlns="http://rds.amazonaws.com/doc/2014-09-01/">
|
||||
<ResponseMetadata>
|
||||
<RequestId>0ac9cda2-bbf4-11d3-f92b-31fa5e8dbc99</RequestId>
|
||||
</ResponseMetadata>
|
||||
</DeleteOptionGroupResponse>
|
||||
""" | RdsClient.builder()
|
||||
}
|
||||
|
||||
def "send #operation async request with builder {#builder.class.getName()} mocked response"() {
|
||||
setup:
|
||||
def client = builder
|
||||
.endpointOverride(serverUrl)
|
||||
.region(Region.AP_NORTHEAST_1)
|
||||
.credentialsProvider(CREDENTIALS_PROVIDER)
|
||||
.build()
|
||||
responseBody.set(body)
|
||||
def response = call.call(client)
|
||||
|
||||
if (response instanceof Future) {
|
||||
response = response.get()
|
||||
}
|
||||
|
||||
expect:
|
||||
response != null
|
||||
|
||||
// It looks like url doesn't contain trailing slash on empty path for some reason
|
||||
def expectedUrl = path == "/" ? "${server.address}" : "${server.address}${path}"
|
||||
|
||||
// Order is not guaranteed in these traces, so reorder them if needed to put aws trace first
|
||||
if (TEST_WRITER[0][0].serviceName != "java-aws-sdk") {
|
||||
def tmp = TEST_WRITER[0]
|
||||
TEST_WRITER[0] = TEST_WRITER[1]
|
||||
TEST_WRITER[1] = tmp
|
||||
}
|
||||
|
||||
assertTraces(2) {
|
||||
trace(0, 1) {
|
||||
span(0) {
|
||||
serviceName "java-aws-sdk"
|
||||
operationName "aws.http"
|
||||
resourceName "$service.$operation"
|
||||
errored false
|
||||
parent()
|
||||
tags {
|
||||
"$Tags.COMPONENT.key" "java-aws-sdk"
|
||||
"$Tags.HTTP_STATUS.key" 200
|
||||
"$Tags.HTTP_URL.key" expectedUrl
|
||||
"$Tags.HTTP_METHOD.key" "$method"
|
||||
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT
|
||||
"$DDTags.SPAN_TYPE" DDSpanTypes.HTTP_CLIENT
|
||||
"aws.service" "$service"
|
||||
"aws.operation" "${operation}"
|
||||
"aws.agent" "java-aws-sdk"
|
||||
"aws.requestId" "$requestId"
|
||||
defaultTags()
|
||||
}
|
||||
}
|
||||
}
|
||||
// TODO: this should be part of the same trace but netty instrumentation doesn't cooperate
|
||||
trace(1, 1) {
|
||||
span(0) {
|
||||
operationName "netty.client.request"
|
||||
resourceName "$method $path"
|
||||
errored false
|
||||
parent()
|
||||
tags {
|
||||
"$Tags.COMPONENT.key" "netty-client"
|
||||
"$Tags.HTTP_STATUS.key" 200
|
||||
"$Tags.HTTP_URL.key" expectedUrl
|
||||
"$Tags.PEER_HOSTNAME.key" "localhost"
|
||||
"$Tags.PEER_PORT.key" server.address.port
|
||||
"$Tags.HTTP_METHOD.key" "$method"
|
||||
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT
|
||||
"$DDTags.SPAN_TYPE" DDSpanTypes.HTTP_CLIENT
|
||||
defaultTags()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
server.lastRequest.headers.get("x-datadog-trace-id") == TEST_WRITER[0][0].traceId
|
||||
server.lastRequest.headers.get("x-datadog-parent-id") == TEST_WRITER[0][0].spanId
|
||||
|
||||
where:
|
||||
service | operation | method | path | requestId | call | body | builder
|
||||
"S3" | "CreateBucket" | "PUT" | "/testbucket" | "UNKNOWN" | { c -> c.createBucket(CreateBucketRequest.builder().bucket("testbucket").build()) } | "" | S3AsyncClient.builder()
|
||||
"S3" | "GetObject" | "GET" | "/someBucket/someKey" | "UNKNOWN" | { c -> c.getObject(GetObjectRequest.builder().bucket("someBucket").key("someKey").build(), AsyncResponseTransformer.toBytes()) } | "1234567890" | S3AsyncClient.builder()
|
||||
"Ec2" | "AllocateAddress" | "POST" | "/" | "59dbff89-35bd-4eac-99ed-be587EXAMPLE" | { c -> c.allocateAddress() } | """
|
||||
<AllocateAddressResponse xmlns="http://ec2.amazonaws.com/doc/2016-11-15/">
|
||||
<requestId>59dbff89-35bd-4eac-99ed-be587EXAMPLE</requestId>
|
||||
<publicIp>192.0.2.1</publicIp>
|
||||
<domain>standard</domain>
|
||||
</AllocateAddressResponse>
|
||||
""" | Ec2AsyncClient.builder()
|
||||
"Rds" | "DeleteOptionGroup" | "POST" | "/" | "0ac9cda2-bbf4-11d3-f92b-31fa5e8dbc99" | { c -> c.deleteOptionGroup(DeleteOptionGroupRequest.builder().build()) } | """
|
||||
<DeleteOptionGroupResponse xmlns="http://rds.amazonaws.com/doc/2014-09-01/">
|
||||
<ResponseMetadata>
|
||||
<RequestId>0ac9cda2-bbf4-11d3-f92b-31fa5e8dbc99</RequestId>
|
||||
</ResponseMetadata>
|
||||
</DeleteOptionGroupResponse>
|
||||
""" | RdsAsyncClient.builder()
|
||||
}
|
||||
|
||||
// TODO: add timeout tests?
|
||||
}
|
|
@ -0,0 +1,37 @@
|
|||
package datadog.trace.instrumentation.java.concurrent;
|
||||
|
||||
import datadog.trace.bootstrap.ContextStore;
|
||||
import datadog.trace.bootstrap.instrumentation.java.concurrent.State;
|
||||
import datadog.trace.context.TraceScope;
|
||||
|
||||
/** Helper utils for Runnable/Callable instrumentation */
|
||||
public class AdviceUtils {
|
||||
|
||||
/**
|
||||
* Start scope for a given task
|
||||
*
|
||||
* @param contextStore context storage for task's state
|
||||
* @param task task to start scope for
|
||||
* @param <T> task's type
|
||||
* @return scope if scope was started, or null
|
||||
*/
|
||||
public static <T> TraceScope startTaskScope(
|
||||
final ContextStore<T, State> contextStore, final T task) {
|
||||
final State state = contextStore.get(task);
|
||||
if (state != null) {
|
||||
final TraceScope.Continuation continuation = state.getAndResetContinuation();
|
||||
if (continuation != null) {
|
||||
final TraceScope scope = continuation.activate();
|
||||
scope.setAsyncPropagation(true);
|
||||
return scope;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
public static void endTaskScope(final TraceScope scope) {
|
||||
if (scope != null) {
|
||||
scope.close();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -27,6 +27,7 @@ import java.util.HashSet;
|
|||
import java.util.Map;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.ForkJoinTask;
|
||||
import java.util.concurrent.Future;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import net.bytebuddy.asm.Advice;
|
||||
|
@ -140,6 +141,7 @@ public final class ExecutorInstrumentation extends Instrumenter.Default {
|
|||
final Map<String, String> map = new HashMap<>();
|
||||
map.put(Runnable.class.getName(), State.class.getName());
|
||||
map.put(Callable.class.getName(), State.class.getName());
|
||||
map.put(ForkJoinTask.class.getName(), State.class.getName());
|
||||
map.put(Future.class.getName(), State.class.getName());
|
||||
return Collections.unmodifiableMap(map);
|
||||
}
|
||||
|
@ -150,15 +152,24 @@ public final class ExecutorInstrumentation extends Instrumenter.Default {
|
|||
transformers.put(
|
||||
named("execute").and(takesArgument(0, Runnable.class)),
|
||||
SetExecuteRunnableStateAdvice.class.getName());
|
||||
transformers.put(
|
||||
named("execute").and(takesArgument(0, ForkJoinTask.class)),
|
||||
SetExecuteForkJoinStateAdvice.class.getName());
|
||||
transformers.put(
|
||||
named("submit").and(takesArgument(0, Runnable.class)),
|
||||
SetSubmitRunnableStateAdvice.class.getName());
|
||||
transformers.put(
|
||||
named("submit").and(takesArgument(0, Callable.class)),
|
||||
SetCallableStateAdvice.class.getName());
|
||||
transformers.put(
|
||||
named("submit").and(takesArgument(0, ForkJoinTask.class)),
|
||||
SetExecuteForkJoinStateAdvice.class.getName());
|
||||
transformers.put(
|
||||
nameMatches("invoke(Any|All)$").and(takesArgument(0, Callable.class)),
|
||||
SetCallableStateForCallableCollectionAdvice.class.getName());
|
||||
transformers.put(
|
||||
nameMatches("invoke").and(takesArgument(0, ForkJoinTask.class)),
|
||||
SetExecuteForkJoinStateAdvice.class.getName());
|
||||
transformers.put( // kotlinx.coroutines.scheduling.CoroutineScheduler
|
||||
named("dispatch")
|
||||
.and(takesArgument(0, Runnable.class))
|
||||
|
@ -192,6 +203,30 @@ public final class ExecutorInstrumentation extends Instrumenter.Default {
|
|||
}
|
||||
}
|
||||
|
||||
public static class SetExecuteForkJoinStateAdvice {
|
||||
|
||||
@Advice.OnMethodEnter(suppress = Throwable.class)
|
||||
public static State enterJobSubmit(
|
||||
@Advice.This final Executor executor,
|
||||
@Advice.Argument(value = 0, readOnly = false) final ForkJoinTask task) {
|
||||
final Scope scope = GlobalTracer.get().scopeManager().active();
|
||||
if (ConcurrentUtils.shouldAttachStateToTask(task, executor)) {
|
||||
final ContextStore<ForkJoinTask, State> contextStore =
|
||||
InstrumentationContext.get(ForkJoinTask.class, State.class);
|
||||
return ConcurrentUtils.setupState(contextStore, task, (TraceScope) scope);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
|
||||
public static void exitJobSubmit(
|
||||
@Advice.This final Executor executor,
|
||||
@Advice.Enter final State state,
|
||||
@Advice.Thrown final Throwable throwable) {
|
||||
ConcurrentUtils.cleanUpOnMethodExit(executor, state, throwable);
|
||||
}
|
||||
}
|
||||
|
||||
public static class SetSubmitRunnableStateAdvice {
|
||||
|
||||
@Advice.OnMethodEnter(suppress = Throwable.class)
|
||||
|
|
|
@ -0,0 +1,121 @@
|
|||
package datadog.trace.instrumentation.java.concurrent;
|
||||
|
||||
import static datadog.trace.agent.tooling.ByteBuddyElementMatchers.safeHasSuperType;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.isAbstract;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.isInterface;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.named;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.not;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
|
||||
|
||||
import com.google.auto.service.AutoService;
|
||||
import datadog.trace.agent.tooling.Instrumenter;
|
||||
import datadog.trace.bootstrap.ContextStore;
|
||||
import datadog.trace.bootstrap.InstrumentationContext;
|
||||
import datadog.trace.bootstrap.instrumentation.java.concurrent.State;
|
||||
import datadog.trace.context.TraceScope;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.ForkJoinPool;
|
||||
import java.util.concurrent.ForkJoinTask;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import net.bytebuddy.asm.Advice;
|
||||
import net.bytebuddy.description.method.MethodDescription;
|
||||
import net.bytebuddy.description.type.TypeDescription;
|
||||
import net.bytebuddy.matcher.ElementMatcher;
|
||||
|
||||
/**
|
||||
* Instrument {@link ForkJoinTask}.
|
||||
*
|
||||
* <p>Note: There are quite a few separate implementations of {@code ForkJoinTask}/{@code
|
||||
* ForkJoinPool}: JVM, akka, scala, netty to name a few. For now we only deal with JVM one because
|
||||
* there are known cases when JVM {@code ForkJoinTask} is supplied as {@code Runnable} into {@code
|
||||
* ForkJoinPool}.
|
||||
*/
|
||||
@Slf4j
|
||||
@AutoService(Instrumenter.class)
|
||||
public final class ForkJoinTaskInstrumentation extends Instrumenter.Default {
|
||||
|
||||
public ForkJoinTaskInstrumentation() {
|
||||
super(ExecutorInstrumentation.EXEC_NAME);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ElementMatcher<TypeDescription> typeMatcher() {
|
||||
return not(isInterface()).and(safeHasSuperType(named(ForkJoinTask.class.getName())));
|
||||
}
|
||||
|
||||
@Override
|
||||
public String[] helperClassNames() {
|
||||
return new String[] {
|
||||
AdviceUtils.class.getName(),
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, String> contextStore() {
|
||||
final Map<String, String> map = new HashMap<>();
|
||||
map.put(Runnable.class.getName(), State.class.getName());
|
||||
map.put(Callable.class.getName(), State.class.getName());
|
||||
map.put(ForkJoinTask.class.getName(), State.class.getName());
|
||||
return Collections.unmodifiableMap(map);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<? extends ElementMatcher<? super MethodDescription>, String> transformers() {
|
||||
final Map<ElementMatcher<? super MethodDescription>, String> transformers = new HashMap<>();
|
||||
transformers.put(
|
||||
named("exec").and(takesArguments(0)).and(not(isAbstract())),
|
||||
ForkJoinTaskAdvice.class.getName());
|
||||
return transformers;
|
||||
}
|
||||
|
||||
public static class ForkJoinTaskAdvice {
|
||||
|
||||
/**
|
||||
* When {@link ForkJoinTask} object is submitted to {@link ForkJoinPool} as {@link Runnable} or
|
||||
* {@link Callable} it will not get wrapped, instead it will be casted to {@code ForkJoinTask}
|
||||
* directly. This means state is still stored in {@code Runnable} or {@code Callable} and we
|
||||
* need to use that state.
|
||||
*/
|
||||
@Advice.OnMethodEnter(suppress = Throwable.class)
|
||||
public static TraceScope enter(@Advice.This final ForkJoinTask thiz) {
|
||||
final ContextStore<ForkJoinTask, State> contextStore =
|
||||
InstrumentationContext.get(ForkJoinTask.class, State.class);
|
||||
TraceScope scope = AdviceUtils.startTaskScope(contextStore, thiz);
|
||||
if (thiz instanceof Runnable) {
|
||||
final ContextStore<Runnable, State> runnableContextStore =
|
||||
InstrumentationContext.get(Runnable.class, State.class);
|
||||
final TraceScope newScope =
|
||||
AdviceUtils.startTaskScope(runnableContextStore, (Runnable) thiz);
|
||||
if (null != newScope) {
|
||||
if (null != scope) {
|
||||
newScope.close();
|
||||
} else {
|
||||
scope = newScope;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (thiz instanceof Callable) {
|
||||
final ContextStore<Callable, State> callableContextStore =
|
||||
InstrumentationContext.get(Callable.class, State.class);
|
||||
final TraceScope newScope =
|
||||
AdviceUtils.startTaskScope(callableContextStore, (Callable) thiz);
|
||||
if (null != newScope) {
|
||||
if (null != scope) {
|
||||
newScope.close();
|
||||
} else {
|
||||
scope = newScope;
|
||||
}
|
||||
}
|
||||
}
|
||||
return scope;
|
||||
}
|
||||
|
||||
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
|
||||
public static void exit(@Advice.Enter final TraceScope scope) {
|
||||
AdviceUtils.endTaskScope(scope);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -87,11 +87,6 @@ public final class FutureInstrumentation extends Instrumenter.Default {
|
|||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public String[] helperClassNames() {
|
||||
return new String[] {ExecutorInstrumentation.class.getName() + "$ConcurrentUtils"};
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, String> contextStore() {
|
||||
final Map<String, String> map = new HashMap<>();
|
||||
|
|
|
@ -2,6 +2,7 @@ package datadog.trace.instrumentation.java.concurrent;
|
|||
|
||||
import static datadog.trace.agent.tooling.ByteBuddyElementMatchers.safeHasSuperType;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.isInterface;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.named;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.not;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
|
||||
|
@ -22,6 +23,7 @@ import net.bytebuddy.description.method.MethodDescription;
|
|||
import net.bytebuddy.description.type.TypeDescription;
|
||||
import net.bytebuddy.matcher.ElementMatcher;
|
||||
|
||||
/** Instrument {@link Runnable} and {@Callable} */
|
||||
@Slf4j
|
||||
@AutoService(Instrumenter.class)
|
||||
public final class RunnableCallableInstrumentation extends Instrumenter.Default {
|
||||
|
@ -39,7 +41,7 @@ public final class RunnableCallableInstrumentation extends Instrumenter.Default
|
|||
@Override
|
||||
public String[] helperClassNames() {
|
||||
return new String[] {
|
||||
RunnableCallableInstrumentation.class.getName() + "$RunnableUtils",
|
||||
AdviceUtils.class.getName(),
|
||||
};
|
||||
}
|
||||
|
||||
|
@ -54,8 +56,10 @@ public final class RunnableCallableInstrumentation extends Instrumenter.Default
|
|||
@Override
|
||||
public Map<? extends ElementMatcher<? super MethodDescription>, String> transformers() {
|
||||
final Map<ElementMatcher<? super MethodDescription>, String> transformers = new HashMap<>();
|
||||
transformers.put(named("run").and(takesArguments(0)), RunnableAdvice.class.getName());
|
||||
transformers.put(named("call").and(takesArguments(0)), CallableAdvice.class.getName());
|
||||
transformers.put(
|
||||
named("run").and(takesArguments(0)).and(isPublic()), RunnableAdvice.class.getName());
|
||||
transformers.put(
|
||||
named("call").and(takesArguments(0)).and(isPublic()), CallableAdvice.class.getName());
|
||||
return transformers;
|
||||
}
|
||||
|
||||
|
@ -65,12 +69,12 @@ public final class RunnableCallableInstrumentation extends Instrumenter.Default
|
|||
public static TraceScope enter(@Advice.This final Runnable thiz) {
|
||||
final ContextStore<Runnable, State> contextStore =
|
||||
InstrumentationContext.get(Runnable.class, State.class);
|
||||
return RunnableUtils.startTaskScope(contextStore, thiz);
|
||||
return AdviceUtils.startTaskScope(contextStore, thiz);
|
||||
}
|
||||
|
||||
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
|
||||
public static void exit(@Advice.Enter final TraceScope scope) {
|
||||
RunnableUtils.endTaskScope(scope);
|
||||
AdviceUtils.endTaskScope(scope);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -80,45 +84,12 @@ public final class RunnableCallableInstrumentation extends Instrumenter.Default
|
|||
public static TraceScope enter(@Advice.This final Callable thiz) {
|
||||
final ContextStore<Callable, State> contextStore =
|
||||
InstrumentationContext.get(Callable.class, State.class);
|
||||
return RunnableUtils.startTaskScope(contextStore, thiz);
|
||||
return AdviceUtils.startTaskScope(contextStore, thiz);
|
||||
}
|
||||
|
||||
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
|
||||
public static void exit(@Advice.Enter final TraceScope scope) {
|
||||
RunnableUtils.endTaskScope(scope);
|
||||
}
|
||||
}
|
||||
|
||||
/** Helper utils for Runnable/Callable instrumentation */
|
||||
@Slf4j
|
||||
public static class RunnableUtils {
|
||||
|
||||
/**
|
||||
* Start scope for a given task
|
||||
*
|
||||
* @param contextStore context storage for task's state
|
||||
* @param task task to start scope for
|
||||
* @param <T> task's type
|
||||
* @return scope if scope was started, or null
|
||||
*/
|
||||
public static <T> TraceScope startTaskScope(
|
||||
final ContextStore<T, State> contextStore, final T task) {
|
||||
final State state = contextStore.get(task);
|
||||
if (state != null) {
|
||||
final TraceScope.Continuation continuation = state.getAndResetContinuation();
|
||||
if (continuation != null) {
|
||||
final TraceScope scope = continuation.activate();
|
||||
scope.setAsyncPropagation(true);
|
||||
return scope;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
public static void endTaskScope(final TraceScope scope) {
|
||||
if (scope != null) {
|
||||
scope.close();
|
||||
}
|
||||
AdviceUtils.endTaskScope(scope);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -12,6 +12,7 @@ import java.util.concurrent.Callable
|
|||
import java.util.concurrent.Executor
|
||||
import java.util.concurrent.ExecutorService
|
||||
import java.util.concurrent.ForkJoinPool
|
||||
import java.util.concurrent.ForkJoinTask
|
||||
import java.util.concurrent.Future
|
||||
import java.util.concurrent.RejectedExecutionException
|
||||
import java.util.concurrent.ThreadPoolExecutor
|
||||
|
@ -19,16 +20,25 @@ import java.util.concurrent.TimeUnit
|
|||
|
||||
class ExecutorInstrumentationTest extends AgentTestRunner {
|
||||
@Shared
|
||||
Method executeMethod
|
||||
Method executeRunnableMethod
|
||||
@Shared
|
||||
Method executeForkJoinTaskMethod
|
||||
@Shared
|
||||
Method submitRunnableMethod
|
||||
@Shared
|
||||
Method submitCallableMethod
|
||||
@Shared
|
||||
Method submitForkJoinTaskMethod
|
||||
@Shared
|
||||
Method invokeForkJoinTaskMethod
|
||||
|
||||
def setupSpec() {
|
||||
executeMethod = Executor.getMethod("execute", Runnable)
|
||||
executeRunnableMethod = Executor.getMethod("execute", Runnable)
|
||||
executeForkJoinTaskMethod = ForkJoinPool.getMethod("execute", ForkJoinTask)
|
||||
submitRunnableMethod = ExecutorService.getMethod("submit", Runnable)
|
||||
submitCallableMethod = ExecutorService.getMethod("submit", Callable)
|
||||
submitForkJoinTaskMethod = ForkJoinPool.getMethod("submit", ForkJoinTask)
|
||||
invokeForkJoinTaskMethod = ForkJoinPool.getMethod("invoke", ForkJoinTask)
|
||||
}
|
||||
|
||||
// more useful name breaks java9 javac
|
||||
|
@ -66,12 +76,15 @@ class ExecutorInstrumentationTest extends AgentTestRunner {
|
|||
// Unfortunately, there's no simple way to test the cross product of methods/pools.
|
||||
where:
|
||||
poolImpl | method
|
||||
new ForkJoinPool() | executeRunnableMethod
|
||||
new ForkJoinPool() | executeForkJoinTaskMethod
|
||||
new ForkJoinPool() | submitRunnableMethod
|
||||
new ForkJoinPool() | submitCallableMethod
|
||||
new ForkJoinPool() | executeMethod
|
||||
new ForkJoinPool() | submitForkJoinTaskMethod
|
||||
new ForkJoinPool() | invokeForkJoinTaskMethod
|
||||
new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue<Runnable>(1)) | executeRunnableMethod
|
||||
new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue<Runnable>(1)) | submitRunnableMethod
|
||||
new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue<Runnable>(1)) | submitCallableMethod
|
||||
new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue<Runnable>(1)) | executeMethod
|
||||
}
|
||||
|
||||
// more useful name breaks java9 javac
|
||||
|
|
|
@ -1,8 +1,9 @@
|
|||
import datadog.trace.api.Trace;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.ForkJoinTask;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
public class AsyncChild implements Runnable, Callable {
|
||||
public class AsyncChild extends ForkJoinTask implements Runnable, Callable {
|
||||
private final AtomicBoolean blockThread;
|
||||
private final boolean doTraceableWork;
|
||||
|
||||
|
@ -10,6 +11,20 @@ public class AsyncChild implements Runnable, Callable {
|
|||
this(true, false);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object getRawResult() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void setRawResult(final Object value) {}
|
||||
|
||||
@Override
|
||||
protected boolean exec() {
|
||||
runImpl();
|
||||
return true;
|
||||
}
|
||||
|
||||
public AsyncChild(final boolean doTraceableWork, final boolean blockThread) {
|
||||
this.doTraceableWork = doTraceableWork;
|
||||
this.blockThread = new AtomicBoolean(blockThread);
|
||||
|
|
|
@ -27,7 +27,9 @@ import net.bytebuddy.matcher.ElementMatcher;
|
|||
public class ChannelFutureListenerInstrumentation extends Instrumenter.Default {
|
||||
|
||||
public ChannelFutureListenerInstrumentation() {
|
||||
super("netty", "netty-4.1");
|
||||
super(
|
||||
NettyChannelPipelineInstrumentation.INSTRUMENTATION_NAME,
|
||||
NettyChannelPipelineInstrumentation.ADDITIONAL_INSTRUMENTATION_NAMES);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -40,8 +40,11 @@ import net.bytebuddy.matcher.ElementMatcher;
|
|||
@AutoService(Instrumenter.class)
|
||||
public class NettyChannelPipelineInstrumentation extends Instrumenter.Default {
|
||||
|
||||
static final String INSTRUMENTATION_NAME = "netty";
|
||||
static final String[] ADDITIONAL_INSTRUMENTATION_NAMES = {"netty-4.1"};
|
||||
|
||||
public NettyChannelPipelineInstrumentation() {
|
||||
super("netty", "netty-4.1");
|
||||
super(INSTRUMENTATION_NAME, ADDITIONAL_INSTRUMENTATION_NAMES);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -16,8 +16,12 @@ import io.opentracing.propagation.Format;
|
|||
import io.opentracing.tag.Tags;
|
||||
import io.opentracing.util.GlobalTracer;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.util.Collections;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
@Slf4j
|
||||
public class HttpClientRequestTracingHandler extends ChannelOutboundHandlerAdapter {
|
||||
|
||||
@Override
|
||||
|
@ -37,10 +41,6 @@ public class HttpClientRequestTracingHandler extends ChannelOutboundHandlerAdapt
|
|||
final HttpRequest request = (HttpRequest) msg;
|
||||
final InetSocketAddress remoteAddress = (InetSocketAddress) ctx.channel().remoteAddress();
|
||||
|
||||
String url = request.uri();
|
||||
if (request.headers().contains(HOST)) {
|
||||
url = "http://" + request.headers().get(HOST) + url;
|
||||
}
|
||||
final Span span =
|
||||
GlobalTracer.get()
|
||||
.buildSpan("netty.client.request")
|
||||
|
@ -48,14 +48,17 @@ public class HttpClientRequestTracingHandler extends ChannelOutboundHandlerAdapt
|
|||
.withTag(Tags.PEER_HOSTNAME.getKey(), remoteAddress.getHostName())
|
||||
.withTag(Tags.PEER_PORT.getKey(), remoteAddress.getPort())
|
||||
.withTag(Tags.HTTP_METHOD.getKey(), request.method().name())
|
||||
.withTag(Tags.HTTP_URL.getKey(), url)
|
||||
.withTag(Tags.HTTP_URL.getKey(), formatUrl(request))
|
||||
.withTag(Tags.COMPONENT.getKey(), "netty-client")
|
||||
.withTag(DDTags.SPAN_TYPE, DDSpanTypes.HTTP_CLIENT)
|
||||
.start();
|
||||
|
||||
GlobalTracer.get()
|
||||
.inject(
|
||||
span.context(), Format.Builtin.HTTP_HEADERS, new NettyResponseInjectAdapter(request));
|
||||
// AWS calls are often signed, so we can't add headers without breaking the signature.
|
||||
if (!request.headers().contains("amz-sdk-invocation-id")) {
|
||||
GlobalTracer.get()
|
||||
.inject(
|
||||
span.context(), Format.Builtin.HTTP_HEADERS, new NettyResponseInjectAdapter(request));
|
||||
}
|
||||
|
||||
ctx.channel().attr(AttributeKeys.CLIENT_ATTRIBUTE_KEY).set(span);
|
||||
|
||||
|
@ -72,4 +75,18 @@ public class HttpClientRequestTracingHandler extends ChannelOutboundHandlerAdapt
|
|||
scope.close();
|
||||
}
|
||||
}
|
||||
|
||||
private String formatUrl(final HttpRequest request) {
|
||||
try {
|
||||
URI uri = new URI(request.uri());
|
||||
if ((uri.getHost() == null || uri.getHost().equals("")) && request.headers().contains(HOST)) {
|
||||
uri = new URI("http://" + request.headers().get(HOST) + request.uri());
|
||||
}
|
||||
return new URI(uri.getScheme(), null, uri.getHost(), uri.getPort(), uri.getPath(), null, null)
|
||||
.toString();
|
||||
} catch (final URISyntaxException e) {
|
||||
log.debug("Cannot parse netty uri: {}", request.uri());
|
||||
return request.uri();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -159,7 +159,8 @@ public abstract class AgentTestRunner extends Specification {
|
|||
|
||||
@Before
|
||||
public void beforeTest() {
|
||||
assert getTestTracer().activeSpan() == null : "Span is active before test has started";
|
||||
assert getTestTracer().activeSpan() == null
|
||||
: "Span is active before test has started: " + getTestTracer().activeSpan();
|
||||
TEST_WRITER.start();
|
||||
}
|
||||
|
||||
|
|
|
@ -26,6 +26,7 @@ include ':dd-smoke-tests:wildfly'
|
|||
include ':dd-java-agent:instrumentation:akka-http-10.0'
|
||||
include ':dd-java-agent:instrumentation:apache-httpclient-4'
|
||||
include ':dd-java-agent:instrumentation:aws-java-sdk-1.11.0'
|
||||
include ':dd-java-agent:instrumentation:aws-java-sdk-2.2'
|
||||
include ':dd-java-agent:instrumentation:couchbase-2.0'
|
||||
include ':dd-java-agent:instrumentation:datastax-cassandra-2.3'
|
||||
include ':dd-java-agent:instrumentation:dropwizard'
|
||||
|
|
Loading…
Reference in New Issue