diff --git a/dd-java-agent/instrumentation/akka-http-10.0/src/lagomTest/groovy/LagomTest.groovy b/dd-java-agent/instrumentation/akka-http-10.0/src/lagomTest/groovy/LagomTest.groovy index 39f936a6fd..7f23f203fe 100644 --- a/dd-java-agent/instrumentation/akka-http-10.0/src/lagomTest/groovy/LagomTest.groovy +++ b/dd-java-agent/instrumentation/akka-http-10.0/src/lagomTest/groovy/LagomTest.groovy @@ -5,6 +5,7 @@ import akka.stream.testkit.javadsl.TestSink import com.lightbend.lagom.javadsl.testkit.ServiceTest import datadog.opentracing.DDSpan import org.junit.After +import scala.concurrent.duration.FiniteDuration import static java.util.concurrent.TimeUnit.SECONDS; import datadog.trace.agent.test.AgentTestRunner @@ -51,7 +52,7 @@ class LagomTest extends AgentTestRunner { server.stop() } - def "200 traces" () { + def "normal request traces" () { setup: EchoService service = server.client(EchoService.class) @@ -62,8 +63,7 @@ class LagomTest extends AgentTestRunner { .concat(Source.maybe()) Source output = service.echo().invoke(input) .toCompletableFuture().get(5, SECONDS) - Probe probe = output.runWith(TestSink.probe(server.system()), - server.materializer()) + Probe probe = output.runWith(TestSink.probe(server.system()), server.materializer()) probe.request(10) probe.expectNext("msg1") probe.expectNext("msg2") @@ -89,4 +89,38 @@ class LagomTest extends AgentTestRunner { root.context().tags["component"] == "akkahttp-action" } + def "error traces" () { + setup: + EchoService service = server.client(EchoService.class) + + // Use a source that never terminates (concat Source.maybe) so we + // don't close the upstream, which would close the downstream + Source input = + Source.from(Arrays.asList("msg1", "msg2", "msg3")) + .concat(Source.maybe()) + try { + Source output = service.error().invoke(input) + .toCompletableFuture().get(5, SECONDS) + } catch (Exception e) { + } + + TEST_WRITER.waitForTraces(1) + DDSpan[] akkaTrace = TEST_WRITER.get(0) + DDSpan root = akkaTrace[0] + + expect: + TEST_WRITER.size() == 1 + akkaTrace.size() == 1 + + root.serviceName == "unnamed-java-app" + root.operationName == "akkahttp.request" + root.resourceName == "GET ws://?/error" + root.context().getErrorFlag() + root.context().tags["http.status_code"] == 500 + root.context().tags["http.url"] == "ws://localhost:${server.port()}/error" + root.context().tags["http.method"] == "GET" + root.context().tags["span.kind"] == "server" + root.context().tags["component"] == "akkahttp-action" + } + } diff --git a/dd-java-agent/instrumentation/akka-http-10.0/src/lagomTest/java/EchoService.java b/dd-java-agent/instrumentation/akka-http-10.0/src/lagomTest/java/EchoService.java index 2896da4e6d..747804b3d5 100644 --- a/dd-java-agent/instrumentation/akka-http-10.0/src/lagomTest/java/EchoService.java +++ b/dd-java-agent/instrumentation/akka-http-10.0/src/lagomTest/java/EchoService.java @@ -1,15 +1,16 @@ +import static com.lightbend.lagom.javadsl.api.Service.*; + import akka.NotUsed; import akka.stream.javadsl.Source; import com.lightbend.lagom.javadsl.api.*; -import static com.lightbend.lagom.javadsl.api.Service.*; public interface EchoService extends Service { ServiceCall, Source> echo(); + ServiceCall, Source> error(); + default Descriptor descriptor() { - return named("echo").withCalls( - namedCall("echo", this::echo) - ); + return named("echo").withCalls(namedCall("echo", this::echo), namedCall("error", this::error)); } } diff --git a/dd-java-agent/instrumentation/akka-http-10.0/src/lagomTest/java/EchoServiceImpl.java b/dd-java-agent/instrumentation/akka-http-10.0/src/lagomTest/java/EchoServiceImpl.java index c7c9f0d190..0825468748 100644 --- a/dd-java-agent/instrumentation/akka-http-10.0/src/lagomTest/java/EchoServiceImpl.java +++ b/dd-java-agent/instrumentation/akka-http-10.0/src/lagomTest/java/EchoServiceImpl.java @@ -1,16 +1,23 @@ -import static java.util.concurrent.CompletableFuture.completedFuture; -import com.lightbend.lagom.javadsl.api.ServiceCall; + import akka.NotUsed; import akka.stream.javadsl.Source; +import com.lightbend.lagom.javadsl.api.ServiceCall; import datadog.trace.api.Trace; - import java.util.List; +import java.util.concurrent.CompletableFuture; public class EchoServiceImpl implements EchoService { @Override public ServiceCall, Source> echo() { - return req -> completedFuture(Source.from(tracedMethod())); + final CompletableFuture> fut = new CompletableFuture<>(); + ServiceTestModule.executor.submit(() -> fut.complete(Source.from(tracedMethod()))); + return req -> fut; + } + + @Override + public ServiceCall, Source> error() { + throw new RuntimeException("lagom exception"); } @Trace diff --git a/dd-java-agent/instrumentation/akka-http-10.0/src/lagomTest/java/ServiceTestModule.java b/dd-java-agent/instrumentation/akka-http-10.0/src/lagomTest/java/ServiceTestModule.java index f80b58e17f..ff0af8c1f6 100644 --- a/dd-java-agent/instrumentation/akka-http-10.0/src/lagomTest/java/ServiceTestModule.java +++ b/dd-java-agent/instrumentation/akka-http-10.0/src/lagomTest/java/ServiceTestModule.java @@ -1,5 +1,5 @@ -import com.google.inject.Binder; import com.google.inject.AbstractModule; +import com.google.inject.Binder; import com.lightbend.lagom.internal.javadsl.BinderAccessor; import com.lightbend.lagom.internal.javadsl.server.JavadslServicesRouter; import com.lightbend.lagom.internal.javadsl.server.ResolvedServices; @@ -10,30 +10,33 @@ import com.lightbend.lagom.javadsl.api.ServiceInfo; import com.lightbend.lagom.javadsl.server.LagomServiceRouter; import com.lightbend.lagom.javadsl.server.ServiceGuiceSupport; import com.lightbend.lagom.javadsl.server.status.MetricsService; - import java.util.Arrays; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; public class ServiceTestModule extends AbstractModule implements ServiceGuiceSupport { + public static final ExecutorService executor = Executors.newCachedThreadPool(); @Override protected void configure() { bindServices( - serviceBinding(EchoService.class, EchoServiceImpl.class) - // , serviceBinding(HelloService.class, HelloServiceImpl.class) - ); + serviceBinding(EchoService.class, EchoServiceImpl.class) + // , serviceBinding(HelloService.class, HelloServiceImpl.class) + ); } - // ------------------------------ /** - * This is a copy of {@link com.lightbend.lagom.javadsl.server.ServiceGuiceSupport#bindServices(ServiceGuiceSupport.ServiceBinding[])} - * that should survive deprecation. When removing the method from the superclass this should inherit the removed code. + * This is a copy of {@link + * com.lightbend.lagom.javadsl.server.ServiceGuiceSupport#bindServices(ServiceGuiceSupport.ServiceBinding[])} + * that should survive deprecation. When removing the method from the superclass this should + * inherit the removed code. * - * This method is used in docs/ so that many tests can share a single Guice module. + *

This method is used in docs/ so that many tests can share a single Guice module. */ - @Override - public void bindServices(ServiceBinding... serviceBindings) { + @Override + public void bindServices(ServiceBinding... serviceBindings) { Binder binder = BinderAccessor.binder(this); for (ServiceBinding binding : serviceBindings) { @@ -51,28 +54,31 @@ public class ServiceTestModule extends AbstractModule implements ServiceGuiceSup ServiceBinding primaryServiceBinding = serviceBindings[0]; // Bind the service info for the first one passed in - binder.bind(ServiceInfo.class).toProvider( + binder + .bind(ServiceInfo.class) + .toProvider( new ServiceInfoProvider( - primaryServiceBinding.serviceInterface(), - Arrays - .stream(serviceBindings) - .map(ServiceBinding::serviceInterface) - .toArray(Class[]::new) - )); + primaryServiceBinding.serviceInterface(), + Arrays.stream(serviceBindings) + .map(ServiceBinding::serviceInterface) + .toArray(Class[]::new))); // Bind the metrics - ServiceBinding metricsServiceBinding = serviceBinding(MetricsService.class, MetricsServiceImpl.class); - binder.bind(((ClassServiceBinding) metricsServiceBinding).serviceImplementation()).asEagerSingleton(); + ServiceBinding metricsServiceBinding = + serviceBinding(MetricsService.class, MetricsServiceImpl.class); + binder + .bind(((ClassServiceBinding) metricsServiceBinding).serviceImplementation()) + .asEagerSingleton(); ServiceBinding[] allServiceBindings = new ServiceBinding[serviceBindings.length + 1]; System.arraycopy(serviceBindings, 0, allServiceBindings, 0, serviceBindings.length); allServiceBindings[allServiceBindings.length - 1] = metricsServiceBinding; // Bind the resolved services - binder.bind(ResolvedServices.class).toProvider(new ResolvedServicesProvider(allServiceBindings)); + binder + .bind(ResolvedServices.class) + .toProvider(new ResolvedServicesProvider(allServiceBindings)); // And bind the router binder.bind(LagomServiceRouter.class).to(JavadslServicesRouter.class); } - - } diff --git a/dd-java-agent/instrumentation/akka-http-10.0/src/main/java/datadog/trace/instrumentation/akkahttp/AkkaHttpInstrumentation.java b/dd-java-agent/instrumentation/akka-http-10.0/src/main/java/datadog/trace/instrumentation/akkahttp/AkkaHttpInstrumentation.java index 1b53553604..06b38e3950 100644 --- a/dd-java-agent/instrumentation/akka-http-10.0/src/main/java/datadog/trace/instrumentation/akkahttp/AkkaHttpInstrumentation.java +++ b/dd-java-agent/instrumentation/akka-http-10.0/src/main/java/datadog/trace/instrumentation/akkahttp/AkkaHttpInstrumentation.java @@ -6,13 +6,13 @@ import akka.http.javadsl.model.HttpHeader; import akka.http.scaladsl.model.HttpRequest; import akka.http.scaladsl.model.HttpResponse; import akka.stream.*; -import akka.stream.scaladsl.Flow; import akka.stream.stage.*; import com.google.auto.service.AutoService; import datadog.trace.agent.tooling.*; import datadog.trace.api.DDSpanTypes; import datadog.trace.api.DDTags; import datadog.trace.context.TraceScope; +import io.opentracing.Scope; import io.opentracing.Span; import io.opentracing.SpanContext; import io.opentracing.propagation.Format; @@ -26,6 +26,10 @@ import java.util.Map; import lombok.extern.slf4j.Slf4j; import net.bytebuddy.agent.builder.AgentBuilder; import net.bytebuddy.asm.Advice; +import scala.Function1; +import scala.concurrent.ExecutionContext; +import scala.concurrent.Future; +import scala.runtime.AbstractFunction1; @Slf4j @AutoService(Instrumenter.class) @@ -34,21 +38,19 @@ public final class AkkaHttpInstrumentation extends Instrumenter.Configurable { super("akkahttp"); } - // TODO: Disable Instrumentation by default + // TODO: sync vs async testing // TODO: Use test DSL - // TODO: remove testWithScala from lagom // TODO: Merge into play testing - // TODO: also check 10.0.8 (play 2.6.0 dep) and latest 10.1 + // TODO: Disable Instrumentation by default + // TODO: remove testWithScala from lagom private static final HelperInjector akkaHttpHelperInjector = new HelperInjector( - AkkaHttpInstrumentation.class.getName() + "$DatadogGraph", - AkkaHttpInstrumentation.class.getName() + "$AkkaHttpHeaders", - AkkaHttpInstrumentation.class.getName() + "$DatadogLogic", - AkkaHttpInstrumentation.class.getName() + "$DatadogLogic$1", - AkkaHttpInstrumentation.class.getName() + "$DatadogLogic$2", - AkkaHttpInstrumentation.class.getName() + "$DatadogLogic$3", - AkkaHttpInstrumentation.class.getName() + "$DatadogLogic$4"); + AkkaHttpInstrumentation.class.getName() + "$DatadogSyncWrapper", + AkkaHttpInstrumentation.class.getName() + "$DatadogAsyncWrapper", + AkkaHttpInstrumentation.class.getName() + "$DatadogAsyncWrapper$1", + AkkaHttpInstrumentation.class.getName() + "$DatadogAsyncWrapper$2", + AkkaHttpInstrumentation.class.getName() + "$AkkaHttpHeaders"); @Override public AgentBuilder apply(final AgentBuilder agentBuilder) { @@ -59,200 +61,140 @@ public final class AkkaHttpInstrumentation extends Instrumenter.Configurable { .transform( DDAdvice.create() .advice( - named("bindAndHandle") - .and(takesArgument(0, named("akka.stream.scaladsl.Flow"))), - AkkaHttpAdvice.class.getName())) - .asDecorator() - .type(named("akka.stream.impl.fusing.GraphInterpreter")) - .transform(DDTransformers.defaultTransformers()) - .transform(akkaHttpHelperInjector) + named("bindAndHandleSync").and(takesArgument(0, named("scala.Function1"))), + AkkaHttpSyncAdvice.class.getName())) .transform( - DDAdvice.create().advice(named("execute"), GraphInterpreterAdvice.class.getName())) + DDAdvice.create() + .advice( + named("bindAndHandleAsync").and(takesArgument(0, named("scala.Function1"))), + AkkaHttpAsyncAdvice.class.getName())) .asDecorator(); } - /** Wrap user's Flow in a datadog graph */ - public static class AkkaHttpAdvice { - // TODO: rename to wrapHandler + public static class AkkaHttpSyncAdvice { @Advice.OnMethodEnter(suppress = Throwable.class) - public static void startSpan( + public static void wrapHandler( @Advice.Argument(value = 0, readOnly = false) - Flow handler) { - // recommended way to wrap a flow is to use join with a custom graph stage - // https://groups.google.com/forum/#!topic/akka-user/phtZM_kuy7o - handler = handler.join(new DatadogGraph()); + Function1 handler) { + handler = new DatadogSyncWrapper(handler); } } - /** Close spans created by DatadogLogic */ - public static class GraphInterpreterAdvice { - @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) - public static void closeScope() { - if (DatadogLogic.NUM_SCOPES_TO_CLOSE.get() != null) { - int numScopesToClose = DatadogLogic.NUM_SCOPES_TO_CLOSE.get(); - DatadogLogic.NUM_SCOPES_TO_CLOSE.set(0); - while (numScopesToClose > 0 && GlobalTracer.get().scopeManager().active() != null) { - GlobalTracer.get().scopeManager().active().close(); - numScopesToClose--; - } + public static class AkkaHttpAsyncAdvice { + @Advice.OnMethodEnter(suppress = Throwable.class) + public static void wrapHandler( + @Advice.Argument(value = 0, readOnly = false) + Function1> handler, + @Advice.Argument(value = 7) Materializer materializer) { + handler = new DatadogAsyncWrapper(handler, materializer.executionContext()); + } + } + + public static class DatadogSyncWrapper extends AbstractFunction1 { + private final Function1 userHandler; + + public DatadogSyncWrapper(Function1 userHandler) { + this.userHandler = userHandler; + } + + @Override + public HttpResponse apply(HttpRequest request) { + final Scope scope = DatadogSyncWrapper.createSpan(request); + try { + final HttpResponse response = userHandler.apply(request); + scope.close(); + finishSpan(scope.span(), response); + return response; + } catch (Throwable t) { + scope.close(); + finishSpan(scope.span(), t); + throw t; } } - } - public static class DatadogGraph - extends GraphStage> { - private final Inlet in1 = Inlet.create("datadog.in1"); - private final Outlet out1 = Outlet.create("datadog.toWrapped"); - private final Inlet in2 = Inlet.create("datadog.fromWrapped"); - private final Outlet out2 = Outlet.create("datadog.out2"); - private final BidiShape shape = - BidiShape.of(in1, out1, in2, out2); - - @Override - public BidiShape shape() { - return shape; - } - - @Override - public GraphStageLogic createLogic(Attributes inheritedAttributes) { - return new DatadogLogic(shape()); - } - } - - /** Stateful logic of the akka http pipeline */ - public static class DatadogLogic extends GraphStageLogic { - /** Signal the graph logic advice to close the scope at the end of an execution phase. */ - // ideally there would be a way to push a close-scope event after - // the user's input handler has run, but there doesn't seem to be a way to do that - public static final ThreadLocal NUM_SCOPES_TO_CLOSE = new ThreadLocal<>(); - - // safe to use volatile without locking because - // ordering and number of invocations of handler is guaranteed by akka streams - // ideally this would be a final variable, but the span cannot be set until - // the in2 handler is invoked by the graph logic - private volatile Span span; - - // Response | -> in1 --> out1 | tcp -> - // Request | <- out2 <-- in2 | <- tcp - public DatadogLogic( - final BidiShape shape) { - super(shape); - - setHandler( - shape.in2(), - new AbstractInHandler() { - @Override - public void onPush() { - final HttpRequest request = grab(shape.in2()); - createSpan(request); - push(shape.out2(), request); - } - - @Override - public void onUpstreamFailure(Throwable ex) throws Exception { - finishSpan(ex); - super.onUpstreamFailure(ex); - } - }); - - setHandler( - shape.out2(), - new AbstractOutHandler() { - @Override - public void onPull() { - pull(shape.in2()); - } - - @Override - public void onDownstreamFinish() throws Exception { - // Invoked on errors. Don't complete this stage to allow error-capturing - } - }); - - setHandler( - shape.in1(), - new AbstractInHandler() { - @Override - public void onPush() { - final HttpResponse response = grab(shape.in1()); - finishSpan(response); - push(shape.out1(), response); - } - - @Override - public void onUpstreamFailure(Throwable ex) throws Exception { - if (GlobalTracer.get().scopeManager().active() instanceof TraceScope) { - ((TraceScope) GlobalTracer.get().scopeManager().active()) - .setAsyncPropagation(false); - } - finishSpan(ex); - - super.onUpstreamFailure(ex); - } - }); - - setHandler( - shape.out1(), - new AbstractOutHandler() { - @Override - public void onPull() { - pull(shape.in1()); - } - }); - } - - private void createSpan(final HttpRequest request) { - SpanContext extractedContext = + public static Scope createSpan(HttpRequest request) { + final SpanContext extractedContext = GlobalTracer.get().extract(Format.Builtin.HTTP_HEADERS, new AkkaHttpHeaders(request)); - - span = + final Scope scope = GlobalTracer.get() .buildSpan("akkahttp.request") .asChildOf(extractedContext) - .startActive(false) - .span(); - // close the created scope at the end of the graph execution - if (null == NUM_SCOPES_TO_CLOSE.get()) { - NUM_SCOPES_TO_CLOSE.set(1); - } else { - NUM_SCOPES_TO_CLOSE.set(NUM_SCOPES_TO_CLOSE.get() + 1); - } + .withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_SERVER) + .withTag(Tags.HTTP_METHOD.getKey(), request.method().value()) + .withTag(DDTags.SPAN_TYPE, DDSpanTypes.WEB_SERVLET) + .withTag(Tags.COMPONENT.getKey(), "akkahttp-action") + .withTag(Tags.HTTP_URL.getKey(), request.getUri().toString()) + .startActive(false); - if (GlobalTracer.get().scopeManager().active() instanceof TraceScope) { - ((TraceScope) GlobalTracer.get().scopeManager().active()).setAsyncPropagation(true); + if (scope instanceof TraceScope) { + ((TraceScope) scope).setAsyncPropagation(true); } - - span.setTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_SERVER); - span.setTag(Tags.HTTP_METHOD.getKey(), request.method().value()); - span.setTag(DDTags.SPAN_TYPE, DDSpanTypes.WEB_SERVLET); - span.setTag(Tags.COMPONENT.getKey(), "akkahttp-action"); - span.setTag(Tags.HTTP_URL.getKey(), request.getUri().toString()); + return scope; } - private void finishSpan(final HttpResponse response) { - if (null != span) { - stopScopePropagation(); - Tags.HTTP_STATUS.set(span, response.status().intValue()); - span.finish(); - span = null; - } - } + public static void finishSpan(Span span, HttpResponse response) { + Tags.HTTP_STATUS.set(span, response.status().intValue()); - private void finishSpan(final Throwable t) { - if (null != span) { - stopScopePropagation(); - Tags.ERROR.set(span, true); - span.log(Collections.singletonMap("error.object", t)); - Tags.HTTP_STATUS.set(span, 500); - span.finish(); - span = null; - } - } - - private void stopScopePropagation() { if (GlobalTracer.get().scopeManager().active() instanceof TraceScope) { ((TraceScope) GlobalTracer.get().scopeManager().active()).setAsyncPropagation(false); } + span.finish(); + } + + public static void finishSpan(Span span, Throwable t) { + Tags.ERROR.set(span, true); + span.log(Collections.singletonMap("error.object", t)); + Tags.HTTP_STATUS.set(span, 500); + + if (GlobalTracer.get().scopeManager().active() instanceof TraceScope) { + ((TraceScope) GlobalTracer.get().scopeManager().active()).setAsyncPropagation(false); + } + span.finish(); + } + } + + public static class DatadogAsyncWrapper + extends AbstractFunction1> { + private final Function1> userHandler; + private final ExecutionContext executionContext; + + public DatadogAsyncWrapper( + Function1> userHandler, + ExecutionContext executionContext) { + this.userHandler = userHandler; + this.executionContext = executionContext; + } + + @Override + public Future apply(HttpRequest request) { + final Scope scope = DatadogSyncWrapper.createSpan(request); + Future futureResponse = null; + try { + futureResponse = userHandler.apply(request); + } catch (Throwable t) { + scope.close(); + DatadogSyncWrapper.finishSpan(scope.span(), t); + throw t; + } + final Future wrapped = + futureResponse.transform( + new AbstractFunction1() { + @Override + public HttpResponse apply(HttpResponse response) { + DatadogSyncWrapper.finishSpan(scope.span(), response); + return response; + } + }, + new AbstractFunction1() { + @Override + public Throwable apply(Throwable t) { + DatadogSyncWrapper.finishSpan(scope.span(), t); + return t; + } + }, + executionContext); + scope.close(); + return wrapped; } }