diff --git a/dd-java-agent/agent-tooling/src/main/java/datadog/trace/agent/tooling/Instrumenter.java b/dd-java-agent/agent-tooling/src/main/java/datadog/trace/agent/tooling/Instrumenter.java index 2ac275bd22..68849e0922 100644 --- a/dd-java-agent/agent-tooling/src/main/java/datadog/trace/agent/tooling/Instrumenter.java +++ b/dd-java-agent/agent-tooling/src/main/java/datadog/trace/agent/tooling/Instrumenter.java @@ -43,7 +43,7 @@ public interface Instrumenter { /** @return Class names of helpers to inject into the user's classloader */ String[] helperClassNames(); - Map transformers(); + Map transformers(); @Slf4j abstract class Default implements Instrumenter { @@ -110,7 +110,7 @@ public interface Instrumenter { private AgentBuilder.Identified.Extendable applyInstrumentationTransformers( AgentBuilder.Identified.Extendable agentBuilder) { - for (final Map.Entry entry : transformers().entrySet()) { + for (final Map.Entry entry : transformers().entrySet()) { agentBuilder = agentBuilder.transform( new AgentBuilder.Transformer.ForAdvice() @@ -185,7 +185,7 @@ public interface Instrumenter { public abstract ElementMatcher typeMatcher(); @Override - public abstract Map transformers(); + public abstract Map transformers(); protected boolean defaultEnabled() { return getConfigEnabled("dd.integrations.enabled", true); diff --git a/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/ExecutorInstrumentation.java b/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/ExecutorInstrumentation.java index 39c4c805de..8c1add7530 100644 --- a/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/ExecutorInstrumentation.java +++ b/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/ExecutorInstrumentation.java @@ -10,6 +10,7 @@ import static net.bytebuddy.matcher.ElementMatchers.takesArgument; import com.google.auto.service.AutoService; import datadog.trace.agent.tooling.Instrumenter; import datadog.trace.bootstrap.CallDepthThreadLocalMap; +import datadog.trace.bootstrap.WeakMap; import datadog.trace.context.TraceScope; import io.opentracing.Scope; import io.opentracing.util.GlobalTracer; @@ -160,9 +161,10 @@ public final class ExecutorInstrumentation extends Instrumenter.Default { @Advice.OnMethodEnter(suppress = Throwable.class) public static DatadogWrapper enterJobSubmit( + @Advice.This final Executor executor, @Advice.Argument(value = 0, readOnly = false) Runnable task) { final Scope scope = GlobalTracer.get().scopeManager().active(); - if (DatadogWrapper.shouldWrapTask(task)) { + if (DatadogWrapper.shouldWrapTask(task, executor)) { task = new RunnableWrapper(task, (TraceScope) scope); return (RunnableWrapper) task; } @@ -180,10 +182,11 @@ public final class ExecutorInstrumentation extends Instrumenter.Default { @Advice.OnMethodEnter(suppress = Throwable.class) public static DatadogWrapper enterJobSubmit( + @Advice.This final Executor executor, @Advice.Argument(value = 0, readOnly = false) Callable task) { final Scope scope = GlobalTracer.get().scopeManager().active(); - if (DatadogWrapper.shouldWrapTask(task)) { + if (DatadogWrapper.shouldWrapTask(task, executor)) { task = new CallableWrapper<>(task, (TraceScope) scope); return (CallableWrapper) task; } @@ -274,13 +277,14 @@ public final class ExecutorInstrumentation extends Instrumenter.Default { * @param task task object * @return true iff given task object should be wrapped */ - public static boolean shouldWrapTask(final Object task) { + public static boolean shouldWrapTask(final Object task, final Executor executor) { final Scope scope = GlobalTracer.get().scopeManager().active(); return (scope instanceof TraceScope && ((TraceScope) scope).isAsyncPropagating() && task != null && !(task instanceof DatadogWrapper) - && isTopLevelCall()); + && isTopLevelCall() + && !ConcurrentUtils.isDisabled(executor)); } /** @@ -343,10 +347,22 @@ public final class ExecutorInstrumentation extends Instrumenter.Default { } /** Utils for pulling DatadogWrapper out of Future instances. */ + @Slf4j public static class ConcurrentUtils { + private static final WeakMap disabledExecutors = + WeakMap.Provider.newWeakMap(); private static final Map, Field> fieldCache = new ConcurrentHashMap<>(); private static final String[] wrapperFields = {"runnable", "callable"}; + public static void disableExecutor(final Executor executor) { + log.debug("Disabling Executor tracing for instance {}", executor); + disabledExecutors.put(executor, true); + } + + public static boolean isDisabled(final Executor executor) { + return disabledExecutors.containsKey(executor); + } + public static DatadogWrapper getDatadogWrapper(final Future f) { final Field field; if (fieldCache.containsKey(f.getClass())) { diff --git a/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/ThreadPoolExecutorInstrumentation.java b/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/ThreadPoolExecutorInstrumentation.java new file mode 100644 index 0000000000..9f644d1f7e --- /dev/null +++ b/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/ThreadPoolExecutorInstrumentation.java @@ -0,0 +1,71 @@ +package datadog.trace.instrumentation.java.concurrent; + +import static net.bytebuddy.matcher.ElementMatchers.isConstructor; +import static net.bytebuddy.matcher.ElementMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.takesArgument; +import static net.bytebuddy.matcher.ElementMatchers.takesArguments; + +import com.google.auto.service.AutoService; +import datadog.trace.agent.tooling.Instrumenter; +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import lombok.extern.slf4j.Slf4j; +import net.bytebuddy.asm.Advice; +import net.bytebuddy.description.type.TypeDescription; +import net.bytebuddy.matcher.ElementMatcher; + +@Slf4j +@AutoService(Instrumenter.class) +public class ThreadPoolExecutorInstrumentation extends Instrumenter.Default { + + public ThreadPoolExecutorInstrumentation() { + super(ExecutorInstrumentation.EXEC_NAME); + } + + @Override + public ElementMatcher typeMatcher() { + return named("java.util.concurrent.ThreadPoolExecutor"); + } + + @Override + public String[] helperClassNames() { + return new String[] { + ExecutorInstrumentation.class.getName() + "$ConcurrentUtils", + ThreadPoolExecutorInstrumentation.class.getName() + "$GenericRunnable", + }; + } + + @Override + public Map transformers() { + return Collections.singletonMap( + isConstructor() + .and(takesArgument(4, named("java.util.concurrent.BlockingQueue"))) + .and(takesArguments(7)), + ThreadPoolExecutorAdvice.class.getName()); + } + + public static class ThreadPoolExecutorAdvice { + @Advice.OnMethodExit(suppress = Throwable.class) + public static void disableIfQueueWrongType( + @Advice.This final ThreadPoolExecutor executor, + @Advice.Argument(4) final BlockingQueue queue) { + + if (queue.size() == 0) { + try { + queue.add(new GenericRunnable()); + queue.clear(); // Remove the Runnable we just added. + } catch (final ClassCastException e) { + ExecutorInstrumentation.ConcurrentUtils.disableExecutor(executor); + } + } + } + } + + public static class GenericRunnable implements Runnable { + + @Override + public void run() {} + } +} diff --git a/dd-java-agent/instrumentation/java-concurrent/src/slickTest/groovy/SlickTest.groovy b/dd-java-agent/instrumentation/java-concurrent/src/slickTest/groovy/SlickTest.groovy index 2ce2fddb38..47d51f4eb1 100644 --- a/dd-java-agent/instrumentation/java-concurrent/src/slickTest/groovy/SlickTest.groovy +++ b/dd-java-agent/instrumentation/java-concurrent/src/slickTest/groovy/SlickTest.groovy @@ -2,13 +2,13 @@ import datadog.trace.agent.test.AgentTestRunner import datadog.trace.api.DDSpanTypes import datadog.trace.api.DDTags import io.opentracing.tag.Tags -import spock.lang.Shared import static datadog.trace.agent.test.asserts.ListWriterAssert.assertTraces class SlickTest extends AgentTestRunner { - @Shared + // Can't be @Shared, otherwise the work queue is initialized before the instrumentation is applied + // @Shared def database = new SlickUtils() def "Basic statement generates spans"() { diff --git a/dd-java-agent/instrumentation/java-concurrent/src/slickTest/scala/SlickUtils.scala b/dd-java-agent/instrumentation/java-concurrent/src/slickTest/scala/SlickUtils.scala index 110cc5b305..c1cb3514f0 100644 --- a/dd-java-agent/instrumentation/java-concurrent/src/slickTest/scala/SlickUtils.scala +++ b/dd-java-agent/instrumentation/java-concurrent/src/slickTest/scala/SlickUtils.scala @@ -18,7 +18,7 @@ class SlickUtils { // wrapped runnables. executor = AsyncExecutor("test", numThreads = 1, queueSize = 1000) ) - Await.result(database.run(sqlu"""CREATE ALIAS SLEEP FOR "java.lang.Thread.sleep(long)""""), Duration.Inf) + Await.result(database.run(sqlu"""CREATE ALIAS IF NOT EXISTS SLEEP FOR "java.lang.Thread.sleep(long)""""), Duration.Inf) @Trace def startQuery(query: String): Future[Vector[Int]] = { diff --git a/dd-java-agent/instrumentation/java-concurrent/src/test/groovy/ExecutorInstrumentationTest.groovy b/dd-java-agent/instrumentation/java-concurrent/src/test/groovy/ExecutorInstrumentationTest.groovy index 34df643c57..ea28c8f411 100644 --- a/dd-java-agent/instrumentation/java-concurrent/src/test/groovy/ExecutorInstrumentationTest.groovy +++ b/dd-java-agent/instrumentation/java-concurrent/src/test/groovy/ExecutorInstrumentationTest.groovy @@ -13,7 +13,6 @@ import java.util.concurrent.ExecutorService import java.util.concurrent.ForkJoinPool import java.util.concurrent.Future import java.util.concurrent.RejectedExecutionException -import java.util.concurrent.ScheduledThreadPoolExecutor import java.util.concurrent.ThreadPoolExecutor import java.util.concurrent.TimeUnit @@ -23,10 +22,6 @@ class ExecutorInstrumentationTest extends AgentTestRunner { @Shared Method executeMethod - static { - System.setProperty("dd.integration.java_concurrent.enabled", "true") - } - def setupSpec() { executeMethod = Executor.getMethod("execute", Runnable) submitMethod = ExecutorService.getMethod("submit", Callable) @@ -71,8 +66,6 @@ class ExecutorInstrumentationTest extends AgentTestRunner { new ForkJoinPool() | executeMethod new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue(1)) | submitMethod new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue(1)) | executeMethod - new ScheduledThreadPoolExecutor(1) | submitMethod - new ScheduledThreadPoolExecutor(1) | executeMethod } // more useful name breaks java9 javac @@ -112,6 +105,5 @@ class ExecutorInstrumentationTest extends AgentTestRunner { poolImpl | _ new ForkJoinPool() | _ new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue(1)) | _ - new ScheduledThreadPoolExecutor(1) | _ } }