Merge pull request #352 from DataDog/tyler/netty

Netty HTTP client and server instrumentation
This commit is contained in:
Tyler Benson 2018-06-21 14:11:12 +10:00 committed by GitHub
commit 83db4e8c4d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
31 changed files with 1520 additions and 21 deletions

View File

@ -241,6 +241,7 @@ class VersionScanPlugin implements Plugin<Project> {
list.removeIf {
def version = it.toString().toLowerCase()
return version.contains("rc") ||
version.contains(".cr") ||
version.contains("alpha") ||
version.contains("beta") ||
version.contains("-b") ||

View File

@ -98,7 +98,11 @@ public class AgentInstaller {
log.debug("--{}", mismatch);
}
} else {
log.debug("Failed to handle {} for transformation: {}", typeName, throwable.getMessage());
log.debug(
"Failed to handle {} for transformation on classloader {}: {}",
typeName,
classLoader,
throwable.getMessage());
}
}

View File

@ -124,7 +124,7 @@ public final class HandlerInstrumentation extends Instrumenter.Configurable {
Tags.ERROR.set(span, Boolean.TRUE);
span.log(Collections.singletonMap(ERROR_OBJECT, throwable));
scope.close();
scope.span().finish(); // Finish the span manually since finishSpanOnClose was false
span.finish(); // Finish the span manually since finishSpanOnClose was false
} else if (req.isAsyncStarted()) {
final AtomicBoolean activated = new AtomicBoolean(false);
// what if async is already finished? This would not be called
@ -149,7 +149,7 @@ public final class HandlerInstrumentation extends Instrumenter.Configurable {
@Override
public void onComplete(final AsyncEvent event) throws IOException {
if (activated.compareAndSet(false, true)) {
try (Scope scope = GlobalTracer.get().scopeManager().activate(span, true)) {
try (final Scope scope = GlobalTracer.get().scopeManager().activate(span, true)) {
Tags.HTTP_STATUS.set(
span, ((HttpServletResponse) event.getSuppliedResponse()).getStatus());
}
@ -159,7 +159,7 @@ public final class HandlerInstrumentation extends Instrumenter.Configurable {
@Override
public void onTimeout(final AsyncEvent event) throws IOException {
if (activated.compareAndSet(false, true)) {
try (Scope scope = GlobalTracer.get().scopeManager().activate(span, true)) {
try (final Scope scope = GlobalTracer.get().scopeManager().activate(span, true)) {
Tags.ERROR.set(span, Boolean.TRUE);
span.setTag("timeout", event.getAsyncContext().getTimeout());
}
@ -169,7 +169,7 @@ public final class HandlerInstrumentation extends Instrumenter.Configurable {
@Override
public void onError(final AsyncEvent event) throws IOException {
if (event.getThrowable() != null && activated.compareAndSet(false, true)) {
try (Scope scope = GlobalTracer.get().scopeManager().activate(span, true)) {
try (final Scope scope = GlobalTracer.get().scopeManager().activate(span, true)) {
if (((HttpServletResponse) event.getSuppliedResponse()).getStatus()
== HttpServletResponse.SC_OK) {
// exception is thrown in filter chain, but status code is incorrect

View File

@ -0,0 +1,60 @@
apply plugin: 'version-scan'
versionScan {
group = "io.netty"
module = "netty-all"
legacyModule = "netty"
versions = "[4.0.0.Final,4.1.0.Final)"
verifyPresent = [
"io.netty.channel.local.LocalEventLoop": null,
]
}
apply from: "${rootDir}/gradle/java.gradle"
apply plugin: 'org.unbroken-dome.test-sets'
testSets {
latestDepTest {
dirName = 'test'
}
}
dependencies {
compileOnly group: 'io.netty', name: 'netty-all', version: '4.0.0.Final'
compile project(':dd-java-agent:agent-tooling')
compile deps.bytebuddy
compile deps.opentracing
annotationProcessor deps.autoservice
implementation deps.autoservice
testCompile project(':dd-java-agent:testing')
// testCompile group: 'io.netty', name: 'netty-all', version: '4.0.0.Final'
testCompile group: 'org.eclipse.jetty', name: 'jetty-server', version: '8.2.0.v20160908'
testCompile group: 'com.squareup.okhttp3', name: 'okhttp', version: '3.6.0'
testCompile group: 'org.asynchttpclient', name: 'async-http-client', version: '2.0.0'
}
// We need to force the dependency to the earliest supported version because other libraries declare newer versions.
configurations.testCompile {
resolutionStrategy {
eachDependency { DependencyResolveDetails details ->
//specifying a fixed version for all libraries with io.netty' group
if (details.requested.group == 'io.netty') {
details.useVersion "4.0.0.Final"
}
}
}
}
configurations.latestDepTestCompile {
resolutionStrategy {
force group: 'io.netty', name: 'netty-all', version: '4.0.56.Final'
force group: 'org.asynchttpclient', name: 'async-http-client', version: '2.0.+'
}
}
testJava8Only += '**/*Test.class'

View File

@ -0,0 +1,131 @@
package datadog.trace.instrumentation.netty40;
import static datadog.trace.agent.tooling.ClassLoaderMatcher.classLoaderHasClasses;
import static net.bytebuddy.matcher.ElementMatchers.hasSuperType;
import static net.bytebuddy.matcher.ElementMatchers.isInterface;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.nameStartsWith;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.not;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.DDAdvice;
import datadog.trace.agent.tooling.DDTransformers;
import datadog.trace.agent.tooling.HelperInjector;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.bootstrap.CallDepthThreadLocalMap;
import datadog.trace.instrumentation.netty40.client.HttpClientRequestTracingHandler;
import datadog.trace.instrumentation.netty40.client.HttpClientResponseTracingHandler;
import datadog.trace.instrumentation.netty40.client.HttpClientTracingHandler;
import datadog.trace.instrumentation.netty40.server.HttpServerRequestTracingHandler;
import datadog.trace.instrumentation.netty40.server.HttpServerResponseTracingHandler;
import datadog.trace.instrumentation.netty40.server.HttpServerTracingHandler;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpRequestEncoder;
import io.netty.handler.codec.http.HttpResponseDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder;
import io.netty.handler.codec.http.HttpServerCodec;
import net.bytebuddy.agent.builder.AgentBuilder;
import net.bytebuddy.asm.Advice;
@AutoService(Instrumenter.class)
public class NettyChannelPipelineInstrumentation extends Instrumenter.Configurable {
private static final String PACKAGE =
NettyChannelPipelineInstrumentation.class.getPackage().getName();
public NettyChannelPipelineInstrumentation() {
super("netty", "netty-4.1");
}
@Override
protected boolean defaultEnabled() {
return false;
}
@Override
public AgentBuilder apply(final AgentBuilder agentBuilder) {
return agentBuilder
.type(
not(isInterface()).and(hasSuperType(named("io.netty.channel.ChannelPipeline"))),
classLoaderHasClasses("io.netty.channel.local.LocalEventLoop"))
.transform(
new HelperInjector(
// client helpers
PACKAGE + ".client.NettyResponseInjectAdapter",
PACKAGE + ".client.HttpClientRequestTracingHandler",
PACKAGE + ".client.HttpClientResponseTracingHandler",
PACKAGE + ".client.HttpClientTracingHandler",
// server helpers
PACKAGE + ".server.NettyRequestExtractAdapter",
PACKAGE + ".server.HttpServerRequestTracingHandler",
PACKAGE + ".server.HttpServerResponseTracingHandler",
PACKAGE + ".server.HttpServerTracingHandler"))
.transform(DDTransformers.defaultTransformers())
.transform(
DDAdvice.create()
.advice(
isMethod()
.and(nameStartsWith("add"))
.and(takesArgument(2, named("io.netty.channel.ChannelHandler"))),
ChannelPipelineAddAdvice.class.getName()))
.asDecorator();
}
/**
* When certain handlers are added to the pipeline, we want to add our corresponding tracing
* handlers. If those handlers are later removed, we may want to remove our handlers. That is not
* currently implemented.
*/
public static class ChannelPipelineAddAdvice {
@Advice.OnMethodEnter
public static int checkDepth() {
return CallDepthThreadLocalMap.incrementCallDepth(ChannelPipeline.class);
}
@Advice.OnMethodExit(suppress = Throwable.class)
public static void addHandler(
@Advice.Enter final int depth,
@Advice.This final ChannelPipeline pipeline,
@Advice.Argument(2) final ChannelHandler handler) {
if (depth > 0) return;
try {
// Server pipeline handlers
if (handler instanceof HttpServerCodec) {
pipeline.addLast(
HttpServerTracingHandler.class.getName(), new HttpServerTracingHandler());
} else if (handler instanceof HttpRequestDecoder) {
pipeline.addLast(
HttpServerRequestTracingHandler.class.getName(),
new HttpServerRequestTracingHandler());
} else if (handler instanceof HttpResponseEncoder) {
pipeline.addLast(
HttpServerResponseTracingHandler.class.getName(),
new HttpServerResponseTracingHandler());
} else
// Client pipeline handlers
if (handler instanceof HttpClientCodec) {
pipeline.addLast(
HttpClientTracingHandler.class.getName(), new HttpClientTracingHandler());
} else if (handler instanceof HttpRequestEncoder) {
pipeline.addLast(
HttpClientRequestTracingHandler.class.getName(),
new HttpClientRequestTracingHandler());
} else if (handler instanceof HttpResponseDecoder) {
pipeline.addLast(
HttpClientResponseTracingHandler.class.getName(),
new HttpClientResponseTracingHandler());
}
} catch (final IllegalArgumentException e) {
// Prevented adding duplicate handlers.
} finally {
CallDepthThreadLocalMap.reset(ChannelPipeline.class);
}
}
}
}

View File

@ -0,0 +1,61 @@
package datadog.trace.instrumentation.netty40.client;
import static io.netty.handler.codec.http.HttpHeaders.Names.HOST;
import static io.opentracing.log.Fields.ERROR_OBJECT;
import datadog.trace.api.DDSpanTypes;
import datadog.trace.api.DDTags;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.HttpRequest;
import io.opentracing.Span;
import io.opentracing.propagation.Format;
import io.opentracing.tag.Tags;
import io.opentracing.util.GlobalTracer;
import java.net.InetSocketAddress;
import java.util.Collections;
public class HttpClientRequestTracingHandler extends ChannelOutboundHandlerAdapter {
@Override
public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise prm) {
if (!(msg instanceof HttpRequest)) {
ctx.write(msg, prm);
return;
}
final HttpRequest request = (HttpRequest) msg;
final InetSocketAddress remoteAddress = (InetSocketAddress) ctx.channel().remoteAddress();
String url = request.getUri();
if (request.headers().contains(HOST)) {
url = "http://" + request.headers().get(HOST) + url;
}
final Span span =
GlobalTracer.get()
.buildSpan("netty.client.request")
.withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_CLIENT)
.withTag(Tags.PEER_HOSTNAME.getKey(), remoteAddress.getHostName())
.withTag(Tags.PEER_PORT.getKey(), remoteAddress.getPort())
.withTag(Tags.HTTP_METHOD.getKey(), request.getMethod().name())
.withTag(Tags.HTTP_URL.getKey(), url)
.withTag(Tags.COMPONENT.getKey(), "netty-client")
.withTag(DDTags.SPAN_TYPE, DDSpanTypes.HTTP_CLIENT)
.start();
GlobalTracer.get()
.inject(
span.context(), Format.Builtin.HTTP_HEADERS, new NettyResponseInjectAdapter(request));
ctx.channel().attr(HttpClientTracingHandler.attributeKey).set(span);
try {
ctx.write(msg, prm);
} catch (final Throwable throwable) {
Tags.ERROR.set(span, Boolean.TRUE);
span.log(Collections.singletonMap(ERROR_OBJECT, throwable));
span.finish(); // Finish the span manually since finishSpanOnClose was false
throw throwable;
}
}
}

View File

@ -0,0 +1,37 @@
package datadog.trace.instrumentation.netty40.client;
import static io.opentracing.log.Fields.ERROR_OBJECT;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.HttpResponse;
import io.opentracing.Span;
import io.opentracing.tag.Tags;
import java.util.Collections;
public class HttpClientResponseTracingHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
final Span span = ctx.channel().attr(HttpClientTracingHandler.attributeKey).get();
if (span == null || !(msg instanceof HttpResponse)) {
ctx.fireChannelRead(msg);
return;
}
final HttpResponse response = (HttpResponse) msg;
try {
ctx.fireChannelRead(msg);
} catch (final Throwable throwable) {
Tags.ERROR.set(span, Boolean.TRUE);
span.log(Collections.singletonMap(ERROR_OBJECT, throwable));
Tags.HTTP_STATUS.set(span, 500);
span.finish(); // Finish the span manually since finishSpanOnClose was false
throw throwable;
}
Tags.HTTP_STATUS.set(span, response.getStatus().code());
span.finish(); // Finish the span manually since finishSpanOnClose was false
}
}

View File

@ -0,0 +1,17 @@
package datadog.trace.instrumentation.netty40.client;
import io.netty.channel.CombinedChannelDuplexHandler;
import io.netty.util.AttributeKey;
import io.opentracing.Span;
public class HttpClientTracingHandler
extends CombinedChannelDuplexHandler<
HttpClientResponseTracingHandler, HttpClientRequestTracingHandler> {
static final AttributeKey<Span> attributeKey =
new AttributeKey<>(HttpClientTracingHandler.class.getName());
public HttpClientTracingHandler() {
super(new HttpClientResponseTracingHandler(), new HttpClientRequestTracingHandler());
}
}

View File

@ -0,0 +1,25 @@
package datadog.trace.instrumentation.netty40.client;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpRequest;
import io.opentracing.propagation.TextMap;
import java.util.Iterator;
import java.util.Map;
public class NettyResponseInjectAdapter implements TextMap {
private final HttpHeaders headers;
NettyResponseInjectAdapter(final HttpRequest request) {
this.headers = request.headers();
}
@Override
public Iterator<Map.Entry<String, String>> iterator() {
throw new UnsupportedOperationException("This class should be used only with Tracer.inject()!");
}
@Override
public void put(final String key, final String value) {
headers.set(key, value);
}
}

View File

@ -0,0 +1,71 @@
package datadog.trace.instrumentation.netty40.server;
import static io.netty.handler.codec.http.HttpHeaders.Names.HOST;
import static io.opentracing.log.Fields.ERROR_OBJECT;
import datadog.trace.api.DDSpanTypes;
import datadog.trace.api.DDTags;
import datadog.trace.context.TraceScope;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.HttpRequest;
import io.opentracing.Scope;
import io.opentracing.Span;
import io.opentracing.SpanContext;
import io.opentracing.propagation.Format;
import io.opentracing.tag.Tags;
import io.opentracing.util.GlobalTracer;
import java.net.InetSocketAddress;
import java.util.Collections;
public class HttpServerRequestTracingHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
if (!(msg instanceof HttpRequest)) {
ctx.fireChannelRead(msg); // superclass does not throw
return;
}
final HttpRequest request = (HttpRequest) msg;
final InetSocketAddress remoteAddress = (InetSocketAddress) ctx.channel().remoteAddress();
final SpanContext extractedContext =
GlobalTracer.get()
.extract(Format.Builtin.HTTP_HEADERS, new NettyRequestExtractAdapter(request));
String url = request.getUri();
if (request.headers().contains(HOST)) {
url = "http://" + request.headers().get(HOST) + url;
}
final Scope scope =
GlobalTracer.get()
.buildSpan("netty.request")
.asChildOf(extractedContext)
.withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_SERVER)
.withTag(Tags.PEER_HOSTNAME.getKey(), remoteAddress.getHostName())
.withTag(Tags.PEER_PORT.getKey(), remoteAddress.getPort())
.withTag(Tags.HTTP_METHOD.getKey(), request.getMethod().name())
.withTag(Tags.HTTP_URL.getKey(), url)
.withTag(Tags.COMPONENT.getKey(), "netty")
.withTag(DDTags.SPAN_TYPE, DDSpanTypes.WEB_SERVLET)
.startActive(false);
if (scope instanceof TraceScope) {
((TraceScope) scope).setAsyncPropagation(true);
}
final Span span = scope.span();
ctx.channel().attr(HttpServerTracingHandler.attributeKey).set(span);
try {
ctx.fireChannelRead(msg);
} catch (final Throwable throwable) {
Tags.ERROR.set(span, Boolean.TRUE);
span.log(Collections.singletonMap(ERROR_OBJECT, throwable));
span.finish(); // Finish the span manually since finishSpanOnClose was false
throw throwable;
} finally {
scope.close();
}
}
}

View File

@ -0,0 +1,38 @@
package datadog.trace.instrumentation.netty40.server;
import static io.opentracing.log.Fields.ERROR_OBJECT;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.HttpResponse;
import io.opentracing.Span;
import io.opentracing.tag.Tags;
import java.util.Collections;
public class HttpServerResponseTracingHandler extends ChannelOutboundHandlerAdapter {
@Override
public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise prm) {
final Span span = ctx.channel().attr(HttpServerTracingHandler.attributeKey).get();
if (span == null || !(msg instanceof HttpResponse)) {
ctx.write(msg, prm);
return;
}
final HttpResponse response = (HttpResponse) msg;
try {
ctx.write(msg, prm);
} catch (final Throwable throwable) {
Tags.ERROR.set(span, Boolean.TRUE);
span.log(Collections.singletonMap(ERROR_OBJECT, throwable));
Tags.HTTP_STATUS.set(span, 500);
span.finish(); // Finish the span manually since finishSpanOnClose was false
throw throwable;
}
Tags.HTTP_STATUS.set(span, response.getStatus().code());
span.finish(); // Finish the span manually since finishSpanOnClose was false
}
}

View File

@ -0,0 +1,17 @@
package datadog.trace.instrumentation.netty40.server;
import io.netty.channel.CombinedChannelDuplexHandler;
import io.netty.util.AttributeKey;
import io.opentracing.Span;
public class HttpServerTracingHandler
extends CombinedChannelDuplexHandler<
HttpServerRequestTracingHandler, HttpServerResponseTracingHandler> {
static final AttributeKey<Span> attributeKey =
new AttributeKey<>(HttpServerTracingHandler.class.getName());
public HttpServerTracingHandler() {
super(new HttpServerRequestTracingHandler(), new HttpServerResponseTracingHandler());
}
}

View File

@ -0,0 +1,25 @@
package datadog.trace.instrumentation.netty40.server;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpRequest;
import io.opentracing.propagation.TextMap;
import java.util.Iterator;
import java.util.Map;
public class NettyRequestExtractAdapter implements TextMap {
private final HttpHeaders headers;
NettyRequestExtractAdapter(final HttpRequest request) {
this.headers = request.headers();
}
@Override
public Iterator<Map.Entry<String, String>> iterator() {
return headers.iterator();
}
@Override
public void put(final String key, final String value) {
throw new UnsupportedOperationException("This class should be used only with Tracer.inject()!");
}
}

View File

@ -0,0 +1,97 @@
import datadog.trace.agent.test.AgentTestRunner
import datadog.trace.agent.test.TestUtils
import datadog.trace.api.DDSpanTypes
import datadog.trace.api.DDTags
import io.opentracing.tag.Tags
import org.asynchttpclient.AsyncHttpClient
import org.eclipse.jetty.server.Handler
import org.eclipse.jetty.server.Request
import org.eclipse.jetty.server.Server
import org.eclipse.jetty.server.handler.AbstractHandler
import org.eclipse.jetty.util.MultiMap
import spock.lang.Shared
import javax.servlet.http.HttpServletRequest
import javax.servlet.http.HttpServletResponse
import static datadog.trace.agent.test.ListWriterAssert.assertTraces
import static org.asynchttpclient.Dsl.asyncHttpClient
class Netty40ClientTest extends AgentTestRunner {
static {
System.setProperty("dd.integration.netty.enabled", "true")
}
static final PORT = TestUtils.randomOpenPort()
@Shared
Server server = new Server(PORT)
@Shared
AsyncHttpClient asyncHttpClient = asyncHttpClient()
// DefaultAsyncHttpClientConfig.Builder.newInstance().setRequestTimeout(Integer.MAX_VALUE).build())
@Shared
def headers = new MultiMap()
def setupSpec() {
Handler handler = [
handle: { String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response ->
request.getHeaderNames().each {
headers.add(it, request.getHeader(it))
}
response.setContentType("text/plaincharset=utf-8")
response.setStatus(HttpServletResponse.SC_OK)
baseRequest.setHandled(true)
response.getWriter().println("Hello World")
}
] as AbstractHandler
server.setHandler(handler)
server.start()
}
def cleanupSpec() {
server.stop()
}
def cleanup() {
headers.clear()
}
def "test server request/response"() {
setup:
def responseFuture = asyncHttpClient.prepareGet("http://localhost:$PORT/").execute()
def response = responseFuture.get()
expect:
response.statusCode == 200
response.responseBody == "Hello World\n"
and:
assertTraces(TEST_WRITER, 1) {
trace(0, 1) {
span(0) {
serviceName "unnamed-java-app"
operationName "netty.client.request"
resourceName "GET /"
spanType DDSpanTypes.HTTP_CLIENT
errored false
tags {
"$Tags.COMPONENT.key" "netty-client"
"$Tags.HTTP_METHOD.key" "GET"
"$Tags.HTTP_STATUS.key" 200
"$Tags.HTTP_URL.key" "http://localhost:$PORT/"
"$Tags.PEER_HOSTNAME.key" "localhost"
"$Tags.PEER_PORT.key" Integer
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT
"$DDTags.SPAN_TYPE" DDSpanTypes.HTTP_CLIENT
defaultTags()
}
}
}
}
and:
headers["x-datadog-trace-id"] == "${TEST_WRITER.get(0).get(0).traceId}"
headers["x-datadog-parent-id"] == "${TEST_WRITER.get(0).get(0).spanId}"
}
}

View File

@ -0,0 +1,164 @@
import datadog.trace.agent.test.AgentTestRunner
import datadog.trace.agent.test.TestUtils
import datadog.trace.api.DDSpanTypes
import datadog.trace.api.DDTags
import io.netty.bootstrap.ServerBootstrap
import io.netty.buffer.ByteBuf
import io.netty.buffer.Unpooled
import io.netty.channel.ChannelInitializer
import io.netty.channel.ChannelPipeline
import io.netty.channel.EventLoopGroup
import io.netty.channel.SimpleChannelInboundHandler
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.nio.NioServerSocketChannel
import io.netty.handler.codec.http.DefaultFullHttpResponse
import io.netty.handler.codec.http.FullHttpResponse
import io.netty.handler.codec.http.HttpHeaders
import io.netty.handler.codec.http.HttpRequestDecoder
import io.netty.handler.codec.http.HttpResponseEncoder
import io.netty.handler.codec.http.HttpResponseStatus
import io.netty.handler.codec.http.HttpServerCodec
import io.netty.handler.codec.http.HttpVersion
import io.netty.handler.codec.http.LastHttpContent
import io.netty.handler.logging.LogLevel
import io.netty.handler.logging.LoggingHandler
import io.netty.util.CharsetUtil
import io.opentracing.tag.Tags
import okhttp3.OkHttpClient
import okhttp3.Request
import static datadog.trace.agent.test.ListWriterAssert.assertTraces
class Netty40ServerTest extends AgentTestRunner {
static {
System.setProperty("dd.integration.netty.enabled", "true")
}
OkHttpClient client = new OkHttpClient.Builder()
// Uncomment when debugging:
// .connectTimeout(1, TimeUnit.HOURS)
// .writeTimeout(1, TimeUnit.HOURS)
// .readTimeout(1, TimeUnit.HOURS)
.build()
def "test server request/response"() {
setup:
EventLoopGroup eventLoopGroup = new NioEventLoopGroup()
int port = TestUtils.randomOpenPort()
initializeServer(eventLoopGroup, port, handlers, HttpResponseStatus.OK)
def request = new Request.Builder().url("http://localhost:$port/").get().build()
def response = client.newCall(request).execute()
expect:
response.code() == 200
response.body().string() == "Hello World"
and:
assertTraces(TEST_WRITER, 1) {
trace(0, 1) {
span(0) {
serviceName "unnamed-java-app"
operationName "netty.request"
resourceName "GET /"
spanType DDSpanTypes.WEB_SERVLET
errored false
tags {
"$Tags.COMPONENT.key" "netty"
"$Tags.HTTP_METHOD.key" "GET"
"$Tags.HTTP_STATUS.key" 200
"$Tags.HTTP_URL.key" "http://localhost:$port/"
"$Tags.PEER_HOSTNAME.key" "localhost"
"$Tags.PEER_PORT.key" Integer
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_SERVER
"$DDTags.SPAN_TYPE" DDSpanTypes.WEB_SERVLET
defaultTags()
}
}
}
}
cleanup:
eventLoopGroup.shutdownGracefully()
where:
handlers | _
[new HttpServerCodec()] | _
[new HttpRequestDecoder(), new HttpResponseEncoder()] | _
}
def "test #responseCode response handling"() {
setup:
EventLoopGroup eventLoopGroup = new NioEventLoopGroup()
int port = TestUtils.randomOpenPort()
initializeServer(eventLoopGroup, port, new HttpServerCodec(), responseCode)
def request = new Request.Builder().url("http://localhost:$port/").get().build()
def response = client.newCall(request).execute()
expect:
response.code() == responseCode.code()
response.body().string() == "Hello World"
and:
assertTraces(TEST_WRITER, 1) {
trace(0, 1) {
span(0) {
serviceName "unnamed-java-app"
operationName "netty.request"
resourceName name
spanType DDSpanTypes.WEB_SERVLET
errored error
tags {
"$Tags.COMPONENT.key" "netty"
"$Tags.HTTP_METHOD.key" "GET"
"$Tags.HTTP_STATUS.key" responseCode.code()
"$Tags.HTTP_URL.key" "http://localhost:$port/"
"$Tags.PEER_HOSTNAME.key" "localhost"
"$Tags.PEER_PORT.key" Integer
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_SERVER
"$DDTags.SPAN_TYPE" DDSpanTypes.WEB_SERVLET
if (error) {
tag("error", true)
}
defaultTags()
}
}
}
}
cleanup:
eventLoopGroup.shutdownGracefully()
where:
responseCode | name | error
HttpResponseStatus.OK | "GET /" | false
HttpResponseStatus.NOT_FOUND | "404" | false
HttpResponseStatus.INTERNAL_SERVER_ERROR | "GET /" | true
}
def initializeServer(eventLoopGroup, port, handlers, responseCode) {
ServerBootstrap bootstrap = new ServerBootstrap()
.group(eventLoopGroup)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler([
initChannel: { ch ->
ChannelPipeline pipeline = ch.pipeline()
handlers.each { pipeline.addLast(it) }
pipeline.addLast([
channelRead0 : { ctx, msg ->
if (msg instanceof LastHttpContent) {
ByteBuf content = Unpooled.copiedBuffer("Hello World", CharsetUtil.UTF_8)
FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, responseCode, content)
response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain")
response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, content.readableBytes())
ctx.write(response)
}
},
channelReadComplete: { it.flush() }
] as SimpleChannelInboundHandler)
}
] as ChannelInitializer).channel(NioServerSocketChannel)
bootstrap.bind(port).sync()
}
}

View File

@ -0,0 +1,61 @@
apply plugin: 'version-scan'
versionScan {
group = "io.netty"
module = "netty-all"
legacyModule = "netty"
versions = "[4.1.0.Final,)"
verifyPresent = [
"io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder": null,
]
}
apply from: "${rootDir}/gradle/java.gradle"
apply plugin: 'org.unbroken-dome.test-sets'
testSets {
latestDepTest {
dirName = 'test'
}
}
dependencies {
compileOnly group: 'io.netty', name: 'netty-all', version: '4.1.0.Final'
compile project(':dd-java-agent:agent-tooling')
compile deps.bytebuddy
compile deps.opentracing
annotationProcessor deps.autoservice
implementation deps.autoservice
testCompile project(':dd-java-agent:testing')
testCompile group: 'io.netty', name: 'netty-all', version: '4.1.0.Final'
testCompile group: 'org.eclipse.jetty', name: 'jetty-server', version: '8.2.0.v20160908'
testCompile group: 'com.squareup.okhttp3', name: 'okhttp', version: '3.6.0'
testCompile group: 'org.asynchttpclient', name: 'async-http-client', version: '2.0.31'
// async-http-client:2.0.32+ would require netty:4.1.9.Final
}
// We need to force the dependency to the earliest supported version because other libraries declare newer versions.
configurations.testCompile {
resolutionStrategy {
eachDependency { DependencyResolveDetails details ->
//specifying a fixed version for all libraries with io.netty' group
if (details.requested.group == 'io.netty') {
details.useVersion "4.1.0.Final"
}
}
}
}
configurations.latestDepTestCompile {
resolutionStrategy {
force group: 'io.netty', name: 'netty-all', version: '+'
force group: 'org.asynchttpclient', name: 'async-http-client', version: '+'
}
}
testJava8Only += '**/*Test.class'

View File

@ -0,0 +1,131 @@
package datadog.trace.instrumentation.netty41;
import static datadog.trace.agent.tooling.ClassLoaderMatcher.classLoaderHasClasses;
import static net.bytebuddy.matcher.ElementMatchers.hasSuperType;
import static net.bytebuddy.matcher.ElementMatchers.isInterface;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.nameStartsWith;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.not;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.DDAdvice;
import datadog.trace.agent.tooling.DDTransformers;
import datadog.trace.agent.tooling.HelperInjector;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.bootstrap.CallDepthThreadLocalMap;
import datadog.trace.instrumentation.netty41.client.HttpClientRequestTracingHandler;
import datadog.trace.instrumentation.netty41.client.HttpClientResponseTracingHandler;
import datadog.trace.instrumentation.netty41.client.HttpClientTracingHandler;
import datadog.trace.instrumentation.netty41.server.HttpServerRequestTracingHandler;
import datadog.trace.instrumentation.netty41.server.HttpServerResponseTracingHandler;
import datadog.trace.instrumentation.netty41.server.HttpServerTracingHandler;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpRequestEncoder;
import io.netty.handler.codec.http.HttpResponseDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder;
import io.netty.handler.codec.http.HttpServerCodec;
import net.bytebuddy.agent.builder.AgentBuilder;
import net.bytebuddy.asm.Advice;
@AutoService(Instrumenter.class)
public class NettyChannelPipelineInstrumentation extends Instrumenter.Configurable {
private static final String PACKAGE =
NettyChannelPipelineInstrumentation.class.getPackage().getName();
public NettyChannelPipelineInstrumentation() {
super("netty", "netty-4.1");
}
@Override
protected boolean defaultEnabled() {
return false;
}
@Override
public AgentBuilder apply(final AgentBuilder agentBuilder) {
return agentBuilder
.type(
not(isInterface()).and(hasSuperType(named("io.netty.channel.ChannelPipeline"))),
classLoaderHasClasses("io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder"))
.transform(
new HelperInjector(
// client helpers
PACKAGE + ".client.NettyResponseInjectAdapter",
PACKAGE + ".client.HttpClientRequestTracingHandler",
PACKAGE + ".client.HttpClientResponseTracingHandler",
PACKAGE + ".client.HttpClientTracingHandler",
// server helpers
PACKAGE + ".server.NettyRequestExtractAdapter",
PACKAGE + ".server.HttpServerRequestTracingHandler",
PACKAGE + ".server.HttpServerResponseTracingHandler",
PACKAGE + ".server.HttpServerTracingHandler"))
.transform(DDTransformers.defaultTransformers())
.transform(
DDAdvice.create()
.advice(
isMethod()
.and(nameStartsWith("add"))
.and(takesArgument(2, named("io.netty.channel.ChannelHandler"))),
ChannelPipelineAddAdvice.class.getName()))
.asDecorator();
}
/**
* When certain handlers are added to the pipeline, we want to add our corresponding tracing
* handlers. If those handlers are later removed, we may want to remove our handlers. That is not
* currently implemented.
*/
public static class ChannelPipelineAddAdvice {
@Advice.OnMethodEnter
public static int checkDepth() {
return CallDepthThreadLocalMap.incrementCallDepth(ChannelPipeline.class);
}
@Advice.OnMethodExit(suppress = Throwable.class)
public static void addHandler(
@Advice.Enter final int depth,
@Advice.This final ChannelPipeline pipeline,
@Advice.Argument(2) final ChannelHandler handler) {
if (depth > 0) return;
try {
// Server pipeline handlers
if (handler instanceof HttpServerCodec) {
pipeline.addLast(
HttpServerTracingHandler.class.getName(), new HttpServerTracingHandler());
} else if (handler instanceof HttpRequestDecoder) {
pipeline.addLast(
HttpServerRequestTracingHandler.class.getName(),
new HttpServerRequestTracingHandler());
} else if (handler instanceof HttpResponseEncoder) {
pipeline.addLast(
HttpServerResponseTracingHandler.class.getName(),
new HttpServerResponseTracingHandler());
} else
// Client pipeline handlers
if (handler instanceof HttpClientCodec) {
pipeline.addLast(
HttpClientTracingHandler.class.getName(), new HttpClientTracingHandler());
} else if (handler instanceof HttpRequestEncoder) {
pipeline.addLast(
HttpClientRequestTracingHandler.class.getName(),
new HttpClientRequestTracingHandler());
} else if (handler instanceof HttpResponseDecoder) {
pipeline.addLast(
HttpClientResponseTracingHandler.class.getName(),
new HttpClientResponseTracingHandler());
}
} catch (final IllegalArgumentException e) {
// Prevented adding duplicate handlers.
} finally {
CallDepthThreadLocalMap.reset(ChannelPipeline.class);
}
}
}
}

View File

@ -0,0 +1,61 @@
package datadog.trace.instrumentation.netty41.client;
import static io.netty.handler.codec.http.HttpHeaderNames.HOST;
import static io.opentracing.log.Fields.ERROR_OBJECT;
import datadog.trace.api.DDSpanTypes;
import datadog.trace.api.DDTags;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.HttpRequest;
import io.opentracing.Span;
import io.opentracing.propagation.Format;
import io.opentracing.tag.Tags;
import io.opentracing.util.GlobalTracer;
import java.net.InetSocketAddress;
import java.util.Collections;
public class HttpClientRequestTracingHandler extends ChannelOutboundHandlerAdapter {
@Override
public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise prm) {
if (!(msg instanceof HttpRequest)) {
ctx.write(msg, prm);
return;
}
final HttpRequest request = (HttpRequest) msg;
final InetSocketAddress remoteAddress = (InetSocketAddress) ctx.channel().remoteAddress();
String url = request.uri();
if (request.headers().contains(HOST)) {
url = "http://" + request.headers().get(HOST) + url;
}
final Span span =
GlobalTracer.get()
.buildSpan("netty.client.request")
.withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_CLIENT)
.withTag(Tags.PEER_HOSTNAME.getKey(), remoteAddress.getHostName())
.withTag(Tags.PEER_PORT.getKey(), remoteAddress.getPort())
.withTag(Tags.HTTP_METHOD.getKey(), request.method().name())
.withTag(Tags.HTTP_URL.getKey(), url)
.withTag(Tags.COMPONENT.getKey(), "netty-client")
.withTag(DDTags.SPAN_TYPE, DDSpanTypes.HTTP_CLIENT)
.start();
GlobalTracer.get()
.inject(
span.context(), Format.Builtin.HTTP_HEADERS, new NettyResponseInjectAdapter(request));
ctx.channel().attr(HttpClientTracingHandler.attributeKey).set(span);
try {
ctx.write(msg, prm);
} catch (final Throwable throwable) {
Tags.ERROR.set(span, Boolean.TRUE);
span.log(Collections.singletonMap(ERROR_OBJECT, throwable));
span.finish(); // Finish the span manually since finishSpanOnClose was false
throw throwable;
}
}
}

View File

@ -0,0 +1,37 @@
package datadog.trace.instrumentation.netty41.client;
import static io.opentracing.log.Fields.ERROR_OBJECT;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.HttpResponse;
import io.opentracing.Span;
import io.opentracing.tag.Tags;
import java.util.Collections;
public class HttpClientResponseTracingHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
final Span span = ctx.channel().attr(HttpClientTracingHandler.attributeKey).get();
if (span == null || !(msg instanceof HttpResponse)) {
ctx.fireChannelRead(msg);
return;
}
final HttpResponse response = (HttpResponse) msg;
try {
ctx.fireChannelRead(msg);
} catch (final Throwable throwable) {
Tags.ERROR.set(span, Boolean.TRUE);
span.log(Collections.singletonMap(ERROR_OBJECT, throwable));
Tags.HTTP_STATUS.set(span, 500);
span.finish(); // Finish the span manually since finishSpanOnClose was false
throw throwable;
}
Tags.HTTP_STATUS.set(span, response.status().code());
span.finish(); // Finish the span manually since finishSpanOnClose was false
}
}

View File

@ -0,0 +1,17 @@
package datadog.trace.instrumentation.netty41.client;
import io.netty.channel.CombinedChannelDuplexHandler;
import io.netty.util.AttributeKey;
import io.opentracing.Span;
public class HttpClientTracingHandler
extends CombinedChannelDuplexHandler<
HttpClientResponseTracingHandler, HttpClientRequestTracingHandler> {
static final AttributeKey<Span> attributeKey =
AttributeKey.valueOf(HttpClientTracingHandler.class.getName());
public HttpClientTracingHandler() {
super(new HttpClientResponseTracingHandler(), new HttpClientRequestTracingHandler());
}
}

View File

@ -0,0 +1,25 @@
package datadog.trace.instrumentation.netty41.client;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpRequest;
import io.opentracing.propagation.TextMap;
import java.util.Iterator;
import java.util.Map;
public class NettyResponseInjectAdapter implements TextMap {
private final HttpHeaders headers;
NettyResponseInjectAdapter(final HttpRequest request) {
this.headers = request.headers();
}
@Override
public Iterator<Map.Entry<String, String>> iterator() {
throw new UnsupportedOperationException("This class should be used only with Tracer.inject()!");
}
@Override
public void put(final String key, final String value) {
headers.set(key, value);
}
}

View File

@ -0,0 +1,71 @@
package datadog.trace.instrumentation.netty41.server;
import static io.netty.handler.codec.http.HttpHeaderNames.HOST;
import static io.opentracing.log.Fields.ERROR_OBJECT;
import datadog.trace.api.DDSpanTypes;
import datadog.trace.api.DDTags;
import datadog.trace.context.TraceScope;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.HttpRequest;
import io.opentracing.Scope;
import io.opentracing.Span;
import io.opentracing.SpanContext;
import io.opentracing.propagation.Format;
import io.opentracing.tag.Tags;
import io.opentracing.util.GlobalTracer;
import java.net.InetSocketAddress;
import java.util.Collections;
public class HttpServerRequestTracingHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
if (!(msg instanceof HttpRequest)) {
ctx.fireChannelRead(msg); // superclass does not throw
return;
}
final HttpRequest request = (HttpRequest) msg;
final InetSocketAddress remoteAddress = (InetSocketAddress) ctx.channel().remoteAddress();
final SpanContext extractedContext =
GlobalTracer.get()
.extract(Format.Builtin.HTTP_HEADERS, new NettyRequestExtractAdapter(request));
String url = request.uri();
if (request.headers().contains(HOST)) {
url = "http://" + request.headers().get(HOST) + url;
}
final Scope scope =
GlobalTracer.get()
.buildSpan("netty.request")
.asChildOf(extractedContext)
.withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_SERVER)
.withTag(Tags.PEER_HOSTNAME.getKey(), remoteAddress.getHostName())
.withTag(Tags.PEER_PORT.getKey(), remoteAddress.getPort())
.withTag(Tags.HTTP_METHOD.getKey(), request.method().name())
.withTag(Tags.HTTP_URL.getKey(), url)
.withTag(Tags.COMPONENT.getKey(), "netty")
.withTag(DDTags.SPAN_TYPE, DDSpanTypes.WEB_SERVLET)
.startActive(false);
if (scope instanceof TraceScope) {
((TraceScope) scope).setAsyncPropagation(true);
}
final Span span = scope.span();
ctx.channel().attr(HttpServerTracingHandler.attributeKey).set(span);
try {
ctx.fireChannelRead(msg);
} catch (final Throwable throwable) {
Tags.ERROR.set(span, Boolean.TRUE);
span.log(Collections.singletonMap(ERROR_OBJECT, throwable));
span.finish(); // Finish the span manually since finishSpanOnClose was false
throw throwable;
} finally {
scope.close();
}
}
}

View File

@ -0,0 +1,38 @@
package datadog.trace.instrumentation.netty41.server;
import static io.opentracing.log.Fields.ERROR_OBJECT;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.HttpResponse;
import io.opentracing.Span;
import io.opentracing.tag.Tags;
import java.util.Collections;
public class HttpServerResponseTracingHandler extends ChannelOutboundHandlerAdapter {
@Override
public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise prm) {
final Span span = ctx.channel().attr(HttpServerTracingHandler.attributeKey).get();
if (span == null || !(msg instanceof HttpResponse)) {
ctx.write(msg, prm);
return;
}
final HttpResponse response = (HttpResponse) msg;
try {
ctx.write(msg, prm);
} catch (final Throwable throwable) {
Tags.ERROR.set(span, Boolean.TRUE);
span.log(Collections.singletonMap(ERROR_OBJECT, throwable));
Tags.HTTP_STATUS.set(span, 500);
span.finish(); // Finish the span manually since finishSpanOnClose was false
throw throwable;
}
Tags.HTTP_STATUS.set(span, response.status().code());
span.finish(); // Finish the span manually since finishSpanOnClose was false
}
}

View File

@ -0,0 +1,17 @@
package datadog.trace.instrumentation.netty41.server;
import io.netty.channel.CombinedChannelDuplexHandler;
import io.netty.util.AttributeKey;
import io.opentracing.Span;
public class HttpServerTracingHandler
extends CombinedChannelDuplexHandler<
HttpServerRequestTracingHandler, HttpServerResponseTracingHandler> {
static final AttributeKey<Span> attributeKey =
AttributeKey.valueOf(HttpServerTracingHandler.class.getName());
public HttpServerTracingHandler() {
super(new HttpServerRequestTracingHandler(), new HttpServerResponseTracingHandler());
}
}

View File

@ -0,0 +1,25 @@
package datadog.trace.instrumentation.netty41.server;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpRequest;
import io.opentracing.propagation.TextMap;
import java.util.Iterator;
import java.util.Map;
public class NettyRequestExtractAdapter implements TextMap {
private final HttpHeaders headers;
NettyRequestExtractAdapter(final HttpRequest request) {
this.headers = request.headers();
}
@Override
public Iterator<Map.Entry<String, String>> iterator() {
return headers.iteratorAsString();
}
@Override
public void put(final String key, final String value) {
throw new UnsupportedOperationException("This class should be used only with Tracer.inject()!");
}
}

View File

@ -0,0 +1,97 @@
import datadog.trace.agent.test.AgentTestRunner
import datadog.trace.agent.test.TestUtils
import datadog.trace.api.DDSpanTypes
import datadog.trace.api.DDTags
import io.opentracing.tag.Tags
import org.asynchttpclient.AsyncHttpClient
import org.eclipse.jetty.server.Handler
import org.eclipse.jetty.server.Request
import org.eclipse.jetty.server.Server
import org.eclipse.jetty.server.handler.AbstractHandler
import org.eclipse.jetty.util.MultiMap
import spock.lang.Shared
import javax.servlet.http.HttpServletRequest
import javax.servlet.http.HttpServletResponse
import static datadog.trace.agent.test.ListWriterAssert.assertTraces
import static org.asynchttpclient.Dsl.asyncHttpClient
class Netty41ClientTest extends AgentTestRunner {
static {
System.setProperty("dd.integration.netty.enabled", "true")
}
static final PORT = TestUtils.randomOpenPort()
@Shared
Server server = new Server(PORT)
@Shared
AsyncHttpClient asyncHttpClient = asyncHttpClient()
// DefaultAsyncHttpClientConfig.Builder.newInstance().setRequestTimeout(Integer.MAX_VALUE).build())
@Shared
def headers = new MultiMap()
def setupSpec() {
Handler handler = [
handle: { String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response ->
request.getHeaderNames().each {
headers.add(it, request.getHeader(it))
}
response.setContentType("text/plaincharset=utf-8")
response.setStatus(HttpServletResponse.SC_OK)
baseRequest.setHandled(true)
response.getWriter().println("Hello World")
}
] as AbstractHandler
server.setHandler(handler)
server.start()
}
def cleanupSpec() {
server.stop()
}
def cleanup() {
headers.clear()
}
def "test server request/response"() {
setup:
def responseFuture = asyncHttpClient.prepareGet("http://localhost:$PORT/").execute()
def response = responseFuture.get()
expect:
response.statusCode == 200
response.responseBody == "Hello World\n"
and:
assertTraces(TEST_WRITER, 1) {
trace(0, 1) {
span(0) {
serviceName "unnamed-java-app"
operationName "netty.client.request"
resourceName "GET /"
spanType DDSpanTypes.HTTP_CLIENT
errored false
tags {
"$Tags.COMPONENT.key" "netty-client"
"$Tags.HTTP_METHOD.key" "GET"
"$Tags.HTTP_STATUS.key" 200
"$Tags.HTTP_URL.key" "http://localhost:$PORT/"
"$Tags.PEER_HOSTNAME.key" "localhost"
"$Tags.PEER_PORT.key" Integer
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT
"$DDTags.SPAN_TYPE" DDSpanTypes.HTTP_CLIENT
defaultTags()
}
}
}
}
and:
headers["x-datadog-trace-id"] == "${TEST_WRITER.get(0).get(0).traceId}"
headers["x-datadog-parent-id"] == "${TEST_WRITER.get(0).get(0).spanId}"
}
}

View File

@ -0,0 +1,164 @@
import datadog.trace.agent.test.AgentTestRunner
import datadog.trace.agent.test.TestUtils
import datadog.trace.api.DDSpanTypes
import datadog.trace.api.DDTags
import io.netty.bootstrap.ServerBootstrap
import io.netty.buffer.ByteBuf
import io.netty.buffer.Unpooled
import io.netty.channel.ChannelInitializer
import io.netty.channel.ChannelPipeline
import io.netty.channel.EventLoopGroup
import io.netty.channel.SimpleChannelInboundHandler
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.nio.NioServerSocketChannel
import io.netty.handler.codec.http.DefaultFullHttpResponse
import io.netty.handler.codec.http.FullHttpResponse
import io.netty.handler.codec.http.HttpHeaderNames
import io.netty.handler.codec.http.HttpRequestDecoder
import io.netty.handler.codec.http.HttpResponseEncoder
import io.netty.handler.codec.http.HttpResponseStatus
import io.netty.handler.codec.http.HttpServerCodec
import io.netty.handler.codec.http.HttpVersion
import io.netty.handler.codec.http.LastHttpContent
import io.netty.handler.logging.LogLevel
import io.netty.handler.logging.LoggingHandler
import io.netty.util.CharsetUtil
import io.opentracing.tag.Tags
import okhttp3.OkHttpClient
import okhttp3.Request
import static datadog.trace.agent.test.ListWriterAssert.assertTraces
class Netty41ServerTest extends AgentTestRunner {
static {
System.setProperty("dd.integration.netty.enabled", "true")
}
OkHttpClient client = new OkHttpClient.Builder()
// Uncomment when debugging:
// .connectTimeout(1, TimeUnit.HOURS)
// .writeTimeout(1, TimeUnit.HOURS)
// .readTimeout(1, TimeUnit.HOURS)
.build()
def "test server request/response"() {
setup:
EventLoopGroup eventLoopGroup = new NioEventLoopGroup()
int port = TestUtils.randomOpenPort()
initializeServer(eventLoopGroup, port, handlers, HttpResponseStatus.OK)
def request = new Request.Builder().url("http://localhost:$port/").get().build()
def response = client.newCall(request).execute()
expect:
response.code() == 200
response.body().string() == "Hello World"
and:
assertTraces(TEST_WRITER, 1) {
trace(0, 1) {
span(0) {
serviceName "unnamed-java-app"
operationName "netty.request"
resourceName "GET /"
spanType DDSpanTypes.WEB_SERVLET
errored false
tags {
"$Tags.COMPONENT.key" "netty"
"$Tags.HTTP_METHOD.key" "GET"
"$Tags.HTTP_STATUS.key" 200
"$Tags.HTTP_URL.key" "http://localhost:$port/"
"$Tags.PEER_HOSTNAME.key" "localhost"
"$Tags.PEER_PORT.key" Integer
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_SERVER
"$DDTags.SPAN_TYPE" DDSpanTypes.WEB_SERVLET
defaultTags()
}
}
}
}
cleanup:
eventLoopGroup.shutdownGracefully()
where:
handlers | _
[new HttpServerCodec()] | _
[new HttpRequestDecoder(), new HttpResponseEncoder()] | _
}
def "test #responseCode response handling"() {
setup:
EventLoopGroup eventLoopGroup = new NioEventLoopGroup()
int port = TestUtils.randomOpenPort()
initializeServer(eventLoopGroup, port, new HttpServerCodec(), responseCode)
def request = new Request.Builder().url("http://localhost:$port/").get().build()
def response = client.newCall(request).execute()
expect:
response.code() == responseCode.code()
response.body().string() == "Hello World"
and:
assertTraces(TEST_WRITER, 1) {
trace(0, 1) {
span(0) {
serviceName "unnamed-java-app"
operationName "netty.request"
resourceName name
spanType DDSpanTypes.WEB_SERVLET
errored error
tags {
"$Tags.COMPONENT.key" "netty"
"$Tags.HTTP_METHOD.key" "GET"
"$Tags.HTTP_STATUS.key" responseCode.code()
"$Tags.HTTP_URL.key" "http://localhost:$port/"
"$Tags.PEER_HOSTNAME.key" "localhost"
"$Tags.PEER_PORT.key" Integer
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_SERVER
"$DDTags.SPAN_TYPE" DDSpanTypes.WEB_SERVLET
if (error) {
tag("error", true)
}
defaultTags()
}
}
}
}
cleanup:
eventLoopGroup.shutdownGracefully()
where:
responseCode | name | error
HttpResponseStatus.OK | "GET /" | false
HttpResponseStatus.NOT_FOUND | "404" | false
HttpResponseStatus.INTERNAL_SERVER_ERROR | "GET /" | true
}
def initializeServer(eventLoopGroup, port, handlers, responseCode) {
ServerBootstrap bootstrap = new ServerBootstrap()
.group(eventLoopGroup)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler([
initChannel: { ch ->
ChannelPipeline pipeline = ch.pipeline()
handlers.each { pipeline.addLast(it) }
pipeline.addLast([
channelRead0 : { ctx, msg ->
if (msg instanceof LastHttpContent) {
ByteBuf content = Unpooled.copiedBuffer("Hello World", CharsetUtil.UTF_8)
FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, responseCode, content)
response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain")
response.headers().set(HttpHeaderNames.CONTENT_LENGTH, content.readableBytes())
ctx.write(response)
}
},
channelReadComplete: { it.flush() }
] as SimpleChannelInboundHandler)
}
] as ChannelInitializer).channel(NioServerSocketChannel)
bootstrap.bind(port).sync()
}
}

View File

@ -115,7 +115,7 @@ public final class HttpServlet3Instrumentation extends Instrumenter.Configurable
Tags.ERROR.set(span, Boolean.TRUE);
span.log(Collections.singletonMap(ERROR_OBJECT, throwable));
scope.close();
scope.span().finish(); // Finish the span manually since finishSpanOnClose was false
span.finish(); // Finish the span manually since finishSpanOnClose was false
} else if (req.isAsyncStarted()) {
final AtomicBoolean activated = new AtomicBoolean(false);
// what if async is already finished? This would not be called
@ -124,7 +124,7 @@ public final class HttpServlet3Instrumentation extends Instrumenter.Configurable
} else {
Tags.HTTP_STATUS.set(span, resp.getStatus());
scope.close();
scope.span().finish(); // Finish the span manually since finishSpanOnClose was false
span.finish(); // Finish the span manually since finishSpanOnClose was false
}
}
}
@ -141,7 +141,7 @@ public final class HttpServlet3Instrumentation extends Instrumenter.Configurable
@Override
public void onComplete(final AsyncEvent event) throws IOException {
if (activated.compareAndSet(false, true)) {
try (Scope scope = GlobalTracer.get().scopeManager().activate(span, true)) {
try (final Scope scope = GlobalTracer.get().scopeManager().activate(span, true)) {
Tags.HTTP_STATUS.set(
span, ((HttpServletResponse) event.getSuppliedResponse()).getStatus());
}
@ -151,7 +151,7 @@ public final class HttpServlet3Instrumentation extends Instrumenter.Configurable
@Override
public void onTimeout(final AsyncEvent event) throws IOException {
if (activated.compareAndSet(false, true)) {
try (Scope scope = GlobalTracer.get().scopeManager().activate(span, true)) {
try (final Scope scope = GlobalTracer.get().scopeManager().activate(span, true)) {
Tags.ERROR.set(span, Boolean.TRUE);
span.setTag("timeout", event.getAsyncContext().getTimeout());
}
@ -161,7 +161,7 @@ public final class HttpServlet3Instrumentation extends Instrumenter.Configurable
@Override
public void onError(final AsyncEvent event) throws IOException {
if (event.getThrowable() != null && activated.compareAndSet(false, true)) {
try (Scope scope = GlobalTracer.get().scopeManager().activate(span, true)) {
try (final Scope scope = GlobalTracer.get().scopeManager().activate(span, true)) {
if (((HttpServletResponse) event.getSuppliedResponse()).getStatus()
== HttpServletResponse.SC_OK) {
// exception is thrown in filter chain, but status code is incorrect

View File

@ -43,19 +43,22 @@ class TagsAssert {
}
}
def tag(String name, value) {
assertedTags.add(name)
if (value instanceof Class) {
assert ((Class) value).isInstance(tags[name])
} else if (value instanceof Closure) {
assert ((Closure) value).call(tags[name])
} else {
assert tags[name] == value
}
}
def methodMissing(String name, args) {
if (args.length > 1) {
if (args.length != 1) {
throw new IllegalArgumentException(args.toString())
}
assertedTags.add(name)
def arg = args[0]
if (arg instanceof Class) {
assert ((Class) arg).isInstance(tags[name])
} else if (arg instanceof Closure) {
assert ((Closure) arg).call(tags[name])
} else {
assert tags[name] == arg
}
tag(name, args[0])
}
void assertTagsAllVerified() {

View File

@ -50,8 +50,10 @@ dependencies {
testCompile deps.groovy
testCompile deps.testLogging
testCompile 'info.solidsoft.spock:spock-global-unroll:0.5.1'
testCompile group: 'io.ratpack', name: 'ratpack-groovy-test', version: '1.4.6'
testCompile group: 'com.github.stefanbirkner', name: 'system-rules', version: '1.17.1'
if (!project.name.contains("netty")) { // avoid screwing up the classpath...
testCompile group: 'io.ratpack', name: 'ratpack-groovy-test', version: '1.4.6'
}
}
tasks.withType(Javadoc) {

View File

@ -37,6 +37,8 @@ include ':dd-java-agent:instrumentation:kafka-streams-0.11'
include ':dd-java-agent:instrumentation:lettuce-5'
include ':dd-java-agent:instrumentation:mongo-3.1'
include ':dd-java-agent:instrumentation:mongo-async-3.3'
include ':dd-java-agent:instrumentation:netty-4.0'
include ':dd-java-agent:instrumentation:netty-4.1'
include ':dd-java-agent:instrumentation:okhttp-3'
include ':dd-java-agent:instrumentation:osgi-classloading'
include ':dd-java-agent:instrumentation:play-2.4'