From bf7bbf31f806e8b3b150252083b4e3056291bf9f Mon Sep 17 00:00:00 2001 From: David Pratt Date: Fri, 21 Jun 2019 14:07:37 -0500 Subject: [PATCH 1/2] Fix akka-http instrumentation. Remove compiled Scala artifacts from the actual instrumentation. Scala is not binary compatible across major versions, and having AkkaHttpClientTransformFlow.scala be in the artifact causes problems when using anything but Scala 2.11. Having the AkkaHttpClientTransformFlow implementation be in pure java utilizing the Akka-stream Java DSL ensures that this will work across any given Scala major version. --- .../akka-http-10.0/akka-http-10.0.gradle | 8 +++ .../AkkaHttpClientInstrumentation.java | 3 - .../akkahttp/AkkaHttpClientTransformFlow.java | 61 +++++++++++++++++++ .../AkkaHttpClientTransformFlow.scala | 35 ----------- 4 files changed, 69 insertions(+), 38 deletions(-) create mode 100644 dd-java-agent/instrumentation/akka-http-10.0/src/main/scala/datadog/trace/instrumentation/akkahttp/AkkaHttpClientTransformFlow.java delete mode 100644 dd-java-agent/instrumentation/akka-http-10.0/src/main/scala/datadog/trace/instrumentation/akkahttp/AkkaHttpClientTransformFlow.scala diff --git a/dd-java-agent/instrumentation/akka-http-10.0/akka-http-10.0.gradle b/dd-java-agent/instrumentation/akka-http-10.0/akka-http-10.0.gradle index 1766a6dff4..da3d24b604 100644 --- a/dd-java-agent/instrumentation/akka-http-10.0/akka-http-10.0.gradle +++ b/dd-java-agent/instrumentation/akka-http-10.0/akka-http-10.0.gradle @@ -59,6 +59,14 @@ muzzle { // later versions of akka-http expect streams to be provided extraDependency 'com.typesafe.akka:akka-stream_2.12:2.5.11' } + //There is no akka-http 10.0.x series for scala 2.13 + pass { + group = 'com.typesafe.akka' + module = 'akka-http_2.13' + versions = "[10.1.8,)" + // later versions of akka-http expect streams to be provided + extraDependency 'com.typesafe.akka:akka-stream_2.13:2.5.23' + } } dependencies { diff --git a/dd-java-agent/instrumentation/akka-http-10.0/src/main/scala/datadog/trace/instrumentation/akkahttp/AkkaHttpClientInstrumentation.java b/dd-java-agent/instrumentation/akka-http-10.0/src/main/scala/datadog/trace/instrumentation/akkahttp/AkkaHttpClientInstrumentation.java index 71a7a5dafa..95157bd530 100644 --- a/dd-java-agent/instrumentation/akka-http-10.0/src/main/scala/datadog/trace/instrumentation/akkahttp/AkkaHttpClientInstrumentation.java +++ b/dd-java-agent/instrumentation/akka-http-10.0/src/main/scala/datadog/trace/instrumentation/akkahttp/AkkaHttpClientInstrumentation.java @@ -50,9 +50,6 @@ public final class AkkaHttpClientInstrumentation extends Instrumenter.Default { AkkaHttpClientInstrumentation.class.getName() + "$OnCompleteHandler", AkkaHttpClientInstrumentation.class.getName() + "$AkkaHttpHeaders", packageName + ".AkkaHttpClientTransformFlow", - packageName + ".AkkaHttpClientTransformFlow$", - packageName + ".AkkaHttpClientTransformFlow$$anonfun$transform$1", - packageName + ".AkkaHttpClientTransformFlow$$anonfun$transform$2", "datadog.trace.agent.decorator.BaseDecorator", "datadog.trace.agent.decorator.ClientDecorator", "datadog.trace.agent.decorator.HttpClientDecorator", diff --git a/dd-java-agent/instrumentation/akka-http-10.0/src/main/scala/datadog/trace/instrumentation/akkahttp/AkkaHttpClientTransformFlow.java b/dd-java-agent/instrumentation/akka-http-10.0/src/main/scala/datadog/trace/instrumentation/akkahttp/AkkaHttpClientTransformFlow.java new file mode 100644 index 0000000000..ec2a01389d --- /dev/null +++ b/dd-java-agent/instrumentation/akka-http-10.0/src/main/scala/datadog/trace/instrumentation/akkahttp/AkkaHttpClientTransformFlow.java @@ -0,0 +1,61 @@ +package datadog.trace.instrumentation.akkahttp; + +import akka.NotUsed; +import akka.http.scaladsl.model.HttpRequest; +import akka.http.scaladsl.model.HttpResponse; +import akka.japi.function.Function; +import akka.stream.scaladsl.Flow; +import io.opentracing.Span; +import io.opentracing.propagation.Format; +import io.opentracing.util.GlobalTracer; +import java.util.concurrent.atomic.AtomicReference; +import scala.Tuple2; +import scala.util.Try; + +public class AkkaHttpClientTransformFlow { + + public static Flow, Tuple2, T>, NotUsed> transform( + Flow, Tuple2, T>, NotUsed> flow) { + + final AtomicReference spanRef = new AtomicReference<>(null); + + return akka.stream.javadsl.Flow.fromFunction( + new Function, Tuple2>() { + @Override + public Tuple2 apply(Tuple2 param) throws Exception { + HttpRequest request = param._1; + T data = param._2; + + Span span = GlobalTracer.get().buildSpan("akka-http.request").start(); + spanRef.set(span); + + AkkaHttpClientDecorator.DECORATE.afterStart(span); + AkkaHttpClientDecorator.DECORATE.onRequest(span, request); + + AkkaHttpClientInstrumentation.AkkaHttpHeaders headers = + new AkkaHttpClientInstrumentation.AkkaHttpHeaders(request); + GlobalTracer.get().inject(span.context(), Format.Builtin.HTTP_HEADERS, headers); + + return new Tuple2<>(headers.getRequest(), data); + } + }) + .via(flow) + .map( + new Function, T>, Tuple2, T>>() { + @Override + public Tuple2, T> apply(Tuple2, T> param) + throws Exception { + Span span = spanRef.get(); + try { + AkkaHttpClientDecorator.DECORATE.onResponse(span, param._1.get()); + } catch (Throwable t) { + AkkaHttpClientDecorator.DECORATE.onError(span, t); + } + AkkaHttpClientDecorator.DECORATE.beforeFinish(span); + span.finish(); + return param; + } + }) + .asScala(); + } +} diff --git a/dd-java-agent/instrumentation/akka-http-10.0/src/main/scala/datadog/trace/instrumentation/akkahttp/AkkaHttpClientTransformFlow.scala b/dd-java-agent/instrumentation/akka-http-10.0/src/main/scala/datadog/trace/instrumentation/akkahttp/AkkaHttpClientTransformFlow.scala deleted file mode 100644 index f8bc628007..0000000000 --- a/dd-java-agent/instrumentation/akka-http-10.0/src/main/scala/datadog/trace/instrumentation/akkahttp/AkkaHttpClientTransformFlow.scala +++ /dev/null @@ -1,35 +0,0 @@ -package datadog.trace.instrumentation.akkahttp - -import akka.NotUsed -import akka.http.scaladsl.model.{HttpRequest, HttpResponse} -import akka.stream.scaladsl.Flow -import datadog.trace.instrumentation.akkahttp.AkkaHttpClientDecorator.DECORATE -import io.opentracing.Span -import io.opentracing.propagation.Format -import io.opentracing.util.GlobalTracer - -import scala.util.{Failure, Success, Try} - -object AkkaHttpClientTransformFlow { - def transform[T](flow: Flow[(HttpRequest, T), (Try[HttpResponse], T), NotUsed]): Flow[(HttpRequest, T), (Try[HttpResponse], T), NotUsed] = { - var span: Span = null - - Flow.fromFunction((input: (HttpRequest, T)) => { - val (request, data) = input - span = GlobalTracer.get.buildSpan("akka-http.request").start() - DECORATE.afterStart(span) - DECORATE.onRequest(span, request) - val headers = new AkkaHttpClientInstrumentation.AkkaHttpHeaders(request) - GlobalTracer.get.inject(span.context, Format.Builtin.HTTP_HEADERS, headers) - (headers.getRequest, data) - }).via(flow).map(output => { - output._1 match { - case Success(response) => DECORATE.onResponse(span, response) - case Failure(e) => DECORATE.onError(span, e) - } - DECORATE.beforeFinish(span) - span.finish() - output - }) - } -} From 681420a004636425b57d4a10f2e97da261fba5a7 Mon Sep 17 00:00:00 2001 From: David Pratt Date: Mon, 24 Jun 2019 10:36:32 -0500 Subject: [PATCH 2/2] Remove SuperPool instrumentation. By definition, a Flow generated by a SuperPool does not respect ordering of requests and responses, and in fact will typically only rarely actually behave in the fashion that the instrumentation expects. The previous implementation would start a span for a given request before submitting it as input to the flow, and close the span with whatever response is next emitted by the flow. This request will rarely (if ever) be the actual response for the request that started the span. For more info, see the official docs at https://doc.akka.io/docs/akka-http/current/client-side/host-level.html#configuring-a-host-connection-pool Additionally, compiling this instumentation against scala 2.11, and only scala 2.11 can (and does) cause significant problems at runtime due to the fact that Scala is explicitly not binary compatible across major versions. --- .../akkahttp/AkkaHttpClientDecorator.java | 0 .../AkkaHttpClientInstrumentation.java | 42 ------------- .../akkahttp/AkkaHttpServerDecorator.java | 0 .../AkkaHttpServerInstrumentation.java | 0 .../akkahttp/AkkaHttpClientTransformFlow.java | 61 ------------------- ...kaHttpClientPoolInstrumentationTest.groovy | 53 ---------------- 6 files changed, 156 deletions(-) rename dd-java-agent/instrumentation/akka-http-10.0/src/main/{scala => java}/datadog/trace/instrumentation/akkahttp/AkkaHttpClientDecorator.java (100%) rename dd-java-agent/instrumentation/akka-http-10.0/src/main/{scala => java}/datadog/trace/instrumentation/akkahttp/AkkaHttpClientInstrumentation.java (78%) rename dd-java-agent/instrumentation/akka-http-10.0/src/main/{scala => java}/datadog/trace/instrumentation/akkahttp/AkkaHttpServerDecorator.java (100%) rename dd-java-agent/instrumentation/akka-http-10.0/src/main/{scala => java}/datadog/trace/instrumentation/akkahttp/AkkaHttpServerInstrumentation.java (100%) delete mode 100644 dd-java-agent/instrumentation/akka-http-10.0/src/main/scala/datadog/trace/instrumentation/akkahttp/AkkaHttpClientTransformFlow.java delete mode 100644 dd-java-agent/instrumentation/akka-http-10.0/src/test/groovy/AkkaHttpClientPoolInstrumentationTest.groovy diff --git a/dd-java-agent/instrumentation/akka-http-10.0/src/main/scala/datadog/trace/instrumentation/akkahttp/AkkaHttpClientDecorator.java b/dd-java-agent/instrumentation/akka-http-10.0/src/main/java/datadog/trace/instrumentation/akkahttp/AkkaHttpClientDecorator.java similarity index 100% rename from dd-java-agent/instrumentation/akka-http-10.0/src/main/scala/datadog/trace/instrumentation/akkahttp/AkkaHttpClientDecorator.java rename to dd-java-agent/instrumentation/akka-http-10.0/src/main/java/datadog/trace/instrumentation/akkahttp/AkkaHttpClientDecorator.java diff --git a/dd-java-agent/instrumentation/akka-http-10.0/src/main/scala/datadog/trace/instrumentation/akkahttp/AkkaHttpClientInstrumentation.java b/dd-java-agent/instrumentation/akka-http-10.0/src/main/java/datadog/trace/instrumentation/akkahttp/AkkaHttpClientInstrumentation.java similarity index 78% rename from dd-java-agent/instrumentation/akka-http-10.0/src/main/scala/datadog/trace/instrumentation/akkahttp/AkkaHttpClientInstrumentation.java rename to dd-java-agent/instrumentation/akka-http-10.0/src/main/java/datadog/trace/instrumentation/akkahttp/AkkaHttpClientInstrumentation.java index 95157bd530..2c9c711671 100644 --- a/dd-java-agent/instrumentation/akka-http-10.0/src/main/scala/datadog/trace/instrumentation/akkahttp/AkkaHttpClientInstrumentation.java +++ b/dd-java-agent/instrumentation/akka-http-10.0/src/main/java/datadog/trace/instrumentation/akkahttp/AkkaHttpClientInstrumentation.java @@ -2,15 +2,12 @@ package datadog.trace.instrumentation.akkahttp; import static datadog.trace.instrumentation.akkahttp.AkkaHttpClientDecorator.DECORATE; import static net.bytebuddy.matcher.ElementMatchers.named; -import static net.bytebuddy.matcher.ElementMatchers.returns; import static net.bytebuddy.matcher.ElementMatchers.takesArgument; -import akka.NotUsed; import akka.http.javadsl.model.headers.RawHeader; import akka.http.scaladsl.HttpExt; import akka.http.scaladsl.model.HttpRequest; import akka.http.scaladsl.model.HttpResponse; -import akka.stream.scaladsl.Flow; import com.google.auto.service.AutoService; import datadog.trace.agent.tooling.Instrumenter; import datadog.trace.bootstrap.CallDepthThreadLocalMap; @@ -27,7 +24,6 @@ import net.bytebuddy.asm.Advice; import net.bytebuddy.description.method.MethodDescription; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; -import scala.Tuple2; import scala.concurrent.Future; import scala.runtime.AbstractFunction1; import scala.util.Try; @@ -49,7 +45,6 @@ public final class AkkaHttpClientInstrumentation extends Instrumenter.Default { return new String[] { AkkaHttpClientInstrumentation.class.getName() + "$OnCompleteHandler", AkkaHttpClientInstrumentation.class.getName() + "$AkkaHttpHeaders", - packageName + ".AkkaHttpClientTransformFlow", "datadog.trace.agent.decorator.BaseDecorator", "datadog.trace.agent.decorator.ClientDecorator", "datadog.trace.agent.decorator.HttpClientDecorator", @@ -69,14 +64,6 @@ public final class AkkaHttpClientInstrumentation extends Instrumenter.Default { named("singleRequestImpl") .and(takesArgument(0, named("akka.http.scaladsl.model.HttpRequest"))), SingleRequestAdvice.class.getName()); - // This is mainly for compatibility with 10.0 - transformers.put( - named("superPool").and(returns(named("akka.stream.scaladsl.Flow"))), - SuperPoolAdvice.class.getName()); - // This is for 10.1+ - transformers.put( - named("superPoolImpl").and(returns(named("akka.stream.scaladsl.Flow"))), - SuperPoolAdvice.class.getName()); return transformers; } @@ -105,7 +92,6 @@ public final class AkkaHttpClientInstrumentation extends Instrumenter.Default { // Request is immutable, so we have to assign new value once we update headers request = headers.getRequest(); } - return scope; } @@ -134,34 +120,6 @@ public final class AkkaHttpClientInstrumentation extends Instrumenter.Default { } } - public static class SuperPoolAdvice { - - @Advice.OnMethodEnter(suppress = Throwable.class) - public static boolean methodEnter() { - /* - Versions 10.0 and 10.1 have slightly different structure that is hard to distinguish so here - we cast 'wider net' and avoid instrumenting twice. - In the future we may want to separate these, but since lots of code is reused we would need to come up - with way of continuing to reusing it. - */ - final int callDepth = CallDepthThreadLocalMap.incrementCallDepth(HttpExt.class); - return callDepth <= 0; - } - - @Advice.OnMethodExit(suppress = Throwable.class) - public static void methodExit( - @Advice.Return(readOnly = false) - Flow, Tuple2, T>, NotUsed> flow, - @Advice.Enter final boolean isApplied) { - if (!isApplied) { - return; - } - CallDepthThreadLocalMap.reset(HttpExt.class); - - flow = AkkaHttpClientTransformFlow.transform(flow); - } - } - public static class OnCompleteHandler extends AbstractFunction1, Void> { private final Span span; diff --git a/dd-java-agent/instrumentation/akka-http-10.0/src/main/scala/datadog/trace/instrumentation/akkahttp/AkkaHttpServerDecorator.java b/dd-java-agent/instrumentation/akka-http-10.0/src/main/java/datadog/trace/instrumentation/akkahttp/AkkaHttpServerDecorator.java similarity index 100% rename from dd-java-agent/instrumentation/akka-http-10.0/src/main/scala/datadog/trace/instrumentation/akkahttp/AkkaHttpServerDecorator.java rename to dd-java-agent/instrumentation/akka-http-10.0/src/main/java/datadog/trace/instrumentation/akkahttp/AkkaHttpServerDecorator.java diff --git a/dd-java-agent/instrumentation/akka-http-10.0/src/main/scala/datadog/trace/instrumentation/akkahttp/AkkaHttpServerInstrumentation.java b/dd-java-agent/instrumentation/akka-http-10.0/src/main/java/datadog/trace/instrumentation/akkahttp/AkkaHttpServerInstrumentation.java similarity index 100% rename from dd-java-agent/instrumentation/akka-http-10.0/src/main/scala/datadog/trace/instrumentation/akkahttp/AkkaHttpServerInstrumentation.java rename to dd-java-agent/instrumentation/akka-http-10.0/src/main/java/datadog/trace/instrumentation/akkahttp/AkkaHttpServerInstrumentation.java diff --git a/dd-java-agent/instrumentation/akka-http-10.0/src/main/scala/datadog/trace/instrumentation/akkahttp/AkkaHttpClientTransformFlow.java b/dd-java-agent/instrumentation/akka-http-10.0/src/main/scala/datadog/trace/instrumentation/akkahttp/AkkaHttpClientTransformFlow.java deleted file mode 100644 index ec2a01389d..0000000000 --- a/dd-java-agent/instrumentation/akka-http-10.0/src/main/scala/datadog/trace/instrumentation/akkahttp/AkkaHttpClientTransformFlow.java +++ /dev/null @@ -1,61 +0,0 @@ -package datadog.trace.instrumentation.akkahttp; - -import akka.NotUsed; -import akka.http.scaladsl.model.HttpRequest; -import akka.http.scaladsl.model.HttpResponse; -import akka.japi.function.Function; -import akka.stream.scaladsl.Flow; -import io.opentracing.Span; -import io.opentracing.propagation.Format; -import io.opentracing.util.GlobalTracer; -import java.util.concurrent.atomic.AtomicReference; -import scala.Tuple2; -import scala.util.Try; - -public class AkkaHttpClientTransformFlow { - - public static Flow, Tuple2, T>, NotUsed> transform( - Flow, Tuple2, T>, NotUsed> flow) { - - final AtomicReference spanRef = new AtomicReference<>(null); - - return akka.stream.javadsl.Flow.fromFunction( - new Function, Tuple2>() { - @Override - public Tuple2 apply(Tuple2 param) throws Exception { - HttpRequest request = param._1; - T data = param._2; - - Span span = GlobalTracer.get().buildSpan("akka-http.request").start(); - spanRef.set(span); - - AkkaHttpClientDecorator.DECORATE.afterStart(span); - AkkaHttpClientDecorator.DECORATE.onRequest(span, request); - - AkkaHttpClientInstrumentation.AkkaHttpHeaders headers = - new AkkaHttpClientInstrumentation.AkkaHttpHeaders(request); - GlobalTracer.get().inject(span.context(), Format.Builtin.HTTP_HEADERS, headers); - - return new Tuple2<>(headers.getRequest(), data); - } - }) - .via(flow) - .map( - new Function, T>, Tuple2, T>>() { - @Override - public Tuple2, T> apply(Tuple2, T> param) - throws Exception { - Span span = spanRef.get(); - try { - AkkaHttpClientDecorator.DECORATE.onResponse(span, param._1.get()); - } catch (Throwable t) { - AkkaHttpClientDecorator.DECORATE.onError(span, t); - } - AkkaHttpClientDecorator.DECORATE.beforeFinish(span); - span.finish(); - return param; - } - }) - .asScala(); - } -} diff --git a/dd-java-agent/instrumentation/akka-http-10.0/src/test/groovy/AkkaHttpClientPoolInstrumentationTest.groovy b/dd-java-agent/instrumentation/akka-http-10.0/src/test/groovy/AkkaHttpClientPoolInstrumentationTest.groovy deleted file mode 100644 index 243b99a52b..0000000000 --- a/dd-java-agent/instrumentation/akka-http-10.0/src/test/groovy/AkkaHttpClientPoolInstrumentationTest.groovy +++ /dev/null @@ -1,53 +0,0 @@ -import akka.actor.ActorSystem -import akka.http.javadsl.Http -import akka.http.javadsl.model.HttpMethods -import akka.http.javadsl.model.HttpRequest -import akka.http.javadsl.model.HttpResponse -import akka.http.javadsl.model.headers.RawHeader -import akka.japi.Pair -import akka.stream.ActorMaterializer -import akka.stream.javadsl.Sink -import akka.stream.javadsl.Source -import datadog.trace.agent.test.base.HttpClientTest -import datadog.trace.instrumentation.akkahttp.AkkaHttpClientDecorator -import scala.util.Try -import spock.lang.Shared - -class AkkaHttpClientPoolInstrumentationTest extends HttpClientTest { - - @Shared - ActorSystem system = ActorSystem.create() - @Shared - ActorMaterializer materializer = ActorMaterializer.create(system) - - def pool = Http.get(system).superPool(materializer) - - @Override - int doRequest(String method, URI uri, Map headers, Closure callback) { - def request = HttpRequest.create(uri.toString()) - .withMethod(HttpMethods.lookup(method).get()) - .addHeaders(headers.collect { RawHeader.create(it.key, it.value) }) - - def response = Source - .> single(new Pair(request, 1)) - .via(pool) - .runWith(Sink., Integer>> head(), materializer) - .toCompletableFuture().get().first().get() - callback?.call() - return response.status().intValue() - } - - @Override - AkkaHttpClientDecorator decorator() { - return AkkaHttpClientDecorator.DECORATE - } - - @Override - String expectedOperationName() { - return "akka-http.request" - } - - boolean testRedirects() { - false - } -}