From f14c9b77237f61fa1ba2dcfaefee3e40f2aa7f91 Mon Sep 17 00:00:00 2001 From: jason plumb <75337021+breedx-splk@users.noreply.github.com> Date: Wed, 6 Jan 2021 11:02:39 -0800 Subject: [PATCH] Move QueuedThreadPool from executors to jetty (#1963) --- .../AbstractExecutorInstrumentation.java | 3 +- ...StandardExecutorInstrumentationModule.java | 53 ------------ .../jetty/JettyInstrumentationModule.java | 59 ++++++++++++- .../src/test/groovy/JavaAsyncChild.java | 75 +++++++++++++++++ .../test/groovy/QueuedThreadPoolTest.groovy | 83 +++++++++++++++++++ .../jetty/JavaLambdaMaker.java | 14 ++++ 6 files changed, 231 insertions(+), 56 deletions(-) delete mode 100644 instrumentation/executors/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/javaconcurrent/NonStandardExecutorInstrumentationModule.java create mode 100644 instrumentation/jetty-8.0/javaagent/src/test/groovy/JavaAsyncChild.java create mode 100644 instrumentation/jetty-8.0/javaagent/src/test/groovy/QueuedThreadPoolTest.groovy create mode 100644 instrumentation/jetty-8.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/jetty/JavaLambdaMaker.java diff --git a/instrumentation/executors/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/javaconcurrent/AbstractExecutorInstrumentation.java b/instrumentation/executors/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/javaconcurrent/AbstractExecutorInstrumentation.java index f60b34344a..83c5fb5f7d 100644 --- a/instrumentation/executors/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/javaconcurrent/AbstractExecutorInstrumentation.java +++ b/instrumentation/executors/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/javaconcurrent/AbstractExecutorInstrumentation.java @@ -88,7 +88,8 @@ public abstract class AbstractExecutorInstrumentation implements TypeInstrumenta "java.util.concurrent.ForkJoinPool", "java.util.concurrent.ScheduledThreadPoolExecutor", "java.util.concurrent.ThreadPoolExecutor", - "org.eclipse.jetty.util.thread.QueuedThreadPool", + "org.eclipse.jetty.util.thread.QueuedThreadPool", // dispatch() is covered in the jetty + // module "org.eclipse.jetty.util.thread.ReservedThreadExecutor", "org.glassfish.grizzly.threadpool.GrizzlyExecutorService", "play.api.libs.streams.Execution$trampoline$", diff --git a/instrumentation/executors/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/javaconcurrent/NonStandardExecutorInstrumentationModule.java b/instrumentation/executors/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/javaconcurrent/NonStandardExecutorInstrumentationModule.java deleted file mode 100644 index 8acd38500f..0000000000 --- a/instrumentation/executors/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/javaconcurrent/NonStandardExecutorInstrumentationModule.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.javaagent.instrumentation.javaconcurrent; - -import static java.util.Collections.singletonList; -import static java.util.Collections.singletonMap; -import static net.bytebuddy.matcher.ElementMatchers.named; -import static net.bytebuddy.matcher.ElementMatchers.takesArgument; -import static net.bytebuddy.matcher.ElementMatchers.takesArguments; - -import com.google.auto.service.AutoService; -import io.opentelemetry.javaagent.instrumentation.api.concurrent.State; -import io.opentelemetry.javaagent.tooling.InstrumentationModule; -import io.opentelemetry.javaagent.tooling.TypeInstrumentation; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import net.bytebuddy.description.method.MethodDescription; -import net.bytebuddy.matcher.ElementMatcher; - -@AutoService(InstrumentationModule.class) -public class NonStandardExecutorInstrumentationModule extends InstrumentationModule { - - public NonStandardExecutorInstrumentationModule() { - super("executor", "non-standard-executor"); - } - - @Override - public List typeInstrumentations() { - return singletonList(new OtherExecutorsInstrumentation()); - } - - @Override - public Map contextStore() { - return singletonMap(Runnable.class.getName(), State.class.getName()); - } - - public static class OtherExecutorsInstrumentation extends AbstractExecutorInstrumentation { - @Override - public Map, String> transformers() { - Map, String> transformers = new HashMap<>(); - - transformers.put( - // org.eclipse.jetty.util.thread.QueuedThreadPool - named("dispatch").and(takesArguments(1)).and(takesArgument(0, Runnable.class)), - JavaExecutorInstrumentation.class.getName() + "$SetExecuteRunnableStateAdvice"); - return transformers; - } - } -} diff --git a/instrumentation/jetty-8.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/jetty/JettyInstrumentationModule.java b/instrumentation/jetty-8.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/jetty/JettyInstrumentationModule.java index 5142b5de86..11f734e574 100644 --- a/instrumentation/jetty-8.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/jetty/JettyInstrumentationModule.java +++ b/instrumentation/jetty-8.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/jetty/JettyInstrumentationModule.java @@ -7,18 +7,27 @@ package io.opentelemetry.javaagent.instrumentation.jetty; import static io.opentelemetry.javaagent.tooling.ClassLoaderMatcher.hasClassesNamed; import static io.opentelemetry.javaagent.tooling.bytebuddy.matcher.AgentElementMatchers.implementsInterface; -import static java.util.Collections.singletonList; +import static java.util.Arrays.asList; import static java.util.Collections.singletonMap; import static net.bytebuddy.matcher.ElementMatchers.isPublic; import static net.bytebuddy.matcher.ElementMatchers.named; import static net.bytebuddy.matcher.ElementMatchers.not; import static net.bytebuddy.matcher.ElementMatchers.takesArgument; +import static net.bytebuddy.matcher.ElementMatchers.takesArguments; import com.google.auto.service.AutoService; +import io.opentelemetry.javaagent.instrumentation.api.ContextStore; +import io.opentelemetry.javaagent.instrumentation.api.InstrumentationContext; +import io.opentelemetry.javaagent.instrumentation.api.Java8BytecodeBridge; +import io.opentelemetry.javaagent.instrumentation.api.concurrent.ExecutorInstrumentationUtils; +import io.opentelemetry.javaagent.instrumentation.api.concurrent.RunnableWrapper; +import io.opentelemetry.javaagent.instrumentation.api.concurrent.State; import io.opentelemetry.javaagent.tooling.InstrumentationModule; import io.opentelemetry.javaagent.tooling.TypeInstrumentation; +import java.util.HashMap; import java.util.List; import java.util.Map; +import net.bytebuddy.asm.Advice; import net.bytebuddy.description.method.MethodDescription; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; @@ -32,7 +41,12 @@ public class JettyInstrumentationModule extends InstrumentationModule { @Override public List typeInstrumentations() { - return singletonList(new HandlerInstrumentation()); + return asList(new HandlerInstrumentation(), new JettyQueuedThreadPoolInstrumentation()); + } + + @Override + public Map contextStore() { + return singletonMap(Runnable.class.getName(), State.class.getName()); } public static class HandlerInstrumentation implements TypeInstrumentation { @@ -70,4 +84,45 @@ public class JettyInstrumentationModule extends InstrumentationModule { JettyHandlerAdvice.class.getName()); } } + + public static class JettyQueuedThreadPoolInstrumentation implements TypeInstrumentation { + + @Override + public ElementMatcher typeMatcher() { + return named("org.eclipse.jetty.util.thread.QueuedThreadPool"); + } + + @Override + public Map, String> transformers() { + Map, String> transformers = new HashMap<>(); + + transformers.put( + named("dispatch").and(takesArguments(1)).and(takesArgument(0, Runnable.class)), + JettyInstrumentationModule.class.getName() + "$SetExecuteRunnableStateAdvice"); + return transformers; + } + } + + public static class SetExecuteRunnableStateAdvice { + + @Advice.OnMethodEnter(suppress = Throwable.class) + public static State enterJobSubmit( + @Advice.Argument(value = 0, readOnly = false) Runnable task) { + Runnable newTask = RunnableWrapper.wrapIfNeeded(task); + if (ExecutorInstrumentationUtils.shouldAttachStateToTask(newTask)) { + task = newTask; + ContextStore contextStore = + InstrumentationContext.get(Runnable.class, State.class); + return ExecutorInstrumentationUtils.setupState( + contextStore, newTask, Java8BytecodeBridge.currentContext()); + } + return null; + } + + @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) + public static void exitJobSubmit( + @Advice.Enter State state, @Advice.Thrown Throwable throwable) { + ExecutorInstrumentationUtils.cleanUpOnMethodExit(state, throwable); + } + } } diff --git a/instrumentation/jetty-8.0/javaagent/src/test/groovy/JavaAsyncChild.java b/instrumentation/jetty-8.0/javaagent/src/test/groovy/JavaAsyncChild.java new file mode 100644 index 0000000000..f6c68cb338 --- /dev/null +++ b/instrumentation/jetty-8.0/javaagent/src/test/groovy/JavaAsyncChild.java @@ -0,0 +1,75 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.trace.Tracer; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ForkJoinTask; +import java.util.concurrent.atomic.AtomicBoolean; + +public class JavaAsyncChild extends ForkJoinTask implements Runnable, Callable { + private static final Tracer tracer = OpenTelemetry.getGlobalTracer("io.opentelemetry.auto"); + + private final AtomicBoolean blockThread; + private final boolean doTraceableWork; + private final CountDownLatch latch = new CountDownLatch(1); + + public JavaAsyncChild() { + this(true, false); + } + + @Override + public Object getRawResult() { + return null; + } + + @Override + protected void setRawResult(Object value) {} + + @Override + protected boolean exec() { + runImpl(); + return true; + } + + public JavaAsyncChild(boolean doTraceableWork, boolean blockThread) { + this.doTraceableWork = doTraceableWork; + this.blockThread = new AtomicBoolean(blockThread); + } + + public void unblock() { + blockThread.set(false); + } + + @Override + public void run() { + runImpl(); + } + + @Override + public Object call() { + runImpl(); + return null; + } + + public void waitForCompletion() throws InterruptedException { + latch.await(); + } + + private void runImpl() { + while (blockThread.get()) { + // busy-wait to block thread + } + if (doTraceableWork) { + asyncChild(); + } + latch.countDown(); + } + + private void asyncChild() { + tracer.spanBuilder("asyncChild").startSpan().end(); + } +} diff --git a/instrumentation/jetty-8.0/javaagent/src/test/groovy/QueuedThreadPoolTest.groovy b/instrumentation/jetty-8.0/javaagent/src/test/groovy/QueuedThreadPoolTest.groovy new file mode 100644 index 0000000000..059aed6fcc --- /dev/null +++ b/instrumentation/jetty-8.0/javaagent/src/test/groovy/QueuedThreadPoolTest.groovy @@ -0,0 +1,83 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +import io.opentelemetry.instrumentation.test.AgentTestRunner +import io.opentelemetry.javaagent.instrumentation.jetty.JavaLambdaMaker +import io.opentelemetry.sdk.trace.data.SpanData +import org.eclipse.jetty.util.thread.QueuedThreadPool + +import static io.opentelemetry.instrumentation.test.utils.TraceUtils.runUnderTrace + +class QueuedThreadPoolTest extends AgentTestRunner { + + def "QueueThreadPool 'dispatch' propagates"() { + setup: + def pool = new QueuedThreadPool() + pool.start() + + new Runnable() { + @Override + void run() { + runUnderTrace("parent") { + // this child will have a span + def child1 = new JavaAsyncChild() + // this child won't + def child2 = new JavaAsyncChild(false, false) + pool.dispatch(child1) + pool.dispatch(child2) + child1.waitForCompletion() + child2.waitForCompletion() + } + } + }.run() + + TEST_WRITER.waitForTraces(1) + List trace = TEST_WRITER.traces[0] + + expect: + TEST_WRITER.traces.size() == 1 + trace.size() == 2 + trace.get(0).traceId == trace.get(1).traceId + trace.get(0).name == "parent" + trace.get(1).name == "asyncChild" + trace.get(1).parentSpanId == trace.get(0).spanId + + cleanup: + pool.stop() + } + + def "QueueThreadPool 'dispatch' propagates lambda"() { + setup: + def pool = new QueuedThreadPool() + pool.start() + + JavaAsyncChild child = new JavaAsyncChild(true, true) + new Runnable() { + @Override + void run() { + runUnderTrace("parent") { + pool.dispatch(JavaLambdaMaker.lambda(child)) + } + } + }.run() + // We block in child to make sure spans close in predictable order + child.unblock() + child.waitForCompletion() + + TEST_WRITER.waitForTraces(1) + List trace = TEST_WRITER.traces[0] + + expect: + TEST_WRITER.traces.size() == 1 + trace.size() == 2 + trace.get(0).traceId == trace.get(1).traceId + trace.get(0).name == "parent" + trace.get(1).name == "asyncChild" + trace.get(1).parentSpanId == trace.get(0).spanId + + cleanup: + pool.stop() + } +} diff --git a/instrumentation/jetty-8.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/jetty/JavaLambdaMaker.java b/instrumentation/jetty-8.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/jetty/JavaLambdaMaker.java new file mode 100644 index 0000000000..6ef578e21b --- /dev/null +++ b/instrumentation/jetty-8.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/jetty/JavaLambdaMaker.java @@ -0,0 +1,14 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.jetty; + +public class JavaLambdaMaker { + + @SuppressWarnings("FunctionalExpressionCanBeFolded") + public static Runnable lambda(Runnable runnable) { + return runnable::run; + } +}