Merge pull request #830 from DataDog/tyler/executor-config

Add config to enable individual executors
This commit is contained in:
Tyler Benson 2019-05-16 10:05:29 -07:00 committed by GitHub
commit 354d17ac7d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 210 additions and 68 deletions

View File

@ -6,10 +6,12 @@ import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.not;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.api.Config;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Executor;
import lombok.extern.slf4j.Slf4j;
import net.bytebuddy.description.type.TypeDescription;
@ -20,11 +22,15 @@ public abstract class AbstractExecutorInstrumentation extends Instrumenter.Defau
public static final String EXEC_NAME = "java_concurrent";
private static final boolean TRACE_ALL_EXECUTORS =
Config.getBooleanSettingFromEnvironment("trace.executors.all", false);
/**
* Only apply executor instrumentation to whitelisted executors. In the future, this restriction
* may be lifted to include all executors.
* Only apply executor instrumentation to whitelisted executors. To apply to all executors, use
* override setting above.
*/
private static final Collection<String> WHITELISTED_EXECUTORS;
/**
* Some frameworks have their executors defined as anon classes inside other classes. Referencing
* anon classes by name would be fragile, so instead we will use list of class prefix names. Since
@ -33,6 +39,11 @@ public abstract class AbstractExecutorInstrumentation extends Instrumenter.Defau
private static final Collection<String> WHITELISTED_EXECUTORS_PREFIXES;
static {
if (TRACE_ALL_EXECUTORS) {
log.info("Tracing all executors enabled.");
WHITELISTED_EXECUTORS = Collections.emptyList();
WHITELISTED_EXECUTORS_PREFIXES = Collections.emptyList();
} else {
final String[] whitelist = {
"java.util.concurrent.AbstractExecutorService",
"java.util.concurrent.ThreadPoolExecutor",
@ -73,12 +84,18 @@ public abstract class AbstractExecutorInstrumentation extends Instrumenter.Defau
"com.google.common.util.concurrent.MoreExecutors$ListeningDecorator",
"com.google.common.util.concurrent.MoreExecutors$ScheduledListeningDecorator",
};
WHITELISTED_EXECUTORS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(whitelist)));
final Set<String> executors =
new HashSet<>(Config.getListSettingFromEnvironment("trace.executors", ""));
executors.addAll(Arrays.asList(whitelist));
WHITELISTED_EXECUTORS = Collections.unmodifiableSet(executors);
final String[] whitelistPrefixes = {"slick.util.AsyncExecutor$"};
WHITELISTED_EXECUTORS_PREFIXES =
Collections.unmodifiableCollection(Arrays.asList(whitelistPrefixes));
}
}
public AbstractExecutorInstrumentation(final String... additionalNames) {
super(EXEC_NAME, additionalNames);
@ -86,9 +103,12 @@ public abstract class AbstractExecutorInstrumentation extends Instrumenter.Defau
@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return not(isInterface())
.and(safeHasSuperType(named(Executor.class.getName())))
.and(
final ElementMatcher.Junction<TypeDescription> matcher =
not(isInterface()).and(safeHasSuperType(named(Executor.class.getName())));
if (TRACE_ALL_EXECUTORS) {
return matcher;
}
return matcher.and(
new ElementMatcher<TypeDescription>() {
@Override
public boolean matches(final TypeDescription target) {

View File

@ -8,18 +8,26 @@ import io.opentracing.util.GlobalTracer
import spock.lang.Shared
import java.lang.reflect.InvocationTargetException
import java.util.concurrent.AbstractExecutorService
import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.Callable
import java.util.concurrent.ExecutionException
import java.util.concurrent.ForkJoinPool
import java.util.concurrent.ForkJoinTask
import java.util.concurrent.Future
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.RejectedExecutionException
import java.util.concurrent.ScheduledThreadPoolExecutor
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException
class ExecutorInstrumentationTest extends AgentTestRunner {
static {
System.setProperty("dd.trace.executors", "ExecutorInstrumentationTest\$CustomThreadPoolExecutor")
}
@Shared
def executeRunnable = { e, c -> e.execute((Runnable) c) }
@Shared
@ -108,6 +116,15 @@ class ExecutorInstrumentationTest extends AgentTestRunner {
"invokeAll with timeout" | invokeAllTimeout | new ForkJoinPool()
"invokeAny" | invokeAny | new ForkJoinPool()
"invokeAny with timeout" | invokeAnyTimeout | new ForkJoinPool()
// CustomThreadPoolExecutor would normally be disabled except enabled above.
"execute Runnable" | executeRunnable | new CustomThreadPoolExecutor()
"submit Runnable" | submitRunnable | new CustomThreadPoolExecutor()
"submit Callable" | submitCallable | new CustomThreadPoolExecutor()
"invokeAll" | invokeAll | new CustomThreadPoolExecutor()
"invokeAll with timeout" | invokeAllTimeout | new CustomThreadPoolExecutor()
"invokeAny" | invokeAny | new CustomThreadPoolExecutor()
"invokeAny with timeout" | invokeAnyTimeout | new CustomThreadPoolExecutor()
}
def "#poolImpl '#name' disabled wrapping"() {
@ -216,4 +233,105 @@ class ExecutorInstrumentationTest extends AgentTestRunner {
"submit Runnable" | submitRunnable | new ForkJoinPool()
"submit Callable" | submitCallable | new ForkJoinPool()
}
static class CustomThreadPoolExecutor extends AbstractExecutorService {
volatile running = true
def workQueue = new LinkedBlockingQueue<Runnable>(10)
def worker = new Runnable() {
void run() {
try {
while (running) {
def runnable = workQueue.take()
runnable.run()
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt()
} catch (Exception e) {
e.printStackTrace()
}
}
}
def workerThread = new Thread(worker, "ExecutorTestThread")
private CustomThreadPoolExecutor() {
workerThread.start()
}
@Override
void shutdown() {
running = false
workerThread.interrupt()
}
@Override
List<Runnable> shutdownNow() {
running = false
workerThread.interrupt()
return []
}
@Override
boolean isShutdown() {
return !running
}
@Override
boolean isTerminated() {
return workerThread.isAlive()
}
@Override
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
workerThread.join(unit.toMillis(timeout))
return true
}
@Override
def <T> Future<T> submit(Callable<T> task) {
def future = newTaskFor(task)
execute(future)
return future
}
@Override
def <T> Future<T> submit(Runnable task, T result) {
def future = newTaskFor(task, result)
execute(future)
return future
}
@Override
Future<?> submit(Runnable task) {
def future = newTaskFor(task, null)
execute(future)
return future
}
@Override
def <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
return super.invokeAll(tasks)
}
@Override
def <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
return super.invokeAll(tasks)
}
@Override
def <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
return super.invokeAny(tasks)
}
@Override
def <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
return super.invokeAny(tasks)
}
@Override
void execute(Runnable command) {
workQueue.put(command)
}
}
}

View File

@ -399,7 +399,11 @@ public class Config {
return parseMap(getSettingFromEnvironment(name, defaultValue), PREFIX + name);
}
private static List<String> getListSettingFromEnvironment(
/**
* Calls {@link #getSettingFromEnvironment(String, String)} and converts the result to a list by
* splitting on `,`.
*/
public static List<String> getListSettingFromEnvironment(
final String name, final String defaultValue) {
return parseList(getSettingFromEnvironment(name, defaultValue));
}