From 2b25de966a9891f635b8c9d574901e6064a96136 Mon Sep 17 00:00:00 2001 From: Nikolay Martynov Date: Tue, 26 Jun 2018 16:10:52 -0400 Subject: [PATCH] Add akka-http-client instrumentation This is a very-very first pass: instrument single request --- .../akka-http-10.0/akka-http-10.0.gradle | 3 + .../AkkaHttpClientInstrumentation.java | 140 ++++++++++++++++ .../AkkaHttpClientInstrumentationTest.groovy | 153 ++++++++++++++++++ 3 files changed, 296 insertions(+) create mode 100644 dd-java-agent/instrumentation/akka-http-10.0/src/main/java/datadog/trace/instrumentation/akkahttp/AkkaHttpClientInstrumentation.java create mode 100644 dd-java-agent/instrumentation/akka-http-10.0/src/test/groovy/AkkaHttpClientInstrumentationTest.groovy diff --git a/dd-java-agent/instrumentation/akka-http-10.0/akka-http-10.0.gradle b/dd-java-agent/instrumentation/akka-http-10.0/akka-http-10.0.gradle index e10a43321c..a4cc0777eb 100644 --- a/dd-java-agent/instrumentation/akka-http-10.0/akka-http-10.0.gradle +++ b/dd-java-agent/instrumentation/akka-http-10.0/akka-http-10.0.gradle @@ -41,3 +41,6 @@ dependencies { test.dependsOn lagomTest testJava8Minimum += '*Test*.class' + +// These classes use Ratpack which requires Java 8. (Currently also incompatible with Java 9.) +testJava8Only += '**/AkkaHttpClientInstrumentationTest.class' diff --git a/dd-java-agent/instrumentation/akka-http-10.0/src/main/java/datadog/trace/instrumentation/akkahttp/AkkaHttpClientInstrumentation.java b/dd-java-agent/instrumentation/akka-http-10.0/src/main/java/datadog/trace/instrumentation/akkahttp/AkkaHttpClientInstrumentation.java new file mode 100644 index 0000000000..104c05ac5b --- /dev/null +++ b/dd-java-agent/instrumentation/akka-http-10.0/src/main/java/datadog/trace/instrumentation/akkahttp/AkkaHttpClientInstrumentation.java @@ -0,0 +1,140 @@ +package datadog.trace.instrumentation.akkahttp; + +import static io.opentracing.log.Fields.ERROR_OBJECT; +import static net.bytebuddy.matcher.ElementMatchers.*; + +import akka.http.javadsl.model.headers.RawHeader; +import akka.http.scaladsl.HttpExt; +import akka.http.scaladsl.model.HttpRequest; +import akka.http.scaladsl.model.HttpResponse; +import akka.stream.*; +import com.google.auto.service.AutoService; +import datadog.trace.agent.tooling.*; +import datadog.trace.api.DDSpanTypes; +import datadog.trace.api.DDTags; +import io.opentracing.Scope; +import io.opentracing.Span; +import io.opentracing.propagation.Format; +import io.opentracing.propagation.TextMap; +import io.opentracing.tag.Tags; +import io.opentracing.util.GlobalTracer; +import java.util.Collections; +import java.util.Iterator; +import java.util.Map; +import lombok.extern.slf4j.Slf4j; +import net.bytebuddy.agent.builder.AgentBuilder; +import net.bytebuddy.asm.Advice; +import scala.concurrent.Future; +import scala.runtime.AbstractFunction1; +import scala.util.Try; + +@Slf4j +@AutoService(Instrumenter.class) +public final class AkkaHttpClientInstrumentation extends Instrumenter.Configurable { + public AkkaHttpClientInstrumentation() { + super("akka-http", "akka-http-client"); + } + + @Override + protected boolean defaultEnabled() { + return false; + } + + private static final HelperInjector HELPER_INJECTOR = + new HelperInjector( + AkkaHttpClientInstrumentation.class.getName() + "$OnCompleteHandler", + AkkaHttpClientInstrumentation.class.getName() + "$AkkaHttpHeaders"); + + @Override + public AgentBuilder apply(final AgentBuilder agentBuilder) { + return agentBuilder + .type(named("akka.http.scaladsl.HttpExt")) + .transform(DDTransformers.defaultTransformers()) + .transform(HELPER_INJECTOR) + .transform( + DDAdvice.create() + .advice( + named("singleRequest") + .and(takesArgument(0, named("akka.http.scaladsl.model.HttpRequest"))), + AkkaHttpClientAdvice.class.getName())) + .asDecorator(); + } + + public static class AkkaHttpClientAdvice { + @Advice.OnMethodEnter(suppress = Throwable.class) + public static Scope methodEnter( + @Advice.Argument(value = 0, readOnly = false) HttpRequest request) { + Scope scope = + GlobalTracer.get() + .buildSpan("akka-http.request") + .withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_CLIENT) + .withTag(Tags.HTTP_METHOD.getKey(), request.method().value()) + .withTag(DDTags.SPAN_TYPE, DDSpanTypes.HTTP_CLIENT) + .withTag(Tags.COMPONENT.getKey(), "akka-http-client") + .withTag(Tags.HTTP_URL.getKey(), request.getUri().toString()) + .startActive(false); + + AkkaHttpHeaders headers = new AkkaHttpHeaders(request); + GlobalTracer.get().inject(scope.span().context(), Format.Builtin.HTTP_HEADERS, headers); + // Request is immutable, so we have to assign new value once we update headers + request = headers.getRequest(); + + return scope; + } + + @Advice.OnMethodExit(suppress = Throwable.class) + public static void methodExit( + @Advice.Argument(value = 0) final HttpRequest request, + @Advice.This final HttpExt thiz, + @Advice.Return final Future responseFuture, + @Advice.Enter final Scope scope) { + responseFuture.onComplete(new OnCompleteHandler(scope), thiz.system().dispatcher()); + scope.close(); + } + } + + public static class OnCompleteHandler extends AbstractFunction1, Void> { + private final Scope scope; + + public OnCompleteHandler(Scope scope) { + this.scope = scope; + } + + @Override + public Void apply(Try result) { + Span span = scope.span(); + if (result.isSuccess()) { + Tags.HTTP_STATUS.set(span, result.get().status().intValue()); + } else { + Tags.ERROR.set(span, true); + span.log(Collections.singletonMap(ERROR_OBJECT, result.failed().get())); + } + span.finish(); + return null; + } + } + + public static class AkkaHttpHeaders implements TextMap { + private HttpRequest request; + + public AkkaHttpHeaders(HttpRequest request) { + this.request = request; + } + + @Override + public Iterator> iterator() { + throw new UnsupportedOperationException( + "This class should be used only with Tracer.inject()!"); + } + + @Override + public void put(final String name, final String value) { + // It looks like this cast is only needed in Java, Scala would have figured it out + request = (HttpRequest) request.addHeader(RawHeader.create(name, value)); + } + + public HttpRequest getRequest() { + return request; + } + } +} diff --git a/dd-java-agent/instrumentation/akka-http-10.0/src/test/groovy/AkkaHttpClientInstrumentationTest.groovy b/dd-java-agent/instrumentation/akka-http-10.0/src/test/groovy/AkkaHttpClientInstrumentationTest.groovy new file mode 100644 index 0000000000..d249cf7059 --- /dev/null +++ b/dd-java-agent/instrumentation/akka-http-10.0/src/test/groovy/AkkaHttpClientInstrumentationTest.groovy @@ -0,0 +1,153 @@ +import akka.actor.ActorSystem +import akka.http.javadsl.Http +import akka.http.javadsl.model.HttpRequest +import akka.http.javadsl.model.HttpResponse +import akka.stream.ActorMaterializer +import akka.stream.StreamTcpException +import datadog.trace.agent.test.AgentTestRunner +import datadog.trace.agent.test.RatpackUtils +import datadog.trace.api.DDSpanTypes +import datadog.trace.api.DDTags +import io.opentracing.tag.Tags +import spock.lang.Shared + +import java.util.concurrent.CompletionStage +import java.util.concurrent.ExecutionException + +import static datadog.trace.agent.test.ListWriterAssert.assertTraces +import static ratpack.groovy.test.embed.GroovyEmbeddedApp.ratpack + +class AkkaHttpClientInstrumentationTest extends AgentTestRunner { + static { + System.setProperty("dd.integration.akka-http-client.enabled", "true") + } + + private static final String MESSAGE = "an\nmultiline\nhttp\nresponse" + private static final long TIMEOUT = 10000L + + @Shared + def server = ratpack { + handlers { + prefix("success") { + all { + RatpackUtils.handleDistributedRequest(context) + + response.status(200).send(MESSAGE) + } + } + + prefix("error") { + all { + RatpackUtils.handleDistributedRequest(context) + + throw new RuntimeException("error") + } + } + } + } + + @Shared + ActorSystem system = ActorSystem.create() + @Shared + ActorMaterializer materializer = ActorMaterializer.create(system) + + def "#route request trace" () { + setup: + def url = server.address.resolve("/" + route).toURL() + + HttpRequest request = HttpRequest.create(url.toString()) + CompletionStage responseFuture = + Http.get(system) + .singleRequest(request, materializer) + HttpResponse response = responseFuture.toCompletableFuture().get() + String message = readMessage(response) + + expect: + response.status().intValue() == expectedStatus + if (expectedMessage != null) { + message == expectedMessage + } + + assertTraces(TEST_WRITER, 2) { + trace(0, 1) { + span(0) { + operationName "test-http-server" + childOf(TEST_WRITER[1][0]) + errored false + tags { + defaultTags() + } + } + } + trace(1, 1) { + span(0) { + parent() + serviceName "unnamed-java-app" + operationName "akka-http.request" + resourceName "GET /$route" + errored expectedError + tags { + defaultTags() + "$Tags.HTTP_STATUS.key" expectedStatus + "$Tags.HTTP_URL.key" "${server.address}$route" + "$Tags.HTTP_METHOD.key" "GET" + "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT + "$DDTags.SPAN_TYPE" DDSpanTypes.HTTP_CLIENT + "$Tags.COMPONENT.key" "akka-http-client" + if (expectedError) { + "$Tags.ERROR.key" true + } + } + } + } + } + + where: + route | expectedStatus | expectedError | expectedMessage + "success" | 200 | false | MESSAGE + "error" | 500 | true | null + } + + def "error request trace" () { + setup: + def url = new URL("http://localhost:${server.address.port + 1}/test") + + HttpRequest request = HttpRequest.create(url.toString()) + CompletionStage responseFuture = + Http.get(system) + .singleRequest(request, materializer) + try { + responseFuture.toCompletableFuture().get() + } catch (ExecutionException e) { + // This is expected to fail + } + + expect: + assertTraces(TEST_WRITER, 1) { + trace(0, 1) { + span(0) { + parent() + serviceName "unnamed-java-app" + operationName "akka-http.request" + resourceName "GET /test" + errored true + tags { + defaultTags() + "$Tags.HTTP_URL.key" url.toString() + "$Tags.HTTP_METHOD.key" "GET" + "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT + "$DDTags.SPAN_TYPE" DDSpanTypes.HTTP_CLIENT + "$Tags.COMPONENT.key" "akka-http-client" + "$Tags.ERROR.key" true + errorTags(StreamTcpException, { it.contains("Tcp command") }) + } + } + } + } + } + + String readMessage(HttpResponse response) { + response.entity().toStrict(TIMEOUT, materializer).toCompletableFuture().get().getData().utf8String() + } + +}