Merge pull request #376 from DataDog/mar-kolya/akka-http-server-renames
akka http server renames
This commit is contained in:
commit
2447f6a99d
|
@ -1,9 +1,11 @@
|
||||||
import akka.NotUsed
|
import akka.NotUsed
|
||||||
import akka.stream.javadsl.Source
|
import akka.stream.javadsl.Source
|
||||||
import akka.stream.testkit.javadsl.TestSink
|
import akka.stream.testkit.javadsl.TestSink
|
||||||
|
import datadog.trace.api.DDSpanTypes
|
||||||
|
import datadog.trace.api.DDTags
|
||||||
|
import io.opentracing.tag.Tags
|
||||||
import net.bytebuddy.utility.JavaModule
|
import net.bytebuddy.utility.JavaModule
|
||||||
|
|
||||||
import static java.util.concurrent.TimeUnit.SECONDS
|
|
||||||
import datadog.trace.agent.test.AgentTestRunner
|
import datadog.trace.agent.test.AgentTestRunner
|
||||||
import play.inject.guice.GuiceApplicationBuilder
|
import play.inject.guice.GuiceApplicationBuilder
|
||||||
import spock.lang.Shared
|
import spock.lang.Shared
|
||||||
|
@ -64,7 +66,7 @@ class LagomTest extends AgentTestRunner {
|
||||||
Source.from(Arrays.asList("msg1", "msg2", "msg3"))
|
Source.from(Arrays.asList("msg1", "msg2", "msg3"))
|
||||||
.concat(Source.maybe())
|
.concat(Source.maybe())
|
||||||
Source<String, NotUsed> output = service.echo().invoke(input)
|
Source<String, NotUsed> output = service.echo().invoke(input)
|
||||||
.toCompletableFuture().get(5, SECONDS)
|
.toCompletableFuture().get()
|
||||||
Probe<String> probe = output.runWith(TestSink.probe(server.system()), server.materializer())
|
Probe<String> probe = output.runWith(TestSink.probe(server.system()), server.materializer())
|
||||||
probe.request(10)
|
probe.request(10)
|
||||||
probe.expectNext("msg1")
|
probe.expectNext("msg1")
|
||||||
|
@ -82,12 +84,12 @@ class LagomTest extends AgentTestRunner {
|
||||||
errored false
|
errored false
|
||||||
tags {
|
tags {
|
||||||
defaultTags()
|
defaultTags()
|
||||||
"http.status_code" 101
|
"$Tags.HTTP_STATUS.key" 101
|
||||||
"http.url" "ws://localhost:${server.port()}/echo"
|
"$Tags.HTTP_URL.key" "ws://localhost:${server.port()}/echo"
|
||||||
"http.method" "GET"
|
"$Tags.HTTP_METHOD.key" "GET"
|
||||||
"span.kind" "server"
|
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_SERVER
|
||||||
"span.type" "web"
|
"$DDTags.SPAN_TYPE" DDSpanTypes.WEB_SERVLET
|
||||||
"component" "akka-http-server"
|
"$Tags.COMPONENT.key" "akka-http-server"
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
span(1) {
|
span(1) {
|
||||||
|
@ -106,7 +108,7 @@ class LagomTest extends AgentTestRunner {
|
||||||
Source.from(Arrays.asList("msg1", "msg2", "msg3"))
|
Source.from(Arrays.asList("msg1", "msg2", "msg3"))
|
||||||
.concat(Source.maybe())
|
.concat(Source.maybe())
|
||||||
try {
|
try {
|
||||||
service.error().invoke(input).toCompletableFuture().get(5, SECONDS)
|
service.error().invoke(input).toCompletableFuture().get()
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -120,13 +122,13 @@ class LagomTest extends AgentTestRunner {
|
||||||
errored true
|
errored true
|
||||||
tags {
|
tags {
|
||||||
defaultTags()
|
defaultTags()
|
||||||
"http.status_code" 500
|
"$Tags.HTTP_STATUS.key" 500
|
||||||
"http.url" "ws://localhost:${server.port()}/error"
|
"$Tags.HTTP_URL.key" "ws://localhost:${server.port()}/error"
|
||||||
"http.method" "GET"
|
"$Tags.HTTP_METHOD.key" "GET"
|
||||||
"span.kind" "server"
|
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_SERVER
|
||||||
"span.type" "web"
|
"$DDTags.SPAN_TYPE" DDSpanTypes.WEB_SERVLET
|
||||||
"component" "akka-http-server"
|
"$Tags.COMPONENT.key" "akka-http-server"
|
||||||
"error" true
|
"$Tags.ERROR.key" true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -6,7 +6,6 @@ import akka.http.javadsl.model.HttpHeader;
|
||||||
import akka.http.scaladsl.model.HttpRequest;
|
import akka.http.scaladsl.model.HttpRequest;
|
||||||
import akka.http.scaladsl.model.HttpResponse;
|
import akka.http.scaladsl.model.HttpResponse;
|
||||||
import akka.stream.*;
|
import akka.stream.*;
|
||||||
import akka.stream.stage.*;
|
|
||||||
import com.google.auto.service.AutoService;
|
import com.google.auto.service.AutoService;
|
||||||
import datadog.trace.agent.tooling.*;
|
import datadog.trace.agent.tooling.*;
|
||||||
import datadog.trace.api.DDSpanTypes;
|
import datadog.trace.api.DDSpanTypes;
|
||||||
|
@ -33,8 +32,8 @@ import scala.runtime.AbstractFunction1;
|
||||||
|
|
||||||
@Slf4j
|
@Slf4j
|
||||||
@AutoService(Instrumenter.class)
|
@AutoService(Instrumenter.class)
|
||||||
public final class AkkaHttpInstrumentation extends Instrumenter.Configurable {
|
public final class AkkaHttpServerInstrumentation extends Instrumenter.Configurable {
|
||||||
public AkkaHttpInstrumentation() {
|
public AkkaHttpServerInstrumentation() {
|
||||||
super("akka-http", "akka-http-server");
|
super("akka-http", "akka-http-server");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -45,11 +44,12 @@ public final class AkkaHttpInstrumentation extends Instrumenter.Configurable {
|
||||||
|
|
||||||
private static final HelperInjector akkaHttpHelperInjector =
|
private static final HelperInjector akkaHttpHelperInjector =
|
||||||
new HelperInjector(
|
new HelperInjector(
|
||||||
AkkaHttpInstrumentation.class.getName() + "$DatadogSyncWrapper",
|
AkkaHttpServerInstrumentation.class.getName() + "$DatadogWrapperHelper",
|
||||||
AkkaHttpInstrumentation.class.getName() + "$DatadogAsyncWrapper",
|
AkkaHttpServerInstrumentation.class.getName() + "$DatadogSyncWrapper",
|
||||||
AkkaHttpInstrumentation.class.getName() + "$DatadogAsyncWrapper$1",
|
AkkaHttpServerInstrumentation.class.getName() + "$DatadogAsyncWrapper",
|
||||||
AkkaHttpInstrumentation.class.getName() + "$DatadogAsyncWrapper$2",
|
AkkaHttpServerInstrumentation.class.getName() + "$DatadogAsyncWrapper$1",
|
||||||
AkkaHttpInstrumentation.class.getName() + "$AkkaHttpHeaders");
|
AkkaHttpServerInstrumentation.class.getName() + "$DatadogAsyncWrapper$2",
|
||||||
|
AkkaHttpServerInstrumentation.class.getName() + "$AkkaHttpServerHeaders");
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public AgentBuilder apply(final AgentBuilder agentBuilder) {
|
public AgentBuilder apply(final AgentBuilder agentBuilder) {
|
||||||
|
@ -57,9 +57,9 @@ public final class AkkaHttpInstrumentation extends Instrumenter.Configurable {
|
||||||
.type(named("akka.http.scaladsl.HttpExt"))
|
.type(named("akka.http.scaladsl.HttpExt"))
|
||||||
.transform(DDTransformers.defaultTransformers())
|
.transform(DDTransformers.defaultTransformers())
|
||||||
.transform(akkaHttpHelperInjector)
|
.transform(akkaHttpHelperInjector)
|
||||||
// Insturmenting akka-streams bindAndHandle api was previously attempted.
|
// Instrumenting akka-streams bindAndHandle api was previously attempted.
|
||||||
// This proved difficult as there was no clean way to close the async scope
|
// This proved difficult as there was no clean way to close the async scope
|
||||||
// in the graph logic after the user's requst handler completes.
|
// in the graph logic after the user's request handler completes.
|
||||||
//
|
//
|
||||||
// Instead, we're instrumenting the bindAndHandle function helpers by
|
// Instead, we're instrumenting the bindAndHandle function helpers by
|
||||||
// wrapping the scala functions with our own handlers.
|
// wrapping the scala functions with our own handlers.
|
||||||
|
@ -95,31 +95,11 @@ public final class AkkaHttpInstrumentation extends Instrumenter.Configurable {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public static class DatadogSyncWrapper extends AbstractFunction1<HttpRequest, HttpResponse> {
|
public static class DatadogWrapperHelper {
|
||||||
private final Function1<HttpRequest, HttpResponse> userHandler;
|
|
||||||
|
|
||||||
public DatadogSyncWrapper(Function1<HttpRequest, HttpResponse> userHandler) {
|
|
||||||
this.userHandler = userHandler;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public HttpResponse apply(HttpRequest request) {
|
|
||||||
final Scope scope = DatadogSyncWrapper.createSpan(request);
|
|
||||||
try {
|
|
||||||
final HttpResponse response = userHandler.apply(request);
|
|
||||||
scope.close();
|
|
||||||
finishSpan(scope.span(), response);
|
|
||||||
return response;
|
|
||||||
} catch (Throwable t) {
|
|
||||||
scope.close();
|
|
||||||
finishSpan(scope.span(), t);
|
|
||||||
throw t;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public static Scope createSpan(HttpRequest request) {
|
public static Scope createSpan(HttpRequest request) {
|
||||||
final SpanContext extractedContext =
|
final SpanContext extractedContext =
|
||||||
GlobalTracer.get().extract(Format.Builtin.HTTP_HEADERS, new AkkaHttpHeaders(request));
|
GlobalTracer.get()
|
||||||
|
.extract(Format.Builtin.HTTP_HEADERS, new AkkaHttpServerHeaders(request));
|
||||||
final Scope scope =
|
final Scope scope =
|
||||||
GlobalTracer.get()
|
GlobalTracer.get()
|
||||||
.buildSpan("akka-http.request")
|
.buildSpan("akka-http.request")
|
||||||
|
@ -158,6 +138,29 @@ public final class AkkaHttpInstrumentation extends Instrumenter.Configurable {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static class DatadogSyncWrapper extends AbstractFunction1<HttpRequest, HttpResponse> {
|
||||||
|
private final Function1<HttpRequest, HttpResponse> userHandler;
|
||||||
|
|
||||||
|
public DatadogSyncWrapper(Function1<HttpRequest, HttpResponse> userHandler) {
|
||||||
|
this.userHandler = userHandler;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public HttpResponse apply(HttpRequest request) {
|
||||||
|
final Scope scope = DatadogWrapperHelper.createSpan(request);
|
||||||
|
try {
|
||||||
|
final HttpResponse response = userHandler.apply(request);
|
||||||
|
scope.close();
|
||||||
|
DatadogWrapperHelper.finishSpan(scope.span(), response);
|
||||||
|
return response;
|
||||||
|
} catch (Throwable t) {
|
||||||
|
scope.close();
|
||||||
|
DatadogWrapperHelper.finishSpan(scope.span(), t);
|
||||||
|
throw t;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public static class DatadogAsyncWrapper
|
public static class DatadogAsyncWrapper
|
||||||
extends AbstractFunction1<HttpRequest, Future<HttpResponse>> {
|
extends AbstractFunction1<HttpRequest, Future<HttpResponse>> {
|
||||||
private final Function1<HttpRequest, Future<HttpResponse>> userHandler;
|
private final Function1<HttpRequest, Future<HttpResponse>> userHandler;
|
||||||
|
@ -172,13 +175,13 @@ public final class AkkaHttpInstrumentation extends Instrumenter.Configurable {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Future<HttpResponse> apply(HttpRequest request) {
|
public Future<HttpResponse> apply(HttpRequest request) {
|
||||||
final Scope scope = DatadogSyncWrapper.createSpan(request);
|
final Scope scope = DatadogWrapperHelper.createSpan(request);
|
||||||
Future<HttpResponse> futureResponse = null;
|
Future<HttpResponse> futureResponse = null;
|
||||||
try {
|
try {
|
||||||
futureResponse = userHandler.apply(request);
|
futureResponse = userHandler.apply(request);
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
scope.close();
|
scope.close();
|
||||||
DatadogSyncWrapper.finishSpan(scope.span(), t);
|
DatadogWrapperHelper.finishSpan(scope.span(), t);
|
||||||
throw t;
|
throw t;
|
||||||
}
|
}
|
||||||
final Future<HttpResponse> wrapped =
|
final Future<HttpResponse> wrapped =
|
||||||
|
@ -186,14 +189,14 @@ public final class AkkaHttpInstrumentation extends Instrumenter.Configurable {
|
||||||
new AbstractFunction1<HttpResponse, HttpResponse>() {
|
new AbstractFunction1<HttpResponse, HttpResponse>() {
|
||||||
@Override
|
@Override
|
||||||
public HttpResponse apply(HttpResponse response) {
|
public HttpResponse apply(HttpResponse response) {
|
||||||
DatadogSyncWrapper.finishSpan(scope.span(), response);
|
DatadogWrapperHelper.finishSpan(scope.span(), response);
|
||||||
return response;
|
return response;
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
new AbstractFunction1<Throwable, Throwable>() {
|
new AbstractFunction1<Throwable, Throwable>() {
|
||||||
@Override
|
@Override
|
||||||
public Throwable apply(Throwable t) {
|
public Throwable apply(Throwable t) {
|
||||||
DatadogSyncWrapper.finishSpan(scope.span(), t);
|
DatadogWrapperHelper.finishSpan(scope.span(), t);
|
||||||
return t;
|
return t;
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
@ -203,10 +206,10 @@ public final class AkkaHttpInstrumentation extends Instrumenter.Configurable {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public static class AkkaHttpHeaders implements TextMap {
|
public static class AkkaHttpServerHeaders implements TextMap {
|
||||||
private final HttpRequest request;
|
private final HttpRequest request;
|
||||||
|
|
||||||
public AkkaHttpHeaders(HttpRequest request) {
|
public AkkaHttpServerHeaders(HttpRequest request) {
|
||||||
this.request = request;
|
this.request = request;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -222,8 +225,8 @@ public final class AkkaHttpInstrumentation extends Instrumenter.Configurable {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void put(String s, String s1) {
|
public void put(String name, String value) {
|
||||||
throw new IllegalStateException("akka http headers can only be extracted");
|
throw new IllegalStateException("akka http server headers can only be extracted");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
|
@ -1,11 +1,14 @@
|
||||||
import datadog.trace.agent.test.AgentTestRunner
|
import datadog.trace.agent.test.AgentTestRunner
|
||||||
|
import datadog.trace.api.DDSpanTypes
|
||||||
|
import datadog.trace.api.DDTags
|
||||||
|
import io.opentracing.tag.Tags
|
||||||
import okhttp3.OkHttpClient
|
import okhttp3.OkHttpClient
|
||||||
import okhttp3.Request
|
import okhttp3.Request
|
||||||
import spock.lang.Shared
|
import spock.lang.Shared
|
||||||
|
|
||||||
import static datadog.trace.agent.test.ListWriterAssert.assertTraces
|
import static datadog.trace.agent.test.ListWriterAssert.assertTraces
|
||||||
|
|
||||||
class AkkaHttpInstrumentationTest extends AgentTestRunner {
|
class AkkaHttpServerInstrumentationTest extends AgentTestRunner {
|
||||||
static {
|
static {
|
||||||
System.setProperty("dd.integration.akka-http-server.enabled", "true")
|
System.setProperty("dd.integration.akka-http-server.enabled", "true")
|
||||||
}
|
}
|
||||||
|
@ -52,12 +55,12 @@ class AkkaHttpInstrumentationTest extends AgentTestRunner {
|
||||||
errored false
|
errored false
|
||||||
tags {
|
tags {
|
||||||
defaultTags()
|
defaultTags()
|
||||||
"http.status_code" 200
|
"$Tags.HTTP_STATUS.key" 200
|
||||||
"http.url" "http://localhost:$port/test"
|
"$Tags.HTTP_URL.key" "http://localhost:$port/test"
|
||||||
"http.method" "GET"
|
"$Tags.HTTP_METHOD.key" "GET"
|
||||||
"span.kind" "server"
|
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_SERVER
|
||||||
"span.type" "web"
|
"$DDTags.SPAN_TYPE" DDSpanTypes.WEB_SERVLET
|
||||||
"component" "akka-http-server"
|
"$Tags.COMPONENT.key" "akka-http-server"
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
span(1) {
|
span(1) {
|
||||||
|
@ -94,16 +97,13 @@ class AkkaHttpInstrumentationTest extends AgentTestRunner {
|
||||||
errored true
|
errored true
|
||||||
tags {
|
tags {
|
||||||
defaultTags()
|
defaultTags()
|
||||||
"http.status_code" 500
|
"$Tags.HTTP_STATUS.key" 500
|
||||||
"http.url" "http://localhost:$port/$endpoint"
|
"$Tags.HTTP_URL.key" "http://localhost:$port/$endpoint"
|
||||||
"http.method" "GET"
|
"$Tags.HTTP_METHOD.key" "GET"
|
||||||
"span.kind" "server"
|
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_SERVER
|
||||||
"span.type" "web"
|
"$DDTags.SPAN_TYPE" DDSpanTypes.WEB_SERVLET
|
||||||
"component" "akka-http-server"
|
"$Tags.COMPONENT.key" "akka-http-server"
|
||||||
"error" true
|
errorTags RuntimeException, errorMessage
|
||||||
"error.type" RuntimeException.name
|
|
||||||
"error.msg" errorMessage
|
|
||||||
"error.stack" String
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -137,13 +137,13 @@ class AkkaHttpInstrumentationTest extends AgentTestRunner {
|
||||||
errored true
|
errored true
|
||||||
tags {
|
tags {
|
||||||
defaultTags()
|
defaultTags()
|
||||||
"http.status_code" 500
|
"$Tags.HTTP_STATUS.key" 500
|
||||||
"http.url" "http://localhost:$port/server-error"
|
"$Tags.HTTP_URL.key" "http://localhost:$port/server-error"
|
||||||
"http.method" "GET"
|
"$Tags.HTTP_METHOD.key" "GET"
|
||||||
"span.kind" "server"
|
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_SERVER
|
||||||
"span.type" "web"
|
"$DDTags.SPAN_TYPE" DDSpanTypes.WEB_SERVLET
|
||||||
"component" "akka-http-server"
|
"$Tags.COMPONENT.key" "akka-http-server"
|
||||||
"error" true
|
"$Tags.ERROR.key" true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -176,12 +176,12 @@ class AkkaHttpInstrumentationTest extends AgentTestRunner {
|
||||||
errored false
|
errored false
|
||||||
tags {
|
tags {
|
||||||
defaultTags()
|
defaultTags()
|
||||||
"http.status_code" 404
|
"$Tags.HTTP_STATUS.key" 404
|
||||||
"http.url" "http://localhost:$port/not-found"
|
"$Tags.HTTP_URL.key" "http://localhost:$port/not-found"
|
||||||
"http.method" "GET"
|
"$Tags.HTTP_METHOD.key" "GET"
|
||||||
"span.kind" "server"
|
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_SERVER
|
||||||
"span.type" "web"
|
"$DDTags.SPAN_TYPE" DDSpanTypes.WEB_SERVLET
|
||||||
"component" "akka-http-server"
|
"$Tags.COMPONENT.key" "akka-http-server"
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
Loading…
Reference in New Issue