Allow trace to persist across netty connect.

Also create span with error on connection failure.

Add tests for connection failure.
This commit is contained in:
Tyler Benson 2018-08-21 15:56:27 +10:00
parent 520676538c
commit 898647e000
25 changed files with 498 additions and 90 deletions

View File

@ -51,6 +51,9 @@ public interface Instrumenter {
private final String instrumentationPrimaryName;
protected final boolean enabled;
protected final String packageName =
getClass().getPackage() == null ? "" : getClass().getPackage().getName();
public Default(final String instrumentationName, final String... additionalNames) {
instrumentationNames = new HashSet<>(Arrays.asList(additionalNames));
instrumentationNames.add(instrumentationName);

View File

@ -79,7 +79,18 @@ public final class ExecutorInstrumentation extends Instrumenter.Default {
"akka.dispatch.PinnedDispatcher",
"akka.dispatch.ExecutionContexts$sameThreadExecutionContext$",
"akka.dispatch.ExecutionContexts$sameThreadExecutionContext$",
"play.api.libs.streams.Execution$trampoline$"
"play.api.libs.streams.Execution$trampoline$",
"io.netty.channel.MultithreadEventLoopGroup",
"io.netty.util.concurrent.MultithreadEventExecutorGroup",
"io.netty.util.concurrent.AbstractEventExecutorGroup",
"io.netty.channel.epoll.EpollEventLoopGroup",
"io.netty.channel.nio.NioEventLoopGroup",
"io.netty.util.concurrent.GlobalEventExecutor",
"io.netty.util.concurrent.AbstractScheduledEventExecutor",
"io.netty.util.concurrent.AbstractEventExecutor",
"io.netty.util.concurrent.SingleThreadEventExecutor",
"io.netty.channel.nio.NioEventLoop",
"io.netty.channel.SingleThreadEventLoop",
};
WHITELISTED_EXECUTORS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(whitelist)));

View File

@ -31,8 +31,9 @@ dependencies {
implementation deps.autoservice
testCompile project(':dd-java-agent:testing')
testCompile project(':dd-java-agent:instrumentation:java-concurrent')
// testCompile group: 'io.netty', name: 'netty-all', version: '4.0.0.Final'
testCompile group: 'io.netty', name: 'netty-codec-http', version: '4.0.0.Final'
testCompile group: 'org.asynchttpclient', name: 'async-http-client', version: '2.0.0'
}
@ -50,7 +51,7 @@ configurations.testCompile {
configurations.latestDepTestCompile {
resolutionStrategy {
force group: 'io.netty', name: 'netty-all', version: '4.0.56.Final'
force group: 'io.netty', name: 'netty-codec-http', version: '4.0.56.Final'
force group: 'org.asynchttpclient', name: 'async-http-client', version: '2.0.+'
}
}

View File

@ -0,0 +1,18 @@
package datadog.trace.instrumentation.netty40;
import datadog.trace.context.TraceScope;
import datadog.trace.instrumentation.netty40.server.HttpServerTracingHandler;
import io.netty.util.AttributeKey;
import io.opentracing.Span;
public class AttributeKeys {
public static final AttributeKey<TraceScope.Continuation>
PARENT_CONNECT_CONTINUATION_ATTRIBUTE_KEY =
new AttributeKey<>("datadog.trace.instrumentation.netty40.parent.connect.continuation");
public static final AttributeKey<Span> SERVER_ATTRIBUTE_KEY =
new AttributeKey<>(HttpServerTracingHandler.class.getName() + ".span");
public static final AttributeKey<Span> CLIENT_ATTRIBUTE_KEY =
new AttributeKey<>(HttpServerTracingHandler.class.getName() + ".span");
}

View File

@ -0,0 +1,100 @@
package datadog.trace.instrumentation.netty40;
import static datadog.trace.agent.tooling.ByteBuddyElementMatchers.safeHasSuperType;
import static datadog.trace.agent.tooling.ClassLoaderMatcher.classLoaderHasClasses;
import static io.opentracing.log.Fields.ERROR_OBJECT;
import static net.bytebuddy.matcher.ElementMatchers.isInterface;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.not;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.context.TraceScope;
import io.netty.channel.ChannelFuture;
import io.opentracing.Scope;
import io.opentracing.Span;
import io.opentracing.tag.Tags;
import io.opentracing.util.GlobalTracer;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
@AutoService(Instrumenter.class)
public class ChannelFutureListenerInstrumentation extends Instrumenter.Default {
public ChannelFutureListenerInstrumentation() {
super("netty", "netty-4.0");
}
@Override
protected boolean defaultEnabled() {
return false;
}
@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return not(isInterface())
.and(safeHasSuperType(named("io.netty.channel.ChannelFutureListener")));
}
@Override
public ElementMatcher<ClassLoader> classLoaderMatcher() {
return classLoaderHasClasses("io.netty.handler.codec.spdy.SpdyOrHttpChooser");
}
@Override
public String[] helperClassNames() {
return new String[] {packageName + ".AttributeKeys"};
}
@Override
public Map<ElementMatcher, String> transformers() {
final Map<ElementMatcher, String> transformers = new HashMap<>();
transformers.put(
isMethod()
.and(named("operationComplete"))
.and(takesArgument(0, named("io.netty.channel.ChannelFuture"))),
OperationCompleteAdvice.class.getName());
return transformers;
}
public static class OperationCompleteAdvice {
@Advice.OnMethodEnter
public static TraceScope activateScope(@Advice.Argument(0) final ChannelFuture future) {
final TraceScope.Continuation continuation =
future.channel().attr(AttributeKeys.PARENT_CONNECT_CONTINUATION_ATTRIBUTE_KEY).get();
if (continuation == null) {
return null;
}
final TraceScope scope = continuation.activate();
final Throwable cause = future.cause();
if (cause != null) {
final Span errorSpan =
GlobalTracer.get()
.buildSpan("netty.connect")
.withTag(Tags.COMPONENT.getKey(), "netty")
.start();
Tags.ERROR.set(errorSpan, true);
errorSpan.log(Collections.singletonMap(ERROR_OBJECT, cause));
errorSpan.finish();
}
return scope;
}
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void deactivateScope(
@Advice.Enter final TraceScope scope, @Advice.Thrown final Throwable throwable) {
if (scope != null) {
((Scope) scope).close();
}
}
}
}

View File

@ -7,11 +7,13 @@ import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.nameStartsWith;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.not;
import static net.bytebuddy.matcher.ElementMatchers.returns;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.bootstrap.CallDepthThreadLocalMap;
import datadog.trace.context.TraceScope;
import datadog.trace.instrumentation.netty40.client.HttpClientRequestTracingHandler;
import datadog.trace.instrumentation.netty40.client.HttpClientResponseTracingHandler;
import datadog.trace.instrumentation.netty40.client.HttpClientTracingHandler;
@ -26,6 +28,8 @@ import io.netty.handler.codec.http.HttpRequestEncoder;
import io.netty.handler.codec.http.HttpResponseDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder;
import io.netty.handler.codec.http.HttpServerCodec;
import io.opentracing.Scope;
import io.opentracing.util.GlobalTracer;
import java.util.HashMap;
import java.util.Map;
import net.bytebuddy.asm.Advice;
@ -35,11 +39,8 @@ import net.bytebuddy.matcher.ElementMatcher;
@AutoService(Instrumenter.class)
public class NettyChannelPipelineInstrumentation extends Instrumenter.Default {
private static final String PACKAGE =
NettyChannelPipelineInstrumentation.class.getPackage().getName();
public NettyChannelPipelineInstrumentation() {
super("netty", "netty-4.1");
super("netty", "netty-4.0");
}
@Override
@ -60,16 +61,17 @@ public class NettyChannelPipelineInstrumentation extends Instrumenter.Default {
@Override
public String[] helperClassNames() {
return new String[] {
packageName + ".AttributeKeys",
// client helpers
PACKAGE + ".client.NettyResponseInjectAdapter",
PACKAGE + ".client.HttpClientRequestTracingHandler",
PACKAGE + ".client.HttpClientResponseTracingHandler",
PACKAGE + ".client.HttpClientTracingHandler",
packageName + ".client.NettyResponseInjectAdapter",
packageName + ".client.HttpClientRequestTracingHandler",
packageName + ".client.HttpClientResponseTracingHandler",
packageName + ".client.HttpClientTracingHandler",
// server helpers
PACKAGE + ".server.NettyRequestExtractAdapter",
PACKAGE + ".server.HttpServerRequestTracingHandler",
PACKAGE + ".server.HttpServerResponseTracingHandler",
PACKAGE + ".server.HttpServerTracingHandler"
packageName + ".server.NettyRequestExtractAdapter",
packageName + ".server.HttpServerRequestTracingHandler",
packageName + ".server.HttpServerResponseTracingHandler",
packageName + ".server.HttpServerTracingHandler"
};
}
@ -81,6 +83,9 @@ public class NettyChannelPipelineInstrumentation extends Instrumenter.Default {
.and(nameStartsWith("add"))
.and(takesArgument(2, named("io.netty.channel.ChannelHandler"))),
ChannelPipelineAddAdvice.class.getName());
transformers.put(
isMethod().and(named("connect")).and(returns(named("io.netty.channel.ChannelFuture"))),
ChannelPipelineConnectAdvice.class.getName());
return transformers;
}
@ -138,4 +143,18 @@ public class NettyChannelPipelineInstrumentation extends Instrumenter.Default {
}
}
}
public static class ChannelPipelineConnectAdvice {
@Advice.OnMethodEnter
public static void addParentSpan(@Advice.This final ChannelPipeline pipeline) {
final Scope scope = GlobalTracer.get().scopeManager().active();
if (scope instanceof TraceScope) {
pipeline
.channel()
.attr(AttributeKeys.PARENT_CONNECT_CONTINUATION_ATTRIBUTE_KEY)
.set(((TraceScope) scope).capture());
}
}
}
}

View File

@ -5,6 +5,7 @@ import static io.opentracing.log.Fields.ERROR_OBJECT;
import datadog.trace.api.DDSpanTypes;
import datadog.trace.api.DDTags;
import datadog.trace.instrumentation.netty40.AttributeKeys;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
@ -47,7 +48,7 @@ public class HttpClientRequestTracingHandler extends ChannelOutboundHandlerAdapt
.inject(
span.context(), Format.Builtin.HTTP_HEADERS, new NettyResponseInjectAdapter(request));
ctx.channel().attr(HttpClientTracingHandler.attributeKey).set(span);
ctx.channel().attr(AttributeKeys.CLIENT_ATTRIBUTE_KEY).set(span);
try {
ctx.write(msg, prm);

View File

@ -2,36 +2,53 @@ package datadog.trace.instrumentation.netty40.client;
import static io.opentracing.log.Fields.ERROR_OBJECT;
import datadog.trace.context.TraceScope;
import datadog.trace.instrumentation.netty40.AttributeKeys;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.HttpResponse;
import io.opentracing.Scope;
import io.opentracing.Span;
import io.opentracing.tag.Tags;
import io.opentracing.util.GlobalTracer;
import java.util.Collections;
public class HttpClientResponseTracingHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
final Span span = ctx.channel().attr(HttpClientTracingHandler.attributeKey).get();
if (span == null || !(msg instanceof HttpResponse)) {
final Span span = ctx.channel().attr(AttributeKeys.CLIENT_ATTRIBUTE_KEY).get();
if (span == null) {
ctx.fireChannelRead(msg);
return;
}
final HttpResponse response = (HttpResponse) msg;
try (final Scope scope = GlobalTracer.get().scopeManager().activate(span, false)) {
final boolean finishSpan = msg instanceof HttpResponse;
try {
ctx.fireChannelRead(msg);
} catch (final Throwable throwable) {
Tags.ERROR.set(span, Boolean.TRUE);
span.log(Collections.singletonMap(ERROR_OBJECT, throwable));
Tags.HTTP_STATUS.set(span, 500);
span.finish(); // Finish the span manually since finishSpanOnClose was false
throw throwable;
if (scope instanceof TraceScope) {
((TraceScope) scope).setAsyncPropagation(true);
}
try {
ctx.fireChannelRead(msg);
} catch (final Throwable throwable) {
if (finishSpan) {
Tags.ERROR.set(span, Boolean.TRUE);
span.log(Collections.singletonMap(ERROR_OBJECT, throwable));
Tags.HTTP_STATUS.set(span, 500);
span.finish(); // Finish the span manually since finishSpanOnClose was false
throw throwable;
}
}
if (scope instanceof TraceScope) {
((TraceScope) scope).setAsyncPropagation(false);
}
if (finishSpan) {
Tags.HTTP_STATUS.set(span, ((HttpResponse) msg).getStatus().code());
span.finish(); // Finish the span manually since finishSpanOnClose was false
}
}
Tags.HTTP_STATUS.set(span, response.getStatus().code());
span.finish(); // Finish the span manually since finishSpanOnClose was false
}
}

View File

@ -1,16 +1,11 @@
package datadog.trace.instrumentation.netty40.client;
import io.netty.channel.CombinedChannelDuplexHandler;
import io.netty.util.AttributeKey;
import io.opentracing.Span;
public class HttpClientTracingHandler
extends CombinedChannelDuplexHandler<
HttpClientResponseTracingHandler, HttpClientRequestTracingHandler> {
static final AttributeKey<Span> attributeKey =
new AttributeKey<>(HttpClientTracingHandler.class.getName());
public HttpClientTracingHandler() {
super(new HttpClientResponseTracingHandler(), new HttpClientRequestTracingHandler());
}

View File

@ -6,6 +6,7 @@ import static io.opentracing.log.Fields.ERROR_OBJECT;
import datadog.trace.api.DDSpanTypes;
import datadog.trace.api.DDTags;
import datadog.trace.context.TraceScope;
import datadog.trace.instrumentation.netty40.AttributeKeys;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.HttpRequest;
@ -55,7 +56,7 @@ public class HttpServerRequestTracingHandler extends ChannelInboundHandlerAdapte
}
final Span span = scope.span();
ctx.channel().attr(HttpServerTracingHandler.attributeKey).set(span);
ctx.channel().attr(AttributeKeys.SERVER_ATTRIBUTE_KEY).set(span);
try {
ctx.fireChannelRead(msg);

View File

@ -2,6 +2,7 @@ package datadog.trace.instrumentation.netty40.server;
import static io.opentracing.log.Fields.ERROR_OBJECT;
import datadog.trace.instrumentation.netty40.AttributeKeys;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
@ -14,7 +15,7 @@ public class HttpServerResponseTracingHandler extends ChannelOutboundHandlerAdap
@Override
public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise prm) {
final Span span = ctx.channel().attr(HttpServerTracingHandler.attributeKey).get();
final Span span = ctx.channel().attr(AttributeKeys.SERVER_ATTRIBUTE_KEY).get();
if (span == null || !(msg instanceof HttpResponse)) {
ctx.write(msg, prm);
return;

View File

@ -1,16 +1,11 @@
package datadog.trace.instrumentation.netty40.server;
import io.netty.channel.CombinedChannelDuplexHandler;
import io.netty.util.AttributeKey;
import io.opentracing.Span;
public class HttpServerTracingHandler
extends CombinedChannelDuplexHandler<
HttpServerRequestTracingHandler, HttpServerResponseTracingHandler> {
static final AttributeKey<Span> attributeKey =
new AttributeKey<>(HttpServerTracingHandler.class.getName());
public HttpServerTracingHandler() {
super(new HttpServerRequestTracingHandler(), new HttpServerResponseTracingHandler());
}

View File

@ -1,4 +1,5 @@
import datadog.trace.agent.test.AgentTestRunner
import datadog.trace.agent.test.TestUtils
import datadog.trace.api.DDSpanTypes
import datadog.trace.api.DDTags
import io.opentracing.tag.Tags
@ -7,8 +8,10 @@ import org.asynchttpclient.DefaultAsyncHttpClientConfig
import spock.lang.AutoCleanup
import spock.lang.Shared
import java.util.concurrent.ExecutionException
import java.util.concurrent.TimeUnit
import static datadog.trace.agent.test.TestUtils.runUnderTrace
import static datadog.trace.agent.test.asserts.ListWriterAssert.assertTraces
import static datadog.trace.agent.test.server.http.TestHttpServer.httpServer
import static org.asynchttpclient.Dsl.asyncHttpClient
@ -28,13 +31,15 @@ class Netty40ClientTest extends AgentTestRunner {
}
}
@Shared
def clientConfig = DefaultAsyncHttpClientConfig.Builder.newInstance().setRequestTimeout(TimeUnit.MINUTES.toMillis(1).toInteger())
def clientConfig = DefaultAsyncHttpClientConfig.Builder.newInstance().setRequestTimeout(TimeUnit.SECONDS.toMillis(5).toInteger())
@Shared
AsyncHttpClient asyncHttpClient = asyncHttpClient(clientConfig)
def "test server request/response"() {
setup:
def responseFuture = asyncHttpClient.prepareGet("$server.address").execute()
def responseFuture = runUnderTrace("parent") {
asyncHttpClient.prepareGet("$server.address").execute()
}
def response = responseFuture.get()
expect:
@ -43,12 +48,13 @@ class Netty40ClientTest extends AgentTestRunner {
and:
assertTraces(TEST_WRITER, 1) {
trace(0, 1) {
trace(0, 2) {
span(0) {
serviceName "unnamed-java-app"
operationName "netty.client.request"
resourceName "GET /"
spanType DDSpanTypes.HTTP_CLIENT
childOf span(1)
errored false
tags {
"$Tags.COMPONENT.key" "netty-client"
@ -62,6 +68,10 @@ class Netty40ClientTest extends AgentTestRunner {
defaultTags()
}
}
span(1) {
operationName "parent"
parent()
}
}
}
@ -69,4 +79,41 @@ class Netty40ClientTest extends AgentTestRunner {
server.lastRequest.headers.get("x-datadog-trace-id") == "${TEST_WRITER.get(0).get(0).traceId}"
server.lastRequest.headers.get("x-datadog-parent-id") == "${TEST_WRITER.get(0).get(0).spanId}"
}
def "test connection failure"() {
setup:
def invalidPort = TestUtils.randomOpenPort()
def responseFuture = runUnderTrace("parent") {
asyncHttpClient.prepareGet("http://localhost:$invalidPort/").execute()
}
when:
responseFuture.get()
then:
def throwable = thrown(ExecutionException)
throwable.cause instanceof ConnectException
and:
assertTraces(TEST_WRITER, 1) {
trace(0, 2) {
span(0) {
operationName "parent"
parent()
}
span(1) {
operationName "netty.connect"
resourceName "netty.connect"
childOf span(0)
errored true
tags {
"$Tags.COMPONENT.key" "netty"
errorTags ConnectException, "Connection refused: localhost/127.0.0.1:$invalidPort"
defaultTags()
}
}
}
}
}
}

View File

@ -31,10 +31,10 @@ dependencies {
implementation deps.autoservice
testCompile project(':dd-java-agent:testing')
testCompile project(':dd-java-agent:instrumentation:java-concurrent')
testCompile group: 'io.netty', name: 'netty-all', version: '4.1.0.Final'
testCompile group: 'org.asynchttpclient', name: 'async-http-client', version: '2.0.31'
// async-http-client:2.0.32+ would require netty:4.1.9.Final
testCompile group: 'io.netty', name: 'netty-codec-http', version: '4.1.0.Final'
testCompile group: 'org.asynchttpclient', name: 'async-http-client', version: '2.1.0'
}
// We need to force the dependency to the earliest supported version because other libraries declare newer versions.
@ -51,7 +51,7 @@ configurations.testCompile {
configurations.latestDepTestCompile {
resolutionStrategy {
force group: 'io.netty', name: 'netty-all', version: '+'
force group: 'io.netty', name: 'netty-codec-http', version: '+'
force group: 'org.asynchttpclient', name: 'async-http-client', version: '+'
}
}

View File

@ -0,0 +1,18 @@
package datadog.trace.instrumentation.netty41;
import datadog.trace.context.TraceScope;
import datadog.trace.instrumentation.netty41.server.HttpServerTracingHandler;
import io.netty.util.AttributeKey;
import io.opentracing.Span;
public class AttributeKeys {
public static final AttributeKey<TraceScope.Continuation>
PARENT_CONNECT_CONTINUATION_ATTRIBUTE_KEY =
AttributeKey.valueOf("datadog.trace.instrumentation.netty41.parent.connect.continuation");
public static final AttributeKey<Span> SERVER_ATTRIBUTE_KEY =
AttributeKey.valueOf(HttpServerTracingHandler.class.getName() + ".span");
public static final AttributeKey<Span> CLIENT_ATTRIBUTE_KEY =
AttributeKey.valueOf(HttpServerTracingHandler.class.getName() + ".span");
}

View File

@ -0,0 +1,100 @@
package datadog.trace.instrumentation.netty41;
import static datadog.trace.agent.tooling.ByteBuddyElementMatchers.safeHasSuperType;
import static datadog.trace.agent.tooling.ClassLoaderMatcher.classLoaderHasClasses;
import static io.opentracing.log.Fields.ERROR_OBJECT;
import static net.bytebuddy.matcher.ElementMatchers.isInterface;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.not;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.context.TraceScope;
import io.netty.channel.ChannelFuture;
import io.opentracing.Scope;
import io.opentracing.Span;
import io.opentracing.tag.Tags;
import io.opentracing.util.GlobalTracer;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
@AutoService(Instrumenter.class)
public class ChannelFutureListenerInstrumentation extends Instrumenter.Default {
public ChannelFutureListenerInstrumentation() {
super("netty", "netty-4.0");
}
@Override
protected boolean defaultEnabled() {
return false;
}
@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return not(isInterface())
.and(safeHasSuperType(named("io.netty.channel.ChannelFutureListener")));
}
@Override
public ElementMatcher<ClassLoader> classLoaderMatcher() {
return classLoaderHasClasses("io.netty.handler.codec.http.HttpHeaderValues");
}
@Override
public String[] helperClassNames() {
return new String[] {packageName + ".AttributeKeys"};
}
@Override
public Map<ElementMatcher, String> transformers() {
final Map<ElementMatcher, String> transformers = new HashMap<>();
transformers.put(
isMethod()
.and(named("operationComplete"))
.and(takesArgument(0, named("io.netty.channel.ChannelFuture"))),
OperationCompleteAdvice.class.getName());
return transformers;
}
public static class OperationCompleteAdvice {
@Advice.OnMethodEnter
public static TraceScope activateScope(@Advice.Argument(0) final ChannelFuture future) {
final TraceScope.Continuation continuation =
future.channel().attr(AttributeKeys.PARENT_CONNECT_CONTINUATION_ATTRIBUTE_KEY).get();
if (continuation == null) {
return null;
}
final TraceScope scope = continuation.activate();
final Throwable cause = future.cause();
if (cause != null) {
final Span errorSpan =
GlobalTracer.get()
.buildSpan("netty.connect")
.withTag(Tags.COMPONENT.getKey(), "netty")
.start();
Tags.ERROR.set(errorSpan, true);
errorSpan.log(Collections.singletonMap(ERROR_OBJECT, cause));
errorSpan.finish();
}
return scope;
}
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void deactivateScope(
@Advice.Enter final TraceScope scope, @Advice.Thrown final Throwable throwable) {
if (scope != null) {
((Scope) scope).close();
}
}
}
}

View File

@ -7,11 +7,13 @@ import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.nameStartsWith;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.not;
import static net.bytebuddy.matcher.ElementMatchers.returns;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.bootstrap.CallDepthThreadLocalMap;
import datadog.trace.context.TraceScope;
import datadog.trace.instrumentation.netty41.client.HttpClientRequestTracingHandler;
import datadog.trace.instrumentation.netty41.client.HttpClientResponseTracingHandler;
import datadog.trace.instrumentation.netty41.client.HttpClientTracingHandler;
@ -26,6 +28,8 @@ import io.netty.handler.codec.http.HttpRequestEncoder;
import io.netty.handler.codec.http.HttpResponseDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder;
import io.netty.handler.codec.http.HttpServerCodec;
import io.opentracing.Scope;
import io.opentracing.util.GlobalTracer;
import java.util.HashMap;
import java.util.Map;
import net.bytebuddy.asm.Advice;
@ -35,9 +39,6 @@ import net.bytebuddy.matcher.ElementMatcher;
@AutoService(Instrumenter.class)
public class NettyChannelPipelineInstrumentation extends Instrumenter.Default {
private static final String PACKAGE =
NettyChannelPipelineInstrumentation.class.getPackage().getName();
public NettyChannelPipelineInstrumentation() {
super("netty", "netty-4.1");
}
@ -60,16 +61,17 @@ public class NettyChannelPipelineInstrumentation extends Instrumenter.Default {
@Override
public String[] helperClassNames() {
return new String[] {
packageName + ".AttributeKeys",
// client helpers
PACKAGE + ".client.NettyResponseInjectAdapter",
PACKAGE + ".client.HttpClientRequestTracingHandler",
PACKAGE + ".client.HttpClientResponseTracingHandler",
PACKAGE + ".client.HttpClientTracingHandler",
packageName + ".client.NettyResponseInjectAdapter",
packageName + ".client.HttpClientRequestTracingHandler",
packageName + ".client.HttpClientResponseTracingHandler",
packageName + ".client.HttpClientTracingHandler",
// server helpers
PACKAGE + ".server.NettyRequestExtractAdapter",
PACKAGE + ".server.HttpServerRequestTracingHandler",
PACKAGE + ".server.HttpServerResponseTracingHandler",
PACKAGE + ".server.HttpServerTracingHandler"
packageName + ".server.NettyRequestExtractAdapter",
packageName + ".server.HttpServerRequestTracingHandler",
packageName + ".server.HttpServerResponseTracingHandler",
packageName + ".server.HttpServerTracingHandler"
};
}
@ -81,6 +83,9 @@ public class NettyChannelPipelineInstrumentation extends Instrumenter.Default {
.and(nameStartsWith("add"))
.and(takesArgument(2, named("io.netty.channel.ChannelHandler"))),
ChannelPipelineAddAdvice.class.getName());
transformers.put(
isMethod().and(named("connect")).and(returns(named("io.netty.channel.ChannelFuture"))),
ChannelPipelineConnectAdvice.class.getName());
return transformers;
}
@ -138,4 +143,18 @@ public class NettyChannelPipelineInstrumentation extends Instrumenter.Default {
}
}
}
public static class ChannelPipelineConnectAdvice {
@Advice.OnMethodEnter
public static void addParentSpan(@Advice.This final ChannelPipeline pipeline) {
final Scope scope = GlobalTracer.get().scopeManager().active();
if (scope instanceof TraceScope) {
pipeline
.channel()
.attr(AttributeKeys.PARENT_CONNECT_CONTINUATION_ATTRIBUTE_KEY)
.set(((TraceScope) scope).capture());
}
}
}
}

View File

@ -5,6 +5,7 @@ import static io.opentracing.log.Fields.ERROR_OBJECT;
import datadog.trace.api.DDSpanTypes;
import datadog.trace.api.DDTags;
import datadog.trace.instrumentation.netty41.AttributeKeys;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
@ -47,7 +48,7 @@ public class HttpClientRequestTracingHandler extends ChannelOutboundHandlerAdapt
.inject(
span.context(), Format.Builtin.HTTP_HEADERS, new NettyResponseInjectAdapter(request));
ctx.channel().attr(HttpClientTracingHandler.attributeKey).set(span);
ctx.channel().attr(AttributeKeys.CLIENT_ATTRIBUTE_KEY).set(span);
try {
ctx.write(msg, prm);

View File

@ -2,36 +2,53 @@ package datadog.trace.instrumentation.netty41.client;
import static io.opentracing.log.Fields.ERROR_OBJECT;
import datadog.trace.context.TraceScope;
import datadog.trace.instrumentation.netty41.AttributeKeys;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.HttpResponse;
import io.opentracing.Scope;
import io.opentracing.Span;
import io.opentracing.tag.Tags;
import io.opentracing.util.GlobalTracer;
import java.util.Collections;
public class HttpClientResponseTracingHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
final Span span = ctx.channel().attr(HttpClientTracingHandler.attributeKey).get();
if (span == null || !(msg instanceof HttpResponse)) {
final Span span = ctx.channel().attr(AttributeKeys.CLIENT_ATTRIBUTE_KEY).get();
if (span == null) {
ctx.fireChannelRead(msg);
return;
}
final HttpResponse response = (HttpResponse) msg;
try (final Scope scope = GlobalTracer.get().scopeManager().activate(span, false)) {
final boolean finishSpan = msg instanceof HttpResponse;
try {
ctx.fireChannelRead(msg);
} catch (final Throwable throwable) {
Tags.ERROR.set(span, Boolean.TRUE);
span.log(Collections.singletonMap(ERROR_OBJECT, throwable));
Tags.HTTP_STATUS.set(span, 500);
span.finish(); // Finish the span manually since finishSpanOnClose was false
throw throwable;
if (scope instanceof TraceScope) {
((TraceScope) scope).setAsyncPropagation(true);
}
try {
ctx.fireChannelRead(msg);
} catch (final Throwable throwable) {
if (finishSpan) {
Tags.ERROR.set(span, Boolean.TRUE);
span.log(Collections.singletonMap(ERROR_OBJECT, throwable));
Tags.HTTP_STATUS.set(span, 500);
span.finish(); // Finish the span manually since finishSpanOnClose was false
throw throwable;
}
}
if (scope instanceof TraceScope) {
((TraceScope) scope).setAsyncPropagation(false);
}
if (finishSpan) {
Tags.HTTP_STATUS.set(span, ((HttpResponse) msg).status().code());
span.finish(); // Finish the span manually since finishSpanOnClose was false
}
}
Tags.HTTP_STATUS.set(span, response.status().code());
span.finish(); // Finish the span manually since finishSpanOnClose was false
}
}

View File

@ -1,16 +1,11 @@
package datadog.trace.instrumentation.netty41.client;
import io.netty.channel.CombinedChannelDuplexHandler;
import io.netty.util.AttributeKey;
import io.opentracing.Span;
public class HttpClientTracingHandler
extends CombinedChannelDuplexHandler<
HttpClientResponseTracingHandler, HttpClientRequestTracingHandler> {
static final AttributeKey<Span> attributeKey =
AttributeKey.valueOf(HttpClientTracingHandler.class.getName());
public HttpClientTracingHandler() {
super(new HttpClientResponseTracingHandler(), new HttpClientRequestTracingHandler());
}

View File

@ -6,6 +6,7 @@ import static io.opentracing.log.Fields.ERROR_OBJECT;
import datadog.trace.api.DDSpanTypes;
import datadog.trace.api.DDTags;
import datadog.trace.context.TraceScope;
import datadog.trace.instrumentation.netty41.AttributeKeys;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.HttpRequest;
@ -55,7 +56,7 @@ public class HttpServerRequestTracingHandler extends ChannelInboundHandlerAdapte
}
final Span span = scope.span();
ctx.channel().attr(HttpServerTracingHandler.attributeKey).set(span);
ctx.channel().attr(AttributeKeys.SERVER_ATTRIBUTE_KEY).set(span);
try {
ctx.fireChannelRead(msg);

View File

@ -2,6 +2,7 @@ package datadog.trace.instrumentation.netty41.server;
import static io.opentracing.log.Fields.ERROR_OBJECT;
import datadog.trace.instrumentation.netty41.AttributeKeys;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
@ -14,7 +15,7 @@ public class HttpServerResponseTracingHandler extends ChannelOutboundHandlerAdap
@Override
public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise prm) {
final Span span = ctx.channel().attr(HttpServerTracingHandler.attributeKey).get();
final Span span = ctx.channel().attr(AttributeKeys.SERVER_ATTRIBUTE_KEY).get();
if (span == null || !(msg instanceof HttpResponse)) {
ctx.write(msg, prm);
return;

View File

@ -1,16 +1,11 @@
package datadog.trace.instrumentation.netty41.server;
import io.netty.channel.CombinedChannelDuplexHandler;
import io.netty.util.AttributeKey;
import io.opentracing.Span;
public class HttpServerTracingHandler
extends CombinedChannelDuplexHandler<
HttpServerRequestTracingHandler, HttpServerResponseTracingHandler> {
static final AttributeKey<Span> attributeKey =
AttributeKey.valueOf(HttpServerTracingHandler.class.getName());
public HttpServerTracingHandler() {
super(new HttpServerRequestTracingHandler(), new HttpServerResponseTracingHandler());
}

View File

@ -1,14 +1,18 @@
import datadog.trace.agent.test.AgentTestRunner
import datadog.trace.agent.test.TestUtils
import datadog.trace.api.DDSpanTypes
import datadog.trace.api.DDTags
import io.netty.channel.AbstractChannel
import io.opentracing.tag.Tags
import org.asynchttpclient.AsyncHttpClient
import org.asynchttpclient.DefaultAsyncHttpClientConfig
import spock.lang.AutoCleanup
import spock.lang.Shared
import java.util.concurrent.ExecutionException
import java.util.concurrent.TimeUnit
import static datadog.trace.agent.test.TestUtils.runUnderTrace
import static datadog.trace.agent.test.asserts.ListWriterAssert.assertTraces
import static datadog.trace.agent.test.server.http.TestHttpServer.httpServer
import static org.asynchttpclient.Dsl.asyncHttpClient
@ -28,13 +32,15 @@ class Netty41ClientTest extends AgentTestRunner {
}
}
@Shared
def clientConfig = DefaultAsyncHttpClientConfig.Builder.newInstance().setRequestTimeout(TimeUnit.MINUTES.toMillis(1).toInteger())
def clientConfig = DefaultAsyncHttpClientConfig.Builder.newInstance().setRequestTimeout(TimeUnit.SECONDS.toMillis(5).toInteger())
@Shared
AsyncHttpClient asyncHttpClient = asyncHttpClient(clientConfig)
def "test server request/response"() {
setup:
def responseFuture = asyncHttpClient.prepareGet("$server.address").execute()
def responseFuture = runUnderTrace("parent") {
asyncHttpClient.prepareGet("$server.address").execute()
}
def response = responseFuture.get()
expect:
@ -43,12 +49,13 @@ class Netty41ClientTest extends AgentTestRunner {
and:
assertTraces(TEST_WRITER, 1) {
trace(0, 1) {
trace(0, 2) {
span(0) {
serviceName "unnamed-java-app"
operationName "netty.client.request"
resourceName "GET /"
spanType DDSpanTypes.HTTP_CLIENT
childOf span(1)
errored false
tags {
"$Tags.COMPONENT.key" "netty-client"
@ -62,6 +69,10 @@ class Netty41ClientTest extends AgentTestRunner {
defaultTags()
}
}
span(1) {
operationName "parent"
parent()
}
}
}
@ -69,4 +80,41 @@ class Netty41ClientTest extends AgentTestRunner {
server.lastRequest.headers.get("x-datadog-trace-id") == "${TEST_WRITER.get(0).get(0).traceId}"
server.lastRequest.headers.get("x-datadog-parent-id") == "${TEST_WRITER.get(0).get(0).spanId}"
}
def "test connection failure"() {
setup:
def invalidPort = TestUtils.randomOpenPort()
def responseFuture = runUnderTrace("parent") {
asyncHttpClient.prepareGet("http://localhost:$invalidPort/").execute()
}
when:
responseFuture.get()
then:
def throwable = thrown(ExecutionException)
throwable.cause instanceof ConnectException
and:
assertTraces(TEST_WRITER, 1) {
trace(0, 2) {
span(0) {
operationName "parent"
parent()
}
span(1) {
operationName "netty.connect"
resourceName "netty.connect"
childOf span(0)
errored true
tags {
"$Tags.COMPONENT.key" "netty"
errorTags AbstractChannel.AnnotatedConnectException, "Connection refused: localhost/127.0.0.1:$invalidPort"
defaultTags()
}
}
}
}
}
}

View File

@ -3,6 +3,7 @@ package datadog.trace.agent.test;
import static io.opentracing.log.Fields.ERROR_OBJECT;
import datadog.trace.agent.tooling.Utils;
import datadog.trace.context.TraceScope;
import io.opentracing.Scope;
import io.opentracing.Span;
import io.opentracing.Tracer;
@ -85,6 +86,8 @@ public class TestUtils {
public static <T extends Object> Object runUnderTrace(
final String rootOperationName, final Callable<T> r) throws Exception {
final Scope scope = GlobalTracer.get().buildSpan(rootOperationName).startActive(true);
((TraceScope) scope).setAsyncPropagation(true);
try {
return r.call();
} catch (final Exception e) {
@ -94,6 +97,7 @@ public class TestUtils {
throw e;
} finally {
((TraceScope) scope).setAsyncPropagation(false);
scope.close();
}
}