From cc23fee614c0349a5a56989e6f4f64536ea01d06 Mon Sep 17 00:00:00 2001 From: Tyler Benson Date: Mon, 13 May 2019 17:33:40 -0700 Subject: [PATCH] Add config to enable individual executors MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Or all executors, bypassing the allow list. `dd.trace.executor=com.MyCustomExecutor,com.OtherExecutor` `dd.trace.executors.all=true` Turns out in many cases, executors that we say we’re skipping, are still being traced because they extend from an already instrumented executor. --- .../AbstractExecutorInstrumentation.java | 154 ++++++++++-------- .../groovy/ExecutorInstrumentationTest.groovy | 118 ++++++++++++++ .../main/java/datadog/trace/api/Config.java | 6 +- 3 files changed, 210 insertions(+), 68 deletions(-) diff --git a/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/AbstractExecutorInstrumentation.java b/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/AbstractExecutorInstrumentation.java index ffbeb5ea62..3532dc0835 100644 --- a/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/AbstractExecutorInstrumentation.java +++ b/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/AbstractExecutorInstrumentation.java @@ -6,10 +6,12 @@ import static net.bytebuddy.matcher.ElementMatchers.named; import static net.bytebuddy.matcher.ElementMatchers.not; import datadog.trace.agent.tooling.Instrumenter; +import datadog.trace.api.Config; import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashSet; +import java.util.Set; import java.util.concurrent.Executor; import lombok.extern.slf4j.Slf4j; import net.bytebuddy.description.type.TypeDescription; @@ -20,11 +22,15 @@ public abstract class AbstractExecutorInstrumentation extends Instrumenter.Defau public static final String EXEC_NAME = "java_concurrent"; + private static final boolean TRACE_ALL_EXECUTORS = + Config.getBooleanSettingFromEnvironment("trace.executors.all", false); + /** - * Only apply executor instrumentation to whitelisted executors. In the future, this restriction - * may be lifted to include all executors. + * Only apply executor instrumentation to whitelisted executors. To apply to all executors, use + * override setting above. */ private static final Collection WHITELISTED_EXECUTORS; + /** * Some frameworks have their executors defined as anon classes inside other classes. Referencing * anon classes by name would be fragile, so instead we will use list of class prefix names. Since @@ -33,51 +39,62 @@ public abstract class AbstractExecutorInstrumentation extends Instrumenter.Defau private static final Collection WHITELISTED_EXECUTORS_PREFIXES; static { - final String[] whitelist = { - "java.util.concurrent.AbstractExecutorService", - "java.util.concurrent.ThreadPoolExecutor", - "java.util.concurrent.ScheduledThreadPoolExecutor", - "java.util.concurrent.ForkJoinPool", - "java.util.concurrent.Executors$FinalizableDelegatedExecutorService", - "java.util.concurrent.Executors$DelegatedExecutorService", - "javax.management.NotificationBroadcasterSupport$1", - "kotlinx.coroutines.scheduling.CoroutineScheduler", - "scala.concurrent.Future$InternalCallbackExecutor$", - "scala.concurrent.impl.ExecutionContextImpl", - "scala.concurrent.impl.ExecutionContextImpl$$anon$1", - "scala.concurrent.forkjoin.ForkJoinPool", - "scala.concurrent.impl.ExecutionContextImpl$$anon$3", - "akka.dispatch.MessageDispatcher", - "akka.dispatch.Dispatcher", - "akka.dispatch.Dispatcher$LazyExecutorServiceDelegate", - "akka.actor.ActorSystemImpl$$anon$1", - "akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool", - "akka.dispatch.forkjoin.ForkJoinPool", - "akka.dispatch.BalancingDispatcher", - "akka.dispatch.ThreadPoolConfig$ThreadPoolExecutorServiceFactory$$anon$1", - "akka.dispatch.PinnedDispatcher", - "akka.dispatch.ExecutionContexts$sameThreadExecutionContext$", - "play.api.libs.streams.Execution$trampoline$", - "io.netty.channel.MultithreadEventLoopGroup", - "io.netty.util.concurrent.MultithreadEventExecutorGroup", - "io.netty.util.concurrent.AbstractEventExecutorGroup", - "io.netty.channel.epoll.EpollEventLoopGroup", - "io.netty.channel.nio.NioEventLoopGroup", - "io.netty.util.concurrent.GlobalEventExecutor", - "io.netty.util.concurrent.AbstractScheduledEventExecutor", - "io.netty.util.concurrent.AbstractEventExecutor", - "io.netty.util.concurrent.SingleThreadEventExecutor", - "io.netty.channel.nio.NioEventLoop", - "io.netty.channel.SingleThreadEventLoop", - "com.google.common.util.concurrent.AbstractListeningExecutorService", - "com.google.common.util.concurrent.MoreExecutors$ListeningDecorator", - "com.google.common.util.concurrent.MoreExecutors$ScheduledListeningDecorator", - }; - WHITELISTED_EXECUTORS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(whitelist))); + if (TRACE_ALL_EXECUTORS) { + log.info("Tracing all executors enabled."); + WHITELISTED_EXECUTORS = Collections.emptyList(); + WHITELISTED_EXECUTORS_PREFIXES = Collections.emptyList(); + } else { + final String[] whitelist = { + "java.util.concurrent.AbstractExecutorService", + "java.util.concurrent.ThreadPoolExecutor", + "java.util.concurrent.ScheduledThreadPoolExecutor", + "java.util.concurrent.ForkJoinPool", + "java.util.concurrent.Executors$FinalizableDelegatedExecutorService", + "java.util.concurrent.Executors$DelegatedExecutorService", + "javax.management.NotificationBroadcasterSupport$1", + "kotlinx.coroutines.scheduling.CoroutineScheduler", + "scala.concurrent.Future$InternalCallbackExecutor$", + "scala.concurrent.impl.ExecutionContextImpl", + "scala.concurrent.impl.ExecutionContextImpl$$anon$1", + "scala.concurrent.forkjoin.ForkJoinPool", + "scala.concurrent.impl.ExecutionContextImpl$$anon$3", + "akka.dispatch.MessageDispatcher", + "akka.dispatch.Dispatcher", + "akka.dispatch.Dispatcher$LazyExecutorServiceDelegate", + "akka.actor.ActorSystemImpl$$anon$1", + "akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool", + "akka.dispatch.forkjoin.ForkJoinPool", + "akka.dispatch.BalancingDispatcher", + "akka.dispatch.ThreadPoolConfig$ThreadPoolExecutorServiceFactory$$anon$1", + "akka.dispatch.PinnedDispatcher", + "akka.dispatch.ExecutionContexts$sameThreadExecutionContext$", + "play.api.libs.streams.Execution$trampoline$", + "io.netty.channel.MultithreadEventLoopGroup", + "io.netty.util.concurrent.MultithreadEventExecutorGroup", + "io.netty.util.concurrent.AbstractEventExecutorGroup", + "io.netty.channel.epoll.EpollEventLoopGroup", + "io.netty.channel.nio.NioEventLoopGroup", + "io.netty.util.concurrent.GlobalEventExecutor", + "io.netty.util.concurrent.AbstractScheduledEventExecutor", + "io.netty.util.concurrent.AbstractEventExecutor", + "io.netty.util.concurrent.SingleThreadEventExecutor", + "io.netty.channel.nio.NioEventLoop", + "io.netty.channel.SingleThreadEventLoop", + "com.google.common.util.concurrent.AbstractListeningExecutorService", + "com.google.common.util.concurrent.MoreExecutors$ListeningDecorator", + "com.google.common.util.concurrent.MoreExecutors$ScheduledListeningDecorator", + }; - final String[] whitelistPrefixes = {"slick.util.AsyncExecutor$"}; - WHITELISTED_EXECUTORS_PREFIXES = - Collections.unmodifiableCollection(Arrays.asList(whitelistPrefixes)); + final Set executors = + new HashSet<>(Config.getListSettingFromEnvironment("trace.executors", "")); + executors.addAll(Arrays.asList(whitelist)); + + WHITELISTED_EXECUTORS = Collections.unmodifiableSet(executors); + + final String[] whitelistPrefixes = {"slick.util.AsyncExecutor$"}; + WHITELISTED_EXECUTORS_PREFIXES = + Collections.unmodifiableCollection(Arrays.asList(whitelistPrefixes)); + } } public AbstractExecutorInstrumentation(final String... additionalNames) { @@ -86,30 +103,33 @@ public abstract class AbstractExecutorInstrumentation extends Instrumenter.Defau @Override public ElementMatcher typeMatcher() { - return not(isInterface()) - .and(safeHasSuperType(named(Executor.class.getName()))) - .and( - new ElementMatcher() { - @Override - public boolean matches(final TypeDescription target) { - boolean whitelisted = WHITELISTED_EXECUTORS.contains(target.getName()); + final ElementMatcher.Junction matcher = + not(isInterface()).and(safeHasSuperType(named(Executor.class.getName()))); + if (TRACE_ALL_EXECUTORS) { + return matcher; + } + return matcher.and( + new ElementMatcher() { + @Override + public boolean matches(final TypeDescription target) { + boolean whitelisted = WHITELISTED_EXECUTORS.contains(target.getName()); - // Check for possible prefixes match only if not whitelisted already - if (!whitelisted) { - for (final String name : WHITELISTED_EXECUTORS_PREFIXES) { - if (target.getName().startsWith(name)) { - whitelisted = true; - break; - } - } + // Check for possible prefixes match only if not whitelisted already + if (!whitelisted) { + for (final String name : WHITELISTED_EXECUTORS_PREFIXES) { + if (target.getName().startsWith(name)) { + whitelisted = true; + break; } - - if (!whitelisted) { - log.debug("Skipping executor instrumentation for {}", target.getName()); - } - return whitelisted; } - }); + } + + if (!whitelisted) { + log.debug("Skipping executor instrumentation for {}", target.getName()); + } + return whitelisted; + } + }); } @Override diff --git a/dd-java-agent/instrumentation/java-concurrent/src/test/groovy/ExecutorInstrumentationTest.groovy b/dd-java-agent/instrumentation/java-concurrent/src/test/groovy/ExecutorInstrumentationTest.groovy index a676ad41b6..9825b95813 100644 --- a/dd-java-agent/instrumentation/java-concurrent/src/test/groovy/ExecutorInstrumentationTest.groovy +++ b/dd-java-agent/instrumentation/java-concurrent/src/test/groovy/ExecutorInstrumentationTest.groovy @@ -8,18 +8,26 @@ import io.opentracing.util.GlobalTracer import spock.lang.Shared import java.lang.reflect.InvocationTargetException +import java.util.concurrent.AbstractExecutorService import java.util.concurrent.ArrayBlockingQueue import java.util.concurrent.Callable +import java.util.concurrent.ExecutionException import java.util.concurrent.ForkJoinPool import java.util.concurrent.ForkJoinTask import java.util.concurrent.Future +import java.util.concurrent.LinkedBlockingQueue import java.util.concurrent.RejectedExecutionException import java.util.concurrent.ScheduledThreadPoolExecutor import java.util.concurrent.ThreadPoolExecutor import java.util.concurrent.TimeUnit +import java.util.concurrent.TimeoutException class ExecutorInstrumentationTest extends AgentTestRunner { + static { + System.setProperty("dd.trace.executors", "ExecutorInstrumentationTest\$CustomThreadPoolExecutor") + } + @Shared def executeRunnable = { e, c -> e.execute((Runnable) c) } @Shared @@ -108,6 +116,15 @@ class ExecutorInstrumentationTest extends AgentTestRunner { "invokeAll with timeout" | invokeAllTimeout | new ForkJoinPool() "invokeAny" | invokeAny | new ForkJoinPool() "invokeAny with timeout" | invokeAnyTimeout | new ForkJoinPool() + + // CustomThreadPoolExecutor would normally be disabled except enabled above. + "execute Runnable" | executeRunnable | new CustomThreadPoolExecutor() + "submit Runnable" | submitRunnable | new CustomThreadPoolExecutor() + "submit Callable" | submitCallable | new CustomThreadPoolExecutor() + "invokeAll" | invokeAll | new CustomThreadPoolExecutor() + "invokeAll with timeout" | invokeAllTimeout | new CustomThreadPoolExecutor() + "invokeAny" | invokeAny | new CustomThreadPoolExecutor() + "invokeAny with timeout" | invokeAnyTimeout | new CustomThreadPoolExecutor() } def "#poolImpl '#name' disabled wrapping"() { @@ -216,4 +233,105 @@ class ExecutorInstrumentationTest extends AgentTestRunner { "submit Runnable" | submitRunnable | new ForkJoinPool() "submit Callable" | submitCallable | new ForkJoinPool() } + + static class CustomThreadPoolExecutor extends AbstractExecutorService { + volatile running = true + def workQueue = new LinkedBlockingQueue(10) + + def worker = new Runnable() { + void run() { + try { + while (running) { + def runnable = workQueue.take() + runnable.run() + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt() + } catch (Exception e) { + e.printStackTrace() + } + } + } + + def workerThread = new Thread(worker, "ExecutorTestThread") + + private CustomThreadPoolExecutor() { + workerThread.start() + } + + @Override + void shutdown() { + running = false + workerThread.interrupt() + } + + @Override + List shutdownNow() { + running = false + workerThread.interrupt() + return [] + } + + @Override + boolean isShutdown() { + return !running + } + + @Override + boolean isTerminated() { + return workerThread.isAlive() + } + + @Override + boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + workerThread.join(unit.toMillis(timeout)) + return true + } + + @Override + def Future submit(Callable task) { + def future = newTaskFor(task) + execute(future) + return future + } + + @Override + def Future submit(Runnable task, T result) { + def future = newTaskFor(task, result) + execute(future) + return future + } + + @Override + Future submit(Runnable task) { + def future = newTaskFor(task, null) + execute(future) + return future + } + + @Override + def List> invokeAll(Collection> tasks) throws InterruptedException { + return super.invokeAll(tasks) + } + + @Override + def List> invokeAll(Collection> tasks, long timeout, TimeUnit unit) throws InterruptedException { + return super.invokeAll(tasks) + } + + @Override + def T invokeAny(Collection> tasks) throws InterruptedException, ExecutionException { + return super.invokeAny(tasks) + } + + @Override + def T invokeAny(Collection> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + return super.invokeAny(tasks) + } + + @Override + void execute(Runnable command) { + workQueue.put(command) + } + } } diff --git a/dd-trace-api/src/main/java/datadog/trace/api/Config.java b/dd-trace-api/src/main/java/datadog/trace/api/Config.java index b2e7098a3b..9992f041f6 100644 --- a/dd-trace-api/src/main/java/datadog/trace/api/Config.java +++ b/dd-trace-api/src/main/java/datadog/trace/api/Config.java @@ -399,7 +399,11 @@ public class Config { return parseMap(getSettingFromEnvironment(name, defaultValue), PREFIX + name); } - private static List getListSettingFromEnvironment( + /** + * Calls {@link #getSettingFromEnvironment(String, String)} and converts the result to a list by + * splitting on `,`. + */ + public static List getListSettingFromEnvironment( final String name, final String defaultValue) { return parseList(getSettingFromEnvironment(name, defaultValue)); }