Use Kotlin context element in agent instrumentation (#1618)
* Use SDK helper * Revert unintended.
This commit is contained in:
parent
9802539b07
commit
822be11dbc
|
@ -42,6 +42,7 @@ ext {
|
|||
opentelemetryApi : dependencies.create(group: 'io.opentelemetry', name: 'opentelemetry-api', version: versions.opentelemetry),
|
||||
opentelemetryAutoAnnotations: dependencies.create(group: 'io.opentelemetry', name: 'opentelemetry-extension-annotations', version: versions.opentelemetryAnother),
|
||||
opentelemetryContext : dependencies.create(group: 'io.opentelemetry', name: 'opentelemetry-context', version: versions.opentelemetryContext),
|
||||
opentelemetryKotlin : dependencies.create(group: 'io.opentelemetry', name: 'opentelemetry-extension-kotlin', version: versions.opentelemetry),
|
||||
opentelemetryTraceProps : dependencies.create(group: 'io.opentelemetry', name: 'opentelemetry-extension-trace-propagators', version: versions.opentelemetry),
|
||||
opentelemetrySdk : dependencies.create(group: 'io.opentelemetry', name: 'opentelemetry-sdk', version: versions.opentelemetryAnother),
|
||||
opentelemetryJaeger : dependencies.create(group: 'io.opentelemetry', name: 'opentelemetry-exporter-jaeger', version: versions.opentelemetryOther),
|
||||
|
|
|
@ -16,7 +16,7 @@ muzzle {
|
|||
}
|
||||
}
|
||||
dependencies {
|
||||
implementation project(':instrumentation:executors')
|
||||
implementation deps.opentelemetryKotlin
|
||||
|
||||
compileOnly 'org.jetbrains.kotlin:kotlin-stdlib-common:1.3.72'
|
||||
compileOnly 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.7'
|
||||
|
|
|
@ -0,0 +1,24 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.kotlinxcoroutines;
|
||||
|
||||
import io.opentelemetry.context.Context;
|
||||
import io.opentelemetry.extension.kotlin.ContextExtensionsKt;
|
||||
import kotlin.coroutines.CoroutineContext;
|
||||
|
||||
public final class KotlinCoroutinesInstrumentationHelper {
|
||||
|
||||
public static CoroutineContext addOpenTelemetryContext(CoroutineContext coroutineContext) {
|
||||
Context current = Context.current();
|
||||
Context inCoroutine = ContextExtensionsKt.getOpenTelemetryContext(coroutineContext);
|
||||
if (current == inCoroutine) {
|
||||
return coroutineContext;
|
||||
}
|
||||
return coroutineContext.plus(ContextExtensionsKt.asContextElement(current));
|
||||
}
|
||||
|
||||
private KotlinCoroutinesInstrumentationHelper() {}
|
||||
}
|
|
@ -7,50 +7,22 @@ package io.opentelemetry.javaagent.instrumentation.kotlinxcoroutines;
|
|||
|
||||
import static java.util.Collections.singletonList;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.named;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
|
||||
|
||||
import com.google.auto.service.AutoService;
|
||||
import io.opentelemetry.context.Context;
|
||||
import io.opentelemetry.context.Scope;
|
||||
import io.opentelemetry.javaagent.tooling.InstrumentationModule;
|
||||
import io.opentelemetry.javaagent.tooling.TypeInstrumentation;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import kotlin.coroutines.Continuation;
|
||||
import kotlin.coroutines.CoroutineContext;
|
||||
import kotlin.jvm.functions.Function2;
|
||||
import net.bytebuddy.asm.Advice;
|
||||
import net.bytebuddy.description.method.MethodDescription;
|
||||
import net.bytebuddy.description.type.TypeDescription;
|
||||
import net.bytebuddy.matcher.ElementMatcher;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
|
||||
@AutoService(InstrumentationModule.class)
|
||||
public class KotlinCoroutinesInstrumentationModule extends InstrumentationModule {
|
||||
/*
|
||||
Kotlin coroutines with suspend functions are a form of cooperative "userland" threading
|
||||
(you might also know this pattern as "fibers" or "green threading", where the OS/kernel-level thread
|
||||
has no idea of switching between tasks. Fortunately kotlin exposes hooks for the key events: knowing when
|
||||
coroutines are being created, when they are suspended (swapped out/inactive), and when they are resumed (about to
|
||||
run again).
|
||||
|
||||
Without this instrumentation, heavy concurrency and usage of kotlin suspend functions will break causality
|
||||
and cause nonsensical span parents/context propagation. This is because a single JVM thread will run a series of
|
||||
coroutines in an "arbitrary" order, and a context set by coroutine A (which then gets suspended) will be picked up
|
||||
by completely-unrelated coroutine B.
|
||||
|
||||
The basic strategy here is:
|
||||
1) Use the DebugProbes callbacks to learn about coroutine create, resume, and suspend operations
|
||||
2) Wrap the creation Coroutine and its Context and use that wrapping to add an extra Context "key"
|
||||
3) Use the callback for resume and suspend to manipulate our context "key" whereby an appropriate state
|
||||
object can be found (tied to the chain of Continutations in the Coroutine).
|
||||
4) Do our swapping-context dance with that appropriate state
|
||||
5) Test with highly concurrent well-known span causality and ensure everything looks right.
|
||||
Without this instrumentation, this test fails with concurrency=2; with this instrumentation,
|
||||
it passes with concurrency=200.
|
||||
*/
|
||||
|
||||
public KotlinCoroutinesInstrumentationModule() {
|
||||
super("kotlinx-coroutines");
|
||||
|
@ -58,149 +30,45 @@ public class KotlinCoroutinesInstrumentationModule extends InstrumentationModule
|
|||
|
||||
@Override
|
||||
public List<TypeInstrumentation> typeInstrumentations() {
|
||||
return singletonList(new KotlinDebugProbeInstrumentation());
|
||||
return singletonList(new CoroutineScopeLaunchInstrumentation());
|
||||
}
|
||||
|
||||
private static final class KotlinDebugProbeInstrumentation implements TypeInstrumentation {
|
||||
private static final class CoroutineScopeLaunchInstrumentation implements TypeInstrumentation {
|
||||
@Override
|
||||
public ElementMatcher<? super TypeDescription> typeMatcher() {
|
||||
return named("kotlin.coroutines.jvm.internal.DebugProbesKt");
|
||||
return named("kotlinx.coroutines.BuildersKt");
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<? extends ElementMatcher<? super MethodDescription>, String> transformers() {
|
||||
final Map<ElementMatcher<? super MethodDescription>, String> transformers = new HashMap<>();
|
||||
transformers.put(
|
||||
named("probeCoroutineCreated").and(takesArguments(1)),
|
||||
CoroutineCreatedAdvice.class.getName());
|
||||
(named("launch").or(named("launch$default")))
|
||||
.and(takesArgument(1, named("kotlin.coroutines.CoroutineContext"))),
|
||||
KotlinCoroutinesInstrumentationModule.class.getName() + "$LaunchAdvice");
|
||||
transformers.put(
|
||||
named("probeCoroutineResumed").and(takesArguments(1)),
|
||||
CoroutineResumedAdvice.class.getName());
|
||||
transformers.put(
|
||||
named("probeCoroutineSuspended").and(takesArguments(1)),
|
||||
CoroutineSuspendedAdvice.class.getName());
|
||||
(named("runBlocking").or(named("runBlocking$default")))
|
||||
.and(takesArgument(0, named("kotlin.coroutines.CoroutineContext"))),
|
||||
KotlinCoroutinesInstrumentationModule.class.getName() + "$RunBlockingAdvice");
|
||||
return transformers;
|
||||
}
|
||||
}
|
||||
|
||||
public static class CoroutineCreatedAdvice {
|
||||
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
|
||||
public static void exit(@Advice.Return(readOnly = false) Continuation<?> retVal) {
|
||||
if (!(retVal instanceof CoroutineWrapper)) {
|
||||
retVal = new CoroutineWrapper<>(retVal);
|
||||
}
|
||||
public static class LaunchAdvice {
|
||||
@Advice.OnMethodEnter
|
||||
public static void enter(
|
||||
@Advice.Argument(value = 1, readOnly = false) CoroutineContext coroutineContext) {
|
||||
coroutineContext =
|
||||
KotlinCoroutinesInstrumentationHelper.addOpenTelemetryContext(coroutineContext);
|
||||
}
|
||||
}
|
||||
|
||||
public static class CoroutineResumedAdvice {
|
||||
@Advice.OnMethodEnter(suppress = Throwable.class)
|
||||
public static void enter(@Advice.Argument(0) final Continuation<?> continuation) {
|
||||
CoroutineContextWrapper w = continuation.getContext().get(TraceScopeKey.INSTANCE);
|
||||
if (w != null) {
|
||||
w.tracingResume();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static class CoroutineSuspendedAdvice {
|
||||
@Advice.OnMethodEnter(suppress = Throwable.class)
|
||||
public static void enter(@Advice.Argument(0) final Continuation<?> continuation) {
|
||||
CoroutineContextWrapper w = continuation.getContext().get(TraceScopeKey.INSTANCE);
|
||||
if (w != null) {
|
||||
w.tracingSuspend();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static class TraceScopeKey implements CoroutineContext.Key<CoroutineContextWrapper> {
|
||||
public static final TraceScopeKey INSTANCE = new TraceScopeKey();
|
||||
}
|
||||
|
||||
public static class CoroutineWrapper<T> implements Continuation<T> {
|
||||
private final Continuation<T> proxy;
|
||||
private final CoroutineContextWrapper contextWrapper;
|
||||
|
||||
public CoroutineWrapper(Continuation<T> proxy) {
|
||||
this.proxy = proxy;
|
||||
this.contextWrapper = new CoroutineContextWrapper(proxy.getContext());
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return proxy.toString();
|
||||
}
|
||||
|
||||
@NotNull
|
||||
@Override
|
||||
public CoroutineContext getContext() {
|
||||
return contextWrapper;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void resumeWith(@NotNull Object o) {
|
||||
proxy.resumeWith(o);
|
||||
}
|
||||
}
|
||||
|
||||
public static class CoroutineContextWrapper
|
||||
implements CoroutineContext, CoroutineContext.Element {
|
||||
private final CoroutineContext proxy;
|
||||
private Context myTracingContext;
|
||||
private Scope scope;
|
||||
|
||||
public CoroutineContextWrapper(CoroutineContext proxy) {
|
||||
this.proxy = proxy;
|
||||
this.myTracingContext = Context.current();
|
||||
}
|
||||
|
||||
@Override
|
||||
public <R> R fold(R r, @NotNull Function2<? super R, ? super Element, ? extends R> function2) {
|
||||
return proxy.fold(r, function2);
|
||||
}
|
||||
|
||||
@Nullable
|
||||
@Override
|
||||
public <E extends Element> E get(@NotNull Key<E> key) {
|
||||
if (key == TraceScopeKey.INSTANCE) {
|
||||
return (E) this;
|
||||
}
|
||||
return proxy.get(key);
|
||||
}
|
||||
|
||||
@NotNull
|
||||
@Override
|
||||
public CoroutineContext minusKey(@NotNull Key<?> key) {
|
||||
// I can't be removed!
|
||||
return proxy.minusKey(key);
|
||||
}
|
||||
|
||||
@NotNull
|
||||
@Override
|
||||
public CoroutineContext plus(@NotNull CoroutineContext coroutineContext) {
|
||||
return proxy.plus(coroutineContext);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return proxy.toString();
|
||||
}
|
||||
|
||||
@NotNull
|
||||
@Override
|
||||
public Key<?> getKey() {
|
||||
return TraceScopeKey.INSTANCE;
|
||||
}
|
||||
|
||||
// Actual tracing context-switch logic
|
||||
public void tracingSuspend() {
|
||||
// TODO(anuraaga): Investigate why test passes only with this call here. Conceptually it seems
|
||||
// weird to overwrite current context like this.
|
||||
myTracingContext = Context.current();
|
||||
scope.close();
|
||||
}
|
||||
|
||||
public void tracingResume() {
|
||||
scope = myTracingContext.makeCurrent();
|
||||
public static class RunBlockingAdvice {
|
||||
@Advice.OnMethodEnter
|
||||
public static void enter(
|
||||
@Advice.Argument(value = 0, readOnly = false) CoroutineContext coroutineContext) {
|
||||
coroutineContext =
|
||||
KotlinCoroutinesInstrumentationHelper.addOpenTelemetryContext(coroutineContext);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -4,6 +4,7 @@
|
|||
*/
|
||||
|
||||
import io.opentelemetry.api.trace.Tracer
|
||||
import io.opentelemetry.extension.kotlin.asContextElement
|
||||
import io.opentelemetry.javaagent.instrumentation.api.Java8BytecodeBridge
|
||||
import java.util.concurrent.TimeUnit
|
||||
import kotlinx.coroutines.CompletableDeferred
|
||||
|
@ -21,6 +22,7 @@ import kotlinx.coroutines.delay
|
|||
import kotlinx.coroutines.launch
|
||||
import kotlinx.coroutines.runBlocking
|
||||
import kotlinx.coroutines.selects.select
|
||||
import kotlinx.coroutines.withContext
|
||||
import kotlinx.coroutines.withTimeout
|
||||
import kotlinx.coroutines.yield
|
||||
|
||||
|
@ -145,35 +147,35 @@ class KotlinCoroutineTests(private val dispatcher: CoroutineDispatcher) {
|
|||
suspend fun a(iter: Long) {
|
||||
var span = tracer.spanBuilder("a").startSpan()
|
||||
span.setAttribute("iter", iter)
|
||||
var scope = span.makeCurrent()
|
||||
delay(10)
|
||||
a2(iter)
|
||||
scope.close()
|
||||
withContext(span.asContextElement()) {
|
||||
delay(10)
|
||||
a2(iter)
|
||||
}
|
||||
span.end()
|
||||
}
|
||||
suspend fun a2(iter: Long) {
|
||||
var span = tracer.spanBuilder("a2").startSpan()
|
||||
span.setAttribute("iter", iter)
|
||||
var scope = span.makeCurrent()
|
||||
delay(10)
|
||||
scope.close()
|
||||
withContext(span.asContextElement()) {
|
||||
delay(10)
|
||||
}
|
||||
span.end()
|
||||
}
|
||||
suspend fun b(iter: Long) {
|
||||
var span = tracer.spanBuilder("b").startSpan()
|
||||
span.setAttribute("iter", iter)
|
||||
var scope = span.makeCurrent()
|
||||
delay(10)
|
||||
b2(iter)
|
||||
scope.close()
|
||||
withContext(span.asContextElement()) {
|
||||
delay(10)
|
||||
b2(iter)
|
||||
}
|
||||
span.end()
|
||||
}
|
||||
suspend fun b2(iter: Long) {
|
||||
var span = tracer.spanBuilder("b2").startSpan()
|
||||
span.setAttribute("iter", iter)
|
||||
var scope = span.makeCurrent()
|
||||
delay(10)
|
||||
scope.close()
|
||||
withContext(span.asContextElement()) {
|
||||
delay(10)
|
||||
}
|
||||
span.end()
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue