Change the scope for the netty client callback

Previously the scope was the http client span, which could result in deep nesting.  Now it is the parent span.

Before
[——————Parent—————]
   [ ^ ———Client—————]
                        [ ^—Child—]

Now:
[——————Parent—————]
   [ ^ —Client—] [ ^—Child—]

Also improve the tests.
This commit is contained in:
Tyler Benson 2019-06-14 17:13:20 -07:00
parent 13b794c504
commit 6ccb0d71d8
15 changed files with 202 additions and 226 deletions

View File

@ -16,4 +16,7 @@ public class AttributeKeys {
public static final AttributeKey<Span> CLIENT_ATTRIBUTE_KEY =
new AttributeKey<>(HttpClientTracingHandler.class.getName() + ".span");
public static final AttributeKey<Span> CLIENT_PARENT_ATTRIBUTE_KEY =
new AttributeKey<>(HttpClientTracingHandler.class.getName() + ".parent");
}

View File

@ -10,6 +10,7 @@ import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.HttpRequest;
import io.opentracing.Scope;
import io.opentracing.Span;
import io.opentracing.Tracer;
import io.opentracing.propagation.Format;
import io.opentracing.util.GlobalTracer;
import java.net.InetSocketAddress;
@ -34,19 +35,19 @@ public class HttpClientRequestTracingHandler extends ChannelOutboundHandlerAdapt
final HttpRequest request = (HttpRequest) msg;
final Span span = GlobalTracer.get().buildSpan("netty.client.request").start();
try (final Scope scope = GlobalTracer.get().scopeManager().activate(span, false)) {
final Tracer tracer = GlobalTracer.get();
ctx.channel().attr(AttributeKeys.CLIENT_PARENT_ATTRIBUTE_KEY).set(tracer.activeSpan());
final Span span = tracer.buildSpan("netty.client.request").start();
try (final Scope scope = tracer.scopeManager().activate(span, false)) {
DECORATE.afterStart(span);
DECORATE.onRequest(span, request);
DECORATE.onPeerConnection(span, (InetSocketAddress) ctx.channel().remoteAddress());
// AWS calls are often signed, so we can't add headers without breaking the signature.
if (!request.headers().contains("amz-sdk-invocation-id")) {
GlobalTracer.get()
.inject(
span.context(),
Format.Builtin.HTTP_HEADERS,
new NettyResponseInjectAdapter(request));
tracer.inject(
span.context(), Format.Builtin.HTTP_HEADERS, new NettyResponseInjectAdapter(request));
}
ctx.channel().attr(AttributeKeys.CLIENT_ATTRIBUTE_KEY).set(span);

View File

@ -9,42 +9,34 @@ import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.HttpResponse;
import io.opentracing.Scope;
import io.opentracing.Span;
import io.opentracing.tag.Tags;
import io.opentracing.util.GlobalTracer;
public class HttpClientResponseTracingHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
final Span parent = ctx.channel().attr(AttributeKeys.CLIENT_PARENT_ATTRIBUTE_KEY).get();
final Span span = ctx.channel().attr(AttributeKeys.CLIENT_ATTRIBUTE_KEY).get();
if (span == null) {
ctx.fireChannelRead(msg);
return;
}
try (final Scope scope = GlobalTracer.get().scopeManager().activate(span, false)) {
final boolean finishSpan = msg instanceof HttpResponse;
if (span != null && finishSpan) {
try (final Scope scope = GlobalTracer.get().scopeManager().activate(span, true)) {
DECORATE.onResponse(span, (HttpResponse) msg);
DECORATE.beforeFinish(span);
}
}
// We want the callback in the scope of the parent, not the client span
if (parent != null) {
try (final Scope scope = GlobalTracer.get().scopeManager().activate(parent, false)) {
if (scope instanceof TraceScope) {
((TraceScope) scope).setAsyncPropagation(true);
}
try {
ctx.fireChannelRead(msg);
} catch (final Throwable throwable) {
if (finishSpan) {
DECORATE.onError(span, throwable);
DECORATE.beforeFinish(span);
Tags.HTTP_STATUS.set(span, 500);
span.finish(); // Finish the span manually since finishSpanOnClose was false
throw throwable;
}
}
if (finishSpan) {
DECORATE.onResponse(span, (HttpResponse) msg);
DECORATE.beforeFinish(span);
span.finish(); // Finish the span manually since finishSpanOnClose was false
}
} else {
ctx.fireChannelRead(msg);
}
}
}

View File

@ -1,8 +1,10 @@
import datadog.trace.agent.test.base.HttpClientTest
import datadog.trace.instrumentation.netty40.client.NettyHttpClientDecorator
import io.opentracing.tag.Tags
import org.asynchttpclient.AsyncCompletionHandler
import org.asynchttpclient.AsyncHttpClient
import org.asynchttpclient.DefaultAsyncHttpClientConfig
import org.asynchttpclient.Response
import spock.lang.Shared
import java.util.concurrent.ExecutionException
@ -25,8 +27,13 @@ class Netty40ClientTest extends HttpClientTest<NettyHttpClientDecorator> {
def methodName = "prepare" + method.toLowerCase().capitalize()
def requestBuilder = asyncHttpClient."$methodName"(uri.toString())
headers.each { requestBuilder.setHeader(it.key, it.value) }
def response = requestBuilder.execute().get()
def response = requestBuilder.execute(new AsyncCompletionHandler() {
@Override
Object onCompleted(Response response) throws Exception {
callback?.call()
return response
}
}).get()
return response.statusCode
}
@ -66,7 +73,7 @@ class Netty40ClientTest extends HttpClientTest<NettyHttpClientDecorator> {
and:
assertTraces(1) {
trace(0, 2) {
basicSpan(it, 0, "parent", thrownException)
basicSpan(it, 0, "parent", null, thrownException)
span(1) {
operationName "netty.connect"

View File

@ -20,4 +20,7 @@ public class AttributeKeys {
public static final AttributeKey<Span> CLIENT_ATTRIBUTE_KEY =
AttributeKey.valueOf(HttpClientTracingHandler.class.getName() + ".span");
public static final AttributeKey<Span> CLIENT_PARENT_ATTRIBUTE_KEY =
AttributeKey.valueOf(HttpClientTracingHandler.class.getName() + ".parent");
}

View File

@ -10,6 +10,7 @@ import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.HttpRequest;
import io.opentracing.Scope;
import io.opentracing.Span;
import io.opentracing.Tracer;
import io.opentracing.propagation.Format;
import io.opentracing.util.GlobalTracer;
import java.net.InetSocketAddress;
@ -34,19 +35,19 @@ public class HttpClientRequestTracingHandler extends ChannelOutboundHandlerAdapt
final HttpRequest request = (HttpRequest) msg;
final Span span = GlobalTracer.get().buildSpan("netty.client.request").start();
try (final Scope scope = GlobalTracer.get().scopeManager().activate(span, false)) {
final Tracer tracer = GlobalTracer.get();
ctx.channel().attr(AttributeKeys.CLIENT_PARENT_ATTRIBUTE_KEY).set(tracer.activeSpan());
final Span span = tracer.buildSpan("netty.client.request").start();
try (final Scope scope = tracer.scopeManager().activate(span, false)) {
DECORATE.afterStart(span);
DECORATE.onRequest(span, request);
DECORATE.onPeerConnection(span, (InetSocketAddress) ctx.channel().remoteAddress());
// AWS calls are often signed, so we can't add headers without breaking the signature.
if (!request.headers().contains("amz-sdk-invocation-id")) {
GlobalTracer.get()
.inject(
span.context(),
Format.Builtin.HTTP_HEADERS,
new NettyResponseInjectAdapter(request));
tracer.inject(
span.context(), Format.Builtin.HTTP_HEADERS, new NettyResponseInjectAdapter(request));
}
ctx.channel().attr(AttributeKeys.CLIENT_ATTRIBUTE_KEY).set(span);

View File

@ -9,42 +9,34 @@ import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.HttpResponse;
import io.opentracing.Scope;
import io.opentracing.Span;
import io.opentracing.tag.Tags;
import io.opentracing.util.GlobalTracer;
public class HttpClientResponseTracingHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
final Span parent = ctx.channel().attr(AttributeKeys.CLIENT_PARENT_ATTRIBUTE_KEY).get();
final Span span = ctx.channel().attr(AttributeKeys.CLIENT_ATTRIBUTE_KEY).get();
if (span == null) {
ctx.fireChannelRead(msg);
return;
}
try (final Scope scope = GlobalTracer.get().scopeManager().activate(span, false)) {
final boolean finishSpan = msg instanceof HttpResponse;
if (span != null && finishSpan) {
try (final Scope scope = GlobalTracer.get().scopeManager().activate(span, true)) {
DECORATE.onResponse(span, (HttpResponse) msg);
DECORATE.beforeFinish(span);
}
}
// We want the callback in the scope of the parent, not the client span
if (parent != null) {
try (final Scope scope = GlobalTracer.get().scopeManager().activate(parent, false)) {
if (scope instanceof TraceScope) {
((TraceScope) scope).setAsyncPropagation(true);
}
try {
ctx.fireChannelRead(msg);
} catch (final Throwable throwable) {
if (finishSpan) {
DECORATE.onError(span, throwable);
DECORATE.beforeFinish(span);
Tags.HTTP_STATUS.set(span, 500);
span.finish(); // Finish the span manually since finishSpanOnClose was false
throw throwable;
}
}
if (finishSpan) {
DECORATE.onResponse(span, (HttpResponse) msg);
DECORATE.beforeFinish(span);
span.finish(); // Finish the span manually since finishSpanOnClose was false
}
} else {
ctx.fireChannelRead(msg);
}
}
}

View File

@ -9,8 +9,10 @@ import io.netty.channel.ChannelInitializer
import io.netty.channel.embedded.EmbeddedChannel
import io.netty.handler.codec.http.HttpClientCodec
import io.opentracing.tag.Tags
import org.asynchttpclient.AsyncCompletionHandler
import org.asynchttpclient.AsyncHttpClient
import org.asynchttpclient.DefaultAsyncHttpClientConfig
import org.asynchttpclient.Response
import spock.lang.Shared
import java.util.concurrent.ExecutionException
@ -33,8 +35,13 @@ class Netty41ClientTest extends HttpClientTest<NettyHttpClientDecorator> {
def methodName = "prepare" + method.toLowerCase().capitalize()
def requestBuilder = asyncHttpClient."$methodName"(uri.toString())
headers.each { requestBuilder.setHeader(it.key, it.value) }
def response = requestBuilder.execute().get()
def response = requestBuilder.execute(new AsyncCompletionHandler() {
@Override
Object onCompleted(Response response) throws Exception {
callback?.call()
return response
}
}).get()
return response.statusCode
}
@ -75,7 +82,7 @@ class Netty41ClientTest extends HttpClientTest<NettyHttpClientDecorator> {
and:
assertTraces(1) {
trace(0, 2) {
basicSpan(it, 0, "parent", thrownException)
basicSpan(it, 0, "parent", null, thrownException)
span(1) {
operationName "netty.connect"

View File

@ -429,7 +429,7 @@ class RatpackTest extends AgentTestRunner {
serviceName "unnamed-java-app"
operationName "netty.client.request"
spanType DDSpanTypes.HTTP_CLIENT
childOf(span(3))
childOf(span(1))
errored false
tags {
"$Tags.COMPONENT.key" "netty-client"

View File

@ -1,151 +1,54 @@
import datadog.trace.agent.test.AgentTestRunner
import datadog.trace.agent.test.utils.PortUtils
import datadog.trace.api.DDSpanTypes
import io.netty.channel.AbstractChannel
import io.opentracing.tag.Tags
import datadog.trace.agent.test.base.HttpClientTest
import datadog.trace.instrumentation.netty41.client.NettyHttpClientDecorator
import io.vertx.core.Vertx
import io.vertx.core.VertxOptions
import io.vertx.core.http.HttpClient
import io.vertx.core.http.HttpClientRequest
import io.vertx.core.http.HttpClientResponse
import io.vertx.core.http.HttpMethod
import spock.lang.AutoCleanup
import spock.lang.Shared
import spock.lang.Timeout
import java.util.concurrent.CompletableFuture
import static datadog.trace.agent.test.server.http.TestHttpServer.httpServer
import static datadog.trace.agent.test.utils.TraceUtils.runUnderTrace
class VertxHttpClientTest extends AgentTestRunner {
private static final String MESSAGE = "hello world"
@AutoCleanup
@Shared
def server = httpServer {
handlers {
prefix("success") {
handleDistributedRequest()
response.status(200).send(MESSAGE)
}
prefix("error") {
handleDistributedRequest()
throw new RuntimeException("error")
}
}
}
@Timeout(10)
class VertxHttpClientTest extends HttpClientTest<NettyHttpClientDecorator> {
@Shared
Vertx vertx = Vertx.vertx(new VertxOptions())
@Shared
HttpClient httpClient = vertx.createHttpClient()
def "#route request trace"() {
setup:
def responseFuture = new CompletableFuture<HttpClientResponse>()
def messageFuture = new CompletableFuture<String>()
httpClient.getNow(server.address.port, server.address.host, "/" + route, { response ->
responseFuture.complete(response)
response.bodyHandler({ buffer ->
messageFuture.complete(buffer.toString())
})
})
when:
HttpClientResponse response = responseFuture.get()
String message = messageFuture.get()
then:
response.statusCode() == expectedStatus
if (expectedMessage != null) {
message == expectedMessage
@Override
int doRequest(String method, URI uri, Map<String, String> headers, Closure callback) {
CompletableFuture<HttpClientResponse> future = new CompletableFuture<>()
def request = httpClient.request(HttpMethod.valueOf(method), uri.port, uri.host, "$uri")
headers.each { request.putHeader(it.key, it.value) }
request.handler { response ->
callback?.call()
future.complete(response)
}
assertTraces(2) {
server.distributedRequestTrace(it, 0, TEST_WRITER[1][0])
trace(1, 1) {
span(0) {
parent()
serviceName "unnamed-java-app"
operationName "netty.client.request"
resourceName "GET /$route"
spanType DDSpanTypes.HTTP_CLIENT
errored expectedError
tags {
defaultTags()
"$Tags.HTTP_STATUS.key" expectedStatus
"$Tags.HTTP_URL.key" "${server.address}/$route"
"$Tags.PEER_HOSTNAME.key" server.address.host
"$Tags.PEER_HOST_IPV4.key" "127.0.0.1"
"$Tags.PEER_PORT.key" server.address.port
"$Tags.HTTP_METHOD.key" "GET"
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT
"$Tags.COMPONENT.key" "netty-client"
if (expectedError) {
"$Tags.ERROR.key" true
}
}
}
}
}
where:
route | expectedStatus | expectedError | expectedMessage
"success" | 200 | false | MESSAGE
"error" | 500 | true | null
}
def "test connection failure"() {
setup:
def invalidPort = PortUtils.randomOpenPort()
def errorFuture = new CompletableFuture<Throwable>()
runUnderTrace("parent") {
HttpClientRequest request = httpClient.request(
HttpMethod.GET,
invalidPort,
server.address.host,
"/",
{ response ->
// We expect to never get here since our request is expected to fail
errorFuture.complete(null)
})
request.exceptionHandler({ error ->
errorFuture.complete(error)
})
request.end()
return future.get().statusCode()
}
when:
def throwable = errorFuture.get()
@Override
NettyHttpClientDecorator decorator() {
return NettyHttpClientDecorator.DECORATE
}
then:
throwable.cause instanceof ConnectException
@Override
String expectedOperationName() {
return "netty.client.request"
}
and:
assertTraces(1) {
trace(0, 2) {
span(0) {
operationName "parent"
parent()
}
span(1) {
operationName "netty.connect"
resourceName "netty.connect"
childOf span(0)
errored true
tags {
"$Tags.COMPONENT.key" "netty"
errorTags AbstractChannel.AnnotatedConnectException, "Connection refused: localhost/127.0.0.1:$invalidPort"
defaultTags()
}
}
}
@Override
boolean testRedirects() {
false
}
@Override
boolean testConnectionFailure() {
false
}
}

View File

@ -31,7 +31,7 @@ class VertxServerTest extends AgentTestRunner {
def "test server request/response"() {
setup:
def request = new Request.Builder()
.url("http://localhost:$port/test")
.url("http://localhost:$port/proxy")
.header("x-datadog-trace-id", "123")
.header("x-datadog-parent-id", "456")
.get()
@ -43,14 +43,13 @@ class VertxServerTest extends AgentTestRunner {
response.body().string() == "Hello World"
and:
assertTraces(1) {
assertTraces(2) {
trace(0, 2) {
span(0) {
traceId "123"
parentId "456"
serviceName "unnamed-java-app"
operationName "netty.request"
resourceName "GET /test"
childOf(trace(1).get(1))
spanType DDSpanTypes.HTTP_SERVER
errored false
tags {
@ -70,6 +69,47 @@ class VertxServerTest extends AgentTestRunner {
assert span(1).operationName.endsWith('.tracedMethod')
}
}
trace(1, 2) {
span(0) {
serviceName "unnamed-java-app"
operationName "netty.request"
resourceName "GET /proxy"
traceId "123"
parentId "456"
spanType DDSpanTypes.HTTP_SERVER
errored false
tags {
"$Tags.COMPONENT.key" "netty"
"$Tags.HTTP_METHOD.key" "GET"
"$Tags.HTTP_STATUS.key" 200
"$Tags.HTTP_URL.key" "http://localhost:$port/proxy"
"$Tags.PEER_HOSTNAME.key" "localhost"
"$Tags.PEER_HOST_IPV4.key" "127.0.0.1"
"$Tags.PEER_PORT.key" Integer
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_SERVER
defaultTags(true)
}
}
span(1) {
serviceName "unnamed-java-app"
operationName "netty.client.request"
resourceName "GET /test"
childOf(span(0))
spanType DDSpanTypes.HTTP_CLIENT
errored false
tags {
"$Tags.COMPONENT.key" "netty-client"
"$Tags.HTTP_METHOD.key" "GET"
"$Tags.HTTP_STATUS.key" 200
"$Tags.HTTP_URL.key" "http://localhost:$port/test"
"$Tags.PEER_HOSTNAME.key" "localhost"
"$Tags.PEER_HOST_IPV4.key" "127.0.0.1"
"$Tags.PEER_PORT.key" Integer
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT
defaultTags()
}
}
}
}
}

View File

@ -1,25 +1,36 @@
import datadog.trace.api.Trace;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.DeploymentOptions;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpClient;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.RoutingContext;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class VertxWebTestServer extends AbstractVerticle {
public static final String CONFIG_HTTP_SERVER_PORT = "http.server.port";
public static Vertx start(final int port) throws ExecutionException, InterruptedException {
/* This is highly against Vertx ideas, but our tests are synchronous
so we have to make sure server is up and running */
final CompletableFuture<Void> future = new CompletableFuture<>();
final Vertx vertx = Vertx.vertx(new VertxOptions());
final Vertx vertx = Vertx.vertx(new VertxOptions().setClusterPort(port));
vertx.deployVerticle(
new VertxWebTestServer(port),
VertxWebTestServer.class.getName(),
new DeploymentOptions()
.setConfig(new JsonObject().put(CONFIG_HTTP_SERVER_PORT, port))
.setInstances(3),
res -> {
if (!res.succeeded()) {
throw new RuntimeException("Cannot deploy server Verticle");
throw new RuntimeException("Cannot deploy server Verticle", res.cause());
}
future.complete(null);
});
@ -29,14 +40,12 @@ public class VertxWebTestServer extends AbstractVerticle {
return vertx;
}
private final int port;
public VertxWebTestServer(final int port) {
this.port = port;
}
@Override
public void start(final Future<Void> startFuture) {
final HttpClient client = vertx.createHttpClient();
final int port = config().getInteger(CONFIG_HTTP_SERVER_PORT);
final Router router = Router.router(vertx);
router
@ -51,6 +60,26 @@ public class VertxWebTestServer extends AbstractVerticle {
routingContext -> {
routingContext.response().setStatusCode(500).end();
});
router
.route("/proxy")
.handler(
routingContext -> {
client
.get(
port,
"localhost",
"/test",
response -> {
response.bodyHandler(
buffer -> {
routingContext
.response()
.setStatusCode(response.statusCode())
.end(buffer);
});
})
.end(Optional.ofNullable(routingContext.getBody()).orElse(Buffer.buffer()));
});
router
.route("/test")
.handler(
@ -58,10 +87,7 @@ public class VertxWebTestServer extends AbstractVerticle {
tracedMethod();
routingContext.next();
})
.blockingHandler(
routingContext -> {
routingContext.next();
})
.blockingHandler(RoutingContext::next)
.handler(
routingContext -> {
routingContext.response().putHeader("content-type", "text/html").end("Hello World");
@ -74,5 +100,5 @@ public class VertxWebTestServer extends AbstractVerticle {
}
@Trace
public void tracedMethod() {}
private void tracedMethod() {}
}

View File

@ -42,9 +42,11 @@ dependencies {
implementation deps.autoservice
testCompile project(':dd-java-agent:testing')
testCompile project(':dd-java-agent:instrumentation:netty-4.1')
testCompile project(':dd-java-agent:instrumentation:java-concurrent')
testCompile project(':dd-java-agent:instrumentation:trace-annotation')
testCompile project(':dd-java-agent:instrumentation:netty-4.1')
// Tests seem to fail before 3.5... maybe a problem with some of the tests?
testCompile group: 'io.vertx', name: 'vertx-web', version: '3.5.0'
latestDepTestCompile group: 'io.vertx', name: 'vertx-web', version: '+'

View File

@ -176,10 +176,7 @@ abstract class HttpClientTest<T extends HttpClientDecorator> extends AgentTestRu
assertTraces(1) {
trace(0, size(3)) {
basicSpan(it, 0, "parent")
span(1) {
operationName "child"
childOf span(0)
}
basicSpan(it, 1, "child", span(0))
clientSpan(it, 2, span(0), method, false)
}
}
@ -205,10 +202,7 @@ abstract class HttpClientTest<T extends HttpClientDecorator> extends AgentTestRu
clientSpan(it, 0, null, method, false)
}
trace(1, 1) {
span(0) {
operationName "child"
parent()
}
basicSpan(it, 0, "child")
}
}
@ -306,7 +300,7 @@ abstract class HttpClientTest<T extends HttpClientDecorator> extends AgentTestRu
and:
assertTraces(1) {
trace(0, 2) {
basicSpan(it, 0, "parent", thrownException)
basicSpan(it, 0, "parent", null, thrownException)
clientSpan(it, 1, span(0), method, false, false, uri, null, thrownException)
}
}

View File

@ -1,5 +1,6 @@
package datadog.trace.agent.test.utils
import datadog.opentracing.DDSpan
import datadog.trace.agent.decorator.BaseDecorator
import datadog.trace.agent.test.asserts.TraceAssert
import datadog.trace.context.TraceScope
@ -42,9 +43,13 @@ class TraceUtils {
}
}
static basicSpan(TraceAssert trace, int index, String spanName, Throwable exception = null) {
static basicSpan(TraceAssert trace, int index, String spanName, Object parentSpan = null, Throwable exception = null) {
trace.span(index) {
if (parentSpan == null) {
parent()
} else {
childOf((DDSpan) parentSpan)
}
serviceName "unnamed-java-app"
operationName spanName
resourceName spanName