Merge pull request #672 from DataDog/mar-kolya/more-forkjoin-instrumentations

More ForkJoin instrumentations
This commit is contained in:
Nikolay Martynov 2019-01-28 14:01:03 -05:00 committed by GitHub
commit ee3788bfdc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 1106 additions and 221 deletions

View File

@ -0,0 +1,20 @@
// Set properties before any plugins get loaded
ext {
minJavaVersionForTests = JavaVersion.VERSION_1_8
// Execute tests on all JVMs, even rare and outdated ones
coreJavaInstrumentation = true
}
apply from: "${rootDir}/gradle/java.gradle"
apply from: "${rootDir}/gradle/test-with-scala.gradle"
dependencies {
testCompile project(':dd-trace-api')
testCompile project(':dd-trace-ot')
testCompile deps.scala
testCompile group: 'com.typesafe.akka', name: 'akka-actor_2.11', version: '2.5.0'
testCompile project(':dd-java-agent:testing')
testCompile project(':dd-java-agent:instrumentation:java-concurrent')
testCompile project(':dd-java-agent:instrumentation:trace-annotation')
}

View File

@ -0,0 +1,149 @@
import akka.dispatch.forkjoin.ForkJoinPool
import akka.dispatch.forkjoin.ForkJoinTask
import datadog.opentracing.DDSpan
import datadog.opentracing.scopemanager.ContinuableScope
import datadog.trace.agent.test.AgentTestRunner
import datadog.trace.api.Trace
import io.opentracing.util.GlobalTracer
import spock.lang.Shared
import java.lang.reflect.InvocationTargetException
import java.lang.reflect.Method
import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.Callable
import java.util.concurrent.Executor
import java.util.concurrent.ExecutorService
import java.util.concurrent.Future
import java.util.concurrent.RejectedExecutionException
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit
/**
* Test executor instrumentation for Akka specific classes.
* This is to large extent a copy of ExecutorInstrumentationTest.
*/
class AkkaExecutorInstrumentationTest extends AgentTestRunner {
@Shared
Method executeRunnableMethod
@Shared
Method akkaExecuteForkJoinTaskMethod
@Shared
Method submitRunnableMethod
@Shared
Method submitCallableMethod
@Shared
Method akkaSubmitForkJoinTaskMethod
@Shared
Method akkaInvokeForkJoinTaskMethod
def setupSpec() {
executeRunnableMethod = Executor.getMethod("execute", Runnable)
akkaExecuteForkJoinTaskMethod = ForkJoinPool.getMethod("execute", ForkJoinTask)
submitRunnableMethod = ExecutorService.getMethod("submit", Runnable)
submitCallableMethod = ExecutorService.getMethod("submit", Callable)
akkaSubmitForkJoinTaskMethod = ForkJoinPool.getMethod("submit", ForkJoinTask)
akkaInvokeForkJoinTaskMethod = ForkJoinPool.getMethod("invoke", ForkJoinTask)
}
// more useful name breaks java9 javac
// def "#poolImpl.getClass().getSimpleName() #method.getName() propagates"()
def "#poolImpl #method propagates"() {
setup:
def pool = poolImpl
def m = method
new Runnable() {
@Override
@Trace(operationName = "parent")
void run() {
((ContinuableScope) GlobalTracer.get().scopeManager().active()).setAsyncPropagation(true)
// this child will have a span
m.invoke(pool, new AkkaAsyncChild())
// this child won't
m.invoke(pool, new AkkaAsyncChild(false, false))
}
}.run()
TEST_WRITER.waitForTraces(1)
List<DDSpan> trace = TEST_WRITER.get(0)
expect:
TEST_WRITER.size() == 1
trace.size() == 2
trace.get(0).operationName == "parent"
trace.get(1).operationName == "asyncChild"
trace.get(1).parentId == trace.get(0).spanId
cleanup:
pool?.shutdown()
// Unfortunately, there's no simple way to test the cross product of methods/pools.
where:
poolImpl | method
new ForkJoinPool() | executeRunnableMethod
new ForkJoinPool() | akkaExecuteForkJoinTaskMethod
new ForkJoinPool() | submitRunnableMethod
new ForkJoinPool() | submitCallableMethod
new ForkJoinPool() | akkaSubmitForkJoinTaskMethod
new ForkJoinPool() | akkaInvokeForkJoinTaskMethod
new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue<Runnable>(1)) | executeRunnableMethod
new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue<Runnable>(1)) | submitRunnableMethod
new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue<Runnable>(1)) | submitCallableMethod
}
// more useful name breaks java9 javac
// def "#poolImpl.getClass().getSimpleName() #method.getName() propagates"()
def "#poolImpl reports after canceled jobs"() {
setup:
def pool = poolImpl
def m = method
List<AkkaAsyncChild> children = new ArrayList<>()
List<Future> jobFutures = new ArrayList<>()
new Runnable() {
@Override
@Trace(operationName = "parent")
void run() {
((ContinuableScope) GlobalTracer.get().scopeManager().active()).setAsyncPropagation(true)
try {
for (int i = 0; i < 20; ++i) {
// Our current instrumentation instrumentation does not behave very well
// if we try to reuse Callable/Runnable. Namely we would be getting 'orphaned'
// child traces sometimes since state can contain only one continuation - and
// we do not really have a good way for attributing work to correct parent span
// if we reuse Callable/Runnable.
// Solution for now is to never reuse a Callable/Runnable.
final AkkaAsyncChild child = new AkkaAsyncChild(true, true)
children.add(child)
try {
Future f = m.invoke(pool, new AkkaAsyncChild())
jobFutures.add(f)
} catch (InvocationTargetException e) {
throw e.getCause()
}
}
} catch (RejectedExecutionException e) {
}
for (Future f : jobFutures) {
f.cancel(false)
}
for (AkkaAsyncChild child : children) {
child.unblock()
}
}
}.run()
TEST_WRITER.waitForTraces(1)
expect:
// FIXME: we should improve this test to make sure continuations are actually closed
TEST_WRITER.size() == 1
where:
poolImpl | method
new ForkJoinPool() | submitRunnableMethod
new ForkJoinPool() | submitCallableMethod
}
}

View File

@ -0,0 +1,59 @@
import akka.dispatch.forkjoin.ForkJoinTask;
import datadog.trace.api.Trace;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;
public class AkkaAsyncChild extends ForkJoinTask implements Runnable, Callable {
private final AtomicBoolean blockThread;
private final boolean doTraceableWork;
public AkkaAsyncChild() {
this(true, false);
}
@Override
public Object getRawResult() {
return null;
}
@Override
protected void setRawResult(final Object value) {}
@Override
protected boolean exec() {
runImpl();
return true;
}
public AkkaAsyncChild(final boolean doTraceableWork, final boolean blockThread) {
this.doTraceableWork = doTraceableWork;
this.blockThread = new AtomicBoolean(blockThread);
}
public void unblock() {
blockThread.set(false);
}
@Override
public void run() {
runImpl();
}
@Override
public Object call() throws Exception {
runImpl();
return null;
}
private void runImpl() {
while (blockThread.get()) {
// busy-wait to block thread
}
if (doTraceableWork) {
asyncChild();
}
}
@Trace(operationName = "asyncChild")
private void asyncChild() {}
}

View File

@ -1,3 +1,9 @@
// Set properties before any plugins get loaded
ext {
// Execute tests on all JVMs, even rare and outdated ones
coreJavaInstrumentation = true
}
apply from: "${rootDir}/gradle/java.gradle"
apply from: "${rootDir}/gradle/test-with-scala.gradle"

View File

@ -20,6 +20,11 @@ compileSlickTestGroovy {
}
dependencies {
// This is needed for Scala ForJoinTask/Pool instrumentation
compileOnly deps.scala
// This is needed for Akka ForJoinTask/Pool instrumentation
compileOnly group: 'com.typesafe.akka', name: 'akka-actor_2.11', version: '2.5.0'
compile project(':dd-trace-api')
compile project(':dd-java-agent:agent-tooling')

View File

@ -1,3 +1,9 @@
// Set properties before any plugins get loaded
project.ext {
// Execute tests on all JVMs, even rare and outdated ones
coreJavaInstrumentation = true
}
apply from: "${rootDir}/gradle/java.gradle"
apply from: "${rootDir}/gradle/test-with-scala.gradle"

View File

@ -0,0 +1,149 @@
import datadog.opentracing.DDSpan
import datadog.opentracing.scopemanager.ContinuableScope
import datadog.trace.agent.test.AgentTestRunner
import datadog.trace.api.Trace
import io.opentracing.util.GlobalTracer
import scala.concurrent.forkjoin.ForkJoinPool
import scala.concurrent.forkjoin.ForkJoinTask
import spock.lang.Shared
import java.lang.reflect.InvocationTargetException
import java.lang.reflect.Method
import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.Callable
import java.util.concurrent.Executor
import java.util.concurrent.ExecutorService
import java.util.concurrent.Future
import java.util.concurrent.RejectedExecutionException
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit
/**
* Test executor instrumentation for Scala specific classes.
* This is to large extent a copy of ExecutorInstrumentationTest.
*/
class ScalaExecutorInstrumentationTest extends AgentTestRunner {
@Shared
Method executeRunnableMethod
@Shared
Method scalaExecuteForkJoinTaskMethod
@Shared
Method submitRunnableMethod
@Shared
Method submitCallableMethod
@Shared
Method scalaSubmitForkJoinTaskMethod
@Shared
Method scalaInvokeForkJoinTaskMethod
def setupSpec() {
executeRunnableMethod = Executor.getMethod("execute", Runnable)
scalaExecuteForkJoinTaskMethod = ForkJoinPool.getMethod("execute", ForkJoinTask)
submitRunnableMethod = ExecutorService.getMethod("submit", Runnable)
submitCallableMethod = ExecutorService.getMethod("submit", Callable)
scalaSubmitForkJoinTaskMethod = ForkJoinPool.getMethod("submit", ForkJoinTask)
scalaInvokeForkJoinTaskMethod = ForkJoinPool.getMethod("invoke", ForkJoinTask)
}
// more useful name breaks java9 javac
// def "#poolImpl.getClass().getSimpleName() #method.getName() propagates"()
def "#poolImpl #method propagates"() {
setup:
def pool = poolImpl
def m = method
new Runnable() {
@Override
@Trace(operationName = "parent")
void run() {
((ContinuableScope) GlobalTracer.get().scopeManager().active()).setAsyncPropagation(true)
// this child will have a span
m.invoke(pool, new ScalaAsyncChild())
// this child won't
m.invoke(pool, new ScalaAsyncChild(false, false))
}
}.run()
TEST_WRITER.waitForTraces(1)
List<DDSpan> trace = TEST_WRITER.get(0)
expect:
TEST_WRITER.size() == 1
trace.size() == 2
trace.get(0).operationName == "parent"
trace.get(1).operationName == "asyncChild"
trace.get(1).parentId == trace.get(0).spanId
cleanup:
pool?.shutdown()
// Unfortunately, there's no simple way to test the cross product of methods/pools.
where:
poolImpl | method
new ForkJoinPool() | executeRunnableMethod
new ForkJoinPool() | scalaExecuteForkJoinTaskMethod
new ForkJoinPool() | submitRunnableMethod
new ForkJoinPool() | submitCallableMethod
new ForkJoinPool() | scalaSubmitForkJoinTaskMethod
new ForkJoinPool() | scalaInvokeForkJoinTaskMethod
new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue<Runnable>(1)) | executeRunnableMethod
new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue<Runnable>(1)) | submitRunnableMethod
new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue<Runnable>(1)) | submitCallableMethod
}
// more useful name breaks java9 javac
// def "#poolImpl.getClass().getSimpleName() #method.getName() propagates"()
def "#poolImpl reports after canceled jobs"() {
setup:
def pool = poolImpl
def m = method
List<ScalaAsyncChild> children = new ArrayList<>()
List<Future> jobFutures = new ArrayList<>()
new Runnable() {
@Override
@Trace(operationName = "parent")
void run() {
((ContinuableScope) GlobalTracer.get().scopeManager().active()).setAsyncPropagation(true)
try {
for (int i = 0; i < 20; ++i) {
// Our current instrumentation instrumentation does not behave very well
// if we try to reuse Callable/Runnable. Namely we would be getting 'orphaned'
// child traces sometimes since state can contain only one continuation - and
// we do not really have a good way for attributing work to correct parent span
// if we reuse Callable/Runnable.
// Solution for now is to never reuse a Callable/Runnable.
final ScalaAsyncChild child = new ScalaAsyncChild(true, true)
children.add(child)
try {
Future f = m.invoke(pool, new ScalaAsyncChild())
jobFutures.add(f)
} catch (InvocationTargetException e) {
throw e.getCause()
}
}
} catch (RejectedExecutionException e) {
}
for (Future f : jobFutures) {
f.cancel(false)
}
for (ScalaAsyncChild child : children) {
child.unblock()
}
}
}.run()
TEST_WRITER.waitForTraces(1)
expect:
// FIXME: we should improve this test to make sure continuations are actually closed
TEST_WRITER.size() == 1
where:
poolImpl | method
new ForkJoinPool() | submitRunnableMethod
new ForkJoinPool() | submitCallableMethod
}
}

View File

@ -0,0 +1,59 @@
import datadog.trace.api.Trace;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;
import scala.concurrent.forkjoin.ForkJoinTask;
public class ScalaAsyncChild extends ForkJoinTask implements Runnable, Callable {
private final AtomicBoolean blockThread;
private final boolean doTraceableWork;
public ScalaAsyncChild() {
this(true, false);
}
@Override
public Object getRawResult() {
return null;
}
@Override
protected void setRawResult(final Object value) {}
@Override
protected boolean exec() {
runImpl();
return true;
}
public ScalaAsyncChild(final boolean doTraceableWork, final boolean blockThread) {
this.doTraceableWork = doTraceableWork;
this.blockThread = new AtomicBoolean(blockThread);
}
public void unblock() {
blockThread.set(false);
}
@Override
public void run() {
runImpl();
}
@Override
public Object call() throws Exception {
runImpl();
return null;
}
private void runImpl() {
while (blockThread.get()) {
// busy-wait to block thread
}
if (doTraceableWork) {
asyncChild();
}
}
@Trace(operationName = "asyncChild")
private void asyncChild() {}
}

View File

@ -0,0 +1,118 @@
package datadog.trace.instrumentation.java.concurrent;
import static datadog.trace.agent.tooling.ByteBuddyElementMatchers.safeHasSuperType;
import static net.bytebuddy.matcher.ElementMatchers.isInterface;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.not;
import datadog.trace.agent.tooling.Instrumenter;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.concurrent.Executor;
import lombok.extern.slf4j.Slf4j;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
@Slf4j
public abstract class AbstractExecutorInstrumentation extends Instrumenter.Default {
public static final String EXEC_NAME = "java_concurrent";
/**
* Only apply executor instrumentation to whitelisted executors. In the future, this restriction
* may be lifted to include all executors.
*/
private static final Collection<String> WHITELISTED_EXECUTORS;
/**
* Some frameworks have their executors defined as anon classes inside other classes. Referencing
* anon classes by name would be fragile, so instead we will use list of class prefix names. Since
* checking this list is more expensive (O(n)) we should try to keep it short.
*/
private static final Collection<String> WHITELISTED_EXECUTORS_PREFIXES;
static {
final String[] whitelist = {
"java.util.concurrent.AbstractExecutorService",
"java.util.concurrent.ThreadPoolExecutor",
"java.util.concurrent.ScheduledThreadPoolExecutor",
"java.util.concurrent.ForkJoinPool",
"java.util.concurrent.Executors$FinalizableDelegatedExecutorService",
"java.util.concurrent.Executors$DelegatedExecutorService",
"javax.management.NotificationBroadcasterSupport$1",
"kotlinx.coroutines.scheduling.CoroutineScheduler",
"scala.concurrent.Future$InternalCallbackExecutor$",
"scala.concurrent.impl.ExecutionContextImpl",
"scala.concurrent.impl.ExecutionContextImpl$$anon$1",
"scala.concurrent.forkjoin.ForkJoinPool",
"scala.concurrent.impl.ExecutionContextImpl$$anon$3",
"akka.dispatch.MessageDispatcher",
"akka.dispatch.Dispatcher",
"akka.dispatch.Dispatcher$LazyExecutorServiceDelegate",
"akka.actor.ActorSystemImpl$$anon$1",
"akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool",
"akka.dispatch.forkjoin.ForkJoinPool",
"akka.dispatch.BalancingDispatcher",
"akka.dispatch.ThreadPoolConfig$ThreadPoolExecutorServiceFactory$$anon$1",
"akka.dispatch.PinnedDispatcher",
"akka.dispatch.ExecutionContexts$sameThreadExecutionContext$",
"play.api.libs.streams.Execution$trampoline$",
"io.netty.channel.MultithreadEventLoopGroup",
"io.netty.util.concurrent.MultithreadEventExecutorGroup",
"io.netty.util.concurrent.AbstractEventExecutorGroup",
"io.netty.channel.epoll.EpollEventLoopGroup",
"io.netty.channel.nio.NioEventLoopGroup",
"io.netty.util.concurrent.GlobalEventExecutor",
"io.netty.util.concurrent.AbstractScheduledEventExecutor",
"io.netty.util.concurrent.AbstractEventExecutor",
"io.netty.util.concurrent.SingleThreadEventExecutor",
"io.netty.channel.nio.NioEventLoop",
"io.netty.channel.SingleThreadEventLoop",
};
WHITELISTED_EXECUTORS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(whitelist)));
final String[] whitelistPrefixes = {"slick.util.AsyncExecutor$"};
WHITELISTED_EXECUTORS_PREFIXES =
Collections.unmodifiableCollection(Arrays.asList(whitelistPrefixes));
}
public AbstractExecutorInstrumentation(final String... additionalNames) {
super(EXEC_NAME, additionalNames);
}
@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return not(isInterface())
.and(safeHasSuperType(named(Executor.class.getName())))
.and(
new ElementMatcher<TypeDescription>() {
@Override
public boolean matches(final TypeDescription target) {
boolean whitelisted = WHITELISTED_EXECUTORS.contains(target.getName());
// Check for possible prefixes match only if not whitelisted already
if (!whitelisted) {
for (final String name : WHITELISTED_EXECUTORS_PREFIXES) {
if (target.getName().startsWith(name)) {
whitelisted = true;
break;
}
}
}
if (!whitelisted) {
log.debug("Skipping executor instrumentation for {}", target.getName());
}
return whitelisted;
}
});
}
@Override
public String[] helperClassNames() {
return new String[] {
AbstractExecutorInstrumentation.class.getPackage().getName() + ".ExecutorInstrumentationUtils"
};
}
}

View File

@ -0,0 +1,80 @@
package datadog.trace.instrumentation.java.concurrent;
import static net.bytebuddy.matcher.ElementMatchers.nameMatches;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import akka.dispatch.forkjoin.ForkJoinTask;
import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.bootstrap.ContextStore;
import datadog.trace.bootstrap.InstrumentationContext;
import datadog.trace.bootstrap.instrumentation.java.concurrent.State;
import datadog.trace.context.TraceScope;
import io.opentracing.Scope;
import io.opentracing.util.GlobalTracer;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executor;
import lombok.extern.slf4j.Slf4j;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.matcher.ElementMatcher;
@Slf4j
@AutoService(Instrumenter.class)
public final class AkkaExecutorInstrumentation extends AbstractExecutorInstrumentation {
public AkkaExecutorInstrumentation() {
super(EXEC_NAME + ".akka_fork_join");
}
@Override
public Map<String, String> contextStore() {
return Collections.singletonMap(
AkkaForkJoinTaskInstrumentation.TASK_CLASS_NAME, State.class.getName());
}
@Override
public Map<? extends ElementMatcher<? super MethodDescription>, String> transformers() {
final Map<ElementMatcher<? super MethodDescription>, String> transformers = new HashMap<>();
transformers.put(
named("execute")
.and(takesArgument(0, named(AkkaForkJoinTaskInstrumentation.TASK_CLASS_NAME))),
SetAkkaForkJoinStateAdvice.class.getName());
transformers.put(
named("submit")
.and(takesArgument(0, named(AkkaForkJoinTaskInstrumentation.TASK_CLASS_NAME))),
SetAkkaForkJoinStateAdvice.class.getName());
transformers.put(
nameMatches("invoke")
.and(takesArgument(0, named(AkkaForkJoinTaskInstrumentation.TASK_CLASS_NAME))),
SetAkkaForkJoinStateAdvice.class.getName());
return transformers;
}
public static class SetAkkaForkJoinStateAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static State enterJobSubmit(
@Advice.This final Executor executor,
@Advice.Argument(value = 0, readOnly = false) final ForkJoinTask task) {
final Scope scope = GlobalTracer.get().scopeManager().active();
if (ExecutorInstrumentationUtils.shouldAttachStateToTask(task, executor)) {
final ContextStore<ForkJoinTask, State> contextStore =
InstrumentationContext.get(ForkJoinTask.class, State.class);
return ExecutorInstrumentationUtils.setupState(contextStore, task, (TraceScope) scope);
}
return null;
}
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void exitJobSubmit(
@Advice.This final Executor executor,
@Advice.Enter final State state,
@Advice.Thrown final Throwable throwable) {
ExecutorInstrumentationUtils.cleanUpOnMethodExit(executor, state, throwable);
}
}
}

View File

@ -0,0 +1,121 @@
package datadog.trace.instrumentation.java.concurrent;
import static datadog.trace.agent.tooling.ByteBuddyElementMatchers.safeHasSuperType;
import static net.bytebuddy.matcher.ElementMatchers.isAbstract;
import static net.bytebuddy.matcher.ElementMatchers.isInterface;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.not;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
import akka.dispatch.forkjoin.ForkJoinPool;
import akka.dispatch.forkjoin.ForkJoinTask;
import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.bootstrap.ContextStore;
import datadog.trace.bootstrap.InstrumentationContext;
import datadog.trace.bootstrap.instrumentation.java.concurrent.State;
import datadog.trace.context.TraceScope;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import lombok.extern.slf4j.Slf4j;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
/**
* Instrument {@link ForkJoinTask}.
*
* <p>Note: There are quite a few separate implementations of {@code ForkJoinTask}/{@code
* ForkJoinPool}: JVM, Akka, Scala, Netty to name a few. This class handles Akka version.
*/
@Slf4j
@AutoService(Instrumenter.class)
public final class AkkaForkJoinTaskInstrumentation extends Instrumenter.Default {
static final String TASK_CLASS_NAME = "akka.dispatch.forkjoin.ForkJoinTask";
public AkkaForkJoinTaskInstrumentation() {
super(AbstractExecutorInstrumentation.EXEC_NAME);
}
@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return not(isInterface()).and(safeHasSuperType(named(TASK_CLASS_NAME)));
}
@Override
public String[] helperClassNames() {
return new String[] {
AdviceUtils.class.getName(),
};
}
@Override
public Map<String, String> contextStore() {
final Map<String, String> map = new HashMap<>();
map.put(Runnable.class.getName(), State.class.getName());
map.put(Callable.class.getName(), State.class.getName());
map.put(TASK_CLASS_NAME, State.class.getName());
return Collections.unmodifiableMap(map);
}
@Override
public Map<? extends ElementMatcher<? super MethodDescription>, String> transformers() {
final Map<ElementMatcher<? super MethodDescription>, String> transformers = new HashMap<>();
transformers.put(
named("exec").and(takesArguments(0)).and(not(isAbstract())),
ForkJoinTaskAdvice.class.getName());
return transformers;
}
public static class ForkJoinTaskAdvice {
/**
* When {@link ForkJoinTask} object is submitted to {@link ForkJoinPool} as {@link Runnable} or
* {@link Callable} it will not get wrapped, instead it will be casted to {@code ForkJoinTask}
* directly. This means state is still stored in {@code Runnable} or {@code Callable} and we
* need to use that state.
*/
@Advice.OnMethodEnter(suppress = Throwable.class)
public static TraceScope enter(@Advice.This final ForkJoinTask thiz) {
final ContextStore<ForkJoinTask, State> contextStore =
InstrumentationContext.get(ForkJoinTask.class, State.class);
TraceScope scope = AdviceUtils.startTaskScope(contextStore, thiz);
if (thiz instanceof Runnable) {
final ContextStore<Runnable, State> runnableContextStore =
InstrumentationContext.get(Runnable.class, State.class);
final TraceScope newScope =
AdviceUtils.startTaskScope(runnableContextStore, (Runnable) thiz);
if (null != newScope) {
if (null != scope) {
newScope.close();
} else {
scope = newScope;
}
}
}
if (thiz instanceof Callable) {
final ContextStore<Callable, State> callableContextStore =
InstrumentationContext.get(Callable.class, State.class);
final TraceScope newScope =
AdviceUtils.startTaskScope(callableContextStore, (Callable) thiz);
if (null != newScope) {
if (null != scope) {
newScope.close();
} else {
scope = newScope;
}
}
}
return scope;
}
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void exit(@Advice.Enter final TraceScope scope) {
AdviceUtils.endTaskScope(scope);
}
}
}

View File

@ -0,0 +1,85 @@
package datadog.trace.instrumentation.java.concurrent;
import datadog.trace.bootstrap.ContextStore;
import datadog.trace.bootstrap.WeakMap;
import datadog.trace.bootstrap.instrumentation.java.concurrent.State;
import datadog.trace.context.TraceScope;
import io.opentracing.Scope;
import io.opentracing.util.GlobalTracer;
import java.util.concurrent.Executor;
import lombok.extern.slf4j.Slf4j;
/** Utils for concurrent instrumentations. */
@Slf4j
public class ExecutorInstrumentationUtils {
private static final WeakMap<Executor, Boolean> DISABLED_EXECUTORS =
WeakMap.Provider.newWeakMap();
/**
* Checks if given task should get state attached.
*
* @param task task object
* @param executor executor this task was scheduled on
* @return true iff given task object should be wrapped
*/
public static boolean shouldAttachStateToTask(final Object task, final Executor executor) {
final Scope scope = GlobalTracer.get().scopeManager().active();
return (scope instanceof TraceScope
&& ((TraceScope) scope).isAsyncPropagating()
&& task != null
&& !ExecutorInstrumentationUtils.isDisabled(executor));
}
/**
* Create task state given current scope.
*
* @param contextStore context storage
* @param task task instance
* @param scope current scope
* @param <T> task class type
* @return new state
*/
public static <T> State setupState(
final ContextStore<T, State> contextStore, final T task, final TraceScope scope) {
final State state = contextStore.putIfAbsent(task, State.FACTORY);
final TraceScope.Continuation continuation = scope.capture();
if (state.setContinuation(continuation)) {
log.debug("created continuation {} from scope {}, state: {}", continuation, scope, state);
} else {
continuation.close(false);
}
return state;
}
/**
* Clean up after job submission method has exited.
*
* @param executor the current executor
* @param state task instrumentation state
* @param throwable throwable that may have been thrown
*/
public static void cleanUpOnMethodExit(
final Executor executor, final State state, final Throwable throwable) {
if (null != state && null != throwable) {
/*
Note: this may potentially close somebody else's continuation if we didn't set it
up in setupState because it was already present before us. This should be safe but
may lead to non-attributed async work in some very rare cases.
Alternative is to not close continuation here if we did not set it up in setupState
but this may potentially lead to memory leaks if callers do not properly handle
exceptions.
*/
state.closeContinuation();
}
}
public static void disableExecutor(final Executor executor) {
log.debug("Disabling Executor tracing for instance {}", executor);
DISABLED_EXECUTORS.put(executor, true);
}
public static boolean isDisabled(final Executor executor) {
return DISABLED_EXECUTORS.containsKey(executor);
}
}

View File

@ -67,7 +67,7 @@ public final class FutureInstrumentation extends Instrumenter.Default {
}
public FutureInstrumentation() {
super(ExecutorInstrumentation.EXEC_NAME);
super(AbstractExecutorInstrumentation.EXEC_NAME);
}
@Override

View File

@ -1,17 +1,13 @@
package datadog.trace.instrumentation.java.concurrent;
import static datadog.trace.agent.tooling.ByteBuddyElementMatchers.safeHasSuperType;
import static net.bytebuddy.matcher.ElementMatchers.isInterface;
import static net.bytebuddy.matcher.ElementMatchers.nameMatches;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.not;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.bootstrap.ContextStore;
import datadog.trace.bootstrap.InstrumentationContext;
import datadog.trace.bootstrap.WeakMap;
import datadog.trace.bootstrap.instrumentation.java.concurrent.CallableWrapper;
import datadog.trace.bootstrap.instrumentation.java.concurrent.RunnableWrapper;
import datadog.trace.bootstrap.instrumentation.java.concurrent.State;
@ -19,11 +15,9 @@ import datadog.trace.context.TraceScope;
import io.opentracing.Scope;
import io.opentracing.util.GlobalTracer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
@ -32,109 +26,11 @@ import java.util.concurrent.Future;
import lombok.extern.slf4j.Slf4j;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
@Slf4j
@AutoService(Instrumenter.class)
public final class ExecutorInstrumentation extends Instrumenter.Default {
public static final String EXEC_NAME = "java_concurrent";
/**
* Only apply executor instrumentation to whitelisted executors. In the future, this restriction
* may be lifted to include all executors.
*/
private static final Collection<String> WHITELISTED_EXECUTORS;
/**
* Some frameworks have their executors defined as anon classes inside other classes. Referencing
* anon classes by name would be fragile, so instead we will use list of class prefix names. Since
* checking this list is more expensive (O(n)) we should try to keep it short.
*/
private static final Collection<String> WHITELISTED_EXECUTORS_PREFIXES;
static {
final String[] whitelist = {
"java.util.concurrent.AbstractExecutorService",
"java.util.concurrent.ThreadPoolExecutor",
"java.util.concurrent.ScheduledThreadPoolExecutor",
"java.util.concurrent.ForkJoinPool",
"java.util.concurrent.Executors$FinalizableDelegatedExecutorService",
"java.util.concurrent.Executors$DelegatedExecutorService",
"javax.management.NotificationBroadcasterSupport$1",
"kotlinx.coroutines.scheduling.CoroutineScheduler",
"scala.concurrent.Future$InternalCallbackExecutor$",
"scala.concurrent.impl.ExecutionContextImpl",
"scala.concurrent.impl.ExecutionContextImpl$$anon$1",
"scala.concurrent.forkjoin.ForkJoinPool",
"scala.concurrent.impl.ExecutionContextImpl$$anon$3",
"akka.dispatch.MessageDispatcher",
"akka.dispatch.Dispatcher",
"akka.dispatch.Dispatcher$LazyExecutorServiceDelegate",
"akka.actor.ActorSystemImpl$$anon$1",
"akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool",
"akka.dispatch.forkjoin.ForkJoinPool",
"akka.dispatch.BalancingDispatcher",
"akka.dispatch.ThreadPoolConfig$ThreadPoolExecutorServiceFactory$$anon$1",
"akka.dispatch.PinnedDispatcher",
"akka.dispatch.ExecutionContexts$sameThreadExecutionContext$",
"play.api.libs.streams.Execution$trampoline$",
"io.netty.channel.MultithreadEventLoopGroup",
"io.netty.util.concurrent.MultithreadEventExecutorGroup",
"io.netty.util.concurrent.AbstractEventExecutorGroup",
"io.netty.channel.epoll.EpollEventLoopGroup",
"io.netty.channel.nio.NioEventLoopGroup",
"io.netty.util.concurrent.GlobalEventExecutor",
"io.netty.util.concurrent.AbstractScheduledEventExecutor",
"io.netty.util.concurrent.AbstractEventExecutor",
"io.netty.util.concurrent.SingleThreadEventExecutor",
"io.netty.channel.nio.NioEventLoop",
"io.netty.channel.SingleThreadEventLoop",
};
WHITELISTED_EXECUTORS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(whitelist)));
final String[] whitelistPrefixes = {"slick.util.AsyncExecutor$"};
WHITELISTED_EXECUTORS_PREFIXES =
Collections.unmodifiableCollection(Arrays.asList(whitelistPrefixes));
}
public ExecutorInstrumentation() {
super(EXEC_NAME);
}
@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return not(isInterface())
.and(safeHasSuperType(named(Executor.class.getName())))
.and(
new ElementMatcher<TypeDescription>() {
@Override
public boolean matches(final TypeDescription target) {
boolean whitelisted = WHITELISTED_EXECUTORS.contains(target.getName());
// Check for possible prefixes match only if not whitelisted already
if (!whitelisted) {
for (final String name : WHITELISTED_EXECUTORS_PREFIXES) {
if (target.getName().startsWith(name)) {
whitelisted = true;
break;
}
}
}
if (!whitelisted) {
log.debug("Skipping executor instrumentation for {}", target.getName());
}
return whitelisted;
}
});
}
@Override
public String[] helperClassNames() {
return new String[] {
ExecutorInstrumentation.class.getName() + "$ConcurrentUtils",
};
}
public final class JavaExecutorInstrumentation extends AbstractExecutorInstrumentation {
@Override
public Map<String, String> contextStore() {
@ -154,7 +50,7 @@ public final class ExecutorInstrumentation extends Instrumenter.Default {
SetExecuteRunnableStateAdvice.class.getName());
transformers.put(
named("execute").and(takesArgument(0, ForkJoinTask.class)),
SetExecuteForkJoinStateAdvice.class.getName());
SetJavaForkJoinStateAdvice.class.getName());
transformers.put(
named("submit").and(takesArgument(0, Runnable.class)),
SetSubmitRunnableStateAdvice.class.getName());
@ -163,13 +59,13 @@ public final class ExecutorInstrumentation extends Instrumenter.Default {
SetCallableStateAdvice.class.getName());
transformers.put(
named("submit").and(takesArgument(0, ForkJoinTask.class)),
SetExecuteForkJoinStateAdvice.class.getName());
SetJavaForkJoinStateAdvice.class.getName());
transformers.put(
nameMatches("invoke(Any|All)$").and(takesArgument(0, Callable.class)),
SetCallableStateForCallableCollectionAdvice.class.getName());
transformers.put(
nameMatches("invoke").and(takesArgument(0, ForkJoinTask.class)),
SetExecuteForkJoinStateAdvice.class.getName());
SetJavaForkJoinStateAdvice.class.getName());
transformers.put( // kotlinx.coroutines.scheduling.CoroutineScheduler
named("dispatch")
.and(takesArgument(0, Runnable.class))
@ -185,11 +81,11 @@ public final class ExecutorInstrumentation extends Instrumenter.Default {
@Advice.This final Executor executor,
@Advice.Argument(value = 0, readOnly = false) Runnable task) {
final Scope scope = GlobalTracer.get().scopeManager().active();
if (ConcurrentUtils.shouldAttachStateToTask(task, executor)) {
if (ExecutorInstrumentationUtils.shouldAttachStateToTask(task, executor)) {
task = RunnableWrapper.wrapIfNeeded(task);
final ContextStore<Runnable, State> contextStore =
InstrumentationContext.get(Runnable.class, State.class);
return ConcurrentUtils.setupState(contextStore, task, (TraceScope) scope);
return ExecutorInstrumentationUtils.setupState(contextStore, task, (TraceScope) scope);
}
return null;
}
@ -199,21 +95,21 @@ public final class ExecutorInstrumentation extends Instrumenter.Default {
@Advice.This final Executor executor,
@Advice.Enter final State state,
@Advice.Thrown final Throwable throwable) {
ConcurrentUtils.cleanUpOnMethodExit(executor, state, throwable);
ExecutorInstrumentationUtils.cleanUpOnMethodExit(executor, state, throwable);
}
}
public static class SetExecuteForkJoinStateAdvice {
public static class SetJavaForkJoinStateAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static State enterJobSubmit(
@Advice.This final Executor executor,
@Advice.Argument(value = 0, readOnly = false) final ForkJoinTask task) {
final Scope scope = GlobalTracer.get().scopeManager().active();
if (ConcurrentUtils.shouldAttachStateToTask(task, executor)) {
if (ExecutorInstrumentationUtils.shouldAttachStateToTask(task, executor)) {
final ContextStore<ForkJoinTask, State> contextStore =
InstrumentationContext.get(ForkJoinTask.class, State.class);
return ConcurrentUtils.setupState(contextStore, task, (TraceScope) scope);
return ExecutorInstrumentationUtils.setupState(contextStore, task, (TraceScope) scope);
}
return null;
}
@ -223,7 +119,7 @@ public final class ExecutorInstrumentation extends Instrumenter.Default {
@Advice.This final Executor executor,
@Advice.Enter final State state,
@Advice.Thrown final Throwable throwable) {
ConcurrentUtils.cleanUpOnMethodExit(executor, state, throwable);
ExecutorInstrumentationUtils.cleanUpOnMethodExit(executor, state, throwable);
}
}
@ -234,11 +130,11 @@ public final class ExecutorInstrumentation extends Instrumenter.Default {
@Advice.This final Executor executor,
@Advice.Argument(value = 0, readOnly = false) Runnable task) {
final Scope scope = GlobalTracer.get().scopeManager().active();
if (ConcurrentUtils.shouldAttachStateToTask(task, executor)) {
if (ExecutorInstrumentationUtils.shouldAttachStateToTask(task, executor)) {
task = RunnableWrapper.wrapIfNeeded(task);
final ContextStore<Runnable, State> contextStore =
InstrumentationContext.get(Runnable.class, State.class);
return ConcurrentUtils.setupState(contextStore, task, (TraceScope) scope);
return ExecutorInstrumentationUtils.setupState(contextStore, task, (TraceScope) scope);
}
return null;
}
@ -254,7 +150,7 @@ public final class ExecutorInstrumentation extends Instrumenter.Default {
InstrumentationContext.get(Future.class, State.class);
contextStore.put(future, state);
}
ConcurrentUtils.cleanUpOnMethodExit(executor, state, throwable);
ExecutorInstrumentationUtils.cleanUpOnMethodExit(executor, state, throwable);
}
}
@ -265,11 +161,11 @@ public final class ExecutorInstrumentation extends Instrumenter.Default {
@Advice.This final Executor executor,
@Advice.Argument(value = 0, readOnly = false) Callable task) {
final Scope scope = GlobalTracer.get().scopeManager().active();
if (ConcurrentUtils.shouldAttachStateToTask(task, executor)) {
if (ExecutorInstrumentationUtils.shouldAttachStateToTask(task, executor)) {
task = CallableWrapper.wrapIfNeeded(task);
final ContextStore<Callable, State> contextStore =
InstrumentationContext.get(Callable.class, State.class);
return ConcurrentUtils.setupState(contextStore, task, (TraceScope) scope);
return ExecutorInstrumentationUtils.setupState(contextStore, task, (TraceScope) scope);
}
return null;
}
@ -285,7 +181,7 @@ public final class ExecutorInstrumentation extends Instrumenter.Default {
InstrumentationContext.get(Future.class, State.class);
contextStore.put(future, state);
}
ConcurrentUtils.cleanUpOnMethodExit(executor, state, throwable);
ExecutorInstrumentationUtils.cleanUpOnMethodExit(executor, state, throwable);
}
}
@ -306,7 +202,7 @@ public final class ExecutorInstrumentation extends Instrumenter.Default {
wrappedTasks.add(task);
final ContextStore<Callable, State> contextStore =
InstrumentationContext.get(Callable.class, State.class);
ConcurrentUtils.setupState(contextStore, task, (TraceScope) scope);
ExecutorInstrumentationUtils.setupState(contextStore, task, (TraceScope) scope);
}
}
tasks = wrappedTasks;
@ -351,79 +247,4 @@ public final class ExecutorInstrumentation extends Instrumenter.Default {
}
}
}
/** Utils for concurrent instrumentations. */
@Slf4j
public static class ConcurrentUtils {
private static final WeakMap<Executor, Boolean> DISABLED_EXECUTORS =
WeakMap.Provider.newWeakMap();
/**
* Checks if given task should get state attached.
*
* @param task task object
* @param executor executor this task was scheduled on
* @return true iff given task object should be wrapped
*/
public static boolean shouldAttachStateToTask(final Object task, final Executor executor) {
final Scope scope = GlobalTracer.get().scopeManager().active();
return (scope instanceof TraceScope
&& ((TraceScope) scope).isAsyncPropagating()
&& task != null
&& !ConcurrentUtils.isDisabled(executor));
}
/**
* Create task state given current scope.
*
* @param contextStore context storage
* @param task task instance
* @param scope current scope
* @param <T> task class type
* @return new state
*/
public static <T> State setupState(
final ContextStore<T, State> contextStore, final T task, final TraceScope scope) {
final State state = contextStore.putIfAbsent(task, State.FACTORY);
final TraceScope.Continuation continuation = scope.capture();
if (state.setContinuation(continuation)) {
log.debug("created continuation {} from scope {}, state: {}", continuation, scope, state);
} else {
continuation.close(false);
}
return state;
}
/**
* Clean up after job submission method has exited.
*
* @param executor the current executor
* @param state task instrumentation state
* @param throwable throwable that may have been thrown
*/
public static void cleanUpOnMethodExit(
final Executor executor, final State state, final Throwable throwable) {
if (null != state && null != throwable) {
/*
Note: this may potentially close somebody else's continuation if we didn't set it
up in setupState because it was already present before us. This should be safe but
may lead to non-attributed async work in some very rare cases.
Alternative is to not close continuation here if we did not set it up in setupState
but this may potentially lead to memory leaks if callers do not properly handle
exceptions.
*/
state.closeContinuation();
}
}
public static void disableExecutor(final Executor executor) {
log.debug("Disabling Executor tracing for instance {}", executor);
DISABLED_EXECUTORS.put(executor, true);
}
public static boolean isDisabled(final Executor executor) {
return DISABLED_EXECUTORS.containsKey(executor);
}
}
}

View File

@ -29,16 +29,14 @@ import net.bytebuddy.matcher.ElementMatcher;
* Instrument {@link ForkJoinTask}.
*
* <p>Note: There are quite a few separate implementations of {@code ForkJoinTask}/{@code
* ForkJoinPool}: JVM, akka, scala, netty to name a few. For now we only deal with JVM one because
* there are known cases when JVM {@code ForkJoinTask} is supplied as {@code Runnable} into {@code
* ForkJoinPool}.
* ForkJoinPool}: JVM, Akka, Scala, Netty to name a few. This class handles JVM version.
*/
@Slf4j
@AutoService(Instrumenter.class)
public final class ForkJoinTaskInstrumentation extends Instrumenter.Default {
public final class JavaForkJoinTaskInstrumentation extends Instrumenter.Default {
public ForkJoinTaskInstrumentation() {
super(ExecutorInstrumentation.EXEC_NAME);
public JavaForkJoinTaskInstrumentation() {
super(AbstractExecutorInstrumentation.EXEC_NAME);
}
@Override

View File

@ -29,7 +29,7 @@ import net.bytebuddy.matcher.ElementMatcher;
public final class RunnableCallableInstrumentation extends Instrumenter.Default {
public RunnableCallableInstrumentation() {
super(ExecutorInstrumentation.EXEC_NAME);
super(AbstractExecutorInstrumentation.EXEC_NAME);
}
@Override

View File

@ -0,0 +1,80 @@
package datadog.trace.instrumentation.java.concurrent;
import static net.bytebuddy.matcher.ElementMatchers.nameMatches;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.bootstrap.ContextStore;
import datadog.trace.bootstrap.InstrumentationContext;
import datadog.trace.bootstrap.instrumentation.java.concurrent.State;
import datadog.trace.context.TraceScope;
import io.opentracing.Scope;
import io.opentracing.util.GlobalTracer;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executor;
import lombok.extern.slf4j.Slf4j;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.matcher.ElementMatcher;
import scala.concurrent.forkjoin.ForkJoinTask;
@Slf4j
@AutoService(Instrumenter.class)
public final class ScalaExecutorInstrumentation extends AbstractExecutorInstrumentation {
public ScalaExecutorInstrumentation() {
super(EXEC_NAME + ".scala_fork_join");
}
@Override
public Map<String, String> contextStore() {
return Collections.singletonMap(
ScalaForkJoinTaskInstrumentation.TASK_CLASS_NAME, State.class.getName());
}
@Override
public Map<? extends ElementMatcher<? super MethodDescription>, String> transformers() {
final Map<ElementMatcher<? super MethodDescription>, String> transformers = new HashMap<>();
transformers.put(
named("execute")
.and(takesArgument(0, named(ScalaForkJoinTaskInstrumentation.TASK_CLASS_NAME))),
SetScalaForkJoinStateAdvice.class.getName());
transformers.put(
named("submit")
.and(takesArgument(0, named(ScalaForkJoinTaskInstrumentation.TASK_CLASS_NAME))),
SetScalaForkJoinStateAdvice.class.getName());
transformers.put(
nameMatches("invoke")
.and(takesArgument(0, named(ScalaForkJoinTaskInstrumentation.TASK_CLASS_NAME))),
SetScalaForkJoinStateAdvice.class.getName());
return transformers;
}
public static class SetScalaForkJoinStateAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static State enterJobSubmit(
@Advice.This final Executor executor,
@Advice.Argument(value = 0, readOnly = false) final ForkJoinTask task) {
final Scope scope = GlobalTracer.get().scopeManager().active();
if (ExecutorInstrumentationUtils.shouldAttachStateToTask(task, executor)) {
final ContextStore<ForkJoinTask, State> contextStore =
InstrumentationContext.get(ForkJoinTask.class, State.class);
return ExecutorInstrumentationUtils.setupState(contextStore, task, (TraceScope) scope);
}
return null;
}
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void exitJobSubmit(
@Advice.This final Executor executor,
@Advice.Enter final State state,
@Advice.Thrown final Throwable throwable) {
ExecutorInstrumentationUtils.cleanUpOnMethodExit(executor, state, throwable);
}
}
}

View File

@ -0,0 +1,121 @@
package datadog.trace.instrumentation.java.concurrent;
import static datadog.trace.agent.tooling.ByteBuddyElementMatchers.safeHasSuperType;
import static net.bytebuddy.matcher.ElementMatchers.isAbstract;
import static net.bytebuddy.matcher.ElementMatchers.isInterface;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.not;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.bootstrap.ContextStore;
import datadog.trace.bootstrap.InstrumentationContext;
import datadog.trace.bootstrap.instrumentation.java.concurrent.State;
import datadog.trace.context.TraceScope;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import lombok.extern.slf4j.Slf4j;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import scala.concurrent.forkjoin.ForkJoinPool;
import scala.concurrent.forkjoin.ForkJoinTask;
/**
* Instrument {@link ForkJoinTask}.
*
* <p>Note: There are quite a few separate implementations of {@code ForkJoinTask}/{@code
* ForkJoinPool}: JVM, Akka, Scala, Netty to name a few. This class handles Scala version.
*/
@Slf4j
@AutoService(Instrumenter.class)
public final class ScalaForkJoinTaskInstrumentation extends Instrumenter.Default {
static final String TASK_CLASS_NAME = "scala.concurrent.forkjoin.ForkJoinTask";
public ScalaForkJoinTaskInstrumentation() {
super(AbstractExecutorInstrumentation.EXEC_NAME);
}
@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return not(isInterface()).and(safeHasSuperType(named(TASK_CLASS_NAME)));
}
@Override
public String[] helperClassNames() {
return new String[] {
AdviceUtils.class.getName(),
};
}
@Override
public Map<String, String> contextStore() {
final Map<String, String> map = new HashMap<>();
map.put(Runnable.class.getName(), State.class.getName());
map.put(Callable.class.getName(), State.class.getName());
map.put(TASK_CLASS_NAME, State.class.getName());
return Collections.unmodifiableMap(map);
}
@Override
public Map<? extends ElementMatcher<? super MethodDescription>, String> transformers() {
final Map<ElementMatcher<? super MethodDescription>, String> transformers = new HashMap<>();
transformers.put(
named("exec").and(takesArguments(0)).and(not(isAbstract())),
ForkJoinTaskAdvice.class.getName());
return transformers;
}
public static class ForkJoinTaskAdvice {
/**
* When {@link ForkJoinTask} object is submitted to {@link ForkJoinPool} as {@link Runnable} or
* {@link Callable} it will not get wrapped, instead it will be casted to {@code ForkJoinTask}
* directly. This means state is still stored in {@code Runnable} or {@code Callable} and we
* need to use that state.
*/
@Advice.OnMethodEnter(suppress = Throwable.class)
public static TraceScope enter(@Advice.This final ForkJoinTask thiz) {
final ContextStore<ForkJoinTask, State> contextStore =
InstrumentationContext.get(ForkJoinTask.class, State.class);
TraceScope scope = AdviceUtils.startTaskScope(contextStore, thiz);
if (thiz instanceof Runnable) {
final ContextStore<Runnable, State> runnableContextStore =
InstrumentationContext.get(Runnable.class, State.class);
final TraceScope newScope =
AdviceUtils.startTaskScope(runnableContextStore, (Runnable) thiz);
if (null != newScope) {
if (null != scope) {
newScope.close();
} else {
scope = newScope;
}
}
}
if (thiz instanceof Callable) {
final ContextStore<Callable, State> callableContextStore =
InstrumentationContext.get(Callable.class, State.class);
final TraceScope newScope =
AdviceUtils.startTaskScope(callableContextStore, (Callable) thiz);
if (null != newScope) {
if (null != scope) {
newScope.close();
} else {
scope = newScope;
}
}
}
return scope;
}
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void exit(@Advice.Enter final TraceScope scope) {
AdviceUtils.endTaskScope(scope);
}
}
}

View File

@ -27,7 +27,7 @@ import net.bytebuddy.matcher.ElementMatcher;
public class ThreadPoolExecutorInstrumentation extends Instrumenter.Default {
public ThreadPoolExecutorInstrumentation() {
super(ExecutorInstrumentation.EXEC_NAME);
super(AbstractExecutorInstrumentation.EXEC_NAME);
}
@Override
@ -38,7 +38,8 @@ public class ThreadPoolExecutorInstrumentation extends Instrumenter.Default {
@Override
public String[] helperClassNames() {
return new String[] {
ExecutorInstrumentation.class.getName() + "$ConcurrentUtils",
ThreadPoolExecutorInstrumentation.class.getPackage().getName()
+ ".ExecutorInstrumentationUtils",
ThreadPoolExecutorInstrumentation.class.getName() + "$GenericRunnable",
};
}
@ -56,7 +57,7 @@ public class ThreadPoolExecutorInstrumentation extends Instrumenter.Default {
@Advice.OnMethodExit(suppress = Throwable.class)
public static void disableIfQueueWrongType(
@Advice.This final ThreadPoolExecutor executor,
@Advice.Argument(4) final BlockingQueue queue) {
@Advice.Argument(4) final BlockingQueue<Runnable> queue) {
if (queue.size() == 0) {
try {
@ -65,7 +66,7 @@ public class ThreadPoolExecutorInstrumentation extends Instrumenter.Default {
} catch (final ClassCastException | IllegalArgumentException e) {
// These errors indicate the queue is fundamentally incompatible with wrapped runnables.
// We must disable the executor instance to avoid passing wrapped runnables later.
ExecutorInstrumentation.ConcurrentUtils.disableExecutor(executor);
ExecutorInstrumentationUtils.disableExecutor(executor);
} catch (final Exception e) {
// Other errors might indicate the queue is not fully initialized.
// We might want to disable for those too, but for now just ignore.

View File

@ -54,9 +54,9 @@ class ExecutorInstrumentationTest extends AgentTestRunner {
void run() {
((ContinuableScope) GlobalTracer.get().scopeManager().active()).setAsyncPropagation(true)
// this child will have a span
m.invoke(pool, new AsyncChild())
m.invoke(pool, new JavaAsyncChild())
// this child won't
m.invoke(pool, new AsyncChild(false, false))
m.invoke(pool, new JavaAsyncChild(false, false))
}
}.run()
@ -82,9 +82,11 @@ class ExecutorInstrumentationTest extends AgentTestRunner {
new ForkJoinPool() | submitCallableMethod
new ForkJoinPool() | submitForkJoinTaskMethod
new ForkJoinPool() | invokeForkJoinTaskMethod
new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue<Runnable>(1)) | executeRunnableMethod
new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue<Runnable>(1)) | submitRunnableMethod
new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue<Runnable>(1)) | submitCallableMethod
}
// more useful name breaks java9 javac
@ -93,7 +95,7 @@ class ExecutorInstrumentationTest extends AgentTestRunner {
setup:
def pool = poolImpl
def m = method
List<AsyncChild> children = new ArrayList<>()
List<JavaAsyncChild> children = new ArrayList<>()
List<Future> jobFutures = new ArrayList<>()
new Runnable() {
@ -109,10 +111,10 @@ class ExecutorInstrumentationTest extends AgentTestRunner {
// we do not really have a good way for attributing work to correct parent span
// if we reuse Callable/Runnable.
// Solution for now is to never reuse a Callable/Runnable.
final AsyncChild child = new AsyncChild(true, true)
final JavaAsyncChild child = new JavaAsyncChild(true, true)
children.add(child)
try {
Future f = m.invoke(pool, new AsyncChild())
Future f = m.invoke(pool, new JavaAsyncChild())
jobFutures.add(f)
} catch (InvocationTargetException e) {
throw e.getCause()
@ -124,7 +126,7 @@ class ExecutorInstrumentationTest extends AgentTestRunner {
for (Future f : jobFutures) {
f.cancel(false)
}
for (AsyncChild child : children) {
for (JavaAsyncChild child : children) {
child.unblock()
}
}
@ -140,6 +142,7 @@ class ExecutorInstrumentationTest extends AgentTestRunner {
poolImpl | method
new ForkJoinPool() | submitRunnableMethod
new ForkJoinPool() | submitCallableMethod
new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue<Runnable>(1)) | submitRunnableMethod
new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue<Runnable>(1)) | submitCallableMethod
}

View File

@ -3,11 +3,11 @@ import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.atomic.AtomicBoolean;
public class AsyncChild extends ForkJoinTask implements Runnable, Callable {
public class JavaAsyncChild extends ForkJoinTask implements Runnable, Callable {
private final AtomicBoolean blockThread;
private final boolean doTraceableWork;
public AsyncChild() {
public JavaAsyncChild() {
this(true, false);
}
@ -25,7 +25,7 @@ public class AsyncChild extends ForkJoinTask implements Runnable, Callable {
return true;
}
public AsyncChild(final boolean doTraceableWork, final boolean blockThread) {
public JavaAsyncChild(final boolean doTraceableWork, final boolean blockThread) {
this.doTraceableWork = doTraceableWork;
this.blockThread = new AtomicBoolean(blockThread);
}

View File

@ -34,7 +34,6 @@ class AgentWriterTest extends Specification {
isValid() >> true
}]
def writer = new AgentWriter(client)
writer.start()
when:
for (def trace : traces) {
@ -43,6 +42,10 @@ class AgentWriterTest extends Specification {
incrementTraceCountBy.times {
writer.incrementTraceCount()
}
// Starting writer after submissions to make sure all updates go out in 1 request
writer.start()
Thread.sleep(FLUSH_DELAY)
then:
@ -98,7 +101,7 @@ class AgentWriterTest extends Specification {
Thread.sleep(FLUSH_DELAY)
then:
1 * client.sendTraces([traces[0]], 0) >> { throw new IOException("test exception")}
1 * client.sendTraces([traces[0]], 0) >> { throw new IOException("test exception") }
writer.getSampleRateByService() == SampleRateByService.EMPTY_INSTANCE
when:
@ -175,6 +178,6 @@ class AgentWriterTest extends Specification {
}
boolean isWriterThreadRunning() {
return Thread.getAllStackTraces().keySet().any{ t -> t.getName() == "dd-agent-writer" }
return Thread.getAllStackTraces().keySet().any { t -> t.getName() == "dd-agent-writer" }
}
}

View File

@ -48,6 +48,7 @@ include ':dd-java-agent:instrumentation:java-concurrent'
include ':dd-java-agent:instrumentation:java-concurrent:kotlin-testing'
include ':dd-java-agent:instrumentation:java-concurrent:scala-testing'
include ':dd-java-agent:instrumentation:java-concurrent:akka-testing'
include ':dd-java-agent:instrumentation:java-concurrent:akka-2.5-testing'
include ':dd-java-agent:instrumentation:jboss-classloading'
include ':dd-java-agent:instrumentation:jdbc'
include ':dd-java-agent:instrumentation:jedis-1.4'