Move QueuedThreadPool from executors to jetty (#1963)

This commit is contained in:
jason plumb 2021-01-06 11:02:39 -08:00 committed by GitHub
parent 5f816c5d43
commit f14c9b7723
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 231 additions and 56 deletions

View File

@ -88,7 +88,8 @@ public abstract class AbstractExecutorInstrumentation implements TypeInstrumenta
"java.util.concurrent.ForkJoinPool",
"java.util.concurrent.ScheduledThreadPoolExecutor",
"java.util.concurrent.ThreadPoolExecutor",
"org.eclipse.jetty.util.thread.QueuedThreadPool",
"org.eclipse.jetty.util.thread.QueuedThreadPool", // dispatch() is covered in the jetty
// module
"org.eclipse.jetty.util.thread.ReservedThreadExecutor",
"org.glassfish.grizzly.threadpool.GrizzlyExecutorService",
"play.api.libs.streams.Execution$trampoline$",

View File

@ -1,53 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.javaconcurrent;
import static java.util.Collections.singletonList;
import static java.util.Collections.singletonMap;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
import com.google.auto.service.AutoService;
import io.opentelemetry.javaagent.instrumentation.api.concurrent.State;
import io.opentelemetry.javaagent.tooling.InstrumentationModule;
import io.opentelemetry.javaagent.tooling.TypeInstrumentation;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.matcher.ElementMatcher;
@AutoService(InstrumentationModule.class)
public class NonStandardExecutorInstrumentationModule extends InstrumentationModule {
public NonStandardExecutorInstrumentationModule() {
super("executor", "non-standard-executor");
}
@Override
public List<TypeInstrumentation> typeInstrumentations() {
return singletonList(new OtherExecutorsInstrumentation());
}
@Override
public Map<String, String> contextStore() {
return singletonMap(Runnable.class.getName(), State.class.getName());
}
public static class OtherExecutorsInstrumentation extends AbstractExecutorInstrumentation {
@Override
public Map<? extends ElementMatcher<? super MethodDescription>, String> transformers() {
Map<ElementMatcher<? super MethodDescription>, String> transformers = new HashMap<>();
transformers.put(
// org.eclipse.jetty.util.thread.QueuedThreadPool
named("dispatch").and(takesArguments(1)).and(takesArgument(0, Runnable.class)),
JavaExecutorInstrumentation.class.getName() + "$SetExecuteRunnableStateAdvice");
return transformers;
}
}
}

View File

@ -7,18 +7,27 @@ package io.opentelemetry.javaagent.instrumentation.jetty;
import static io.opentelemetry.javaagent.tooling.ClassLoaderMatcher.hasClassesNamed;
import static io.opentelemetry.javaagent.tooling.bytebuddy.matcher.AgentElementMatchers.implementsInterface;
import static java.util.Collections.singletonList;
import static java.util.Arrays.asList;
import static java.util.Collections.singletonMap;
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.not;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
import com.google.auto.service.AutoService;
import io.opentelemetry.javaagent.instrumentation.api.ContextStore;
import io.opentelemetry.javaagent.instrumentation.api.InstrumentationContext;
import io.opentelemetry.javaagent.instrumentation.api.Java8BytecodeBridge;
import io.opentelemetry.javaagent.instrumentation.api.concurrent.ExecutorInstrumentationUtils;
import io.opentelemetry.javaagent.instrumentation.api.concurrent.RunnableWrapper;
import io.opentelemetry.javaagent.instrumentation.api.concurrent.State;
import io.opentelemetry.javaagent.tooling.InstrumentationModule;
import io.opentelemetry.javaagent.tooling.TypeInstrumentation;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
@ -32,7 +41,12 @@ public class JettyInstrumentationModule extends InstrumentationModule {
@Override
public List<TypeInstrumentation> typeInstrumentations() {
return singletonList(new HandlerInstrumentation());
return asList(new HandlerInstrumentation(), new JettyQueuedThreadPoolInstrumentation());
}
@Override
public Map<String, String> contextStore() {
return singletonMap(Runnable.class.getName(), State.class.getName());
}
public static class HandlerInstrumentation implements TypeInstrumentation {
@ -70,4 +84,45 @@ public class JettyInstrumentationModule extends InstrumentationModule {
JettyHandlerAdvice.class.getName());
}
}
public static class JettyQueuedThreadPoolInstrumentation implements TypeInstrumentation {
@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return named("org.eclipse.jetty.util.thread.QueuedThreadPool");
}
@Override
public Map<? extends ElementMatcher<? super MethodDescription>, String> transformers() {
Map<ElementMatcher<? super MethodDescription>, String> transformers = new HashMap<>();
transformers.put(
named("dispatch").and(takesArguments(1)).and(takesArgument(0, Runnable.class)),
JettyInstrumentationModule.class.getName() + "$SetExecuteRunnableStateAdvice");
return transformers;
}
}
public static class SetExecuteRunnableStateAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static State enterJobSubmit(
@Advice.Argument(value = 0, readOnly = false) Runnable task) {
Runnable newTask = RunnableWrapper.wrapIfNeeded(task);
if (ExecutorInstrumentationUtils.shouldAttachStateToTask(newTask)) {
task = newTask;
ContextStore<Runnable, State> contextStore =
InstrumentationContext.get(Runnable.class, State.class);
return ExecutorInstrumentationUtils.setupState(
contextStore, newTask, Java8BytecodeBridge.currentContext());
}
return null;
}
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void exitJobSubmit(
@Advice.Enter State state, @Advice.Thrown Throwable throwable) {
ExecutorInstrumentationUtils.cleanUpOnMethodExit(state, throwable);
}
}
}

View File

@ -0,0 +1,75 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.Tracer;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.atomic.AtomicBoolean;
public class JavaAsyncChild extends ForkJoinTask implements Runnable, Callable {
private static final Tracer tracer = OpenTelemetry.getGlobalTracer("io.opentelemetry.auto");
private final AtomicBoolean blockThread;
private final boolean doTraceableWork;
private final CountDownLatch latch = new CountDownLatch(1);
public JavaAsyncChild() {
this(true, false);
}
@Override
public Object getRawResult() {
return null;
}
@Override
protected void setRawResult(Object value) {}
@Override
protected boolean exec() {
runImpl();
return true;
}
public JavaAsyncChild(boolean doTraceableWork, boolean blockThread) {
this.doTraceableWork = doTraceableWork;
this.blockThread = new AtomicBoolean(blockThread);
}
public void unblock() {
blockThread.set(false);
}
@Override
public void run() {
runImpl();
}
@Override
public Object call() {
runImpl();
return null;
}
public void waitForCompletion() throws InterruptedException {
latch.await();
}
private void runImpl() {
while (blockThread.get()) {
// busy-wait to block thread
}
if (doTraceableWork) {
asyncChild();
}
latch.countDown();
}
private void asyncChild() {
tracer.spanBuilder("asyncChild").startSpan().end();
}
}

View File

@ -0,0 +1,83 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
import io.opentelemetry.instrumentation.test.AgentTestRunner
import io.opentelemetry.javaagent.instrumentation.jetty.JavaLambdaMaker
import io.opentelemetry.sdk.trace.data.SpanData
import org.eclipse.jetty.util.thread.QueuedThreadPool
import static io.opentelemetry.instrumentation.test.utils.TraceUtils.runUnderTrace
class QueuedThreadPoolTest extends AgentTestRunner {
def "QueueThreadPool 'dispatch' propagates"() {
setup:
def pool = new QueuedThreadPool()
pool.start()
new Runnable() {
@Override
void run() {
runUnderTrace("parent") {
// this child will have a span
def child1 = new JavaAsyncChild()
// this child won't
def child2 = new JavaAsyncChild(false, false)
pool.dispatch(child1)
pool.dispatch(child2)
child1.waitForCompletion()
child2.waitForCompletion()
}
}
}.run()
TEST_WRITER.waitForTraces(1)
List<SpanData> trace = TEST_WRITER.traces[0]
expect:
TEST_WRITER.traces.size() == 1
trace.size() == 2
trace.get(0).traceId == trace.get(1).traceId
trace.get(0).name == "parent"
trace.get(1).name == "asyncChild"
trace.get(1).parentSpanId == trace.get(0).spanId
cleanup:
pool.stop()
}
def "QueueThreadPool 'dispatch' propagates lambda"() {
setup:
def pool = new QueuedThreadPool()
pool.start()
JavaAsyncChild child = new JavaAsyncChild(true, true)
new Runnable() {
@Override
void run() {
runUnderTrace("parent") {
pool.dispatch(JavaLambdaMaker.lambda(child))
}
}
}.run()
// We block in child to make sure spans close in predictable order
child.unblock()
child.waitForCompletion()
TEST_WRITER.waitForTraces(1)
List<SpanData> trace = TEST_WRITER.traces[0]
expect:
TEST_WRITER.traces.size() == 1
trace.size() == 2
trace.get(0).traceId == trace.get(1).traceId
trace.get(0).name == "parent"
trace.get(1).name == "asyncChild"
trace.get(1).parentSpanId == trace.get(0).spanId
cleanup:
pool.stop()
}
}

View File

@ -0,0 +1,14 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.jetty;
public class JavaLambdaMaker {
@SuppressWarnings("FunctionalExpressionCanBeFolded")
public static Runnable lambda(Runnable runnable) {
return runnable::run;
}
}