Merge pull request #701 from DataDog/mar-kolya/improve-executor-instrumentation

Allow non-wrapped tasks in disabled executors
This commit is contained in:
Nikolay Martynov 2019-02-11 12:18:36 -05:00 committed by GitHub
commit f33600d1f2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 321 additions and 174 deletions

View File

@ -36,7 +36,9 @@ public class State {
public void closeContinuation() { public void closeContinuation() {
final TraceScope.Continuation continuation = continuationRef.getAndSet(null); final TraceScope.Continuation continuation = continuationRef.getAndSet(null);
if (continuation != null) { if (continuation != null) {
continuation.close(); // We have opened this continuation, we shall not close parent scope when we close it,
// otherwise owners of that scope will get confused.
continuation.close(false);
} }
} }

View File

@ -8,11 +8,8 @@ import io.opentracing.util.GlobalTracer
import spock.lang.Shared import spock.lang.Shared
import java.lang.reflect.InvocationTargetException import java.lang.reflect.InvocationTargetException
import java.lang.reflect.Method
import java.util.concurrent.ArrayBlockingQueue import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.Callable import java.util.concurrent.Callable
import java.util.concurrent.Executor
import java.util.concurrent.ExecutorService
import java.util.concurrent.Future import java.util.concurrent.Future
import java.util.concurrent.RejectedExecutionException import java.util.concurrent.RejectedExecutionException
import java.util.concurrent.ThreadPoolExecutor import java.util.concurrent.ThreadPoolExecutor
@ -23,31 +20,21 @@ import java.util.concurrent.TimeUnit
* This is to large extent a copy of ExecutorInstrumentationTest. * This is to large extent a copy of ExecutorInstrumentationTest.
*/ */
class AkkaExecutorInstrumentationTest extends AgentTestRunner { class AkkaExecutorInstrumentationTest extends AgentTestRunner {
@Shared
Method executeRunnableMethod
@Shared
Method akkaExecuteForkJoinTaskMethod
@Shared
Method submitRunnableMethod
@Shared
Method submitCallableMethod
@Shared
Method akkaSubmitForkJoinTaskMethod
@Shared
Method akkaInvokeForkJoinTaskMethod
def setupSpec() { @Shared
executeRunnableMethod = Executor.getMethod("execute", Runnable) def executeRunnable = { e, c -> e.execute((Runnable) c) }
akkaExecuteForkJoinTaskMethod = ForkJoinPool.getMethod("execute", ForkJoinTask) @Shared
submitRunnableMethod = ExecutorService.getMethod("submit", Runnable) def akkaExecuteForkJoinTask = { e, c -> e.execute((ForkJoinTask) c) }
submitCallableMethod = ExecutorService.getMethod("submit", Callable) @Shared
akkaSubmitForkJoinTaskMethod = ForkJoinPool.getMethod("submit", ForkJoinTask) def submitRunnable = { e, c -> e.submit((Runnable) c) }
akkaInvokeForkJoinTaskMethod = ForkJoinPool.getMethod("invoke", ForkJoinTask) @Shared
} def submitCallable = { e, c -> e.submit((Callable) c) }
@Shared
def akkaSubmitForkJoinTask = { e, c -> e.submit((ForkJoinTask) c) }
@Shared
def akkaInvokeForkJoinTask = { e, c -> e.invoke((ForkJoinTask) c) }
// more useful name breaks java9 javac def "#poolImpl '#name' propagates"() {
// def "#poolImpl.getClass().getSimpleName() #method.getName() propagates"()
def "#poolImpl #method propagates"() {
setup: setup:
def pool = poolImpl def pool = poolImpl
def m = method def m = method
@ -58,9 +45,9 @@ class AkkaExecutorInstrumentationTest extends AgentTestRunner {
void run() { void run() {
((ContinuableScope) GlobalTracer.get().scopeManager().active()).setAsyncPropagation(true) ((ContinuableScope) GlobalTracer.get().scopeManager().active()).setAsyncPropagation(true)
// this child will have a span // this child will have a span
m.invoke(pool, new AkkaAsyncChild()) m(pool, new AkkaAsyncChild())
// this child won't // this child won't
m.invoke(pool, new AkkaAsyncChild(false, false)) m(pool, new AkkaAsyncChild(false, false))
} }
}.run() }.run()
@ -79,22 +66,21 @@ class AkkaExecutorInstrumentationTest extends AgentTestRunner {
// Unfortunately, there's no simple way to test the cross product of methods/pools. // Unfortunately, there's no simple way to test the cross product of methods/pools.
where: where:
poolImpl | method name | method | poolImpl
new ForkJoinPool() | executeRunnableMethod "execute Runnable" | executeRunnable | new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue<Runnable>(1))
new ForkJoinPool() | akkaExecuteForkJoinTaskMethod "submit Runnable" | submitRunnable | new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue<Runnable>(1))
new ForkJoinPool() | submitRunnableMethod "submit Callable" | submitCallable | new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue<Runnable>(1))
new ForkJoinPool() | submitCallableMethod
new ForkJoinPool() | akkaSubmitForkJoinTaskMethod
new ForkJoinPool() | akkaInvokeForkJoinTaskMethod
new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue<Runnable>(1)) | executeRunnableMethod // ForkJoinPool has additional set of method overloads for ForkJoinTask to deal with
new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue<Runnable>(1)) | submitRunnableMethod "execute Runnable" | executeRunnable | new ForkJoinPool()
new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue<Runnable>(1)) | submitCallableMethod "execute ForkJoinTask" | akkaExecuteForkJoinTask | new ForkJoinPool()
"submit Runnable" | submitRunnable | new ForkJoinPool()
"submit Callable" | submitCallable | new ForkJoinPool()
"submit ForkJoinTask" | akkaSubmitForkJoinTask | new ForkJoinPool()
"invoke ForkJoinTask" | akkaInvokeForkJoinTask | new ForkJoinPool()
} }
// more useful name breaks java9 javac def "#poolImpl '#name' reports after canceled jobs"() {
// def "#poolImpl.getClass().getSimpleName() #method.getName() propagates"()
def "#poolImpl reports after canceled jobs"() {
setup: setup:
def pool = poolImpl def pool = poolImpl
def m = method def m = method
@ -117,7 +103,7 @@ class AkkaExecutorInstrumentationTest extends AgentTestRunner {
final AkkaAsyncChild child = new AkkaAsyncChild(true, true) final AkkaAsyncChild child = new AkkaAsyncChild(true, true)
children.add(child) children.add(child)
try { try {
Future f = m.invoke(pool, new AkkaAsyncChild()) Future f = m(pool, new AkkaAsyncChild())
jobFutures.add(f) jobFutures.add(f)
} catch (InvocationTargetException e) { } catch (InvocationTargetException e) {
throw e.getCause() throw e.getCause()
@ -142,8 +128,8 @@ class AkkaExecutorInstrumentationTest extends AgentTestRunner {
TEST_WRITER.size() == 1 TEST_WRITER.size() == 1
where: where:
poolImpl | method name | method | poolImpl
new ForkJoinPool() | submitRunnableMethod "submit Runnable" | submitRunnable | new ForkJoinPool()
new ForkJoinPool() | submitCallableMethod "submit Callable" | submitCallable | new ForkJoinPool()
} }
} }

View File

@ -8,11 +8,8 @@ import scala.concurrent.forkjoin.ForkJoinTask
import spock.lang.Shared import spock.lang.Shared
import java.lang.reflect.InvocationTargetException import java.lang.reflect.InvocationTargetException
import java.lang.reflect.Method
import java.util.concurrent.ArrayBlockingQueue import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.Callable import java.util.concurrent.Callable
import java.util.concurrent.Executor
import java.util.concurrent.ExecutorService
import java.util.concurrent.Future import java.util.concurrent.Future
import java.util.concurrent.RejectedExecutionException import java.util.concurrent.RejectedExecutionException
import java.util.concurrent.ThreadPoolExecutor import java.util.concurrent.ThreadPoolExecutor
@ -23,31 +20,21 @@ import java.util.concurrent.TimeUnit
* This is to large extent a copy of ExecutorInstrumentationTest. * This is to large extent a copy of ExecutorInstrumentationTest.
*/ */
class ScalaExecutorInstrumentationTest extends AgentTestRunner { class ScalaExecutorInstrumentationTest extends AgentTestRunner {
@Shared
Method executeRunnableMethod
@Shared
Method scalaExecuteForkJoinTaskMethod
@Shared
Method submitRunnableMethod
@Shared
Method submitCallableMethod
@Shared
Method scalaSubmitForkJoinTaskMethod
@Shared
Method scalaInvokeForkJoinTaskMethod
def setupSpec() { @Shared
executeRunnableMethod = Executor.getMethod("execute", Runnable) def executeRunnable = { e, c -> e.execute((Runnable) c) }
scalaExecuteForkJoinTaskMethod = ForkJoinPool.getMethod("execute", ForkJoinTask) @Shared
submitRunnableMethod = ExecutorService.getMethod("submit", Runnable) def scalaExecuteForkJoinTask = { e, c -> e.execute((ForkJoinTask) c) }
submitCallableMethod = ExecutorService.getMethod("submit", Callable) @Shared
scalaSubmitForkJoinTaskMethod = ForkJoinPool.getMethod("submit", ForkJoinTask) def submitRunnable = { e, c -> e.submit((Runnable) c) }
scalaInvokeForkJoinTaskMethod = ForkJoinPool.getMethod("invoke", ForkJoinTask) @Shared
} def submitCallable = { e, c -> e.submit((Callable) c) }
@Shared
def scalaSubmitForkJoinTask = { e, c -> e.submit((ForkJoinTask) c) }
@Shared
def scalaInvokeForkJoinTask = { e, c -> e.invoke((ForkJoinTask) c) }
// more useful name breaks java9 javac def "#poolImpl '#name' propagates"() {
// def "#poolImpl.getClass().getSimpleName() #method.getName() propagates"()
def "#poolImpl #method propagates"() {
setup: setup:
def pool = poolImpl def pool = poolImpl
def m = method def m = method
@ -58,9 +45,9 @@ class ScalaExecutorInstrumentationTest extends AgentTestRunner {
void run() { void run() {
((ContinuableScope) GlobalTracer.get().scopeManager().active()).setAsyncPropagation(true) ((ContinuableScope) GlobalTracer.get().scopeManager().active()).setAsyncPropagation(true)
// this child will have a span // this child will have a span
m.invoke(pool, new ScalaAsyncChild()) m(pool, new ScalaAsyncChild())
// this child won't // this child won't
m.invoke(pool, new ScalaAsyncChild(false, false)) m(pool, new ScalaAsyncChild(false, false))
} }
}.run() }.run()
@ -79,22 +66,21 @@ class ScalaExecutorInstrumentationTest extends AgentTestRunner {
// Unfortunately, there's no simple way to test the cross product of methods/pools. // Unfortunately, there's no simple way to test the cross product of methods/pools.
where: where:
poolImpl | method name | method | poolImpl
new ForkJoinPool() | executeRunnableMethod "execute Runnable" | executeRunnable | new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue<Runnable>(1))
new ForkJoinPool() | scalaExecuteForkJoinTaskMethod "submit Runnable" | submitRunnable | new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue<Runnable>(1))
new ForkJoinPool() | submitRunnableMethod "submit Callable" | submitCallable | new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue<Runnable>(1))
new ForkJoinPool() | submitCallableMethod
new ForkJoinPool() | scalaSubmitForkJoinTaskMethod
new ForkJoinPool() | scalaInvokeForkJoinTaskMethod
new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue<Runnable>(1)) | executeRunnableMethod // ForkJoinPool has additional set of method overloads for ForkJoinTask to deal with
new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue<Runnable>(1)) | submitRunnableMethod "execute Runnable" | executeRunnable | new ForkJoinPool()
new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue<Runnable>(1)) | submitCallableMethod "execute ForkJoinTask" | scalaExecuteForkJoinTask | new ForkJoinPool()
"submit Runnable" | submitRunnable | new ForkJoinPool()
"submit Callable" | submitCallable | new ForkJoinPool()
"submit ForkJoinTask" | scalaSubmitForkJoinTask | new ForkJoinPool()
"invoke ForkJoinTask" | scalaInvokeForkJoinTask | new ForkJoinPool()
} }
// more useful name breaks java9 javac def "#poolImpl '#name' reports after canceled jobs"() {
// def "#poolImpl.getClass().getSimpleName() #method.getName() propagates"()
def "#poolImpl reports after canceled jobs"() {
setup: setup:
def pool = poolImpl def pool = poolImpl
def m = method def m = method
@ -117,7 +103,7 @@ class ScalaExecutorInstrumentationTest extends AgentTestRunner {
final ScalaAsyncChild child = new ScalaAsyncChild(true, true) final ScalaAsyncChild child = new ScalaAsyncChild(true, true)
children.add(child) children.add(child)
try { try {
Future f = m.invoke(pool, new ScalaAsyncChild()) Future f = m(pool, new ScalaAsyncChild())
jobFutures.add(f) jobFutures.add(f)
} catch (InvocationTargetException e) { } catch (InvocationTargetException e) {
throw e.getCause() throw e.getCause()
@ -142,8 +128,8 @@ class ScalaExecutorInstrumentationTest extends AgentTestRunner {
TEST_WRITER.size() == 1 TEST_WRITER.size() == 1
where: where:
poolImpl | method name | method | poolImpl
new ForkJoinPool() | submitRunnableMethod "submit Runnable" | submitRunnable | new ForkJoinPool()
new ForkJoinPool() | submitCallableMethod "submit Callable" | submitCallable | new ForkJoinPool()
} }
} }

View File

@ -2,6 +2,8 @@ package datadog.trace.instrumentation.java.concurrent;
import datadog.trace.bootstrap.ContextStore; import datadog.trace.bootstrap.ContextStore;
import datadog.trace.bootstrap.WeakMap; import datadog.trace.bootstrap.WeakMap;
import datadog.trace.bootstrap.instrumentation.java.concurrent.CallableWrapper;
import datadog.trace.bootstrap.instrumentation.java.concurrent.RunnableWrapper;
import datadog.trace.bootstrap.instrumentation.java.concurrent.State; import datadog.trace.bootstrap.instrumentation.java.concurrent.State;
import datadog.trace.context.TraceScope; import datadog.trace.context.TraceScope;
import io.opentracing.Scope; import io.opentracing.Scope;
@ -13,7 +15,7 @@ import lombok.extern.slf4j.Slf4j;
@Slf4j @Slf4j
public class ExecutorInstrumentationUtils { public class ExecutorInstrumentationUtils {
private static final WeakMap<Executor, Boolean> DISABLED_EXECUTORS = private static final WeakMap<Executor, Boolean> EXECUTORS_DISABLED_FOR_WRAPPED_TASKS =
WeakMap.Provider.newWeakMap(); WeakMap.Provider.newWeakMap();
/** /**
@ -28,7 +30,7 @@ public class ExecutorInstrumentationUtils {
return (scope instanceof TraceScope return (scope instanceof TraceScope
&& ((TraceScope) scope).isAsyncPropagating() && ((TraceScope) scope).isAsyncPropagating()
&& task != null && task != null
&& !ExecutorInstrumentationUtils.isDisabled(executor)); && !ExecutorInstrumentationUtils.isExecutorDisabledForThisTask(executor, task));
} }
/** /**
@ -74,12 +76,19 @@ public class ExecutorInstrumentationUtils {
} }
} }
public static void disableExecutor(final Executor executor) { public static void disableExecutorForWrappedTasks(final Executor executor) {
log.debug("Disabling Executor tracing for instance {}", executor); log.debug("Disabling Executor tracing for wrapped tasks for instance {}", executor);
DISABLED_EXECUTORS.put(executor, true); EXECUTORS_DISABLED_FOR_WRAPPED_TASKS.put(executor, true);
} }
public static boolean isDisabled(final Executor executor) { /**
return DISABLED_EXECUTORS.containsKey(executor); * Check if Executor can accept given task.
*
* <p>Disabled executors cannot accept wrapped tasks, non wrapped tasks (i.e. tasks with injected
* fields) should still work fine.
*/
public static boolean isExecutorDisabledForThisTask(final Executor executor, final Object task) {
return (task instanceof RunnableWrapper || task instanceof CallableWrapper)
&& EXECUTORS_DISABLED_FOR_WRAPPED_TASKS.containsKey(executor);
} }
} }

View File

@ -61,7 +61,8 @@ public final class FutureInstrumentation extends Instrumenter.Default {
"akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask", "akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask",
"com.google.common.util.concurrent.SettableFuture", "com.google.common.util.concurrent.SettableFuture",
"com.google.common.util.concurrent.AbstractFuture$TrustedFuture", "com.google.common.util.concurrent.AbstractFuture$TrustedFuture",
"com.google.common.util.concurrent.AbstractFuture" "com.google.common.util.concurrent.AbstractFuture",
"io.netty.util.concurrent.ScheduledFutureTask"
}; };
WHITELISTED_FUTURES = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(whitelist))); WHITELISTED_FUTURES = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(whitelist)));
} }
@ -102,15 +103,15 @@ public final class FutureInstrumentation extends Instrumenter.Default {
public static class CanceledFutureAdvice { public static class CanceledFutureAdvice {
@Advice.OnMethodExit(suppress = Throwable.class) @Advice.OnMethodExit(suppress = Throwable.class)
public static void exit( public static void exit(@Advice.This final Future<?> future) {
@Advice.This final Future<?> future, @Advice.Return final boolean canceled) { // Try to close continuation even if future was not cancelled:
if (canceled) { // the expectation is that continuation should be closed after 'cancel'
final ContextStore<Future, State> contextStore = // is called, one way or another
InstrumentationContext.get(Future.class, State.class); final ContextStore<Future, State> contextStore =
final State state = contextStore.get(future); InstrumentationContext.get(Future.class, State.class);
if (state != null) { final State state = contextStore.get(future);
state.closeContinuation(); if (state != null) {
} state.closeContinuation();
} }
} }
} }

View File

@ -61,11 +61,17 @@ public final class JavaExecutorInstrumentation extends AbstractExecutorInstrumen
named("submit").and(takesArgument(0, ForkJoinTask.class)), named("submit").and(takesArgument(0, ForkJoinTask.class)),
SetJavaForkJoinStateAdvice.class.getName()); SetJavaForkJoinStateAdvice.class.getName());
transformers.put( transformers.put(
nameMatches("invoke(Any|All)$").and(takesArgument(0, Callable.class)), nameMatches("invoke(Any|All)$").and(takesArgument(0, Collection.class)),
SetCallableStateForCallableCollectionAdvice.class.getName()); SetCallableStateForCallableCollectionAdvice.class.getName());
transformers.put( transformers.put(
nameMatches("invoke").and(takesArgument(0, ForkJoinTask.class)), nameMatches("invoke").and(takesArgument(0, ForkJoinTask.class)),
SetJavaForkJoinStateAdvice.class.getName()); SetJavaForkJoinStateAdvice.class.getName());
transformers.put(
named("schedule").and(takesArgument(0, Runnable.class)),
SetSubmitRunnableStateAdvice.class.getName());
transformers.put(
named("schedule").and(takesArgument(0, Callable.class)),
SetCallableStateAdvice.class.getName());
transformers.put( // kotlinx.coroutines.scheduling.CoroutineScheduler transformers.put( // kotlinx.coroutines.scheduling.CoroutineScheduler
named("dispatch") named("dispatch")
.and(takesArgument(0, Runnable.class)) .and(takesArgument(0, Runnable.class))
@ -81,11 +87,14 @@ public final class JavaExecutorInstrumentation extends AbstractExecutorInstrumen
@Advice.This final Executor executor, @Advice.This final Executor executor,
@Advice.Argument(value = 0, readOnly = false) Runnable task) { @Advice.Argument(value = 0, readOnly = false) Runnable task) {
final Scope scope = GlobalTracer.get().scopeManager().active(); final Scope scope = GlobalTracer.get().scopeManager().active();
if (ExecutorInstrumentationUtils.shouldAttachStateToTask(task, executor)) { final Runnable newTask = RunnableWrapper.wrapIfNeeded(task);
task = RunnableWrapper.wrapIfNeeded(task); // It is important to check potentially wrapped task if we can instrument task in this
// executor. Some executors do not support wrapped tasks.
if (ExecutorInstrumentationUtils.shouldAttachStateToTask(newTask, executor)) {
task = newTask;
final ContextStore<Runnable, State> contextStore = final ContextStore<Runnable, State> contextStore =
InstrumentationContext.get(Runnable.class, State.class); InstrumentationContext.get(Runnable.class, State.class);
return ExecutorInstrumentationUtils.setupState(contextStore, task, (TraceScope) scope); return ExecutorInstrumentationUtils.setupState(contextStore, newTask, (TraceScope) scope);
} }
return null; return null;
} }
@ -130,11 +139,14 @@ public final class JavaExecutorInstrumentation extends AbstractExecutorInstrumen
@Advice.This final Executor executor, @Advice.This final Executor executor,
@Advice.Argument(value = 0, readOnly = false) Runnable task) { @Advice.Argument(value = 0, readOnly = false) Runnable task) {
final Scope scope = GlobalTracer.get().scopeManager().active(); final Scope scope = GlobalTracer.get().scopeManager().active();
if (ExecutorInstrumentationUtils.shouldAttachStateToTask(task, executor)) { final Runnable newTask = RunnableWrapper.wrapIfNeeded(task);
task = RunnableWrapper.wrapIfNeeded(task); // It is important to check potentially wrapped task if we can instrument task in this
// executor. Some executors do not support wrapped tasks.
if (ExecutorInstrumentationUtils.shouldAttachStateToTask(newTask, executor)) {
task = newTask;
final ContextStore<Runnable, State> contextStore = final ContextStore<Runnable, State> contextStore =
InstrumentationContext.get(Runnable.class, State.class); InstrumentationContext.get(Runnable.class, State.class);
return ExecutorInstrumentationUtils.setupState(contextStore, task, (TraceScope) scope); return ExecutorInstrumentationUtils.setupState(contextStore, newTask, (TraceScope) scope);
} }
return null; return null;
} }
@ -161,11 +173,14 @@ public final class JavaExecutorInstrumentation extends AbstractExecutorInstrumen
@Advice.This final Executor executor, @Advice.This final Executor executor,
@Advice.Argument(value = 0, readOnly = false) Callable task) { @Advice.Argument(value = 0, readOnly = false) Callable task) {
final Scope scope = GlobalTracer.get().scopeManager().active(); final Scope scope = GlobalTracer.get().scopeManager().active();
if (ExecutorInstrumentationUtils.shouldAttachStateToTask(task, executor)) { final Callable newTask = CallableWrapper.wrapIfNeeded(task);
task = CallableWrapper.wrapIfNeeded(task); // It is important to check potentially wrapped task if we can instrument task in this
// executor. Some executors do not support wrapped tasks.
if (ExecutorInstrumentationUtils.shouldAttachStateToTask(newTask, executor)) {
task = newTask;
final ContextStore<Callable, State> contextStore = final ContextStore<Callable, State> contextStore =
InstrumentationContext.get(Callable.class, State.class); InstrumentationContext.get(Callable.class, State.class);
return ExecutorInstrumentationUtils.setupState(contextStore, task, (TraceScope) scope); return ExecutorInstrumentationUtils.setupState(contextStore, newTask, (TraceScope) scope);
} }
return null; return null;
} }
@ -196,13 +211,17 @@ public final class JavaExecutorInstrumentation extends AbstractExecutorInstrumen
&& ((TraceScope) scope).isAsyncPropagating() && ((TraceScope) scope).isAsyncPropagating()
&& tasks != null) { && tasks != null) {
final Collection<Callable<?>> wrappedTasks = new ArrayList<>(tasks.size()); final Collection<Callable<?>> wrappedTasks = new ArrayList<>(tasks.size());
for (Callable<?> task : tasks) { for (final Callable<?> task : tasks) {
if (task != null) { if (task != null) {
task = CallableWrapper.wrapIfNeeded(task); final Callable newTask = CallableWrapper.wrapIfNeeded(task);
wrappedTasks.add(task); if (ExecutorInstrumentationUtils.isExecutorDisabledForThisTask(executor, newTask)) {
final ContextStore<Callable, State> contextStore = wrappedTasks.add(task);
InstrumentationContext.get(Callable.class, State.class); } else {
ExecutorInstrumentationUtils.setupState(contextStore, task, (TraceScope) scope); wrappedTasks.add(newTask);
final ContextStore<Callable, State> contextStore =
InstrumentationContext.get(Callable.class, State.class);
ExecutorInstrumentationUtils.setupState(contextStore, newTask, (TraceScope) scope);
}
} }
} }
tasks = wrappedTasks; tasks = wrappedTasks;

View File

@ -66,7 +66,7 @@ public class ThreadPoolExecutorInstrumentation extends Instrumenter.Default {
} catch (final ClassCastException | IllegalArgumentException e) { } catch (final ClassCastException | IllegalArgumentException e) {
// These errors indicate the queue is fundamentally incompatible with wrapped runnables. // These errors indicate the queue is fundamentally incompatible with wrapped runnables.
// We must disable the executor instance to avoid passing wrapped runnables later. // We must disable the executor instance to avoid passing wrapped runnables later.
ExecutorInstrumentationUtils.disableExecutor(executor); ExecutorInstrumentationUtils.disableExecutorForWrappedTasks(executor);
} catch (final Exception e) { } catch (final Exception e) {
// Other errors might indicate the queue is not fully initialized. // Other errors might indicate the queue is not fully initialized.
// We might want to disable for those too, but for now just ignore. // We might want to disable for those too, but for now just ignore.

View File

@ -2,48 +2,50 @@ import datadog.opentracing.DDSpan
import datadog.opentracing.scopemanager.ContinuableScope import datadog.opentracing.scopemanager.ContinuableScope
import datadog.trace.agent.test.AgentTestRunner import datadog.trace.agent.test.AgentTestRunner
import datadog.trace.api.Trace import datadog.trace.api.Trace
import datadog.trace.bootstrap.instrumentation.java.concurrent.CallableWrapper
import datadog.trace.bootstrap.instrumentation.java.concurrent.RunnableWrapper
import io.opentracing.util.GlobalTracer import io.opentracing.util.GlobalTracer
import spock.lang.Shared import spock.lang.Shared
import java.lang.reflect.InvocationTargetException import java.lang.reflect.InvocationTargetException
import java.lang.reflect.Method
import java.util.concurrent.ArrayBlockingQueue import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.Callable import java.util.concurrent.Callable
import java.util.concurrent.Executor
import java.util.concurrent.ExecutorService
import java.util.concurrent.ForkJoinPool import java.util.concurrent.ForkJoinPool
import java.util.concurrent.ForkJoinTask import java.util.concurrent.ForkJoinTask
import java.util.concurrent.Future import java.util.concurrent.Future
import java.util.concurrent.RejectedExecutionException import java.util.concurrent.RejectedExecutionException
import java.util.concurrent.ScheduledThreadPoolExecutor
import java.util.concurrent.ThreadPoolExecutor import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
class ExecutorInstrumentationTest extends AgentTestRunner { class ExecutorInstrumentationTest extends AgentTestRunner {
@Shared
Method executeRunnableMethod
@Shared
Method executeForkJoinTaskMethod
@Shared
Method submitRunnableMethod
@Shared
Method submitCallableMethod
@Shared
Method submitForkJoinTaskMethod
@Shared
Method invokeForkJoinTaskMethod
def setupSpec() { @Shared
executeRunnableMethod = Executor.getMethod("execute", Runnable) def executeRunnable = { e, c -> e.execute((Runnable) c) }
executeForkJoinTaskMethod = ForkJoinPool.getMethod("execute", ForkJoinTask) @Shared
submitRunnableMethod = ExecutorService.getMethod("submit", Runnable) def executeForkJoinTask = { e, c -> e.execute((ForkJoinTask) c) }
submitCallableMethod = ExecutorService.getMethod("submit", Callable) @Shared
submitForkJoinTaskMethod = ForkJoinPool.getMethod("submit", ForkJoinTask) def submitRunnable = { e, c -> e.submit((Runnable) c) }
invokeForkJoinTaskMethod = ForkJoinPool.getMethod("invoke", ForkJoinTask) @Shared
} def submitCallable = { e, c -> e.submit((Callable) c) }
@Shared
def submitForkJoinTask = { e, c -> e.submit((ForkJoinTask) c) }
@Shared
def invokeAll = { e, c -> e.invokeAll([(Callable) c]) }
@Shared
def invokeAllTimeout = { e, c -> e.invokeAll([(Callable) c], 10, TimeUnit.SECONDS) }
@Shared
def invokeAny = { e, c -> e.invokeAny([(Callable) c]) }
@Shared
def invokeAnyTimeout = { e, c -> e.invokeAny([(Callable) c], 10, TimeUnit.SECONDS) }
@Shared
def invokeForkJoinTask = { e, c -> e.invoke((ForkJoinTask) c) }
@Shared
def scheduleRunnable = { e, c -> e.schedule((Runnable) c, 10, TimeUnit.MILLISECONDS) }
@Shared
def scheduleCallable = { e, c -> e.schedule((Callable) c, 10, TimeUnit.MILLISECONDS) }
// more useful name breaks java9 javac def "#poolImpl '#name' propagates"() {
// def "#poolImpl.getClass().getSimpleName() #method.getName() propagates"()
def "#poolImpl #method propagates"() {
setup: setup:
def pool = poolImpl def pool = poolImpl
def m = method def m = method
@ -54,9 +56,9 @@ class ExecutorInstrumentationTest extends AgentTestRunner {
void run() { void run() {
((ContinuableScope) GlobalTracer.get().scopeManager().active()).setAsyncPropagation(true) ((ContinuableScope) GlobalTracer.get().scopeManager().active()).setAsyncPropagation(true)
// this child will have a span // this child will have a span
m.invoke(pool, new JavaAsyncChild()) m(pool, new JavaAsyncChild())
// this child won't // this child won't
m.invoke(pool, new JavaAsyncChild(false, false)) m(pool, new JavaAsyncChild(false, false))
} }
}.run() }.run()
@ -75,23 +77,84 @@ class ExecutorInstrumentationTest extends AgentTestRunner {
// Unfortunately, there's no simple way to test the cross product of methods/pools. // Unfortunately, there's no simple way to test the cross product of methods/pools.
where: where:
poolImpl | method name | method | poolImpl
new ForkJoinPool() | executeRunnableMethod "execute Runnable" | executeRunnable | new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue<Runnable>(1))
new ForkJoinPool() | executeForkJoinTaskMethod "submit Runnable" | submitRunnable | new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue<Runnable>(1))
new ForkJoinPool() | submitRunnableMethod "submit Callable" | submitCallable | new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue<Runnable>(1))
new ForkJoinPool() | submitCallableMethod "invokeAll" | invokeAll | new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue<Runnable>(1))
new ForkJoinPool() | submitForkJoinTaskMethod "invokeAll with timeout" | invokeAllTimeout | new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue<Runnable>(1))
new ForkJoinPool() | invokeForkJoinTaskMethod "invokeAny" | invokeAny | new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue<Runnable>(1))
"invokeAny with timeout" | invokeAnyTimeout | new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue<Runnable>(1))
new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue<Runnable>(1)) | executeRunnableMethod // Scheduled executor has additional methods and also may get disabled because it wraps tasks
new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue<Runnable>(1)) | submitRunnableMethod "execute Runnable" | executeRunnable | new ScheduledThreadPoolExecutor(1)
new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue<Runnable>(1)) | submitCallableMethod "submit Runnable" | submitRunnable | new ScheduledThreadPoolExecutor(1)
"submit Callable" | submitCallable | new ScheduledThreadPoolExecutor(1)
"invokeAll" | invokeAll | new ScheduledThreadPoolExecutor(1)
"invokeAll with timeout" | invokeAllTimeout | new ScheduledThreadPoolExecutor(1)
"invokeAny" | invokeAny | new ScheduledThreadPoolExecutor(1)
"invokeAny with timeout" | invokeAnyTimeout | new ScheduledThreadPoolExecutor(1)
"schedule Runnable" | scheduleRunnable | new ScheduledThreadPoolExecutor(1)
"schedule Callable" | scheduleCallable | new ScheduledThreadPoolExecutor(1)
// ForkJoinPool has additional set of method overloads for ForkJoinTask to deal with
"execute Runnable" | executeRunnable | new ForkJoinPool()
"execute ForkJoinTask" | executeForkJoinTask | new ForkJoinPool()
"submit Runnable" | submitRunnable | new ForkJoinPool()
"submit Callable" | submitCallable | new ForkJoinPool()
"submit ForkJoinTask" | submitForkJoinTask | new ForkJoinPool()
"invoke ForkJoinTask" | invokeForkJoinTask | new ForkJoinPool()
"invokeAll" | invokeAll | new ForkJoinPool()
"invokeAll with timeout" | invokeAllTimeout | new ForkJoinPool()
"invokeAny" | invokeAny | new ForkJoinPool()
"invokeAny with timeout" | invokeAnyTimeout | new ForkJoinPool()
} }
// more useful name breaks java9 javac def "#poolImpl '#name' disabled wrapping"() {
// def "#poolImpl.getClass().getSimpleName() #method.getName() propagates"() setup:
def "#poolImpl reports after canceled jobs"() { def pool = poolImpl
def m = method
def w = wrap
JavaAsyncChild child = new JavaAsyncChild(true, true)
new Runnable() {
@Override
@Trace(operationName = "parent")
void run() {
((ContinuableScope) GlobalTracer.get().scopeManager().active()).setAsyncPropagation(true)
m(pool, w(child))
}
}.run()
// We block in child to make sure spans close in predictable order
child.unblock()
// Expect two traces because async propagation gets effectively disabled
TEST_WRITER.waitForTraces(2)
expect:
TEST_WRITER.size() == 2
TEST_WRITER.get(0).size() == 1
TEST_WRITER.get(0).get(0).operationName == "parent"
TEST_WRITER.get(1).size() == 1
TEST_WRITER.get(1).get(0).operationName == "asyncChild"
cleanup:
pool?.shutdown()
where:
// Scheduled executor cannot accept wrapped tasks
// TODO: we should have a test that passes lambda, but this is hard
// because this requires tests to be run in java8+ only.
// Instead we 'hand-wrap' tasks in this test.
name | method | wrap | poolImpl
"execute Runnable" | executeRunnable | { new RunnableWrapper(it) } | new ScheduledThreadPoolExecutor(1)
"submit Runnable" | submitRunnable | { new RunnableWrapper(it) } | new ScheduledThreadPoolExecutor(1)
"submit Callable" | submitCallable | { new CallableWrapper(it) } | new ScheduledThreadPoolExecutor(1)
"schedule Runnable" | scheduleRunnable | { new RunnableWrapper(it) } | new ScheduledThreadPoolExecutor(1)
"schedule Callable" | scheduleCallable | { new CallableWrapper(it) } | new ScheduledThreadPoolExecutor(1)
}
def "#poolImpl '#name' reports after canceled jobs"() {
setup: setup:
def pool = poolImpl def pool = poolImpl
def m = method def m = method
@ -114,7 +177,7 @@ class ExecutorInstrumentationTest extends AgentTestRunner {
final JavaAsyncChild child = new JavaAsyncChild(true, true) final JavaAsyncChild child = new JavaAsyncChild(true, true)
children.add(child) children.add(child)
try { try {
Future f = m.invoke(pool, new JavaAsyncChild()) Future f = m(pool, new JavaAsyncChild())
jobFutures.add(f) jobFutures.add(f)
} catch (InvocationTargetException e) { } catch (InvocationTargetException e) {
throw e.getCause() throw e.getCause()
@ -139,11 +202,18 @@ class ExecutorInstrumentationTest extends AgentTestRunner {
TEST_WRITER.size() == 1 TEST_WRITER.size() == 1
where: where:
poolImpl | method name | method | poolImpl
new ForkJoinPool() | submitRunnableMethod "submit Runnable" | submitRunnable | new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue<Runnable>(1))
new ForkJoinPool() | submitCallableMethod "submit Callable" | submitCallable | new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue<Runnable>(1))
new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue<Runnable>(1)) | submitRunnableMethod // Scheduled executor has additional methods and also may get disabled because it wraps tasks
new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue<Runnable>(1)) | submitCallableMethod "submit Runnable" | submitRunnable | new ScheduledThreadPoolExecutor(1)
"submit Callable" | submitCallable | new ScheduledThreadPoolExecutor(1)
"schedule Runnable" | scheduleRunnable | new ScheduledThreadPoolExecutor(1)
"schedule Callable" | scheduleCallable | new ScheduledThreadPoolExecutor(1)
// ForkJoinPool has additional set of method overloads for ForkJoinTask to deal with
"submit Runnable" | submitRunnable | new ForkJoinPool()
"submit Callable" | submitCallable | new ForkJoinPool()
} }
} }

View File

@ -30,7 +30,6 @@ class TraceUtils {
throw e throw e
} finally { } finally {
((TraceScope) scope).setAsyncPropagation(false)
scope.close() scope.close()
} }
} }

View File

@ -159,7 +159,10 @@ public class ContinuableScope implements Scope, TraceScope {
if (closeContinuationScope) { if (closeContinuationScope) {
ContinuableScope.this.close(); ContinuableScope.this.close();
} else { } else {
openCount.decrementAndGet(); // Same as in 'close()' above.
if (openCount.decrementAndGet() == 0 && finishOnClose) {
spanUnderScope.finish();
}
} }
} else { } else {
log.debug("Failed to close continuation {}. Already used.", this); log.debug("Failed to close continuation {}. Already used.", this);

View File

@ -183,6 +183,78 @@ class ScopeManagerTest extends Specification {
false | true false | true
} }
def "Continuation.close closes parent scope"() {
setup:
def builder = tracer.buildSpan("test")
def scope = (ContinuableScope) builder.startActive(true)
scope.setAsyncPropagation(true)
def continuation = scope.capture()
when:
/*
Note: this API is inherently broken. Our scope implementation doesn't allow us to close scopes
in random order, yet when we close continuation we attempt to close scope by default.
And in fact continuation trying to close parent scope is most likely a bug.
*/
continuation.close(true)
then:
scopeManager.active() == null
!spanFinished(scope.span())
when:
scope.span().finish()
then:
scopeManager.active() == null
}
def "Continuation.close doesn't close parent scope"() {
setup:
def builder = tracer.buildSpan("test")
def scope = (ContinuableScope) builder.startActive(true)
scope.setAsyncPropagation(true)
def continuation = scope.capture()
when:
continuation.close(false)
then:
scopeManager.active() == scope
}
def "Continuation.close doesn't close parent scope, span finishes"() {
/*
This is highly confusing behaviour. Sequence of events is as following:
* Scope gets created along with span and with finishOnClose == true.
* Continuation gets created for that scope.
* Scope is closed.
At this point scope is not really closed. It is removed from scope
stack, but it is still alive because there is a live continuation attached
to it. This also means span is not closed.
* Continuation is closed.
This triggers final closing of scope and closing of the span.
This is confusing because expected behaviour is for span to be closed
with the scope when finishOnClose = true, but in fact span lingers until
continuation is closed.
*/
setup:
def builder = tracer.buildSpan("test")
def scope = (ContinuableScope) builder.startActive(true)
scope.setAsyncPropagation(true)
def continuation = scope.capture()
scope.close()
when:
continuation.close(false)
then:
scopeManager.active() == null
spanFinished(scope.span())
writer == [[scope.span()]]
}
@Timeout(value = 60, unit = TimeUnit.SECONDS) @Timeout(value = 60, unit = TimeUnit.SECONDS)
def "hard reference on continuation prevents trace from reporting"() { def "hard reference on continuation prevents trace from reporting"() {
setup: setup: