Merge pull request #653 from marcoferrer/instrument-kt-coroutine-scheduler

Add support for instrumenting kotlin coroutine schedulers
This commit is contained in:
Tyler Benson 2019-01-22 10:31:32 -08:00 committed by GitHub
commit 0a89074f60
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 295 additions and 2 deletions

View File

@ -1,7 +1,13 @@
// this project will run in isolation under the agent's classloader // this project will run in isolation under the agent's classloader
buildscript { buildscript {
repositories {
mavenCentral()
}
dependencies { dependencies {
classpath "net.bytebuddy:byte-buddy-gradle-plugin:${versions.bytebuddy}" classpath "net.bytebuddy:byte-buddy-gradle-plugin:${versions.bytebuddy}"
classpath "org.jetbrains.kotlin:kotlin-gradle-plugin:${versions.kotlin}"
} }
} }
plugins { plugins {
@ -17,7 +23,7 @@ subprojects { subProj ->
subProj.byteBuddy { subProj.byteBuddy {
transformation { transformation {
// Applying NoOp optimizes build by applying bytebuddy plugin to only compileJava task // Applying NoOp optimizes build by applying bytebuddy plugin to only compileJava task
tasks = ['compileJava', 'compileScala'] tasks = ['compileJava', 'compileScala', 'compileKotlin']
plugin = 'datadog.trace.agent.tooling.muzzle.MuzzleGradlePlugin$NoOp' plugin = 'datadog.trace.agent.tooling.muzzle.MuzzleGradlePlugin$NoOp'
} }
} }
@ -25,7 +31,7 @@ subprojects { subProj ->
subProj.afterEvaluate { subProj.afterEvaluate {
subProj.byteBuddy { subProj.byteBuddy {
transformation { transformation {
tasks = ['compileJava', 'compileScala'] tasks = ['compileJava', 'compileScala', 'compileKotlin']
plugin = 'datadog.trace.agent.tooling.muzzle.MuzzleGradlePlugin' plugin = 'datadog.trace.agent.tooling.muzzle.MuzzleGradlePlugin'
classPath = project(':dd-java-agent:agent-tooling').configurations.instrumentationMuzzle + subProj.configurations.compile + subProj.sourceSets.main.output classPath = project(':dd-java-agent:agent-tooling').configurations.instrumentationMuzzle + subProj.configurations.compile + subProj.sourceSets.main.output
} }

View File

@ -6,6 +6,7 @@ project.ext {
apply from: "${rootDir}/gradle/java.gradle" apply from: "${rootDir}/gradle/java.gradle"
apply from: "${rootDir}/gradle/test-with-scala.gradle" apply from: "${rootDir}/gradle/test-with-scala.gradle"
apply from: "${rootDir}/gradle/test-with-kotlin.gradle"
apply plugin: 'org.unbroken-dome.test-sets' apply plugin: 'org.unbroken-dome.test-sets'

View File

@ -0,0 +1,13 @@
apply from: "${rootDir}/gradle/java.gradle"
apply from: "${rootDir}/gradle/test-with-kotlin.gradle"
dependencies {
testCompile project(':dd-trace-api')
testCompile project(':dd-trace-ot')
testCompile deps.kotlin
testCompile deps.coroutines
testCompile project(':dd-java-agent:testing')
testCompile project(':dd-java-agent:instrumentation:java-concurrent')
testCompile project(':dd-java-agent:instrumentation:trace-annotation')
}

View File

@ -0,0 +1,111 @@
import datadog.opentracing.DDSpan
import datadog.trace.agent.test.AgentTestRunner
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ThreadPoolDispatcherKt
class KotlinCoroutineInstrumentationTest extends AgentTestRunner {
static dispatchersToTest = [
Dispatchers.Default,
Dispatchers.IO,
Dispatchers.Unconfined,
ThreadPoolDispatcherKt.newFixedThreadPoolContext(2,"Fixed-Thread-Pool"),
ThreadPoolDispatcherKt.newSingleThreadContext("Single-Thread"),
]
def "kotlin traced across channels"() {
setup:
KotlinCoroutineTests kotlinTest = new KotlinCoroutineTests(dispatcher)
int expectedNumberOfSpans = kotlinTest.tracedAcrossChannels()
TEST_WRITER.waitForTraces(1)
List<DDSpan> trace = TEST_WRITER.get(0)
expect:
trace.size() == expectedNumberOfSpans
trace[0].operationName == "KotlinCoroutineTests.tracedAcrossChannels"
findSpan(trace, "produce_2").context().getParentId() == trace[0].context().getSpanId()
findSpan(trace, "consume_2").context().getParentId() == trace[0].context().getSpanId()
where:
dispatcher << dispatchersToTest
}
def "kotlin cancellation prevents trace"() {
setup:
KotlinCoroutineTests kotlinTest = new KotlinCoroutineTests(dispatcher)
int expectedNumberOfSpans = kotlinTest.tracePreventedByCancellation()
TEST_WRITER.waitForTraces(1)
List<DDSpan> trace = TEST_WRITER.get(0)
expect:
trace.size() == expectedNumberOfSpans
trace[0].operationName == "KotlinCoroutineTests.tracePreventedByCancellation"
findSpan(trace, "preLaunch").context().getParentId() == trace[0].context().getSpanId()
findSpan(trace, "postLaunch") == null
where:
dispatcher << dispatchersToTest
}
def "kotlin propagates across nested jobs"() {
setup:
KotlinCoroutineTests kotlinTest = new KotlinCoroutineTests(dispatcher)
int expectedNumberOfSpans = kotlinTest.tracedAcrossThreadsWithNested()
TEST_WRITER.waitForTraces(1)
List<DDSpan> trace = TEST_WRITER.get(0)
expect:
trace.size() == expectedNumberOfSpans
trace[0].operationName == "KotlinCoroutineTests.tracedAcrossThreadsWithNested"
findSpan(trace, "nested").context().getParentId() == trace[0].context().getSpanId()
where:
dispatcher << dispatchersToTest
}
def "kotlin either deferred completion"() {
setup:
KotlinCoroutineTests kotlinTest = new KotlinCoroutineTests(Dispatchers.Default)
int expectedNumberOfSpans = kotlinTest.traceWithDeferred()
TEST_WRITER.waitForTraces(1)
List<DDSpan> trace = TEST_WRITER.get(0)
expect:
TEST_WRITER.size() == 1
trace.size() == expectedNumberOfSpans
trace[0].operationName == "KotlinCoroutineTests.traceWithDeferred"
findSpan(trace, "keptPromise").context().getParentId() == trace[0].context().getSpanId()
findSpan(trace, "keptPromise2").context().getParentId() == trace[0].context().getSpanId()
findSpan(trace, "brokenPromise").context().getParentId() == trace[0].context().getSpanId()
where:
dispatcher << dispatchersToTest
}
def "kotlin first completed deferred"() {
setup:
KotlinCoroutineTests kotlinTest = new KotlinCoroutineTests(Dispatchers.Default)
int expectedNumberOfSpans = kotlinTest.tracedWithDeferredFirstCompletions()
TEST_WRITER.waitForTraces(1)
List<DDSpan> trace = TEST_WRITER.get(0)
expect:
TEST_WRITER.size() == 1
trace.size() == expectedNumberOfSpans
findSpan(trace, "timeout1").context().getParentId() == trace[0].context().getSpanId()
findSpan(trace, "timeout2").context().getParentId() == trace[0].context().getSpanId()
findSpan(trace, "timeout3").context().getParentId() == trace[0].context().getSpanId()
where:
dispatcher << dispatchersToTest
}
private static DDSpan findSpan(List<DDSpan> trace, String opName) {
for (DDSpan span : trace) {
if (span.getOperationName() == opName) {
return span
}
}
return null
}
}

View File

@ -0,0 +1,140 @@
import datadog.trace.api.Trace
import datadog.trace.context.TraceScope
import io.opentracing.Scope
import io.opentracing.util.GlobalTracer
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.selects.select
import java.util.concurrent.TimeUnit
class KotlinCoroutineTests(private val dispatcher: CoroutineDispatcher) {
@Trace
fun tracedAcrossChannels(): Int = runTest {
val producer = produce {
repeat(3){
tracedChild("produce_$it")
send(it)
}
}
val actor = actor<Int> {
consumeEach {
tracedChild("consume_$it")
}
}
producer.toChannel(actor)
actor.close()
7
}
@Trace
fun tracePreventedByCancellation(): Int {
kotlin.runCatching {
runTest {
tracedChild("preLaunch")
launch(start = CoroutineStart.UNDISPATCHED) {
throw Exception("Child Error")
}
yield()
tracedChild("postLaunch")
}
}
return 2
}
@Trace
fun tracedAcrossThreadsWithNested(): Int = runTest {
val goodDeferred = async { 1 }
launch {
goodDeferred.await()
launch { tracedChild("nested") }
}
2
}
@Trace
fun traceWithDeferred(): Int = runTest {
val keptPromise = CompletableDeferred<Boolean>()
val brokenPromise = CompletableDeferred<Boolean>()
val afterPromise = async {
keptPromise.await()
tracedChild("keptPromise")
}
val afterPromise2 = async {
keptPromise.await()
tracedChild("keptPromise2")
}
val failedAfterPromise = async {
brokenPromise
.runCatching { await() }
.onFailure { tracedChild("brokenPromise") }
}
launch {
tracedChild("future1")
keptPromise.complete(true)
brokenPromise.completeExceptionally(IllegalStateException())
}
listOf(afterPromise, afterPromise2, failedAfterPromise).awaitAll()
5
}
/**
* @return Number of expected spans in the trace
*/
@Trace
fun tracedWithDeferredFirstCompletions(): Int = runTest {
val children = listOf(
async {
tracedChild("timeout1")
false
},
async {
tracedChild("timeout2")
false
},
async {
tracedChild("timeout3")
true
}
)
withTimeout(TimeUnit.SECONDS.toMillis(30)) {
select<Boolean> {
children.forEach { child ->
child.onAwait { it }
}
}
}
4
}
@Trace
fun tracedChild(opName: String){
GlobalTracer.get().activeSpan().setOperationName(opName)
}
private fun <T> runTest(asyncPropagation: Boolean = true, block: suspend CoroutineScope.()->T ): T {
GlobalTracer.get().scopeManager().active().setAsyncPropagation(asyncPropagation)
return runBlocking(dispatcher,block = block)
}
private fun Scope.setAsyncPropagation(value: Boolean): Unit =
(this as TraceScope).setAsyncPropagation(value)
}

View File

@ -60,6 +60,7 @@ public final class ExecutorInstrumentation extends Instrumenter.Default {
"java.util.concurrent.Executors$FinalizableDelegatedExecutorService", "java.util.concurrent.Executors$FinalizableDelegatedExecutorService",
"java.util.concurrent.Executors$DelegatedExecutorService", "java.util.concurrent.Executors$DelegatedExecutorService",
"javax.management.NotificationBroadcasterSupport$1", "javax.management.NotificationBroadcasterSupport$1",
"kotlinx.coroutines.scheduling.CoroutineScheduler",
"scala.concurrent.Future$InternalCallbackExecutor$", "scala.concurrent.Future$InternalCallbackExecutor$",
"scala.concurrent.impl.ExecutionContextImpl", "scala.concurrent.impl.ExecutionContextImpl",
"scala.concurrent.impl.ExecutionContextImpl$$anon$1", "scala.concurrent.impl.ExecutionContextImpl$$anon$1",
@ -158,6 +159,11 @@ public final class ExecutorInstrumentation extends Instrumenter.Default {
transformers.put( transformers.put(
nameMatches("invoke(Any|All)$").and(takesArgument(0, Callable.class)), nameMatches("invoke(Any|All)$").and(takesArgument(0, Callable.class)),
SetCallableStateForCallableCollectionAdvice.class.getName()); SetCallableStateForCallableCollectionAdvice.class.getName());
transformers.put( // kotlinx.coroutines.scheduling.CoroutineScheduler
named("dispatch")
.and(takesArgument(0, Runnable.class))
.and(takesArgument(1, named("kotlinx.coroutines.scheduling.TaskContext"))),
SetExecuteRunnableStateAdvice.class.getName());
return transformers; return transformers;
} }

View File

@ -16,6 +16,8 @@ ext {
lombok : "1.18.4", lombok : "1.18.4",
bytebuddy : "1.9.5", bytebuddy : "1.9.5",
scala : "2.11.12", scala : "2.11.12",
kotlin : "1.3.11",
coroutines : "1.1.0"
] ]
deps = [ deps = [
@ -58,5 +60,7 @@ ext {
dependencies.create(group: 'org.slf4j', name: 'jul-to-slf4j', version: versions.slf4j), dependencies.create(group: 'org.slf4j', name: 'jul-to-slf4j', version: versions.slf4j),
], ],
scala : dependencies.create(group: 'org.scala-lang', name: 'scala-library', version: "${versions.scala}"), scala : dependencies.create(group: 'org.scala-lang', name: 'scala-library', version: "${versions.scala}"),
kotlin : dependencies.create(group: 'org.jetbrains.kotlin', name: 'kotlin-stdlib', version: "${versions.kotlin}"),
coroutines : dependencies.create(group: 'org.jetbrains.kotlinx', name: 'kotlinx-coroutines-core', version: "${versions.coroutines}"),
] ]
} }

View File

@ -0,0 +1,11 @@
// Enable testing kotlin code in groovy spock tests.
apply plugin: 'kotlin'
kotlin {
copyClassesToJavaOutput = true
}
compileTestGroovy {
classpath = classpath.plus(files(compileTestKotlin.destinationDir))
dependsOn compileTestKotlin
}

View File

@ -43,6 +43,7 @@ include ':dd-java-agent:instrumentation:jax-rs-client'
include ':dd-java-agent:instrumentation:jax-rs-client:connection-error-handling-jersey' include ':dd-java-agent:instrumentation:jax-rs-client:connection-error-handling-jersey'
include ':dd-java-agent:instrumentation:jax-rs-client:connection-error-handling-resteasy' include ':dd-java-agent:instrumentation:jax-rs-client:connection-error-handling-resteasy'
include ':dd-java-agent:instrumentation:java-concurrent' include ':dd-java-agent:instrumentation:java-concurrent'
include ':dd-java-agent:instrumentation:java-concurrent:kotlin-testing'
include ':dd-java-agent:instrumentation:java-concurrent:scala-testing' include ':dd-java-agent:instrumentation:java-concurrent:scala-testing'
include ':dd-java-agent:instrumentation:java-concurrent:akka-testing' include ':dd-java-agent:instrumentation:java-concurrent:akka-testing'
include ':dd-java-agent:instrumentation:jboss-classloading' include ':dd-java-agent:instrumentation:jboss-classloading'