Merge pull request #510 from DataDog/tyler/TPE-workqueue-safety

Disable Executor instrumentation ThreadPoolExecutor instances
This commit is contained in:
Tyler Benson 2018-09-21 14:49:27 +10:00 committed by GitHub
commit e58c1ea44e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 97 additions and 18 deletions

View File

@ -43,7 +43,7 @@ public interface Instrumenter {
/** @return Class names of helpers to inject into the user's classloader */
String[] helperClassNames();
Map<ElementMatcher, String> transformers();
Map<? extends ElementMatcher, String> transformers();
@Slf4j
abstract class Default implements Instrumenter {
@ -110,7 +110,7 @@ public interface Instrumenter {
private AgentBuilder.Identified.Extendable applyInstrumentationTransformers(
AgentBuilder.Identified.Extendable agentBuilder) {
for (final Map.Entry<ElementMatcher, String> entry : transformers().entrySet()) {
for (final Map.Entry<? extends ElementMatcher, String> entry : transformers().entrySet()) {
agentBuilder =
agentBuilder.transform(
new AgentBuilder.Transformer.ForAdvice()
@ -185,7 +185,7 @@ public interface Instrumenter {
public abstract ElementMatcher<? super TypeDescription> typeMatcher();
@Override
public abstract Map<ElementMatcher, String> transformers();
public abstract Map<? extends ElementMatcher, String> transformers();
protected boolean defaultEnabled() {
return getConfigEnabled("dd.integrations.enabled", true);

View File

@ -10,6 +10,7 @@ import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.bootstrap.CallDepthThreadLocalMap;
import datadog.trace.bootstrap.WeakMap;
import datadog.trace.context.TraceScope;
import io.opentracing.Scope;
import io.opentracing.util.GlobalTracer;
@ -160,9 +161,10 @@ public final class ExecutorInstrumentation extends Instrumenter.Default {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static DatadogWrapper enterJobSubmit(
@Advice.This final Executor executor,
@Advice.Argument(value = 0, readOnly = false) Runnable task) {
final Scope scope = GlobalTracer.get().scopeManager().active();
if (DatadogWrapper.shouldWrapTask(task)) {
if (DatadogWrapper.shouldWrapTask(task, executor)) {
task = new RunnableWrapper(task, (TraceScope) scope);
return (RunnableWrapper) task;
}
@ -180,10 +182,11 @@ public final class ExecutorInstrumentation extends Instrumenter.Default {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static DatadogWrapper enterJobSubmit(
@Advice.This final Executor executor,
@Advice.Argument(value = 0, readOnly = false) Callable<?> task) {
final Scope scope = GlobalTracer.get().scopeManager().active();
if (DatadogWrapper.shouldWrapTask(task)) {
if (DatadogWrapper.shouldWrapTask(task, executor)) {
task = new CallableWrapper<>(task, (TraceScope) scope);
return (CallableWrapper) task;
}
@ -274,13 +277,14 @@ public final class ExecutorInstrumentation extends Instrumenter.Default {
* @param task task object
* @return true iff given task object should be wrapped
*/
public static boolean shouldWrapTask(final Object task) {
public static boolean shouldWrapTask(final Object task, final Executor executor) {
final Scope scope = GlobalTracer.get().scopeManager().active();
return (scope instanceof TraceScope
&& ((TraceScope) scope).isAsyncPropagating()
&& task != null
&& !(task instanceof DatadogWrapper)
&& isTopLevelCall());
&& isTopLevelCall()
&& !ConcurrentUtils.isDisabled(executor));
}
/**
@ -343,10 +347,22 @@ public final class ExecutorInstrumentation extends Instrumenter.Default {
}
/** Utils for pulling DatadogWrapper out of Future instances. */
@Slf4j
public static class ConcurrentUtils {
private static final WeakMap<Executor, Boolean> disabledExecutors =
WeakMap.Provider.newWeakMap();
private static final Map<Class<?>, Field> fieldCache = new ConcurrentHashMap<>();
private static final String[] wrapperFields = {"runnable", "callable"};
public static void disableExecutor(final Executor executor) {
log.debug("Disabling Executor tracing for instance {}", executor);
disabledExecutors.put(executor, true);
}
public static boolean isDisabled(final Executor executor) {
return disabledExecutors.containsKey(executor);
}
public static DatadogWrapper getDatadogWrapper(final Future<?> f) {
final Field field;
if (fieldCache.containsKey(f.getClass())) {

View File

@ -0,0 +1,71 @@
package datadog.trace.instrumentation.java.concurrent;
import static net.bytebuddy.matcher.ElementMatchers.isConstructor;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.Instrumenter;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import lombok.extern.slf4j.Slf4j;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
@Slf4j
@AutoService(Instrumenter.class)
public class ThreadPoolExecutorInstrumentation extends Instrumenter.Default {
public ThreadPoolExecutorInstrumentation() {
super(ExecutorInstrumentation.EXEC_NAME);
}
@Override
public ElementMatcher<? super TypeDescription> typeMatcher() {
return named("java.util.concurrent.ThreadPoolExecutor");
}
@Override
public String[] helperClassNames() {
return new String[] {
ExecutorInstrumentation.class.getName() + "$ConcurrentUtils",
ThreadPoolExecutorInstrumentation.class.getName() + "$GenericRunnable",
};
}
@Override
public Map<? extends ElementMatcher, String> transformers() {
return Collections.singletonMap(
isConstructor()
.and(takesArgument(4, named("java.util.concurrent.BlockingQueue")))
.and(takesArguments(7)),
ThreadPoolExecutorAdvice.class.getName());
}
public static class ThreadPoolExecutorAdvice {
@Advice.OnMethodExit(suppress = Throwable.class)
public static void disableIfQueueWrongType(
@Advice.This final ThreadPoolExecutor executor,
@Advice.Argument(4) final BlockingQueue queue) {
if (queue.size() == 0) {
try {
queue.add(new GenericRunnable());
queue.clear(); // Remove the Runnable we just added.
} catch (final ClassCastException e) {
ExecutorInstrumentation.ConcurrentUtils.disableExecutor(executor);
}
}
}
}
public static class GenericRunnable implements Runnable {
@Override
public void run() {}
}
}

View File

@ -2,13 +2,13 @@ import datadog.trace.agent.test.AgentTestRunner
import datadog.trace.api.DDSpanTypes
import datadog.trace.api.DDTags
import io.opentracing.tag.Tags
import spock.lang.Shared
import static datadog.trace.agent.test.asserts.ListWriterAssert.assertTraces
class SlickTest extends AgentTestRunner {
@Shared
// Can't be @Shared, otherwise the work queue is initialized before the instrumentation is applied
// @Shared
def database = new SlickUtils()
def "Basic statement generates spans"() {

View File

@ -18,7 +18,7 @@ class SlickUtils {
// wrapped runnables.
executor = AsyncExecutor("test", numThreads = 1, queueSize = 1000)
)
Await.result(database.run(sqlu"""CREATE ALIAS SLEEP FOR "java.lang.Thread.sleep(long)""""), Duration.Inf)
Await.result(database.run(sqlu"""CREATE ALIAS IF NOT EXISTS SLEEP FOR "java.lang.Thread.sleep(long)""""), Duration.Inf)
@Trace
def startQuery(query: String): Future[Vector[Int]] = {

View File

@ -13,7 +13,6 @@ import java.util.concurrent.ExecutorService
import java.util.concurrent.ForkJoinPool
import java.util.concurrent.Future
import java.util.concurrent.RejectedExecutionException
import java.util.concurrent.ScheduledThreadPoolExecutor
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit
@ -23,10 +22,6 @@ class ExecutorInstrumentationTest extends AgentTestRunner {
@Shared
Method executeMethod
static {
System.setProperty("dd.integration.java_concurrent.enabled", "true")
}
def setupSpec() {
executeMethod = Executor.getMethod("execute", Runnable)
submitMethod = ExecutorService.getMethod("submit", Callable)
@ -71,8 +66,6 @@ class ExecutorInstrumentationTest extends AgentTestRunner {
new ForkJoinPool() | executeMethod
new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue<Runnable>(1)) | submitMethod
new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue<Runnable>(1)) | executeMethod
new ScheduledThreadPoolExecutor(1) | submitMethod
new ScheduledThreadPoolExecutor(1) | executeMethod
}
// more useful name breaks java9 javac
@ -112,6 +105,5 @@ class ExecutorInstrumentationTest extends AgentTestRunner {
poolImpl | _
new ForkJoinPool() | _
new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue<Runnable>(1)) | _
new ScheduledThreadPoolExecutor(1) | _
}
}