add support for instrumenting kotlin coroutine schedulers

This commit is contained in:
Marco Ferrer 2019-01-09 19:04:42 -05:00
parent c1ae0a8629
commit a31abc67e9
8 changed files with 213 additions and 2 deletions

View File

@ -2,6 +2,7 @@
buildscript {
dependencies {
classpath "net.bytebuddy:byte-buddy-gradle-plugin:${versions.bytebuddy}"
classpath "org.jetbrains.kotlin:kotlin-gradle-plugin:${versions.kotlin}"
}
}
plugins {
@ -17,7 +18,7 @@ subprojects { subProj ->
subProj.byteBuddy {
transformation {
// Applying NoOp optimizes build by applying bytebuddy plugin to only compileJava task
tasks = ['compileJava', 'compileScala']
tasks = ['compileJava', 'compileScala', 'compileKotlin']
plugin = 'datadog.trace.agent.tooling.muzzle.MuzzleGradlePlugin$NoOp'
}
}
@ -25,7 +26,7 @@ subprojects { subProj ->
subProj.afterEvaluate {
subProj.byteBuddy {
transformation {
tasks = ['compileJava', 'compileScala']
tasks = ['compileJava', 'compileScala', 'compileKotlin']
plugin = 'datadog.trace.agent.tooling.muzzle.MuzzleGradlePlugin'
classPath = project(':dd-java-agent:agent-tooling').configurations.instrumentationMuzzle + subProj.configurations.compile + subProj.sourceSets.main.output
}

View File

@ -0,0 +1,13 @@
apply from: "${rootDir}/gradle/java.gradle"
apply from: "${rootDir}/gradle/test-with-kotlin.gradle"
dependencies {
testCompile project(':dd-trace-api')
testCompile project(':dd-trace-ot')
testCompile deps.kotlin
testCompile deps.coroutines
testCompile project(':dd-java-agent:testing')
testCompile project(':dd-java-agent:instrumentation:java-concurrent')
testCompile project(':dd-java-agent:instrumentation:trace-annotation')
}

View File

@ -0,0 +1,83 @@
import datadog.opentracing.DDSpan
import datadog.trace.agent.test.AgentTestRunner
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ThreadPoolDispatcherKt
class KotlinCoroutineInstrumentationTest extends AgentTestRunner {
static def dispatchersToTest = [
Dispatchers.Default,
Dispatchers.IO,
Dispatchers.Unconfined,
ThreadPoolDispatcherKt.newFixedThreadPoolContext(2,"Fixed-Thread-Pool"),
ThreadPoolDispatcherKt.newSingleThreadContext("Single-Thread"),
]
// Test delay
// Test channel
// Test cooperative cancellation
// Test parallel decomposition
// EventLoop?
def "kotlin propagates across nested jobs"() {
setup:
KotlinCoroutineTests kotlinTest = new KotlinCoroutineTests(dispatcher)
int expectedNumberOfSpans = kotlinTest.tracedAcrossThreadsWithNested()
TEST_WRITER.waitForTraces(1)
List<DDSpan> trace = TEST_WRITER.get(0)
expect:
trace.size() == expectedNumberOfSpans
trace[0].operationName == "KotlinCoroutineTests.tracedAcrossThreadsWithNested"
findSpan(trace, "nested").context().getParentId() == trace[0].context().getSpanId()
where:
dispatcher << dispatchersToTest
}
def "kotlin either deferred completion"() {
setup:
KotlinCoroutineTests kotlinTest = new KotlinCoroutineTests(Dispatchers.Default)
int expectedNumberOfSpans = kotlinTest.traceWithDeferred()
TEST_WRITER.waitForTraces(1)
List<DDSpan> trace = TEST_WRITER.get(0)
expect:
TEST_WRITER.size() == 1
trace.size() == expectedNumberOfSpans
trace[0].operationName == "KotlinCoroutineTests.traceWithDeferred"
findSpan(trace, "keptPromise").context().getParentId() == trace[0].context().getSpanId()
findSpan(trace, "keptPromise2").context().getParentId() == trace[0].context().getSpanId()
findSpan(trace, "brokenPromise").context().getParentId() == trace[0].context().getSpanId()
where:
dispatcher << dispatchersToTest
}
def "kotlin first completed deferred"() {
setup:
KotlinCoroutineTests kotlinTest = new KotlinCoroutineTests(Dispatchers.Default)
int expectedNumberOfSpans = kotlinTest.tracedWithDeferredFirstCompletions()
TEST_WRITER.waitForTraces(1)
List<DDSpan> trace = TEST_WRITER.get(0)
expect:
TEST_WRITER.size() == 1
trace.size() == expectedNumberOfSpans
findSpan(trace, "timeout1").context().getParentId() == trace[0].context().getSpanId()
findSpan(trace, "timeout2").context().getParentId() == trace[0].context().getSpanId()
findSpan(trace, "timeout3").context().getParentId() == trace[0].context().getSpanId()
where:
dispatcher << dispatchersToTest
}
private DDSpan findSpan(List<DDSpan> trace, String opName) {
for (DDSpan span : trace) {
if (span.getOperationName() == opName) {
return span
}
}
return null
}
}

View File

@ -0,0 +1,98 @@
import datadog.trace.api.Trace
import datadog.trace.context.TraceScope
import io.opentracing.Scope
import io.opentracing.util.GlobalTracer
import kotlinx.coroutines.*
import kotlinx.coroutines.selects.select
import java.util.concurrent.TimeUnit
class KotlinCoroutineTests(private val dispatcher: CoroutineDispatcher) {
@Trace
fun tracedAcrossThreadsWithNested(): Int = runTest {
val goodDeferred = async { 1 }
launch {
goodDeferred.await()
launch { tracedChild("nested") }
}
2
}
@Trace
fun traceWithDeferred(): Int = runTest {
val keptPromise = CompletableDeferred<Boolean>()
val brokenPromise = CompletableDeferred<Boolean>()
val afterPromise = async {
keptPromise.await()
tracedChild("keptPromise")
}
val afterPromise2 = async {
keptPromise.await()
tracedChild("keptPromise2")
}
val failedAfterPromise = async {
brokenPromise
.runCatching { await() }
.onFailure { tracedChild("brokenPromise") }
}
launch {
tracedChild("future1")
keptPromise.complete(true)
brokenPromise.completeExceptionally(IllegalStateException())
}
listOf(afterPromise, afterPromise2, failedAfterPromise).awaitAll()
5
}
/**
* @return Number of expected spans in the trace
*/
@Trace
fun tracedWithDeferredFirstCompletions(): Int = runTest {
val children = listOf(
async {
tracedChild("timeout1")
false
},
async {
tracedChild("timeout2")
false
},
async {
tracedChild("timeout3")
true
}
)
withTimeout(TimeUnit.SECONDS.toMillis(30)) {
select<Boolean> {
children.forEach { child ->
child.onAwait { it }
}
}
}
4
}
@Trace
fun tracedChild(opName: String){
GlobalTracer.get().activeSpan().setOperationName(opName)
}
private fun <T> runTest(asyncPropagation: Boolean = true, block: suspend CoroutineScope.()->T ): T {
GlobalTracer.get().scopeManager().active().setAsyncPropagation(asyncPropagation)
return runBlocking(dispatcher,block = block)
}
private fun Scope.setAsyncPropagation(value: Boolean): Unit =
(this as TraceScope).setAsyncPropagation(value)
}

View File

@ -60,6 +60,7 @@ public final class ExecutorInstrumentation extends Instrumenter.Default {
"java.util.concurrent.Executors$FinalizableDelegatedExecutorService",
"java.util.concurrent.Executors$DelegatedExecutorService",
"javax.management.NotificationBroadcasterSupport$1",
"kotlinx.coroutines.scheduling.CoroutineScheduler",
"scala.concurrent.Future$InternalCallbackExecutor$",
"scala.concurrent.impl.ExecutionContextImpl",
"scala.concurrent.impl.ExecutionContextImpl$$anon$1",
@ -149,6 +150,9 @@ public final class ExecutorInstrumentation extends Instrumenter.Default {
transformers.put(
named("execute").and(takesArgument(0, Runnable.class)),
SetExecuteRunnableStateAdvice.class.getName());
transformers.put( // kotlinx.coroutines.scheduling.CoroutineScheduler
named("dispatch").and(takesArgument(0, Runnable.class)),
SetExecuteRunnableStateAdvice.class.getName());
transformers.put(
named("submit").and(takesArgument(0, Runnable.class)),
SetSubmitRunnableStateAdvice.class.getName());

View File

@ -17,6 +17,8 @@ ext {
lombok : "1.18.4",
bytebuddy : "1.9.5",
scala : "2.11.12",
kotlin : "1.3.11",
coroutines : "1.1.0"
]
deps = [
@ -59,5 +61,7 @@ ext {
dependencies.create(group: 'org.slf4j', name: 'jul-to-slf4j', version: versions.slf4j),
],
scala : dependencies.create(group: 'org.scala-lang', name: 'scala-library', version: "${versions.scala}"),
kotlin : dependencies.create(group: 'org.jetbrains.kotlin', name: 'kotlin-stdlib', version: "${versions.kotlin}"),
coroutines : dependencies.create(group: 'org.jetbrains.kotlinx', name: 'kotlinx-coroutines-core', version: "${versions.coroutines}"),
]
}

View File

@ -0,0 +1,7 @@
// Enable testing kotlin code in groovy spock tests.
apply plugin: 'kotlin'
compileTestGroovy {
classpath = classpath.plus(files(compileTestKotlin.destinationDir))
dependsOn compileTestKotlin
}

View File

@ -40,6 +40,7 @@ include ':dd-java-agent:instrumentation:jax-rs-client'
include ':dd-java-agent:instrumentation:jax-rs-client:connection-error-handling-jersey'
include ':dd-java-agent:instrumentation:jax-rs-client:connection-error-handling-resteasy'
include ':dd-java-agent:instrumentation:java-concurrent'
include ':dd-java-agent:instrumentation:java-concurrent:kotlin-testing'
include ':dd-java-agent:instrumentation:java-concurrent:scala-testing'
include ':dd-java-agent:instrumentation:java-concurrent:akka-testing'
include ':dd-java-agent:instrumentation:jboss-classloading'