Merge pull request #1005 from DataDog/tyler/additional-executors

Fix async dispatch for Jetty QueuedThreadPool
This commit is contained in:
Tyler Benson 2019-09-19 12:15:37 -07:00 committed by GitHub
commit 40a7ef6086
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 71 additions and 36 deletions

View File

@ -55,10 +55,10 @@ public abstract class AbstractExecutorInstrumentation extends Instrumenter.Defau
"akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool",
"akka.dispatch.MessageDispatcher",
"akka.dispatch.PinnedDispatcher",
"akka.dispatch.ThreadPoolConfig$ThreadPoolExecutorServiceFactory$$anon$1",
"com.google.common.util.concurrent.AbstractListeningExecutorService",
"com.google.common.util.concurrent.MoreExecutors$ListeningDecorator",
"com.google.common.util.concurrent.MoreExecutors$ScheduledListeningDecorator",
"io.netty.channel.epoll.EpollEventLoop",
"io.netty.channel.epoll.EpollEventLoopGroup",
"io.netty.channel.MultithreadEventLoopGroup",
"io.netty.channel.nio.NioEventLoop",
@ -67,6 +67,8 @@ public abstract class AbstractExecutorInstrumentation extends Instrumenter.Defau
"io.netty.util.concurrent.AbstractEventExecutor",
"io.netty.util.concurrent.AbstractEventExecutorGroup",
"io.netty.util.concurrent.AbstractScheduledEventExecutor",
"io.netty.util.concurrent.DefaultEventExecutor",
"io.netty.util.concurrent.DefaultEventExecutorGroup",
"io.netty.util.concurrent.GlobalEventExecutor",
"io.netty.util.concurrent.MultithreadEventExecutorGroup",
"io.netty.util.concurrent.SingleThreadEventExecutor",
@ -76,15 +78,14 @@ public abstract class AbstractExecutorInstrumentation extends Instrumenter.Defau
"java.util.concurrent.ForkJoinPool",
"java.util.concurrent.ScheduledThreadPoolExecutor",
"java.util.concurrent.ThreadPoolExecutor",
"javax.management.NotificationBroadcasterSupport$1",
"kotlinx.coroutines.scheduling.CoroutineScheduler",
"org.eclipse.jetty.util.thread.QueuedThreadPool",
"org.eclipse.jetty.util.thread.ReservedThreadExecutor",
"org.glassfish.grizzly.threadpool.GrizzlyExecutorService",
"play.api.libs.streams.Execution$trampoline$",
"scala.concurrent.forkjoin.ForkJoinPool",
"scala.concurrent.Future$InternalCallbackExecutor$",
"scala.concurrent.impl.ExecutionContextImpl",
"scala.concurrent.impl.ExecutionContextImpl$$anon$1",
"scala.concurrent.impl.ExecutionContextImpl$$anon$3",
};
final Set<String> executors = new HashSet<>(Config.get().getTraceExecutors());

View File

@ -70,11 +70,6 @@ public final class JavaExecutorInstrumentation extends AbstractExecutorInstrumen
transformers.put(
named("schedule").and(takesArgument(0, Callable.class)),
SetCallableStateAdvice.class.getName());
transformers.put( // kotlinx.coroutines.scheduling.CoroutineScheduler
named("dispatch")
.and(takesArgument(0, Runnable.class))
.and(takesArgument(1, named("kotlinx.coroutines.scheduling.TaskContext"))),
SetExecuteRunnableStateAdvice.class.getName());
return transformers;
}

View File

@ -0,0 +1,41 @@
package datadog.trace.instrumentation.java.concurrent;
import static java.util.Collections.singletonMap;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.bootstrap.instrumentation.java.concurrent.State;
import java.util.HashMap;
import java.util.Map;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.matcher.ElementMatcher;
@AutoService(Instrumenter.class)
public final class NonStandardExecutorInstrumentation extends AbstractExecutorInstrumentation {
public NonStandardExecutorInstrumentation() {
super(EXEC_NAME + ".other");
}
@Override
public Map<String, String> contextStore() {
return singletonMap(Runnable.class.getName(), State.class.getName());
}
@Override
public Map<? extends ElementMatcher<? super MethodDescription>, String> transformers() {
final Map<ElementMatcher<? super MethodDescription>, String> transformers = new HashMap<>();
transformers.put( // kotlinx.coroutines.scheduling.CoroutineScheduler
named("dispatch")
.and(takesArgument(0, Runnable.class))
.and(takesArgument(1, named("kotlinx.coroutines.scheduling.TaskContext"))),
JavaExecutorInstrumentation.SetExecuteRunnableStateAdvice.class.getName());
transformers.put( // org.eclipse.jetty.util.thread.QueuedThreadPool
named("dispatch").and(takesArguments(1)).and(takesArgument(0, Runnable.class)),
JavaExecutorInstrumentation.SetExecuteRunnableStateAdvice.class.getName());
return transformers;
}
}

View File

@ -100,14 +100,13 @@ class JettyServlet3TestSync extends JettyServlet3Test {
}
}
// FIXME: Async context propagation for org.eclipse.jetty.util.thread.QueuedThreadPool.dispatch currently broken.
//class JettyServlet3TestAsync extends JettyServlet3Test {
//
// @Override
// Class<Servlet> servlet() {
// TestServlet3.Async
// }
//}
class JettyServlet3TestAsync extends JettyServlet3Test {
@Override
Class<Servlet> servlet() {
TestServlet3.Async
}
}
class JettyServlet3TestFakeAsync extends JettyServlet3Test {
@ -137,25 +136,24 @@ class JettyServlet3TestDispatchImmediate extends JettyDispatchTest {
}
}
// FIXME: Async context propagation for org.eclipse.jetty.util.thread.QueuedThreadPool.dispatch currently broken.
//class JettyServlet3TestDispatchAsync extends JettyDispatchTest {
// @Override
// Class<Servlet> servlet() {
// TestServlet3.Async
// }
//
// @Override
// protected void setupServlets(ServletContextHandler context) {
// super.setupServlets(context)
//
// addServlet(context, "/dispatch" + SUCCESS.path, TestServlet3.DispatchAsync)
// addServlet(context, "/dispatch" + ERROR.path, TestServlet3.DispatchAsync)
// addServlet(context, "/dispatch" + EXCEPTION.path, TestServlet3.DispatchAsync)
// addServlet(context, "/dispatch" + REDIRECT.path, TestServlet3.DispatchAsync)
// addServlet(context, "/dispatch" + AUTH_REQUIRED.path, TestServlet3.DispatchAsync)
// addServlet(context, "/dispatch/recursive", TestServlet3.DispatchRecursive)
// }
//}
class JettyServlet3TestDispatchAsync extends JettyDispatchTest {
@Override
Class<Servlet> servlet() {
TestServlet3.Async
}
@Override
protected void setupServlets(ServletContextHandler context) {
super.setupServlets(context)
addServlet(context, "/dispatch" + SUCCESS.path, TestServlet3.DispatchAsync)
addServlet(context, "/dispatch" + ERROR.path, TestServlet3.DispatchAsync)
addServlet(context, "/dispatch" + EXCEPTION.path, TestServlet3.DispatchAsync)
addServlet(context, "/dispatch" + REDIRECT.path, TestServlet3.DispatchAsync)
addServlet(context, "/dispatch" + AUTH_REQUIRED.path, TestServlet3.DispatchAsync)
addServlet(context, "/dispatch/recursive", TestServlet3.DispatchRecursive)
}
}
abstract class JettyDispatchTest extends JettyServlet3Test {
@Override