diff --git a/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/java/concurrent/State.java b/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/java/concurrent/State.java index bebad98cc8..f2627f777f 100644 --- a/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/java/concurrent/State.java +++ b/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/java/concurrent/State.java @@ -36,7 +36,9 @@ public class State { public void closeContinuation() { final TraceScope.Continuation continuation = continuationRef.getAndSet(null); if (continuation != null) { - continuation.close(); + // We have opened this continuation, we shall not close parent scope when we close it, + // otherwise owners of that scope will get confused. + continuation.close(false); } } diff --git a/dd-java-agent/instrumentation/java-concurrent/akka-2.5-testing/src/test/groovy/AkkaExecutorInstrumentationTest.groovy b/dd-java-agent/instrumentation/java-concurrent/akka-2.5-testing/src/test/groovy/AkkaExecutorInstrumentationTest.groovy index 4462ebe265..d2e337da5f 100644 --- a/dd-java-agent/instrumentation/java-concurrent/akka-2.5-testing/src/test/groovy/AkkaExecutorInstrumentationTest.groovy +++ b/dd-java-agent/instrumentation/java-concurrent/akka-2.5-testing/src/test/groovy/AkkaExecutorInstrumentationTest.groovy @@ -8,11 +8,8 @@ import io.opentracing.util.GlobalTracer import spock.lang.Shared import java.lang.reflect.InvocationTargetException -import java.lang.reflect.Method import java.util.concurrent.ArrayBlockingQueue import java.util.concurrent.Callable -import java.util.concurrent.Executor -import java.util.concurrent.ExecutorService import java.util.concurrent.Future import java.util.concurrent.RejectedExecutionException import java.util.concurrent.ThreadPoolExecutor @@ -23,31 +20,21 @@ import java.util.concurrent.TimeUnit * This is to large extent a copy of ExecutorInstrumentationTest. */ class AkkaExecutorInstrumentationTest extends AgentTestRunner { - @Shared - Method executeRunnableMethod - @Shared - Method akkaExecuteForkJoinTaskMethod - @Shared - Method submitRunnableMethod - @Shared - Method submitCallableMethod - @Shared - Method akkaSubmitForkJoinTaskMethod - @Shared - Method akkaInvokeForkJoinTaskMethod - def setupSpec() { - executeRunnableMethod = Executor.getMethod("execute", Runnable) - akkaExecuteForkJoinTaskMethod = ForkJoinPool.getMethod("execute", ForkJoinTask) - submitRunnableMethod = ExecutorService.getMethod("submit", Runnable) - submitCallableMethod = ExecutorService.getMethod("submit", Callable) - akkaSubmitForkJoinTaskMethod = ForkJoinPool.getMethod("submit", ForkJoinTask) - akkaInvokeForkJoinTaskMethod = ForkJoinPool.getMethod("invoke", ForkJoinTask) - } + @Shared + def executeRunnable = { e, c -> e.execute((Runnable) c) } + @Shared + def akkaExecuteForkJoinTask = { e, c -> e.execute((ForkJoinTask) c) } + @Shared + def submitRunnable = { e, c -> e.submit((Runnable) c) } + @Shared + def submitCallable = { e, c -> e.submit((Callable) c) } + @Shared + def akkaSubmitForkJoinTask = { e, c -> e.submit((ForkJoinTask) c) } + @Shared + def akkaInvokeForkJoinTask = { e, c -> e.invoke((ForkJoinTask) c) } - // more useful name breaks java9 javac - // def "#poolImpl.getClass().getSimpleName() #method.getName() propagates"() - def "#poolImpl #method propagates"() { + def "#poolImpl '#name' propagates"() { setup: def pool = poolImpl def m = method @@ -58,9 +45,9 @@ class AkkaExecutorInstrumentationTest extends AgentTestRunner { void run() { ((ContinuableScope) GlobalTracer.get().scopeManager().active()).setAsyncPropagation(true) // this child will have a span - m.invoke(pool, new AkkaAsyncChild()) + m(pool, new AkkaAsyncChild()) // this child won't - m.invoke(pool, new AkkaAsyncChild(false, false)) + m(pool, new AkkaAsyncChild(false, false)) } }.run() @@ -79,22 +66,21 @@ class AkkaExecutorInstrumentationTest extends AgentTestRunner { // Unfortunately, there's no simple way to test the cross product of methods/pools. where: - poolImpl | method - new ForkJoinPool() | executeRunnableMethod - new ForkJoinPool() | akkaExecuteForkJoinTaskMethod - new ForkJoinPool() | submitRunnableMethod - new ForkJoinPool() | submitCallableMethod - new ForkJoinPool() | akkaSubmitForkJoinTaskMethod - new ForkJoinPool() | akkaInvokeForkJoinTaskMethod + name | method | poolImpl + "execute Runnable" | executeRunnable | new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue(1)) + "submit Runnable" | submitRunnable | new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue(1)) + "submit Callable" | submitCallable | new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue(1)) - new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue(1)) | executeRunnableMethod - new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue(1)) | submitRunnableMethod - new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue(1)) | submitCallableMethod + // ForkJoinPool has additional set of method overloads for ForkJoinTask to deal with + "execute Runnable" | executeRunnable | new ForkJoinPool() + "execute ForkJoinTask" | akkaExecuteForkJoinTask | new ForkJoinPool() + "submit Runnable" | submitRunnable | new ForkJoinPool() + "submit Callable" | submitCallable | new ForkJoinPool() + "submit ForkJoinTask" | akkaSubmitForkJoinTask | new ForkJoinPool() + "invoke ForkJoinTask" | akkaInvokeForkJoinTask | new ForkJoinPool() } - // more useful name breaks java9 javac - // def "#poolImpl.getClass().getSimpleName() #method.getName() propagates"() - def "#poolImpl reports after canceled jobs"() { + def "#poolImpl '#name' reports after canceled jobs"() { setup: def pool = poolImpl def m = method @@ -117,7 +103,7 @@ class AkkaExecutorInstrumentationTest extends AgentTestRunner { final AkkaAsyncChild child = new AkkaAsyncChild(true, true) children.add(child) try { - Future f = m.invoke(pool, new AkkaAsyncChild()) + Future f = m(pool, new AkkaAsyncChild()) jobFutures.add(f) } catch (InvocationTargetException e) { throw e.getCause() @@ -142,8 +128,8 @@ class AkkaExecutorInstrumentationTest extends AgentTestRunner { TEST_WRITER.size() == 1 where: - poolImpl | method - new ForkJoinPool() | submitRunnableMethod - new ForkJoinPool() | submitCallableMethod + name | method | poolImpl + "submit Runnable" | submitRunnable | new ForkJoinPool() + "submit Callable" | submitCallable | new ForkJoinPool() } } diff --git a/dd-java-agent/instrumentation/java-concurrent/scala-testing/src/test/groovy/ScalaExecutorInstrumentationTest.groovy b/dd-java-agent/instrumentation/java-concurrent/scala-testing/src/test/groovy/ScalaExecutorInstrumentationTest.groovy index b46af31619..78780d8fb1 100644 --- a/dd-java-agent/instrumentation/java-concurrent/scala-testing/src/test/groovy/ScalaExecutorInstrumentationTest.groovy +++ b/dd-java-agent/instrumentation/java-concurrent/scala-testing/src/test/groovy/ScalaExecutorInstrumentationTest.groovy @@ -8,11 +8,8 @@ import scala.concurrent.forkjoin.ForkJoinTask import spock.lang.Shared import java.lang.reflect.InvocationTargetException -import java.lang.reflect.Method import java.util.concurrent.ArrayBlockingQueue import java.util.concurrent.Callable -import java.util.concurrent.Executor -import java.util.concurrent.ExecutorService import java.util.concurrent.Future import java.util.concurrent.RejectedExecutionException import java.util.concurrent.ThreadPoolExecutor @@ -23,31 +20,21 @@ import java.util.concurrent.TimeUnit * This is to large extent a copy of ExecutorInstrumentationTest. */ class ScalaExecutorInstrumentationTest extends AgentTestRunner { - @Shared - Method executeRunnableMethod - @Shared - Method scalaExecuteForkJoinTaskMethod - @Shared - Method submitRunnableMethod - @Shared - Method submitCallableMethod - @Shared - Method scalaSubmitForkJoinTaskMethod - @Shared - Method scalaInvokeForkJoinTaskMethod - def setupSpec() { - executeRunnableMethod = Executor.getMethod("execute", Runnable) - scalaExecuteForkJoinTaskMethod = ForkJoinPool.getMethod("execute", ForkJoinTask) - submitRunnableMethod = ExecutorService.getMethod("submit", Runnable) - submitCallableMethod = ExecutorService.getMethod("submit", Callable) - scalaSubmitForkJoinTaskMethod = ForkJoinPool.getMethod("submit", ForkJoinTask) - scalaInvokeForkJoinTaskMethod = ForkJoinPool.getMethod("invoke", ForkJoinTask) - } + @Shared + def executeRunnable = { e, c -> e.execute((Runnable) c) } + @Shared + def scalaExecuteForkJoinTask = { e, c -> e.execute((ForkJoinTask) c) } + @Shared + def submitRunnable = { e, c -> e.submit((Runnable) c) } + @Shared + def submitCallable = { e, c -> e.submit((Callable) c) } + @Shared + def scalaSubmitForkJoinTask = { e, c -> e.submit((ForkJoinTask) c) } + @Shared + def scalaInvokeForkJoinTask = { e, c -> e.invoke((ForkJoinTask) c) } - // more useful name breaks java9 javac - // def "#poolImpl.getClass().getSimpleName() #method.getName() propagates"() - def "#poolImpl #method propagates"() { + def "#poolImpl '#name' propagates"() { setup: def pool = poolImpl def m = method @@ -58,9 +45,9 @@ class ScalaExecutorInstrumentationTest extends AgentTestRunner { void run() { ((ContinuableScope) GlobalTracer.get().scopeManager().active()).setAsyncPropagation(true) // this child will have a span - m.invoke(pool, new ScalaAsyncChild()) + m(pool, new ScalaAsyncChild()) // this child won't - m.invoke(pool, new ScalaAsyncChild(false, false)) + m(pool, new ScalaAsyncChild(false, false)) } }.run() @@ -79,22 +66,21 @@ class ScalaExecutorInstrumentationTest extends AgentTestRunner { // Unfortunately, there's no simple way to test the cross product of methods/pools. where: - poolImpl | method - new ForkJoinPool() | executeRunnableMethod - new ForkJoinPool() | scalaExecuteForkJoinTaskMethod - new ForkJoinPool() | submitRunnableMethod - new ForkJoinPool() | submitCallableMethod - new ForkJoinPool() | scalaSubmitForkJoinTaskMethod - new ForkJoinPool() | scalaInvokeForkJoinTaskMethod + name | method | poolImpl + "execute Runnable" | executeRunnable | new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue(1)) + "submit Runnable" | submitRunnable | new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue(1)) + "submit Callable" | submitCallable | new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue(1)) - new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue(1)) | executeRunnableMethod - new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue(1)) | submitRunnableMethod - new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue(1)) | submitCallableMethod + // ForkJoinPool has additional set of method overloads for ForkJoinTask to deal with + "execute Runnable" | executeRunnable | new ForkJoinPool() + "execute ForkJoinTask" | scalaExecuteForkJoinTask | new ForkJoinPool() + "submit Runnable" | submitRunnable | new ForkJoinPool() + "submit Callable" | submitCallable | new ForkJoinPool() + "submit ForkJoinTask" | scalaSubmitForkJoinTask | new ForkJoinPool() + "invoke ForkJoinTask" | scalaInvokeForkJoinTask | new ForkJoinPool() } - // more useful name breaks java9 javac - // def "#poolImpl.getClass().getSimpleName() #method.getName() propagates"() - def "#poolImpl reports after canceled jobs"() { + def "#poolImpl '#name' reports after canceled jobs"() { setup: def pool = poolImpl def m = method @@ -117,7 +103,7 @@ class ScalaExecutorInstrumentationTest extends AgentTestRunner { final ScalaAsyncChild child = new ScalaAsyncChild(true, true) children.add(child) try { - Future f = m.invoke(pool, new ScalaAsyncChild()) + Future f = m(pool, new ScalaAsyncChild()) jobFutures.add(f) } catch (InvocationTargetException e) { throw e.getCause() @@ -142,8 +128,8 @@ class ScalaExecutorInstrumentationTest extends AgentTestRunner { TEST_WRITER.size() == 1 where: - poolImpl | method - new ForkJoinPool() | submitRunnableMethod - new ForkJoinPool() | submitCallableMethod + name | method | poolImpl + "submit Runnable" | submitRunnable | new ForkJoinPool() + "submit Callable" | submitCallable | new ForkJoinPool() } } diff --git a/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/ExecutorInstrumentationUtils.java b/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/ExecutorInstrumentationUtils.java index 779a95b80f..e954596143 100644 --- a/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/ExecutorInstrumentationUtils.java +++ b/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/ExecutorInstrumentationUtils.java @@ -2,6 +2,8 @@ package datadog.trace.instrumentation.java.concurrent; import datadog.trace.bootstrap.ContextStore; import datadog.trace.bootstrap.WeakMap; +import datadog.trace.bootstrap.instrumentation.java.concurrent.CallableWrapper; +import datadog.trace.bootstrap.instrumentation.java.concurrent.RunnableWrapper; import datadog.trace.bootstrap.instrumentation.java.concurrent.State; import datadog.trace.context.TraceScope; import io.opentracing.Scope; @@ -13,7 +15,7 @@ import lombok.extern.slf4j.Slf4j; @Slf4j public class ExecutorInstrumentationUtils { - private static final WeakMap DISABLED_EXECUTORS = + private static final WeakMap EXECUTORS_DISABLED_FOR_WRAPPED_TASKS = WeakMap.Provider.newWeakMap(); /** @@ -28,7 +30,7 @@ public class ExecutorInstrumentationUtils { return (scope instanceof TraceScope && ((TraceScope) scope).isAsyncPropagating() && task != null - && !ExecutorInstrumentationUtils.isDisabled(executor)); + && !ExecutorInstrumentationUtils.isExecutorDisabledForThisTask(executor, task)); } /** @@ -74,12 +76,19 @@ public class ExecutorInstrumentationUtils { } } - public static void disableExecutor(final Executor executor) { - log.debug("Disabling Executor tracing for instance {}", executor); - DISABLED_EXECUTORS.put(executor, true); + public static void disableExecutorForWrappedTasks(final Executor executor) { + log.debug("Disabling Executor tracing for wrapped tasks for instance {}", executor); + EXECUTORS_DISABLED_FOR_WRAPPED_TASKS.put(executor, true); } - public static boolean isDisabled(final Executor executor) { - return DISABLED_EXECUTORS.containsKey(executor); + /** + * Check if Executor can accept given task. + * + *

Disabled executors cannot accept wrapped tasks, non wrapped tasks (i.e. tasks with injected + * fields) should still work fine. + */ + public static boolean isExecutorDisabledForThisTask(final Executor executor, final Object task) { + return (task instanceof RunnableWrapper || task instanceof CallableWrapper) + && EXECUTORS_DISABLED_FOR_WRAPPED_TASKS.containsKey(executor); } } diff --git a/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/FutureInstrumentation.java b/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/FutureInstrumentation.java index 64fb66b397..a1b2317c21 100644 --- a/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/FutureInstrumentation.java +++ b/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/FutureInstrumentation.java @@ -61,7 +61,8 @@ public final class FutureInstrumentation extends Instrumenter.Default { "akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask", "com.google.common.util.concurrent.SettableFuture", "com.google.common.util.concurrent.AbstractFuture$TrustedFuture", - "com.google.common.util.concurrent.AbstractFuture" + "com.google.common.util.concurrent.AbstractFuture", + "io.netty.util.concurrent.ScheduledFutureTask" }; WHITELISTED_FUTURES = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(whitelist))); } @@ -102,15 +103,15 @@ public final class FutureInstrumentation extends Instrumenter.Default { public static class CanceledFutureAdvice { @Advice.OnMethodExit(suppress = Throwable.class) - public static void exit( - @Advice.This final Future future, @Advice.Return final boolean canceled) { - if (canceled) { - final ContextStore contextStore = - InstrumentationContext.get(Future.class, State.class); - final State state = contextStore.get(future); - if (state != null) { - state.closeContinuation(); - } + public static void exit(@Advice.This final Future future) { + // Try to close continuation even if future was not cancelled: + // the expectation is that continuation should be closed after 'cancel' + // is called, one way or another + final ContextStore contextStore = + InstrumentationContext.get(Future.class, State.class); + final State state = contextStore.get(future); + if (state != null) { + state.closeContinuation(); } } } diff --git a/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/JavaExecutorInstrumentation.java b/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/JavaExecutorInstrumentation.java index 5e28ecf74c..0cb9bfaa6b 100644 --- a/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/JavaExecutorInstrumentation.java +++ b/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/JavaExecutorInstrumentation.java @@ -61,11 +61,17 @@ public final class JavaExecutorInstrumentation extends AbstractExecutorInstrumen named("submit").and(takesArgument(0, ForkJoinTask.class)), SetJavaForkJoinStateAdvice.class.getName()); transformers.put( - nameMatches("invoke(Any|All)$").and(takesArgument(0, Callable.class)), + nameMatches("invoke(Any|All)$").and(takesArgument(0, Collection.class)), SetCallableStateForCallableCollectionAdvice.class.getName()); transformers.put( nameMatches("invoke").and(takesArgument(0, ForkJoinTask.class)), SetJavaForkJoinStateAdvice.class.getName()); + transformers.put( + named("schedule").and(takesArgument(0, Runnable.class)), + SetSubmitRunnableStateAdvice.class.getName()); + transformers.put( + named("schedule").and(takesArgument(0, Callable.class)), + SetCallableStateAdvice.class.getName()); transformers.put( // kotlinx.coroutines.scheduling.CoroutineScheduler named("dispatch") .and(takesArgument(0, Runnable.class)) @@ -81,11 +87,14 @@ public final class JavaExecutorInstrumentation extends AbstractExecutorInstrumen @Advice.This final Executor executor, @Advice.Argument(value = 0, readOnly = false) Runnable task) { final Scope scope = GlobalTracer.get().scopeManager().active(); - if (ExecutorInstrumentationUtils.shouldAttachStateToTask(task, executor)) { - task = RunnableWrapper.wrapIfNeeded(task); + final Runnable newTask = RunnableWrapper.wrapIfNeeded(task); + // It is important to check potentially wrapped task if we can instrument task in this + // executor. Some executors do not support wrapped tasks. + if (ExecutorInstrumentationUtils.shouldAttachStateToTask(newTask, executor)) { + task = newTask; final ContextStore contextStore = InstrumentationContext.get(Runnable.class, State.class); - return ExecutorInstrumentationUtils.setupState(contextStore, task, (TraceScope) scope); + return ExecutorInstrumentationUtils.setupState(contextStore, newTask, (TraceScope) scope); } return null; } @@ -130,11 +139,14 @@ public final class JavaExecutorInstrumentation extends AbstractExecutorInstrumen @Advice.This final Executor executor, @Advice.Argument(value = 0, readOnly = false) Runnable task) { final Scope scope = GlobalTracer.get().scopeManager().active(); - if (ExecutorInstrumentationUtils.shouldAttachStateToTask(task, executor)) { - task = RunnableWrapper.wrapIfNeeded(task); + final Runnable newTask = RunnableWrapper.wrapIfNeeded(task); + // It is important to check potentially wrapped task if we can instrument task in this + // executor. Some executors do not support wrapped tasks. + if (ExecutorInstrumentationUtils.shouldAttachStateToTask(newTask, executor)) { + task = newTask; final ContextStore contextStore = InstrumentationContext.get(Runnable.class, State.class); - return ExecutorInstrumentationUtils.setupState(contextStore, task, (TraceScope) scope); + return ExecutorInstrumentationUtils.setupState(contextStore, newTask, (TraceScope) scope); } return null; } @@ -161,11 +173,14 @@ public final class JavaExecutorInstrumentation extends AbstractExecutorInstrumen @Advice.This final Executor executor, @Advice.Argument(value = 0, readOnly = false) Callable task) { final Scope scope = GlobalTracer.get().scopeManager().active(); - if (ExecutorInstrumentationUtils.shouldAttachStateToTask(task, executor)) { - task = CallableWrapper.wrapIfNeeded(task); + final Callable newTask = CallableWrapper.wrapIfNeeded(task); + // It is important to check potentially wrapped task if we can instrument task in this + // executor. Some executors do not support wrapped tasks. + if (ExecutorInstrumentationUtils.shouldAttachStateToTask(newTask, executor)) { + task = newTask; final ContextStore contextStore = InstrumentationContext.get(Callable.class, State.class); - return ExecutorInstrumentationUtils.setupState(contextStore, task, (TraceScope) scope); + return ExecutorInstrumentationUtils.setupState(contextStore, newTask, (TraceScope) scope); } return null; } @@ -196,13 +211,17 @@ public final class JavaExecutorInstrumentation extends AbstractExecutorInstrumen && ((TraceScope) scope).isAsyncPropagating() && tasks != null) { final Collection> wrappedTasks = new ArrayList<>(tasks.size()); - for (Callable task : tasks) { + for (final Callable task : tasks) { if (task != null) { - task = CallableWrapper.wrapIfNeeded(task); - wrappedTasks.add(task); - final ContextStore contextStore = - InstrumentationContext.get(Callable.class, State.class); - ExecutorInstrumentationUtils.setupState(contextStore, task, (TraceScope) scope); + final Callable newTask = CallableWrapper.wrapIfNeeded(task); + if (ExecutorInstrumentationUtils.isExecutorDisabledForThisTask(executor, newTask)) { + wrappedTasks.add(task); + } else { + wrappedTasks.add(newTask); + final ContextStore contextStore = + InstrumentationContext.get(Callable.class, State.class); + ExecutorInstrumentationUtils.setupState(contextStore, newTask, (TraceScope) scope); + } } } tasks = wrappedTasks; diff --git a/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/ThreadPoolExecutorInstrumentation.java b/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/ThreadPoolExecutorInstrumentation.java index 0ce35dee03..c9aa518893 100644 --- a/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/ThreadPoolExecutorInstrumentation.java +++ b/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/ThreadPoolExecutorInstrumentation.java @@ -66,7 +66,7 @@ public class ThreadPoolExecutorInstrumentation extends Instrumenter.Default { } catch (final ClassCastException | IllegalArgumentException e) { // These errors indicate the queue is fundamentally incompatible with wrapped runnables. // We must disable the executor instance to avoid passing wrapped runnables later. - ExecutorInstrumentationUtils.disableExecutor(executor); + ExecutorInstrumentationUtils.disableExecutorForWrappedTasks(executor); } catch (final Exception e) { // Other errors might indicate the queue is not fully initialized. // We might want to disable for those too, but for now just ignore. diff --git a/dd-java-agent/instrumentation/java-concurrent/src/test/groovy/ExecutorInstrumentationTest.groovy b/dd-java-agent/instrumentation/java-concurrent/src/test/groovy/ExecutorInstrumentationTest.groovy index a99f774453..2897f02d95 100644 --- a/dd-java-agent/instrumentation/java-concurrent/src/test/groovy/ExecutorInstrumentationTest.groovy +++ b/dd-java-agent/instrumentation/java-concurrent/src/test/groovy/ExecutorInstrumentationTest.groovy @@ -2,48 +2,50 @@ import datadog.opentracing.DDSpan import datadog.opentracing.scopemanager.ContinuableScope import datadog.trace.agent.test.AgentTestRunner import datadog.trace.api.Trace +import datadog.trace.bootstrap.instrumentation.java.concurrent.CallableWrapper +import datadog.trace.bootstrap.instrumentation.java.concurrent.RunnableWrapper import io.opentracing.util.GlobalTracer import spock.lang.Shared import java.lang.reflect.InvocationTargetException -import java.lang.reflect.Method import java.util.concurrent.ArrayBlockingQueue import java.util.concurrent.Callable -import java.util.concurrent.Executor -import java.util.concurrent.ExecutorService import java.util.concurrent.ForkJoinPool import java.util.concurrent.ForkJoinTask import java.util.concurrent.Future import java.util.concurrent.RejectedExecutionException +import java.util.concurrent.ScheduledThreadPoolExecutor import java.util.concurrent.ThreadPoolExecutor import java.util.concurrent.TimeUnit class ExecutorInstrumentationTest extends AgentTestRunner { - @Shared - Method executeRunnableMethod - @Shared - Method executeForkJoinTaskMethod - @Shared - Method submitRunnableMethod - @Shared - Method submitCallableMethod - @Shared - Method submitForkJoinTaskMethod - @Shared - Method invokeForkJoinTaskMethod - def setupSpec() { - executeRunnableMethod = Executor.getMethod("execute", Runnable) - executeForkJoinTaskMethod = ForkJoinPool.getMethod("execute", ForkJoinTask) - submitRunnableMethod = ExecutorService.getMethod("submit", Runnable) - submitCallableMethod = ExecutorService.getMethod("submit", Callable) - submitForkJoinTaskMethod = ForkJoinPool.getMethod("submit", ForkJoinTask) - invokeForkJoinTaskMethod = ForkJoinPool.getMethod("invoke", ForkJoinTask) - } + @Shared + def executeRunnable = { e, c -> e.execute((Runnable) c) } + @Shared + def executeForkJoinTask = { e, c -> e.execute((ForkJoinTask) c) } + @Shared + def submitRunnable = { e, c -> e.submit((Runnable) c) } + @Shared + def submitCallable = { e, c -> e.submit((Callable) c) } + @Shared + def submitForkJoinTask = { e, c -> e.submit((ForkJoinTask) c) } + @Shared + def invokeAll = { e, c -> e.invokeAll([(Callable) c]) } + @Shared + def invokeAllTimeout = { e, c -> e.invokeAll([(Callable) c], 10, TimeUnit.SECONDS) } + @Shared + def invokeAny = { e, c -> e.invokeAny([(Callable) c]) } + @Shared + def invokeAnyTimeout = { e, c -> e.invokeAny([(Callable) c], 10, TimeUnit.SECONDS) } + @Shared + def invokeForkJoinTask = { e, c -> e.invoke((ForkJoinTask) c) } + @Shared + def scheduleRunnable = { e, c -> e.schedule((Runnable) c, 10, TimeUnit.MILLISECONDS) } + @Shared + def scheduleCallable = { e, c -> e.schedule((Callable) c, 10, TimeUnit.MILLISECONDS) } - // more useful name breaks java9 javac - // def "#poolImpl.getClass().getSimpleName() #method.getName() propagates"() - def "#poolImpl #method propagates"() { + def "#poolImpl '#name' propagates"() { setup: def pool = poolImpl def m = method @@ -54,9 +56,9 @@ class ExecutorInstrumentationTest extends AgentTestRunner { void run() { ((ContinuableScope) GlobalTracer.get().scopeManager().active()).setAsyncPropagation(true) // this child will have a span - m.invoke(pool, new JavaAsyncChild()) + m(pool, new JavaAsyncChild()) // this child won't - m.invoke(pool, new JavaAsyncChild(false, false)) + m(pool, new JavaAsyncChild(false, false)) } }.run() @@ -75,23 +77,84 @@ class ExecutorInstrumentationTest extends AgentTestRunner { // Unfortunately, there's no simple way to test the cross product of methods/pools. where: - poolImpl | method - new ForkJoinPool() | executeRunnableMethod - new ForkJoinPool() | executeForkJoinTaskMethod - new ForkJoinPool() | submitRunnableMethod - new ForkJoinPool() | submitCallableMethod - new ForkJoinPool() | submitForkJoinTaskMethod - new ForkJoinPool() | invokeForkJoinTaskMethod + name | method | poolImpl + "execute Runnable" | executeRunnable | new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue(1)) + "submit Runnable" | submitRunnable | new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue(1)) + "submit Callable" | submitCallable | new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue(1)) + "invokeAll" | invokeAll | new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue(1)) + "invokeAll with timeout" | invokeAllTimeout | new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue(1)) + "invokeAny" | invokeAny | new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue(1)) + "invokeAny with timeout" | invokeAnyTimeout | new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue(1)) - new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue(1)) | executeRunnableMethod - new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue(1)) | submitRunnableMethod - new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue(1)) | submitCallableMethod + // Scheduled executor has additional methods and also may get disabled because it wraps tasks + "execute Runnable" | executeRunnable | new ScheduledThreadPoolExecutor(1) + "submit Runnable" | submitRunnable | new ScheduledThreadPoolExecutor(1) + "submit Callable" | submitCallable | new ScheduledThreadPoolExecutor(1) + "invokeAll" | invokeAll | new ScheduledThreadPoolExecutor(1) + "invokeAll with timeout" | invokeAllTimeout | new ScheduledThreadPoolExecutor(1) + "invokeAny" | invokeAny | new ScheduledThreadPoolExecutor(1) + "invokeAny with timeout" | invokeAnyTimeout | new ScheduledThreadPoolExecutor(1) + "schedule Runnable" | scheduleRunnable | new ScheduledThreadPoolExecutor(1) + "schedule Callable" | scheduleCallable | new ScheduledThreadPoolExecutor(1) + // ForkJoinPool has additional set of method overloads for ForkJoinTask to deal with + "execute Runnable" | executeRunnable | new ForkJoinPool() + "execute ForkJoinTask" | executeForkJoinTask | new ForkJoinPool() + "submit Runnable" | submitRunnable | new ForkJoinPool() + "submit Callable" | submitCallable | new ForkJoinPool() + "submit ForkJoinTask" | submitForkJoinTask | new ForkJoinPool() + "invoke ForkJoinTask" | invokeForkJoinTask | new ForkJoinPool() + "invokeAll" | invokeAll | new ForkJoinPool() + "invokeAll with timeout" | invokeAllTimeout | new ForkJoinPool() + "invokeAny" | invokeAny | new ForkJoinPool() + "invokeAny with timeout" | invokeAnyTimeout | new ForkJoinPool() } - // more useful name breaks java9 javac - // def "#poolImpl.getClass().getSimpleName() #method.getName() propagates"() - def "#poolImpl reports after canceled jobs"() { + def "#poolImpl '#name' disabled wrapping"() { + setup: + def pool = poolImpl + def m = method + def w = wrap + + JavaAsyncChild child = new JavaAsyncChild(true, true) + new Runnable() { + @Override + @Trace(operationName = "parent") + void run() { + ((ContinuableScope) GlobalTracer.get().scopeManager().active()).setAsyncPropagation(true) + m(pool, w(child)) + } + }.run() + // We block in child to make sure spans close in predictable order + child.unblock() + + // Expect two traces because async propagation gets effectively disabled + TEST_WRITER.waitForTraces(2) + + expect: + TEST_WRITER.size() == 2 + TEST_WRITER.get(0).size() == 1 + TEST_WRITER.get(0).get(0).operationName == "parent" + TEST_WRITER.get(1).size() == 1 + TEST_WRITER.get(1).get(0).operationName == "asyncChild" + + cleanup: + pool?.shutdown() + + where: + // Scheduled executor cannot accept wrapped tasks + // TODO: we should have a test that passes lambda, but this is hard + // because this requires tests to be run in java8+ only. + // Instead we 'hand-wrap' tasks in this test. + name | method | wrap | poolImpl + "execute Runnable" | executeRunnable | { new RunnableWrapper(it) } | new ScheduledThreadPoolExecutor(1) + "submit Runnable" | submitRunnable | { new RunnableWrapper(it) } | new ScheduledThreadPoolExecutor(1) + "submit Callable" | submitCallable | { new CallableWrapper(it) } | new ScheduledThreadPoolExecutor(1) + "schedule Runnable" | scheduleRunnable | { new RunnableWrapper(it) } | new ScheduledThreadPoolExecutor(1) + "schedule Callable" | scheduleCallable | { new CallableWrapper(it) } | new ScheduledThreadPoolExecutor(1) + } + + def "#poolImpl '#name' reports after canceled jobs"() { setup: def pool = poolImpl def m = method @@ -114,7 +177,7 @@ class ExecutorInstrumentationTest extends AgentTestRunner { final JavaAsyncChild child = new JavaAsyncChild(true, true) children.add(child) try { - Future f = m.invoke(pool, new JavaAsyncChild()) + Future f = m(pool, new JavaAsyncChild()) jobFutures.add(f) } catch (InvocationTargetException e) { throw e.getCause() @@ -139,11 +202,18 @@ class ExecutorInstrumentationTest extends AgentTestRunner { TEST_WRITER.size() == 1 where: - poolImpl | method - new ForkJoinPool() | submitRunnableMethod - new ForkJoinPool() | submitCallableMethod + name | method | poolImpl + "submit Runnable" | submitRunnable | new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue(1)) + "submit Callable" | submitCallable | new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue(1)) - new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue(1)) | submitRunnableMethod - new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue(1)) | submitCallableMethod + // Scheduled executor has additional methods and also may get disabled because it wraps tasks + "submit Runnable" | submitRunnable | new ScheduledThreadPoolExecutor(1) + "submit Callable" | submitCallable | new ScheduledThreadPoolExecutor(1) + "schedule Runnable" | scheduleRunnable | new ScheduledThreadPoolExecutor(1) + "schedule Callable" | scheduleCallable | new ScheduledThreadPoolExecutor(1) + + // ForkJoinPool has additional set of method overloads for ForkJoinTask to deal with + "submit Runnable" | submitRunnable | new ForkJoinPool() + "submit Callable" | submitCallable | new ForkJoinPool() } } diff --git a/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/utils/TraceUtils.groovy b/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/utils/TraceUtils.groovy index 02cd354d08..8ba7433588 100644 --- a/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/utils/TraceUtils.groovy +++ b/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/utils/TraceUtils.groovy @@ -30,7 +30,6 @@ class TraceUtils { throw e } finally { - ((TraceScope) scope).setAsyncPropagation(false) scope.close() } } diff --git a/dd-trace-ot/src/main/java/datadog/opentracing/scopemanager/ContinuableScope.java b/dd-trace-ot/src/main/java/datadog/opentracing/scopemanager/ContinuableScope.java index cf5f49df19..a40126b343 100644 --- a/dd-trace-ot/src/main/java/datadog/opentracing/scopemanager/ContinuableScope.java +++ b/dd-trace-ot/src/main/java/datadog/opentracing/scopemanager/ContinuableScope.java @@ -159,7 +159,10 @@ public class ContinuableScope implements Scope, TraceScope { if (closeContinuationScope) { ContinuableScope.this.close(); } else { - openCount.decrementAndGet(); + // Same as in 'close()' above. + if (openCount.decrementAndGet() == 0 && finishOnClose) { + spanUnderScope.finish(); + } } } else { log.debug("Failed to close continuation {}. Already used.", this); diff --git a/dd-trace-ot/src/test/groovy/datadog/opentracing/scopemanager/ScopeManagerTest.groovy b/dd-trace-ot/src/test/groovy/datadog/opentracing/scopemanager/ScopeManagerTest.groovy index a6d30eadd1..92a5501887 100644 --- a/dd-trace-ot/src/test/groovy/datadog/opentracing/scopemanager/ScopeManagerTest.groovy +++ b/dd-trace-ot/src/test/groovy/datadog/opentracing/scopemanager/ScopeManagerTest.groovy @@ -183,6 +183,78 @@ class ScopeManagerTest extends Specification { false | true } + def "Continuation.close closes parent scope"() { + setup: + def builder = tracer.buildSpan("test") + def scope = (ContinuableScope) builder.startActive(true) + scope.setAsyncPropagation(true) + def continuation = scope.capture() + + when: + /* + Note: this API is inherently broken. Our scope implementation doesn't allow us to close scopes + in random order, yet when we close continuation we attempt to close scope by default. + And in fact continuation trying to close parent scope is most likely a bug. + */ + continuation.close(true) + + then: + scopeManager.active() == null + !spanFinished(scope.span()) + + when: + scope.span().finish() + + then: + scopeManager.active() == null + } + + def "Continuation.close doesn't close parent scope"() { + setup: + def builder = tracer.buildSpan("test") + def scope = (ContinuableScope) builder.startActive(true) + scope.setAsyncPropagation(true) + def continuation = scope.capture() + + when: + continuation.close(false) + + then: + scopeManager.active() == scope + } + + def "Continuation.close doesn't close parent scope, span finishes"() { + /* + This is highly confusing behaviour. Sequence of events is as following: + * Scope gets created along with span and with finishOnClose == true. + * Continuation gets created for that scope. + * Scope is closed. + At this point scope is not really closed. It is removed from scope + stack, but it is still alive because there is a live continuation attached + to it. This also means span is not closed. + * Continuation is closed. + This triggers final closing of scope and closing of the span. + + This is confusing because expected behaviour is for span to be closed + with the scope when finishOnClose = true, but in fact span lingers until + continuation is closed. + */ + setup: + def builder = tracer.buildSpan("test") + def scope = (ContinuableScope) builder.startActive(true) + scope.setAsyncPropagation(true) + def continuation = scope.capture() + scope.close() + + when: + continuation.close(false) + + then: + scopeManager.active() == null + spanFinished(scope.span()) + writer == [[scope.span()]] + } + @Timeout(value = 60, unit = TimeUnit.SECONDS) def "hard reference on continuation prevents trace from reporting"() { setup: