diff --git a/dd-java-agent/instrumentation/java-concurrent/akka-2.5-testing/akka-2.5-testing.gradle b/dd-java-agent/instrumentation/java-concurrent/akka-2.5-testing/akka-2.5-testing.gradle new file mode 100644 index 0000000000..ee0b4aafec --- /dev/null +++ b/dd-java-agent/instrumentation/java-concurrent/akka-2.5-testing/akka-2.5-testing.gradle @@ -0,0 +1,20 @@ +// Set properties before any plugins get loaded +ext { + minJavaVersionForTests = JavaVersion.VERSION_1_8 + // Execute tests on all JVMs, even rare and outdated ones + coreJavaInstrumentation = true +} + +apply from: "${rootDir}/gradle/java.gradle" +apply from: "${rootDir}/gradle/test-with-scala.gradle" + +dependencies { + testCompile project(':dd-trace-api') + testCompile project(':dd-trace-ot') + testCompile deps.scala + testCompile group: 'com.typesafe.akka', name: 'akka-actor_2.11', version: '2.5.0' + + testCompile project(':dd-java-agent:testing') + testCompile project(':dd-java-agent:instrumentation:java-concurrent') + testCompile project(':dd-java-agent:instrumentation:trace-annotation') +} diff --git a/dd-java-agent/instrumentation/java-concurrent/akka-2.5-testing/src/test/groovy/AkkaExecutorInstrumentationTest.groovy b/dd-java-agent/instrumentation/java-concurrent/akka-2.5-testing/src/test/groovy/AkkaExecutorInstrumentationTest.groovy new file mode 100644 index 0000000000..4462ebe265 --- /dev/null +++ b/dd-java-agent/instrumentation/java-concurrent/akka-2.5-testing/src/test/groovy/AkkaExecutorInstrumentationTest.groovy @@ -0,0 +1,149 @@ +import akka.dispatch.forkjoin.ForkJoinPool +import akka.dispatch.forkjoin.ForkJoinTask +import datadog.opentracing.DDSpan +import datadog.opentracing.scopemanager.ContinuableScope +import datadog.trace.agent.test.AgentTestRunner +import datadog.trace.api.Trace +import io.opentracing.util.GlobalTracer +import spock.lang.Shared + +import java.lang.reflect.InvocationTargetException +import java.lang.reflect.Method +import java.util.concurrent.ArrayBlockingQueue +import java.util.concurrent.Callable +import java.util.concurrent.Executor +import java.util.concurrent.ExecutorService +import java.util.concurrent.Future +import java.util.concurrent.RejectedExecutionException +import java.util.concurrent.ThreadPoolExecutor +import java.util.concurrent.TimeUnit + +/** + * Test executor instrumentation for Akka specific classes. + * This is to large extent a copy of ExecutorInstrumentationTest. + */ +class AkkaExecutorInstrumentationTest extends AgentTestRunner { + @Shared + Method executeRunnableMethod + @Shared + Method akkaExecuteForkJoinTaskMethod + @Shared + Method submitRunnableMethod + @Shared + Method submitCallableMethod + @Shared + Method akkaSubmitForkJoinTaskMethod + @Shared + Method akkaInvokeForkJoinTaskMethod + + def setupSpec() { + executeRunnableMethod = Executor.getMethod("execute", Runnable) + akkaExecuteForkJoinTaskMethod = ForkJoinPool.getMethod("execute", ForkJoinTask) + submitRunnableMethod = ExecutorService.getMethod("submit", Runnable) + submitCallableMethod = ExecutorService.getMethod("submit", Callable) + akkaSubmitForkJoinTaskMethod = ForkJoinPool.getMethod("submit", ForkJoinTask) + akkaInvokeForkJoinTaskMethod = ForkJoinPool.getMethod("invoke", ForkJoinTask) + } + + // more useful name breaks java9 javac + // def "#poolImpl.getClass().getSimpleName() #method.getName() propagates"() + def "#poolImpl #method propagates"() { + setup: + def pool = poolImpl + def m = method + + new Runnable() { + @Override + @Trace(operationName = "parent") + void run() { + ((ContinuableScope) GlobalTracer.get().scopeManager().active()).setAsyncPropagation(true) + // this child will have a span + m.invoke(pool, new AkkaAsyncChild()) + // this child won't + m.invoke(pool, new AkkaAsyncChild(false, false)) + } + }.run() + + TEST_WRITER.waitForTraces(1) + List trace = TEST_WRITER.get(0) + + expect: + TEST_WRITER.size() == 1 + trace.size() == 2 + trace.get(0).operationName == "parent" + trace.get(1).operationName == "asyncChild" + trace.get(1).parentId == trace.get(0).spanId + + cleanup: + pool?.shutdown() + + // Unfortunately, there's no simple way to test the cross product of methods/pools. + where: + poolImpl | method + new ForkJoinPool() | executeRunnableMethod + new ForkJoinPool() | akkaExecuteForkJoinTaskMethod + new ForkJoinPool() | submitRunnableMethod + new ForkJoinPool() | submitCallableMethod + new ForkJoinPool() | akkaSubmitForkJoinTaskMethod + new ForkJoinPool() | akkaInvokeForkJoinTaskMethod + + new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue(1)) | executeRunnableMethod + new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue(1)) | submitRunnableMethod + new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue(1)) | submitCallableMethod + } + + // more useful name breaks java9 javac + // def "#poolImpl.getClass().getSimpleName() #method.getName() propagates"() + def "#poolImpl reports after canceled jobs"() { + setup: + def pool = poolImpl + def m = method + List children = new ArrayList<>() + List jobFutures = new ArrayList<>() + + new Runnable() { + @Override + @Trace(operationName = "parent") + void run() { + ((ContinuableScope) GlobalTracer.get().scopeManager().active()).setAsyncPropagation(true) + try { + for (int i = 0; i < 20; ++i) { + // Our current instrumentation instrumentation does not behave very well + // if we try to reuse Callable/Runnable. Namely we would be getting 'orphaned' + // child traces sometimes since state can contain only one continuation - and + // we do not really have a good way for attributing work to correct parent span + // if we reuse Callable/Runnable. + // Solution for now is to never reuse a Callable/Runnable. + final AkkaAsyncChild child = new AkkaAsyncChild(true, true) + children.add(child) + try { + Future f = m.invoke(pool, new AkkaAsyncChild()) + jobFutures.add(f) + } catch (InvocationTargetException e) { + throw e.getCause() + } + } + } catch (RejectedExecutionException e) { + } + + for (Future f : jobFutures) { + f.cancel(false) + } + for (AkkaAsyncChild child : children) { + child.unblock() + } + } + }.run() + + TEST_WRITER.waitForTraces(1) + + expect: + // FIXME: we should improve this test to make sure continuations are actually closed + TEST_WRITER.size() == 1 + + where: + poolImpl | method + new ForkJoinPool() | submitRunnableMethod + new ForkJoinPool() | submitCallableMethod + } +} diff --git a/dd-java-agent/instrumentation/java-concurrent/akka-2.5-testing/src/test/scala/AkkaAsyncChild.java b/dd-java-agent/instrumentation/java-concurrent/akka-2.5-testing/src/test/scala/AkkaAsyncChild.java new file mode 100644 index 0000000000..695588f462 --- /dev/null +++ b/dd-java-agent/instrumentation/java-concurrent/akka-2.5-testing/src/test/scala/AkkaAsyncChild.java @@ -0,0 +1,59 @@ +import akka.dispatch.forkjoin.ForkJoinTask; +import datadog.trace.api.Trace; +import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicBoolean; + +public class AkkaAsyncChild extends ForkJoinTask implements Runnable, Callable { + private final AtomicBoolean blockThread; + private final boolean doTraceableWork; + + public AkkaAsyncChild() { + this(true, false); + } + + @Override + public Object getRawResult() { + return null; + } + + @Override + protected void setRawResult(final Object value) {} + + @Override + protected boolean exec() { + runImpl(); + return true; + } + + public AkkaAsyncChild(final boolean doTraceableWork, final boolean blockThread) { + this.doTraceableWork = doTraceableWork; + this.blockThread = new AtomicBoolean(blockThread); + } + + public void unblock() { + blockThread.set(false); + } + + @Override + public void run() { + runImpl(); + } + + @Override + public Object call() throws Exception { + runImpl(); + return null; + } + + private void runImpl() { + while (blockThread.get()) { + // busy-wait to block thread + } + if (doTraceableWork) { + asyncChild(); + } + } + + @Trace(operationName = "asyncChild") + private void asyncChild() {} +} diff --git a/dd-java-agent/instrumentation/java-concurrent/akka-testing/akka-testing.gradle b/dd-java-agent/instrumentation/java-concurrent/akka-testing/akka-testing.gradle index b6ddf559a6..3e05a0d5de 100644 --- a/dd-java-agent/instrumentation/java-concurrent/akka-testing/akka-testing.gradle +++ b/dd-java-agent/instrumentation/java-concurrent/akka-testing/akka-testing.gradle @@ -1,3 +1,9 @@ +// Set properties before any plugins get loaded +ext { + // Execute tests on all JVMs, even rare and outdated ones + coreJavaInstrumentation = true +} + apply from: "${rootDir}/gradle/java.gradle" apply from: "${rootDir}/gradle/test-with-scala.gradle" diff --git a/dd-java-agent/instrumentation/java-concurrent/java-concurrent.gradle b/dd-java-agent/instrumentation/java-concurrent/java-concurrent.gradle index 652723b713..162d119a0d 100644 --- a/dd-java-agent/instrumentation/java-concurrent/java-concurrent.gradle +++ b/dd-java-agent/instrumentation/java-concurrent/java-concurrent.gradle @@ -20,6 +20,11 @@ compileSlickTestGroovy { } dependencies { + // This is needed for Scala ForJoinTask/Pool instrumentation + compileOnly deps.scala + // This is needed for Akka ForJoinTask/Pool instrumentation + compileOnly group: 'com.typesafe.akka', name: 'akka-actor_2.11', version: '2.5.0' + compile project(':dd-trace-api') compile project(':dd-java-agent:agent-tooling') diff --git a/dd-java-agent/instrumentation/java-concurrent/scala-testing/scala-testing.gradle b/dd-java-agent/instrumentation/java-concurrent/scala-testing/scala-testing.gradle index 53ac0a4e76..b9719d5969 100644 --- a/dd-java-agent/instrumentation/java-concurrent/scala-testing/scala-testing.gradle +++ b/dd-java-agent/instrumentation/java-concurrent/scala-testing/scala-testing.gradle @@ -1,3 +1,9 @@ +// Set properties before any plugins get loaded +project.ext { + // Execute tests on all JVMs, even rare and outdated ones + coreJavaInstrumentation = true +} + apply from: "${rootDir}/gradle/java.gradle" apply from: "${rootDir}/gradle/test-with-scala.gradle" diff --git a/dd-java-agent/instrumentation/java-concurrent/scala-testing/src/test/groovy/ScalaExecutorInstrumentationTest.groovy b/dd-java-agent/instrumentation/java-concurrent/scala-testing/src/test/groovy/ScalaExecutorInstrumentationTest.groovy new file mode 100644 index 0000000000..b46af31619 --- /dev/null +++ b/dd-java-agent/instrumentation/java-concurrent/scala-testing/src/test/groovy/ScalaExecutorInstrumentationTest.groovy @@ -0,0 +1,149 @@ +import datadog.opentracing.DDSpan +import datadog.opentracing.scopemanager.ContinuableScope +import datadog.trace.agent.test.AgentTestRunner +import datadog.trace.api.Trace +import io.opentracing.util.GlobalTracer +import scala.concurrent.forkjoin.ForkJoinPool +import scala.concurrent.forkjoin.ForkJoinTask +import spock.lang.Shared + +import java.lang.reflect.InvocationTargetException +import java.lang.reflect.Method +import java.util.concurrent.ArrayBlockingQueue +import java.util.concurrent.Callable +import java.util.concurrent.Executor +import java.util.concurrent.ExecutorService +import java.util.concurrent.Future +import java.util.concurrent.RejectedExecutionException +import java.util.concurrent.ThreadPoolExecutor +import java.util.concurrent.TimeUnit + +/** + * Test executor instrumentation for Scala specific classes. + * This is to large extent a copy of ExecutorInstrumentationTest. + */ +class ScalaExecutorInstrumentationTest extends AgentTestRunner { + @Shared + Method executeRunnableMethod + @Shared + Method scalaExecuteForkJoinTaskMethod + @Shared + Method submitRunnableMethod + @Shared + Method submitCallableMethod + @Shared + Method scalaSubmitForkJoinTaskMethod + @Shared + Method scalaInvokeForkJoinTaskMethod + + def setupSpec() { + executeRunnableMethod = Executor.getMethod("execute", Runnable) + scalaExecuteForkJoinTaskMethod = ForkJoinPool.getMethod("execute", ForkJoinTask) + submitRunnableMethod = ExecutorService.getMethod("submit", Runnable) + submitCallableMethod = ExecutorService.getMethod("submit", Callable) + scalaSubmitForkJoinTaskMethod = ForkJoinPool.getMethod("submit", ForkJoinTask) + scalaInvokeForkJoinTaskMethod = ForkJoinPool.getMethod("invoke", ForkJoinTask) + } + + // more useful name breaks java9 javac + // def "#poolImpl.getClass().getSimpleName() #method.getName() propagates"() + def "#poolImpl #method propagates"() { + setup: + def pool = poolImpl + def m = method + + new Runnable() { + @Override + @Trace(operationName = "parent") + void run() { + ((ContinuableScope) GlobalTracer.get().scopeManager().active()).setAsyncPropagation(true) + // this child will have a span + m.invoke(pool, new ScalaAsyncChild()) + // this child won't + m.invoke(pool, new ScalaAsyncChild(false, false)) + } + }.run() + + TEST_WRITER.waitForTraces(1) + List trace = TEST_WRITER.get(0) + + expect: + TEST_WRITER.size() == 1 + trace.size() == 2 + trace.get(0).operationName == "parent" + trace.get(1).operationName == "asyncChild" + trace.get(1).parentId == trace.get(0).spanId + + cleanup: + pool?.shutdown() + + // Unfortunately, there's no simple way to test the cross product of methods/pools. + where: + poolImpl | method + new ForkJoinPool() | executeRunnableMethod + new ForkJoinPool() | scalaExecuteForkJoinTaskMethod + new ForkJoinPool() | submitRunnableMethod + new ForkJoinPool() | submitCallableMethod + new ForkJoinPool() | scalaSubmitForkJoinTaskMethod + new ForkJoinPool() | scalaInvokeForkJoinTaskMethod + + new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue(1)) | executeRunnableMethod + new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue(1)) | submitRunnableMethod + new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue(1)) | submitCallableMethod + } + + // more useful name breaks java9 javac + // def "#poolImpl.getClass().getSimpleName() #method.getName() propagates"() + def "#poolImpl reports after canceled jobs"() { + setup: + def pool = poolImpl + def m = method + List children = new ArrayList<>() + List jobFutures = new ArrayList<>() + + new Runnable() { + @Override + @Trace(operationName = "parent") + void run() { + ((ContinuableScope) GlobalTracer.get().scopeManager().active()).setAsyncPropagation(true) + try { + for (int i = 0; i < 20; ++i) { + // Our current instrumentation instrumentation does not behave very well + // if we try to reuse Callable/Runnable. Namely we would be getting 'orphaned' + // child traces sometimes since state can contain only one continuation - and + // we do not really have a good way for attributing work to correct parent span + // if we reuse Callable/Runnable. + // Solution for now is to never reuse a Callable/Runnable. + final ScalaAsyncChild child = new ScalaAsyncChild(true, true) + children.add(child) + try { + Future f = m.invoke(pool, new ScalaAsyncChild()) + jobFutures.add(f) + } catch (InvocationTargetException e) { + throw e.getCause() + } + } + } catch (RejectedExecutionException e) { + } + + for (Future f : jobFutures) { + f.cancel(false) + } + for (ScalaAsyncChild child : children) { + child.unblock() + } + } + }.run() + + TEST_WRITER.waitForTraces(1) + + expect: + // FIXME: we should improve this test to make sure continuations are actually closed + TEST_WRITER.size() == 1 + + where: + poolImpl | method + new ForkJoinPool() | submitRunnableMethod + new ForkJoinPool() | submitCallableMethod + } +} diff --git a/dd-java-agent/instrumentation/java-concurrent/scala-testing/src/test/scala/ScalaAsyncChild.java b/dd-java-agent/instrumentation/java-concurrent/scala-testing/src/test/scala/ScalaAsyncChild.java new file mode 100644 index 0000000000..42d48a3fe0 --- /dev/null +++ b/dd-java-agent/instrumentation/java-concurrent/scala-testing/src/test/scala/ScalaAsyncChild.java @@ -0,0 +1,59 @@ +import datadog.trace.api.Trace; +import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicBoolean; +import scala.concurrent.forkjoin.ForkJoinTask; + +public class ScalaAsyncChild extends ForkJoinTask implements Runnable, Callable { + private final AtomicBoolean blockThread; + private final boolean doTraceableWork; + + public ScalaAsyncChild() { + this(true, false); + } + + @Override + public Object getRawResult() { + return null; + } + + @Override + protected void setRawResult(final Object value) {} + + @Override + protected boolean exec() { + runImpl(); + return true; + } + + public ScalaAsyncChild(final boolean doTraceableWork, final boolean blockThread) { + this.doTraceableWork = doTraceableWork; + this.blockThread = new AtomicBoolean(blockThread); + } + + public void unblock() { + blockThread.set(false); + } + + @Override + public void run() { + runImpl(); + } + + @Override + public Object call() throws Exception { + runImpl(); + return null; + } + + private void runImpl() { + while (blockThread.get()) { + // busy-wait to block thread + } + if (doTraceableWork) { + asyncChild(); + } + } + + @Trace(operationName = "asyncChild") + private void asyncChild() {} +} diff --git a/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/AbstractExecutorInstrumentation.java b/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/AbstractExecutorInstrumentation.java new file mode 100644 index 0000000000..c514afdbe7 --- /dev/null +++ b/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/AbstractExecutorInstrumentation.java @@ -0,0 +1,118 @@ +package datadog.trace.instrumentation.java.concurrent; + +import static datadog.trace.agent.tooling.ByteBuddyElementMatchers.safeHasSuperType; +import static net.bytebuddy.matcher.ElementMatchers.isInterface; +import static net.bytebuddy.matcher.ElementMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.not; + +import datadog.trace.agent.tooling.Instrumenter; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.concurrent.Executor; +import lombok.extern.slf4j.Slf4j; +import net.bytebuddy.description.type.TypeDescription; +import net.bytebuddy.matcher.ElementMatcher; + +@Slf4j +public abstract class AbstractExecutorInstrumentation extends Instrumenter.Default { + + public static final String EXEC_NAME = "java_concurrent"; + + /** + * Only apply executor instrumentation to whitelisted executors. In the future, this restriction + * may be lifted to include all executors. + */ + private static final Collection WHITELISTED_EXECUTORS; + /** + * Some frameworks have their executors defined as anon classes inside other classes. Referencing + * anon classes by name would be fragile, so instead we will use list of class prefix names. Since + * checking this list is more expensive (O(n)) we should try to keep it short. + */ + private static final Collection WHITELISTED_EXECUTORS_PREFIXES; + + static { + final String[] whitelist = { + "java.util.concurrent.AbstractExecutorService", + "java.util.concurrent.ThreadPoolExecutor", + "java.util.concurrent.ScheduledThreadPoolExecutor", + "java.util.concurrent.ForkJoinPool", + "java.util.concurrent.Executors$FinalizableDelegatedExecutorService", + "java.util.concurrent.Executors$DelegatedExecutorService", + "javax.management.NotificationBroadcasterSupport$1", + "kotlinx.coroutines.scheduling.CoroutineScheduler", + "scala.concurrent.Future$InternalCallbackExecutor$", + "scala.concurrent.impl.ExecutionContextImpl", + "scala.concurrent.impl.ExecutionContextImpl$$anon$1", + "scala.concurrent.forkjoin.ForkJoinPool", + "scala.concurrent.impl.ExecutionContextImpl$$anon$3", + "akka.dispatch.MessageDispatcher", + "akka.dispatch.Dispatcher", + "akka.dispatch.Dispatcher$LazyExecutorServiceDelegate", + "akka.actor.ActorSystemImpl$$anon$1", + "akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool", + "akka.dispatch.forkjoin.ForkJoinPool", + "akka.dispatch.BalancingDispatcher", + "akka.dispatch.ThreadPoolConfig$ThreadPoolExecutorServiceFactory$$anon$1", + "akka.dispatch.PinnedDispatcher", + "akka.dispatch.ExecutionContexts$sameThreadExecutionContext$", + "play.api.libs.streams.Execution$trampoline$", + "io.netty.channel.MultithreadEventLoopGroup", + "io.netty.util.concurrent.MultithreadEventExecutorGroup", + "io.netty.util.concurrent.AbstractEventExecutorGroup", + "io.netty.channel.epoll.EpollEventLoopGroup", + "io.netty.channel.nio.NioEventLoopGroup", + "io.netty.util.concurrent.GlobalEventExecutor", + "io.netty.util.concurrent.AbstractScheduledEventExecutor", + "io.netty.util.concurrent.AbstractEventExecutor", + "io.netty.util.concurrent.SingleThreadEventExecutor", + "io.netty.channel.nio.NioEventLoop", + "io.netty.channel.SingleThreadEventLoop", + }; + WHITELISTED_EXECUTORS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(whitelist))); + + final String[] whitelistPrefixes = {"slick.util.AsyncExecutor$"}; + WHITELISTED_EXECUTORS_PREFIXES = + Collections.unmodifiableCollection(Arrays.asList(whitelistPrefixes)); + } + + public AbstractExecutorInstrumentation(final String... additionalNames) { + super(EXEC_NAME, additionalNames); + } + + @Override + public ElementMatcher typeMatcher() { + return not(isInterface()) + .and(safeHasSuperType(named(Executor.class.getName()))) + .and( + new ElementMatcher() { + @Override + public boolean matches(final TypeDescription target) { + boolean whitelisted = WHITELISTED_EXECUTORS.contains(target.getName()); + + // Check for possible prefixes match only if not whitelisted already + if (!whitelisted) { + for (final String name : WHITELISTED_EXECUTORS_PREFIXES) { + if (target.getName().startsWith(name)) { + whitelisted = true; + break; + } + } + } + + if (!whitelisted) { + log.debug("Skipping executor instrumentation for {}", target.getName()); + } + return whitelisted; + } + }); + } + + @Override + public String[] helperClassNames() { + return new String[] { + AbstractExecutorInstrumentation.class.getPackage().getName() + ".ExecutorInstrumentationUtils" + }; + } +} diff --git a/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/AkkaExecutorInstrumentation.java b/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/AkkaExecutorInstrumentation.java new file mode 100644 index 0000000000..930e98bedc --- /dev/null +++ b/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/AkkaExecutorInstrumentation.java @@ -0,0 +1,80 @@ +package datadog.trace.instrumentation.java.concurrent; + +import static net.bytebuddy.matcher.ElementMatchers.nameMatches; +import static net.bytebuddy.matcher.ElementMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.takesArgument; + +import akka.dispatch.forkjoin.ForkJoinTask; +import com.google.auto.service.AutoService; +import datadog.trace.agent.tooling.Instrumenter; +import datadog.trace.bootstrap.ContextStore; +import datadog.trace.bootstrap.InstrumentationContext; +import datadog.trace.bootstrap.instrumentation.java.concurrent.State; +import datadog.trace.context.TraceScope; +import io.opentracing.Scope; +import io.opentracing.util.GlobalTracer; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.Executor; +import lombok.extern.slf4j.Slf4j; +import net.bytebuddy.asm.Advice; +import net.bytebuddy.description.method.MethodDescription; +import net.bytebuddy.matcher.ElementMatcher; + +@Slf4j +@AutoService(Instrumenter.class) +public final class AkkaExecutorInstrumentation extends AbstractExecutorInstrumentation { + + public AkkaExecutorInstrumentation() { + super(EXEC_NAME + ".akka_fork_join"); + } + + @Override + public Map contextStore() { + return Collections.singletonMap( + AkkaForkJoinTaskInstrumentation.TASK_CLASS_NAME, State.class.getName()); + } + + @Override + public Map, String> transformers() { + final Map, String> transformers = new HashMap<>(); + transformers.put( + named("execute") + .and(takesArgument(0, named(AkkaForkJoinTaskInstrumentation.TASK_CLASS_NAME))), + SetAkkaForkJoinStateAdvice.class.getName()); + transformers.put( + named("submit") + .and(takesArgument(0, named(AkkaForkJoinTaskInstrumentation.TASK_CLASS_NAME))), + SetAkkaForkJoinStateAdvice.class.getName()); + transformers.put( + nameMatches("invoke") + .and(takesArgument(0, named(AkkaForkJoinTaskInstrumentation.TASK_CLASS_NAME))), + SetAkkaForkJoinStateAdvice.class.getName()); + return transformers; + } + + public static class SetAkkaForkJoinStateAdvice { + + @Advice.OnMethodEnter(suppress = Throwable.class) + public static State enterJobSubmit( + @Advice.This final Executor executor, + @Advice.Argument(value = 0, readOnly = false) final ForkJoinTask task) { + final Scope scope = GlobalTracer.get().scopeManager().active(); + if (ExecutorInstrumentationUtils.shouldAttachStateToTask(task, executor)) { + final ContextStore contextStore = + InstrumentationContext.get(ForkJoinTask.class, State.class); + return ExecutorInstrumentationUtils.setupState(contextStore, task, (TraceScope) scope); + } + return null; + } + + @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) + public static void exitJobSubmit( + @Advice.This final Executor executor, + @Advice.Enter final State state, + @Advice.Thrown final Throwable throwable) { + ExecutorInstrumentationUtils.cleanUpOnMethodExit(executor, state, throwable); + } + } +} diff --git a/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/AkkaForkJoinTaskInstrumentation.java b/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/AkkaForkJoinTaskInstrumentation.java new file mode 100644 index 0000000000..20fd0b5081 --- /dev/null +++ b/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/AkkaForkJoinTaskInstrumentation.java @@ -0,0 +1,121 @@ +package datadog.trace.instrumentation.java.concurrent; + +import static datadog.trace.agent.tooling.ByteBuddyElementMatchers.safeHasSuperType; +import static net.bytebuddy.matcher.ElementMatchers.isAbstract; +import static net.bytebuddy.matcher.ElementMatchers.isInterface; +import static net.bytebuddy.matcher.ElementMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.not; +import static net.bytebuddy.matcher.ElementMatchers.takesArguments; + +import akka.dispatch.forkjoin.ForkJoinPool; +import akka.dispatch.forkjoin.ForkJoinTask; +import com.google.auto.service.AutoService; +import datadog.trace.agent.tooling.Instrumenter; +import datadog.trace.bootstrap.ContextStore; +import datadog.trace.bootstrap.InstrumentationContext; +import datadog.trace.bootstrap.instrumentation.java.concurrent.State; +import datadog.trace.context.TraceScope; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.Callable; +import lombok.extern.slf4j.Slf4j; +import net.bytebuddy.asm.Advice; +import net.bytebuddy.description.method.MethodDescription; +import net.bytebuddy.description.type.TypeDescription; +import net.bytebuddy.matcher.ElementMatcher; + +/** + * Instrument {@link ForkJoinTask}. + * + *

Note: There are quite a few separate implementations of {@code ForkJoinTask}/{@code + * ForkJoinPool}: JVM, Akka, Scala, Netty to name a few. This class handles Akka version. + */ +@Slf4j +@AutoService(Instrumenter.class) +public final class AkkaForkJoinTaskInstrumentation extends Instrumenter.Default { + + static final String TASK_CLASS_NAME = "akka.dispatch.forkjoin.ForkJoinTask"; + + public AkkaForkJoinTaskInstrumentation() { + super(AbstractExecutorInstrumentation.EXEC_NAME); + } + + @Override + public ElementMatcher typeMatcher() { + return not(isInterface()).and(safeHasSuperType(named(TASK_CLASS_NAME))); + } + + @Override + public String[] helperClassNames() { + return new String[] { + AdviceUtils.class.getName(), + }; + } + + @Override + public Map contextStore() { + final Map map = new HashMap<>(); + map.put(Runnable.class.getName(), State.class.getName()); + map.put(Callable.class.getName(), State.class.getName()); + map.put(TASK_CLASS_NAME, State.class.getName()); + return Collections.unmodifiableMap(map); + } + + @Override + public Map, String> transformers() { + final Map, String> transformers = new HashMap<>(); + transformers.put( + named("exec").and(takesArguments(0)).and(not(isAbstract())), + ForkJoinTaskAdvice.class.getName()); + return transformers; + } + + public static class ForkJoinTaskAdvice { + + /** + * When {@link ForkJoinTask} object is submitted to {@link ForkJoinPool} as {@link Runnable} or + * {@link Callable} it will not get wrapped, instead it will be casted to {@code ForkJoinTask} + * directly. This means state is still stored in {@code Runnable} or {@code Callable} and we + * need to use that state. + */ + @Advice.OnMethodEnter(suppress = Throwable.class) + public static TraceScope enter(@Advice.This final ForkJoinTask thiz) { + final ContextStore contextStore = + InstrumentationContext.get(ForkJoinTask.class, State.class); + TraceScope scope = AdviceUtils.startTaskScope(contextStore, thiz); + if (thiz instanceof Runnable) { + final ContextStore runnableContextStore = + InstrumentationContext.get(Runnable.class, State.class); + final TraceScope newScope = + AdviceUtils.startTaskScope(runnableContextStore, (Runnable) thiz); + if (null != newScope) { + if (null != scope) { + newScope.close(); + } else { + scope = newScope; + } + } + } + if (thiz instanceof Callable) { + final ContextStore callableContextStore = + InstrumentationContext.get(Callable.class, State.class); + final TraceScope newScope = + AdviceUtils.startTaskScope(callableContextStore, (Callable) thiz); + if (null != newScope) { + if (null != scope) { + newScope.close(); + } else { + scope = newScope; + } + } + } + return scope; + } + + @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) + public static void exit(@Advice.Enter final TraceScope scope) { + AdviceUtils.endTaskScope(scope); + } + } +} diff --git a/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/ExecutorInstrumentationUtils.java b/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/ExecutorInstrumentationUtils.java new file mode 100644 index 0000000000..779a95b80f --- /dev/null +++ b/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/ExecutorInstrumentationUtils.java @@ -0,0 +1,85 @@ +package datadog.trace.instrumentation.java.concurrent; + +import datadog.trace.bootstrap.ContextStore; +import datadog.trace.bootstrap.WeakMap; +import datadog.trace.bootstrap.instrumentation.java.concurrent.State; +import datadog.trace.context.TraceScope; +import io.opentracing.Scope; +import io.opentracing.util.GlobalTracer; +import java.util.concurrent.Executor; +import lombok.extern.slf4j.Slf4j; + +/** Utils for concurrent instrumentations. */ +@Slf4j +public class ExecutorInstrumentationUtils { + + private static final WeakMap DISABLED_EXECUTORS = + WeakMap.Provider.newWeakMap(); + + /** + * Checks if given task should get state attached. + * + * @param task task object + * @param executor executor this task was scheduled on + * @return true iff given task object should be wrapped + */ + public static boolean shouldAttachStateToTask(final Object task, final Executor executor) { + final Scope scope = GlobalTracer.get().scopeManager().active(); + return (scope instanceof TraceScope + && ((TraceScope) scope).isAsyncPropagating() + && task != null + && !ExecutorInstrumentationUtils.isDisabled(executor)); + } + + /** + * Create task state given current scope. + * + * @param contextStore context storage + * @param task task instance + * @param scope current scope + * @param task class type + * @return new state + */ + public static State setupState( + final ContextStore contextStore, final T task, final TraceScope scope) { + final State state = contextStore.putIfAbsent(task, State.FACTORY); + final TraceScope.Continuation continuation = scope.capture(); + if (state.setContinuation(continuation)) { + log.debug("created continuation {} from scope {}, state: {}", continuation, scope, state); + } else { + continuation.close(false); + } + return state; + } + + /** + * Clean up after job submission method has exited. + * + * @param executor the current executor + * @param state task instrumentation state + * @param throwable throwable that may have been thrown + */ + public static void cleanUpOnMethodExit( + final Executor executor, final State state, final Throwable throwable) { + if (null != state && null != throwable) { + /* + Note: this may potentially close somebody else's continuation if we didn't set it + up in setupState because it was already present before us. This should be safe but + may lead to non-attributed async work in some very rare cases. + Alternative is to not close continuation here if we did not set it up in setupState + but this may potentially lead to memory leaks if callers do not properly handle + exceptions. + */ + state.closeContinuation(); + } + } + + public static void disableExecutor(final Executor executor) { + log.debug("Disabling Executor tracing for instance {}", executor); + DISABLED_EXECUTORS.put(executor, true); + } + + public static boolean isDisabled(final Executor executor) { + return DISABLED_EXECUTORS.containsKey(executor); + } +} diff --git a/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/FutureInstrumentation.java b/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/FutureInstrumentation.java index 3a1994b086..64fb66b397 100644 --- a/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/FutureInstrumentation.java +++ b/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/FutureInstrumentation.java @@ -67,7 +67,7 @@ public final class FutureInstrumentation extends Instrumenter.Default { } public FutureInstrumentation() { - super(ExecutorInstrumentation.EXEC_NAME); + super(AbstractExecutorInstrumentation.EXEC_NAME); } @Override diff --git a/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/ExecutorInstrumentation.java b/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/JavaExecutorInstrumentation.java similarity index 52% rename from dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/ExecutorInstrumentation.java rename to dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/JavaExecutorInstrumentation.java index 31b6740207..5e28ecf74c 100644 --- a/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/ExecutorInstrumentation.java +++ b/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/JavaExecutorInstrumentation.java @@ -1,17 +1,13 @@ package datadog.trace.instrumentation.java.concurrent; -import static datadog.trace.agent.tooling.ByteBuddyElementMatchers.safeHasSuperType; -import static net.bytebuddy.matcher.ElementMatchers.isInterface; import static net.bytebuddy.matcher.ElementMatchers.nameMatches; import static net.bytebuddy.matcher.ElementMatchers.named; -import static net.bytebuddy.matcher.ElementMatchers.not; import static net.bytebuddy.matcher.ElementMatchers.takesArgument; import com.google.auto.service.AutoService; import datadog.trace.agent.tooling.Instrumenter; import datadog.trace.bootstrap.ContextStore; import datadog.trace.bootstrap.InstrumentationContext; -import datadog.trace.bootstrap.WeakMap; import datadog.trace.bootstrap.instrumentation.java.concurrent.CallableWrapper; import datadog.trace.bootstrap.instrumentation.java.concurrent.RunnableWrapper; import datadog.trace.bootstrap.instrumentation.java.concurrent.State; @@ -19,11 +15,9 @@ import datadog.trace.context.TraceScope; import io.opentracing.Scope; import io.opentracing.util.GlobalTracer; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.Executor; @@ -32,109 +26,11 @@ import java.util.concurrent.Future; import lombok.extern.slf4j.Slf4j; import net.bytebuddy.asm.Advice; import net.bytebuddy.description.method.MethodDescription; -import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; @Slf4j @AutoService(Instrumenter.class) -public final class ExecutorInstrumentation extends Instrumenter.Default { - public static final String EXEC_NAME = "java_concurrent"; - - /** - * Only apply executor instrumentation to whitelisted executors. In the future, this restriction - * may be lifted to include all executors. - */ - private static final Collection WHITELISTED_EXECUTORS; - /** - * Some frameworks have their executors defined as anon classes inside other classes. Referencing - * anon classes by name would be fragile, so instead we will use list of class prefix names. Since - * checking this list is more expensive (O(n)) we should try to keep it short. - */ - private static final Collection WHITELISTED_EXECUTORS_PREFIXES; - - static { - final String[] whitelist = { - "java.util.concurrent.AbstractExecutorService", - "java.util.concurrent.ThreadPoolExecutor", - "java.util.concurrent.ScheduledThreadPoolExecutor", - "java.util.concurrent.ForkJoinPool", - "java.util.concurrent.Executors$FinalizableDelegatedExecutorService", - "java.util.concurrent.Executors$DelegatedExecutorService", - "javax.management.NotificationBroadcasterSupport$1", - "kotlinx.coroutines.scheduling.CoroutineScheduler", - "scala.concurrent.Future$InternalCallbackExecutor$", - "scala.concurrent.impl.ExecutionContextImpl", - "scala.concurrent.impl.ExecutionContextImpl$$anon$1", - "scala.concurrent.forkjoin.ForkJoinPool", - "scala.concurrent.impl.ExecutionContextImpl$$anon$3", - "akka.dispatch.MessageDispatcher", - "akka.dispatch.Dispatcher", - "akka.dispatch.Dispatcher$LazyExecutorServiceDelegate", - "akka.actor.ActorSystemImpl$$anon$1", - "akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool", - "akka.dispatch.forkjoin.ForkJoinPool", - "akka.dispatch.BalancingDispatcher", - "akka.dispatch.ThreadPoolConfig$ThreadPoolExecutorServiceFactory$$anon$1", - "akka.dispatch.PinnedDispatcher", - "akka.dispatch.ExecutionContexts$sameThreadExecutionContext$", - "play.api.libs.streams.Execution$trampoline$", - "io.netty.channel.MultithreadEventLoopGroup", - "io.netty.util.concurrent.MultithreadEventExecutorGroup", - "io.netty.util.concurrent.AbstractEventExecutorGroup", - "io.netty.channel.epoll.EpollEventLoopGroup", - "io.netty.channel.nio.NioEventLoopGroup", - "io.netty.util.concurrent.GlobalEventExecutor", - "io.netty.util.concurrent.AbstractScheduledEventExecutor", - "io.netty.util.concurrent.AbstractEventExecutor", - "io.netty.util.concurrent.SingleThreadEventExecutor", - "io.netty.channel.nio.NioEventLoop", - "io.netty.channel.SingleThreadEventLoop", - }; - WHITELISTED_EXECUTORS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(whitelist))); - - final String[] whitelistPrefixes = {"slick.util.AsyncExecutor$"}; - WHITELISTED_EXECUTORS_PREFIXES = - Collections.unmodifiableCollection(Arrays.asList(whitelistPrefixes)); - } - - public ExecutorInstrumentation() { - super(EXEC_NAME); - } - - @Override - public ElementMatcher typeMatcher() { - return not(isInterface()) - .and(safeHasSuperType(named(Executor.class.getName()))) - .and( - new ElementMatcher() { - @Override - public boolean matches(final TypeDescription target) { - boolean whitelisted = WHITELISTED_EXECUTORS.contains(target.getName()); - - // Check for possible prefixes match only if not whitelisted already - if (!whitelisted) { - for (final String name : WHITELISTED_EXECUTORS_PREFIXES) { - if (target.getName().startsWith(name)) { - whitelisted = true; - break; - } - } - } - - if (!whitelisted) { - log.debug("Skipping executor instrumentation for {}", target.getName()); - } - return whitelisted; - } - }); - } - - @Override - public String[] helperClassNames() { - return new String[] { - ExecutorInstrumentation.class.getName() + "$ConcurrentUtils", - }; - } +public final class JavaExecutorInstrumentation extends AbstractExecutorInstrumentation { @Override public Map contextStore() { @@ -154,7 +50,7 @@ public final class ExecutorInstrumentation extends Instrumenter.Default { SetExecuteRunnableStateAdvice.class.getName()); transformers.put( named("execute").and(takesArgument(0, ForkJoinTask.class)), - SetExecuteForkJoinStateAdvice.class.getName()); + SetJavaForkJoinStateAdvice.class.getName()); transformers.put( named("submit").and(takesArgument(0, Runnable.class)), SetSubmitRunnableStateAdvice.class.getName()); @@ -163,13 +59,13 @@ public final class ExecutorInstrumentation extends Instrumenter.Default { SetCallableStateAdvice.class.getName()); transformers.put( named("submit").and(takesArgument(0, ForkJoinTask.class)), - SetExecuteForkJoinStateAdvice.class.getName()); + SetJavaForkJoinStateAdvice.class.getName()); transformers.put( nameMatches("invoke(Any|All)$").and(takesArgument(0, Callable.class)), SetCallableStateForCallableCollectionAdvice.class.getName()); transformers.put( nameMatches("invoke").and(takesArgument(0, ForkJoinTask.class)), - SetExecuteForkJoinStateAdvice.class.getName()); + SetJavaForkJoinStateAdvice.class.getName()); transformers.put( // kotlinx.coroutines.scheduling.CoroutineScheduler named("dispatch") .and(takesArgument(0, Runnable.class)) @@ -185,11 +81,11 @@ public final class ExecutorInstrumentation extends Instrumenter.Default { @Advice.This final Executor executor, @Advice.Argument(value = 0, readOnly = false) Runnable task) { final Scope scope = GlobalTracer.get().scopeManager().active(); - if (ConcurrentUtils.shouldAttachStateToTask(task, executor)) { + if (ExecutorInstrumentationUtils.shouldAttachStateToTask(task, executor)) { task = RunnableWrapper.wrapIfNeeded(task); final ContextStore contextStore = InstrumentationContext.get(Runnable.class, State.class); - return ConcurrentUtils.setupState(contextStore, task, (TraceScope) scope); + return ExecutorInstrumentationUtils.setupState(contextStore, task, (TraceScope) scope); } return null; } @@ -199,21 +95,21 @@ public final class ExecutorInstrumentation extends Instrumenter.Default { @Advice.This final Executor executor, @Advice.Enter final State state, @Advice.Thrown final Throwable throwable) { - ConcurrentUtils.cleanUpOnMethodExit(executor, state, throwable); + ExecutorInstrumentationUtils.cleanUpOnMethodExit(executor, state, throwable); } } - public static class SetExecuteForkJoinStateAdvice { + public static class SetJavaForkJoinStateAdvice { @Advice.OnMethodEnter(suppress = Throwable.class) public static State enterJobSubmit( @Advice.This final Executor executor, @Advice.Argument(value = 0, readOnly = false) final ForkJoinTask task) { final Scope scope = GlobalTracer.get().scopeManager().active(); - if (ConcurrentUtils.shouldAttachStateToTask(task, executor)) { + if (ExecutorInstrumentationUtils.shouldAttachStateToTask(task, executor)) { final ContextStore contextStore = InstrumentationContext.get(ForkJoinTask.class, State.class); - return ConcurrentUtils.setupState(contextStore, task, (TraceScope) scope); + return ExecutorInstrumentationUtils.setupState(contextStore, task, (TraceScope) scope); } return null; } @@ -223,7 +119,7 @@ public final class ExecutorInstrumentation extends Instrumenter.Default { @Advice.This final Executor executor, @Advice.Enter final State state, @Advice.Thrown final Throwable throwable) { - ConcurrentUtils.cleanUpOnMethodExit(executor, state, throwable); + ExecutorInstrumentationUtils.cleanUpOnMethodExit(executor, state, throwable); } } @@ -234,11 +130,11 @@ public final class ExecutorInstrumentation extends Instrumenter.Default { @Advice.This final Executor executor, @Advice.Argument(value = 0, readOnly = false) Runnable task) { final Scope scope = GlobalTracer.get().scopeManager().active(); - if (ConcurrentUtils.shouldAttachStateToTask(task, executor)) { + if (ExecutorInstrumentationUtils.shouldAttachStateToTask(task, executor)) { task = RunnableWrapper.wrapIfNeeded(task); final ContextStore contextStore = InstrumentationContext.get(Runnable.class, State.class); - return ConcurrentUtils.setupState(contextStore, task, (TraceScope) scope); + return ExecutorInstrumentationUtils.setupState(contextStore, task, (TraceScope) scope); } return null; } @@ -254,7 +150,7 @@ public final class ExecutorInstrumentation extends Instrumenter.Default { InstrumentationContext.get(Future.class, State.class); contextStore.put(future, state); } - ConcurrentUtils.cleanUpOnMethodExit(executor, state, throwable); + ExecutorInstrumentationUtils.cleanUpOnMethodExit(executor, state, throwable); } } @@ -265,11 +161,11 @@ public final class ExecutorInstrumentation extends Instrumenter.Default { @Advice.This final Executor executor, @Advice.Argument(value = 0, readOnly = false) Callable task) { final Scope scope = GlobalTracer.get().scopeManager().active(); - if (ConcurrentUtils.shouldAttachStateToTask(task, executor)) { + if (ExecutorInstrumentationUtils.shouldAttachStateToTask(task, executor)) { task = CallableWrapper.wrapIfNeeded(task); final ContextStore contextStore = InstrumentationContext.get(Callable.class, State.class); - return ConcurrentUtils.setupState(contextStore, task, (TraceScope) scope); + return ExecutorInstrumentationUtils.setupState(contextStore, task, (TraceScope) scope); } return null; } @@ -285,7 +181,7 @@ public final class ExecutorInstrumentation extends Instrumenter.Default { InstrumentationContext.get(Future.class, State.class); contextStore.put(future, state); } - ConcurrentUtils.cleanUpOnMethodExit(executor, state, throwable); + ExecutorInstrumentationUtils.cleanUpOnMethodExit(executor, state, throwable); } } @@ -306,7 +202,7 @@ public final class ExecutorInstrumentation extends Instrumenter.Default { wrappedTasks.add(task); final ContextStore contextStore = InstrumentationContext.get(Callable.class, State.class); - ConcurrentUtils.setupState(contextStore, task, (TraceScope) scope); + ExecutorInstrumentationUtils.setupState(contextStore, task, (TraceScope) scope); } } tasks = wrappedTasks; @@ -351,79 +247,4 @@ public final class ExecutorInstrumentation extends Instrumenter.Default { } } } - - /** Utils for concurrent instrumentations. */ - @Slf4j - public static class ConcurrentUtils { - - private static final WeakMap DISABLED_EXECUTORS = - WeakMap.Provider.newWeakMap(); - - /** - * Checks if given task should get state attached. - * - * @param task task object - * @param executor executor this task was scheduled on - * @return true iff given task object should be wrapped - */ - public static boolean shouldAttachStateToTask(final Object task, final Executor executor) { - final Scope scope = GlobalTracer.get().scopeManager().active(); - return (scope instanceof TraceScope - && ((TraceScope) scope).isAsyncPropagating() - && task != null - && !ConcurrentUtils.isDisabled(executor)); - } - - /** - * Create task state given current scope. - * - * @param contextStore context storage - * @param task task instance - * @param scope current scope - * @param task class type - * @return new state - */ - public static State setupState( - final ContextStore contextStore, final T task, final TraceScope scope) { - final State state = contextStore.putIfAbsent(task, State.FACTORY); - final TraceScope.Continuation continuation = scope.capture(); - if (state.setContinuation(continuation)) { - log.debug("created continuation {} from scope {}, state: {}", continuation, scope, state); - } else { - continuation.close(false); - } - return state; - } - - /** - * Clean up after job submission method has exited. - * - * @param executor the current executor - * @param state task instrumentation state - * @param throwable throwable that may have been thrown - */ - public static void cleanUpOnMethodExit( - final Executor executor, final State state, final Throwable throwable) { - if (null != state && null != throwable) { - /* - Note: this may potentially close somebody else's continuation if we didn't set it - up in setupState because it was already present before us. This should be safe but - may lead to non-attributed async work in some very rare cases. - Alternative is to not close continuation here if we did not set it up in setupState - but this may potentially lead to memory leaks if callers do not properly handle - exceptions. - */ - state.closeContinuation(); - } - } - - public static void disableExecutor(final Executor executor) { - log.debug("Disabling Executor tracing for instance {}", executor); - DISABLED_EXECUTORS.put(executor, true); - } - - public static boolean isDisabled(final Executor executor) { - return DISABLED_EXECUTORS.containsKey(executor); - } - } } diff --git a/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/ForkJoinTaskInstrumentation.java b/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/JavaForkJoinTaskInstrumentation.java similarity index 91% rename from dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/ForkJoinTaskInstrumentation.java rename to dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/JavaForkJoinTaskInstrumentation.java index 99f648671d..871dd79271 100644 --- a/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/ForkJoinTaskInstrumentation.java +++ b/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/JavaForkJoinTaskInstrumentation.java @@ -29,16 +29,14 @@ import net.bytebuddy.matcher.ElementMatcher; * Instrument {@link ForkJoinTask}. * *

Note: There are quite a few separate implementations of {@code ForkJoinTask}/{@code - * ForkJoinPool}: JVM, akka, scala, netty to name a few. For now we only deal with JVM one because - * there are known cases when JVM {@code ForkJoinTask} is supplied as {@code Runnable} into {@code - * ForkJoinPool}. + * ForkJoinPool}: JVM, Akka, Scala, Netty to name a few. This class handles JVM version. */ @Slf4j @AutoService(Instrumenter.class) -public final class ForkJoinTaskInstrumentation extends Instrumenter.Default { +public final class JavaForkJoinTaskInstrumentation extends Instrumenter.Default { - public ForkJoinTaskInstrumentation() { - super(ExecutorInstrumentation.EXEC_NAME); + public JavaForkJoinTaskInstrumentation() { + super(AbstractExecutorInstrumentation.EXEC_NAME); } @Override diff --git a/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/RunnableCallableInstrumentation.java b/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/RunnableCallableInstrumentation.java index 2611bdb0bf..b722078463 100644 --- a/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/RunnableCallableInstrumentation.java +++ b/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/RunnableCallableInstrumentation.java @@ -29,7 +29,7 @@ import net.bytebuddy.matcher.ElementMatcher; public final class RunnableCallableInstrumentation extends Instrumenter.Default { public RunnableCallableInstrumentation() { - super(ExecutorInstrumentation.EXEC_NAME); + super(AbstractExecutorInstrumentation.EXEC_NAME); } @Override diff --git a/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/ScalaExecutorInstrumentation.java b/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/ScalaExecutorInstrumentation.java new file mode 100644 index 0000000000..c6bc0411c3 --- /dev/null +++ b/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/ScalaExecutorInstrumentation.java @@ -0,0 +1,80 @@ +package datadog.trace.instrumentation.java.concurrent; + +import static net.bytebuddy.matcher.ElementMatchers.nameMatches; +import static net.bytebuddy.matcher.ElementMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.takesArgument; + +import com.google.auto.service.AutoService; +import datadog.trace.agent.tooling.Instrumenter; +import datadog.trace.bootstrap.ContextStore; +import datadog.trace.bootstrap.InstrumentationContext; +import datadog.trace.bootstrap.instrumentation.java.concurrent.State; +import datadog.trace.context.TraceScope; +import io.opentracing.Scope; +import io.opentracing.util.GlobalTracer; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.Executor; +import lombok.extern.slf4j.Slf4j; +import net.bytebuddy.asm.Advice; +import net.bytebuddy.description.method.MethodDescription; +import net.bytebuddy.matcher.ElementMatcher; +import scala.concurrent.forkjoin.ForkJoinTask; + +@Slf4j +@AutoService(Instrumenter.class) +public final class ScalaExecutorInstrumentation extends AbstractExecutorInstrumentation { + + public ScalaExecutorInstrumentation() { + super(EXEC_NAME + ".scala_fork_join"); + } + + @Override + public Map contextStore() { + return Collections.singletonMap( + ScalaForkJoinTaskInstrumentation.TASK_CLASS_NAME, State.class.getName()); + } + + @Override + public Map, String> transformers() { + final Map, String> transformers = new HashMap<>(); + transformers.put( + named("execute") + .and(takesArgument(0, named(ScalaForkJoinTaskInstrumentation.TASK_CLASS_NAME))), + SetScalaForkJoinStateAdvice.class.getName()); + transformers.put( + named("submit") + .and(takesArgument(0, named(ScalaForkJoinTaskInstrumentation.TASK_CLASS_NAME))), + SetScalaForkJoinStateAdvice.class.getName()); + transformers.put( + nameMatches("invoke") + .and(takesArgument(0, named(ScalaForkJoinTaskInstrumentation.TASK_CLASS_NAME))), + SetScalaForkJoinStateAdvice.class.getName()); + return transformers; + } + + public static class SetScalaForkJoinStateAdvice { + + @Advice.OnMethodEnter(suppress = Throwable.class) + public static State enterJobSubmit( + @Advice.This final Executor executor, + @Advice.Argument(value = 0, readOnly = false) final ForkJoinTask task) { + final Scope scope = GlobalTracer.get().scopeManager().active(); + if (ExecutorInstrumentationUtils.shouldAttachStateToTask(task, executor)) { + final ContextStore contextStore = + InstrumentationContext.get(ForkJoinTask.class, State.class); + return ExecutorInstrumentationUtils.setupState(contextStore, task, (TraceScope) scope); + } + return null; + } + + @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) + public static void exitJobSubmit( + @Advice.This final Executor executor, + @Advice.Enter final State state, + @Advice.Thrown final Throwable throwable) { + ExecutorInstrumentationUtils.cleanUpOnMethodExit(executor, state, throwable); + } + } +} diff --git a/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/ScalaForkJoinTaskInstrumentation.java b/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/ScalaForkJoinTaskInstrumentation.java new file mode 100644 index 0000000000..b242c7d13c --- /dev/null +++ b/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/ScalaForkJoinTaskInstrumentation.java @@ -0,0 +1,121 @@ +package datadog.trace.instrumentation.java.concurrent; + +import static datadog.trace.agent.tooling.ByteBuddyElementMatchers.safeHasSuperType; +import static net.bytebuddy.matcher.ElementMatchers.isAbstract; +import static net.bytebuddy.matcher.ElementMatchers.isInterface; +import static net.bytebuddy.matcher.ElementMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.not; +import static net.bytebuddy.matcher.ElementMatchers.takesArguments; + +import com.google.auto.service.AutoService; +import datadog.trace.agent.tooling.Instrumenter; +import datadog.trace.bootstrap.ContextStore; +import datadog.trace.bootstrap.InstrumentationContext; +import datadog.trace.bootstrap.instrumentation.java.concurrent.State; +import datadog.trace.context.TraceScope; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.Callable; +import lombok.extern.slf4j.Slf4j; +import net.bytebuddy.asm.Advice; +import net.bytebuddy.description.method.MethodDescription; +import net.bytebuddy.description.type.TypeDescription; +import net.bytebuddy.matcher.ElementMatcher; +import scala.concurrent.forkjoin.ForkJoinPool; +import scala.concurrent.forkjoin.ForkJoinTask; + +/** + * Instrument {@link ForkJoinTask}. + * + *

Note: There are quite a few separate implementations of {@code ForkJoinTask}/{@code + * ForkJoinPool}: JVM, Akka, Scala, Netty to name a few. This class handles Scala version. + */ +@Slf4j +@AutoService(Instrumenter.class) +public final class ScalaForkJoinTaskInstrumentation extends Instrumenter.Default { + + static final String TASK_CLASS_NAME = "scala.concurrent.forkjoin.ForkJoinTask"; + + public ScalaForkJoinTaskInstrumentation() { + super(AbstractExecutorInstrumentation.EXEC_NAME); + } + + @Override + public ElementMatcher typeMatcher() { + return not(isInterface()).and(safeHasSuperType(named(TASK_CLASS_NAME))); + } + + @Override + public String[] helperClassNames() { + return new String[] { + AdviceUtils.class.getName(), + }; + } + + @Override + public Map contextStore() { + final Map map = new HashMap<>(); + map.put(Runnable.class.getName(), State.class.getName()); + map.put(Callable.class.getName(), State.class.getName()); + map.put(TASK_CLASS_NAME, State.class.getName()); + return Collections.unmodifiableMap(map); + } + + @Override + public Map, String> transformers() { + final Map, String> transformers = new HashMap<>(); + transformers.put( + named("exec").and(takesArguments(0)).and(not(isAbstract())), + ForkJoinTaskAdvice.class.getName()); + return transformers; + } + + public static class ForkJoinTaskAdvice { + + /** + * When {@link ForkJoinTask} object is submitted to {@link ForkJoinPool} as {@link Runnable} or + * {@link Callable} it will not get wrapped, instead it will be casted to {@code ForkJoinTask} + * directly. This means state is still stored in {@code Runnable} or {@code Callable} and we + * need to use that state. + */ + @Advice.OnMethodEnter(suppress = Throwable.class) + public static TraceScope enter(@Advice.This final ForkJoinTask thiz) { + final ContextStore contextStore = + InstrumentationContext.get(ForkJoinTask.class, State.class); + TraceScope scope = AdviceUtils.startTaskScope(contextStore, thiz); + if (thiz instanceof Runnable) { + final ContextStore runnableContextStore = + InstrumentationContext.get(Runnable.class, State.class); + final TraceScope newScope = + AdviceUtils.startTaskScope(runnableContextStore, (Runnable) thiz); + if (null != newScope) { + if (null != scope) { + newScope.close(); + } else { + scope = newScope; + } + } + } + if (thiz instanceof Callable) { + final ContextStore callableContextStore = + InstrumentationContext.get(Callable.class, State.class); + final TraceScope newScope = + AdviceUtils.startTaskScope(callableContextStore, (Callable) thiz); + if (null != newScope) { + if (null != scope) { + newScope.close(); + } else { + scope = newScope; + } + } + } + return scope; + } + + @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) + public static void exit(@Advice.Enter final TraceScope scope) { + AdviceUtils.endTaskScope(scope); + } + } +} diff --git a/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/ThreadPoolExecutorInstrumentation.java b/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/ThreadPoolExecutorInstrumentation.java index 5c58151a97..0ce35dee03 100644 --- a/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/ThreadPoolExecutorInstrumentation.java +++ b/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/ThreadPoolExecutorInstrumentation.java @@ -27,7 +27,7 @@ import net.bytebuddy.matcher.ElementMatcher; public class ThreadPoolExecutorInstrumentation extends Instrumenter.Default { public ThreadPoolExecutorInstrumentation() { - super(ExecutorInstrumentation.EXEC_NAME); + super(AbstractExecutorInstrumentation.EXEC_NAME); } @Override @@ -38,7 +38,8 @@ public class ThreadPoolExecutorInstrumentation extends Instrumenter.Default { @Override public String[] helperClassNames() { return new String[] { - ExecutorInstrumentation.class.getName() + "$ConcurrentUtils", + ThreadPoolExecutorInstrumentation.class.getPackage().getName() + + ".ExecutorInstrumentationUtils", ThreadPoolExecutorInstrumentation.class.getName() + "$GenericRunnable", }; } @@ -56,7 +57,7 @@ public class ThreadPoolExecutorInstrumentation extends Instrumenter.Default { @Advice.OnMethodExit(suppress = Throwable.class) public static void disableIfQueueWrongType( @Advice.This final ThreadPoolExecutor executor, - @Advice.Argument(4) final BlockingQueue queue) { + @Advice.Argument(4) final BlockingQueue queue) { if (queue.size() == 0) { try { @@ -65,7 +66,7 @@ public class ThreadPoolExecutorInstrumentation extends Instrumenter.Default { } catch (final ClassCastException | IllegalArgumentException e) { // These errors indicate the queue is fundamentally incompatible with wrapped runnables. // We must disable the executor instance to avoid passing wrapped runnables later. - ExecutorInstrumentation.ConcurrentUtils.disableExecutor(executor); + ExecutorInstrumentationUtils.disableExecutor(executor); } catch (final Exception e) { // Other errors might indicate the queue is not fully initialized. // We might want to disable for those too, but for now just ignore. diff --git a/dd-java-agent/instrumentation/java-concurrent/src/test/groovy/ExecutorInstrumentationTest.groovy b/dd-java-agent/instrumentation/java-concurrent/src/test/groovy/ExecutorInstrumentationTest.groovy index 596810ee49..a99f774453 100644 --- a/dd-java-agent/instrumentation/java-concurrent/src/test/groovy/ExecutorInstrumentationTest.groovy +++ b/dd-java-agent/instrumentation/java-concurrent/src/test/groovy/ExecutorInstrumentationTest.groovy @@ -54,9 +54,9 @@ class ExecutorInstrumentationTest extends AgentTestRunner { void run() { ((ContinuableScope) GlobalTracer.get().scopeManager().active()).setAsyncPropagation(true) // this child will have a span - m.invoke(pool, new AsyncChild()) + m.invoke(pool, new JavaAsyncChild()) // this child won't - m.invoke(pool, new AsyncChild(false, false)) + m.invoke(pool, new JavaAsyncChild(false, false)) } }.run() @@ -82,9 +82,11 @@ class ExecutorInstrumentationTest extends AgentTestRunner { new ForkJoinPool() | submitCallableMethod new ForkJoinPool() | submitForkJoinTaskMethod new ForkJoinPool() | invokeForkJoinTaskMethod + new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue(1)) | executeRunnableMethod new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue(1)) | submitRunnableMethod new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue(1)) | submitCallableMethod + } // more useful name breaks java9 javac @@ -93,7 +95,7 @@ class ExecutorInstrumentationTest extends AgentTestRunner { setup: def pool = poolImpl def m = method - List children = new ArrayList<>() + List children = new ArrayList<>() List jobFutures = new ArrayList<>() new Runnable() { @@ -109,10 +111,10 @@ class ExecutorInstrumentationTest extends AgentTestRunner { // we do not really have a good way for attributing work to correct parent span // if we reuse Callable/Runnable. // Solution for now is to never reuse a Callable/Runnable. - final AsyncChild child = new AsyncChild(true, true) + final JavaAsyncChild child = new JavaAsyncChild(true, true) children.add(child) try { - Future f = m.invoke(pool, new AsyncChild()) + Future f = m.invoke(pool, new JavaAsyncChild()) jobFutures.add(f) } catch (InvocationTargetException e) { throw e.getCause() @@ -124,7 +126,7 @@ class ExecutorInstrumentationTest extends AgentTestRunner { for (Future f : jobFutures) { f.cancel(false) } - for (AsyncChild child : children) { + for (JavaAsyncChild child : children) { child.unblock() } } @@ -140,6 +142,7 @@ class ExecutorInstrumentationTest extends AgentTestRunner { poolImpl | method new ForkJoinPool() | submitRunnableMethod new ForkJoinPool() | submitCallableMethod + new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue(1)) | submitRunnableMethod new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue(1)) | submitCallableMethod } diff --git a/dd-java-agent/instrumentation/java-concurrent/src/test/java/AsyncChild.java b/dd-java-agent/instrumentation/java-concurrent/src/test/java/JavaAsyncChild.java similarity index 84% rename from dd-java-agent/instrumentation/java-concurrent/src/test/java/AsyncChild.java rename to dd-java-agent/instrumentation/java-concurrent/src/test/java/JavaAsyncChild.java index 8495c60397..67266dcb12 100644 --- a/dd-java-agent/instrumentation/java-concurrent/src/test/java/AsyncChild.java +++ b/dd-java-agent/instrumentation/java-concurrent/src/test/java/JavaAsyncChild.java @@ -3,11 +3,11 @@ import java.util.concurrent.Callable; import java.util.concurrent.ForkJoinTask; import java.util.concurrent.atomic.AtomicBoolean; -public class AsyncChild extends ForkJoinTask implements Runnable, Callable { +public class JavaAsyncChild extends ForkJoinTask implements Runnable, Callable { private final AtomicBoolean blockThread; private final boolean doTraceableWork; - public AsyncChild() { + public JavaAsyncChild() { this(true, false); } @@ -25,7 +25,7 @@ public class AsyncChild extends ForkJoinTask implements Runnable, Callable { return true; } - public AsyncChild(final boolean doTraceableWork, final boolean blockThread) { + public JavaAsyncChild(final boolean doTraceableWork, final boolean blockThread) { this.doTraceableWork = doTraceableWork; this.blockThread = new AtomicBoolean(blockThread); } diff --git a/dd-trace/src/test/groovy/datadog/trace/tracer/writer/AgentWriterTest.groovy b/dd-trace/src/test/groovy/datadog/trace/tracer/writer/AgentWriterTest.groovy index 84ac01406e..0b440e8684 100644 --- a/dd-trace/src/test/groovy/datadog/trace/tracer/writer/AgentWriterTest.groovy +++ b/dd-trace/src/test/groovy/datadog/trace/tracer/writer/AgentWriterTest.groovy @@ -34,7 +34,6 @@ class AgentWriterTest extends Specification { isValid() >> true }] def writer = new AgentWriter(client) - writer.start() when: for (def trace : traces) { @@ -43,6 +42,10 @@ class AgentWriterTest extends Specification { incrementTraceCountBy.times { writer.incrementTraceCount() } + + // Starting writer after submissions to make sure all updates go out in 1 request + writer.start() + Thread.sleep(FLUSH_DELAY) then: @@ -98,7 +101,7 @@ class AgentWriterTest extends Specification { Thread.sleep(FLUSH_DELAY) then: - 1 * client.sendTraces([traces[0]], 0) >> { throw new IOException("test exception")} + 1 * client.sendTraces([traces[0]], 0) >> { throw new IOException("test exception") } writer.getSampleRateByService() == SampleRateByService.EMPTY_INSTANCE when: @@ -175,6 +178,6 @@ class AgentWriterTest extends Specification { } boolean isWriterThreadRunning() { - return Thread.getAllStackTraces().keySet().any{ t -> t.getName() == "dd-agent-writer" } + return Thread.getAllStackTraces().keySet().any { t -> t.getName() == "dd-agent-writer" } } } diff --git a/settings.gradle b/settings.gradle index 65308da2df..f5815e126a 100644 --- a/settings.gradle +++ b/settings.gradle @@ -48,6 +48,7 @@ include ':dd-java-agent:instrumentation:java-concurrent' include ':dd-java-agent:instrumentation:java-concurrent:kotlin-testing' include ':dd-java-agent:instrumentation:java-concurrent:scala-testing' include ':dd-java-agent:instrumentation:java-concurrent:akka-testing' +include ':dd-java-agent:instrumentation:java-concurrent:akka-2.5-testing' include ':dd-java-agent:instrumentation:jboss-classloading' include ':dd-java-agent:instrumentation:jdbc' include ':dd-java-agent:instrumentation:jedis-1.4'