Add akka-http-client instrumentation

This is a very-very first pass: instrument single request
This commit is contained in:
Nikolay Martynov 2018-06-26 16:10:52 -04:00
parent 6932d581ed
commit 2b25de966a
3 changed files with 296 additions and 0 deletions

View File

@ -41,3 +41,6 @@ dependencies {
test.dependsOn lagomTest
testJava8Minimum += '*Test*.class'
// These classes use Ratpack which requires Java 8. (Currently also incompatible with Java 9.)
testJava8Only += '**/AkkaHttpClientInstrumentationTest.class'

View File

@ -0,0 +1,140 @@
package datadog.trace.instrumentation.akkahttp;
import static io.opentracing.log.Fields.ERROR_OBJECT;
import static net.bytebuddy.matcher.ElementMatchers.*;
import akka.http.javadsl.model.headers.RawHeader;
import akka.http.scaladsl.HttpExt;
import akka.http.scaladsl.model.HttpRequest;
import akka.http.scaladsl.model.HttpResponse;
import akka.stream.*;
import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.*;
import datadog.trace.api.DDSpanTypes;
import datadog.trace.api.DDTags;
import io.opentracing.Scope;
import io.opentracing.Span;
import io.opentracing.propagation.Format;
import io.opentracing.propagation.TextMap;
import io.opentracing.tag.Tags;
import io.opentracing.util.GlobalTracer;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import net.bytebuddy.agent.builder.AgentBuilder;
import net.bytebuddy.asm.Advice;
import scala.concurrent.Future;
import scala.runtime.AbstractFunction1;
import scala.util.Try;
@Slf4j
@AutoService(Instrumenter.class)
public final class AkkaHttpClientInstrumentation extends Instrumenter.Configurable {
public AkkaHttpClientInstrumentation() {
super("akka-http", "akka-http-client");
}
@Override
protected boolean defaultEnabled() {
return false;
}
private static final HelperInjector HELPER_INJECTOR =
new HelperInjector(
AkkaHttpClientInstrumentation.class.getName() + "$OnCompleteHandler",
AkkaHttpClientInstrumentation.class.getName() + "$AkkaHttpHeaders");
@Override
public AgentBuilder apply(final AgentBuilder agentBuilder) {
return agentBuilder
.type(named("akka.http.scaladsl.HttpExt"))
.transform(DDTransformers.defaultTransformers())
.transform(HELPER_INJECTOR)
.transform(
DDAdvice.create()
.advice(
named("singleRequest")
.and(takesArgument(0, named("akka.http.scaladsl.model.HttpRequest"))),
AkkaHttpClientAdvice.class.getName()))
.asDecorator();
}
public static class AkkaHttpClientAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static Scope methodEnter(
@Advice.Argument(value = 0, readOnly = false) HttpRequest request) {
Scope scope =
GlobalTracer.get()
.buildSpan("akka-http.request")
.withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_CLIENT)
.withTag(Tags.HTTP_METHOD.getKey(), request.method().value())
.withTag(DDTags.SPAN_TYPE, DDSpanTypes.HTTP_CLIENT)
.withTag(Tags.COMPONENT.getKey(), "akka-http-client")
.withTag(Tags.HTTP_URL.getKey(), request.getUri().toString())
.startActive(false);
AkkaHttpHeaders headers = new AkkaHttpHeaders(request);
GlobalTracer.get().inject(scope.span().context(), Format.Builtin.HTTP_HEADERS, headers);
// Request is immutable, so we have to assign new value once we update headers
request = headers.getRequest();
return scope;
}
@Advice.OnMethodExit(suppress = Throwable.class)
public static void methodExit(
@Advice.Argument(value = 0) final HttpRequest request,
@Advice.This final HttpExt thiz,
@Advice.Return final Future<HttpResponse> responseFuture,
@Advice.Enter final Scope scope) {
responseFuture.onComplete(new OnCompleteHandler(scope), thiz.system().dispatcher());
scope.close();
}
}
public static class OnCompleteHandler extends AbstractFunction1<Try<HttpResponse>, Void> {
private final Scope scope;
public OnCompleteHandler(Scope scope) {
this.scope = scope;
}
@Override
public Void apply(Try<HttpResponse> result) {
Span span = scope.span();
if (result.isSuccess()) {
Tags.HTTP_STATUS.set(span, result.get().status().intValue());
} else {
Tags.ERROR.set(span, true);
span.log(Collections.singletonMap(ERROR_OBJECT, result.failed().get()));
}
span.finish();
return null;
}
}
public static class AkkaHttpHeaders implements TextMap {
private HttpRequest request;
public AkkaHttpHeaders(HttpRequest request) {
this.request = request;
}
@Override
public Iterator<Map.Entry<String, String>> iterator() {
throw new UnsupportedOperationException(
"This class should be used only with Tracer.inject()!");
}
@Override
public void put(final String name, final String value) {
// It looks like this cast is only needed in Java, Scala would have figured it out
request = (HttpRequest) request.addHeader(RawHeader.create(name, value));
}
public HttpRequest getRequest() {
return request;
}
}
}

View File

@ -0,0 +1,153 @@
import akka.actor.ActorSystem
import akka.http.javadsl.Http
import akka.http.javadsl.model.HttpRequest
import akka.http.javadsl.model.HttpResponse
import akka.stream.ActorMaterializer
import akka.stream.StreamTcpException
import datadog.trace.agent.test.AgentTestRunner
import datadog.trace.agent.test.RatpackUtils
import datadog.trace.api.DDSpanTypes
import datadog.trace.api.DDTags
import io.opentracing.tag.Tags
import spock.lang.Shared
import java.util.concurrent.CompletionStage
import java.util.concurrent.ExecutionException
import static datadog.trace.agent.test.ListWriterAssert.assertTraces
import static ratpack.groovy.test.embed.GroovyEmbeddedApp.ratpack
class AkkaHttpClientInstrumentationTest extends AgentTestRunner {
static {
System.setProperty("dd.integration.akka-http-client.enabled", "true")
}
private static final String MESSAGE = "an\nmultiline\nhttp\nresponse"
private static final long TIMEOUT = 10000L
@Shared
def server = ratpack {
handlers {
prefix("success") {
all {
RatpackUtils.handleDistributedRequest(context)
response.status(200).send(MESSAGE)
}
}
prefix("error") {
all {
RatpackUtils.handleDistributedRequest(context)
throw new RuntimeException("error")
}
}
}
}
@Shared
ActorSystem system = ActorSystem.create()
@Shared
ActorMaterializer materializer = ActorMaterializer.create(system)
def "#route request trace" () {
setup:
def url = server.address.resolve("/" + route).toURL()
HttpRequest request = HttpRequest.create(url.toString())
CompletionStage<HttpResponse> responseFuture =
Http.get(system)
.singleRequest(request, materializer)
HttpResponse response = responseFuture.toCompletableFuture().get()
String message = readMessage(response)
expect:
response.status().intValue() == expectedStatus
if (expectedMessage != null) {
message == expectedMessage
}
assertTraces(TEST_WRITER, 2) {
trace(0, 1) {
span(0) {
operationName "test-http-server"
childOf(TEST_WRITER[1][0])
errored false
tags {
defaultTags()
}
}
}
trace(1, 1) {
span(0) {
parent()
serviceName "unnamed-java-app"
operationName "akka-http.request"
resourceName "GET /$route"
errored expectedError
tags {
defaultTags()
"$Tags.HTTP_STATUS.key" expectedStatus
"$Tags.HTTP_URL.key" "${server.address}$route"
"$Tags.HTTP_METHOD.key" "GET"
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT
"$DDTags.SPAN_TYPE" DDSpanTypes.HTTP_CLIENT
"$Tags.COMPONENT.key" "akka-http-client"
if (expectedError) {
"$Tags.ERROR.key" true
}
}
}
}
}
where:
route | expectedStatus | expectedError | expectedMessage
"success" | 200 | false | MESSAGE
"error" | 500 | true | null
}
def "error request trace" () {
setup:
def url = new URL("http://localhost:${server.address.port + 1}/test")
HttpRequest request = HttpRequest.create(url.toString())
CompletionStage<HttpResponse> responseFuture =
Http.get(system)
.singleRequest(request, materializer)
try {
responseFuture.toCompletableFuture().get()
} catch (ExecutionException e) {
// This is expected to fail
}
expect:
assertTraces(TEST_WRITER, 1) {
trace(0, 1) {
span(0) {
parent()
serviceName "unnamed-java-app"
operationName "akka-http.request"
resourceName "GET /test"
errored true
tags {
defaultTags()
"$Tags.HTTP_URL.key" url.toString()
"$Tags.HTTP_METHOD.key" "GET"
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT
"$DDTags.SPAN_TYPE" DDSpanTypes.HTTP_CLIENT
"$Tags.COMPONENT.key" "akka-http-client"
"$Tags.ERROR.key" true
errorTags(StreamTcpException, { it.contains("Tcp command") })
}
}
}
}
}
String readMessage(HttpResponse response) {
response.entity().toStrict(TIMEOUT, materializer).toCompletableFuture().get().getData().utf8String()
}
}