Propagate context in Spring SimpleAsyncTaskExecutor (#1897)
* Propagate context in Spring SimpleAsyncTaskExecutor * Propagate context in Spring SimpleAsyncTaskExecutor: code review comments
This commit is contained in:
parent
49c2a36811
commit
a7cad4b36f
|
@ -0,0 +1,17 @@
|
|||
apply from: "$rootDir/gradle/instrumentation.gradle"
|
||||
|
||||
muzzle {
|
||||
pass {
|
||||
group = 'org.springframework'
|
||||
module = 'spring-core'
|
||||
versions = "[2.0,]"
|
||||
}
|
||||
}
|
||||
|
||||
dependencies {
|
||||
library group: 'org.springframework', name: 'spring-core', version: '2.0'
|
||||
|
||||
// 3.0 introduces submit() methods
|
||||
// 4.0 introduces submitListenable() methods
|
||||
testLibrary group: 'org.springframework', name: 'spring-core', version: '4.0.0.RELEASE'
|
||||
}
|
|
@ -0,0 +1,67 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.spring.core;
|
||||
|
||||
import static java.util.Collections.singletonMap;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.isProtected;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.named;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
|
||||
|
||||
import io.opentelemetry.javaagent.instrumentation.api.ContextStore;
|
||||
import io.opentelemetry.javaagent.instrumentation.api.InstrumentationContext;
|
||||
import io.opentelemetry.javaagent.instrumentation.api.Java8BytecodeBridge;
|
||||
import io.opentelemetry.javaagent.instrumentation.api.concurrent.ExecutorInstrumentationUtils;
|
||||
import io.opentelemetry.javaagent.instrumentation.api.concurrent.RunnableWrapper;
|
||||
import io.opentelemetry.javaagent.instrumentation.api.concurrent.State;
|
||||
import io.opentelemetry.javaagent.tooling.TypeInstrumentation;
|
||||
import java.util.Map;
|
||||
import net.bytebuddy.asm.Advice;
|
||||
import net.bytebuddy.description.method.MethodDescription;
|
||||
import net.bytebuddy.description.type.TypeDescription;
|
||||
import net.bytebuddy.matcher.ElementMatcher;
|
||||
|
||||
public class SimpleAsyncTaskExecutorInstrumentation implements TypeInstrumentation {
|
||||
|
||||
@Override
|
||||
public ElementMatcher<? super TypeDescription> typeMatcher() {
|
||||
return named("org.springframework.core.task.SimpleAsyncTaskExecutor");
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<? extends ElementMatcher<? super MethodDescription>, String> transformers() {
|
||||
return singletonMap(
|
||||
isMethod()
|
||||
.and(isProtected())
|
||||
.and(named("doExecute"))
|
||||
.and(takesArguments(1))
|
||||
.and(takesArgument(0, Runnable.class)),
|
||||
getClass().getName() + "$ExecuteAdvice");
|
||||
}
|
||||
|
||||
public static class ExecuteAdvice {
|
||||
@Advice.OnMethodEnter(suppress = Throwable.class)
|
||||
public static State enterJobSubmit(
|
||||
@Advice.Argument(value = 0, readOnly = false) Runnable task) {
|
||||
Runnable newTask = RunnableWrapper.wrapIfNeeded(task);
|
||||
if (ExecutorInstrumentationUtils.shouldAttachStateToTask(newTask)) {
|
||||
task = newTask;
|
||||
ContextStore<Runnable, State> contextStore =
|
||||
InstrumentationContext.get(Runnable.class, State.class);
|
||||
return ExecutorInstrumentationUtils.setupState(
|
||||
contextStore, newTask, Java8BytecodeBridge.currentContext());
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
|
||||
public static void exitJobSubmit(
|
||||
@Advice.Enter State state, @Advice.Thrown Throwable throwable) {
|
||||
ExecutorInstrumentationUtils.cleanUpOnMethodExit(state, throwable);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,40 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.spring.core;
|
||||
|
||||
import static io.opentelemetry.javaagent.tooling.ClassLoaderMatcher.hasClassesNamed;
|
||||
import static java.util.Collections.singletonList;
|
||||
import static java.util.Collections.singletonMap;
|
||||
|
||||
import com.google.auto.service.AutoService;
|
||||
import io.opentelemetry.javaagent.instrumentation.api.concurrent.State;
|
||||
import io.opentelemetry.javaagent.tooling.InstrumentationModule;
|
||||
import io.opentelemetry.javaagent.tooling.TypeInstrumentation;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import net.bytebuddy.matcher.ElementMatcher;
|
||||
|
||||
@AutoService(InstrumentationModule.class)
|
||||
public class SpringCoreInstrumentationModule extends InstrumentationModule {
|
||||
public SpringCoreInstrumentationModule() {
|
||||
super("spring-core", "spring-core-2.0");
|
||||
}
|
||||
|
||||
@Override
|
||||
public ElementMatcher.Junction<ClassLoader> classLoaderMatcher() {
|
||||
return hasClassesNamed("org.springframework.core.task.SimpleAsyncTaskExecutor");
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Map<String, String> contextStore() {
|
||||
return singletonMap(Runnable.class.getName(), State.class.getName());
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<TypeInstrumentation> typeInstrumentations() {
|
||||
return singletonList(new SimpleAsyncTaskExecutorInstrumentation());
|
||||
}
|
||||
}
|
|
@ -0,0 +1,93 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
import static io.opentelemetry.api.trace.Span.Kind.INTERNAL
|
||||
import static io.opentelemetry.instrumentation.test.utils.TraceUtils.runUnderTrace
|
||||
|
||||
import io.opentelemetry.api.OpenTelemetry
|
||||
import io.opentelemetry.instrumentation.test.AgentTestRunner
|
||||
import java.util.concurrent.Callable
|
||||
import java.util.concurrent.CountDownLatch
|
||||
import org.springframework.core.task.SimpleAsyncTaskExecutor
|
||||
import spock.lang.Shared
|
||||
import spock.lang.Unroll
|
||||
|
||||
class SimpleAsyncTaskExecutorInstrumentationTest extends AgentTestRunner {
|
||||
|
||||
@Shared
|
||||
def executeRunnable = { e, c -> e.execute((Runnable) c) }
|
||||
@Shared
|
||||
def submitRunnable = { e, c -> e.submit((Runnable) c) }
|
||||
@Shared
|
||||
def submitCallable = { e, c -> e.submit((Callable) c) }
|
||||
@Shared
|
||||
def submitListenableRunnable = { e, c -> e.submitListenable((Runnable) c) }
|
||||
@Shared
|
||||
def submitListenableCallable = { e, c -> e.submitListenable((Callable) c) }
|
||||
|
||||
@Unroll
|
||||
def "should propagate context on #desc"() {
|
||||
given:
|
||||
def executor = new SimpleAsyncTaskExecutor()
|
||||
|
||||
when:
|
||||
runUnderTrace("parent") {
|
||||
def child1 = new AsyncTask(startSpan: true)
|
||||
def child2 = new AsyncTask(startSpan: false)
|
||||
method(executor, child1)
|
||||
method(executor, child2)
|
||||
child1.waitForCompletion()
|
||||
child2.waitForCompletion()
|
||||
}
|
||||
|
||||
then:
|
||||
assertTraces(1) {
|
||||
trace(0, 2) {
|
||||
span(0) {
|
||||
name "parent"
|
||||
kind INTERNAL
|
||||
}
|
||||
span(1) {
|
||||
name "asyncChild"
|
||||
kind INTERNAL
|
||||
childOf(span(0))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
where:
|
||||
desc | method
|
||||
"execute Runnable" | executeRunnable
|
||||
"submit Runnable" | submitRunnable
|
||||
"submit Callable" | submitCallable
|
||||
"submitListenable Runnable" | submitListenableRunnable
|
||||
"submitListenable Callable" | submitListenableCallable
|
||||
}
|
||||
}
|
||||
|
||||
class AsyncTask implements Runnable, Callable<Object> {
|
||||
private static final TRACER = OpenTelemetry.getGlobalTracer("io.opentelemetry.auto")
|
||||
|
||||
final latch = new CountDownLatch(1)
|
||||
boolean startSpan
|
||||
|
||||
@Override
|
||||
void run() {
|
||||
if (startSpan) {
|
||||
TRACER.spanBuilder("asyncChild").startSpan().end()
|
||||
}
|
||||
latch.countDown()
|
||||
}
|
||||
|
||||
@Override
|
||||
Object call() throws Exception {
|
||||
run()
|
||||
return null
|
||||
}
|
||||
|
||||
void waitForCompletion() throws InterruptedException {
|
||||
latch.await()
|
||||
}
|
||||
}
|
|
@ -178,6 +178,7 @@ include ':instrumentation:servlet:servlet-common:javaagent'
|
|||
include ':instrumentation:servlet:servlet-2.2:javaagent'
|
||||
include ':instrumentation:servlet:servlet-3.0:javaagent'
|
||||
include ':instrumentation:spark-2.3:javaagent'
|
||||
include ':instrumentation:spring:spring-core-2.0:javaagent'
|
||||
include ':instrumentation:spring:spring-data-1.8:javaagent'
|
||||
include ':instrumentation:spring:spring-scheduling-3.1:javaagent'
|
||||
include ':instrumentation:spring:spring-web-3.1:library'
|
||||
|
|
Loading…
Reference in New Issue