Instrument akka-http function handler instead of akka-streams
This commit is contained in:
parent
004b88f634
commit
6b7dda3657
|
@ -5,6 +5,7 @@ import akka.stream.testkit.javadsl.TestSink
|
||||||
import com.lightbend.lagom.javadsl.testkit.ServiceTest
|
import com.lightbend.lagom.javadsl.testkit.ServiceTest
|
||||||
import datadog.opentracing.DDSpan
|
import datadog.opentracing.DDSpan
|
||||||
import org.junit.After
|
import org.junit.After
|
||||||
|
import scala.concurrent.duration.FiniteDuration
|
||||||
|
|
||||||
import static java.util.concurrent.TimeUnit.SECONDS;
|
import static java.util.concurrent.TimeUnit.SECONDS;
|
||||||
import datadog.trace.agent.test.AgentTestRunner
|
import datadog.trace.agent.test.AgentTestRunner
|
||||||
|
@ -51,7 +52,7 @@ class LagomTest extends AgentTestRunner {
|
||||||
server.stop()
|
server.stop()
|
||||||
}
|
}
|
||||||
|
|
||||||
def "200 traces" () {
|
def "normal request traces" () {
|
||||||
setup:
|
setup:
|
||||||
EchoService service = server.client(EchoService.class)
|
EchoService service = server.client(EchoService.class)
|
||||||
|
|
||||||
|
@ -62,8 +63,7 @@ class LagomTest extends AgentTestRunner {
|
||||||
.concat(Source.maybe())
|
.concat(Source.maybe())
|
||||||
Source<String, NotUsed> output = service.echo().invoke(input)
|
Source<String, NotUsed> output = service.echo().invoke(input)
|
||||||
.toCompletableFuture().get(5, SECONDS)
|
.toCompletableFuture().get(5, SECONDS)
|
||||||
Probe<String> probe = output.runWith(TestSink.probe(server.system()),
|
Probe<String> probe = output.runWith(TestSink.probe(server.system()), server.materializer())
|
||||||
server.materializer())
|
|
||||||
probe.request(10)
|
probe.request(10)
|
||||||
probe.expectNext("msg1")
|
probe.expectNext("msg1")
|
||||||
probe.expectNext("msg2")
|
probe.expectNext("msg2")
|
||||||
|
@ -89,4 +89,38 @@ class LagomTest extends AgentTestRunner {
|
||||||
root.context().tags["component"] == "akkahttp-action"
|
root.context().tags["component"] == "akkahttp-action"
|
||||||
}
|
}
|
||||||
|
|
||||||
|
def "error traces" () {
|
||||||
|
setup:
|
||||||
|
EchoService service = server.client(EchoService.class)
|
||||||
|
|
||||||
|
// Use a source that never terminates (concat Source.maybe) so we
|
||||||
|
// don't close the upstream, which would close the downstream
|
||||||
|
Source<String, NotUsed> input =
|
||||||
|
Source.from(Arrays.asList("msg1", "msg2", "msg3"))
|
||||||
|
.concat(Source.maybe())
|
||||||
|
try {
|
||||||
|
Source<String, NotUsed> output = service.error().invoke(input)
|
||||||
|
.toCompletableFuture().get(5, SECONDS)
|
||||||
|
} catch (Exception e) {
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_WRITER.waitForTraces(1)
|
||||||
|
DDSpan[] akkaTrace = TEST_WRITER.get(0)
|
||||||
|
DDSpan root = akkaTrace[0]
|
||||||
|
|
||||||
|
expect:
|
||||||
|
TEST_WRITER.size() == 1
|
||||||
|
akkaTrace.size() == 1
|
||||||
|
|
||||||
|
root.serviceName == "unnamed-java-app"
|
||||||
|
root.operationName == "akkahttp.request"
|
||||||
|
root.resourceName == "GET ws://?/error"
|
||||||
|
root.context().getErrorFlag()
|
||||||
|
root.context().tags["http.status_code"] == 500
|
||||||
|
root.context().tags["http.url"] == "ws://localhost:${server.port()}/error"
|
||||||
|
root.context().tags["http.method"] == "GET"
|
||||||
|
root.context().tags["span.kind"] == "server"
|
||||||
|
root.context().tags["component"] == "akkahttp-action"
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,15 +1,16 @@
|
||||||
|
import static com.lightbend.lagom.javadsl.api.Service.*;
|
||||||
|
|
||||||
import akka.NotUsed;
|
import akka.NotUsed;
|
||||||
import akka.stream.javadsl.Source;
|
import akka.stream.javadsl.Source;
|
||||||
import com.lightbend.lagom.javadsl.api.*;
|
import com.lightbend.lagom.javadsl.api.*;
|
||||||
import static com.lightbend.lagom.javadsl.api.Service.*;
|
|
||||||
|
|
||||||
public interface EchoService extends Service {
|
public interface EchoService extends Service {
|
||||||
|
|
||||||
ServiceCall<Source<String, NotUsed>, Source<String, NotUsed>> echo();
|
ServiceCall<Source<String, NotUsed>, Source<String, NotUsed>> echo();
|
||||||
|
|
||||||
|
ServiceCall<Source<String, NotUsed>, Source<String, NotUsed>> error();
|
||||||
|
|
||||||
default Descriptor descriptor() {
|
default Descriptor descriptor() {
|
||||||
return named("echo").withCalls(
|
return named("echo").withCalls(namedCall("echo", this::echo), namedCall("error", this::error));
|
||||||
namedCall("echo", this::echo)
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,16 +1,23 @@
|
||||||
import static java.util.concurrent.CompletableFuture.completedFuture;
|
|
||||||
import com.lightbend.lagom.javadsl.api.ServiceCall;
|
|
||||||
import akka.NotUsed;
|
import akka.NotUsed;
|
||||||
import akka.stream.javadsl.Source;
|
import akka.stream.javadsl.Source;
|
||||||
|
import com.lightbend.lagom.javadsl.api.ServiceCall;
|
||||||
import datadog.trace.api.Trace;
|
import datadog.trace.api.Trace;
|
||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.concurrent.CompletableFuture;
|
||||||
|
|
||||||
public class EchoServiceImpl implements EchoService {
|
public class EchoServiceImpl implements EchoService {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ServiceCall<Source<String, NotUsed>, Source<String, NotUsed>> echo() {
|
public ServiceCall<Source<String, NotUsed>, Source<String, NotUsed>> echo() {
|
||||||
return req -> completedFuture(Source.from(tracedMethod()));
|
final CompletableFuture<Source<String, NotUsed>> fut = new CompletableFuture<>();
|
||||||
|
ServiceTestModule.executor.submit(() -> fut.complete(Source.from(tracedMethod())));
|
||||||
|
return req -> fut;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ServiceCall<Source<String, NotUsed>, Source<String, NotUsed>> error() {
|
||||||
|
throw new RuntimeException("lagom exception");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Trace
|
@Trace
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
import com.google.inject.Binder;
|
|
||||||
import com.google.inject.AbstractModule;
|
import com.google.inject.AbstractModule;
|
||||||
|
import com.google.inject.Binder;
|
||||||
import com.lightbend.lagom.internal.javadsl.BinderAccessor;
|
import com.lightbend.lagom.internal.javadsl.BinderAccessor;
|
||||||
import com.lightbend.lagom.internal.javadsl.server.JavadslServicesRouter;
|
import com.lightbend.lagom.internal.javadsl.server.JavadslServicesRouter;
|
||||||
import com.lightbend.lagom.internal.javadsl.server.ResolvedServices;
|
import com.lightbend.lagom.internal.javadsl.server.ResolvedServices;
|
||||||
|
@ -10,30 +10,33 @@ import com.lightbend.lagom.javadsl.api.ServiceInfo;
|
||||||
import com.lightbend.lagom.javadsl.server.LagomServiceRouter;
|
import com.lightbend.lagom.javadsl.server.LagomServiceRouter;
|
||||||
import com.lightbend.lagom.javadsl.server.ServiceGuiceSupport;
|
import com.lightbend.lagom.javadsl.server.ServiceGuiceSupport;
|
||||||
import com.lightbend.lagom.javadsl.server.status.MetricsService;
|
import com.lightbend.lagom.javadsl.server.status.MetricsService;
|
||||||
|
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
|
||||||
public class ServiceTestModule extends AbstractModule implements ServiceGuiceSupport {
|
public class ServiceTestModule extends AbstractModule implements ServiceGuiceSupport {
|
||||||
|
public static final ExecutorService executor = Executors.newCachedThreadPool();
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void configure() {
|
protected void configure() {
|
||||||
bindServices(
|
bindServices(
|
||||||
serviceBinding(EchoService.class, EchoServiceImpl.class)
|
serviceBinding(EchoService.class, EchoServiceImpl.class)
|
||||||
// , serviceBinding(HelloService.class, HelloServiceImpl.class)
|
// , serviceBinding(HelloService.class, HelloServiceImpl.class)
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
// ------------------------------
|
// ------------------------------
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This is a copy of {@link com.lightbend.lagom.javadsl.server.ServiceGuiceSupport#bindServices(ServiceGuiceSupport.ServiceBinding[])}
|
* This is a copy of {@link
|
||||||
* that should survive deprecation. When removing the method from the superclass this should inherit the removed code.
|
* com.lightbend.lagom.javadsl.server.ServiceGuiceSupport#bindServices(ServiceGuiceSupport.ServiceBinding[])}
|
||||||
|
* that should survive deprecation. When removing the method from the superclass this should
|
||||||
|
* inherit the removed code.
|
||||||
*
|
*
|
||||||
* This method is used in docs/ so that many tests can share a single Guice module.
|
* <p>This method is used in docs/ so that many tests can share a single Guice module.
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public void bindServices(ServiceBinding<?>... serviceBindings) {
|
public void bindServices(ServiceBinding<?>... serviceBindings) {
|
||||||
Binder binder = BinderAccessor.binder(this);
|
Binder binder = BinderAccessor.binder(this);
|
||||||
|
|
||||||
for (ServiceBinding binding : serviceBindings) {
|
for (ServiceBinding binding : serviceBindings) {
|
||||||
|
@ -51,28 +54,31 @@ public class ServiceTestModule extends AbstractModule implements ServiceGuiceSup
|
||||||
|
|
||||||
ServiceBinding<?> primaryServiceBinding = serviceBindings[0];
|
ServiceBinding<?> primaryServiceBinding = serviceBindings[0];
|
||||||
// Bind the service info for the first one passed in
|
// Bind the service info for the first one passed in
|
||||||
binder.bind(ServiceInfo.class).toProvider(
|
binder
|
||||||
|
.bind(ServiceInfo.class)
|
||||||
|
.toProvider(
|
||||||
new ServiceInfoProvider(
|
new ServiceInfoProvider(
|
||||||
primaryServiceBinding.serviceInterface(),
|
primaryServiceBinding.serviceInterface(),
|
||||||
Arrays
|
Arrays.stream(serviceBindings)
|
||||||
.stream(serviceBindings)
|
.map(ServiceBinding::serviceInterface)
|
||||||
.map(ServiceBinding::serviceInterface)
|
.toArray(Class[]::new)));
|
||||||
.toArray(Class[]::new)
|
|
||||||
));
|
|
||||||
|
|
||||||
// Bind the metrics
|
// Bind the metrics
|
||||||
ServiceBinding<MetricsService> metricsServiceBinding = serviceBinding(MetricsService.class, MetricsServiceImpl.class);
|
ServiceBinding<MetricsService> metricsServiceBinding =
|
||||||
binder.bind(((ClassServiceBinding<?>) metricsServiceBinding).serviceImplementation()).asEagerSingleton();
|
serviceBinding(MetricsService.class, MetricsServiceImpl.class);
|
||||||
|
binder
|
||||||
|
.bind(((ClassServiceBinding<?>) metricsServiceBinding).serviceImplementation())
|
||||||
|
.asEagerSingleton();
|
||||||
ServiceBinding<?>[] allServiceBindings = new ServiceBinding<?>[serviceBindings.length + 1];
|
ServiceBinding<?>[] allServiceBindings = new ServiceBinding<?>[serviceBindings.length + 1];
|
||||||
System.arraycopy(serviceBindings, 0, allServiceBindings, 0, serviceBindings.length);
|
System.arraycopy(serviceBindings, 0, allServiceBindings, 0, serviceBindings.length);
|
||||||
allServiceBindings[allServiceBindings.length - 1] = metricsServiceBinding;
|
allServiceBindings[allServiceBindings.length - 1] = metricsServiceBinding;
|
||||||
|
|
||||||
// Bind the resolved services
|
// Bind the resolved services
|
||||||
binder.bind(ResolvedServices.class).toProvider(new ResolvedServicesProvider(allServiceBindings));
|
binder
|
||||||
|
.bind(ResolvedServices.class)
|
||||||
|
.toProvider(new ResolvedServicesProvider(allServiceBindings));
|
||||||
|
|
||||||
// And bind the router
|
// And bind the router
|
||||||
binder.bind(LagomServiceRouter.class).to(JavadslServicesRouter.class);
|
binder.bind(LagomServiceRouter.class).to(JavadslServicesRouter.class);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -6,13 +6,13 @@ import akka.http.javadsl.model.HttpHeader;
|
||||||
import akka.http.scaladsl.model.HttpRequest;
|
import akka.http.scaladsl.model.HttpRequest;
|
||||||
import akka.http.scaladsl.model.HttpResponse;
|
import akka.http.scaladsl.model.HttpResponse;
|
||||||
import akka.stream.*;
|
import akka.stream.*;
|
||||||
import akka.stream.scaladsl.Flow;
|
|
||||||
import akka.stream.stage.*;
|
import akka.stream.stage.*;
|
||||||
import com.google.auto.service.AutoService;
|
import com.google.auto.service.AutoService;
|
||||||
import datadog.trace.agent.tooling.*;
|
import datadog.trace.agent.tooling.*;
|
||||||
import datadog.trace.api.DDSpanTypes;
|
import datadog.trace.api.DDSpanTypes;
|
||||||
import datadog.trace.api.DDTags;
|
import datadog.trace.api.DDTags;
|
||||||
import datadog.trace.context.TraceScope;
|
import datadog.trace.context.TraceScope;
|
||||||
|
import io.opentracing.Scope;
|
||||||
import io.opentracing.Span;
|
import io.opentracing.Span;
|
||||||
import io.opentracing.SpanContext;
|
import io.opentracing.SpanContext;
|
||||||
import io.opentracing.propagation.Format;
|
import io.opentracing.propagation.Format;
|
||||||
|
@ -26,6 +26,10 @@ import java.util.Map;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import net.bytebuddy.agent.builder.AgentBuilder;
|
import net.bytebuddy.agent.builder.AgentBuilder;
|
||||||
import net.bytebuddy.asm.Advice;
|
import net.bytebuddy.asm.Advice;
|
||||||
|
import scala.Function1;
|
||||||
|
import scala.concurrent.ExecutionContext;
|
||||||
|
import scala.concurrent.Future;
|
||||||
|
import scala.runtime.AbstractFunction1;
|
||||||
|
|
||||||
@Slf4j
|
@Slf4j
|
||||||
@AutoService(Instrumenter.class)
|
@AutoService(Instrumenter.class)
|
||||||
|
@ -34,21 +38,19 @@ public final class AkkaHttpInstrumentation extends Instrumenter.Configurable {
|
||||||
super("akkahttp");
|
super("akkahttp");
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: Disable Instrumentation by default
|
// TODO: sync vs async testing
|
||||||
// TODO: Use test DSL
|
// TODO: Use test DSL
|
||||||
// TODO: remove testWithScala from lagom
|
|
||||||
// TODO: Merge into play testing
|
// TODO: Merge into play testing
|
||||||
// TODO: also check 10.0.8 (play 2.6.0 dep) and latest 10.1
|
// TODO: Disable Instrumentation by default
|
||||||
|
// TODO: remove testWithScala from lagom
|
||||||
|
|
||||||
private static final HelperInjector akkaHttpHelperInjector =
|
private static final HelperInjector akkaHttpHelperInjector =
|
||||||
new HelperInjector(
|
new HelperInjector(
|
||||||
AkkaHttpInstrumentation.class.getName() + "$DatadogGraph",
|
AkkaHttpInstrumentation.class.getName() + "$DatadogSyncWrapper",
|
||||||
AkkaHttpInstrumentation.class.getName() + "$AkkaHttpHeaders",
|
AkkaHttpInstrumentation.class.getName() + "$DatadogAsyncWrapper",
|
||||||
AkkaHttpInstrumentation.class.getName() + "$DatadogLogic",
|
AkkaHttpInstrumentation.class.getName() + "$DatadogAsyncWrapper$1",
|
||||||
AkkaHttpInstrumentation.class.getName() + "$DatadogLogic$1",
|
AkkaHttpInstrumentation.class.getName() + "$DatadogAsyncWrapper$2",
|
||||||
AkkaHttpInstrumentation.class.getName() + "$DatadogLogic$2",
|
AkkaHttpInstrumentation.class.getName() + "$AkkaHttpHeaders");
|
||||||
AkkaHttpInstrumentation.class.getName() + "$DatadogLogic$3",
|
|
||||||
AkkaHttpInstrumentation.class.getName() + "$DatadogLogic$4");
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public AgentBuilder apply(final AgentBuilder agentBuilder) {
|
public AgentBuilder apply(final AgentBuilder agentBuilder) {
|
||||||
|
@ -59,200 +61,140 @@ public final class AkkaHttpInstrumentation extends Instrumenter.Configurable {
|
||||||
.transform(
|
.transform(
|
||||||
DDAdvice.create()
|
DDAdvice.create()
|
||||||
.advice(
|
.advice(
|
||||||
named("bindAndHandle")
|
named("bindAndHandleSync").and(takesArgument(0, named("scala.Function1"))),
|
||||||
.and(takesArgument(0, named("akka.stream.scaladsl.Flow"))),
|
AkkaHttpSyncAdvice.class.getName()))
|
||||||
AkkaHttpAdvice.class.getName()))
|
|
||||||
.asDecorator()
|
|
||||||
.type(named("akka.stream.impl.fusing.GraphInterpreter"))
|
|
||||||
.transform(DDTransformers.defaultTransformers())
|
|
||||||
.transform(akkaHttpHelperInjector)
|
|
||||||
.transform(
|
.transform(
|
||||||
DDAdvice.create().advice(named("execute"), GraphInterpreterAdvice.class.getName()))
|
DDAdvice.create()
|
||||||
|
.advice(
|
||||||
|
named("bindAndHandleAsync").and(takesArgument(0, named("scala.Function1"))),
|
||||||
|
AkkaHttpAsyncAdvice.class.getName()))
|
||||||
.asDecorator();
|
.asDecorator();
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Wrap user's Flow in a datadog graph */
|
public static class AkkaHttpSyncAdvice {
|
||||||
public static class AkkaHttpAdvice {
|
|
||||||
// TODO: rename to wrapHandler
|
|
||||||
@Advice.OnMethodEnter(suppress = Throwable.class)
|
@Advice.OnMethodEnter(suppress = Throwable.class)
|
||||||
public static void startSpan(
|
public static void wrapHandler(
|
||||||
@Advice.Argument(value = 0, readOnly = false)
|
@Advice.Argument(value = 0, readOnly = false)
|
||||||
Flow<HttpRequest, HttpResponse, Object> handler) {
|
Function1<HttpRequest, HttpResponse> handler) {
|
||||||
// recommended way to wrap a flow is to use join with a custom graph stage
|
handler = new DatadogSyncWrapper(handler);
|
||||||
// https://groups.google.com/forum/#!topic/akka-user/phtZM_kuy7o
|
|
||||||
handler = handler.join(new DatadogGraph());
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Close spans created by DatadogLogic */
|
public static class AkkaHttpAsyncAdvice {
|
||||||
public static class GraphInterpreterAdvice {
|
@Advice.OnMethodEnter(suppress = Throwable.class)
|
||||||
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
|
public static void wrapHandler(
|
||||||
public static void closeScope() {
|
@Advice.Argument(value = 0, readOnly = false)
|
||||||
if (DatadogLogic.NUM_SCOPES_TO_CLOSE.get() != null) {
|
Function1<HttpRequest, scala.concurrent.Future<HttpResponse>> handler,
|
||||||
int numScopesToClose = DatadogLogic.NUM_SCOPES_TO_CLOSE.get();
|
@Advice.Argument(value = 7) Materializer materializer) {
|
||||||
DatadogLogic.NUM_SCOPES_TO_CLOSE.set(0);
|
handler = new DatadogAsyncWrapper(handler, materializer.executionContext());
|
||||||
while (numScopesToClose > 0 && GlobalTracer.get().scopeManager().active() != null) {
|
}
|
||||||
GlobalTracer.get().scopeManager().active().close();
|
}
|
||||||
numScopesToClose--;
|
|
||||||
}
|
public static class DatadogSyncWrapper extends AbstractFunction1<HttpRequest, HttpResponse> {
|
||||||
|
private final Function1<HttpRequest, HttpResponse> userHandler;
|
||||||
|
|
||||||
|
public DatadogSyncWrapper(Function1<HttpRequest, HttpResponse> userHandler) {
|
||||||
|
this.userHandler = userHandler;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public HttpResponse apply(HttpRequest request) {
|
||||||
|
final Scope scope = DatadogSyncWrapper.createSpan(request);
|
||||||
|
try {
|
||||||
|
final HttpResponse response = userHandler.apply(request);
|
||||||
|
scope.close();
|
||||||
|
finishSpan(scope.span(), response);
|
||||||
|
return response;
|
||||||
|
} catch (Throwable t) {
|
||||||
|
scope.close();
|
||||||
|
finishSpan(scope.span(), t);
|
||||||
|
throw t;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
public static class DatadogGraph
|
public static Scope createSpan(HttpRequest request) {
|
||||||
extends GraphStage<BidiShape<HttpResponse, HttpResponse, HttpRequest, HttpRequest>> {
|
final SpanContext extractedContext =
|
||||||
private final Inlet<HttpResponse> in1 = Inlet.create("datadog.in1");
|
|
||||||
private final Outlet<HttpResponse> out1 = Outlet.create("datadog.toWrapped");
|
|
||||||
private final Inlet<HttpRequest> in2 = Inlet.create("datadog.fromWrapped");
|
|
||||||
private final Outlet<HttpRequest> out2 = Outlet.create("datadog.out2");
|
|
||||||
private final BidiShape<HttpResponse, HttpResponse, HttpRequest, HttpRequest> shape =
|
|
||||||
BidiShape.of(in1, out1, in2, out2);
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public BidiShape<HttpResponse, HttpResponse, HttpRequest, HttpRequest> shape() {
|
|
||||||
return shape;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public GraphStageLogic createLogic(Attributes inheritedAttributes) {
|
|
||||||
return new DatadogLogic(shape());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Stateful logic of the akka http pipeline */
|
|
||||||
public static class DatadogLogic extends GraphStageLogic {
|
|
||||||
/** Signal the graph logic advice to close the scope at the end of an execution phase. */
|
|
||||||
// ideally there would be a way to push a close-scope event after
|
|
||||||
// the user's input handler has run, but there doesn't seem to be a way to do that
|
|
||||||
public static final ThreadLocal<Integer> NUM_SCOPES_TO_CLOSE = new ThreadLocal<>();
|
|
||||||
|
|
||||||
// safe to use volatile without locking because
|
|
||||||
// ordering and number of invocations of handler is guaranteed by akka streams
|
|
||||||
// ideally this would be a final variable, but the span cannot be set until
|
|
||||||
// the in2 handler is invoked by the graph logic
|
|
||||||
private volatile Span span;
|
|
||||||
|
|
||||||
// Response | -> in1 --> out1 | tcp ->
|
|
||||||
// Request | <- out2 <-- in2 | <- tcp
|
|
||||||
public DatadogLogic(
|
|
||||||
final BidiShape<HttpResponse, HttpResponse, HttpRequest, HttpRequest> shape) {
|
|
||||||
super(shape);
|
|
||||||
|
|
||||||
setHandler(
|
|
||||||
shape.in2(),
|
|
||||||
new AbstractInHandler() {
|
|
||||||
@Override
|
|
||||||
public void onPush() {
|
|
||||||
final HttpRequest request = grab(shape.in2());
|
|
||||||
createSpan(request);
|
|
||||||
push(shape.out2(), request);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onUpstreamFailure(Throwable ex) throws Exception {
|
|
||||||
finishSpan(ex);
|
|
||||||
super.onUpstreamFailure(ex);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
setHandler(
|
|
||||||
shape.out2(),
|
|
||||||
new AbstractOutHandler() {
|
|
||||||
@Override
|
|
||||||
public void onPull() {
|
|
||||||
pull(shape.in2());
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onDownstreamFinish() throws Exception {
|
|
||||||
// Invoked on errors. Don't complete this stage to allow error-capturing
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
setHandler(
|
|
||||||
shape.in1(),
|
|
||||||
new AbstractInHandler() {
|
|
||||||
@Override
|
|
||||||
public void onPush() {
|
|
||||||
final HttpResponse response = grab(shape.in1());
|
|
||||||
finishSpan(response);
|
|
||||||
push(shape.out1(), response);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onUpstreamFailure(Throwable ex) throws Exception {
|
|
||||||
if (GlobalTracer.get().scopeManager().active() instanceof TraceScope) {
|
|
||||||
((TraceScope) GlobalTracer.get().scopeManager().active())
|
|
||||||
.setAsyncPropagation(false);
|
|
||||||
}
|
|
||||||
finishSpan(ex);
|
|
||||||
|
|
||||||
super.onUpstreamFailure(ex);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
setHandler(
|
|
||||||
shape.out1(),
|
|
||||||
new AbstractOutHandler() {
|
|
||||||
@Override
|
|
||||||
public void onPull() {
|
|
||||||
pull(shape.in1());
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
private void createSpan(final HttpRequest request) {
|
|
||||||
SpanContext extractedContext =
|
|
||||||
GlobalTracer.get().extract(Format.Builtin.HTTP_HEADERS, new AkkaHttpHeaders(request));
|
GlobalTracer.get().extract(Format.Builtin.HTTP_HEADERS, new AkkaHttpHeaders(request));
|
||||||
|
final Scope scope =
|
||||||
span =
|
|
||||||
GlobalTracer.get()
|
GlobalTracer.get()
|
||||||
.buildSpan("akkahttp.request")
|
.buildSpan("akkahttp.request")
|
||||||
.asChildOf(extractedContext)
|
.asChildOf(extractedContext)
|
||||||
.startActive(false)
|
.withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_SERVER)
|
||||||
.span();
|
.withTag(Tags.HTTP_METHOD.getKey(), request.method().value())
|
||||||
// close the created scope at the end of the graph execution
|
.withTag(DDTags.SPAN_TYPE, DDSpanTypes.WEB_SERVLET)
|
||||||
if (null == NUM_SCOPES_TO_CLOSE.get()) {
|
.withTag(Tags.COMPONENT.getKey(), "akkahttp-action")
|
||||||
NUM_SCOPES_TO_CLOSE.set(1);
|
.withTag(Tags.HTTP_URL.getKey(), request.getUri().toString())
|
||||||
} else {
|
.startActive(false);
|
||||||
NUM_SCOPES_TO_CLOSE.set(NUM_SCOPES_TO_CLOSE.get() + 1);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (GlobalTracer.get().scopeManager().active() instanceof TraceScope) {
|
if (scope instanceof TraceScope) {
|
||||||
((TraceScope) GlobalTracer.get().scopeManager().active()).setAsyncPropagation(true);
|
((TraceScope) scope).setAsyncPropagation(true);
|
||||||
}
|
}
|
||||||
|
return scope;
|
||||||
span.setTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_SERVER);
|
|
||||||
span.setTag(Tags.HTTP_METHOD.getKey(), request.method().value());
|
|
||||||
span.setTag(DDTags.SPAN_TYPE, DDSpanTypes.WEB_SERVLET);
|
|
||||||
span.setTag(Tags.COMPONENT.getKey(), "akkahttp-action");
|
|
||||||
span.setTag(Tags.HTTP_URL.getKey(), request.getUri().toString());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void finishSpan(final HttpResponse response) {
|
public static void finishSpan(Span span, HttpResponse response) {
|
||||||
if (null != span) {
|
Tags.HTTP_STATUS.set(span, response.status().intValue());
|
||||||
stopScopePropagation();
|
|
||||||
Tags.HTTP_STATUS.set(span, response.status().intValue());
|
|
||||||
span.finish();
|
|
||||||
span = null;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void finishSpan(final Throwable t) {
|
|
||||||
if (null != span) {
|
|
||||||
stopScopePropagation();
|
|
||||||
Tags.ERROR.set(span, true);
|
|
||||||
span.log(Collections.singletonMap("error.object", t));
|
|
||||||
Tags.HTTP_STATUS.set(span, 500);
|
|
||||||
span.finish();
|
|
||||||
span = null;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void stopScopePropagation() {
|
|
||||||
if (GlobalTracer.get().scopeManager().active() instanceof TraceScope) {
|
if (GlobalTracer.get().scopeManager().active() instanceof TraceScope) {
|
||||||
((TraceScope) GlobalTracer.get().scopeManager().active()).setAsyncPropagation(false);
|
((TraceScope) GlobalTracer.get().scopeManager().active()).setAsyncPropagation(false);
|
||||||
}
|
}
|
||||||
|
span.finish();
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void finishSpan(Span span, Throwable t) {
|
||||||
|
Tags.ERROR.set(span, true);
|
||||||
|
span.log(Collections.singletonMap("error.object", t));
|
||||||
|
Tags.HTTP_STATUS.set(span, 500);
|
||||||
|
|
||||||
|
if (GlobalTracer.get().scopeManager().active() instanceof TraceScope) {
|
||||||
|
((TraceScope) GlobalTracer.get().scopeManager().active()).setAsyncPropagation(false);
|
||||||
|
}
|
||||||
|
span.finish();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class DatadogAsyncWrapper
|
||||||
|
extends AbstractFunction1<HttpRequest, Future<HttpResponse>> {
|
||||||
|
private final Function1<HttpRequest, Future<HttpResponse>> userHandler;
|
||||||
|
private final ExecutionContext executionContext;
|
||||||
|
|
||||||
|
public DatadogAsyncWrapper(
|
||||||
|
Function1<HttpRequest, Future<HttpResponse>> userHandler,
|
||||||
|
ExecutionContext executionContext) {
|
||||||
|
this.userHandler = userHandler;
|
||||||
|
this.executionContext = executionContext;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Future<HttpResponse> apply(HttpRequest request) {
|
||||||
|
final Scope scope = DatadogSyncWrapper.createSpan(request);
|
||||||
|
Future<HttpResponse> futureResponse = null;
|
||||||
|
try {
|
||||||
|
futureResponse = userHandler.apply(request);
|
||||||
|
} catch (Throwable t) {
|
||||||
|
scope.close();
|
||||||
|
DatadogSyncWrapper.finishSpan(scope.span(), t);
|
||||||
|
throw t;
|
||||||
|
}
|
||||||
|
final Future<HttpResponse> wrapped =
|
||||||
|
futureResponse.transform(
|
||||||
|
new AbstractFunction1<HttpResponse, HttpResponse>() {
|
||||||
|
@Override
|
||||||
|
public HttpResponse apply(HttpResponse response) {
|
||||||
|
DatadogSyncWrapper.finishSpan(scope.span(), response);
|
||||||
|
return response;
|
||||||
|
}
|
||||||
|
},
|
||||||
|
new AbstractFunction1<Throwable, Throwable>() {
|
||||||
|
@Override
|
||||||
|
public Throwable apply(Throwable t) {
|
||||||
|
DatadogSyncWrapper.finishSpan(scope.span(), t);
|
||||||
|
return t;
|
||||||
|
}
|
||||||
|
},
|
||||||
|
executionContext);
|
||||||
|
scope.close();
|
||||||
|
return wrapped;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue