From bae2d7dde8d78870bd7fcdd1d025900fe61aa58f Mon Sep 17 00:00:00 2001 From: Nikolay Martynov Date: Fri, 8 Feb 2019 11:43:06 -0500 Subject: [PATCH] Allow non-wrapped tasks in disabled executors Some executors cannot handle tasks that have been wrapped into `{Runnable,Callable}Wrapper` because they require certain subclass of `{Callable,Runnable}` in order to work. We have a test that effectively disables instrumentation for such executors. This change makes sure that tasks that do not need to be wrapped (which essentially means anything that is not lambda) still get traced in such executors. One notable example of affected executor type is `ScheduledThreadPoolExecutor`. --- .../AkkaExecutorInstrumentationTest.groovy | 76 ++++---- .../ScalaExecutorInstrumentationTest.groovy | 76 ++++---- .../ExecutorInstrumentationUtils.java | 23 ++- .../concurrent/FutureInstrumentation.java | 3 +- .../JavaExecutorInstrumentation.java | 51 ++++-- .../ThreadPoolExecutorInstrumentation.java | 2 +- .../groovy/ExecutorInstrumentationTest.groovy | 164 +++++++++++++----- .../trace/agent/test/utils/TraceUtils.groovy | 1 - .../scopemanager/ContinuableScope.java | 2 +- .../scopemanager/ScopeManagerTest.groovy | 72 ++++++++ 10 files changed, 306 insertions(+), 164 deletions(-) diff --git a/dd-java-agent/instrumentation/java-concurrent/akka-2.5-testing/src/test/groovy/AkkaExecutorInstrumentationTest.groovy b/dd-java-agent/instrumentation/java-concurrent/akka-2.5-testing/src/test/groovy/AkkaExecutorInstrumentationTest.groovy index 4462ebe265..d2e337da5f 100644 --- a/dd-java-agent/instrumentation/java-concurrent/akka-2.5-testing/src/test/groovy/AkkaExecutorInstrumentationTest.groovy +++ b/dd-java-agent/instrumentation/java-concurrent/akka-2.5-testing/src/test/groovy/AkkaExecutorInstrumentationTest.groovy @@ -8,11 +8,8 @@ import io.opentracing.util.GlobalTracer import spock.lang.Shared import java.lang.reflect.InvocationTargetException -import java.lang.reflect.Method import java.util.concurrent.ArrayBlockingQueue import java.util.concurrent.Callable -import java.util.concurrent.Executor -import java.util.concurrent.ExecutorService import java.util.concurrent.Future import java.util.concurrent.RejectedExecutionException import java.util.concurrent.ThreadPoolExecutor @@ -23,31 +20,21 @@ import java.util.concurrent.TimeUnit * This is to large extent a copy of ExecutorInstrumentationTest. */ class AkkaExecutorInstrumentationTest extends AgentTestRunner { - @Shared - Method executeRunnableMethod - @Shared - Method akkaExecuteForkJoinTaskMethod - @Shared - Method submitRunnableMethod - @Shared - Method submitCallableMethod - @Shared - Method akkaSubmitForkJoinTaskMethod - @Shared - Method akkaInvokeForkJoinTaskMethod - def setupSpec() { - executeRunnableMethod = Executor.getMethod("execute", Runnable) - akkaExecuteForkJoinTaskMethod = ForkJoinPool.getMethod("execute", ForkJoinTask) - submitRunnableMethod = ExecutorService.getMethod("submit", Runnable) - submitCallableMethod = ExecutorService.getMethod("submit", Callable) - akkaSubmitForkJoinTaskMethod = ForkJoinPool.getMethod("submit", ForkJoinTask) - akkaInvokeForkJoinTaskMethod = ForkJoinPool.getMethod("invoke", ForkJoinTask) - } + @Shared + def executeRunnable = { e, c -> e.execute((Runnable) c) } + @Shared + def akkaExecuteForkJoinTask = { e, c -> e.execute((ForkJoinTask) c) } + @Shared + def submitRunnable = { e, c -> e.submit((Runnable) c) } + @Shared + def submitCallable = { e, c -> e.submit((Callable) c) } + @Shared + def akkaSubmitForkJoinTask = { e, c -> e.submit((ForkJoinTask) c) } + @Shared + def akkaInvokeForkJoinTask = { e, c -> e.invoke((ForkJoinTask) c) } - // more useful name breaks java9 javac - // def "#poolImpl.getClass().getSimpleName() #method.getName() propagates"() - def "#poolImpl #method propagates"() { + def "#poolImpl '#name' propagates"() { setup: def pool = poolImpl def m = method @@ -58,9 +45,9 @@ class AkkaExecutorInstrumentationTest extends AgentTestRunner { void run() { ((ContinuableScope) GlobalTracer.get().scopeManager().active()).setAsyncPropagation(true) // this child will have a span - m.invoke(pool, new AkkaAsyncChild()) + m(pool, new AkkaAsyncChild()) // this child won't - m.invoke(pool, new AkkaAsyncChild(false, false)) + m(pool, new AkkaAsyncChild(false, false)) } }.run() @@ -79,22 +66,21 @@ class AkkaExecutorInstrumentationTest extends AgentTestRunner { // Unfortunately, there's no simple way to test the cross product of methods/pools. where: - poolImpl | method - new ForkJoinPool() | executeRunnableMethod - new ForkJoinPool() | akkaExecuteForkJoinTaskMethod - new ForkJoinPool() | submitRunnableMethod - new ForkJoinPool() | submitCallableMethod - new ForkJoinPool() | akkaSubmitForkJoinTaskMethod - new ForkJoinPool() | akkaInvokeForkJoinTaskMethod + name | method | poolImpl + "execute Runnable" | executeRunnable | new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue(1)) + "submit Runnable" | submitRunnable | new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue(1)) + "submit Callable" | submitCallable | new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue(1)) - new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue(1)) | executeRunnableMethod - new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue(1)) | submitRunnableMethod - new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue(1)) | submitCallableMethod + // ForkJoinPool has additional set of method overloads for ForkJoinTask to deal with + "execute Runnable" | executeRunnable | new ForkJoinPool() + "execute ForkJoinTask" | akkaExecuteForkJoinTask | new ForkJoinPool() + "submit Runnable" | submitRunnable | new ForkJoinPool() + "submit Callable" | submitCallable | new ForkJoinPool() + "submit ForkJoinTask" | akkaSubmitForkJoinTask | new ForkJoinPool() + "invoke ForkJoinTask" | akkaInvokeForkJoinTask | new ForkJoinPool() } - // more useful name breaks java9 javac - // def "#poolImpl.getClass().getSimpleName() #method.getName() propagates"() - def "#poolImpl reports after canceled jobs"() { + def "#poolImpl '#name' reports after canceled jobs"() { setup: def pool = poolImpl def m = method @@ -117,7 +103,7 @@ class AkkaExecutorInstrumentationTest extends AgentTestRunner { final AkkaAsyncChild child = new AkkaAsyncChild(true, true) children.add(child) try { - Future f = m.invoke(pool, new AkkaAsyncChild()) + Future f = m(pool, new AkkaAsyncChild()) jobFutures.add(f) } catch (InvocationTargetException e) { throw e.getCause() @@ -142,8 +128,8 @@ class AkkaExecutorInstrumentationTest extends AgentTestRunner { TEST_WRITER.size() == 1 where: - poolImpl | method - new ForkJoinPool() | submitRunnableMethod - new ForkJoinPool() | submitCallableMethod + name | method | poolImpl + "submit Runnable" | submitRunnable | new ForkJoinPool() + "submit Callable" | submitCallable | new ForkJoinPool() } } diff --git a/dd-java-agent/instrumentation/java-concurrent/scala-testing/src/test/groovy/ScalaExecutorInstrumentationTest.groovy b/dd-java-agent/instrumentation/java-concurrent/scala-testing/src/test/groovy/ScalaExecutorInstrumentationTest.groovy index b46af31619..78780d8fb1 100644 --- a/dd-java-agent/instrumentation/java-concurrent/scala-testing/src/test/groovy/ScalaExecutorInstrumentationTest.groovy +++ b/dd-java-agent/instrumentation/java-concurrent/scala-testing/src/test/groovy/ScalaExecutorInstrumentationTest.groovy @@ -8,11 +8,8 @@ import scala.concurrent.forkjoin.ForkJoinTask import spock.lang.Shared import java.lang.reflect.InvocationTargetException -import java.lang.reflect.Method import java.util.concurrent.ArrayBlockingQueue import java.util.concurrent.Callable -import java.util.concurrent.Executor -import java.util.concurrent.ExecutorService import java.util.concurrent.Future import java.util.concurrent.RejectedExecutionException import java.util.concurrent.ThreadPoolExecutor @@ -23,31 +20,21 @@ import java.util.concurrent.TimeUnit * This is to large extent a copy of ExecutorInstrumentationTest. */ class ScalaExecutorInstrumentationTest extends AgentTestRunner { - @Shared - Method executeRunnableMethod - @Shared - Method scalaExecuteForkJoinTaskMethod - @Shared - Method submitRunnableMethod - @Shared - Method submitCallableMethod - @Shared - Method scalaSubmitForkJoinTaskMethod - @Shared - Method scalaInvokeForkJoinTaskMethod - def setupSpec() { - executeRunnableMethod = Executor.getMethod("execute", Runnable) - scalaExecuteForkJoinTaskMethod = ForkJoinPool.getMethod("execute", ForkJoinTask) - submitRunnableMethod = ExecutorService.getMethod("submit", Runnable) - submitCallableMethod = ExecutorService.getMethod("submit", Callable) - scalaSubmitForkJoinTaskMethod = ForkJoinPool.getMethod("submit", ForkJoinTask) - scalaInvokeForkJoinTaskMethod = ForkJoinPool.getMethod("invoke", ForkJoinTask) - } + @Shared + def executeRunnable = { e, c -> e.execute((Runnable) c) } + @Shared + def scalaExecuteForkJoinTask = { e, c -> e.execute((ForkJoinTask) c) } + @Shared + def submitRunnable = { e, c -> e.submit((Runnable) c) } + @Shared + def submitCallable = { e, c -> e.submit((Callable) c) } + @Shared + def scalaSubmitForkJoinTask = { e, c -> e.submit((ForkJoinTask) c) } + @Shared + def scalaInvokeForkJoinTask = { e, c -> e.invoke((ForkJoinTask) c) } - // more useful name breaks java9 javac - // def "#poolImpl.getClass().getSimpleName() #method.getName() propagates"() - def "#poolImpl #method propagates"() { + def "#poolImpl '#name' propagates"() { setup: def pool = poolImpl def m = method @@ -58,9 +45,9 @@ class ScalaExecutorInstrumentationTest extends AgentTestRunner { void run() { ((ContinuableScope) GlobalTracer.get().scopeManager().active()).setAsyncPropagation(true) // this child will have a span - m.invoke(pool, new ScalaAsyncChild()) + m(pool, new ScalaAsyncChild()) // this child won't - m.invoke(pool, new ScalaAsyncChild(false, false)) + m(pool, new ScalaAsyncChild(false, false)) } }.run() @@ -79,22 +66,21 @@ class ScalaExecutorInstrumentationTest extends AgentTestRunner { // Unfortunately, there's no simple way to test the cross product of methods/pools. where: - poolImpl | method - new ForkJoinPool() | executeRunnableMethod - new ForkJoinPool() | scalaExecuteForkJoinTaskMethod - new ForkJoinPool() | submitRunnableMethod - new ForkJoinPool() | submitCallableMethod - new ForkJoinPool() | scalaSubmitForkJoinTaskMethod - new ForkJoinPool() | scalaInvokeForkJoinTaskMethod + name | method | poolImpl + "execute Runnable" | executeRunnable | new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue(1)) + "submit Runnable" | submitRunnable | new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue(1)) + "submit Callable" | submitCallable | new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue(1)) - new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue(1)) | executeRunnableMethod - new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue(1)) | submitRunnableMethod - new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue(1)) | submitCallableMethod + // ForkJoinPool has additional set of method overloads for ForkJoinTask to deal with + "execute Runnable" | executeRunnable | new ForkJoinPool() + "execute ForkJoinTask" | scalaExecuteForkJoinTask | new ForkJoinPool() + "submit Runnable" | submitRunnable | new ForkJoinPool() + "submit Callable" | submitCallable | new ForkJoinPool() + "submit ForkJoinTask" | scalaSubmitForkJoinTask | new ForkJoinPool() + "invoke ForkJoinTask" | scalaInvokeForkJoinTask | new ForkJoinPool() } - // more useful name breaks java9 javac - // def "#poolImpl.getClass().getSimpleName() #method.getName() propagates"() - def "#poolImpl reports after canceled jobs"() { + def "#poolImpl '#name' reports after canceled jobs"() { setup: def pool = poolImpl def m = method @@ -117,7 +103,7 @@ class ScalaExecutorInstrumentationTest extends AgentTestRunner { final ScalaAsyncChild child = new ScalaAsyncChild(true, true) children.add(child) try { - Future f = m.invoke(pool, new ScalaAsyncChild()) + Future f = m(pool, new ScalaAsyncChild()) jobFutures.add(f) } catch (InvocationTargetException e) { throw e.getCause() @@ -142,8 +128,8 @@ class ScalaExecutorInstrumentationTest extends AgentTestRunner { TEST_WRITER.size() == 1 where: - poolImpl | method - new ForkJoinPool() | submitRunnableMethod - new ForkJoinPool() | submitCallableMethod + name | method | poolImpl + "submit Runnable" | submitRunnable | new ForkJoinPool() + "submit Callable" | submitCallable | new ForkJoinPool() } } diff --git a/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/ExecutorInstrumentationUtils.java b/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/ExecutorInstrumentationUtils.java index 779a95b80f..e954596143 100644 --- a/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/ExecutorInstrumentationUtils.java +++ b/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/ExecutorInstrumentationUtils.java @@ -2,6 +2,8 @@ package datadog.trace.instrumentation.java.concurrent; import datadog.trace.bootstrap.ContextStore; import datadog.trace.bootstrap.WeakMap; +import datadog.trace.bootstrap.instrumentation.java.concurrent.CallableWrapper; +import datadog.trace.bootstrap.instrumentation.java.concurrent.RunnableWrapper; import datadog.trace.bootstrap.instrumentation.java.concurrent.State; import datadog.trace.context.TraceScope; import io.opentracing.Scope; @@ -13,7 +15,7 @@ import lombok.extern.slf4j.Slf4j; @Slf4j public class ExecutorInstrumentationUtils { - private static final WeakMap DISABLED_EXECUTORS = + private static final WeakMap EXECUTORS_DISABLED_FOR_WRAPPED_TASKS = WeakMap.Provider.newWeakMap(); /** @@ -28,7 +30,7 @@ public class ExecutorInstrumentationUtils { return (scope instanceof TraceScope && ((TraceScope) scope).isAsyncPropagating() && task != null - && !ExecutorInstrumentationUtils.isDisabled(executor)); + && !ExecutorInstrumentationUtils.isExecutorDisabledForThisTask(executor, task)); } /** @@ -74,12 +76,19 @@ public class ExecutorInstrumentationUtils { } } - public static void disableExecutor(final Executor executor) { - log.debug("Disabling Executor tracing for instance {}", executor); - DISABLED_EXECUTORS.put(executor, true); + public static void disableExecutorForWrappedTasks(final Executor executor) { + log.debug("Disabling Executor tracing for wrapped tasks for instance {}", executor); + EXECUTORS_DISABLED_FOR_WRAPPED_TASKS.put(executor, true); } - public static boolean isDisabled(final Executor executor) { - return DISABLED_EXECUTORS.containsKey(executor); + /** + * Check if Executor can accept given task. + * + *

Disabled executors cannot accept wrapped tasks, non wrapped tasks (i.e. tasks with injected + * fields) should still work fine. + */ + public static boolean isExecutorDisabledForThisTask(final Executor executor, final Object task) { + return (task instanceof RunnableWrapper || task instanceof CallableWrapper) + && EXECUTORS_DISABLED_FOR_WRAPPED_TASKS.containsKey(executor); } } diff --git a/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/FutureInstrumentation.java b/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/FutureInstrumentation.java index bda2ef91ea..a1b2317c21 100644 --- a/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/FutureInstrumentation.java +++ b/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/FutureInstrumentation.java @@ -61,7 +61,8 @@ public final class FutureInstrumentation extends Instrumenter.Default { "akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask", "com.google.common.util.concurrent.SettableFuture", "com.google.common.util.concurrent.AbstractFuture$TrustedFuture", - "com.google.common.util.concurrent.AbstractFuture" + "com.google.common.util.concurrent.AbstractFuture", + "io.netty.util.concurrent.ScheduledFutureTask" }; WHITELISTED_FUTURES = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(whitelist))); } diff --git a/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/JavaExecutorInstrumentation.java b/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/JavaExecutorInstrumentation.java index 5e28ecf74c..0cb9bfaa6b 100644 --- a/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/JavaExecutorInstrumentation.java +++ b/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/JavaExecutorInstrumentation.java @@ -61,11 +61,17 @@ public final class JavaExecutorInstrumentation extends AbstractExecutorInstrumen named("submit").and(takesArgument(0, ForkJoinTask.class)), SetJavaForkJoinStateAdvice.class.getName()); transformers.put( - nameMatches("invoke(Any|All)$").and(takesArgument(0, Callable.class)), + nameMatches("invoke(Any|All)$").and(takesArgument(0, Collection.class)), SetCallableStateForCallableCollectionAdvice.class.getName()); transformers.put( nameMatches("invoke").and(takesArgument(0, ForkJoinTask.class)), SetJavaForkJoinStateAdvice.class.getName()); + transformers.put( + named("schedule").and(takesArgument(0, Runnable.class)), + SetSubmitRunnableStateAdvice.class.getName()); + transformers.put( + named("schedule").and(takesArgument(0, Callable.class)), + SetCallableStateAdvice.class.getName()); transformers.put( // kotlinx.coroutines.scheduling.CoroutineScheduler named("dispatch") .and(takesArgument(0, Runnable.class)) @@ -81,11 +87,14 @@ public final class JavaExecutorInstrumentation extends AbstractExecutorInstrumen @Advice.This final Executor executor, @Advice.Argument(value = 0, readOnly = false) Runnable task) { final Scope scope = GlobalTracer.get().scopeManager().active(); - if (ExecutorInstrumentationUtils.shouldAttachStateToTask(task, executor)) { - task = RunnableWrapper.wrapIfNeeded(task); + final Runnable newTask = RunnableWrapper.wrapIfNeeded(task); + // It is important to check potentially wrapped task if we can instrument task in this + // executor. Some executors do not support wrapped tasks. + if (ExecutorInstrumentationUtils.shouldAttachStateToTask(newTask, executor)) { + task = newTask; final ContextStore contextStore = InstrumentationContext.get(Runnable.class, State.class); - return ExecutorInstrumentationUtils.setupState(contextStore, task, (TraceScope) scope); + return ExecutorInstrumentationUtils.setupState(contextStore, newTask, (TraceScope) scope); } return null; } @@ -130,11 +139,14 @@ public final class JavaExecutorInstrumentation extends AbstractExecutorInstrumen @Advice.This final Executor executor, @Advice.Argument(value = 0, readOnly = false) Runnable task) { final Scope scope = GlobalTracer.get().scopeManager().active(); - if (ExecutorInstrumentationUtils.shouldAttachStateToTask(task, executor)) { - task = RunnableWrapper.wrapIfNeeded(task); + final Runnable newTask = RunnableWrapper.wrapIfNeeded(task); + // It is important to check potentially wrapped task if we can instrument task in this + // executor. Some executors do not support wrapped tasks. + if (ExecutorInstrumentationUtils.shouldAttachStateToTask(newTask, executor)) { + task = newTask; final ContextStore contextStore = InstrumentationContext.get(Runnable.class, State.class); - return ExecutorInstrumentationUtils.setupState(contextStore, task, (TraceScope) scope); + return ExecutorInstrumentationUtils.setupState(contextStore, newTask, (TraceScope) scope); } return null; } @@ -161,11 +173,14 @@ public final class JavaExecutorInstrumentation extends AbstractExecutorInstrumen @Advice.This final Executor executor, @Advice.Argument(value = 0, readOnly = false) Callable task) { final Scope scope = GlobalTracer.get().scopeManager().active(); - if (ExecutorInstrumentationUtils.shouldAttachStateToTask(task, executor)) { - task = CallableWrapper.wrapIfNeeded(task); + final Callable newTask = CallableWrapper.wrapIfNeeded(task); + // It is important to check potentially wrapped task if we can instrument task in this + // executor. Some executors do not support wrapped tasks. + if (ExecutorInstrumentationUtils.shouldAttachStateToTask(newTask, executor)) { + task = newTask; final ContextStore contextStore = InstrumentationContext.get(Callable.class, State.class); - return ExecutorInstrumentationUtils.setupState(contextStore, task, (TraceScope) scope); + return ExecutorInstrumentationUtils.setupState(contextStore, newTask, (TraceScope) scope); } return null; } @@ -196,13 +211,17 @@ public final class JavaExecutorInstrumentation extends AbstractExecutorInstrumen && ((TraceScope) scope).isAsyncPropagating() && tasks != null) { final Collection> wrappedTasks = new ArrayList<>(tasks.size()); - for (Callable task : tasks) { + for (final Callable task : tasks) { if (task != null) { - task = CallableWrapper.wrapIfNeeded(task); - wrappedTasks.add(task); - final ContextStore contextStore = - InstrumentationContext.get(Callable.class, State.class); - ExecutorInstrumentationUtils.setupState(contextStore, task, (TraceScope) scope); + final Callable newTask = CallableWrapper.wrapIfNeeded(task); + if (ExecutorInstrumentationUtils.isExecutorDisabledForThisTask(executor, newTask)) { + wrappedTasks.add(task); + } else { + wrappedTasks.add(newTask); + final ContextStore contextStore = + InstrumentationContext.get(Callable.class, State.class); + ExecutorInstrumentationUtils.setupState(contextStore, newTask, (TraceScope) scope); + } } } tasks = wrappedTasks; diff --git a/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/ThreadPoolExecutorInstrumentation.java b/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/ThreadPoolExecutorInstrumentation.java index 0ce35dee03..c9aa518893 100644 --- a/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/ThreadPoolExecutorInstrumentation.java +++ b/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/ThreadPoolExecutorInstrumentation.java @@ -66,7 +66,7 @@ public class ThreadPoolExecutorInstrumentation extends Instrumenter.Default { } catch (final ClassCastException | IllegalArgumentException e) { // These errors indicate the queue is fundamentally incompatible with wrapped runnables. // We must disable the executor instance to avoid passing wrapped runnables later. - ExecutorInstrumentationUtils.disableExecutor(executor); + ExecutorInstrumentationUtils.disableExecutorForWrappedTasks(executor); } catch (final Exception e) { // Other errors might indicate the queue is not fully initialized. // We might want to disable for those too, but for now just ignore. diff --git a/dd-java-agent/instrumentation/java-concurrent/src/test/groovy/ExecutorInstrumentationTest.groovy b/dd-java-agent/instrumentation/java-concurrent/src/test/groovy/ExecutorInstrumentationTest.groovy index a99f774453..2897f02d95 100644 --- a/dd-java-agent/instrumentation/java-concurrent/src/test/groovy/ExecutorInstrumentationTest.groovy +++ b/dd-java-agent/instrumentation/java-concurrent/src/test/groovy/ExecutorInstrumentationTest.groovy @@ -2,48 +2,50 @@ import datadog.opentracing.DDSpan import datadog.opentracing.scopemanager.ContinuableScope import datadog.trace.agent.test.AgentTestRunner import datadog.trace.api.Trace +import datadog.trace.bootstrap.instrumentation.java.concurrent.CallableWrapper +import datadog.trace.bootstrap.instrumentation.java.concurrent.RunnableWrapper import io.opentracing.util.GlobalTracer import spock.lang.Shared import java.lang.reflect.InvocationTargetException -import java.lang.reflect.Method import java.util.concurrent.ArrayBlockingQueue import java.util.concurrent.Callable -import java.util.concurrent.Executor -import java.util.concurrent.ExecutorService import java.util.concurrent.ForkJoinPool import java.util.concurrent.ForkJoinTask import java.util.concurrent.Future import java.util.concurrent.RejectedExecutionException +import java.util.concurrent.ScheduledThreadPoolExecutor import java.util.concurrent.ThreadPoolExecutor import java.util.concurrent.TimeUnit class ExecutorInstrumentationTest extends AgentTestRunner { - @Shared - Method executeRunnableMethod - @Shared - Method executeForkJoinTaskMethod - @Shared - Method submitRunnableMethod - @Shared - Method submitCallableMethod - @Shared - Method submitForkJoinTaskMethod - @Shared - Method invokeForkJoinTaskMethod - def setupSpec() { - executeRunnableMethod = Executor.getMethod("execute", Runnable) - executeForkJoinTaskMethod = ForkJoinPool.getMethod("execute", ForkJoinTask) - submitRunnableMethod = ExecutorService.getMethod("submit", Runnable) - submitCallableMethod = ExecutorService.getMethod("submit", Callable) - submitForkJoinTaskMethod = ForkJoinPool.getMethod("submit", ForkJoinTask) - invokeForkJoinTaskMethod = ForkJoinPool.getMethod("invoke", ForkJoinTask) - } + @Shared + def executeRunnable = { e, c -> e.execute((Runnable) c) } + @Shared + def executeForkJoinTask = { e, c -> e.execute((ForkJoinTask) c) } + @Shared + def submitRunnable = { e, c -> e.submit((Runnable) c) } + @Shared + def submitCallable = { e, c -> e.submit((Callable) c) } + @Shared + def submitForkJoinTask = { e, c -> e.submit((ForkJoinTask) c) } + @Shared + def invokeAll = { e, c -> e.invokeAll([(Callable) c]) } + @Shared + def invokeAllTimeout = { e, c -> e.invokeAll([(Callable) c], 10, TimeUnit.SECONDS) } + @Shared + def invokeAny = { e, c -> e.invokeAny([(Callable) c]) } + @Shared + def invokeAnyTimeout = { e, c -> e.invokeAny([(Callable) c], 10, TimeUnit.SECONDS) } + @Shared + def invokeForkJoinTask = { e, c -> e.invoke((ForkJoinTask) c) } + @Shared + def scheduleRunnable = { e, c -> e.schedule((Runnable) c, 10, TimeUnit.MILLISECONDS) } + @Shared + def scheduleCallable = { e, c -> e.schedule((Callable) c, 10, TimeUnit.MILLISECONDS) } - // more useful name breaks java9 javac - // def "#poolImpl.getClass().getSimpleName() #method.getName() propagates"() - def "#poolImpl #method propagates"() { + def "#poolImpl '#name' propagates"() { setup: def pool = poolImpl def m = method @@ -54,9 +56,9 @@ class ExecutorInstrumentationTest extends AgentTestRunner { void run() { ((ContinuableScope) GlobalTracer.get().scopeManager().active()).setAsyncPropagation(true) // this child will have a span - m.invoke(pool, new JavaAsyncChild()) + m(pool, new JavaAsyncChild()) // this child won't - m.invoke(pool, new JavaAsyncChild(false, false)) + m(pool, new JavaAsyncChild(false, false)) } }.run() @@ -75,23 +77,84 @@ class ExecutorInstrumentationTest extends AgentTestRunner { // Unfortunately, there's no simple way to test the cross product of methods/pools. where: - poolImpl | method - new ForkJoinPool() | executeRunnableMethod - new ForkJoinPool() | executeForkJoinTaskMethod - new ForkJoinPool() | submitRunnableMethod - new ForkJoinPool() | submitCallableMethod - new ForkJoinPool() | submitForkJoinTaskMethod - new ForkJoinPool() | invokeForkJoinTaskMethod + name | method | poolImpl + "execute Runnable" | executeRunnable | new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue(1)) + "submit Runnable" | submitRunnable | new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue(1)) + "submit Callable" | submitCallable | new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue(1)) + "invokeAll" | invokeAll | new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue(1)) + "invokeAll with timeout" | invokeAllTimeout | new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue(1)) + "invokeAny" | invokeAny | new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue(1)) + "invokeAny with timeout" | invokeAnyTimeout | new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue(1)) - new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue(1)) | executeRunnableMethod - new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue(1)) | submitRunnableMethod - new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue(1)) | submitCallableMethod + // Scheduled executor has additional methods and also may get disabled because it wraps tasks + "execute Runnable" | executeRunnable | new ScheduledThreadPoolExecutor(1) + "submit Runnable" | submitRunnable | new ScheduledThreadPoolExecutor(1) + "submit Callable" | submitCallable | new ScheduledThreadPoolExecutor(1) + "invokeAll" | invokeAll | new ScheduledThreadPoolExecutor(1) + "invokeAll with timeout" | invokeAllTimeout | new ScheduledThreadPoolExecutor(1) + "invokeAny" | invokeAny | new ScheduledThreadPoolExecutor(1) + "invokeAny with timeout" | invokeAnyTimeout | new ScheduledThreadPoolExecutor(1) + "schedule Runnable" | scheduleRunnable | new ScheduledThreadPoolExecutor(1) + "schedule Callable" | scheduleCallable | new ScheduledThreadPoolExecutor(1) + // ForkJoinPool has additional set of method overloads for ForkJoinTask to deal with + "execute Runnable" | executeRunnable | new ForkJoinPool() + "execute ForkJoinTask" | executeForkJoinTask | new ForkJoinPool() + "submit Runnable" | submitRunnable | new ForkJoinPool() + "submit Callable" | submitCallable | new ForkJoinPool() + "submit ForkJoinTask" | submitForkJoinTask | new ForkJoinPool() + "invoke ForkJoinTask" | invokeForkJoinTask | new ForkJoinPool() + "invokeAll" | invokeAll | new ForkJoinPool() + "invokeAll with timeout" | invokeAllTimeout | new ForkJoinPool() + "invokeAny" | invokeAny | new ForkJoinPool() + "invokeAny with timeout" | invokeAnyTimeout | new ForkJoinPool() } - // more useful name breaks java9 javac - // def "#poolImpl.getClass().getSimpleName() #method.getName() propagates"() - def "#poolImpl reports after canceled jobs"() { + def "#poolImpl '#name' disabled wrapping"() { + setup: + def pool = poolImpl + def m = method + def w = wrap + + JavaAsyncChild child = new JavaAsyncChild(true, true) + new Runnable() { + @Override + @Trace(operationName = "parent") + void run() { + ((ContinuableScope) GlobalTracer.get().scopeManager().active()).setAsyncPropagation(true) + m(pool, w(child)) + } + }.run() + // We block in child to make sure spans close in predictable order + child.unblock() + + // Expect two traces because async propagation gets effectively disabled + TEST_WRITER.waitForTraces(2) + + expect: + TEST_WRITER.size() == 2 + TEST_WRITER.get(0).size() == 1 + TEST_WRITER.get(0).get(0).operationName == "parent" + TEST_WRITER.get(1).size() == 1 + TEST_WRITER.get(1).get(0).operationName == "asyncChild" + + cleanup: + pool?.shutdown() + + where: + // Scheduled executor cannot accept wrapped tasks + // TODO: we should have a test that passes lambda, but this is hard + // because this requires tests to be run in java8+ only. + // Instead we 'hand-wrap' tasks in this test. + name | method | wrap | poolImpl + "execute Runnable" | executeRunnable | { new RunnableWrapper(it) } | new ScheduledThreadPoolExecutor(1) + "submit Runnable" | submitRunnable | { new RunnableWrapper(it) } | new ScheduledThreadPoolExecutor(1) + "submit Callable" | submitCallable | { new CallableWrapper(it) } | new ScheduledThreadPoolExecutor(1) + "schedule Runnable" | scheduleRunnable | { new RunnableWrapper(it) } | new ScheduledThreadPoolExecutor(1) + "schedule Callable" | scheduleCallable | { new CallableWrapper(it) } | new ScheduledThreadPoolExecutor(1) + } + + def "#poolImpl '#name' reports after canceled jobs"() { setup: def pool = poolImpl def m = method @@ -114,7 +177,7 @@ class ExecutorInstrumentationTest extends AgentTestRunner { final JavaAsyncChild child = new JavaAsyncChild(true, true) children.add(child) try { - Future f = m.invoke(pool, new JavaAsyncChild()) + Future f = m(pool, new JavaAsyncChild()) jobFutures.add(f) } catch (InvocationTargetException e) { throw e.getCause() @@ -139,11 +202,18 @@ class ExecutorInstrumentationTest extends AgentTestRunner { TEST_WRITER.size() == 1 where: - poolImpl | method - new ForkJoinPool() | submitRunnableMethod - new ForkJoinPool() | submitCallableMethod + name | method | poolImpl + "submit Runnable" | submitRunnable | new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue(1)) + "submit Callable" | submitCallable | new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue(1)) - new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue(1)) | submitRunnableMethod - new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue(1)) | submitCallableMethod + // Scheduled executor has additional methods and also may get disabled because it wraps tasks + "submit Runnable" | submitRunnable | new ScheduledThreadPoolExecutor(1) + "submit Callable" | submitCallable | new ScheduledThreadPoolExecutor(1) + "schedule Runnable" | scheduleRunnable | new ScheduledThreadPoolExecutor(1) + "schedule Callable" | scheduleCallable | new ScheduledThreadPoolExecutor(1) + + // ForkJoinPool has additional set of method overloads for ForkJoinTask to deal with + "submit Runnable" | submitRunnable | new ForkJoinPool() + "submit Callable" | submitCallable | new ForkJoinPool() } } diff --git a/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/utils/TraceUtils.groovy b/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/utils/TraceUtils.groovy index 02cd354d08..8ba7433588 100644 --- a/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/utils/TraceUtils.groovy +++ b/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/utils/TraceUtils.groovy @@ -30,7 +30,6 @@ class TraceUtils { throw e } finally { - ((TraceScope) scope).setAsyncPropagation(false) scope.close() } } diff --git a/dd-trace-ot/src/main/java/datadog/opentracing/scopemanager/ContinuableScope.java b/dd-trace-ot/src/main/java/datadog/opentracing/scopemanager/ContinuableScope.java index fcc40ce56b..a40126b343 100644 --- a/dd-trace-ot/src/main/java/datadog/opentracing/scopemanager/ContinuableScope.java +++ b/dd-trace-ot/src/main/java/datadog/opentracing/scopemanager/ContinuableScope.java @@ -159,7 +159,7 @@ public class ContinuableScope implements Scope, TraceScope { if (closeContinuationScope) { ContinuableScope.this.close(); } else { - // Same in in 'close()' above. + // Same as in 'close()' above. if (openCount.decrementAndGet() == 0 && finishOnClose) { spanUnderScope.finish(); } diff --git a/dd-trace-ot/src/test/groovy/datadog/opentracing/scopemanager/ScopeManagerTest.groovy b/dd-trace-ot/src/test/groovy/datadog/opentracing/scopemanager/ScopeManagerTest.groovy index a6d30eadd1..92a5501887 100644 --- a/dd-trace-ot/src/test/groovy/datadog/opentracing/scopemanager/ScopeManagerTest.groovy +++ b/dd-trace-ot/src/test/groovy/datadog/opentracing/scopemanager/ScopeManagerTest.groovy @@ -183,6 +183,78 @@ class ScopeManagerTest extends Specification { false | true } + def "Continuation.close closes parent scope"() { + setup: + def builder = tracer.buildSpan("test") + def scope = (ContinuableScope) builder.startActive(true) + scope.setAsyncPropagation(true) + def continuation = scope.capture() + + when: + /* + Note: this API is inherently broken. Our scope implementation doesn't allow us to close scopes + in random order, yet when we close continuation we attempt to close scope by default. + And in fact continuation trying to close parent scope is most likely a bug. + */ + continuation.close(true) + + then: + scopeManager.active() == null + !spanFinished(scope.span()) + + when: + scope.span().finish() + + then: + scopeManager.active() == null + } + + def "Continuation.close doesn't close parent scope"() { + setup: + def builder = tracer.buildSpan("test") + def scope = (ContinuableScope) builder.startActive(true) + scope.setAsyncPropagation(true) + def continuation = scope.capture() + + when: + continuation.close(false) + + then: + scopeManager.active() == scope + } + + def "Continuation.close doesn't close parent scope, span finishes"() { + /* + This is highly confusing behaviour. Sequence of events is as following: + * Scope gets created along with span and with finishOnClose == true. + * Continuation gets created for that scope. + * Scope is closed. + At this point scope is not really closed. It is removed from scope + stack, but it is still alive because there is a live continuation attached + to it. This also means span is not closed. + * Continuation is closed. + This triggers final closing of scope and closing of the span. + + This is confusing because expected behaviour is for span to be closed + with the scope when finishOnClose = true, but in fact span lingers until + continuation is closed. + */ + setup: + def builder = tracer.buildSpan("test") + def scope = (ContinuableScope) builder.startActive(true) + scope.setAsyncPropagation(true) + def continuation = scope.capture() + scope.close() + + when: + continuation.close(false) + + then: + scopeManager.active() == null + spanFinished(scope.span()) + writer == [[scope.span()]] + } + @Timeout(value = 60, unit = TimeUnit.SECONDS) def "hard reference on continuation prevents trace from reporting"() { setup: