Remove SuperPool instrumentation.
By definition, a Flow generated by a SuperPool does not respect ordering of requests and responses, and in fact will typically only rarely actually behave in the fashion that the instrumentation expects. The previous implementation would start a span for a given request before submitting it as input to the flow, and close the span with whatever response is next emitted by the flow. This request will rarely (if ever) be the actual response for the request that started the span. For more info, see the official docs at https://doc.akka.io/docs/akka-http/current/client-side/host-level.html#configuring-a-host-connection-pool Additionally, compiling this instumentation against scala 2.11, and only scala 2.11 can (and does) cause significant problems at runtime due to the fact that Scala is explicitly not binary compatible across major versions.
This commit is contained in:
parent
bf7bbf31f8
commit
681420a004
|
@ -2,15 +2,12 @@ package datadog.trace.instrumentation.akkahttp;
|
||||||
|
|
||||||
import static datadog.trace.instrumentation.akkahttp.AkkaHttpClientDecorator.DECORATE;
|
import static datadog.trace.instrumentation.akkahttp.AkkaHttpClientDecorator.DECORATE;
|
||||||
import static net.bytebuddy.matcher.ElementMatchers.named;
|
import static net.bytebuddy.matcher.ElementMatchers.named;
|
||||||
import static net.bytebuddy.matcher.ElementMatchers.returns;
|
|
||||||
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
|
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
|
||||||
|
|
||||||
import akka.NotUsed;
|
|
||||||
import akka.http.javadsl.model.headers.RawHeader;
|
import akka.http.javadsl.model.headers.RawHeader;
|
||||||
import akka.http.scaladsl.HttpExt;
|
import akka.http.scaladsl.HttpExt;
|
||||||
import akka.http.scaladsl.model.HttpRequest;
|
import akka.http.scaladsl.model.HttpRequest;
|
||||||
import akka.http.scaladsl.model.HttpResponse;
|
import akka.http.scaladsl.model.HttpResponse;
|
||||||
import akka.stream.scaladsl.Flow;
|
|
||||||
import com.google.auto.service.AutoService;
|
import com.google.auto.service.AutoService;
|
||||||
import datadog.trace.agent.tooling.Instrumenter;
|
import datadog.trace.agent.tooling.Instrumenter;
|
||||||
import datadog.trace.bootstrap.CallDepthThreadLocalMap;
|
import datadog.trace.bootstrap.CallDepthThreadLocalMap;
|
||||||
|
@ -27,7 +24,6 @@ import net.bytebuddy.asm.Advice;
|
||||||
import net.bytebuddy.description.method.MethodDescription;
|
import net.bytebuddy.description.method.MethodDescription;
|
||||||
import net.bytebuddy.description.type.TypeDescription;
|
import net.bytebuddy.description.type.TypeDescription;
|
||||||
import net.bytebuddy.matcher.ElementMatcher;
|
import net.bytebuddy.matcher.ElementMatcher;
|
||||||
import scala.Tuple2;
|
|
||||||
import scala.concurrent.Future;
|
import scala.concurrent.Future;
|
||||||
import scala.runtime.AbstractFunction1;
|
import scala.runtime.AbstractFunction1;
|
||||||
import scala.util.Try;
|
import scala.util.Try;
|
||||||
|
@ -49,7 +45,6 @@ public final class AkkaHttpClientInstrumentation extends Instrumenter.Default {
|
||||||
return new String[] {
|
return new String[] {
|
||||||
AkkaHttpClientInstrumentation.class.getName() + "$OnCompleteHandler",
|
AkkaHttpClientInstrumentation.class.getName() + "$OnCompleteHandler",
|
||||||
AkkaHttpClientInstrumentation.class.getName() + "$AkkaHttpHeaders",
|
AkkaHttpClientInstrumentation.class.getName() + "$AkkaHttpHeaders",
|
||||||
packageName + ".AkkaHttpClientTransformFlow",
|
|
||||||
"datadog.trace.agent.decorator.BaseDecorator",
|
"datadog.trace.agent.decorator.BaseDecorator",
|
||||||
"datadog.trace.agent.decorator.ClientDecorator",
|
"datadog.trace.agent.decorator.ClientDecorator",
|
||||||
"datadog.trace.agent.decorator.HttpClientDecorator",
|
"datadog.trace.agent.decorator.HttpClientDecorator",
|
||||||
|
@ -69,14 +64,6 @@ public final class AkkaHttpClientInstrumentation extends Instrumenter.Default {
|
||||||
named("singleRequestImpl")
|
named("singleRequestImpl")
|
||||||
.and(takesArgument(0, named("akka.http.scaladsl.model.HttpRequest"))),
|
.and(takesArgument(0, named("akka.http.scaladsl.model.HttpRequest"))),
|
||||||
SingleRequestAdvice.class.getName());
|
SingleRequestAdvice.class.getName());
|
||||||
// This is mainly for compatibility with 10.0
|
|
||||||
transformers.put(
|
|
||||||
named("superPool").and(returns(named("akka.stream.scaladsl.Flow"))),
|
|
||||||
SuperPoolAdvice.class.getName());
|
|
||||||
// This is for 10.1+
|
|
||||||
transformers.put(
|
|
||||||
named("superPoolImpl").and(returns(named("akka.stream.scaladsl.Flow"))),
|
|
||||||
SuperPoolAdvice.class.getName());
|
|
||||||
return transformers;
|
return transformers;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -105,7 +92,6 @@ public final class AkkaHttpClientInstrumentation extends Instrumenter.Default {
|
||||||
// Request is immutable, so we have to assign new value once we update headers
|
// Request is immutable, so we have to assign new value once we update headers
|
||||||
request = headers.getRequest();
|
request = headers.getRequest();
|
||||||
}
|
}
|
||||||
|
|
||||||
return scope;
|
return scope;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -134,34 +120,6 @@ public final class AkkaHttpClientInstrumentation extends Instrumenter.Default {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public static class SuperPoolAdvice {
|
|
||||||
|
|
||||||
@Advice.OnMethodEnter(suppress = Throwable.class)
|
|
||||||
public static boolean methodEnter() {
|
|
||||||
/*
|
|
||||||
Versions 10.0 and 10.1 have slightly different structure that is hard to distinguish so here
|
|
||||||
we cast 'wider net' and avoid instrumenting twice.
|
|
||||||
In the future we may want to separate these, but since lots of code is reused we would need to come up
|
|
||||||
with way of continuing to reusing it.
|
|
||||||
*/
|
|
||||||
final int callDepth = CallDepthThreadLocalMap.incrementCallDepth(HttpExt.class);
|
|
||||||
return callDepth <= 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Advice.OnMethodExit(suppress = Throwable.class)
|
|
||||||
public static <T> void methodExit(
|
|
||||||
@Advice.Return(readOnly = false)
|
|
||||||
Flow<Tuple2<HttpRequest, T>, Tuple2<Try<HttpResponse>, T>, NotUsed> flow,
|
|
||||||
@Advice.Enter final boolean isApplied) {
|
|
||||||
if (!isApplied) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
CallDepthThreadLocalMap.reset(HttpExt.class);
|
|
||||||
|
|
||||||
flow = AkkaHttpClientTransformFlow.transform(flow);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public static class OnCompleteHandler extends AbstractFunction1<Try<HttpResponse>, Void> {
|
public static class OnCompleteHandler extends AbstractFunction1<Try<HttpResponse>, Void> {
|
||||||
private final Span span;
|
private final Span span;
|
||||||
|
|
|
@ -1,61 +0,0 @@
|
||||||
package datadog.trace.instrumentation.akkahttp;
|
|
||||||
|
|
||||||
import akka.NotUsed;
|
|
||||||
import akka.http.scaladsl.model.HttpRequest;
|
|
||||||
import akka.http.scaladsl.model.HttpResponse;
|
|
||||||
import akka.japi.function.Function;
|
|
||||||
import akka.stream.scaladsl.Flow;
|
|
||||||
import io.opentracing.Span;
|
|
||||||
import io.opentracing.propagation.Format;
|
|
||||||
import io.opentracing.util.GlobalTracer;
|
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
|
||||||
import scala.Tuple2;
|
|
||||||
import scala.util.Try;
|
|
||||||
|
|
||||||
public class AkkaHttpClientTransformFlow {
|
|
||||||
|
|
||||||
public static <T> Flow<Tuple2<HttpRequest, T>, Tuple2<Try<HttpResponse>, T>, NotUsed> transform(
|
|
||||||
Flow<Tuple2<HttpRequest, T>, Tuple2<Try<HttpResponse>, T>, NotUsed> flow) {
|
|
||||||
|
|
||||||
final AtomicReference<Span> spanRef = new AtomicReference<>(null);
|
|
||||||
|
|
||||||
return akka.stream.javadsl.Flow.fromFunction(
|
|
||||||
new Function<Tuple2<HttpRequest, T>, Tuple2<HttpRequest, T>>() {
|
|
||||||
@Override
|
|
||||||
public Tuple2<HttpRequest, T> apply(Tuple2<HttpRequest, T> param) throws Exception {
|
|
||||||
HttpRequest request = param._1;
|
|
||||||
T data = param._2;
|
|
||||||
|
|
||||||
Span span = GlobalTracer.get().buildSpan("akka-http.request").start();
|
|
||||||
spanRef.set(span);
|
|
||||||
|
|
||||||
AkkaHttpClientDecorator.DECORATE.afterStart(span);
|
|
||||||
AkkaHttpClientDecorator.DECORATE.onRequest(span, request);
|
|
||||||
|
|
||||||
AkkaHttpClientInstrumentation.AkkaHttpHeaders headers =
|
|
||||||
new AkkaHttpClientInstrumentation.AkkaHttpHeaders(request);
|
|
||||||
GlobalTracer.get().inject(span.context(), Format.Builtin.HTTP_HEADERS, headers);
|
|
||||||
|
|
||||||
return new Tuple2<>(headers.getRequest(), data);
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.via(flow)
|
|
||||||
.map(
|
|
||||||
new Function<Tuple2<Try<HttpResponse>, T>, Tuple2<Try<HttpResponse>, T>>() {
|
|
||||||
@Override
|
|
||||||
public Tuple2<Try<HttpResponse>, T> apply(Tuple2<Try<HttpResponse>, T> param)
|
|
||||||
throws Exception {
|
|
||||||
Span span = spanRef.get();
|
|
||||||
try {
|
|
||||||
AkkaHttpClientDecorator.DECORATE.onResponse(span, param._1.get());
|
|
||||||
} catch (Throwable t) {
|
|
||||||
AkkaHttpClientDecorator.DECORATE.onError(span, t);
|
|
||||||
}
|
|
||||||
AkkaHttpClientDecorator.DECORATE.beforeFinish(span);
|
|
||||||
span.finish();
|
|
||||||
return param;
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.asScala();
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,53 +0,0 @@
|
||||||
import akka.actor.ActorSystem
|
|
||||||
import akka.http.javadsl.Http
|
|
||||||
import akka.http.javadsl.model.HttpMethods
|
|
||||||
import akka.http.javadsl.model.HttpRequest
|
|
||||||
import akka.http.javadsl.model.HttpResponse
|
|
||||||
import akka.http.javadsl.model.headers.RawHeader
|
|
||||||
import akka.japi.Pair
|
|
||||||
import akka.stream.ActorMaterializer
|
|
||||||
import akka.stream.javadsl.Sink
|
|
||||||
import akka.stream.javadsl.Source
|
|
||||||
import datadog.trace.agent.test.base.HttpClientTest
|
|
||||||
import datadog.trace.instrumentation.akkahttp.AkkaHttpClientDecorator
|
|
||||||
import scala.util.Try
|
|
||||||
import spock.lang.Shared
|
|
||||||
|
|
||||||
class AkkaHttpClientPoolInstrumentationTest extends HttpClientTest<AkkaHttpClientDecorator> {
|
|
||||||
|
|
||||||
@Shared
|
|
||||||
ActorSystem system = ActorSystem.create()
|
|
||||||
@Shared
|
|
||||||
ActorMaterializer materializer = ActorMaterializer.create(system)
|
|
||||||
|
|
||||||
def pool = Http.get(system).superPool(materializer)
|
|
||||||
|
|
||||||
@Override
|
|
||||||
int doRequest(String method, URI uri, Map<String, String> headers, Closure callback) {
|
|
||||||
def request = HttpRequest.create(uri.toString())
|
|
||||||
.withMethod(HttpMethods.lookup(method).get())
|
|
||||||
.addHeaders(headers.collect { RawHeader.create(it.key, it.value) })
|
|
||||||
|
|
||||||
def response = Source
|
|
||||||
.<Pair<HttpRequest, Integer>> single(new Pair(request, 1))
|
|
||||||
.via(pool)
|
|
||||||
.runWith(Sink.<Pair<Try<HttpResponse>, Integer>> head(), materializer)
|
|
||||||
.toCompletableFuture().get().first().get()
|
|
||||||
callback?.call()
|
|
||||||
return response.status().intValue()
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
AkkaHttpClientDecorator decorator() {
|
|
||||||
return AkkaHttpClientDecorator.DECORATE
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
String expectedOperationName() {
|
|
||||||
return "akka-http.request"
|
|
||||||
}
|
|
||||||
|
|
||||||
boolean testRedirects() {
|
|
||||||
false
|
|
||||||
}
|
|
||||||
}
|
|
Loading…
Reference in New Issue