Instrument Akka and Scala ForkJoinTask and ForkJoinPool
This commit is contained in:
parent
633fddc46b
commit
f7844f763c
|
@ -0,0 +1,20 @@
|
|||
// Set properties before any plugins get loaded
|
||||
ext {
|
||||
minJavaVersionForTests = JavaVersion.VERSION_1_8
|
||||
// Execute tests on all JVMs, even rare and outdated ones
|
||||
coreJavaInstrumentation = true
|
||||
}
|
||||
|
||||
apply from: "${rootDir}/gradle/java.gradle"
|
||||
apply from: "${rootDir}/gradle/test-with-scala.gradle"
|
||||
|
||||
dependencies {
|
||||
testCompile project(':dd-trace-api')
|
||||
testCompile project(':dd-trace-ot')
|
||||
testCompile deps.scala
|
||||
testCompile group: 'com.typesafe.akka', name: 'akka-actor_2.11', version: '2.5.0'
|
||||
|
||||
testCompile project(':dd-java-agent:testing')
|
||||
testCompile project(':dd-java-agent:instrumentation:java-concurrent')
|
||||
testCompile project(':dd-java-agent:instrumentation:trace-annotation')
|
||||
}
|
|
@ -0,0 +1,149 @@
|
|||
import akka.dispatch.forkjoin.ForkJoinPool
|
||||
import akka.dispatch.forkjoin.ForkJoinTask
|
||||
import datadog.opentracing.DDSpan
|
||||
import datadog.opentracing.scopemanager.ContinuableScope
|
||||
import datadog.trace.agent.test.AgentTestRunner
|
||||
import datadog.trace.api.Trace
|
||||
import io.opentracing.util.GlobalTracer
|
||||
import spock.lang.Shared
|
||||
|
||||
import java.lang.reflect.InvocationTargetException
|
||||
import java.lang.reflect.Method
|
||||
import java.util.concurrent.ArrayBlockingQueue
|
||||
import java.util.concurrent.Callable
|
||||
import java.util.concurrent.Executor
|
||||
import java.util.concurrent.ExecutorService
|
||||
import java.util.concurrent.Future
|
||||
import java.util.concurrent.RejectedExecutionException
|
||||
import java.util.concurrent.ThreadPoolExecutor
|
||||
import java.util.concurrent.TimeUnit
|
||||
|
||||
/**
|
||||
* Test executor instrumentation for Akka specific classes.
|
||||
* This is to large extent a copy of ExecutorInstrumentationTest.
|
||||
*/
|
||||
class AkkaExecutorInstrumentationTest extends AgentTestRunner {
|
||||
@Shared
|
||||
Method executeRunnableMethod
|
||||
@Shared
|
||||
Method akkaExecuteForkJoinTaskMethod
|
||||
@Shared
|
||||
Method submitRunnableMethod
|
||||
@Shared
|
||||
Method submitCallableMethod
|
||||
@Shared
|
||||
Method akkaSubmitForkJoinTaskMethod
|
||||
@Shared
|
||||
Method akkaInvokeForkJoinTaskMethod
|
||||
|
||||
def setupSpec() {
|
||||
executeRunnableMethod = Executor.getMethod("execute", Runnable)
|
||||
akkaExecuteForkJoinTaskMethod = ForkJoinPool.getMethod("execute", ForkJoinTask)
|
||||
submitRunnableMethod = ExecutorService.getMethod("submit", Runnable)
|
||||
submitCallableMethod = ExecutorService.getMethod("submit", Callable)
|
||||
akkaSubmitForkJoinTaskMethod = ForkJoinPool.getMethod("submit", ForkJoinTask)
|
||||
akkaInvokeForkJoinTaskMethod = ForkJoinPool.getMethod("invoke", ForkJoinTask)
|
||||
}
|
||||
|
||||
// more useful name breaks java9 javac
|
||||
// def "#poolImpl.getClass().getSimpleName() #method.getName() propagates"()
|
||||
def "#poolImpl #method propagates"() {
|
||||
setup:
|
||||
def pool = poolImpl
|
||||
def m = method
|
||||
|
||||
new Runnable() {
|
||||
@Override
|
||||
@Trace(operationName = "parent")
|
||||
void run() {
|
||||
((ContinuableScope) GlobalTracer.get().scopeManager().active()).setAsyncPropagation(true)
|
||||
// this child will have a span
|
||||
m.invoke(pool, new AkkaAsyncChild())
|
||||
// this child won't
|
||||
m.invoke(pool, new AkkaAsyncChild(false, false))
|
||||
}
|
||||
}.run()
|
||||
|
||||
TEST_WRITER.waitForTraces(1)
|
||||
List<DDSpan> trace = TEST_WRITER.get(0)
|
||||
|
||||
expect:
|
||||
TEST_WRITER.size() == 1
|
||||
trace.size() == 2
|
||||
trace.get(0).operationName == "parent"
|
||||
trace.get(1).operationName == "asyncChild"
|
||||
trace.get(1).parentId == trace.get(0).spanId
|
||||
|
||||
cleanup:
|
||||
pool?.shutdown()
|
||||
|
||||
// Unfortunately, there's no simple way to test the cross product of methods/pools.
|
||||
where:
|
||||
poolImpl | method
|
||||
new ForkJoinPool() | executeRunnableMethod
|
||||
new ForkJoinPool() | akkaExecuteForkJoinTaskMethod
|
||||
new ForkJoinPool() | submitRunnableMethod
|
||||
new ForkJoinPool() | submitCallableMethod
|
||||
new ForkJoinPool() | akkaSubmitForkJoinTaskMethod
|
||||
new ForkJoinPool() | akkaInvokeForkJoinTaskMethod
|
||||
|
||||
new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue<Runnable>(1)) | executeRunnableMethod
|
||||
new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue<Runnable>(1)) | submitRunnableMethod
|
||||
new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue<Runnable>(1)) | submitCallableMethod
|
||||
}
|
||||
|
||||
// more useful name breaks java9 javac
|
||||
// def "#poolImpl.getClass().getSimpleName() #method.getName() propagates"()
|
||||
def "#poolImpl reports after canceled jobs"() {
|
||||
setup:
|
||||
def pool = poolImpl
|
||||
def m = method
|
||||
List<AkkaAsyncChild> children = new ArrayList<>()
|
||||
List<Future> jobFutures = new ArrayList<>()
|
||||
|
||||
new Runnable() {
|
||||
@Override
|
||||
@Trace(operationName = "parent")
|
||||
void run() {
|
||||
((ContinuableScope) GlobalTracer.get().scopeManager().active()).setAsyncPropagation(true)
|
||||
try {
|
||||
for (int i = 0; i < 20; ++i) {
|
||||
// Our current instrumentation instrumentation does not behave very well
|
||||
// if we try to reuse Callable/Runnable. Namely we would be getting 'orphaned'
|
||||
// child traces sometimes since state can contain only one continuation - and
|
||||
// we do not really have a good way for attributing work to correct parent span
|
||||
// if we reuse Callable/Runnable.
|
||||
// Solution for now is to never reuse a Callable/Runnable.
|
||||
final AkkaAsyncChild child = new AkkaAsyncChild(true, true)
|
||||
children.add(child)
|
||||
try {
|
||||
Future f = m.invoke(pool, new AkkaAsyncChild())
|
||||
jobFutures.add(f)
|
||||
} catch (InvocationTargetException e) {
|
||||
throw e.getCause()
|
||||
}
|
||||
}
|
||||
} catch (RejectedExecutionException e) {
|
||||
}
|
||||
|
||||
for (Future f : jobFutures) {
|
||||
f.cancel(false)
|
||||
}
|
||||
for (AkkaAsyncChild child : children) {
|
||||
child.unblock()
|
||||
}
|
||||
}
|
||||
}.run()
|
||||
|
||||
TEST_WRITER.waitForTraces(1)
|
||||
|
||||
expect:
|
||||
// FIXME: we should improve this test to make sure continuations are actually closed
|
||||
TEST_WRITER.size() == 1
|
||||
|
||||
where:
|
||||
poolImpl | method
|
||||
new ForkJoinPool() | submitRunnableMethod
|
||||
new ForkJoinPool() | submitCallableMethod
|
||||
}
|
||||
}
|
|
@ -0,0 +1,59 @@
|
|||
import akka.dispatch.forkjoin.ForkJoinTask;
|
||||
import datadog.trace.api.Trace;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
public class AkkaAsyncChild extends ForkJoinTask implements Runnable, Callable {
|
||||
private final AtomicBoolean blockThread;
|
||||
private final boolean doTraceableWork;
|
||||
|
||||
public AkkaAsyncChild() {
|
||||
this(true, false);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object getRawResult() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void setRawResult(final Object value) {}
|
||||
|
||||
@Override
|
||||
protected boolean exec() {
|
||||
runImpl();
|
||||
return true;
|
||||
}
|
||||
|
||||
public AkkaAsyncChild(final boolean doTraceableWork, final boolean blockThread) {
|
||||
this.doTraceableWork = doTraceableWork;
|
||||
this.blockThread = new AtomicBoolean(blockThread);
|
||||
}
|
||||
|
||||
public void unblock() {
|
||||
blockThread.set(false);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
runImpl();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object call() throws Exception {
|
||||
runImpl();
|
||||
return null;
|
||||
}
|
||||
|
||||
private void runImpl() {
|
||||
while (blockThread.get()) {
|
||||
// busy-wait to block thread
|
||||
}
|
||||
if (doTraceableWork) {
|
||||
asyncChild();
|
||||
}
|
||||
}
|
||||
|
||||
@Trace(operationName = "asyncChild")
|
||||
private void asyncChild() {}
|
||||
}
|
|
@ -1,3 +1,9 @@
|
|||
// Set properties before any plugins get loaded
|
||||
ext {
|
||||
// Execute tests on all JVMs, even rare and outdated ones
|
||||
coreJavaInstrumentation = true
|
||||
}
|
||||
|
||||
apply from: "${rootDir}/gradle/java.gradle"
|
||||
apply from: "${rootDir}/gradle/test-with-scala.gradle"
|
||||
|
||||
|
|
|
@ -20,6 +20,11 @@ compileSlickTestGroovy {
|
|||
}
|
||||
|
||||
dependencies {
|
||||
// This is needed for Scala ForJoinTask/Pool instrumentation
|
||||
compileOnly deps.scala
|
||||
// This is needed for Akka ForJoinTask/Pool instrumentation
|
||||
compileOnly group: 'com.typesafe.akka', name: 'akka-actor_2.11', version: '2.5.0'
|
||||
|
||||
compile project(':dd-trace-api')
|
||||
compile project(':dd-java-agent:agent-tooling')
|
||||
|
||||
|
|
|
@ -1,3 +1,9 @@
|
|||
// Set properties before any plugins get loaded
|
||||
project.ext {
|
||||
// Execute tests on all JVMs, even rare and outdated ones
|
||||
coreJavaInstrumentation = true
|
||||
}
|
||||
|
||||
apply from: "${rootDir}/gradle/java.gradle"
|
||||
apply from: "${rootDir}/gradle/test-with-scala.gradle"
|
||||
|
||||
|
|
|
@ -0,0 +1,149 @@
|
|||
import datadog.opentracing.DDSpan
|
||||
import datadog.opentracing.scopemanager.ContinuableScope
|
||||
import datadog.trace.agent.test.AgentTestRunner
|
||||
import datadog.trace.api.Trace
|
||||
import io.opentracing.util.GlobalTracer
|
||||
import scala.concurrent.forkjoin.ForkJoinPool
|
||||
import scala.concurrent.forkjoin.ForkJoinTask
|
||||
import spock.lang.Shared
|
||||
|
||||
import java.lang.reflect.InvocationTargetException
|
||||
import java.lang.reflect.Method
|
||||
import java.util.concurrent.ArrayBlockingQueue
|
||||
import java.util.concurrent.Callable
|
||||
import java.util.concurrent.Executor
|
||||
import java.util.concurrent.ExecutorService
|
||||
import java.util.concurrent.Future
|
||||
import java.util.concurrent.RejectedExecutionException
|
||||
import java.util.concurrent.ThreadPoolExecutor
|
||||
import java.util.concurrent.TimeUnit
|
||||
|
||||
/**
|
||||
* Test executor instrumentation for Scala specific classes.
|
||||
* This is to large extent a copy of ExecutorInstrumentationTest.
|
||||
*/
|
||||
class ScalaExecutorInstrumentationTest extends AgentTestRunner {
|
||||
@Shared
|
||||
Method executeRunnableMethod
|
||||
@Shared
|
||||
Method scalaExecuteForkJoinTaskMethod
|
||||
@Shared
|
||||
Method submitRunnableMethod
|
||||
@Shared
|
||||
Method submitCallableMethod
|
||||
@Shared
|
||||
Method scalaSubmitForkJoinTaskMethod
|
||||
@Shared
|
||||
Method scalaInvokeForkJoinTaskMethod
|
||||
|
||||
def setupSpec() {
|
||||
executeRunnableMethod = Executor.getMethod("execute", Runnable)
|
||||
scalaExecuteForkJoinTaskMethod = ForkJoinPool.getMethod("execute", ForkJoinTask)
|
||||
submitRunnableMethod = ExecutorService.getMethod("submit", Runnable)
|
||||
submitCallableMethod = ExecutorService.getMethod("submit", Callable)
|
||||
scalaSubmitForkJoinTaskMethod = ForkJoinPool.getMethod("submit", ForkJoinTask)
|
||||
scalaInvokeForkJoinTaskMethod = ForkJoinPool.getMethod("invoke", ForkJoinTask)
|
||||
}
|
||||
|
||||
// more useful name breaks java9 javac
|
||||
// def "#poolImpl.getClass().getSimpleName() #method.getName() propagates"()
|
||||
def "#poolImpl #method propagates"() {
|
||||
setup:
|
||||
def pool = poolImpl
|
||||
def m = method
|
||||
|
||||
new Runnable() {
|
||||
@Override
|
||||
@Trace(operationName = "parent")
|
||||
void run() {
|
||||
((ContinuableScope) GlobalTracer.get().scopeManager().active()).setAsyncPropagation(true)
|
||||
// this child will have a span
|
||||
m.invoke(pool, new ScalaAsyncChild())
|
||||
// this child won't
|
||||
m.invoke(pool, new ScalaAsyncChild(false, false))
|
||||
}
|
||||
}.run()
|
||||
|
||||
TEST_WRITER.waitForTraces(1)
|
||||
List<DDSpan> trace = TEST_WRITER.get(0)
|
||||
|
||||
expect:
|
||||
TEST_WRITER.size() == 1
|
||||
trace.size() == 2
|
||||
trace.get(0).operationName == "parent"
|
||||
trace.get(1).operationName == "asyncChild"
|
||||
trace.get(1).parentId == trace.get(0).spanId
|
||||
|
||||
cleanup:
|
||||
pool?.shutdown()
|
||||
|
||||
// Unfortunately, there's no simple way to test the cross product of methods/pools.
|
||||
where:
|
||||
poolImpl | method
|
||||
new ForkJoinPool() | executeRunnableMethod
|
||||
new ForkJoinPool() | scalaExecuteForkJoinTaskMethod
|
||||
new ForkJoinPool() | submitRunnableMethod
|
||||
new ForkJoinPool() | submitCallableMethod
|
||||
new ForkJoinPool() | scalaSubmitForkJoinTaskMethod
|
||||
new ForkJoinPool() | scalaInvokeForkJoinTaskMethod
|
||||
|
||||
new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue<Runnable>(1)) | executeRunnableMethod
|
||||
new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue<Runnable>(1)) | submitRunnableMethod
|
||||
new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue<Runnable>(1)) | submitCallableMethod
|
||||
}
|
||||
|
||||
// more useful name breaks java9 javac
|
||||
// def "#poolImpl.getClass().getSimpleName() #method.getName() propagates"()
|
||||
def "#poolImpl reports after canceled jobs"() {
|
||||
setup:
|
||||
def pool = poolImpl
|
||||
def m = method
|
||||
List<ScalaAsyncChild> children = new ArrayList<>()
|
||||
List<Future> jobFutures = new ArrayList<>()
|
||||
|
||||
new Runnable() {
|
||||
@Override
|
||||
@Trace(operationName = "parent")
|
||||
void run() {
|
||||
((ContinuableScope) GlobalTracer.get().scopeManager().active()).setAsyncPropagation(true)
|
||||
try {
|
||||
for (int i = 0; i < 20; ++i) {
|
||||
// Our current instrumentation instrumentation does not behave very well
|
||||
// if we try to reuse Callable/Runnable. Namely we would be getting 'orphaned'
|
||||
// child traces sometimes since state can contain only one continuation - and
|
||||
// we do not really have a good way for attributing work to correct parent span
|
||||
// if we reuse Callable/Runnable.
|
||||
// Solution for now is to never reuse a Callable/Runnable.
|
||||
final ScalaAsyncChild child = new ScalaAsyncChild(true, true)
|
||||
children.add(child)
|
||||
try {
|
||||
Future f = m.invoke(pool, new ScalaAsyncChild())
|
||||
jobFutures.add(f)
|
||||
} catch (InvocationTargetException e) {
|
||||
throw e.getCause()
|
||||
}
|
||||
}
|
||||
} catch (RejectedExecutionException e) {
|
||||
}
|
||||
|
||||
for (Future f : jobFutures) {
|
||||
f.cancel(false)
|
||||
}
|
||||
for (ScalaAsyncChild child : children) {
|
||||
child.unblock()
|
||||
}
|
||||
}
|
||||
}.run()
|
||||
|
||||
TEST_WRITER.waitForTraces(1)
|
||||
|
||||
expect:
|
||||
// FIXME: we should improve this test to make sure continuations are actually closed
|
||||
TEST_WRITER.size() == 1
|
||||
|
||||
where:
|
||||
poolImpl | method
|
||||
new ForkJoinPool() | submitRunnableMethod
|
||||
new ForkJoinPool() | submitCallableMethod
|
||||
}
|
||||
}
|
|
@ -0,0 +1,59 @@
|
|||
import datadog.trace.api.Trace;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import scala.concurrent.forkjoin.ForkJoinTask;
|
||||
|
||||
public class ScalaAsyncChild extends ForkJoinTask implements Runnable, Callable {
|
||||
private final AtomicBoolean blockThread;
|
||||
private final boolean doTraceableWork;
|
||||
|
||||
public ScalaAsyncChild() {
|
||||
this(true, false);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object getRawResult() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void setRawResult(final Object value) {}
|
||||
|
||||
@Override
|
||||
protected boolean exec() {
|
||||
runImpl();
|
||||
return true;
|
||||
}
|
||||
|
||||
public ScalaAsyncChild(final boolean doTraceableWork, final boolean blockThread) {
|
||||
this.doTraceableWork = doTraceableWork;
|
||||
this.blockThread = new AtomicBoolean(blockThread);
|
||||
}
|
||||
|
||||
public void unblock() {
|
||||
blockThread.set(false);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
runImpl();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object call() throws Exception {
|
||||
runImpl();
|
||||
return null;
|
||||
}
|
||||
|
||||
private void runImpl() {
|
||||
while (blockThread.get()) {
|
||||
// busy-wait to block thread
|
||||
}
|
||||
if (doTraceableWork) {
|
||||
asyncChild();
|
||||
}
|
||||
}
|
||||
|
||||
@Trace(operationName = "asyncChild")
|
||||
private void asyncChild() {}
|
||||
}
|
|
@ -0,0 +1,118 @@
|
|||
package datadog.trace.instrumentation.java.concurrent;
|
||||
|
||||
import static datadog.trace.agent.tooling.ByteBuddyElementMatchers.safeHasSuperType;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.isInterface;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.named;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.not;
|
||||
|
||||
import datadog.trace.agent.tooling.Instrumenter;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.concurrent.Executor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import net.bytebuddy.description.type.TypeDescription;
|
||||
import net.bytebuddy.matcher.ElementMatcher;
|
||||
|
||||
@Slf4j
|
||||
public abstract class AbstractExecutorInstrumentation extends Instrumenter.Default {
|
||||
|
||||
public static final String EXEC_NAME = "java_concurrent";
|
||||
|
||||
/**
|
||||
* Only apply executor instrumentation to whitelisted executors. In the future, this restriction
|
||||
* may be lifted to include all executors.
|
||||
*/
|
||||
private static final Collection<String> WHITELISTED_EXECUTORS;
|
||||
/**
|
||||
* Some frameworks have their executors defined as anon classes inside other classes. Referencing
|
||||
* anon classes by name would be fragile, so instead we will use list of class prefix names. Since
|
||||
* checking this list is more expensive (O(n)) we should try to keep it short.
|
||||
*/
|
||||
private static final Collection<String> WHITELISTED_EXECUTORS_PREFIXES;
|
||||
|
||||
static {
|
||||
final String[] whitelist = {
|
||||
"java.util.concurrent.AbstractExecutorService",
|
||||
"java.util.concurrent.ThreadPoolExecutor",
|
||||
"java.util.concurrent.ScheduledThreadPoolExecutor",
|
||||
"java.util.concurrent.ForkJoinPool",
|
||||
"java.util.concurrent.Executors$FinalizableDelegatedExecutorService",
|
||||
"java.util.concurrent.Executors$DelegatedExecutorService",
|
||||
"javax.management.NotificationBroadcasterSupport$1",
|
||||
"kotlinx.coroutines.scheduling.CoroutineScheduler",
|
||||
"scala.concurrent.Future$InternalCallbackExecutor$",
|
||||
"scala.concurrent.impl.ExecutionContextImpl",
|
||||
"scala.concurrent.impl.ExecutionContextImpl$$anon$1",
|
||||
"scala.concurrent.forkjoin.ForkJoinPool",
|
||||
"scala.concurrent.impl.ExecutionContextImpl$$anon$3",
|
||||
"akka.dispatch.MessageDispatcher",
|
||||
"akka.dispatch.Dispatcher",
|
||||
"akka.dispatch.Dispatcher$LazyExecutorServiceDelegate",
|
||||
"akka.actor.ActorSystemImpl$$anon$1",
|
||||
"akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool",
|
||||
"akka.dispatch.forkjoin.ForkJoinPool",
|
||||
"akka.dispatch.BalancingDispatcher",
|
||||
"akka.dispatch.ThreadPoolConfig$ThreadPoolExecutorServiceFactory$$anon$1",
|
||||
"akka.dispatch.PinnedDispatcher",
|
||||
"akka.dispatch.ExecutionContexts$sameThreadExecutionContext$",
|
||||
"play.api.libs.streams.Execution$trampoline$",
|
||||
"io.netty.channel.MultithreadEventLoopGroup",
|
||||
"io.netty.util.concurrent.MultithreadEventExecutorGroup",
|
||||
"io.netty.util.concurrent.AbstractEventExecutorGroup",
|
||||
"io.netty.channel.epoll.EpollEventLoopGroup",
|
||||
"io.netty.channel.nio.NioEventLoopGroup",
|
||||
"io.netty.util.concurrent.GlobalEventExecutor",
|
||||
"io.netty.util.concurrent.AbstractScheduledEventExecutor",
|
||||
"io.netty.util.concurrent.AbstractEventExecutor",
|
||||
"io.netty.util.concurrent.SingleThreadEventExecutor",
|
||||
"io.netty.channel.nio.NioEventLoop",
|
||||
"io.netty.channel.SingleThreadEventLoop",
|
||||
};
|
||||
WHITELISTED_EXECUTORS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(whitelist)));
|
||||
|
||||
final String[] whitelistPrefixes = {"slick.util.AsyncExecutor$"};
|
||||
WHITELISTED_EXECUTORS_PREFIXES =
|
||||
Collections.unmodifiableCollection(Arrays.asList(whitelistPrefixes));
|
||||
}
|
||||
|
||||
public AbstractExecutorInstrumentation() {
|
||||
super(EXEC_NAME);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ElementMatcher<TypeDescription> typeMatcher() {
|
||||
return not(isInterface())
|
||||
.and(safeHasSuperType(named(Executor.class.getName())))
|
||||
.and(
|
||||
new ElementMatcher<TypeDescription>() {
|
||||
@Override
|
||||
public boolean matches(final TypeDescription target) {
|
||||
boolean whitelisted = WHITELISTED_EXECUTORS.contains(target.getName());
|
||||
|
||||
// Check for possible prefixes match only if not whitelisted already
|
||||
if (!whitelisted) {
|
||||
for (final String name : WHITELISTED_EXECUTORS_PREFIXES) {
|
||||
if (target.getName().startsWith(name)) {
|
||||
whitelisted = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (!whitelisted) {
|
||||
log.debug("Skipping executor instrumentation for {}", target.getName());
|
||||
}
|
||||
return whitelisted;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public String[] helperClassNames() {
|
||||
return new String[] {
|
||||
AbstractExecutorInstrumentation.class.getPackage().getName() + ".ExecutorInstrumentationUtils"
|
||||
};
|
||||
}
|
||||
}
|
|
@ -0,0 +1,76 @@
|
|||
package datadog.trace.instrumentation.java.concurrent;
|
||||
|
||||
import static net.bytebuddy.matcher.ElementMatchers.nameMatches;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.named;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
|
||||
|
||||
import akka.dispatch.forkjoin.ForkJoinTask;
|
||||
import com.google.auto.service.AutoService;
|
||||
import datadog.trace.agent.tooling.Instrumenter;
|
||||
import datadog.trace.bootstrap.ContextStore;
|
||||
import datadog.trace.bootstrap.InstrumentationContext;
|
||||
import datadog.trace.bootstrap.instrumentation.java.concurrent.State;
|
||||
import datadog.trace.context.TraceScope;
|
||||
import io.opentracing.Scope;
|
||||
import io.opentracing.util.GlobalTracer;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.Executor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import net.bytebuddy.asm.Advice;
|
||||
import net.bytebuddy.description.method.MethodDescription;
|
||||
import net.bytebuddy.matcher.ElementMatcher;
|
||||
|
||||
@Slf4j
|
||||
@AutoService(Instrumenter.class)
|
||||
public final class AkkaExecutorInstrumentation extends AbstractExecutorInstrumentation {
|
||||
|
||||
@Override
|
||||
public Map<String, String> contextStore() {
|
||||
return Collections.singletonMap(
|
||||
AkkaForkJoinTaskInstrumentation.TASK_CLASS_NAME, State.class.getName());
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<? extends ElementMatcher<? super MethodDescription>, String> transformers() {
|
||||
final Map<ElementMatcher<? super MethodDescription>, String> transformers = new HashMap<>();
|
||||
transformers.put(
|
||||
named("execute")
|
||||
.and(takesArgument(0, named(AkkaForkJoinTaskInstrumentation.TASK_CLASS_NAME))),
|
||||
SetAkkaForkJoinStateAdvice.class.getName());
|
||||
transformers.put(
|
||||
named("submit")
|
||||
.and(takesArgument(0, named(AkkaForkJoinTaskInstrumentation.TASK_CLASS_NAME))),
|
||||
SetAkkaForkJoinStateAdvice.class.getName());
|
||||
transformers.put(
|
||||
nameMatches("invoke")
|
||||
.and(takesArgument(0, named(AkkaForkJoinTaskInstrumentation.TASK_CLASS_NAME))),
|
||||
SetAkkaForkJoinStateAdvice.class.getName());
|
||||
return transformers;
|
||||
}
|
||||
|
||||
public static class SetAkkaForkJoinStateAdvice {
|
||||
|
||||
@Advice.OnMethodEnter(suppress = Throwable.class)
|
||||
public static State enterJobSubmit(
|
||||
@Advice.This final Executor executor,
|
||||
@Advice.Argument(value = 0, readOnly = false) final ForkJoinTask task) {
|
||||
final Scope scope = GlobalTracer.get().scopeManager().active();
|
||||
if (ExecutorInstrumentationUtils.shouldAttachStateToTask(task, executor)) {
|
||||
final ContextStore<ForkJoinTask, State> contextStore =
|
||||
InstrumentationContext.get(ForkJoinTask.class, State.class);
|
||||
return ExecutorInstrumentationUtils.setupState(contextStore, task, (TraceScope) scope);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
|
||||
public static void exitJobSubmit(
|
||||
@Advice.This final Executor executor,
|
||||
@Advice.Enter final State state,
|
||||
@Advice.Thrown final Throwable throwable) {
|
||||
ExecutorInstrumentationUtils.cleanUpOnMethodExit(executor, state, throwable);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,121 @@
|
|||
package datadog.trace.instrumentation.java.concurrent;
|
||||
|
||||
import static datadog.trace.agent.tooling.ByteBuddyElementMatchers.safeHasSuperType;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.isAbstract;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.isInterface;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.named;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.not;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
|
||||
|
||||
import akka.dispatch.forkjoin.ForkJoinPool;
|
||||
import akka.dispatch.forkjoin.ForkJoinTask;
|
||||
import com.google.auto.service.AutoService;
|
||||
import datadog.trace.agent.tooling.Instrumenter;
|
||||
import datadog.trace.bootstrap.ContextStore;
|
||||
import datadog.trace.bootstrap.InstrumentationContext;
|
||||
import datadog.trace.bootstrap.instrumentation.java.concurrent.State;
|
||||
import datadog.trace.context.TraceScope;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.Callable;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import net.bytebuddy.asm.Advice;
|
||||
import net.bytebuddy.description.method.MethodDescription;
|
||||
import net.bytebuddy.description.type.TypeDescription;
|
||||
import net.bytebuddy.matcher.ElementMatcher;
|
||||
|
||||
/**
|
||||
* Instrument {@link ForkJoinTask}.
|
||||
*
|
||||
* <p>Note: There are quite a few separate implementations of {@code ForkJoinTask}/{@code
|
||||
* ForkJoinPool}: JVM, Akka, Scala, Netty to name a few. This class handles Akka version.
|
||||
*/
|
||||
@Slf4j
|
||||
@AutoService(Instrumenter.class)
|
||||
public final class AkkaForkJoinTaskInstrumentation extends Instrumenter.Default {
|
||||
|
||||
static final String TASK_CLASS_NAME = "akka.dispatch.forkjoin.ForkJoinTask";
|
||||
|
||||
public AkkaForkJoinTaskInstrumentation() {
|
||||
super(AbstractExecutorInstrumentation.EXEC_NAME);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ElementMatcher<TypeDescription> typeMatcher() {
|
||||
return not(isInterface()).and(safeHasSuperType(named(TASK_CLASS_NAME)));
|
||||
}
|
||||
|
||||
@Override
|
||||
public String[] helperClassNames() {
|
||||
return new String[] {
|
||||
AdviceUtils.class.getName(),
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, String> contextStore() {
|
||||
final Map<String, String> map = new HashMap<>();
|
||||
map.put(Runnable.class.getName(), State.class.getName());
|
||||
map.put(Callable.class.getName(), State.class.getName());
|
||||
map.put(TASK_CLASS_NAME, State.class.getName());
|
||||
return Collections.unmodifiableMap(map);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<? extends ElementMatcher<? super MethodDescription>, String> transformers() {
|
||||
final Map<ElementMatcher<? super MethodDescription>, String> transformers = new HashMap<>();
|
||||
transformers.put(
|
||||
named("exec").and(takesArguments(0)).and(not(isAbstract())),
|
||||
ForkJoinTaskAdvice.class.getName());
|
||||
return transformers;
|
||||
}
|
||||
|
||||
public static class ForkJoinTaskAdvice {
|
||||
|
||||
/**
|
||||
* When {@link ForkJoinTask} object is submitted to {@link ForkJoinPool} as {@link Runnable} or
|
||||
* {@link Callable} it will not get wrapped, instead it will be casted to {@code ForkJoinTask}
|
||||
* directly. This means state is still stored in {@code Runnable} or {@code Callable} and we
|
||||
* need to use that state.
|
||||
*/
|
||||
@Advice.OnMethodEnter(suppress = Throwable.class)
|
||||
public static TraceScope enter(@Advice.This final ForkJoinTask thiz) {
|
||||
final ContextStore<ForkJoinTask, State> contextStore =
|
||||
InstrumentationContext.get(ForkJoinTask.class, State.class);
|
||||
TraceScope scope = AdviceUtils.startTaskScope(contextStore, thiz);
|
||||
if (thiz instanceof Runnable) {
|
||||
final ContextStore<Runnable, State> runnableContextStore =
|
||||
InstrumentationContext.get(Runnable.class, State.class);
|
||||
final TraceScope newScope =
|
||||
AdviceUtils.startTaskScope(runnableContextStore, (Runnable) thiz);
|
||||
if (null != newScope) {
|
||||
if (null != scope) {
|
||||
newScope.close();
|
||||
} else {
|
||||
scope = newScope;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (thiz instanceof Callable) {
|
||||
final ContextStore<Callable, State> callableContextStore =
|
||||
InstrumentationContext.get(Callable.class, State.class);
|
||||
final TraceScope newScope =
|
||||
AdviceUtils.startTaskScope(callableContextStore, (Callable) thiz);
|
||||
if (null != newScope) {
|
||||
if (null != scope) {
|
||||
newScope.close();
|
||||
} else {
|
||||
scope = newScope;
|
||||
}
|
||||
}
|
||||
}
|
||||
return scope;
|
||||
}
|
||||
|
||||
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
|
||||
public static void exit(@Advice.Enter final TraceScope scope) {
|
||||
AdviceUtils.endTaskScope(scope);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,85 @@
|
|||
package datadog.trace.instrumentation.java.concurrent;
|
||||
|
||||
import datadog.trace.bootstrap.ContextStore;
|
||||
import datadog.trace.bootstrap.WeakMap;
|
||||
import datadog.trace.bootstrap.instrumentation.java.concurrent.State;
|
||||
import datadog.trace.context.TraceScope;
|
||||
import io.opentracing.Scope;
|
||||
import io.opentracing.util.GlobalTracer;
|
||||
import java.util.concurrent.Executor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
/** Utils for concurrent instrumentations. */
|
||||
@Slf4j
|
||||
public class ExecutorInstrumentationUtils {
|
||||
|
||||
private static final WeakMap<Executor, Boolean> DISABLED_EXECUTORS =
|
||||
WeakMap.Provider.newWeakMap();
|
||||
|
||||
/**
|
||||
* Checks if given task should get state attached.
|
||||
*
|
||||
* @param task task object
|
||||
* @param executor executor this task was scheduled on
|
||||
* @return true iff given task object should be wrapped
|
||||
*/
|
||||
public static boolean shouldAttachStateToTask(final Object task, final Executor executor) {
|
||||
final Scope scope = GlobalTracer.get().scopeManager().active();
|
||||
return (scope instanceof TraceScope
|
||||
&& ((TraceScope) scope).isAsyncPropagating()
|
||||
&& task != null
|
||||
&& !ExecutorInstrumentationUtils.isDisabled(executor));
|
||||
}
|
||||
|
||||
/**
|
||||
* Create task state given current scope.
|
||||
*
|
||||
* @param contextStore context storage
|
||||
* @param task task instance
|
||||
* @param scope current scope
|
||||
* @param <T> task class type
|
||||
* @return new state
|
||||
*/
|
||||
public static <T> State setupState(
|
||||
final ContextStore<T, State> contextStore, final T task, final TraceScope scope) {
|
||||
final State state = contextStore.putIfAbsent(task, State.FACTORY);
|
||||
final TraceScope.Continuation continuation = scope.capture();
|
||||
if (state.setContinuation(continuation)) {
|
||||
log.debug("created continuation {} from scope {}, state: {}", continuation, scope, state);
|
||||
} else {
|
||||
continuation.close(false);
|
||||
}
|
||||
return state;
|
||||
}
|
||||
|
||||
/**
|
||||
* Clean up after job submission method has exited.
|
||||
*
|
||||
* @param executor the current executor
|
||||
* @param state task instrumentation state
|
||||
* @param throwable throwable that may have been thrown
|
||||
*/
|
||||
public static void cleanUpOnMethodExit(
|
||||
final Executor executor, final State state, final Throwable throwable) {
|
||||
if (null != state && null != throwable) {
|
||||
/*
|
||||
Note: this may potentially close somebody else's continuation if we didn't set it
|
||||
up in setupState because it was already present before us. This should be safe but
|
||||
may lead to non-attributed async work in some very rare cases.
|
||||
Alternative is to not close continuation here if we did not set it up in setupState
|
||||
but this may potentially lead to memory leaks if callers do not properly handle
|
||||
exceptions.
|
||||
*/
|
||||
state.closeContinuation();
|
||||
}
|
||||
}
|
||||
|
||||
public static void disableExecutor(final Executor executor) {
|
||||
log.debug("Disabling Executor tracing for instance {}", executor);
|
||||
DISABLED_EXECUTORS.put(executor, true);
|
||||
}
|
||||
|
||||
public static boolean isDisabled(final Executor executor) {
|
||||
return DISABLED_EXECUTORS.containsKey(executor);
|
||||
}
|
||||
}
|
|
@ -67,7 +67,7 @@ public final class FutureInstrumentation extends Instrumenter.Default {
|
|||
}
|
||||
|
||||
public FutureInstrumentation() {
|
||||
super(ExecutorInstrumentation.EXEC_NAME);
|
||||
super(AbstractExecutorInstrumentation.EXEC_NAME);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -1,17 +1,13 @@
|
|||
package datadog.trace.instrumentation.java.concurrent;
|
||||
|
||||
import static datadog.trace.agent.tooling.ByteBuddyElementMatchers.safeHasSuperType;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.isInterface;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.nameMatches;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.named;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.not;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
|
||||
|
||||
import com.google.auto.service.AutoService;
|
||||
import datadog.trace.agent.tooling.Instrumenter;
|
||||
import datadog.trace.bootstrap.ContextStore;
|
||||
import datadog.trace.bootstrap.InstrumentationContext;
|
||||
import datadog.trace.bootstrap.WeakMap;
|
||||
import datadog.trace.bootstrap.instrumentation.java.concurrent.CallableWrapper;
|
||||
import datadog.trace.bootstrap.instrumentation.java.concurrent.RunnableWrapper;
|
||||
import datadog.trace.bootstrap.instrumentation.java.concurrent.State;
|
||||
|
@ -19,11 +15,9 @@ import datadog.trace.context.TraceScope;
|
|||
import io.opentracing.Scope;
|
||||
import io.opentracing.util.GlobalTracer;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.Executor;
|
||||
|
@ -32,109 +26,11 @@ import java.util.concurrent.Future;
|
|||
import lombok.extern.slf4j.Slf4j;
|
||||
import net.bytebuddy.asm.Advice;
|
||||
import net.bytebuddy.description.method.MethodDescription;
|
||||
import net.bytebuddy.description.type.TypeDescription;
|
||||
import net.bytebuddy.matcher.ElementMatcher;
|
||||
|
||||
@Slf4j
|
||||
@AutoService(Instrumenter.class)
|
||||
public final class ExecutorInstrumentation extends Instrumenter.Default {
|
||||
public static final String EXEC_NAME = "java_concurrent";
|
||||
|
||||
/**
|
||||
* Only apply executor instrumentation to whitelisted executors. In the future, this restriction
|
||||
* may be lifted to include all executors.
|
||||
*/
|
||||
private static final Collection<String> WHITELISTED_EXECUTORS;
|
||||
/**
|
||||
* Some frameworks have their executors defined as anon classes inside other classes. Referencing
|
||||
* anon classes by name would be fragile, so instead we will use list of class prefix names. Since
|
||||
* checking this list is more expensive (O(n)) we should try to keep it short.
|
||||
*/
|
||||
private static final Collection<String> WHITELISTED_EXECUTORS_PREFIXES;
|
||||
|
||||
static {
|
||||
final String[] whitelist = {
|
||||
"java.util.concurrent.AbstractExecutorService",
|
||||
"java.util.concurrent.ThreadPoolExecutor",
|
||||
"java.util.concurrent.ScheduledThreadPoolExecutor",
|
||||
"java.util.concurrent.ForkJoinPool",
|
||||
"java.util.concurrent.Executors$FinalizableDelegatedExecutorService",
|
||||
"java.util.concurrent.Executors$DelegatedExecutorService",
|
||||
"javax.management.NotificationBroadcasterSupport$1",
|
||||
"kotlinx.coroutines.scheduling.CoroutineScheduler",
|
||||
"scala.concurrent.Future$InternalCallbackExecutor$",
|
||||
"scala.concurrent.impl.ExecutionContextImpl",
|
||||
"scala.concurrent.impl.ExecutionContextImpl$$anon$1",
|
||||
"scala.concurrent.forkjoin.ForkJoinPool",
|
||||
"scala.concurrent.impl.ExecutionContextImpl$$anon$3",
|
||||
"akka.dispatch.MessageDispatcher",
|
||||
"akka.dispatch.Dispatcher",
|
||||
"akka.dispatch.Dispatcher$LazyExecutorServiceDelegate",
|
||||
"akka.actor.ActorSystemImpl$$anon$1",
|
||||
"akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool",
|
||||
"akka.dispatch.forkjoin.ForkJoinPool",
|
||||
"akka.dispatch.BalancingDispatcher",
|
||||
"akka.dispatch.ThreadPoolConfig$ThreadPoolExecutorServiceFactory$$anon$1",
|
||||
"akka.dispatch.PinnedDispatcher",
|
||||
"akka.dispatch.ExecutionContexts$sameThreadExecutionContext$",
|
||||
"play.api.libs.streams.Execution$trampoline$",
|
||||
"io.netty.channel.MultithreadEventLoopGroup",
|
||||
"io.netty.util.concurrent.MultithreadEventExecutorGroup",
|
||||
"io.netty.util.concurrent.AbstractEventExecutorGroup",
|
||||
"io.netty.channel.epoll.EpollEventLoopGroup",
|
||||
"io.netty.channel.nio.NioEventLoopGroup",
|
||||
"io.netty.util.concurrent.GlobalEventExecutor",
|
||||
"io.netty.util.concurrent.AbstractScheduledEventExecutor",
|
||||
"io.netty.util.concurrent.AbstractEventExecutor",
|
||||
"io.netty.util.concurrent.SingleThreadEventExecutor",
|
||||
"io.netty.channel.nio.NioEventLoop",
|
||||
"io.netty.channel.SingleThreadEventLoop",
|
||||
};
|
||||
WHITELISTED_EXECUTORS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(whitelist)));
|
||||
|
||||
final String[] whitelistPrefixes = {"slick.util.AsyncExecutor$"};
|
||||
WHITELISTED_EXECUTORS_PREFIXES =
|
||||
Collections.unmodifiableCollection(Arrays.asList(whitelistPrefixes));
|
||||
}
|
||||
|
||||
public ExecutorInstrumentation() {
|
||||
super(EXEC_NAME);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ElementMatcher<TypeDescription> typeMatcher() {
|
||||
return not(isInterface())
|
||||
.and(safeHasSuperType(named(Executor.class.getName())))
|
||||
.and(
|
||||
new ElementMatcher<TypeDescription>() {
|
||||
@Override
|
||||
public boolean matches(final TypeDescription target) {
|
||||
boolean whitelisted = WHITELISTED_EXECUTORS.contains(target.getName());
|
||||
|
||||
// Check for possible prefixes match only if not whitelisted already
|
||||
if (!whitelisted) {
|
||||
for (final String name : WHITELISTED_EXECUTORS_PREFIXES) {
|
||||
if (target.getName().startsWith(name)) {
|
||||
whitelisted = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (!whitelisted) {
|
||||
log.debug("Skipping executor instrumentation for {}", target.getName());
|
||||
}
|
||||
return whitelisted;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public String[] helperClassNames() {
|
||||
return new String[] {
|
||||
ExecutorInstrumentation.class.getName() + "$ConcurrentUtils",
|
||||
};
|
||||
}
|
||||
public final class JavaExecutorInstrumentation extends AbstractExecutorInstrumentation {
|
||||
|
||||
@Override
|
||||
public Map<String, String> contextStore() {
|
||||
|
@ -154,7 +50,7 @@ public final class ExecutorInstrumentation extends Instrumenter.Default {
|
|||
SetExecuteRunnableStateAdvice.class.getName());
|
||||
transformers.put(
|
||||
named("execute").and(takesArgument(0, ForkJoinTask.class)),
|
||||
SetExecuteForkJoinStateAdvice.class.getName());
|
||||
SetJavaForkJoinStateAdvice.class.getName());
|
||||
transformers.put(
|
||||
named("submit").and(takesArgument(0, Runnable.class)),
|
||||
SetSubmitRunnableStateAdvice.class.getName());
|
||||
|
@ -163,13 +59,13 @@ public final class ExecutorInstrumentation extends Instrumenter.Default {
|
|||
SetCallableStateAdvice.class.getName());
|
||||
transformers.put(
|
||||
named("submit").and(takesArgument(0, ForkJoinTask.class)),
|
||||
SetExecuteForkJoinStateAdvice.class.getName());
|
||||
SetJavaForkJoinStateAdvice.class.getName());
|
||||
transformers.put(
|
||||
nameMatches("invoke(Any|All)$").and(takesArgument(0, Callable.class)),
|
||||
SetCallableStateForCallableCollectionAdvice.class.getName());
|
||||
transformers.put(
|
||||
nameMatches("invoke").and(takesArgument(0, ForkJoinTask.class)),
|
||||
SetExecuteForkJoinStateAdvice.class.getName());
|
||||
SetJavaForkJoinStateAdvice.class.getName());
|
||||
transformers.put( // kotlinx.coroutines.scheduling.CoroutineScheduler
|
||||
named("dispatch")
|
||||
.and(takesArgument(0, Runnable.class))
|
||||
|
@ -185,11 +81,11 @@ public final class ExecutorInstrumentation extends Instrumenter.Default {
|
|||
@Advice.This final Executor executor,
|
||||
@Advice.Argument(value = 0, readOnly = false) Runnable task) {
|
||||
final Scope scope = GlobalTracer.get().scopeManager().active();
|
||||
if (ConcurrentUtils.shouldAttachStateToTask(task, executor)) {
|
||||
if (ExecutorInstrumentationUtils.shouldAttachStateToTask(task, executor)) {
|
||||
task = RunnableWrapper.wrapIfNeeded(task);
|
||||
final ContextStore<Runnable, State> contextStore =
|
||||
InstrumentationContext.get(Runnable.class, State.class);
|
||||
return ConcurrentUtils.setupState(contextStore, task, (TraceScope) scope);
|
||||
return ExecutorInstrumentationUtils.setupState(contextStore, task, (TraceScope) scope);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
@ -199,21 +95,21 @@ public final class ExecutorInstrumentation extends Instrumenter.Default {
|
|||
@Advice.This final Executor executor,
|
||||
@Advice.Enter final State state,
|
||||
@Advice.Thrown final Throwable throwable) {
|
||||
ConcurrentUtils.cleanUpOnMethodExit(executor, state, throwable);
|
||||
ExecutorInstrumentationUtils.cleanUpOnMethodExit(executor, state, throwable);
|
||||
}
|
||||
}
|
||||
|
||||
public static class SetExecuteForkJoinStateAdvice {
|
||||
public static class SetJavaForkJoinStateAdvice {
|
||||
|
||||
@Advice.OnMethodEnter(suppress = Throwable.class)
|
||||
public static State enterJobSubmit(
|
||||
@Advice.This final Executor executor,
|
||||
@Advice.Argument(value = 0, readOnly = false) final ForkJoinTask task) {
|
||||
final Scope scope = GlobalTracer.get().scopeManager().active();
|
||||
if (ConcurrentUtils.shouldAttachStateToTask(task, executor)) {
|
||||
if (ExecutorInstrumentationUtils.shouldAttachStateToTask(task, executor)) {
|
||||
final ContextStore<ForkJoinTask, State> contextStore =
|
||||
InstrumentationContext.get(ForkJoinTask.class, State.class);
|
||||
return ConcurrentUtils.setupState(contextStore, task, (TraceScope) scope);
|
||||
return ExecutorInstrumentationUtils.setupState(contextStore, task, (TraceScope) scope);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
@ -223,7 +119,7 @@ public final class ExecutorInstrumentation extends Instrumenter.Default {
|
|||
@Advice.This final Executor executor,
|
||||
@Advice.Enter final State state,
|
||||
@Advice.Thrown final Throwable throwable) {
|
||||
ConcurrentUtils.cleanUpOnMethodExit(executor, state, throwable);
|
||||
ExecutorInstrumentationUtils.cleanUpOnMethodExit(executor, state, throwable);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -234,11 +130,11 @@ public final class ExecutorInstrumentation extends Instrumenter.Default {
|
|||
@Advice.This final Executor executor,
|
||||
@Advice.Argument(value = 0, readOnly = false) Runnable task) {
|
||||
final Scope scope = GlobalTracer.get().scopeManager().active();
|
||||
if (ConcurrentUtils.shouldAttachStateToTask(task, executor)) {
|
||||
if (ExecutorInstrumentationUtils.shouldAttachStateToTask(task, executor)) {
|
||||
task = RunnableWrapper.wrapIfNeeded(task);
|
||||
final ContextStore<Runnable, State> contextStore =
|
||||
InstrumentationContext.get(Runnable.class, State.class);
|
||||
return ConcurrentUtils.setupState(contextStore, task, (TraceScope) scope);
|
||||
return ExecutorInstrumentationUtils.setupState(contextStore, task, (TraceScope) scope);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
@ -254,7 +150,7 @@ public final class ExecutorInstrumentation extends Instrumenter.Default {
|
|||
InstrumentationContext.get(Future.class, State.class);
|
||||
contextStore.put(future, state);
|
||||
}
|
||||
ConcurrentUtils.cleanUpOnMethodExit(executor, state, throwable);
|
||||
ExecutorInstrumentationUtils.cleanUpOnMethodExit(executor, state, throwable);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -265,11 +161,11 @@ public final class ExecutorInstrumentation extends Instrumenter.Default {
|
|||
@Advice.This final Executor executor,
|
||||
@Advice.Argument(value = 0, readOnly = false) Callable task) {
|
||||
final Scope scope = GlobalTracer.get().scopeManager().active();
|
||||
if (ConcurrentUtils.shouldAttachStateToTask(task, executor)) {
|
||||
if (ExecutorInstrumentationUtils.shouldAttachStateToTask(task, executor)) {
|
||||
task = CallableWrapper.wrapIfNeeded(task);
|
||||
final ContextStore<Callable, State> contextStore =
|
||||
InstrumentationContext.get(Callable.class, State.class);
|
||||
return ConcurrentUtils.setupState(contextStore, task, (TraceScope) scope);
|
||||
return ExecutorInstrumentationUtils.setupState(contextStore, task, (TraceScope) scope);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
@ -285,7 +181,7 @@ public final class ExecutorInstrumentation extends Instrumenter.Default {
|
|||
InstrumentationContext.get(Future.class, State.class);
|
||||
contextStore.put(future, state);
|
||||
}
|
||||
ConcurrentUtils.cleanUpOnMethodExit(executor, state, throwable);
|
||||
ExecutorInstrumentationUtils.cleanUpOnMethodExit(executor, state, throwable);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -306,7 +202,7 @@ public final class ExecutorInstrumentation extends Instrumenter.Default {
|
|||
wrappedTasks.add(task);
|
||||
final ContextStore<Callable, State> contextStore =
|
||||
InstrumentationContext.get(Callable.class, State.class);
|
||||
ConcurrentUtils.setupState(contextStore, task, (TraceScope) scope);
|
||||
ExecutorInstrumentationUtils.setupState(contextStore, task, (TraceScope) scope);
|
||||
}
|
||||
}
|
||||
tasks = wrappedTasks;
|
||||
|
@ -351,79 +247,4 @@ public final class ExecutorInstrumentation extends Instrumenter.Default {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
/** Utils for concurrent instrumentations. */
|
||||
@Slf4j
|
||||
public static class ConcurrentUtils {
|
||||
|
||||
private static final WeakMap<Executor, Boolean> DISABLED_EXECUTORS =
|
||||
WeakMap.Provider.newWeakMap();
|
||||
|
||||
/**
|
||||
* Checks if given task should get state attached.
|
||||
*
|
||||
* @param task task object
|
||||
* @param executor executor this task was scheduled on
|
||||
* @return true iff given task object should be wrapped
|
||||
*/
|
||||
public static boolean shouldAttachStateToTask(final Object task, final Executor executor) {
|
||||
final Scope scope = GlobalTracer.get().scopeManager().active();
|
||||
return (scope instanceof TraceScope
|
||||
&& ((TraceScope) scope).isAsyncPropagating()
|
||||
&& task != null
|
||||
&& !ConcurrentUtils.isDisabled(executor));
|
||||
}
|
||||
|
||||
/**
|
||||
* Create task state given current scope.
|
||||
*
|
||||
* @param contextStore context storage
|
||||
* @param task task instance
|
||||
* @param scope current scope
|
||||
* @param <T> task class type
|
||||
* @return new state
|
||||
*/
|
||||
public static <T> State setupState(
|
||||
final ContextStore<T, State> contextStore, final T task, final TraceScope scope) {
|
||||
final State state = contextStore.putIfAbsent(task, State.FACTORY);
|
||||
final TraceScope.Continuation continuation = scope.capture();
|
||||
if (state.setContinuation(continuation)) {
|
||||
log.debug("created continuation {} from scope {}, state: {}", continuation, scope, state);
|
||||
} else {
|
||||
continuation.close(false);
|
||||
}
|
||||
return state;
|
||||
}
|
||||
|
||||
/**
|
||||
* Clean up after job submission method has exited.
|
||||
*
|
||||
* @param executor the current executor
|
||||
* @param state task instrumentation state
|
||||
* @param throwable throwable that may have been thrown
|
||||
*/
|
||||
public static void cleanUpOnMethodExit(
|
||||
final Executor executor, final State state, final Throwable throwable) {
|
||||
if (null != state && null != throwable) {
|
||||
/*
|
||||
Note: this may potentially close somebody else's continuation if we didn't set it
|
||||
up in setupState because it was already present before us. This should be safe but
|
||||
may lead to non-attributed async work in some very rare cases.
|
||||
Alternative is to not close continuation here if we did not set it up in setupState
|
||||
but this may potentially lead to memory leaks if callers do not properly handle
|
||||
exceptions.
|
||||
*/
|
||||
state.closeContinuation();
|
||||
}
|
||||
}
|
||||
|
||||
public static void disableExecutor(final Executor executor) {
|
||||
log.debug("Disabling Executor tracing for instance {}", executor);
|
||||
DISABLED_EXECUTORS.put(executor, true);
|
||||
}
|
||||
|
||||
public static boolean isDisabled(final Executor executor) {
|
||||
return DISABLED_EXECUTORS.containsKey(executor);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -29,16 +29,14 @@ import net.bytebuddy.matcher.ElementMatcher;
|
|||
* Instrument {@link ForkJoinTask}.
|
||||
*
|
||||
* <p>Note: There are quite a few separate implementations of {@code ForkJoinTask}/{@code
|
||||
* ForkJoinPool}: JVM, akka, scala, netty to name a few. For now we only deal with JVM one because
|
||||
* there are known cases when JVM {@code ForkJoinTask} is supplied as {@code Runnable} into {@code
|
||||
* ForkJoinPool}.
|
||||
* ForkJoinPool}: JVM, Akka, Scala, Netty to name a few. This class handles JVM version.
|
||||
*/
|
||||
@Slf4j
|
||||
@AutoService(Instrumenter.class)
|
||||
public final class ForkJoinTaskInstrumentation extends Instrumenter.Default {
|
||||
public final class JavaForkJoinTaskInstrumentation extends Instrumenter.Default {
|
||||
|
||||
public ForkJoinTaskInstrumentation() {
|
||||
super(ExecutorInstrumentation.EXEC_NAME);
|
||||
public JavaForkJoinTaskInstrumentation() {
|
||||
super(AbstractExecutorInstrumentation.EXEC_NAME);
|
||||
}
|
||||
|
||||
@Override
|
|
@ -29,7 +29,7 @@ import net.bytebuddy.matcher.ElementMatcher;
|
|||
public final class RunnableCallableInstrumentation extends Instrumenter.Default {
|
||||
|
||||
public RunnableCallableInstrumentation() {
|
||||
super(ExecutorInstrumentation.EXEC_NAME);
|
||||
super(AbstractExecutorInstrumentation.EXEC_NAME);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -0,0 +1,76 @@
|
|||
package datadog.trace.instrumentation.java.concurrent;
|
||||
|
||||
import static net.bytebuddy.matcher.ElementMatchers.nameMatches;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.named;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
|
||||
|
||||
import com.google.auto.service.AutoService;
|
||||
import datadog.trace.agent.tooling.Instrumenter;
|
||||
import datadog.trace.bootstrap.ContextStore;
|
||||
import datadog.trace.bootstrap.InstrumentationContext;
|
||||
import datadog.trace.bootstrap.instrumentation.java.concurrent.State;
|
||||
import datadog.trace.context.TraceScope;
|
||||
import io.opentracing.Scope;
|
||||
import io.opentracing.util.GlobalTracer;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.Executor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import net.bytebuddy.asm.Advice;
|
||||
import net.bytebuddy.description.method.MethodDescription;
|
||||
import net.bytebuddy.matcher.ElementMatcher;
|
||||
import scala.concurrent.forkjoin.ForkJoinTask;
|
||||
|
||||
@Slf4j
|
||||
@AutoService(Instrumenter.class)
|
||||
public final class ScalaExecutorInstrumentation extends AbstractExecutorInstrumentation {
|
||||
|
||||
@Override
|
||||
public Map<String, String> contextStore() {
|
||||
return Collections.singletonMap(
|
||||
ScalaForkJoinTaskInstrumentation.TASK_CLASS_NAME, State.class.getName());
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<? extends ElementMatcher<? super MethodDescription>, String> transformers() {
|
||||
final Map<ElementMatcher<? super MethodDescription>, String> transformers = new HashMap<>();
|
||||
transformers.put(
|
||||
named("execute")
|
||||
.and(takesArgument(0, named(ScalaForkJoinTaskInstrumentation.TASK_CLASS_NAME))),
|
||||
SetScalaForkJoinStateAdvice.class.getName());
|
||||
transformers.put(
|
||||
named("submit")
|
||||
.and(takesArgument(0, named(ScalaForkJoinTaskInstrumentation.TASK_CLASS_NAME))),
|
||||
SetScalaForkJoinStateAdvice.class.getName());
|
||||
transformers.put(
|
||||
nameMatches("invoke")
|
||||
.and(takesArgument(0, named(ScalaForkJoinTaskInstrumentation.TASK_CLASS_NAME))),
|
||||
SetScalaForkJoinStateAdvice.class.getName());
|
||||
return transformers;
|
||||
}
|
||||
|
||||
public static class SetScalaForkJoinStateAdvice {
|
||||
|
||||
@Advice.OnMethodEnter(suppress = Throwable.class)
|
||||
public static State enterJobSubmit(
|
||||
@Advice.This final Executor executor,
|
||||
@Advice.Argument(value = 0, readOnly = false) final ForkJoinTask task) {
|
||||
final Scope scope = GlobalTracer.get().scopeManager().active();
|
||||
if (ExecutorInstrumentationUtils.shouldAttachStateToTask(task, executor)) {
|
||||
final ContextStore<ForkJoinTask, State> contextStore =
|
||||
InstrumentationContext.get(ForkJoinTask.class, State.class);
|
||||
return ExecutorInstrumentationUtils.setupState(contextStore, task, (TraceScope) scope);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
|
||||
public static void exitJobSubmit(
|
||||
@Advice.This final Executor executor,
|
||||
@Advice.Enter final State state,
|
||||
@Advice.Thrown final Throwable throwable) {
|
||||
ExecutorInstrumentationUtils.cleanUpOnMethodExit(executor, state, throwable);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,121 @@
|
|||
package datadog.trace.instrumentation.java.concurrent;
|
||||
|
||||
import static datadog.trace.agent.tooling.ByteBuddyElementMatchers.safeHasSuperType;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.isAbstract;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.isInterface;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.named;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.not;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
|
||||
|
||||
import com.google.auto.service.AutoService;
|
||||
import datadog.trace.agent.tooling.Instrumenter;
|
||||
import datadog.trace.bootstrap.ContextStore;
|
||||
import datadog.trace.bootstrap.InstrumentationContext;
|
||||
import datadog.trace.bootstrap.instrumentation.java.concurrent.State;
|
||||
import datadog.trace.context.TraceScope;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.Callable;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import net.bytebuddy.asm.Advice;
|
||||
import net.bytebuddy.description.method.MethodDescription;
|
||||
import net.bytebuddy.description.type.TypeDescription;
|
||||
import net.bytebuddy.matcher.ElementMatcher;
|
||||
import scala.concurrent.forkjoin.ForkJoinPool;
|
||||
import scala.concurrent.forkjoin.ForkJoinTask;
|
||||
|
||||
/**
|
||||
* Instrument {@link ForkJoinTask}.
|
||||
*
|
||||
* <p>Note: There are quite a few separate implementations of {@code ForkJoinTask}/{@code
|
||||
* ForkJoinPool}: JVM, Akka, Scala, Netty to name a few. This class handles Scala version.
|
||||
*/
|
||||
@Slf4j
|
||||
@AutoService(Instrumenter.class)
|
||||
public final class ScalaForkJoinTaskInstrumentation extends Instrumenter.Default {
|
||||
|
||||
static final String TASK_CLASS_NAME = "scala.concurrent.forkjoin.ForkJoinTask";
|
||||
|
||||
public ScalaForkJoinTaskInstrumentation() {
|
||||
super(AbstractExecutorInstrumentation.EXEC_NAME);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ElementMatcher<TypeDescription> typeMatcher() {
|
||||
return not(isInterface()).and(safeHasSuperType(named(TASK_CLASS_NAME)));
|
||||
}
|
||||
|
||||
@Override
|
||||
public String[] helperClassNames() {
|
||||
return new String[] {
|
||||
AdviceUtils.class.getName(),
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, String> contextStore() {
|
||||
final Map<String, String> map = new HashMap<>();
|
||||
map.put(Runnable.class.getName(), State.class.getName());
|
||||
map.put(Callable.class.getName(), State.class.getName());
|
||||
map.put(TASK_CLASS_NAME, State.class.getName());
|
||||
return Collections.unmodifiableMap(map);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<? extends ElementMatcher<? super MethodDescription>, String> transformers() {
|
||||
final Map<ElementMatcher<? super MethodDescription>, String> transformers = new HashMap<>();
|
||||
transformers.put(
|
||||
named("exec").and(takesArguments(0)).and(not(isAbstract())),
|
||||
ForkJoinTaskAdvice.class.getName());
|
||||
return transformers;
|
||||
}
|
||||
|
||||
public static class ForkJoinTaskAdvice {
|
||||
|
||||
/**
|
||||
* When {@link ForkJoinTask} object is submitted to {@link ForkJoinPool} as {@link Runnable} or
|
||||
* {@link Callable} it will not get wrapped, instead it will be casted to {@code ForkJoinTask}
|
||||
* directly. This means state is still stored in {@code Runnable} or {@code Callable} and we
|
||||
* need to use that state.
|
||||
*/
|
||||
@Advice.OnMethodEnter(suppress = Throwable.class)
|
||||
public static TraceScope enter(@Advice.This final ForkJoinTask thiz) {
|
||||
final ContextStore<ForkJoinTask, State> contextStore =
|
||||
InstrumentationContext.get(ForkJoinTask.class, State.class);
|
||||
TraceScope scope = AdviceUtils.startTaskScope(contextStore, thiz);
|
||||
if (thiz instanceof Runnable) {
|
||||
final ContextStore<Runnable, State> runnableContextStore =
|
||||
InstrumentationContext.get(Runnable.class, State.class);
|
||||
final TraceScope newScope =
|
||||
AdviceUtils.startTaskScope(runnableContextStore, (Runnable) thiz);
|
||||
if (null != newScope) {
|
||||
if (null != scope) {
|
||||
newScope.close();
|
||||
} else {
|
||||
scope = newScope;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (thiz instanceof Callable) {
|
||||
final ContextStore<Callable, State> callableContextStore =
|
||||
InstrumentationContext.get(Callable.class, State.class);
|
||||
final TraceScope newScope =
|
||||
AdviceUtils.startTaskScope(callableContextStore, (Callable) thiz);
|
||||
if (null != newScope) {
|
||||
if (null != scope) {
|
||||
newScope.close();
|
||||
} else {
|
||||
scope = newScope;
|
||||
}
|
||||
}
|
||||
}
|
||||
return scope;
|
||||
}
|
||||
|
||||
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
|
||||
public static void exit(@Advice.Enter final TraceScope scope) {
|
||||
AdviceUtils.endTaskScope(scope);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -27,7 +27,7 @@ import net.bytebuddy.matcher.ElementMatcher;
|
|||
public class ThreadPoolExecutorInstrumentation extends Instrumenter.Default {
|
||||
|
||||
public ThreadPoolExecutorInstrumentation() {
|
||||
super(ExecutorInstrumentation.EXEC_NAME);
|
||||
super(AbstractExecutorInstrumentation.EXEC_NAME);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -38,7 +38,8 @@ public class ThreadPoolExecutorInstrumentation extends Instrumenter.Default {
|
|||
@Override
|
||||
public String[] helperClassNames() {
|
||||
return new String[] {
|
||||
ExecutorInstrumentation.class.getName() + "$ConcurrentUtils",
|
||||
ThreadPoolExecutorInstrumentation.class.getPackage().getName()
|
||||
+ ".ExecutorInstrumentationUtils",
|
||||
ThreadPoolExecutorInstrumentation.class.getName() + "$GenericRunnable",
|
||||
};
|
||||
}
|
||||
|
@ -56,7 +57,7 @@ public class ThreadPoolExecutorInstrumentation extends Instrumenter.Default {
|
|||
@Advice.OnMethodExit(suppress = Throwable.class)
|
||||
public static void disableIfQueueWrongType(
|
||||
@Advice.This final ThreadPoolExecutor executor,
|
||||
@Advice.Argument(4) final BlockingQueue queue) {
|
||||
@Advice.Argument(4) final BlockingQueue<Runnable> queue) {
|
||||
|
||||
if (queue.size() == 0) {
|
||||
try {
|
||||
|
@ -65,7 +66,7 @@ public class ThreadPoolExecutorInstrumentation extends Instrumenter.Default {
|
|||
} catch (final ClassCastException | IllegalArgumentException e) {
|
||||
// These errors indicate the queue is fundamentally incompatible with wrapped runnables.
|
||||
// We must disable the executor instance to avoid passing wrapped runnables later.
|
||||
ExecutorInstrumentation.ConcurrentUtils.disableExecutor(executor);
|
||||
ExecutorInstrumentationUtils.disableExecutor(executor);
|
||||
} catch (final Exception e) {
|
||||
// Other errors might indicate the queue is not fully initialized.
|
||||
// We might want to disable for those too, but for now just ignore.
|
||||
|
|
|
@ -54,9 +54,9 @@ class ExecutorInstrumentationTest extends AgentTestRunner {
|
|||
void run() {
|
||||
((ContinuableScope) GlobalTracer.get().scopeManager().active()).setAsyncPropagation(true)
|
||||
// this child will have a span
|
||||
m.invoke(pool, new AsyncChild())
|
||||
m.invoke(pool, new JavaAsyncChild())
|
||||
// this child won't
|
||||
m.invoke(pool, new AsyncChild(false, false))
|
||||
m.invoke(pool, new JavaAsyncChild(false, false))
|
||||
}
|
||||
}.run()
|
||||
|
||||
|
@ -82,9 +82,11 @@ class ExecutorInstrumentationTest extends AgentTestRunner {
|
|||
new ForkJoinPool() | submitCallableMethod
|
||||
new ForkJoinPool() | submitForkJoinTaskMethod
|
||||
new ForkJoinPool() | invokeForkJoinTaskMethod
|
||||
|
||||
new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue<Runnable>(1)) | executeRunnableMethod
|
||||
new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue<Runnable>(1)) | submitRunnableMethod
|
||||
new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue<Runnable>(1)) | submitCallableMethod
|
||||
|
||||
}
|
||||
|
||||
// more useful name breaks java9 javac
|
||||
|
@ -93,7 +95,7 @@ class ExecutorInstrumentationTest extends AgentTestRunner {
|
|||
setup:
|
||||
def pool = poolImpl
|
||||
def m = method
|
||||
List<AsyncChild> children = new ArrayList<>()
|
||||
List<JavaAsyncChild> children = new ArrayList<>()
|
||||
List<Future> jobFutures = new ArrayList<>()
|
||||
|
||||
new Runnable() {
|
||||
|
@ -109,10 +111,10 @@ class ExecutorInstrumentationTest extends AgentTestRunner {
|
|||
// we do not really have a good way for attributing work to correct parent span
|
||||
// if we reuse Callable/Runnable.
|
||||
// Solution for now is to never reuse a Callable/Runnable.
|
||||
final AsyncChild child = new AsyncChild(true, true)
|
||||
final JavaAsyncChild child = new JavaAsyncChild(true, true)
|
||||
children.add(child)
|
||||
try {
|
||||
Future f = m.invoke(pool, new AsyncChild())
|
||||
Future f = m.invoke(pool, new JavaAsyncChild())
|
||||
jobFutures.add(f)
|
||||
} catch (InvocationTargetException e) {
|
||||
throw e.getCause()
|
||||
|
@ -124,7 +126,7 @@ class ExecutorInstrumentationTest extends AgentTestRunner {
|
|||
for (Future f : jobFutures) {
|
||||
f.cancel(false)
|
||||
}
|
||||
for (AsyncChild child : children) {
|
||||
for (JavaAsyncChild child : children) {
|
||||
child.unblock()
|
||||
}
|
||||
}
|
||||
|
@ -140,6 +142,7 @@ class ExecutorInstrumentationTest extends AgentTestRunner {
|
|||
poolImpl | method
|
||||
new ForkJoinPool() | submitRunnableMethod
|
||||
new ForkJoinPool() | submitCallableMethod
|
||||
|
||||
new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue<Runnable>(1)) | submitRunnableMethod
|
||||
new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue<Runnable>(1)) | submitCallableMethod
|
||||
}
|
||||
|
|
|
@ -3,11 +3,11 @@ import java.util.concurrent.Callable;
|
|||
import java.util.concurrent.ForkJoinTask;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
public class AsyncChild extends ForkJoinTask implements Runnable, Callable {
|
||||
public class JavaAsyncChild extends ForkJoinTask implements Runnable, Callable {
|
||||
private final AtomicBoolean blockThread;
|
||||
private final boolean doTraceableWork;
|
||||
|
||||
public AsyncChild() {
|
||||
public JavaAsyncChild() {
|
||||
this(true, false);
|
||||
}
|
||||
|
||||
|
@ -25,7 +25,7 @@ public class AsyncChild extends ForkJoinTask implements Runnable, Callable {
|
|||
return true;
|
||||
}
|
||||
|
||||
public AsyncChild(final boolean doTraceableWork, final boolean blockThread) {
|
||||
public JavaAsyncChild(final boolean doTraceableWork, final boolean blockThread) {
|
||||
this.doTraceableWork = doTraceableWork;
|
||||
this.blockThread = new AtomicBoolean(blockThread);
|
||||
}
|
|
@ -48,6 +48,7 @@ include ':dd-java-agent:instrumentation:java-concurrent'
|
|||
include ':dd-java-agent:instrumentation:java-concurrent:kotlin-testing'
|
||||
include ':dd-java-agent:instrumentation:java-concurrent:scala-testing'
|
||||
include ':dd-java-agent:instrumentation:java-concurrent:akka-testing'
|
||||
include ':dd-java-agent:instrumentation:java-concurrent:akka-2.5-testing'
|
||||
include ':dd-java-agent:instrumentation:jboss-classloading'
|
||||
include ':dd-java-agent:instrumentation:jdbc'
|
||||
include ':dd-java-agent:instrumentation:jedis-1.4'
|
||||
|
|
Loading…
Reference in New Issue