Add config to enable individual executors

Or all executors, bypassing the allow list.

`dd.trace.executor=com.MyCustomExecutor,com.OtherExecutor`
`dd.trace.executors.all=true`

Turns out in many cases, executors that we say we’re skipping, are still being traced because they extend from an already instrumented executor.
This commit is contained in:
Tyler Benson 2019-05-13 17:33:40 -07:00
parent ad2663d840
commit cc23fee614
3 changed files with 210 additions and 68 deletions

View File

@ -6,10 +6,12 @@ import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.not;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.api.Config;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Executor;
import lombok.extern.slf4j.Slf4j;
import net.bytebuddy.description.type.TypeDescription;
@ -20,11 +22,15 @@ public abstract class AbstractExecutorInstrumentation extends Instrumenter.Defau
public static final String EXEC_NAME = "java_concurrent";
private static final boolean TRACE_ALL_EXECUTORS =
Config.getBooleanSettingFromEnvironment("trace.executors.all", false);
/**
* Only apply executor instrumentation to whitelisted executors. In the future, this restriction
* may be lifted to include all executors.
* Only apply executor instrumentation to whitelisted executors. To apply to all executors, use
* override setting above.
*/
private static final Collection<String> WHITELISTED_EXECUTORS;
/**
* Some frameworks have their executors defined as anon classes inside other classes. Referencing
* anon classes by name would be fragile, so instead we will use list of class prefix names. Since
@ -33,51 +39,62 @@ public abstract class AbstractExecutorInstrumentation extends Instrumenter.Defau
private static final Collection<String> WHITELISTED_EXECUTORS_PREFIXES;
static {
final String[] whitelist = {
"java.util.concurrent.AbstractExecutorService",
"java.util.concurrent.ThreadPoolExecutor",
"java.util.concurrent.ScheduledThreadPoolExecutor",
"java.util.concurrent.ForkJoinPool",
"java.util.concurrent.Executors$FinalizableDelegatedExecutorService",
"java.util.concurrent.Executors$DelegatedExecutorService",
"javax.management.NotificationBroadcasterSupport$1",
"kotlinx.coroutines.scheduling.CoroutineScheduler",
"scala.concurrent.Future$InternalCallbackExecutor$",
"scala.concurrent.impl.ExecutionContextImpl",
"scala.concurrent.impl.ExecutionContextImpl$$anon$1",
"scala.concurrent.forkjoin.ForkJoinPool",
"scala.concurrent.impl.ExecutionContextImpl$$anon$3",
"akka.dispatch.MessageDispatcher",
"akka.dispatch.Dispatcher",
"akka.dispatch.Dispatcher$LazyExecutorServiceDelegate",
"akka.actor.ActorSystemImpl$$anon$1",
"akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool",
"akka.dispatch.forkjoin.ForkJoinPool",
"akka.dispatch.BalancingDispatcher",
"akka.dispatch.ThreadPoolConfig$ThreadPoolExecutorServiceFactory$$anon$1",
"akka.dispatch.PinnedDispatcher",
"akka.dispatch.ExecutionContexts$sameThreadExecutionContext$",
"play.api.libs.streams.Execution$trampoline$",
"io.netty.channel.MultithreadEventLoopGroup",
"io.netty.util.concurrent.MultithreadEventExecutorGroup",
"io.netty.util.concurrent.AbstractEventExecutorGroup",
"io.netty.channel.epoll.EpollEventLoopGroup",
"io.netty.channel.nio.NioEventLoopGroup",
"io.netty.util.concurrent.GlobalEventExecutor",
"io.netty.util.concurrent.AbstractScheduledEventExecutor",
"io.netty.util.concurrent.AbstractEventExecutor",
"io.netty.util.concurrent.SingleThreadEventExecutor",
"io.netty.channel.nio.NioEventLoop",
"io.netty.channel.SingleThreadEventLoop",
"com.google.common.util.concurrent.AbstractListeningExecutorService",
"com.google.common.util.concurrent.MoreExecutors$ListeningDecorator",
"com.google.common.util.concurrent.MoreExecutors$ScheduledListeningDecorator",
};
WHITELISTED_EXECUTORS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(whitelist)));
if (TRACE_ALL_EXECUTORS) {
log.info("Tracing all executors enabled.");
WHITELISTED_EXECUTORS = Collections.emptyList();
WHITELISTED_EXECUTORS_PREFIXES = Collections.emptyList();
} else {
final String[] whitelist = {
"java.util.concurrent.AbstractExecutorService",
"java.util.concurrent.ThreadPoolExecutor",
"java.util.concurrent.ScheduledThreadPoolExecutor",
"java.util.concurrent.ForkJoinPool",
"java.util.concurrent.Executors$FinalizableDelegatedExecutorService",
"java.util.concurrent.Executors$DelegatedExecutorService",
"javax.management.NotificationBroadcasterSupport$1",
"kotlinx.coroutines.scheduling.CoroutineScheduler",
"scala.concurrent.Future$InternalCallbackExecutor$",
"scala.concurrent.impl.ExecutionContextImpl",
"scala.concurrent.impl.ExecutionContextImpl$$anon$1",
"scala.concurrent.forkjoin.ForkJoinPool",
"scala.concurrent.impl.ExecutionContextImpl$$anon$3",
"akka.dispatch.MessageDispatcher",
"akka.dispatch.Dispatcher",
"akka.dispatch.Dispatcher$LazyExecutorServiceDelegate",
"akka.actor.ActorSystemImpl$$anon$1",
"akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool",
"akka.dispatch.forkjoin.ForkJoinPool",
"akka.dispatch.BalancingDispatcher",
"akka.dispatch.ThreadPoolConfig$ThreadPoolExecutorServiceFactory$$anon$1",
"akka.dispatch.PinnedDispatcher",
"akka.dispatch.ExecutionContexts$sameThreadExecutionContext$",
"play.api.libs.streams.Execution$trampoline$",
"io.netty.channel.MultithreadEventLoopGroup",
"io.netty.util.concurrent.MultithreadEventExecutorGroup",
"io.netty.util.concurrent.AbstractEventExecutorGroup",
"io.netty.channel.epoll.EpollEventLoopGroup",
"io.netty.channel.nio.NioEventLoopGroup",
"io.netty.util.concurrent.GlobalEventExecutor",
"io.netty.util.concurrent.AbstractScheduledEventExecutor",
"io.netty.util.concurrent.AbstractEventExecutor",
"io.netty.util.concurrent.SingleThreadEventExecutor",
"io.netty.channel.nio.NioEventLoop",
"io.netty.channel.SingleThreadEventLoop",
"com.google.common.util.concurrent.AbstractListeningExecutorService",
"com.google.common.util.concurrent.MoreExecutors$ListeningDecorator",
"com.google.common.util.concurrent.MoreExecutors$ScheduledListeningDecorator",
};
final String[] whitelistPrefixes = {"slick.util.AsyncExecutor$"};
WHITELISTED_EXECUTORS_PREFIXES =
Collections.unmodifiableCollection(Arrays.asList(whitelistPrefixes));
final Set<String> executors =
new HashSet<>(Config.getListSettingFromEnvironment("trace.executors", ""));
executors.addAll(Arrays.asList(whitelist));
WHITELISTED_EXECUTORS = Collections.unmodifiableSet(executors);
final String[] whitelistPrefixes = {"slick.util.AsyncExecutor$"};
WHITELISTED_EXECUTORS_PREFIXES =
Collections.unmodifiableCollection(Arrays.asList(whitelistPrefixes));
}
}
public AbstractExecutorInstrumentation(final String... additionalNames) {
@ -86,30 +103,33 @@ public abstract class AbstractExecutorInstrumentation extends Instrumenter.Defau
@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return not(isInterface())
.and(safeHasSuperType(named(Executor.class.getName())))
.and(
new ElementMatcher<TypeDescription>() {
@Override
public boolean matches(final TypeDescription target) {
boolean whitelisted = WHITELISTED_EXECUTORS.contains(target.getName());
final ElementMatcher.Junction<TypeDescription> matcher =
not(isInterface()).and(safeHasSuperType(named(Executor.class.getName())));
if (TRACE_ALL_EXECUTORS) {
return matcher;
}
return matcher.and(
new ElementMatcher<TypeDescription>() {
@Override
public boolean matches(final TypeDescription target) {
boolean whitelisted = WHITELISTED_EXECUTORS.contains(target.getName());
// Check for possible prefixes match only if not whitelisted already
if (!whitelisted) {
for (final String name : WHITELISTED_EXECUTORS_PREFIXES) {
if (target.getName().startsWith(name)) {
whitelisted = true;
break;
}
}
// Check for possible prefixes match only if not whitelisted already
if (!whitelisted) {
for (final String name : WHITELISTED_EXECUTORS_PREFIXES) {
if (target.getName().startsWith(name)) {
whitelisted = true;
break;
}
if (!whitelisted) {
log.debug("Skipping executor instrumentation for {}", target.getName());
}
return whitelisted;
}
});
}
if (!whitelisted) {
log.debug("Skipping executor instrumentation for {}", target.getName());
}
return whitelisted;
}
});
}
@Override

View File

@ -8,18 +8,26 @@ import io.opentracing.util.GlobalTracer
import spock.lang.Shared
import java.lang.reflect.InvocationTargetException
import java.util.concurrent.AbstractExecutorService
import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.Callable
import java.util.concurrent.ExecutionException
import java.util.concurrent.ForkJoinPool
import java.util.concurrent.ForkJoinTask
import java.util.concurrent.Future
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.RejectedExecutionException
import java.util.concurrent.ScheduledThreadPoolExecutor
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException
class ExecutorInstrumentationTest extends AgentTestRunner {
static {
System.setProperty("dd.trace.executors", "ExecutorInstrumentationTest\$CustomThreadPoolExecutor")
}
@Shared
def executeRunnable = { e, c -> e.execute((Runnable) c) }
@Shared
@ -108,6 +116,15 @@ class ExecutorInstrumentationTest extends AgentTestRunner {
"invokeAll with timeout" | invokeAllTimeout | new ForkJoinPool()
"invokeAny" | invokeAny | new ForkJoinPool()
"invokeAny with timeout" | invokeAnyTimeout | new ForkJoinPool()
// CustomThreadPoolExecutor would normally be disabled except enabled above.
"execute Runnable" | executeRunnable | new CustomThreadPoolExecutor()
"submit Runnable" | submitRunnable | new CustomThreadPoolExecutor()
"submit Callable" | submitCallable | new CustomThreadPoolExecutor()
"invokeAll" | invokeAll | new CustomThreadPoolExecutor()
"invokeAll with timeout" | invokeAllTimeout | new CustomThreadPoolExecutor()
"invokeAny" | invokeAny | new CustomThreadPoolExecutor()
"invokeAny with timeout" | invokeAnyTimeout | new CustomThreadPoolExecutor()
}
def "#poolImpl '#name' disabled wrapping"() {
@ -216,4 +233,105 @@ class ExecutorInstrumentationTest extends AgentTestRunner {
"submit Runnable" | submitRunnable | new ForkJoinPool()
"submit Callable" | submitCallable | new ForkJoinPool()
}
static class CustomThreadPoolExecutor extends AbstractExecutorService {
volatile running = true
def workQueue = new LinkedBlockingQueue<Runnable>(10)
def worker = new Runnable() {
void run() {
try {
while (running) {
def runnable = workQueue.take()
runnable.run()
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt()
} catch (Exception e) {
e.printStackTrace()
}
}
}
def workerThread = new Thread(worker, "ExecutorTestThread")
private CustomThreadPoolExecutor() {
workerThread.start()
}
@Override
void shutdown() {
running = false
workerThread.interrupt()
}
@Override
List<Runnable> shutdownNow() {
running = false
workerThread.interrupt()
return []
}
@Override
boolean isShutdown() {
return !running
}
@Override
boolean isTerminated() {
return workerThread.isAlive()
}
@Override
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
workerThread.join(unit.toMillis(timeout))
return true
}
@Override
def <T> Future<T> submit(Callable<T> task) {
def future = newTaskFor(task)
execute(future)
return future
}
@Override
def <T> Future<T> submit(Runnable task, T result) {
def future = newTaskFor(task, result)
execute(future)
return future
}
@Override
Future<?> submit(Runnable task) {
def future = newTaskFor(task, null)
execute(future)
return future
}
@Override
def <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
return super.invokeAll(tasks)
}
@Override
def <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
return super.invokeAll(tasks)
}
@Override
def <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
return super.invokeAny(tasks)
}
@Override
def <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
return super.invokeAny(tasks)
}
@Override
void execute(Runnable command) {
workQueue.put(command)
}
}
}

View File

@ -399,7 +399,11 @@ public class Config {
return parseMap(getSettingFromEnvironment(name, defaultValue), PREFIX + name);
}
private static List<String> getListSettingFromEnvironment(
/**
* Calls {@link #getSettingFromEnvironment(String, String)} and converts the result to a list by
* splitting on `,`.
*/
public static List<String> getListSettingFromEnvironment(
final String name, final String defaultValue) {
return parseList(getSettingFromEnvironment(name, defaultValue));
}