Properly track causality of kotlin suspend functions (#1168)
* Initial work on kotlin suspend functions. Introduce new instrumentation to capture kotlin suspended coroutines and track/update Context appropriately. Add high-concurrency test showing the issue and that it is solved. * spotlessApply * Fix spelling of continuation. * Fix kotlin style * Add kotlin dependencies to muzzle * Fix muzzle in another package that pulls in java-concurrent. * Use runBlocking instead of a sleep to finish test * Clarify and improve comments on how/why * Move kotlin coroutines instrumentation to top-level instrumentation module * Properly build muzzle check for kotlin-coroutines * Merge kotlin-testing into regular src/test rather than separate module * Clean up attach/detach logic * Tighten bounds and skipVersions on kotlin muzzle * Rename fields and clean up attach logic * Clean up types/side-effect code * spotlessApply * Enhance muzzle testing to properly test latest kotlin. Required updating a few kotlin-y things to latest version to properly resolve the jvm variant of their new multiplatform stuff.
This commit is contained in:
parent
70a06ef086
commit
030e9fe107
|
@ -25,7 +25,7 @@ ext {
|
|||
logback : "1.2.3",
|
||||
bytebuddy : "1.10.10",
|
||||
scala : "2.11.12", // Last version to support Java 7 (2.12+ require Java 8+)
|
||||
kotlin : "1.3.72",
|
||||
kotlin : "1.4.0",
|
||||
coroutines : "1.3.0",
|
||||
springboot : "2.3.1.RELEASE",
|
||||
// TODO(anuraaga): Switch off of milestones, this version fixes compatibility with Spock Unroll
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
// Enable testing kotlin code in groovy spock tests.
|
||||
apply plugin: 'kotlin'
|
||||
apply plugin: 'org.jetbrains.kotlin.jvm'
|
||||
|
||||
compileTestGroovy {
|
||||
//Note: look like it should be `classpath += files(sourceSets.test.kotlin.classesDirectory)`
|
||||
|
|
|
@ -1,8 +0,0 @@
|
|||
ext.skipPublish = true
|
||||
apply from: "$rootDir/gradle/instrumentation.gradle"
|
||||
apply from: "$rootDir/gradle/test-with-kotlin.gradle"
|
||||
|
||||
dependencies {
|
||||
testImplementation deps.kotlin
|
||||
testImplementation deps.coroutines
|
||||
}
|
|
@ -0,0 +1,32 @@
|
|||
ext {
|
||||
// TODO (trask) currently this is only needed for LambdaGen
|
||||
// revisit if we don't end up dropping Java 7
|
||||
minJavaVersionForTests = JavaVersion.VERSION_1_8
|
||||
}
|
||||
|
||||
apply from: "$rootDir/gradle/instrumentation.gradle"
|
||||
apply from: "$rootDir/gradle/test-with-kotlin.gradle"
|
||||
|
||||
muzzle {
|
||||
pass {
|
||||
group = 'org.jetbrains.kotlinx'
|
||||
module = 'kotlinx-coroutines-core'
|
||||
versions = "[1.0.0,1.3.8)"
|
||||
skipVersions += ['1.3.2-js-ir-01', '1.3.2-js-ir-02']
|
||||
}
|
||||
// 1.3.9 (and beyond?) have changed how artifact names are resolved due to multiplatform variants
|
||||
pass {
|
||||
group = 'org.jetbrains.kotlinx'
|
||||
module = 'kotlinx-coroutines-core-jvm'
|
||||
versions = "[1.3.9,)"
|
||||
}
|
||||
}
|
||||
dependencies {
|
||||
implementation project(':instrumentation:java-concurrent')
|
||||
|
||||
compileOnly 'org.jetbrains.kotlin:kotlin-stdlib-common:1.3.72'
|
||||
compileOnly 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.7'
|
||||
|
||||
testImplementation deps.kotlin
|
||||
testImplementation deps.coroutines
|
||||
}
|
|
@ -0,0 +1,214 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.opentelemetry.instrumentation.auto.kotlincoroutines;
|
||||
|
||||
import static net.bytebuddy.matcher.ElementMatchers.named;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
|
||||
|
||||
import com.google.auto.service.AutoService;
|
||||
import io.grpc.Context;
|
||||
import io.opentelemetry.javaagent.tooling.Instrumenter;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import kotlin.coroutines.Continuation;
|
||||
import kotlin.coroutines.CoroutineContext;
|
||||
import kotlin.jvm.functions.Function2;
|
||||
import net.bytebuddy.asm.Advice;
|
||||
import net.bytebuddy.description.method.MethodDescription;
|
||||
import net.bytebuddy.description.type.TypeDescription;
|
||||
import net.bytebuddy.matcher.ElementMatcher;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
|
||||
@AutoService(Instrumenter.class)
|
||||
public class KotlinProbeInstrumentation extends Instrumenter.Default {
|
||||
/*
|
||||
Kotlin coroutines with suspend functions are a form of cooperative "userland" threading
|
||||
(you might also know this pattern as "fibers" or "green threading", where the OS/kernel-level thread
|
||||
has no idea of switching between tasks. Fortunately kotlin exposes hooks for the key events: knowing when
|
||||
coroutines are being created, when they are suspended (swapped out/inactive), and when they are resumed (about to
|
||||
run again).
|
||||
|
||||
Without this instrumentation, heavy concurrency and usage of kotlin suspend functions will break causality
|
||||
and cause nonsensical span parents/context propagation. This is because a single JVM thread will run a series of
|
||||
coroutines in an "arbitrary" order, and a context set by coroutine A (which then gets suspended) will be picked up
|
||||
by completely-unrelated coroutine B.
|
||||
|
||||
The basic strategy here is:
|
||||
1) Use the DebugProbes callbacks to learn about coroutine create, resume, and suspend operations
|
||||
2) Wrap the creation Coroutine and its Context and use that wrapping to add an extra Context "key"
|
||||
3) Use the callback for resume and suspend to manipulate our context "key" whereby an appropriate state
|
||||
object can be found (tied to the chain of Continutations in the Coroutine).
|
||||
4) Do our swapping-context dance with that appropriate state
|
||||
5) Test with highly concurrent well-known span causality and ensure everything looks right.
|
||||
Without this instrumentation, this test fails with concurrency=2; with this instrumentation,
|
||||
it passes with concurrency=200.
|
||||
*/
|
||||
|
||||
public KotlinProbeInstrumentation() {
|
||||
super("kotlin-coroutines");
|
||||
}
|
||||
|
||||
@Override
|
||||
public ElementMatcher<? super TypeDescription> typeMatcher() {
|
||||
return named("kotlin.coroutines.jvm.internal.DebugProbesKt");
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<? extends ElementMatcher<? super MethodDescription>, String> transformers() {
|
||||
final Map<ElementMatcher<? super MethodDescription>, String> transformers = new HashMap<>();
|
||||
transformers.put(
|
||||
named("probeCoroutineCreated").and(takesArguments(1)),
|
||||
CoroutineCreatedAdvice.class.getName());
|
||||
transformers.put(
|
||||
named("probeCoroutineResumed").and(takesArguments(1)),
|
||||
CoroutineResumedAdvice.class.getName());
|
||||
transformers.put(
|
||||
named("probeCoroutineSuspended").and(takesArguments(1)),
|
||||
CoroutineSuspendedAdvice.class.getName());
|
||||
return transformers;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String[] helperClassNames() {
|
||||
return new String[] {
|
||||
"io.opentelemetry.instrumentation.auto.kotlincoroutines.KotlinProbeInstrumentation$CoroutineWrapper",
|
||||
"io.opentelemetry.instrumentation.auto.kotlincoroutines.KotlinProbeInstrumentation$TraceScopeKey",
|
||||
"io.opentelemetry.instrumentation.auto.kotlincoroutines.KotlinProbeInstrumentation$CoroutineContextWrapper",
|
||||
};
|
||||
}
|
||||
|
||||
public static class CoroutineCreatedAdvice {
|
||||
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
|
||||
public static void exit(
|
||||
@Advice.Return(readOnly = false) kotlin.coroutines.Continuation retVal) {
|
||||
if (!(retVal instanceof CoroutineWrapper)) {
|
||||
retVal = new CoroutineWrapper(retVal);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static class CoroutineResumedAdvice {
|
||||
@Advice.OnMethodEnter(suppress = Throwable.class)
|
||||
public static void enter(
|
||||
@Advice.Argument(0) final kotlin.coroutines.Continuation continuation) {
|
||||
CoroutineContextWrapper w = continuation.getContext().get(TraceScopeKey.INSTANCE);
|
||||
if (w != null) {
|
||||
w.tracingResume();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static class CoroutineSuspendedAdvice {
|
||||
@Advice.OnMethodEnter(suppress = Throwable.class)
|
||||
public static void enter(
|
||||
@Advice.Argument(0) final kotlin.coroutines.Continuation continuation) {
|
||||
CoroutineContextWrapper w = continuation.getContext().get(TraceScopeKey.INSTANCE);
|
||||
if (w != null) {
|
||||
w.tracingSuspend();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static class TraceScopeKey implements CoroutineContext.Key<CoroutineContextWrapper> {
|
||||
public static final TraceScopeKey INSTANCE = new TraceScopeKey();
|
||||
}
|
||||
|
||||
public static class CoroutineWrapper implements kotlin.coroutines.Continuation {
|
||||
private final Continuation proxy;
|
||||
private final CoroutineContextWrapper contextWrapper;
|
||||
|
||||
public CoroutineWrapper(Continuation proxy) {
|
||||
this.proxy = proxy;
|
||||
this.contextWrapper = new CoroutineContextWrapper(proxy.getContext());
|
||||
}
|
||||
|
||||
public String toString() {
|
||||
return proxy.toString();
|
||||
}
|
||||
|
||||
@NotNull
|
||||
@Override
|
||||
public CoroutineContext getContext() {
|
||||
return contextWrapper;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void resumeWith(@NotNull Object o) {
|
||||
proxy.resumeWith(o);
|
||||
}
|
||||
}
|
||||
|
||||
public static class CoroutineContextWrapper
|
||||
implements CoroutineContext, CoroutineContext.Element {
|
||||
private final CoroutineContext proxy;
|
||||
private Context myTracingContext;
|
||||
private Context prevTracingContext;
|
||||
|
||||
public CoroutineContextWrapper(CoroutineContext proxy) {
|
||||
this.proxy = proxy;
|
||||
this.myTracingContext = Context.current();
|
||||
}
|
||||
|
||||
@Override
|
||||
public <R> R fold(R r, @NotNull Function2<? super R, ? super Element, ? extends R> function2) {
|
||||
return proxy.fold(r, function2);
|
||||
}
|
||||
|
||||
@Nullable
|
||||
@Override
|
||||
public <E extends Element> E get(@NotNull Key<E> key) {
|
||||
if (key == TraceScopeKey.INSTANCE) {
|
||||
return (E) this;
|
||||
}
|
||||
return proxy.get(key);
|
||||
}
|
||||
|
||||
@NotNull
|
||||
@Override
|
||||
public CoroutineContext minusKey(@NotNull Key<?> key) {
|
||||
// I can't be removed!
|
||||
return proxy.minusKey(key);
|
||||
}
|
||||
|
||||
@NotNull
|
||||
@Override
|
||||
public CoroutineContext plus(@NotNull CoroutineContext coroutineContext) {
|
||||
return proxy.plus(coroutineContext);
|
||||
}
|
||||
|
||||
public String toString() {
|
||||
return proxy.toString();
|
||||
}
|
||||
|
||||
@NotNull
|
||||
@Override
|
||||
public Key<?> getKey() {
|
||||
return TraceScopeKey.INSTANCE;
|
||||
}
|
||||
|
||||
// Actual tracing context-switch logic
|
||||
public void tracingSuspend() {
|
||||
myTracingContext = Context.current();
|
||||
myTracingContext.detach(prevTracingContext);
|
||||
}
|
||||
|
||||
public void tracingResume() {
|
||||
prevTracingContext = myTracingContext.attach();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -194,4 +194,44 @@ class KotlinCoroutineInstrumentationTest extends AgentTestRunner {
|
|||
where:
|
||||
dispatcher << dispatchersToTest
|
||||
}
|
||||
def "test concurrent suspend functions"() {
|
||||
setup:
|
||||
KotlinCoroutineTests kotlinTest = new KotlinCoroutineTests(Dispatchers.Default)
|
||||
int numIters = 100
|
||||
HashSet<Long> seenItersA = new HashSet<>()
|
||||
HashSet<Long> seenItersB = new HashSet<>()
|
||||
HashSet<Long> expectedIters = new HashSet<>((0L..(numIters-1)).toList())
|
||||
|
||||
when:
|
||||
kotlinTest.launchConcurrentSuspendFunctions(numIters)
|
||||
|
||||
then:
|
||||
// This generates numIters each of "a calls a2" and "b calls b2" traces. Each
|
||||
// trace should have a single pair of spans (a and a2) and each of those spans
|
||||
// should have the same iteration number (attribute "iter").
|
||||
// The traces are in some random order, so let's keep track and make sure we see
|
||||
// each iteration # exactly once
|
||||
assertTraces(numIters*2) {
|
||||
for(int i=0; i < numIters*2; i++) {
|
||||
trace(i, 2) {
|
||||
boolean a = false
|
||||
long iter = -1
|
||||
span(0) {
|
||||
a = span.name.matches("a")
|
||||
iter = span.getAttributes().get("iter").getLongValue()
|
||||
(a ? seenItersA : seenItersB).add(iter)
|
||||
operationName(a ? "a" : "b")
|
||||
}
|
||||
span(1) {
|
||||
operationName(a ? "a2" : "b2")
|
||||
childOf(span(0))
|
||||
assert span.getAttributes().get("iter").getLongValue() == iter
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
assert seenItersA.equals(expectedIters)
|
||||
assert seenItersB.equals(expectedIters)
|
||||
}
|
||||
}
|
|
@ -22,12 +22,14 @@ import kotlinx.coroutines.CompletableDeferred
|
|||
import kotlinx.coroutines.CoroutineDispatcher
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.CoroutineStart
|
||||
import kotlinx.coroutines.GlobalScope
|
||||
import kotlinx.coroutines.async
|
||||
import kotlinx.coroutines.awaitAll
|
||||
import kotlinx.coroutines.channels.actor
|
||||
import kotlinx.coroutines.channels.consumeEach
|
||||
import kotlinx.coroutines.channels.produce
|
||||
import kotlinx.coroutines.channels.toChannel
|
||||
import kotlinx.coroutines.delay
|
||||
import kotlinx.coroutines.launch
|
||||
import kotlinx.coroutines.runBlocking
|
||||
import kotlinx.coroutines.selects.select
|
||||
|
@ -138,6 +140,54 @@ class KotlinCoroutineTests(private val dispatcher: CoroutineDispatcher) {
|
|||
}
|
||||
}
|
||||
|
||||
fun launchConcurrentSuspendFunctions(numIters: Int) {
|
||||
runBlocking {
|
||||
for (i in 0 until numIters) {
|
||||
GlobalScope.launch {
|
||||
a(i.toLong())
|
||||
}
|
||||
GlobalScope.launch {
|
||||
b(i.toLong())
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
suspend fun a(iter: Long) {
|
||||
var span = tracer.spanBuilder("a").startSpan()
|
||||
span.setAttribute("iter", iter)
|
||||
var scope = currentContextWith(span)
|
||||
delay(10)
|
||||
a2(iter)
|
||||
scope.close()
|
||||
span.end()
|
||||
}
|
||||
suspend fun a2(iter: Long) {
|
||||
var span = tracer.spanBuilder("a2").startSpan()
|
||||
span.setAttribute("iter", iter)
|
||||
var scope = currentContextWith(span)
|
||||
delay(10)
|
||||
scope.close()
|
||||
span.end()
|
||||
}
|
||||
suspend fun b(iter: Long) {
|
||||
var span = tracer.spanBuilder("b").startSpan()
|
||||
span.setAttribute("iter", iter)
|
||||
var scope = currentContextWith(span)
|
||||
delay(10)
|
||||
b2(iter)
|
||||
scope.close()
|
||||
span.end()
|
||||
}
|
||||
suspend fun b2(iter: Long) {
|
||||
var span = tracer.spanBuilder("b2").startSpan()
|
||||
span.setAttribute("iter", iter)
|
||||
var scope = currentContextWith(span)
|
||||
delay(10)
|
||||
scope.close()
|
||||
span.end()
|
||||
}
|
||||
|
||||
fun tracedChild(opName: String) {
|
||||
tracer.spanBuilder(opName).startSpan().end()
|
||||
}
|
|
@ -313,6 +313,9 @@ public class AdditionalLibraryIgnoresMatcher<T extends TypeDescription>
|
|||
|
||||
// kotlin, note we do not ignore kotlinx because we instrument coroutins code
|
||||
if (name.startsWith("kotlin.")) {
|
||||
if (name.equals("kotlin.coroutines.jvm.internal.DebugProbesKt")) {
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
|
|
|
@ -97,7 +97,6 @@ include ':instrumentation:java-classloader:jboss-testing'
|
|||
include ':instrumentation:java-classloader:osgi-testing'
|
||||
include ':instrumentation:java-classloader:tomcat-testing'
|
||||
include ':instrumentation:java-concurrent'
|
||||
include ':instrumentation:java-concurrent:kotlin-testing'
|
||||
include ':instrumentation:jaxrs:jaxrs-1.0'
|
||||
include ':instrumentation:jaxrs:jaxrs-2.0'
|
||||
include ':instrumentation:jaxrs:jaxrs-2.0:jaxrs-2.0-jersey-2.0'
|
||||
|
@ -116,6 +115,7 @@ include ':instrumentation:jsp-2.3'
|
|||
include ':instrumentation:kafka-clients-0.11'
|
||||
include ':instrumentation:kafka-streams-0.11'
|
||||
include ':instrumentation:khttp-0.1'
|
||||
include ':instrumentation:kotlin-coroutines'
|
||||
include ':instrumentation:kubernetes-client-7.0'
|
||||
include ':instrumentation:lettuce:lettuce-4.0'
|
||||
include ':instrumentation:lettuce:lettuce-5.0'
|
||||
|
|
Loading…
Reference in New Issue