From a0a11a51d0fcd55c7d2365bb955faf277bd4d386 Mon Sep 17 00:00:00 2001 From: Andrew Kent Date: Mon, 2 Apr 2018 16:59:57 -0700 Subject: [PATCH] Use ScopeManager to enable/disable low level async instrumentation. --- .../src/main/scala/AkkaActors.scala | 4 +++ .../src/test/scala/ScalaConcurrentTests.scala | 6 ++++ .../concurrent/ExecutorInstrumentation.java | 14 +++++++-- .../groovy/ExecutorInstrumentationTest.groovy | 4 +++ .../play/PlayInstrumentation.java | 29 ++++++++++++------- .../datadog/trace/context/TraceScope.java | 6 ++++ .../scopemanager/ContinuableScope.java | 12 ++++++++ 7 files changed, 62 insertions(+), 13 deletions(-) diff --git a/dd-java-agent/instrumentation/java-concurrent/akka-testing/src/main/scala/AkkaActors.scala b/dd-java-agent/instrumentation/java-concurrent/akka-testing/src/main/scala/AkkaActors.scala index cc77d43487..09ec95f021 100644 --- a/dd-java-agent/instrumentation/java-concurrent/akka-testing/src/main/scala/AkkaActors.scala +++ b/dd-java-agent/instrumentation/java-concurrent/akka-testing/src/main/scala/AkkaActors.scala @@ -1,4 +1,5 @@ import datadog.trace.api.Trace +import datadog.trace.context.TraceScope import akka.pattern.ask import io.opentracing.util.GlobalTracer @@ -32,18 +33,21 @@ class AkkaActors { @Trace def basicTell() : Unit = { + GlobalTracer.get().scopeManager().active().asInstanceOf[TraceScope].setAsyncLinking(true) howdyGreeter ! WhoToGreet("Akka") howdyGreeter ! Greet } @Trace def basicAsk() : Unit = { + GlobalTracer.get().scopeManager().active().asInstanceOf[TraceScope].setAsyncLinking(true) howdyGreeter ! WhoToGreet("Akka") howdyGreeter ? Greet } @Trace def basicForward() : Unit = { + GlobalTracer.get().scopeManager().active().asInstanceOf[TraceScope].setAsyncLinking(true) helloGreeter ! WhoToGreet("Akka") helloGreeter ? Greet } diff --git a/dd-java-agent/instrumentation/java-concurrent/scala-testing/src/test/scala/ScalaConcurrentTests.scala b/dd-java-agent/instrumentation/java-concurrent/scala-testing/src/test/scala/ScalaConcurrentTests.scala index fd0a71851a..1b56a37d44 100644 --- a/dd-java-agent/instrumentation/java-concurrent/scala-testing/src/test/scala/ScalaConcurrentTests.scala +++ b/dd-java-agent/instrumentation/java-concurrent/scala-testing/src/test/scala/ScalaConcurrentTests.scala @@ -1,4 +1,5 @@ import datadog.trace.api.Trace +import datadog.trace.context.TraceScope import io.opentracing.util.GlobalTracer import scala.concurrent.ExecutionContext.Implicits.global @@ -12,6 +13,7 @@ class ScalaConcurrentTests { */ @Trace def traceWithFutureAndCallbacks() : Integer = { + GlobalTracer.get().scopeManager().active().asInstanceOf[TraceScope].setAsyncLinking(true) val goodFuture: Future[Integer] = Future { tracedChild("goodFuture") 1 @@ -32,6 +34,7 @@ class ScalaConcurrentTests { @Trace def tracedAcrossThreadsWithNoTrace() :Integer = { + GlobalTracer.get().scopeManager().active().asInstanceOf[TraceScope].setAsyncLinking(true) val goodFuture: Future[Integer] = Future { 1 } @@ -51,6 +54,7 @@ class ScalaConcurrentTests { */ @Trace def traceWithPromises() : Integer = { + GlobalTracer.get().scopeManager().active().asInstanceOf[TraceScope].setAsyncLinking(true) val keptPromise = Promise[Boolean]() val brokenPromise = Promise[Boolean]() val afterPromise = keptPromise.future @@ -83,6 +87,7 @@ class ScalaConcurrentTests { */ @Trace def tracedWithFutureFirstCompletions() :Integer = { + GlobalTracer.get().scopeManager().active().asInstanceOf[TraceScope].setAsyncLinking(true) val completedVal = Future.firstCompletedOf( List( Future { @@ -106,6 +111,7 @@ class ScalaConcurrentTests { */ @Trace def tracedTimeout(): Integer = { + GlobalTracer.get().scopeManager().active().asInstanceOf[TraceScope].setAsyncLinking(true) val f: Future[String] = Future { tracedChild("timeoutChild") while(true) { diff --git a/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/ExecutorInstrumentation.java b/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/ExecutorInstrumentation.java index 5ccea111f4..d8ab6114c2 100644 --- a/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/ExecutorInstrumentation.java +++ b/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/ExecutorInstrumentation.java @@ -140,7 +140,10 @@ public final class ExecutorInstrumentation extends Instrumenter.Configurable { public static DatadogWrapper wrapJob( @Advice.Argument(value = 0, readOnly = false) Runnable task) { final Scope scope = GlobalTracer.get().scopeManager().active(); - if (scope instanceof TraceScope && task != null && !(task instanceof DatadogWrapper)) { + if (scope instanceof TraceScope + && ((TraceScope) scope).isAsyncLinking() + && task != null + && !(task instanceof DatadogWrapper)) { task = new RunnableWrapper(task, (TraceScope) scope); return (RunnableWrapper) task; } @@ -161,7 +164,10 @@ public final class ExecutorInstrumentation extends Instrumenter.Configurable { public static DatadogWrapper wrapJob( @Advice.Argument(value = 0, readOnly = false) Callable task) { final Scope scope = GlobalTracer.get().scopeManager().active(); - if (scope instanceof TraceScope && task != null && !(task instanceof DatadogWrapper)) { + if (scope instanceof TraceScope + && ((TraceScope) scope).isAsyncLinking() + && task != null + && !(task instanceof DatadogWrapper)) { task = new CallableWrapper(task, (TraceScope) scope); return (CallableWrapper) task; } @@ -182,7 +188,7 @@ public final class ExecutorInstrumentation extends Instrumenter.Configurable { public static Collection wrapJob( @Advice.Argument(value = 0, readOnly = false) Collection> tasks) { final Scope scope = GlobalTracer.get().scopeManager().active(); - if (scope instanceof TraceScope) { + if (scope instanceof TraceScope && ((TraceScope) scope).isAsyncLinking()) { Collection> wrappedTasks = new ArrayList<>(tasks.size()); for (Callable task : tasks) { if (task != null) { @@ -241,6 +247,7 @@ public final class ExecutorInstrumentation extends Instrumenter.Configurable { @Override public void run() { final TraceScope context = continuation.activate(); + context.setAsyncLinking(true); try { delegatee.run(); } finally { @@ -261,6 +268,7 @@ public final class ExecutorInstrumentation extends Instrumenter.Configurable { @Override public T call() throws Exception { final TraceScope context = continuation.activate(); + context.setAsyncLinking(true); try { return delegatee.call(); } finally { diff --git a/dd-java-agent/instrumentation/java-concurrent/src/test/groovy/ExecutorInstrumentationTest.groovy b/dd-java-agent/instrumentation/java-concurrent/src/test/groovy/ExecutorInstrumentationTest.groovy index 5884d02b24..6466b91e54 100644 --- a/dd-java-agent/instrumentation/java-concurrent/src/test/groovy/ExecutorInstrumentationTest.groovy +++ b/dd-java-agent/instrumentation/java-concurrent/src/test/groovy/ExecutorInstrumentationTest.groovy @@ -1,6 +1,8 @@ import datadog.opentracing.DDSpan +import datadog.opentracing.scopemanager.ContinuableScope import datadog.trace.agent.test.AgentTestRunner import datadog.trace.api.Trace +import io.opentracing.util.GlobalTracer import spock.lang.Shared import spock.lang.Unroll @@ -48,6 +50,7 @@ class ExecutorInstrumentationTest extends AgentTestRunner { @Override @Trace(operationName = "parent") void run() { + ((ContinuableScope) GlobalTracer.get().scopeManager().active()).setAsyncLinking(true) // this child will have a span m.invoke(pool, new AsyncChild()) // this child won't @@ -92,6 +95,7 @@ class ExecutorInstrumentationTest extends AgentTestRunner { @Override @Trace(operationName = "parent") void run() { + ((ContinuableScope) GlobalTracer.get().scopeManager().active()).setAsyncLinking(true) try { for (int i = 0; i < 20; ++ i) { Future f = pool.submit((Callable)child) diff --git a/dd-java-agent/instrumentation/play-2.4/src/main/java/datadog/trace/instrumentation/play/PlayInstrumentation.java b/dd-java-agent/instrumentation/play-2.4/src/main/java/datadog/trace/instrumentation/play/PlayInstrumentation.java index 7821da279f..aec5d947e1 100644 --- a/dd-java-agent/instrumentation/play-2.4/src/main/java/datadog/trace/instrumentation/play/PlayInstrumentation.java +++ b/dd-java-agent/instrumentation/play-2.4/src/main/java/datadog/trace/instrumentation/play/PlayInstrumentation.java @@ -9,6 +9,7 @@ import com.google.auto.service.AutoService; import datadog.trace.agent.tooling.*; import datadog.trace.api.DDSpanTypes; import datadog.trace.api.DDTags; +import datadog.trace.context.TraceScope; import io.opentracing.Scope; import io.opentracing.Span; import io.opentracing.SpanContext; @@ -77,9 +78,7 @@ public final class PlayInstrumentation extends Instrumenter.Configurable { public static class PlayAdvice { @Advice.OnMethodEnter(suppress = Throwable.class) public static Scope startSpan(@Advice.Argument(0) final Request req) { - // TODO - // begin tracking across threads - + final Scope scope; if (GlobalTracer.get().activeSpan() == null) { final SpanContext extractedContext; if (GlobalTracer.get().scopeManager().active() == null) { @@ -88,15 +87,21 @@ public final class PlayInstrumentation extends Instrumenter.Configurable { } else { extractedContext = null; } - return GlobalTracer.get() - .buildSpan("play.request") - .asChildOf(extractedContext) - .startActive(false); + scope = + GlobalTracer.get() + .buildSpan("play.request") + .asChildOf(extractedContext) + .startActive(false); } else { // An upstream framework (e.g. akka-http, netty) has already started the span. // Do not extract the context. - return GlobalTracer.get().buildSpan("play.request").startActive(false); + scope = GlobalTracer.get().buildSpan("play.request").startActive(false); } + + if (GlobalTracer.get().scopeManager().active() instanceof TraceScope) { + ((TraceScope) GlobalTracer.get().scopeManager().active()).setAsyncLinking(true); + } + return scope; } @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) @@ -169,6 +174,9 @@ public final class PlayInstrumentation extends Instrumenter.Configurable { @Override public Object apply(Throwable t, boolean isCheck) throws Exception { try { + if (GlobalTracer.get().scopeManager().active() instanceof TraceScope) { + ((TraceScope) GlobalTracer.get().scopeManager().active()).setAsyncLinking(false); + } onError(span, t); } catch (Throwable t2) { LoggerFactory.getLogger(RequestCallback.class).debug("error in play instrumentation", t); @@ -193,8 +201,9 @@ public final class PlayInstrumentation extends Instrumenter.Configurable { } public Result apply(Result result) { - // TODO - // stop tracking across threads + if (GlobalTracer.get().scopeManager().active() instanceof TraceScope) { + ((TraceScope) GlobalTracer.get().scopeManager().active()).setAsyncLinking(false); + } try { Tags.HTTP_STATUS.set(span, result.header().status()); } catch (Throwable t) { diff --git a/dd-trace-api/src/main/java/datadog/trace/context/TraceScope.java b/dd-trace-api/src/main/java/datadog/trace/context/TraceScope.java index c859d14d2d..5b04795118 100644 --- a/dd-trace-api/src/main/java/datadog/trace/context/TraceScope.java +++ b/dd-trace-api/src/main/java/datadog/trace/context/TraceScope.java @@ -13,6 +13,12 @@ public interface TraceScope { /** Close the activated context and allow any underlying spans to finish. */ void close(); + /** If true, this context will propagate across async boundaries. */ + boolean isAsyncLinking(); + + /** Set context's async propagation value. */ + void setAsyncLinking(boolean value); + /** Used to pass async context between workers. */ interface Continuation { /** diff --git a/dd-trace-ot/src/main/java/datadog/opentracing/scopemanager/ContinuableScope.java b/dd-trace-ot/src/main/java/datadog/opentracing/scopemanager/ContinuableScope.java index 943f2b1779..26a60d324c 100644 --- a/dd-trace-ot/src/main/java/datadog/opentracing/scopemanager/ContinuableScope.java +++ b/dd-trace-ot/src/main/java/datadog/opentracing/scopemanager/ContinuableScope.java @@ -27,6 +27,8 @@ public class ContinuableScope implements Scope, TraceScope { private final Scope toRestore; /** Continuation that created this scope. May be null. */ private final Continuation continuation; + /** Flag to propagate this scope across async boundaries. */ + private final AtomicBoolean isAsyncLinking = new AtomicBoolean(false); ContinuableScope( final ContextualScopeManager scopeManager, @@ -70,6 +72,16 @@ public class ContinuableScope implements Scope, TraceScope { return spanUnderScope; } + @Override + public void setAsyncLinking(boolean value) { + isAsyncLinking.set(value); + } + + @Override + public boolean isAsyncLinking() { + return isAsyncLinking.get(); + } + /** * The continuation returned should be closed after the associa *