Add akka-http-client instrumentation: superPool

This commit is contained in:
Nikolay Martynov 2018-06-29 16:20:32 -04:00
parent 2b25de966a
commit ae37ca4b02
4 changed files with 182 additions and 10 deletions

View File

@ -1,6 +1,9 @@
apply from: "${rootDir}/gradle/java.gradle"
apply from: "${rootDir}/gradle/test-with-scala.gradle"
// We have actual Scala sources here
apply plugin: 'scala'
apply plugin: 'org.unbroken-dome.test-sets'
testSets {
lagomTest

View File

@ -3,11 +3,12 @@ package datadog.trace.instrumentation.akkahttp;
import static io.opentracing.log.Fields.ERROR_OBJECT;
import static net.bytebuddy.matcher.ElementMatchers.*;
import akka.NotUsed;
import akka.http.javadsl.model.headers.RawHeader;
import akka.http.scaladsl.HttpExt;
import akka.http.scaladsl.model.HttpRequest;
import akka.http.scaladsl.model.HttpResponse;
import akka.stream.*;
import akka.stream.scaladsl.Flow;
import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.*;
import datadog.trace.api.DDSpanTypes;
@ -24,6 +25,7 @@ import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import net.bytebuddy.agent.builder.AgentBuilder;
import net.bytebuddy.asm.Advice;
import scala.Tuple2;
import scala.concurrent.Future;
import scala.runtime.AbstractFunction1;
import scala.util.Try;
@ -43,7 +45,8 @@ public final class AkkaHttpClientInstrumentation extends Instrumenter.Configurab
private static final HelperInjector HELPER_INJECTOR =
new HelperInjector(
AkkaHttpClientInstrumentation.class.getName() + "$OnCompleteHandler",
AkkaHttpClientInstrumentation.class.getName() + "$AkkaHttpHeaders");
AkkaHttpClientInstrumentation.class.getName() + "$AkkaHttpHeaders",
AkkaHttpClientTransformFlow.class.getName());
@Override
public AgentBuilder apply(final AgentBuilder agentBuilder) {
@ -56,11 +59,16 @@ public final class AkkaHttpClientInstrumentation extends Instrumenter.Configurab
.advice(
named("singleRequest")
.and(takesArgument(0, named("akka.http.scaladsl.model.HttpRequest"))),
AkkaHttpClientAdvice.class.getName()))
SingleRequesrAdvice.class.getName()))
.transform(
DDAdvice.create()
.advice(
named("superPool").and(returns(named("akka.stream.scaladsl.Flow"))),
SuperPoolAdvice.class.getName()))
.asDecorator();
}
public static class AkkaHttpClientAdvice {
public static class SingleRequesrAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static Scope methodEnter(
@Advice.Argument(value = 0, readOnly = false) HttpRequest request) {
@ -93,6 +101,15 @@ public final class AkkaHttpClientInstrumentation extends Instrumenter.Configurab
}
}
public static class SuperPoolAdvice {
@Advice.OnMethodExit(suppress = Throwable.class)
public static <T> void methodExit(
@Advice.Return(readOnly = false)
Flow<Tuple2<HttpRequest, T>, Tuple2<Try<HttpResponse>, T>, NotUsed> flow) {
flow = AkkaHttpClientTransformFlow.transform(flow);
}
}
public static class OnCompleteHandler extends AbstractFunction1<Try<HttpResponse>, Void> {
private final Scope scope;

View File

@ -0,0 +1,48 @@
package datadog.trace.instrumentation.akkahttp
import java.util.Collections
import akka.NotUsed
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.stream.Supervision
import akka.stream.scaladsl.Flow
import datadog.trace.api.{DDSpanTypes, DDTags}
import io.opentracing.log.Fields.ERROR_OBJECT
import io.opentracing.{Scope, Span}
import io.opentracing.propagation.Format
import io.opentracing.tag.Tags
import io.opentracing.util.GlobalTracer
import scala.util.{Failure, Success, Try}
object AkkaHttpClientTransformFlow {
def transform[T](flow: Flow[(HttpRequest, T), (Try[HttpResponse], T), NotUsed]): Flow[(HttpRequest, T), (Try[HttpResponse], T), NotUsed] = {
var span: Span = null
Flow.fromFunction((input: (HttpRequest, T)) => {
val (request, data) = input
val scope = GlobalTracer.get
.buildSpan("akka-http.request")
.withTag(Tags.SPAN_KIND.getKey, Tags.SPAN_KIND_CLIENT)
.withTag(Tags.HTTP_METHOD.getKey, request.method.value)
.withTag(DDTags.SPAN_TYPE, DDSpanTypes.HTTP_CLIENT)
.withTag(Tags.COMPONENT.getKey, "akka-http-client")
.withTag(Tags.HTTP_URL.getKey, request.getUri.toString)
.startActive(false)
val headers = new AkkaHttpClientInstrumentation.AkkaHttpHeaders(request)
GlobalTracer.get.inject(scope.span.context, Format.Builtin.HTTP_HEADERS, headers)
span = scope.span
scope.close()
(headers.getRequest, data)
}).via(flow).map(output => {
output._1 match {
case Success(response) => Tags.HTTP_STATUS.set(span, response.status.intValue)
case Failure(e) =>
Tags.ERROR.set(span, true)
span.log(Collections.singletonMap(ERROR_OBJECT, e))
}
span.finish()
output
})
}
}

View File

@ -2,13 +2,17 @@ import akka.actor.ActorSystem
import akka.http.javadsl.Http
import akka.http.javadsl.model.HttpRequest
import akka.http.javadsl.model.HttpResponse
import akka.japi.Pair
import akka.stream.ActorMaterializer
import akka.stream.StreamTcpException
import akka.stream.javadsl.Sink
import akka.stream.javadsl.Source
import datadog.trace.agent.test.AgentTestRunner
import datadog.trace.agent.test.RatpackUtils
import datadog.trace.api.DDSpanTypes
import datadog.trace.api.DDTags
import io.opentracing.tag.Tags
import scala.util.Try
import spock.lang.Shared
import java.util.concurrent.CompletionStage
@ -51,6 +55,8 @@ class AkkaHttpClientInstrumentationTest extends AgentTestRunner {
@Shared
ActorMaterializer materializer = ActorMaterializer.create(system)
def pool = Http.get(system).<Integer>superPool(materializer)
def "#route request trace" () {
setup:
def url = server.address.resolve("/" + route).toURL()
@ -59,10 +65,12 @@ class AkkaHttpClientInstrumentationTest extends AgentTestRunner {
CompletionStage<HttpResponse> responseFuture =
Http.get(system)
.singleRequest(request, materializer)
when:
HttpResponse response = responseFuture.toCompletableFuture().get()
String message = readMessage(response)
expect:
then:
response.status().intValue() == expectedStatus
if (expectedMessage != null) {
message == expectedMessage
@ -116,13 +124,109 @@ class AkkaHttpClientInstrumentationTest extends AgentTestRunner {
CompletionStage<HttpResponse> responseFuture =
Http.get(system)
.singleRequest(request, materializer)
try {
responseFuture.toCompletableFuture().get()
} catch (ExecutionException e) {
// This is expected to fail
when:
responseFuture.toCompletableFuture().get()
then:
thrown ExecutionException
assertTraces(TEST_WRITER, 1) {
trace(0, 1) {
span(0) {
parent()
serviceName "unnamed-java-app"
operationName "akka-http.request"
resourceName "GET /test"
errored true
tags {
defaultTags()
"$Tags.HTTP_URL.key" url.toString()
"$Tags.HTTP_METHOD.key" "GET"
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT
"$DDTags.SPAN_TYPE" DDSpanTypes.HTTP_CLIENT
"$Tags.COMPONENT.key" "akka-http-client"
"$Tags.ERROR.key" true
errorTags(StreamTcpException, { it.contains("Tcp command") })
}
}
}
}
}
def "#route pool request trace" () {
setup:
def url = server.address.resolve("/" + route).toURL()
CompletionStage<Pair<Try<HttpResponse>, Integer>> sink = Source
.<Pair<HttpRequest, Integer>>single(new Pair(HttpRequest.create(url.toString()), 1))
.via(pool)
.runWith(Sink.<Pair<Try<HttpResponse>, Integer>>head(), materializer)
when:
HttpResponse response = sink.toCompletableFuture().get().first().get()
String message = readMessage(response)
then:
response.status().intValue() == expectedStatus
if (expectedMessage != null) {
message == expectedMessage
}
expect:
assertTraces(TEST_WRITER, 2) {
trace(0, 1) {
span(0) {
operationName "test-http-server"
childOf(TEST_WRITER[1][0])
errored false
tags {
defaultTags()
}
}
}
trace(1, 1) {
span(0) {
parent()
serviceName "unnamed-java-app"
operationName "akka-http.request"
resourceName "GET /$route"
errored expectedError
tags {
defaultTags()
"$Tags.HTTP_STATUS.key" expectedStatus
"$Tags.HTTP_URL.key" "${server.address}$route"
"$Tags.HTTP_METHOD.key" "GET"
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT
"$DDTags.SPAN_TYPE" DDSpanTypes.HTTP_CLIENT
"$Tags.COMPONENT.key" "akka-http-client"
if (expectedError) {
"$Tags.ERROR.key" true
}
}
}
}
}
where:
route | expectedStatus | expectedError | expectedMessage
"success" | 200 | false | MESSAGE
"error" | 500 | true | null
}
def "error request pool trace" () {
setup:
def url = new URL("http://localhost:${server.address.port + 1}/test")
CompletionStage<Pair<Try<HttpResponse>, Integer>> sink = Source
.<Pair<HttpRequest, Integer>>single(new Pair(HttpRequest.create(url.toString()), 1))
.via(pool)
.runWith(Sink.<Pair<Try<HttpResponse>, Integer>>head(), materializer)
def response = sink.toCompletableFuture().get().first()
when:
response.get()
then:
thrown StreamTcpException
assertTraces(TEST_WRITER, 1) {
trace(0, 1) {
span(0) {