From ae37ca4b0229852f96aea97a7e00e9056e5b37e0 Mon Sep 17 00:00:00 2001 From: Nikolay Martynov Date: Fri, 29 Jun 2018 16:20:32 -0400 Subject: [PATCH] Add akka-http-client instrumentation: superPool --- .../akka-http-10.0/akka-http-10.0.gradle | 3 + .../AkkaHttpClientInstrumentation.java | 25 +++- .../AkkaHttpClientTransformFlow.scala | 48 ++++++++ .../AkkaHttpClientInstrumentationTest.groovy | 116 +++++++++++++++++- 4 files changed, 182 insertions(+), 10 deletions(-) rename dd-java-agent/instrumentation/akka-http-10.0/src/main/{java => scala}/datadog/trace/instrumentation/akkahttp/AkkaHttpClientInstrumentation.java (85%) create mode 100644 dd-java-agent/instrumentation/akka-http-10.0/src/main/scala/datadog/trace/instrumentation/akkahttp/AkkaHttpClientTransformFlow.scala diff --git a/dd-java-agent/instrumentation/akka-http-10.0/akka-http-10.0.gradle b/dd-java-agent/instrumentation/akka-http-10.0/akka-http-10.0.gradle index a4cc0777eb..cf3358daeb 100644 --- a/dd-java-agent/instrumentation/akka-http-10.0/akka-http-10.0.gradle +++ b/dd-java-agent/instrumentation/akka-http-10.0/akka-http-10.0.gradle @@ -1,6 +1,9 @@ apply from: "${rootDir}/gradle/java.gradle" apply from: "${rootDir}/gradle/test-with-scala.gradle" +// We have actual Scala sources here +apply plugin: 'scala' + apply plugin: 'org.unbroken-dome.test-sets' testSets { lagomTest diff --git a/dd-java-agent/instrumentation/akka-http-10.0/src/main/java/datadog/trace/instrumentation/akkahttp/AkkaHttpClientInstrumentation.java b/dd-java-agent/instrumentation/akka-http-10.0/src/main/scala/datadog/trace/instrumentation/akkahttp/AkkaHttpClientInstrumentation.java similarity index 85% rename from dd-java-agent/instrumentation/akka-http-10.0/src/main/java/datadog/trace/instrumentation/akkahttp/AkkaHttpClientInstrumentation.java rename to dd-java-agent/instrumentation/akka-http-10.0/src/main/scala/datadog/trace/instrumentation/akkahttp/AkkaHttpClientInstrumentation.java index 104c05ac5b..adf42c0b00 100644 --- a/dd-java-agent/instrumentation/akka-http-10.0/src/main/java/datadog/trace/instrumentation/akkahttp/AkkaHttpClientInstrumentation.java +++ b/dd-java-agent/instrumentation/akka-http-10.0/src/main/scala/datadog/trace/instrumentation/akkahttp/AkkaHttpClientInstrumentation.java @@ -3,11 +3,12 @@ package datadog.trace.instrumentation.akkahttp; import static io.opentracing.log.Fields.ERROR_OBJECT; import static net.bytebuddy.matcher.ElementMatchers.*; +import akka.NotUsed; import akka.http.javadsl.model.headers.RawHeader; import akka.http.scaladsl.HttpExt; import akka.http.scaladsl.model.HttpRequest; import akka.http.scaladsl.model.HttpResponse; -import akka.stream.*; +import akka.stream.scaladsl.Flow; import com.google.auto.service.AutoService; import datadog.trace.agent.tooling.*; import datadog.trace.api.DDSpanTypes; @@ -24,6 +25,7 @@ import java.util.Map; import lombok.extern.slf4j.Slf4j; import net.bytebuddy.agent.builder.AgentBuilder; import net.bytebuddy.asm.Advice; +import scala.Tuple2; import scala.concurrent.Future; import scala.runtime.AbstractFunction1; import scala.util.Try; @@ -43,7 +45,8 @@ public final class AkkaHttpClientInstrumentation extends Instrumenter.Configurab private static final HelperInjector HELPER_INJECTOR = new HelperInjector( AkkaHttpClientInstrumentation.class.getName() + "$OnCompleteHandler", - AkkaHttpClientInstrumentation.class.getName() + "$AkkaHttpHeaders"); + AkkaHttpClientInstrumentation.class.getName() + "$AkkaHttpHeaders", + AkkaHttpClientTransformFlow.class.getName()); @Override public AgentBuilder apply(final AgentBuilder agentBuilder) { @@ -56,11 +59,16 @@ public final class AkkaHttpClientInstrumentation extends Instrumenter.Configurab .advice( named("singleRequest") .and(takesArgument(0, named("akka.http.scaladsl.model.HttpRequest"))), - AkkaHttpClientAdvice.class.getName())) + SingleRequesrAdvice.class.getName())) + .transform( + DDAdvice.create() + .advice( + named("superPool").and(returns(named("akka.stream.scaladsl.Flow"))), + SuperPoolAdvice.class.getName())) .asDecorator(); } - public static class AkkaHttpClientAdvice { + public static class SingleRequesrAdvice { @Advice.OnMethodEnter(suppress = Throwable.class) public static Scope methodEnter( @Advice.Argument(value = 0, readOnly = false) HttpRequest request) { @@ -93,6 +101,15 @@ public final class AkkaHttpClientInstrumentation extends Instrumenter.Configurab } } + public static class SuperPoolAdvice { + @Advice.OnMethodExit(suppress = Throwable.class) + public static void methodExit( + @Advice.Return(readOnly = false) + Flow, Tuple2, T>, NotUsed> flow) { + flow = AkkaHttpClientTransformFlow.transform(flow); + } + } + public static class OnCompleteHandler extends AbstractFunction1, Void> { private final Scope scope; diff --git a/dd-java-agent/instrumentation/akka-http-10.0/src/main/scala/datadog/trace/instrumentation/akkahttp/AkkaHttpClientTransformFlow.scala b/dd-java-agent/instrumentation/akka-http-10.0/src/main/scala/datadog/trace/instrumentation/akkahttp/AkkaHttpClientTransformFlow.scala new file mode 100644 index 0000000000..447624cec1 --- /dev/null +++ b/dd-java-agent/instrumentation/akka-http-10.0/src/main/scala/datadog/trace/instrumentation/akkahttp/AkkaHttpClientTransformFlow.scala @@ -0,0 +1,48 @@ +package datadog.trace.instrumentation.akkahttp + +import java.util.Collections + +import akka.NotUsed +import akka.http.scaladsl.model.{HttpRequest, HttpResponse} +import akka.stream.Supervision +import akka.stream.scaladsl.Flow +import datadog.trace.api.{DDSpanTypes, DDTags} +import io.opentracing.log.Fields.ERROR_OBJECT +import io.opentracing.{Scope, Span} +import io.opentracing.propagation.Format +import io.opentracing.tag.Tags +import io.opentracing.util.GlobalTracer + +import scala.util.{Failure, Success, Try} + +object AkkaHttpClientTransformFlow { + def transform[T](flow: Flow[(HttpRequest, T), (Try[HttpResponse], T), NotUsed]): Flow[(HttpRequest, T), (Try[HttpResponse], T), NotUsed] = { + var span: Span = null + + Flow.fromFunction((input: (HttpRequest, T)) => { + val (request, data) = input + val scope = GlobalTracer.get + .buildSpan("akka-http.request") + .withTag(Tags.SPAN_KIND.getKey, Tags.SPAN_KIND_CLIENT) + .withTag(Tags.HTTP_METHOD.getKey, request.method.value) + .withTag(DDTags.SPAN_TYPE, DDSpanTypes.HTTP_CLIENT) + .withTag(Tags.COMPONENT.getKey, "akka-http-client") + .withTag(Tags.HTTP_URL.getKey, request.getUri.toString) + .startActive(false) + val headers = new AkkaHttpClientInstrumentation.AkkaHttpHeaders(request) + GlobalTracer.get.inject(scope.span.context, Format.Builtin.HTTP_HEADERS, headers) + span = scope.span + scope.close() + (headers.getRequest, data) + }).via(flow).map(output => { + output._1 match { + case Success(response) => Tags.HTTP_STATUS.set(span, response.status.intValue) + case Failure(e) => + Tags.ERROR.set(span, true) + span.log(Collections.singletonMap(ERROR_OBJECT, e)) + } + span.finish() + output + }) + } +} diff --git a/dd-java-agent/instrumentation/akka-http-10.0/src/test/groovy/AkkaHttpClientInstrumentationTest.groovy b/dd-java-agent/instrumentation/akka-http-10.0/src/test/groovy/AkkaHttpClientInstrumentationTest.groovy index d249cf7059..dbb5950507 100644 --- a/dd-java-agent/instrumentation/akka-http-10.0/src/test/groovy/AkkaHttpClientInstrumentationTest.groovy +++ b/dd-java-agent/instrumentation/akka-http-10.0/src/test/groovy/AkkaHttpClientInstrumentationTest.groovy @@ -2,13 +2,17 @@ import akka.actor.ActorSystem import akka.http.javadsl.Http import akka.http.javadsl.model.HttpRequest import akka.http.javadsl.model.HttpResponse +import akka.japi.Pair import akka.stream.ActorMaterializer import akka.stream.StreamTcpException +import akka.stream.javadsl.Sink +import akka.stream.javadsl.Source import datadog.trace.agent.test.AgentTestRunner import datadog.trace.agent.test.RatpackUtils import datadog.trace.api.DDSpanTypes import datadog.trace.api.DDTags import io.opentracing.tag.Tags +import scala.util.Try import spock.lang.Shared import java.util.concurrent.CompletionStage @@ -51,6 +55,8 @@ class AkkaHttpClientInstrumentationTest extends AgentTestRunner { @Shared ActorMaterializer materializer = ActorMaterializer.create(system) + def pool = Http.get(system).superPool(materializer) + def "#route request trace" () { setup: def url = server.address.resolve("/" + route).toURL() @@ -59,10 +65,12 @@ class AkkaHttpClientInstrumentationTest extends AgentTestRunner { CompletionStage responseFuture = Http.get(system) .singleRequest(request, materializer) + + when: HttpResponse response = responseFuture.toCompletableFuture().get() String message = readMessage(response) - expect: + then: response.status().intValue() == expectedStatus if (expectedMessage != null) { message == expectedMessage @@ -116,13 +124,109 @@ class AkkaHttpClientInstrumentationTest extends AgentTestRunner { CompletionStage responseFuture = Http.get(system) .singleRequest(request, materializer) - try { - responseFuture.toCompletableFuture().get() - } catch (ExecutionException e) { - // This is expected to fail + + when: + responseFuture.toCompletableFuture().get() + + then: + thrown ExecutionException + assertTraces(TEST_WRITER, 1) { + trace(0, 1) { + span(0) { + parent() + serviceName "unnamed-java-app" + operationName "akka-http.request" + resourceName "GET /test" + errored true + tags { + defaultTags() + "$Tags.HTTP_URL.key" url.toString() + "$Tags.HTTP_METHOD.key" "GET" + "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT + "$DDTags.SPAN_TYPE" DDSpanTypes.HTTP_CLIENT + "$Tags.COMPONENT.key" "akka-http-client" + "$Tags.ERROR.key" true + errorTags(StreamTcpException, { it.contains("Tcp command") }) + } + } + } + } + } + + def "#route pool request trace" () { + setup: + def url = server.address.resolve("/" + route).toURL() + + CompletionStage, Integer>> sink = Source + .>single(new Pair(HttpRequest.create(url.toString()), 1)) + .via(pool) + .runWith(Sink., Integer>>head(), materializer) + + when: + HttpResponse response = sink.toCompletableFuture().get().first().get() + String message = readMessage(response) + + then: + response.status().intValue() == expectedStatus + if (expectedMessage != null) { + message == expectedMessage } - expect: + assertTraces(TEST_WRITER, 2) { + trace(0, 1) { + span(0) { + operationName "test-http-server" + childOf(TEST_WRITER[1][0]) + errored false + tags { + defaultTags() + } + } + } + trace(1, 1) { + span(0) { + parent() + serviceName "unnamed-java-app" + operationName "akka-http.request" + resourceName "GET /$route" + errored expectedError + tags { + defaultTags() + "$Tags.HTTP_STATUS.key" expectedStatus + "$Tags.HTTP_URL.key" "${server.address}$route" + "$Tags.HTTP_METHOD.key" "GET" + "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT + "$DDTags.SPAN_TYPE" DDSpanTypes.HTTP_CLIENT + "$Tags.COMPONENT.key" "akka-http-client" + if (expectedError) { + "$Tags.ERROR.key" true + } + } + } + } + } + + where: + route | expectedStatus | expectedError | expectedMessage + "success" | 200 | false | MESSAGE + "error" | 500 | true | null + } + + def "error request pool trace" () { + setup: + def url = new URL("http://localhost:${server.address.port + 1}/test") + + CompletionStage, Integer>> sink = Source + .>single(new Pair(HttpRequest.create(url.toString()), 1)) + .via(pool) + .runWith(Sink., Integer>>head(), materializer) + def response = sink.toCompletableFuture().get().first() + + when: + response.get() + + then: + thrown StreamTcpException assertTraces(TEST_WRITER, 1) { trace(0, 1) { span(0) {