diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle index 70e94a2e1c..6b7f6f62e3 100644 --- a/gradle/dependencies.gradle +++ b/gradle/dependencies.gradle @@ -42,6 +42,7 @@ ext { opentelemetryApi : dependencies.create(group: 'io.opentelemetry', name: 'opentelemetry-api', version: versions.opentelemetry), opentelemetryAutoAnnotations: dependencies.create(group: 'io.opentelemetry', name: 'opentelemetry-extension-annotations', version: versions.opentelemetryAnother), opentelemetryContext : dependencies.create(group: 'io.opentelemetry', name: 'opentelemetry-context', version: versions.opentelemetryContext), + opentelemetryKotlin : dependencies.create(group: 'io.opentelemetry', name: 'opentelemetry-extension-kotlin', version: versions.opentelemetry), opentelemetryTraceProps : dependencies.create(group: 'io.opentelemetry', name: 'opentelemetry-extension-trace-propagators', version: versions.opentelemetry), opentelemetrySdk : dependencies.create(group: 'io.opentelemetry', name: 'opentelemetry-sdk', version: versions.opentelemetryAnother), opentelemetryJaeger : dependencies.create(group: 'io.opentelemetry', name: 'opentelemetry-exporter-jaeger', version: versions.opentelemetryOther), diff --git a/instrumentation/kotlinx-coroutines/kotlinx-coroutines.gradle b/instrumentation/kotlinx-coroutines/kotlinx-coroutines.gradle index c2969f02a3..22076db7b4 100644 --- a/instrumentation/kotlinx-coroutines/kotlinx-coroutines.gradle +++ b/instrumentation/kotlinx-coroutines/kotlinx-coroutines.gradle @@ -16,7 +16,7 @@ muzzle { } } dependencies { - implementation project(':instrumentation:executors') + implementation deps.opentelemetryKotlin compileOnly 'org.jetbrains.kotlin:kotlin-stdlib-common:1.3.72' compileOnly 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.7' diff --git a/instrumentation/kotlinx-coroutines/src/main/java/io/opentelemetry/javaagent/instrumentation/kotlinxcoroutines/KotlinCoroutinesInstrumentationHelper.java b/instrumentation/kotlinx-coroutines/src/main/java/io/opentelemetry/javaagent/instrumentation/kotlinxcoroutines/KotlinCoroutinesInstrumentationHelper.java new file mode 100644 index 0000000000..0f5f8e3a08 --- /dev/null +++ b/instrumentation/kotlinx-coroutines/src/main/java/io/opentelemetry/javaagent/instrumentation/kotlinxcoroutines/KotlinCoroutinesInstrumentationHelper.java @@ -0,0 +1,24 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.kotlinxcoroutines; + +import io.opentelemetry.context.Context; +import io.opentelemetry.extension.kotlin.ContextExtensionsKt; +import kotlin.coroutines.CoroutineContext; + +public final class KotlinCoroutinesInstrumentationHelper { + + public static CoroutineContext addOpenTelemetryContext(CoroutineContext coroutineContext) { + Context current = Context.current(); + Context inCoroutine = ContextExtensionsKt.getOpenTelemetryContext(coroutineContext); + if (current == inCoroutine) { + return coroutineContext; + } + return coroutineContext.plus(ContextExtensionsKt.asContextElement(current)); + } + + private KotlinCoroutinesInstrumentationHelper() {} +} diff --git a/instrumentation/kotlinx-coroutines/src/main/java/io/opentelemetry/javaagent/instrumentation/kotlinxcoroutines/KotlinCoroutinesInstrumentationModule.java b/instrumentation/kotlinx-coroutines/src/main/java/io/opentelemetry/javaagent/instrumentation/kotlinxcoroutines/KotlinCoroutinesInstrumentationModule.java index a4aa96e74a..2b3ebe0eb7 100644 --- a/instrumentation/kotlinx-coroutines/src/main/java/io/opentelemetry/javaagent/instrumentation/kotlinxcoroutines/KotlinCoroutinesInstrumentationModule.java +++ b/instrumentation/kotlinx-coroutines/src/main/java/io/opentelemetry/javaagent/instrumentation/kotlinxcoroutines/KotlinCoroutinesInstrumentationModule.java @@ -7,50 +7,22 @@ package io.opentelemetry.javaagent.instrumentation.kotlinxcoroutines; import static java.util.Collections.singletonList; import static net.bytebuddy.matcher.ElementMatchers.named; -import static net.bytebuddy.matcher.ElementMatchers.takesArguments; +import static net.bytebuddy.matcher.ElementMatchers.takesArgument; import com.google.auto.service.AutoService; -import io.opentelemetry.context.Context; -import io.opentelemetry.context.Scope; import io.opentelemetry.javaagent.tooling.InstrumentationModule; import io.opentelemetry.javaagent.tooling.TypeInstrumentation; import java.util.HashMap; import java.util.List; import java.util.Map; -import kotlin.coroutines.Continuation; import kotlin.coroutines.CoroutineContext; -import kotlin.jvm.functions.Function2; import net.bytebuddy.asm.Advice; import net.bytebuddy.description.method.MethodDescription; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; -import org.jetbrains.annotations.NotNull; -import org.jetbrains.annotations.Nullable; @AutoService(InstrumentationModule.class) public class KotlinCoroutinesInstrumentationModule extends InstrumentationModule { - /* - Kotlin coroutines with suspend functions are a form of cooperative "userland" threading - (you might also know this pattern as "fibers" or "green threading", where the OS/kernel-level thread - has no idea of switching between tasks. Fortunately kotlin exposes hooks for the key events: knowing when - coroutines are being created, when they are suspended (swapped out/inactive), and when they are resumed (about to - run again). - - Without this instrumentation, heavy concurrency and usage of kotlin suspend functions will break causality - and cause nonsensical span parents/context propagation. This is because a single JVM thread will run a series of - coroutines in an "arbitrary" order, and a context set by coroutine A (which then gets suspended) will be picked up - by completely-unrelated coroutine B. - - The basic strategy here is: - 1) Use the DebugProbes callbacks to learn about coroutine create, resume, and suspend operations - 2) Wrap the creation Coroutine and its Context and use that wrapping to add an extra Context "key" - 3) Use the callback for resume and suspend to manipulate our context "key" whereby an appropriate state - object can be found (tied to the chain of Continutations in the Coroutine). - 4) Do our swapping-context dance with that appropriate state - 5) Test with highly concurrent well-known span causality and ensure everything looks right. - Without this instrumentation, this test fails with concurrency=2; with this instrumentation, - it passes with concurrency=200. - */ public KotlinCoroutinesInstrumentationModule() { super("kotlinx-coroutines"); @@ -58,149 +30,45 @@ public class KotlinCoroutinesInstrumentationModule extends InstrumentationModule @Override public List typeInstrumentations() { - return singletonList(new KotlinDebugProbeInstrumentation()); + return singletonList(new CoroutineScopeLaunchInstrumentation()); } - private static final class KotlinDebugProbeInstrumentation implements TypeInstrumentation { + private static final class CoroutineScopeLaunchInstrumentation implements TypeInstrumentation { @Override public ElementMatcher typeMatcher() { - return named("kotlin.coroutines.jvm.internal.DebugProbesKt"); + return named("kotlinx.coroutines.BuildersKt"); } @Override public Map, String> transformers() { final Map, String> transformers = new HashMap<>(); transformers.put( - named("probeCoroutineCreated").and(takesArguments(1)), - CoroutineCreatedAdvice.class.getName()); + (named("launch").or(named("launch$default"))) + .and(takesArgument(1, named("kotlin.coroutines.CoroutineContext"))), + KotlinCoroutinesInstrumentationModule.class.getName() + "$LaunchAdvice"); transformers.put( - named("probeCoroutineResumed").and(takesArguments(1)), - CoroutineResumedAdvice.class.getName()); - transformers.put( - named("probeCoroutineSuspended").and(takesArguments(1)), - CoroutineSuspendedAdvice.class.getName()); + (named("runBlocking").or(named("runBlocking$default"))) + .and(takesArgument(0, named("kotlin.coroutines.CoroutineContext"))), + KotlinCoroutinesInstrumentationModule.class.getName() + "$RunBlockingAdvice"); return transformers; } } - public static class CoroutineCreatedAdvice { - @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) - public static void exit(@Advice.Return(readOnly = false) Continuation retVal) { - if (!(retVal instanceof CoroutineWrapper)) { - retVal = new CoroutineWrapper<>(retVal); - } + public static class LaunchAdvice { + @Advice.OnMethodEnter + public static void enter( + @Advice.Argument(value = 1, readOnly = false) CoroutineContext coroutineContext) { + coroutineContext = + KotlinCoroutinesInstrumentationHelper.addOpenTelemetryContext(coroutineContext); } } - public static class CoroutineResumedAdvice { - @Advice.OnMethodEnter(suppress = Throwable.class) - public static void enter(@Advice.Argument(0) final Continuation continuation) { - CoroutineContextWrapper w = continuation.getContext().get(TraceScopeKey.INSTANCE); - if (w != null) { - w.tracingResume(); - } - } - } - - public static class CoroutineSuspendedAdvice { - @Advice.OnMethodEnter(suppress = Throwable.class) - public static void enter(@Advice.Argument(0) final Continuation continuation) { - CoroutineContextWrapper w = continuation.getContext().get(TraceScopeKey.INSTANCE); - if (w != null) { - w.tracingSuspend(); - } - } - } - - public static class TraceScopeKey implements CoroutineContext.Key { - public static final TraceScopeKey INSTANCE = new TraceScopeKey(); - } - - public static class CoroutineWrapper implements Continuation { - private final Continuation proxy; - private final CoroutineContextWrapper contextWrapper; - - public CoroutineWrapper(Continuation proxy) { - this.proxy = proxy; - this.contextWrapper = new CoroutineContextWrapper(proxy.getContext()); - } - - @Override - public String toString() { - return proxy.toString(); - } - - @NotNull - @Override - public CoroutineContext getContext() { - return contextWrapper; - } - - @Override - public void resumeWith(@NotNull Object o) { - proxy.resumeWith(o); - } - } - - public static class CoroutineContextWrapper - implements CoroutineContext, CoroutineContext.Element { - private final CoroutineContext proxy; - private Context myTracingContext; - private Scope scope; - - public CoroutineContextWrapper(CoroutineContext proxy) { - this.proxy = proxy; - this.myTracingContext = Context.current(); - } - - @Override - public R fold(R r, @NotNull Function2 function2) { - return proxy.fold(r, function2); - } - - @Nullable - @Override - public E get(@NotNull Key key) { - if (key == TraceScopeKey.INSTANCE) { - return (E) this; - } - return proxy.get(key); - } - - @NotNull - @Override - public CoroutineContext minusKey(@NotNull Key key) { - // I can't be removed! - return proxy.minusKey(key); - } - - @NotNull - @Override - public CoroutineContext plus(@NotNull CoroutineContext coroutineContext) { - return proxy.plus(coroutineContext); - } - - @Override - public String toString() { - return proxy.toString(); - } - - @NotNull - @Override - public Key getKey() { - return TraceScopeKey.INSTANCE; - } - - // Actual tracing context-switch logic - public void tracingSuspend() { - // TODO(anuraaga): Investigate why test passes only with this call here. Conceptually it seems - // weird to overwrite current context like this. - myTracingContext = Context.current(); - scope.close(); - } - - public void tracingResume() { - scope = myTracingContext.makeCurrent(); + public static class RunBlockingAdvice { + @Advice.OnMethodEnter + public static void enter( + @Advice.Argument(value = 0, readOnly = false) CoroutineContext coroutineContext) { + coroutineContext = + KotlinCoroutinesInstrumentationHelper.addOpenTelemetryContext(coroutineContext); } } } diff --git a/instrumentation/kotlinx-coroutines/src/test/kotlin/KotlinCoroutineTests.kt b/instrumentation/kotlinx-coroutines/src/test/kotlin/KotlinCoroutineTests.kt index 9fab165276..290e6e4785 100644 --- a/instrumentation/kotlinx-coroutines/src/test/kotlin/KotlinCoroutineTests.kt +++ b/instrumentation/kotlinx-coroutines/src/test/kotlin/KotlinCoroutineTests.kt @@ -4,6 +4,7 @@ */ import io.opentelemetry.api.trace.Tracer +import io.opentelemetry.extension.kotlin.asContextElement import io.opentelemetry.javaagent.instrumentation.api.Java8BytecodeBridge import java.util.concurrent.TimeUnit import kotlinx.coroutines.CompletableDeferred @@ -21,6 +22,7 @@ import kotlinx.coroutines.delay import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking import kotlinx.coroutines.selects.select +import kotlinx.coroutines.withContext import kotlinx.coroutines.withTimeout import kotlinx.coroutines.yield @@ -145,35 +147,35 @@ class KotlinCoroutineTests(private val dispatcher: CoroutineDispatcher) { suspend fun a(iter: Long) { var span = tracer.spanBuilder("a").startSpan() span.setAttribute("iter", iter) - var scope = span.makeCurrent() - delay(10) - a2(iter) - scope.close() + withContext(span.asContextElement()) { + delay(10) + a2(iter) + } span.end() } suspend fun a2(iter: Long) { var span = tracer.spanBuilder("a2").startSpan() span.setAttribute("iter", iter) - var scope = span.makeCurrent() - delay(10) - scope.close() + withContext(span.asContextElement()) { + delay(10) + } span.end() } suspend fun b(iter: Long) { var span = tracer.spanBuilder("b").startSpan() span.setAttribute("iter", iter) - var scope = span.makeCurrent() - delay(10) - b2(iter) - scope.close() + withContext(span.asContextElement()) { + delay(10) + b2(iter) + } span.end() } suspend fun b2(iter: Long) { var span = tracer.spanBuilder("b2").startSpan() span.setAttribute("iter", iter) - var scope = span.makeCurrent() - delay(10) - scope.close() + withContext(span.asContextElement()) { + delay(10) + } span.end() }