diff --git a/dd-java-agent/instrumentation/instrumentation.gradle b/dd-java-agent/instrumentation/instrumentation.gradle index a9e4d3bcd7..0470338402 100644 --- a/dd-java-agent/instrumentation/instrumentation.gradle +++ b/dd-java-agent/instrumentation/instrumentation.gradle @@ -1,7 +1,13 @@ // this project will run in isolation under the agent's classloader buildscript { + + repositories { + mavenCentral() + } + dependencies { classpath "net.bytebuddy:byte-buddy-gradle-plugin:${versions.bytebuddy}" + classpath "org.jetbrains.kotlin:kotlin-gradle-plugin:${versions.kotlin}" } } plugins { @@ -17,7 +23,7 @@ subprojects { subProj -> subProj.byteBuddy { transformation { // Applying NoOp optimizes build by applying bytebuddy plugin to only compileJava task - tasks = ['compileJava', 'compileScala'] + tasks = ['compileJava', 'compileScala', 'compileKotlin'] plugin = 'datadog.trace.agent.tooling.muzzle.MuzzleGradlePlugin$NoOp' } } @@ -25,7 +31,7 @@ subprojects { subProj -> subProj.afterEvaluate { subProj.byteBuddy { transformation { - tasks = ['compileJava', 'compileScala'] + tasks = ['compileJava', 'compileScala', 'compileKotlin'] plugin = 'datadog.trace.agent.tooling.muzzle.MuzzleGradlePlugin' classPath = project(':dd-java-agent:agent-tooling').configurations.instrumentationMuzzle + subProj.configurations.compile + subProj.sourceSets.main.output } diff --git a/dd-java-agent/instrumentation/java-concurrent/java-concurrent.gradle b/dd-java-agent/instrumentation/java-concurrent/java-concurrent.gradle index b679a53c46..652723b713 100644 --- a/dd-java-agent/instrumentation/java-concurrent/java-concurrent.gradle +++ b/dd-java-agent/instrumentation/java-concurrent/java-concurrent.gradle @@ -6,6 +6,7 @@ project.ext { apply from: "${rootDir}/gradle/java.gradle" apply from: "${rootDir}/gradle/test-with-scala.gradle" +apply from: "${rootDir}/gradle/test-with-kotlin.gradle" apply plugin: 'org.unbroken-dome.test-sets' diff --git a/dd-java-agent/instrumentation/java-concurrent/kotlin-testing/kotlin-testing.gradle b/dd-java-agent/instrumentation/java-concurrent/kotlin-testing/kotlin-testing.gradle new file mode 100644 index 0000000000..6492000368 --- /dev/null +++ b/dd-java-agent/instrumentation/java-concurrent/kotlin-testing/kotlin-testing.gradle @@ -0,0 +1,13 @@ +apply from: "${rootDir}/gradle/java.gradle" +apply from: "${rootDir}/gradle/test-with-kotlin.gradle" + +dependencies { + testCompile project(':dd-trace-api') + testCompile project(':dd-trace-ot') + testCompile deps.kotlin + testCompile deps.coroutines + + testCompile project(':dd-java-agent:testing') + testCompile project(':dd-java-agent:instrumentation:java-concurrent') + testCompile project(':dd-java-agent:instrumentation:trace-annotation') +} diff --git a/dd-java-agent/instrumentation/java-concurrent/kotlin-testing/src/test/groovy/KotlinCoroutineInstrumentationTest.groovy b/dd-java-agent/instrumentation/java-concurrent/kotlin-testing/src/test/groovy/KotlinCoroutineInstrumentationTest.groovy new file mode 100644 index 0000000000..b38e6dd387 --- /dev/null +++ b/dd-java-agent/instrumentation/java-concurrent/kotlin-testing/src/test/groovy/KotlinCoroutineInstrumentationTest.groovy @@ -0,0 +1,111 @@ +import datadog.opentracing.DDSpan +import datadog.trace.agent.test.AgentTestRunner +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.ThreadPoolDispatcherKt + +class KotlinCoroutineInstrumentationTest extends AgentTestRunner { + + static dispatchersToTest = [ + Dispatchers.Default, + Dispatchers.IO, + Dispatchers.Unconfined, + ThreadPoolDispatcherKt.newFixedThreadPoolContext(2,"Fixed-Thread-Pool"), + ThreadPoolDispatcherKt.newSingleThreadContext("Single-Thread"), + ] + + def "kotlin traced across channels"() { + setup: + KotlinCoroutineTests kotlinTest = new KotlinCoroutineTests(dispatcher) + int expectedNumberOfSpans = kotlinTest.tracedAcrossChannels() + TEST_WRITER.waitForTraces(1) + List trace = TEST_WRITER.get(0) + + expect: + trace.size() == expectedNumberOfSpans + trace[0].operationName == "KotlinCoroutineTests.tracedAcrossChannels" + findSpan(trace, "produce_2").context().getParentId() == trace[0].context().getSpanId() + findSpan(trace, "consume_2").context().getParentId() == trace[0].context().getSpanId() + + where: + dispatcher << dispatchersToTest + } + + def "kotlin cancellation prevents trace"() { + setup: + KotlinCoroutineTests kotlinTest = new KotlinCoroutineTests(dispatcher) + int expectedNumberOfSpans = kotlinTest.tracePreventedByCancellation() + TEST_WRITER.waitForTraces(1) + List trace = TEST_WRITER.get(0) + + expect: + trace.size() == expectedNumberOfSpans + trace[0].operationName == "KotlinCoroutineTests.tracePreventedByCancellation" + findSpan(trace, "preLaunch").context().getParentId() == trace[0].context().getSpanId() + findSpan(trace, "postLaunch") == null + + where: + dispatcher << dispatchersToTest + } + + def "kotlin propagates across nested jobs"() { + setup: + KotlinCoroutineTests kotlinTest = new KotlinCoroutineTests(dispatcher) + int expectedNumberOfSpans = kotlinTest.tracedAcrossThreadsWithNested() + TEST_WRITER.waitForTraces(1) + List trace = TEST_WRITER.get(0) + + expect: + trace.size() == expectedNumberOfSpans + trace[0].operationName == "KotlinCoroutineTests.tracedAcrossThreadsWithNested" + findSpan(trace, "nested").context().getParentId() == trace[0].context().getSpanId() + + where: + dispatcher << dispatchersToTest + } + + def "kotlin either deferred completion"() { + setup: + KotlinCoroutineTests kotlinTest = new KotlinCoroutineTests(Dispatchers.Default) + int expectedNumberOfSpans = kotlinTest.traceWithDeferred() + TEST_WRITER.waitForTraces(1) + List trace = TEST_WRITER.get(0) + + expect: + TEST_WRITER.size() == 1 + trace.size() == expectedNumberOfSpans + trace[0].operationName == "KotlinCoroutineTests.traceWithDeferred" + findSpan(trace, "keptPromise").context().getParentId() == trace[0].context().getSpanId() + findSpan(trace, "keptPromise2").context().getParentId() == trace[0].context().getSpanId() + findSpan(trace, "brokenPromise").context().getParentId() == trace[0].context().getSpanId() + + where: + dispatcher << dispatchersToTest + } + + def "kotlin first completed deferred"() { + setup: + KotlinCoroutineTests kotlinTest = new KotlinCoroutineTests(Dispatchers.Default) + int expectedNumberOfSpans = kotlinTest.tracedWithDeferredFirstCompletions() + TEST_WRITER.waitForTraces(1) + List trace = TEST_WRITER.get(0) + + expect: + TEST_WRITER.size() == 1 + trace.size() == expectedNumberOfSpans + findSpan(trace, "timeout1").context().getParentId() == trace[0].context().getSpanId() + findSpan(trace, "timeout2").context().getParentId() == trace[0].context().getSpanId() + findSpan(trace, "timeout3").context().getParentId() == trace[0].context().getSpanId() + + where: + dispatcher << dispatchersToTest + } + + private static DDSpan findSpan(List trace, String opName) { + for (DDSpan span : trace) { + if (span.getOperationName() == opName) { + return span + } + } + return null + } +} diff --git a/dd-java-agent/instrumentation/java-concurrent/kotlin-testing/src/test/kotlin/KotlinCoroutineTests.kt b/dd-java-agent/instrumentation/java-concurrent/kotlin-testing/src/test/kotlin/KotlinCoroutineTests.kt new file mode 100644 index 0000000000..66d13d2cb7 --- /dev/null +++ b/dd-java-agent/instrumentation/java-concurrent/kotlin-testing/src/test/kotlin/KotlinCoroutineTests.kt @@ -0,0 +1,140 @@ +import datadog.trace.api.Trace +import datadog.trace.context.TraceScope +import io.opentracing.Scope +import io.opentracing.util.GlobalTracer +import kotlinx.coroutines.* +import kotlinx.coroutines.channels.* +import kotlinx.coroutines.selects.select +import java.util.concurrent.TimeUnit + +class KotlinCoroutineTests(private val dispatcher: CoroutineDispatcher) { + + @Trace + fun tracedAcrossChannels(): Int = runTest { + val producer = produce { + repeat(3){ + tracedChild("produce_$it") + send(it) + } + } + + val actor = actor { + consumeEach { + tracedChild("consume_$it") + } + } + + producer.toChannel(actor) + actor.close() + + 7 + } + + @Trace + fun tracePreventedByCancellation(): Int { + + kotlin.runCatching { + runTest { + tracedChild("preLaunch") + + launch(start = CoroutineStart.UNDISPATCHED) { + throw Exception("Child Error") + } + + yield() + + tracedChild("postLaunch") + } + } + + return 2 + } + + @Trace + fun tracedAcrossThreadsWithNested(): Int = runTest { + val goodDeferred = async { 1 } + + launch { + goodDeferred.await() + launch { tracedChild("nested") } + } + + 2 + } + + @Trace + fun traceWithDeferred(): Int = runTest { + + val keptPromise = CompletableDeferred() + val brokenPromise = CompletableDeferred() + val afterPromise = async { + keptPromise.await() + tracedChild("keptPromise") + } + val afterPromise2 = async { + keptPromise.await() + tracedChild("keptPromise2") + } + val failedAfterPromise = async { + brokenPromise + .runCatching { await() } + .onFailure { tracedChild("brokenPromise") } + } + + launch { + tracedChild("future1") + keptPromise.complete(true) + brokenPromise.completeExceptionally(IllegalStateException()) + } + + listOf(afterPromise, afterPromise2, failedAfterPromise).awaitAll() + + 5 + } + + /** + * @return Number of expected spans in the trace + */ + @Trace + fun tracedWithDeferredFirstCompletions(): Int = runTest { + + val children = listOf( + async { + tracedChild("timeout1") + false + }, + async { + tracedChild("timeout2") + false + }, + async { + tracedChild("timeout3") + true + } + ) + + withTimeout(TimeUnit.SECONDS.toMillis(30)) { + select { + children.forEach { child -> + child.onAwait { it } + } + } + } + + 4 + } + + @Trace + fun tracedChild(opName: String){ + GlobalTracer.get().activeSpan().setOperationName(opName) + } + + private fun runTest(asyncPropagation: Boolean = true, block: suspend CoroutineScope.()->T ): T { + GlobalTracer.get().scopeManager().active().setAsyncPropagation(asyncPropagation) + return runBlocking(dispatcher,block = block) + } + + private fun Scope.setAsyncPropagation(value: Boolean): Unit = + (this as TraceScope).setAsyncPropagation(value) +} + diff --git a/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/ExecutorInstrumentation.java b/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/ExecutorInstrumentation.java index 03c1b5c89d..c01a963622 100644 --- a/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/ExecutorInstrumentation.java +++ b/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/ExecutorInstrumentation.java @@ -60,6 +60,7 @@ public final class ExecutorInstrumentation extends Instrumenter.Default { "java.util.concurrent.Executors$FinalizableDelegatedExecutorService", "java.util.concurrent.Executors$DelegatedExecutorService", "javax.management.NotificationBroadcasterSupport$1", + "kotlinx.coroutines.scheduling.CoroutineScheduler", "scala.concurrent.Future$InternalCallbackExecutor$", "scala.concurrent.impl.ExecutionContextImpl", "scala.concurrent.impl.ExecutionContextImpl$$anon$1", @@ -158,6 +159,11 @@ public final class ExecutorInstrumentation extends Instrumenter.Default { transformers.put( nameMatches("invoke(Any|All)$").and(takesArgument(0, Callable.class)), SetCallableStateForCallableCollectionAdvice.class.getName()); + transformers.put( // kotlinx.coroutines.scheduling.CoroutineScheduler + named("dispatch") + .and(takesArgument(0, Runnable.class)) + .and(takesArgument(1, named("kotlinx.coroutines.scheduling.TaskContext"))), + SetExecuteRunnableStateAdvice.class.getName()); return transformers; } diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle index 425c3ed9a0..f85f2b6ecf 100644 --- a/gradle/dependencies.gradle +++ b/gradle/dependencies.gradle @@ -16,6 +16,8 @@ ext { lombok : "1.18.4", bytebuddy : "1.9.5", scala : "2.11.12", + kotlin : "1.3.11", + coroutines : "1.1.0" ] deps = [ @@ -58,5 +60,7 @@ ext { dependencies.create(group: 'org.slf4j', name: 'jul-to-slf4j', version: versions.slf4j), ], scala : dependencies.create(group: 'org.scala-lang', name: 'scala-library', version: "${versions.scala}"), + kotlin : dependencies.create(group: 'org.jetbrains.kotlin', name: 'kotlin-stdlib', version: "${versions.kotlin}"), + coroutines : dependencies.create(group: 'org.jetbrains.kotlinx', name: 'kotlinx-coroutines-core', version: "${versions.coroutines}"), ] } diff --git a/gradle/test-with-kotlin.gradle b/gradle/test-with-kotlin.gradle new file mode 100644 index 0000000000..decddb32d3 --- /dev/null +++ b/gradle/test-with-kotlin.gradle @@ -0,0 +1,11 @@ +// Enable testing kotlin code in groovy spock tests. +apply plugin: 'kotlin' + +kotlin { + copyClassesToJavaOutput = true +} + +compileTestGroovy { + classpath = classpath.plus(files(compileTestKotlin.destinationDir)) + dependsOn compileTestKotlin +} diff --git a/settings.gradle b/settings.gradle index 196c24a25b..0ab010c0e5 100644 --- a/settings.gradle +++ b/settings.gradle @@ -43,6 +43,7 @@ include ':dd-java-agent:instrumentation:jax-rs-client' include ':dd-java-agent:instrumentation:jax-rs-client:connection-error-handling-jersey' include ':dd-java-agent:instrumentation:jax-rs-client:connection-error-handling-resteasy' include ':dd-java-agent:instrumentation:java-concurrent' +include ':dd-java-agent:instrumentation:java-concurrent:kotlin-testing' include ':dd-java-agent:instrumentation:java-concurrent:scala-testing' include ':dd-java-agent:instrumentation:java-concurrent:akka-testing' include ':dd-java-agent:instrumentation:jboss-classloading'