diff --git a/dd-java-agent/instrumentation/java-concurrent/java-concurrent.gradle b/dd-java-agent/instrumentation/java-concurrent/java-concurrent.gradle index cc3bf64f64..7951e9254f 100644 --- a/dd-java-agent/instrumentation/java-concurrent/java-concurrent.gradle +++ b/dd-java-agent/instrumentation/java-concurrent/java-concurrent.gradle @@ -7,4 +7,7 @@ dependencies { compile deps.bytebuddy compile deps.opentracing compile deps.autoservice + + testCompile project(':dd-java-agent:testing') + testCompile project(':dd-java-agent:instrumentation:trace-annotation') } diff --git a/dd-java-agent/instrumentation/java-concurrent/scala-testing/scala-testing.gradle b/dd-java-agent/instrumentation/java-concurrent/scala-testing/scala-testing.gradle index 6e25720806..1bc5e57177 100644 --- a/dd-java-agent/instrumentation/java-concurrent/scala-testing/scala-testing.gradle +++ b/dd-java-agent/instrumentation/java-concurrent/scala-testing/scala-testing.gradle @@ -1,5 +1,5 @@ apply from: "${rootDir}/gradle/java.gradle" -apply plugin: 'scala' +apply from: "${rootDir}/gradle/test-with-scala.gradle" dependencies { compile project(':dd-trace-api') diff --git a/dd-java-agent/instrumentation/java-concurrent/scala-testing/src/test/groovy/ScalaInstrumentationTest.groovy b/dd-java-agent/instrumentation/java-concurrent/scala-testing/src/test/groovy/ScalaInstrumentationTest.groovy new file mode 100644 index 0000000000..527daf4d96 --- /dev/null +++ b/dd-java-agent/instrumentation/java-concurrent/scala-testing/src/test/groovy/ScalaInstrumentationTest.groovy @@ -0,0 +1,82 @@ +import datadog.opentracing.DDSpan +import datadog.trace.agent.test.AgentTestRunner + +class ScalaInstrumentationTest extends AgentTestRunner { + static { + System.setProperty("dd.integration.java_concurrent.enabled", "true") + } + + @Override + void afterTest() { + // Ignore failures to instrument sun proxy classes + } + + def "scala futures and callbacks"() { + setup: + ScalaConcurrentTests scalaTest = new ScalaConcurrentTests() + int expectedNumberOfSpans = scalaTest.traceWithFutureAndCallbacks() + TEST_WRITER.waitForTraces(1) + List trace = TEST_WRITER.get(0) + + expect: + trace.size() == expectedNumberOfSpans + trace[0].operationName == "ScalaConcurrentTests.traceWithFutureAndCallbacks" + findSpan(trace, "goodFuture").context().getParentId() == trace[0].context().getSpanId() + findSpan(trace, "badFuture").context().getParentId() == trace[0].context().getSpanId() + findSpan(trace, "successCallback").context().getParentId() == trace[0].context().getSpanId() + findSpan(trace, "failureCallback").context().getParentId() == trace[0].context().getSpanId() + } + + def "scala propagates across futures with no traces"() { + setup: + ScalaConcurrentTests scalaTest = new ScalaConcurrentTests() + int expectedNumberOfSpans = scalaTest.tracedAcrossThreadsWithNoTrace() + TEST_WRITER.waitForTraces(1) + List trace = TEST_WRITER.get(0) + + expect: + trace.size() == expectedNumberOfSpans + trace[0].operationName == "ScalaConcurrentTests.tracedAcrossThreadsWithNoTrace" + findSpan(trace, "callback").context().getParentId() == trace[0].context().getSpanId() + } + + def "scala either promise completion"() { + setup: + ScalaConcurrentTests scalaTest = new ScalaConcurrentTests() + int expectedNumberOfSpans = scalaTest.traceWithPromises() + TEST_WRITER.waitForTraces(1) + List trace = TEST_WRITER.get(0) + + expect: + TEST_WRITER.size() == 1 + trace.size() == expectedNumberOfSpans + trace[0].operationName == "ScalaConcurrentTests.traceWithPromises" + findSpan(trace, "keptPromise").context().getParentId() == trace[0].context().getSpanId() + findSpan(trace, "keptPromise2").context().getParentId() == trace[0].context().getSpanId() + findSpan(trace, "brokenPromise").context().getParentId() == trace[0].context().getSpanId() + } + + def "scala first completed future"() { + setup: + ScalaConcurrentTests scalaTest = new ScalaConcurrentTests() + int expectedNumberOfSpans = scalaTest.tracedWithFutureFirstCompletions() + TEST_WRITER.waitForTraces(1) + List trace = TEST_WRITER.get(0) + + expect: + TEST_WRITER.size() == 1 + trace.size() == expectedNumberOfSpans + findSpan(trace, "timeout1").context().getParentId() == trace[0].context().getSpanId() + findSpan(trace, "timeout2").context().getParentId() == trace[0].context().getSpanId() + findSpan(trace, "timeout3").context().getParentId() == trace[0].context().getSpanId() + } + + private DDSpan findSpan(List trace, String opName) { + for (DDSpan span : trace) { + if (span.getOperationName() == opName) { + return span + } + } + return null + } +} diff --git a/dd-java-agent/instrumentation/java-concurrent/scala-testing/src/main/scala/ScalaConcurrentTests.scala b/dd-java-agent/instrumentation/java-concurrent/scala-testing/src/test/scala/ScalaConcurrentTests.scala similarity index 100% rename from dd-java-agent/instrumentation/java-concurrent/scala-testing/src/main/scala/ScalaConcurrentTests.scala rename to dd-java-agent/instrumentation/java-concurrent/scala-testing/src/test/scala/ScalaConcurrentTests.scala index c73c886d0e..fd0a71851a 100644 --- a/dd-java-agent/instrumentation/java-concurrent/scala-testing/src/main/scala/ScalaConcurrentTests.scala +++ b/dd-java-agent/instrumentation/java-concurrent/scala-testing/src/test/scala/ScalaConcurrentTests.scala @@ -1,8 +1,8 @@ import datadog.trace.api.Trace import io.opentracing.util.GlobalTracer -import scala.concurrent.duration._ import scala.concurrent.ExecutionContext.Implicits.global +import scala.concurrent.duration._ import scala.concurrent.{Await, Future, Promise} class ScalaConcurrentTests { diff --git a/dd-java-agent/instrumentation/java-concurrent/scala-testing/src/test/groovy/ExecutorInstrumentationTest.groovy b/dd-java-agent/instrumentation/java-concurrent/src/test/groovy/ExecutorInstrumentationTest.groovy similarity index 59% rename from dd-java-agent/instrumentation/java-concurrent/scala-testing/src/test/groovy/ExecutorInstrumentationTest.groovy rename to dd-java-agent/instrumentation/java-concurrent/src/test/groovy/ExecutorInstrumentationTest.groovy index 19dd7b94ba..5884d02b24 100644 --- a/dd-java-agent/instrumentation/java-concurrent/scala-testing/src/test/groovy/ExecutorInstrumentationTest.groovy +++ b/dd-java-agent/instrumentation/java-concurrent/src/test/groovy/ExecutorInstrumentationTest.groovy @@ -118,73 +118,4 @@ class ExecutorInstrumentationTest extends AgentTestRunner { new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue(1)) | _ new ScheduledThreadPoolExecutor(1) | _ } - - def "scala futures and callbacks"() { - setup: - ScalaConcurrentTests scalaTest = new ScalaConcurrentTests() - int expectedNumberOfSpans = scalaTest.traceWithFutureAndCallbacks() - TEST_WRITER.waitForTraces(1) - List trace = TEST_WRITER.get(0) - - expect: - trace.size() == expectedNumberOfSpans - trace[0].operationName == "ScalaConcurrentTests.traceWithFutureAndCallbacks" - findSpan(trace, "goodFuture").context().getParentId() == trace[0].context().getSpanId() - findSpan(trace, "badFuture").context().getParentId() == trace[0].context().getSpanId() - findSpan(trace, "successCallback").context().getParentId() == trace[0].context().getSpanId() - findSpan(trace, "failureCallback").context().getParentId() == trace[0].context().getSpanId() - } - - def "scala propagates across futures with no traces"() { - setup: - ScalaConcurrentTests scalaTest = new ScalaConcurrentTests() - int expectedNumberOfSpans = scalaTest.tracedAcrossThreadsWithNoTrace() - TEST_WRITER.waitForTraces(1) - List trace = TEST_WRITER.get(0) - - expect: - trace.size() == expectedNumberOfSpans - trace[0].operationName == "ScalaConcurrentTests.tracedAcrossThreadsWithNoTrace" - findSpan(trace, "callback").context().getParentId() == trace[0].context().getSpanId() - } - - def "scala either promise completion"() { - setup: - ScalaConcurrentTests scalaTest = new ScalaConcurrentTests() - int expectedNumberOfSpans = scalaTest.traceWithPromises() - TEST_WRITER.waitForTraces(1) - List trace = TEST_WRITER.get(0) - - expect: - TEST_WRITER.size() == 1 - trace.size() == expectedNumberOfSpans - trace[0].operationName == "ScalaConcurrentTests.traceWithPromises" - findSpan(trace, "keptPromise").context().getParentId() == trace[0].context().getSpanId() - findSpan(trace, "keptPromise2").context().getParentId() == trace[0].context().getSpanId() - findSpan(trace, "brokenPromise").context().getParentId() == trace[0].context().getSpanId() - } - - def "scala first completed future"() { - setup: - ScalaConcurrentTests scalaTest = new ScalaConcurrentTests() - int expectedNumberOfSpans = scalaTest.tracedWithFutureFirstCompletions() - TEST_WRITER.waitForTraces(1) - List trace = TEST_WRITER.get(0) - - expect: - TEST_WRITER.size() == 1 - trace.size() == expectedNumberOfSpans - findSpan(trace, "timeout1").context().getParentId() == trace[0].context().getSpanId() - findSpan(trace, "timeout2").context().getParentId() == trace[0].context().getSpanId() - findSpan(trace, "timeout3").context().getParentId() == trace[0].context().getSpanId() - } - - private DDSpan findSpan(List trace, String opName) { - for (DDSpan span : trace) { - if (span.getOperationName() == opName) { - return span - } - } - return null - } } diff --git a/dd-java-agent/instrumentation/java-concurrent/src/test/java/AsyncChild.java b/dd-java-agent/instrumentation/java-concurrent/src/test/java/AsyncChild.java new file mode 100644 index 0000000000..8d8746be2d --- /dev/null +++ b/dd-java-agent/instrumentation/java-concurrent/src/test/java/AsyncChild.java @@ -0,0 +1,51 @@ +import datadog.trace.api.Trace; +import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +public class AsyncChild implements Runnable, Callable { + private final AtomicBoolean blockThread; + private final boolean doTraceableWork; + private final AtomicInteger numberOfWorkers = new AtomicInteger(0); + + public AsyncChild() { + this(true, false); + } + + public AsyncChild(boolean doTraceableWork, boolean blockThread) { + this.doTraceableWork = doTraceableWork; + this.blockThread = new AtomicBoolean(blockThread); + } + + public void unblock() { + blockThread.set(false); + } + + @Override + public void run() { + runImpl(); + } + + @Override + public Object call() throws Exception { + runImpl(); + return null; + } + + private void runImpl() { + if (doTraceableWork) { + asyncChild(); + } + numberOfWorkers.getAndIncrement(); + try { + while (blockThread.get()) { + // busy-wait to block thread + } + } finally { + numberOfWorkers.getAndDecrement(); + } + } + + @Trace(operationName = "asyncChild") + private void asyncChild() {} +}