Allow non-wrapped tasks in disabled executors
Some executors cannot handle tasks that have been wrapped into `{Runnable,Callable}Wrapper` because they require certain subclass of `{Callable,Runnable}` in order to work. We have a test that effectively disables instrumentation for such executors. This change makes sure that tasks that do not need to be wrapped (which essentially means anything that is not lambda) still get traced in such executors. One notable example of affected executor type is `ScheduledThreadPoolExecutor`.
This commit is contained in:
parent
b672a0fffc
commit
bae2d7dde8
|
@ -8,11 +8,8 @@ import io.opentracing.util.GlobalTracer
|
|||
import spock.lang.Shared
|
||||
|
||||
import java.lang.reflect.InvocationTargetException
|
||||
import java.lang.reflect.Method
|
||||
import java.util.concurrent.ArrayBlockingQueue
|
||||
import java.util.concurrent.Callable
|
||||
import java.util.concurrent.Executor
|
||||
import java.util.concurrent.ExecutorService
|
||||
import java.util.concurrent.Future
|
||||
import java.util.concurrent.RejectedExecutionException
|
||||
import java.util.concurrent.ThreadPoolExecutor
|
||||
|
@ -23,31 +20,21 @@ import java.util.concurrent.TimeUnit
|
|||
* This is to large extent a copy of ExecutorInstrumentationTest.
|
||||
*/
|
||||
class AkkaExecutorInstrumentationTest extends AgentTestRunner {
|
||||
@Shared
|
||||
Method executeRunnableMethod
|
||||
@Shared
|
||||
Method akkaExecuteForkJoinTaskMethod
|
||||
@Shared
|
||||
Method submitRunnableMethod
|
||||
@Shared
|
||||
Method submitCallableMethod
|
||||
@Shared
|
||||
Method akkaSubmitForkJoinTaskMethod
|
||||
@Shared
|
||||
Method akkaInvokeForkJoinTaskMethod
|
||||
|
||||
def setupSpec() {
|
||||
executeRunnableMethod = Executor.getMethod("execute", Runnable)
|
||||
akkaExecuteForkJoinTaskMethod = ForkJoinPool.getMethod("execute", ForkJoinTask)
|
||||
submitRunnableMethod = ExecutorService.getMethod("submit", Runnable)
|
||||
submitCallableMethod = ExecutorService.getMethod("submit", Callable)
|
||||
akkaSubmitForkJoinTaskMethod = ForkJoinPool.getMethod("submit", ForkJoinTask)
|
||||
akkaInvokeForkJoinTaskMethod = ForkJoinPool.getMethod("invoke", ForkJoinTask)
|
||||
}
|
||||
@Shared
|
||||
def executeRunnable = { e, c -> e.execute((Runnable) c) }
|
||||
@Shared
|
||||
def akkaExecuteForkJoinTask = { e, c -> e.execute((ForkJoinTask) c) }
|
||||
@Shared
|
||||
def submitRunnable = { e, c -> e.submit((Runnable) c) }
|
||||
@Shared
|
||||
def submitCallable = { e, c -> e.submit((Callable) c) }
|
||||
@Shared
|
||||
def akkaSubmitForkJoinTask = { e, c -> e.submit((ForkJoinTask) c) }
|
||||
@Shared
|
||||
def akkaInvokeForkJoinTask = { e, c -> e.invoke((ForkJoinTask) c) }
|
||||
|
||||
// more useful name breaks java9 javac
|
||||
// def "#poolImpl.getClass().getSimpleName() #method.getName() propagates"()
|
||||
def "#poolImpl #method propagates"() {
|
||||
def "#poolImpl '#name' propagates"() {
|
||||
setup:
|
||||
def pool = poolImpl
|
||||
def m = method
|
||||
|
@ -58,9 +45,9 @@ class AkkaExecutorInstrumentationTest extends AgentTestRunner {
|
|||
void run() {
|
||||
((ContinuableScope) GlobalTracer.get().scopeManager().active()).setAsyncPropagation(true)
|
||||
// this child will have a span
|
||||
m.invoke(pool, new AkkaAsyncChild())
|
||||
m(pool, new AkkaAsyncChild())
|
||||
// this child won't
|
||||
m.invoke(pool, new AkkaAsyncChild(false, false))
|
||||
m(pool, new AkkaAsyncChild(false, false))
|
||||
}
|
||||
}.run()
|
||||
|
||||
|
@ -79,22 +66,21 @@ class AkkaExecutorInstrumentationTest extends AgentTestRunner {
|
|||
|
||||
// Unfortunately, there's no simple way to test the cross product of methods/pools.
|
||||
where:
|
||||
poolImpl | method
|
||||
new ForkJoinPool() | executeRunnableMethod
|
||||
new ForkJoinPool() | akkaExecuteForkJoinTaskMethod
|
||||
new ForkJoinPool() | submitRunnableMethod
|
||||
new ForkJoinPool() | submitCallableMethod
|
||||
new ForkJoinPool() | akkaSubmitForkJoinTaskMethod
|
||||
new ForkJoinPool() | akkaInvokeForkJoinTaskMethod
|
||||
name | method | poolImpl
|
||||
"execute Runnable" | executeRunnable | new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue<Runnable>(1))
|
||||
"submit Runnable" | submitRunnable | new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue<Runnable>(1))
|
||||
"submit Callable" | submitCallable | new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue<Runnable>(1))
|
||||
|
||||
new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue<Runnable>(1)) | executeRunnableMethod
|
||||
new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue<Runnable>(1)) | submitRunnableMethod
|
||||
new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue<Runnable>(1)) | submitCallableMethod
|
||||
// ForkJoinPool has additional set of method overloads for ForkJoinTask to deal with
|
||||
"execute Runnable" | executeRunnable | new ForkJoinPool()
|
||||
"execute ForkJoinTask" | akkaExecuteForkJoinTask | new ForkJoinPool()
|
||||
"submit Runnable" | submitRunnable | new ForkJoinPool()
|
||||
"submit Callable" | submitCallable | new ForkJoinPool()
|
||||
"submit ForkJoinTask" | akkaSubmitForkJoinTask | new ForkJoinPool()
|
||||
"invoke ForkJoinTask" | akkaInvokeForkJoinTask | new ForkJoinPool()
|
||||
}
|
||||
|
||||
// more useful name breaks java9 javac
|
||||
// def "#poolImpl.getClass().getSimpleName() #method.getName() propagates"()
|
||||
def "#poolImpl reports after canceled jobs"() {
|
||||
def "#poolImpl '#name' reports after canceled jobs"() {
|
||||
setup:
|
||||
def pool = poolImpl
|
||||
def m = method
|
||||
|
@ -117,7 +103,7 @@ class AkkaExecutorInstrumentationTest extends AgentTestRunner {
|
|||
final AkkaAsyncChild child = new AkkaAsyncChild(true, true)
|
||||
children.add(child)
|
||||
try {
|
||||
Future f = m.invoke(pool, new AkkaAsyncChild())
|
||||
Future f = m(pool, new AkkaAsyncChild())
|
||||
jobFutures.add(f)
|
||||
} catch (InvocationTargetException e) {
|
||||
throw e.getCause()
|
||||
|
@ -142,8 +128,8 @@ class AkkaExecutorInstrumentationTest extends AgentTestRunner {
|
|||
TEST_WRITER.size() == 1
|
||||
|
||||
where:
|
||||
poolImpl | method
|
||||
new ForkJoinPool() | submitRunnableMethod
|
||||
new ForkJoinPool() | submitCallableMethod
|
||||
name | method | poolImpl
|
||||
"submit Runnable" | submitRunnable | new ForkJoinPool()
|
||||
"submit Callable" | submitCallable | new ForkJoinPool()
|
||||
}
|
||||
}
|
||||
|
|
|
@ -8,11 +8,8 @@ import scala.concurrent.forkjoin.ForkJoinTask
|
|||
import spock.lang.Shared
|
||||
|
||||
import java.lang.reflect.InvocationTargetException
|
||||
import java.lang.reflect.Method
|
||||
import java.util.concurrent.ArrayBlockingQueue
|
||||
import java.util.concurrent.Callable
|
||||
import java.util.concurrent.Executor
|
||||
import java.util.concurrent.ExecutorService
|
||||
import java.util.concurrent.Future
|
||||
import java.util.concurrent.RejectedExecutionException
|
||||
import java.util.concurrent.ThreadPoolExecutor
|
||||
|
@ -23,31 +20,21 @@ import java.util.concurrent.TimeUnit
|
|||
* This is to large extent a copy of ExecutorInstrumentationTest.
|
||||
*/
|
||||
class ScalaExecutorInstrumentationTest extends AgentTestRunner {
|
||||
@Shared
|
||||
Method executeRunnableMethod
|
||||
@Shared
|
||||
Method scalaExecuteForkJoinTaskMethod
|
||||
@Shared
|
||||
Method submitRunnableMethod
|
||||
@Shared
|
||||
Method submitCallableMethod
|
||||
@Shared
|
||||
Method scalaSubmitForkJoinTaskMethod
|
||||
@Shared
|
||||
Method scalaInvokeForkJoinTaskMethod
|
||||
|
||||
def setupSpec() {
|
||||
executeRunnableMethod = Executor.getMethod("execute", Runnable)
|
||||
scalaExecuteForkJoinTaskMethod = ForkJoinPool.getMethod("execute", ForkJoinTask)
|
||||
submitRunnableMethod = ExecutorService.getMethod("submit", Runnable)
|
||||
submitCallableMethod = ExecutorService.getMethod("submit", Callable)
|
||||
scalaSubmitForkJoinTaskMethod = ForkJoinPool.getMethod("submit", ForkJoinTask)
|
||||
scalaInvokeForkJoinTaskMethod = ForkJoinPool.getMethod("invoke", ForkJoinTask)
|
||||
}
|
||||
@Shared
|
||||
def executeRunnable = { e, c -> e.execute((Runnable) c) }
|
||||
@Shared
|
||||
def scalaExecuteForkJoinTask = { e, c -> e.execute((ForkJoinTask) c) }
|
||||
@Shared
|
||||
def submitRunnable = { e, c -> e.submit((Runnable) c) }
|
||||
@Shared
|
||||
def submitCallable = { e, c -> e.submit((Callable) c) }
|
||||
@Shared
|
||||
def scalaSubmitForkJoinTask = { e, c -> e.submit((ForkJoinTask) c) }
|
||||
@Shared
|
||||
def scalaInvokeForkJoinTask = { e, c -> e.invoke((ForkJoinTask) c) }
|
||||
|
||||
// more useful name breaks java9 javac
|
||||
// def "#poolImpl.getClass().getSimpleName() #method.getName() propagates"()
|
||||
def "#poolImpl #method propagates"() {
|
||||
def "#poolImpl '#name' propagates"() {
|
||||
setup:
|
||||
def pool = poolImpl
|
||||
def m = method
|
||||
|
@ -58,9 +45,9 @@ class ScalaExecutorInstrumentationTest extends AgentTestRunner {
|
|||
void run() {
|
||||
((ContinuableScope) GlobalTracer.get().scopeManager().active()).setAsyncPropagation(true)
|
||||
// this child will have a span
|
||||
m.invoke(pool, new ScalaAsyncChild())
|
||||
m(pool, new ScalaAsyncChild())
|
||||
// this child won't
|
||||
m.invoke(pool, new ScalaAsyncChild(false, false))
|
||||
m(pool, new ScalaAsyncChild(false, false))
|
||||
}
|
||||
}.run()
|
||||
|
||||
|
@ -79,22 +66,21 @@ class ScalaExecutorInstrumentationTest extends AgentTestRunner {
|
|||
|
||||
// Unfortunately, there's no simple way to test the cross product of methods/pools.
|
||||
where:
|
||||
poolImpl | method
|
||||
new ForkJoinPool() | executeRunnableMethod
|
||||
new ForkJoinPool() | scalaExecuteForkJoinTaskMethod
|
||||
new ForkJoinPool() | submitRunnableMethod
|
||||
new ForkJoinPool() | submitCallableMethod
|
||||
new ForkJoinPool() | scalaSubmitForkJoinTaskMethod
|
||||
new ForkJoinPool() | scalaInvokeForkJoinTaskMethod
|
||||
name | method | poolImpl
|
||||
"execute Runnable" | executeRunnable | new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue<Runnable>(1))
|
||||
"submit Runnable" | submitRunnable | new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue<Runnable>(1))
|
||||
"submit Callable" | submitCallable | new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue<Runnable>(1))
|
||||
|
||||
new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue<Runnable>(1)) | executeRunnableMethod
|
||||
new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue<Runnable>(1)) | submitRunnableMethod
|
||||
new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue<Runnable>(1)) | submitCallableMethod
|
||||
// ForkJoinPool has additional set of method overloads for ForkJoinTask to deal with
|
||||
"execute Runnable" | executeRunnable | new ForkJoinPool()
|
||||
"execute ForkJoinTask" | scalaExecuteForkJoinTask | new ForkJoinPool()
|
||||
"submit Runnable" | submitRunnable | new ForkJoinPool()
|
||||
"submit Callable" | submitCallable | new ForkJoinPool()
|
||||
"submit ForkJoinTask" | scalaSubmitForkJoinTask | new ForkJoinPool()
|
||||
"invoke ForkJoinTask" | scalaInvokeForkJoinTask | new ForkJoinPool()
|
||||
}
|
||||
|
||||
// more useful name breaks java9 javac
|
||||
// def "#poolImpl.getClass().getSimpleName() #method.getName() propagates"()
|
||||
def "#poolImpl reports after canceled jobs"() {
|
||||
def "#poolImpl '#name' reports after canceled jobs"() {
|
||||
setup:
|
||||
def pool = poolImpl
|
||||
def m = method
|
||||
|
@ -117,7 +103,7 @@ class ScalaExecutorInstrumentationTest extends AgentTestRunner {
|
|||
final ScalaAsyncChild child = new ScalaAsyncChild(true, true)
|
||||
children.add(child)
|
||||
try {
|
||||
Future f = m.invoke(pool, new ScalaAsyncChild())
|
||||
Future f = m(pool, new ScalaAsyncChild())
|
||||
jobFutures.add(f)
|
||||
} catch (InvocationTargetException e) {
|
||||
throw e.getCause()
|
||||
|
@ -142,8 +128,8 @@ class ScalaExecutorInstrumentationTest extends AgentTestRunner {
|
|||
TEST_WRITER.size() == 1
|
||||
|
||||
where:
|
||||
poolImpl | method
|
||||
new ForkJoinPool() | submitRunnableMethod
|
||||
new ForkJoinPool() | submitCallableMethod
|
||||
name | method | poolImpl
|
||||
"submit Runnable" | submitRunnable | new ForkJoinPool()
|
||||
"submit Callable" | submitCallable | new ForkJoinPool()
|
||||
}
|
||||
}
|
||||
|
|
|
@ -2,6 +2,8 @@ package datadog.trace.instrumentation.java.concurrent;
|
|||
|
||||
import datadog.trace.bootstrap.ContextStore;
|
||||
import datadog.trace.bootstrap.WeakMap;
|
||||
import datadog.trace.bootstrap.instrumentation.java.concurrent.CallableWrapper;
|
||||
import datadog.trace.bootstrap.instrumentation.java.concurrent.RunnableWrapper;
|
||||
import datadog.trace.bootstrap.instrumentation.java.concurrent.State;
|
||||
import datadog.trace.context.TraceScope;
|
||||
import io.opentracing.Scope;
|
||||
|
@ -13,7 +15,7 @@ import lombok.extern.slf4j.Slf4j;
|
|||
@Slf4j
|
||||
public class ExecutorInstrumentationUtils {
|
||||
|
||||
private static final WeakMap<Executor, Boolean> DISABLED_EXECUTORS =
|
||||
private static final WeakMap<Executor, Boolean> EXECUTORS_DISABLED_FOR_WRAPPED_TASKS =
|
||||
WeakMap.Provider.newWeakMap();
|
||||
|
||||
/**
|
||||
|
@ -28,7 +30,7 @@ public class ExecutorInstrumentationUtils {
|
|||
return (scope instanceof TraceScope
|
||||
&& ((TraceScope) scope).isAsyncPropagating()
|
||||
&& task != null
|
||||
&& !ExecutorInstrumentationUtils.isDisabled(executor));
|
||||
&& !ExecutorInstrumentationUtils.isExecutorDisabledForThisTask(executor, task));
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -74,12 +76,19 @@ public class ExecutorInstrumentationUtils {
|
|||
}
|
||||
}
|
||||
|
||||
public static void disableExecutor(final Executor executor) {
|
||||
log.debug("Disabling Executor tracing for instance {}", executor);
|
||||
DISABLED_EXECUTORS.put(executor, true);
|
||||
public static void disableExecutorForWrappedTasks(final Executor executor) {
|
||||
log.debug("Disabling Executor tracing for wrapped tasks for instance {}", executor);
|
||||
EXECUTORS_DISABLED_FOR_WRAPPED_TASKS.put(executor, true);
|
||||
}
|
||||
|
||||
public static boolean isDisabled(final Executor executor) {
|
||||
return DISABLED_EXECUTORS.containsKey(executor);
|
||||
/**
|
||||
* Check if Executor can accept given task.
|
||||
*
|
||||
* <p>Disabled executors cannot accept wrapped tasks, non wrapped tasks (i.e. tasks with injected
|
||||
* fields) should still work fine.
|
||||
*/
|
||||
public static boolean isExecutorDisabledForThisTask(final Executor executor, final Object task) {
|
||||
return (task instanceof RunnableWrapper || task instanceof CallableWrapper)
|
||||
&& EXECUTORS_DISABLED_FOR_WRAPPED_TASKS.containsKey(executor);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -61,7 +61,8 @@ public final class FutureInstrumentation extends Instrumenter.Default {
|
|||
"akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask",
|
||||
"com.google.common.util.concurrent.SettableFuture",
|
||||
"com.google.common.util.concurrent.AbstractFuture$TrustedFuture",
|
||||
"com.google.common.util.concurrent.AbstractFuture"
|
||||
"com.google.common.util.concurrent.AbstractFuture",
|
||||
"io.netty.util.concurrent.ScheduledFutureTask"
|
||||
};
|
||||
WHITELISTED_FUTURES = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(whitelist)));
|
||||
}
|
||||
|
|
|
@ -61,11 +61,17 @@ public final class JavaExecutorInstrumentation extends AbstractExecutorInstrumen
|
|||
named("submit").and(takesArgument(0, ForkJoinTask.class)),
|
||||
SetJavaForkJoinStateAdvice.class.getName());
|
||||
transformers.put(
|
||||
nameMatches("invoke(Any|All)$").and(takesArgument(0, Callable.class)),
|
||||
nameMatches("invoke(Any|All)$").and(takesArgument(0, Collection.class)),
|
||||
SetCallableStateForCallableCollectionAdvice.class.getName());
|
||||
transformers.put(
|
||||
nameMatches("invoke").and(takesArgument(0, ForkJoinTask.class)),
|
||||
SetJavaForkJoinStateAdvice.class.getName());
|
||||
transformers.put(
|
||||
named("schedule").and(takesArgument(0, Runnable.class)),
|
||||
SetSubmitRunnableStateAdvice.class.getName());
|
||||
transformers.put(
|
||||
named("schedule").and(takesArgument(0, Callable.class)),
|
||||
SetCallableStateAdvice.class.getName());
|
||||
transformers.put( // kotlinx.coroutines.scheduling.CoroutineScheduler
|
||||
named("dispatch")
|
||||
.and(takesArgument(0, Runnable.class))
|
||||
|
@ -81,11 +87,14 @@ public final class JavaExecutorInstrumentation extends AbstractExecutorInstrumen
|
|||
@Advice.This final Executor executor,
|
||||
@Advice.Argument(value = 0, readOnly = false) Runnable task) {
|
||||
final Scope scope = GlobalTracer.get().scopeManager().active();
|
||||
if (ExecutorInstrumentationUtils.shouldAttachStateToTask(task, executor)) {
|
||||
task = RunnableWrapper.wrapIfNeeded(task);
|
||||
final Runnable newTask = RunnableWrapper.wrapIfNeeded(task);
|
||||
// It is important to check potentially wrapped task if we can instrument task in this
|
||||
// executor. Some executors do not support wrapped tasks.
|
||||
if (ExecutorInstrumentationUtils.shouldAttachStateToTask(newTask, executor)) {
|
||||
task = newTask;
|
||||
final ContextStore<Runnable, State> contextStore =
|
||||
InstrumentationContext.get(Runnable.class, State.class);
|
||||
return ExecutorInstrumentationUtils.setupState(contextStore, task, (TraceScope) scope);
|
||||
return ExecutorInstrumentationUtils.setupState(contextStore, newTask, (TraceScope) scope);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
@ -130,11 +139,14 @@ public final class JavaExecutorInstrumentation extends AbstractExecutorInstrumen
|
|||
@Advice.This final Executor executor,
|
||||
@Advice.Argument(value = 0, readOnly = false) Runnable task) {
|
||||
final Scope scope = GlobalTracer.get().scopeManager().active();
|
||||
if (ExecutorInstrumentationUtils.shouldAttachStateToTask(task, executor)) {
|
||||
task = RunnableWrapper.wrapIfNeeded(task);
|
||||
final Runnable newTask = RunnableWrapper.wrapIfNeeded(task);
|
||||
// It is important to check potentially wrapped task if we can instrument task in this
|
||||
// executor. Some executors do not support wrapped tasks.
|
||||
if (ExecutorInstrumentationUtils.shouldAttachStateToTask(newTask, executor)) {
|
||||
task = newTask;
|
||||
final ContextStore<Runnable, State> contextStore =
|
||||
InstrumentationContext.get(Runnable.class, State.class);
|
||||
return ExecutorInstrumentationUtils.setupState(contextStore, task, (TraceScope) scope);
|
||||
return ExecutorInstrumentationUtils.setupState(contextStore, newTask, (TraceScope) scope);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
@ -161,11 +173,14 @@ public final class JavaExecutorInstrumentation extends AbstractExecutorInstrumen
|
|||
@Advice.This final Executor executor,
|
||||
@Advice.Argument(value = 0, readOnly = false) Callable task) {
|
||||
final Scope scope = GlobalTracer.get().scopeManager().active();
|
||||
if (ExecutorInstrumentationUtils.shouldAttachStateToTask(task, executor)) {
|
||||
task = CallableWrapper.wrapIfNeeded(task);
|
||||
final Callable newTask = CallableWrapper.wrapIfNeeded(task);
|
||||
// It is important to check potentially wrapped task if we can instrument task in this
|
||||
// executor. Some executors do not support wrapped tasks.
|
||||
if (ExecutorInstrumentationUtils.shouldAttachStateToTask(newTask, executor)) {
|
||||
task = newTask;
|
||||
final ContextStore<Callable, State> contextStore =
|
||||
InstrumentationContext.get(Callable.class, State.class);
|
||||
return ExecutorInstrumentationUtils.setupState(contextStore, task, (TraceScope) scope);
|
||||
return ExecutorInstrumentationUtils.setupState(contextStore, newTask, (TraceScope) scope);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
@ -196,13 +211,17 @@ public final class JavaExecutorInstrumentation extends AbstractExecutorInstrumen
|
|||
&& ((TraceScope) scope).isAsyncPropagating()
|
||||
&& tasks != null) {
|
||||
final Collection<Callable<?>> wrappedTasks = new ArrayList<>(tasks.size());
|
||||
for (Callable<?> task : tasks) {
|
||||
for (final Callable<?> task : tasks) {
|
||||
if (task != null) {
|
||||
task = CallableWrapper.wrapIfNeeded(task);
|
||||
wrappedTasks.add(task);
|
||||
final ContextStore<Callable, State> contextStore =
|
||||
InstrumentationContext.get(Callable.class, State.class);
|
||||
ExecutorInstrumentationUtils.setupState(contextStore, task, (TraceScope) scope);
|
||||
final Callable newTask = CallableWrapper.wrapIfNeeded(task);
|
||||
if (ExecutorInstrumentationUtils.isExecutorDisabledForThisTask(executor, newTask)) {
|
||||
wrappedTasks.add(task);
|
||||
} else {
|
||||
wrappedTasks.add(newTask);
|
||||
final ContextStore<Callable, State> contextStore =
|
||||
InstrumentationContext.get(Callable.class, State.class);
|
||||
ExecutorInstrumentationUtils.setupState(contextStore, newTask, (TraceScope) scope);
|
||||
}
|
||||
}
|
||||
}
|
||||
tasks = wrappedTasks;
|
||||
|
|
|
@ -66,7 +66,7 @@ public class ThreadPoolExecutorInstrumentation extends Instrumenter.Default {
|
|||
} catch (final ClassCastException | IllegalArgumentException e) {
|
||||
// These errors indicate the queue is fundamentally incompatible with wrapped runnables.
|
||||
// We must disable the executor instance to avoid passing wrapped runnables later.
|
||||
ExecutorInstrumentationUtils.disableExecutor(executor);
|
||||
ExecutorInstrumentationUtils.disableExecutorForWrappedTasks(executor);
|
||||
} catch (final Exception e) {
|
||||
// Other errors might indicate the queue is not fully initialized.
|
||||
// We might want to disable for those too, but for now just ignore.
|
||||
|
|
|
@ -2,48 +2,50 @@ import datadog.opentracing.DDSpan
|
|||
import datadog.opentracing.scopemanager.ContinuableScope
|
||||
import datadog.trace.agent.test.AgentTestRunner
|
||||
import datadog.trace.api.Trace
|
||||
import datadog.trace.bootstrap.instrumentation.java.concurrent.CallableWrapper
|
||||
import datadog.trace.bootstrap.instrumentation.java.concurrent.RunnableWrapper
|
||||
import io.opentracing.util.GlobalTracer
|
||||
import spock.lang.Shared
|
||||
|
||||
import java.lang.reflect.InvocationTargetException
|
||||
import java.lang.reflect.Method
|
||||
import java.util.concurrent.ArrayBlockingQueue
|
||||
import java.util.concurrent.Callable
|
||||
import java.util.concurrent.Executor
|
||||
import java.util.concurrent.ExecutorService
|
||||
import java.util.concurrent.ForkJoinPool
|
||||
import java.util.concurrent.ForkJoinTask
|
||||
import java.util.concurrent.Future
|
||||
import java.util.concurrent.RejectedExecutionException
|
||||
import java.util.concurrent.ScheduledThreadPoolExecutor
|
||||
import java.util.concurrent.ThreadPoolExecutor
|
||||
import java.util.concurrent.TimeUnit
|
||||
|
||||
class ExecutorInstrumentationTest extends AgentTestRunner {
|
||||
@Shared
|
||||
Method executeRunnableMethod
|
||||
@Shared
|
||||
Method executeForkJoinTaskMethod
|
||||
@Shared
|
||||
Method submitRunnableMethod
|
||||
@Shared
|
||||
Method submitCallableMethod
|
||||
@Shared
|
||||
Method submitForkJoinTaskMethod
|
||||
@Shared
|
||||
Method invokeForkJoinTaskMethod
|
||||
|
||||
def setupSpec() {
|
||||
executeRunnableMethod = Executor.getMethod("execute", Runnable)
|
||||
executeForkJoinTaskMethod = ForkJoinPool.getMethod("execute", ForkJoinTask)
|
||||
submitRunnableMethod = ExecutorService.getMethod("submit", Runnable)
|
||||
submitCallableMethod = ExecutorService.getMethod("submit", Callable)
|
||||
submitForkJoinTaskMethod = ForkJoinPool.getMethod("submit", ForkJoinTask)
|
||||
invokeForkJoinTaskMethod = ForkJoinPool.getMethod("invoke", ForkJoinTask)
|
||||
}
|
||||
@Shared
|
||||
def executeRunnable = { e, c -> e.execute((Runnable) c) }
|
||||
@Shared
|
||||
def executeForkJoinTask = { e, c -> e.execute((ForkJoinTask) c) }
|
||||
@Shared
|
||||
def submitRunnable = { e, c -> e.submit((Runnable) c) }
|
||||
@Shared
|
||||
def submitCallable = { e, c -> e.submit((Callable) c) }
|
||||
@Shared
|
||||
def submitForkJoinTask = { e, c -> e.submit((ForkJoinTask) c) }
|
||||
@Shared
|
||||
def invokeAll = { e, c -> e.invokeAll([(Callable) c]) }
|
||||
@Shared
|
||||
def invokeAllTimeout = { e, c -> e.invokeAll([(Callable) c], 10, TimeUnit.SECONDS) }
|
||||
@Shared
|
||||
def invokeAny = { e, c -> e.invokeAny([(Callable) c]) }
|
||||
@Shared
|
||||
def invokeAnyTimeout = { e, c -> e.invokeAny([(Callable) c], 10, TimeUnit.SECONDS) }
|
||||
@Shared
|
||||
def invokeForkJoinTask = { e, c -> e.invoke((ForkJoinTask) c) }
|
||||
@Shared
|
||||
def scheduleRunnable = { e, c -> e.schedule((Runnable) c, 10, TimeUnit.MILLISECONDS) }
|
||||
@Shared
|
||||
def scheduleCallable = { e, c -> e.schedule((Callable) c, 10, TimeUnit.MILLISECONDS) }
|
||||
|
||||
// more useful name breaks java9 javac
|
||||
// def "#poolImpl.getClass().getSimpleName() #method.getName() propagates"()
|
||||
def "#poolImpl #method propagates"() {
|
||||
def "#poolImpl '#name' propagates"() {
|
||||
setup:
|
||||
def pool = poolImpl
|
||||
def m = method
|
||||
|
@ -54,9 +56,9 @@ class ExecutorInstrumentationTest extends AgentTestRunner {
|
|||
void run() {
|
||||
((ContinuableScope) GlobalTracer.get().scopeManager().active()).setAsyncPropagation(true)
|
||||
// this child will have a span
|
||||
m.invoke(pool, new JavaAsyncChild())
|
||||
m(pool, new JavaAsyncChild())
|
||||
// this child won't
|
||||
m.invoke(pool, new JavaAsyncChild(false, false))
|
||||
m(pool, new JavaAsyncChild(false, false))
|
||||
}
|
||||
}.run()
|
||||
|
||||
|
@ -75,23 +77,84 @@ class ExecutorInstrumentationTest extends AgentTestRunner {
|
|||
|
||||
// Unfortunately, there's no simple way to test the cross product of methods/pools.
|
||||
where:
|
||||
poolImpl | method
|
||||
new ForkJoinPool() | executeRunnableMethod
|
||||
new ForkJoinPool() | executeForkJoinTaskMethod
|
||||
new ForkJoinPool() | submitRunnableMethod
|
||||
new ForkJoinPool() | submitCallableMethod
|
||||
new ForkJoinPool() | submitForkJoinTaskMethod
|
||||
new ForkJoinPool() | invokeForkJoinTaskMethod
|
||||
name | method | poolImpl
|
||||
"execute Runnable" | executeRunnable | new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue<Runnable>(1))
|
||||
"submit Runnable" | submitRunnable | new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue<Runnable>(1))
|
||||
"submit Callable" | submitCallable | new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue<Runnable>(1))
|
||||
"invokeAll" | invokeAll | new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue<Runnable>(1))
|
||||
"invokeAll with timeout" | invokeAllTimeout | new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue<Runnable>(1))
|
||||
"invokeAny" | invokeAny | new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue<Runnable>(1))
|
||||
"invokeAny with timeout" | invokeAnyTimeout | new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue<Runnable>(1))
|
||||
|
||||
new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue<Runnable>(1)) | executeRunnableMethod
|
||||
new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue<Runnable>(1)) | submitRunnableMethod
|
||||
new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue<Runnable>(1)) | submitCallableMethod
|
||||
// Scheduled executor has additional methods and also may get disabled because it wraps tasks
|
||||
"execute Runnable" | executeRunnable | new ScheduledThreadPoolExecutor(1)
|
||||
"submit Runnable" | submitRunnable | new ScheduledThreadPoolExecutor(1)
|
||||
"submit Callable" | submitCallable | new ScheduledThreadPoolExecutor(1)
|
||||
"invokeAll" | invokeAll | new ScheduledThreadPoolExecutor(1)
|
||||
"invokeAll with timeout" | invokeAllTimeout | new ScheduledThreadPoolExecutor(1)
|
||||
"invokeAny" | invokeAny | new ScheduledThreadPoolExecutor(1)
|
||||
"invokeAny with timeout" | invokeAnyTimeout | new ScheduledThreadPoolExecutor(1)
|
||||
"schedule Runnable" | scheduleRunnable | new ScheduledThreadPoolExecutor(1)
|
||||
"schedule Callable" | scheduleCallable | new ScheduledThreadPoolExecutor(1)
|
||||
|
||||
// ForkJoinPool has additional set of method overloads for ForkJoinTask to deal with
|
||||
"execute Runnable" | executeRunnable | new ForkJoinPool()
|
||||
"execute ForkJoinTask" | executeForkJoinTask | new ForkJoinPool()
|
||||
"submit Runnable" | submitRunnable | new ForkJoinPool()
|
||||
"submit Callable" | submitCallable | new ForkJoinPool()
|
||||
"submit ForkJoinTask" | submitForkJoinTask | new ForkJoinPool()
|
||||
"invoke ForkJoinTask" | invokeForkJoinTask | new ForkJoinPool()
|
||||
"invokeAll" | invokeAll | new ForkJoinPool()
|
||||
"invokeAll with timeout" | invokeAllTimeout | new ForkJoinPool()
|
||||
"invokeAny" | invokeAny | new ForkJoinPool()
|
||||
"invokeAny with timeout" | invokeAnyTimeout | new ForkJoinPool()
|
||||
}
|
||||
|
||||
// more useful name breaks java9 javac
|
||||
// def "#poolImpl.getClass().getSimpleName() #method.getName() propagates"()
|
||||
def "#poolImpl reports after canceled jobs"() {
|
||||
def "#poolImpl '#name' disabled wrapping"() {
|
||||
setup:
|
||||
def pool = poolImpl
|
||||
def m = method
|
||||
def w = wrap
|
||||
|
||||
JavaAsyncChild child = new JavaAsyncChild(true, true)
|
||||
new Runnable() {
|
||||
@Override
|
||||
@Trace(operationName = "parent")
|
||||
void run() {
|
||||
((ContinuableScope) GlobalTracer.get().scopeManager().active()).setAsyncPropagation(true)
|
||||
m(pool, w(child))
|
||||
}
|
||||
}.run()
|
||||
// We block in child to make sure spans close in predictable order
|
||||
child.unblock()
|
||||
|
||||
// Expect two traces because async propagation gets effectively disabled
|
||||
TEST_WRITER.waitForTraces(2)
|
||||
|
||||
expect:
|
||||
TEST_WRITER.size() == 2
|
||||
TEST_WRITER.get(0).size() == 1
|
||||
TEST_WRITER.get(0).get(0).operationName == "parent"
|
||||
TEST_WRITER.get(1).size() == 1
|
||||
TEST_WRITER.get(1).get(0).operationName == "asyncChild"
|
||||
|
||||
cleanup:
|
||||
pool?.shutdown()
|
||||
|
||||
where:
|
||||
// Scheduled executor cannot accept wrapped tasks
|
||||
// TODO: we should have a test that passes lambda, but this is hard
|
||||
// because this requires tests to be run in java8+ only.
|
||||
// Instead we 'hand-wrap' tasks in this test.
|
||||
name | method | wrap | poolImpl
|
||||
"execute Runnable" | executeRunnable | { new RunnableWrapper(it) } | new ScheduledThreadPoolExecutor(1)
|
||||
"submit Runnable" | submitRunnable | { new RunnableWrapper(it) } | new ScheduledThreadPoolExecutor(1)
|
||||
"submit Callable" | submitCallable | { new CallableWrapper(it) } | new ScheduledThreadPoolExecutor(1)
|
||||
"schedule Runnable" | scheduleRunnable | { new RunnableWrapper(it) } | new ScheduledThreadPoolExecutor(1)
|
||||
"schedule Callable" | scheduleCallable | { new CallableWrapper(it) } | new ScheduledThreadPoolExecutor(1)
|
||||
}
|
||||
|
||||
def "#poolImpl '#name' reports after canceled jobs"() {
|
||||
setup:
|
||||
def pool = poolImpl
|
||||
def m = method
|
||||
|
@ -114,7 +177,7 @@ class ExecutorInstrumentationTest extends AgentTestRunner {
|
|||
final JavaAsyncChild child = new JavaAsyncChild(true, true)
|
||||
children.add(child)
|
||||
try {
|
||||
Future f = m.invoke(pool, new JavaAsyncChild())
|
||||
Future f = m(pool, new JavaAsyncChild())
|
||||
jobFutures.add(f)
|
||||
} catch (InvocationTargetException e) {
|
||||
throw e.getCause()
|
||||
|
@ -139,11 +202,18 @@ class ExecutorInstrumentationTest extends AgentTestRunner {
|
|||
TEST_WRITER.size() == 1
|
||||
|
||||
where:
|
||||
poolImpl | method
|
||||
new ForkJoinPool() | submitRunnableMethod
|
||||
new ForkJoinPool() | submitCallableMethod
|
||||
name | method | poolImpl
|
||||
"submit Runnable" | submitRunnable | new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue<Runnable>(1))
|
||||
"submit Callable" | submitCallable | new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue<Runnable>(1))
|
||||
|
||||
new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue<Runnable>(1)) | submitRunnableMethod
|
||||
new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue<Runnable>(1)) | submitCallableMethod
|
||||
// Scheduled executor has additional methods and also may get disabled because it wraps tasks
|
||||
"submit Runnable" | submitRunnable | new ScheduledThreadPoolExecutor(1)
|
||||
"submit Callable" | submitCallable | new ScheduledThreadPoolExecutor(1)
|
||||
"schedule Runnable" | scheduleRunnable | new ScheduledThreadPoolExecutor(1)
|
||||
"schedule Callable" | scheduleCallable | new ScheduledThreadPoolExecutor(1)
|
||||
|
||||
// ForkJoinPool has additional set of method overloads for ForkJoinTask to deal with
|
||||
"submit Runnable" | submitRunnable | new ForkJoinPool()
|
||||
"submit Callable" | submitCallable | new ForkJoinPool()
|
||||
}
|
||||
}
|
||||
|
|
|
@ -30,7 +30,6 @@ class TraceUtils {
|
|||
|
||||
throw e
|
||||
} finally {
|
||||
((TraceScope) scope).setAsyncPropagation(false)
|
||||
scope.close()
|
||||
}
|
||||
}
|
||||
|
|
|
@ -159,7 +159,7 @@ public class ContinuableScope implements Scope, TraceScope {
|
|||
if (closeContinuationScope) {
|
||||
ContinuableScope.this.close();
|
||||
} else {
|
||||
// Same in in 'close()' above.
|
||||
// Same as in 'close()' above.
|
||||
if (openCount.decrementAndGet() == 0 && finishOnClose) {
|
||||
spanUnderScope.finish();
|
||||
}
|
||||
|
|
|
@ -183,6 +183,78 @@ class ScopeManagerTest extends Specification {
|
|||
false | true
|
||||
}
|
||||
|
||||
def "Continuation.close closes parent scope"() {
|
||||
setup:
|
||||
def builder = tracer.buildSpan("test")
|
||||
def scope = (ContinuableScope) builder.startActive(true)
|
||||
scope.setAsyncPropagation(true)
|
||||
def continuation = scope.capture()
|
||||
|
||||
when:
|
||||
/*
|
||||
Note: this API is inherently broken. Our scope implementation doesn't allow us to close scopes
|
||||
in random order, yet when we close continuation we attempt to close scope by default.
|
||||
And in fact continuation trying to close parent scope is most likely a bug.
|
||||
*/
|
||||
continuation.close(true)
|
||||
|
||||
then:
|
||||
scopeManager.active() == null
|
||||
!spanFinished(scope.span())
|
||||
|
||||
when:
|
||||
scope.span().finish()
|
||||
|
||||
then:
|
||||
scopeManager.active() == null
|
||||
}
|
||||
|
||||
def "Continuation.close doesn't close parent scope"() {
|
||||
setup:
|
||||
def builder = tracer.buildSpan("test")
|
||||
def scope = (ContinuableScope) builder.startActive(true)
|
||||
scope.setAsyncPropagation(true)
|
||||
def continuation = scope.capture()
|
||||
|
||||
when:
|
||||
continuation.close(false)
|
||||
|
||||
then:
|
||||
scopeManager.active() == scope
|
||||
}
|
||||
|
||||
def "Continuation.close doesn't close parent scope, span finishes"() {
|
||||
/*
|
||||
This is highly confusing behaviour. Sequence of events is as following:
|
||||
* Scope gets created along with span and with finishOnClose == true.
|
||||
* Continuation gets created for that scope.
|
||||
* Scope is closed.
|
||||
At this point scope is not really closed. It is removed from scope
|
||||
stack, but it is still alive because there is a live continuation attached
|
||||
to it. This also means span is not closed.
|
||||
* Continuation is closed.
|
||||
This triggers final closing of scope and closing of the span.
|
||||
|
||||
This is confusing because expected behaviour is for span to be closed
|
||||
with the scope when finishOnClose = true, but in fact span lingers until
|
||||
continuation is closed.
|
||||
*/
|
||||
setup:
|
||||
def builder = tracer.buildSpan("test")
|
||||
def scope = (ContinuableScope) builder.startActive(true)
|
||||
scope.setAsyncPropagation(true)
|
||||
def continuation = scope.capture()
|
||||
scope.close()
|
||||
|
||||
when:
|
||||
continuation.close(false)
|
||||
|
||||
then:
|
||||
scopeManager.active() == null
|
||||
spanFinished(scope.span())
|
||||
writer == [[scope.span()]]
|
||||
}
|
||||
|
||||
@Timeout(value = 60, unit = TimeUnit.SECONDS)
|
||||
def "hard reference on continuation prevents trace from reporting"() {
|
||||
setup:
|
||||
|
|
Loading…
Reference in New Issue