Merge pull request #899 from dpratt/fix-akka-http

Fix akka-http instrumentation.
This commit is contained in:
Tyler Benson 2019-07-10 14:39:47 -06:00 committed by GitHub
commit 4501dbe920
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 8 additions and 133 deletions

View File

@ -59,6 +59,14 @@ muzzle {
// later versions of akka-http expect streams to be provided
extraDependency 'com.typesafe.akka:akka-stream_2.12:2.5.11'
}
//There is no akka-http 10.0.x series for scala 2.13
pass {
group = 'com.typesafe.akka'
module = 'akka-http_2.13'
versions = "[10.1.8,)"
// later versions of akka-http expect streams to be provided
extraDependency 'com.typesafe.akka:akka-stream_2.13:2.5.23'
}
}
dependencies {

View File

@ -2,15 +2,12 @@ package datadog.trace.instrumentation.akkahttp;
import static datadog.trace.instrumentation.akkahttp.AkkaHttpClientDecorator.DECORATE;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.returns;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import akka.NotUsed;
import akka.http.javadsl.model.headers.RawHeader;
import akka.http.scaladsl.HttpExt;
import akka.http.scaladsl.model.HttpRequest;
import akka.http.scaladsl.model.HttpResponse;
import akka.stream.scaladsl.Flow;
import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.bootstrap.CallDepthThreadLocalMap;
@ -27,7 +24,6 @@ import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import scala.Tuple2;
import scala.concurrent.Future;
import scala.runtime.AbstractFunction1;
import scala.util.Try;
@ -49,10 +45,6 @@ public final class AkkaHttpClientInstrumentation extends Instrumenter.Default {
return new String[] {
AkkaHttpClientInstrumentation.class.getName() + "$OnCompleteHandler",
AkkaHttpClientInstrumentation.class.getName() + "$AkkaHttpHeaders",
packageName + ".AkkaHttpClientTransformFlow",
packageName + ".AkkaHttpClientTransformFlow$",
packageName + ".AkkaHttpClientTransformFlow$$anonfun$transform$1",
packageName + ".AkkaHttpClientTransformFlow$$anonfun$transform$2",
"datadog.trace.agent.decorator.BaseDecorator",
"datadog.trace.agent.decorator.ClientDecorator",
"datadog.trace.agent.decorator.HttpClientDecorator",
@ -72,14 +64,6 @@ public final class AkkaHttpClientInstrumentation extends Instrumenter.Default {
named("singleRequestImpl")
.and(takesArgument(0, named("akka.http.scaladsl.model.HttpRequest"))),
SingleRequestAdvice.class.getName());
// This is mainly for compatibility with 10.0
transformers.put(
named("superPool").and(returns(named("akka.stream.scaladsl.Flow"))),
SuperPoolAdvice.class.getName());
// This is for 10.1+
transformers.put(
named("superPoolImpl").and(returns(named("akka.stream.scaladsl.Flow"))),
SuperPoolAdvice.class.getName());
return transformers;
}
@ -108,7 +92,6 @@ public final class AkkaHttpClientInstrumentation extends Instrumenter.Default {
// Request is immutable, so we have to assign new value once we update headers
request = headers.getRequest();
}
return scope;
}
@ -137,34 +120,6 @@ public final class AkkaHttpClientInstrumentation extends Instrumenter.Default {
}
}
public static class SuperPoolAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static boolean methodEnter() {
/*
Versions 10.0 and 10.1 have slightly different structure that is hard to distinguish so here
we cast 'wider net' and avoid instrumenting twice.
In the future we may want to separate these, but since lots of code is reused we would need to come up
with way of continuing to reusing it.
*/
final int callDepth = CallDepthThreadLocalMap.incrementCallDepth(HttpExt.class);
return callDepth <= 0;
}
@Advice.OnMethodExit(suppress = Throwable.class)
public static <T> void methodExit(
@Advice.Return(readOnly = false)
Flow<Tuple2<HttpRequest, T>, Tuple2<Try<HttpResponse>, T>, NotUsed> flow,
@Advice.Enter final boolean isApplied) {
if (!isApplied) {
return;
}
CallDepthThreadLocalMap.reset(HttpExt.class);
flow = AkkaHttpClientTransformFlow.transform(flow);
}
}
public static class OnCompleteHandler extends AbstractFunction1<Try<HttpResponse>, Void> {
private final Span span;

View File

@ -1,35 +0,0 @@
package datadog.trace.instrumentation.akkahttp
import akka.NotUsed
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.stream.scaladsl.Flow
import datadog.trace.instrumentation.akkahttp.AkkaHttpClientDecorator.DECORATE
import io.opentracing.Span
import io.opentracing.propagation.Format
import io.opentracing.util.GlobalTracer
import scala.util.{Failure, Success, Try}
object AkkaHttpClientTransformFlow {
def transform[T](flow: Flow[(HttpRequest, T), (Try[HttpResponse], T), NotUsed]): Flow[(HttpRequest, T), (Try[HttpResponse], T), NotUsed] = {
var span: Span = null
Flow.fromFunction((input: (HttpRequest, T)) => {
val (request, data) = input
span = GlobalTracer.get.buildSpan("akka-http.request").start()
DECORATE.afterStart(span)
DECORATE.onRequest(span, request)
val headers = new AkkaHttpClientInstrumentation.AkkaHttpHeaders(request)
GlobalTracer.get.inject(span.context, Format.Builtin.HTTP_HEADERS, headers)
(headers.getRequest, data)
}).via(flow).map(output => {
output._1 match {
case Success(response) => DECORATE.onResponse(span, response)
case Failure(e) => DECORATE.onError(span, e)
}
DECORATE.beforeFinish(span)
span.finish()
output
})
}
}

View File

@ -1,53 +0,0 @@
import akka.actor.ActorSystem
import akka.http.javadsl.Http
import akka.http.javadsl.model.HttpMethods
import akka.http.javadsl.model.HttpRequest
import akka.http.javadsl.model.HttpResponse
import akka.http.javadsl.model.headers.RawHeader
import akka.japi.Pair
import akka.stream.ActorMaterializer
import akka.stream.javadsl.Sink
import akka.stream.javadsl.Source
import datadog.trace.agent.test.base.HttpClientTest
import datadog.trace.instrumentation.akkahttp.AkkaHttpClientDecorator
import scala.util.Try
import spock.lang.Shared
class AkkaHttpClientPoolInstrumentationTest extends HttpClientTest<AkkaHttpClientDecorator> {
@Shared
ActorSystem system = ActorSystem.create()
@Shared
ActorMaterializer materializer = ActorMaterializer.create(system)
def pool = Http.get(system).superPool(materializer)
@Override
int doRequest(String method, URI uri, Map<String, String> headers, Closure callback) {
def request = HttpRequest.create(uri.toString())
.withMethod(HttpMethods.lookup(method).get())
.addHeaders(headers.collect { RawHeader.create(it.key, it.value) })
def response = Source
.<Pair<HttpRequest, Integer>> single(new Pair(request, 1))
.via(pool)
.runWith(Sink.<Pair<Try<HttpResponse>, Integer>> head(), materializer)
.toCompletableFuture().get().first().get()
callback?.call()
return response.status().intValue()
}
@Override
AkkaHttpClientDecorator decorator() {
return AkkaHttpClientDecorator.DECORATE
}
@Override
String expectedOperationName() {
return "akka-http.request"
}
boolean testRedirects() {
false
}
}