Separate executor and scala future tests
This commit is contained in:
parent
3f26662b8f
commit
6e62b79b8f
|
@ -7,4 +7,7 @@ dependencies {
|
|||
compile deps.bytebuddy
|
||||
compile deps.opentracing
|
||||
compile deps.autoservice
|
||||
|
||||
testCompile project(':dd-java-agent:testing')
|
||||
testCompile project(':dd-java-agent:instrumentation:trace-annotation')
|
||||
}
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
apply from: "${rootDir}/gradle/java.gradle"
|
||||
apply plugin: 'scala'
|
||||
apply from: "${rootDir}/gradle/test-with-scala.gradle"
|
||||
|
||||
dependencies {
|
||||
compile project(':dd-trace-api')
|
||||
|
|
|
@ -0,0 +1,82 @@
|
|||
import datadog.opentracing.DDSpan
|
||||
import datadog.trace.agent.test.AgentTestRunner
|
||||
|
||||
class ScalaInstrumentationTest extends AgentTestRunner {
|
||||
static {
|
||||
System.setProperty("dd.integration.java_concurrent.enabled", "true")
|
||||
}
|
||||
|
||||
@Override
|
||||
void afterTest() {
|
||||
// Ignore failures to instrument sun proxy classes
|
||||
}
|
||||
|
||||
def "scala futures and callbacks"() {
|
||||
setup:
|
||||
ScalaConcurrentTests scalaTest = new ScalaConcurrentTests()
|
||||
int expectedNumberOfSpans = scalaTest.traceWithFutureAndCallbacks()
|
||||
TEST_WRITER.waitForTraces(1)
|
||||
List<DDSpan> trace = TEST_WRITER.get(0)
|
||||
|
||||
expect:
|
||||
trace.size() == expectedNumberOfSpans
|
||||
trace[0].operationName == "ScalaConcurrentTests.traceWithFutureAndCallbacks"
|
||||
findSpan(trace, "goodFuture").context().getParentId() == trace[0].context().getSpanId()
|
||||
findSpan(trace, "badFuture").context().getParentId() == trace[0].context().getSpanId()
|
||||
findSpan(trace, "successCallback").context().getParentId() == trace[0].context().getSpanId()
|
||||
findSpan(trace, "failureCallback").context().getParentId() == trace[0].context().getSpanId()
|
||||
}
|
||||
|
||||
def "scala propagates across futures with no traces"() {
|
||||
setup:
|
||||
ScalaConcurrentTests scalaTest = new ScalaConcurrentTests()
|
||||
int expectedNumberOfSpans = scalaTest.tracedAcrossThreadsWithNoTrace()
|
||||
TEST_WRITER.waitForTraces(1)
|
||||
List<DDSpan> trace = TEST_WRITER.get(0)
|
||||
|
||||
expect:
|
||||
trace.size() == expectedNumberOfSpans
|
||||
trace[0].operationName == "ScalaConcurrentTests.tracedAcrossThreadsWithNoTrace"
|
||||
findSpan(trace, "callback").context().getParentId() == trace[0].context().getSpanId()
|
||||
}
|
||||
|
||||
def "scala either promise completion"() {
|
||||
setup:
|
||||
ScalaConcurrentTests scalaTest = new ScalaConcurrentTests()
|
||||
int expectedNumberOfSpans = scalaTest.traceWithPromises()
|
||||
TEST_WRITER.waitForTraces(1)
|
||||
List<DDSpan> trace = TEST_WRITER.get(0)
|
||||
|
||||
expect:
|
||||
TEST_WRITER.size() == 1
|
||||
trace.size() == expectedNumberOfSpans
|
||||
trace[0].operationName == "ScalaConcurrentTests.traceWithPromises"
|
||||
findSpan(trace, "keptPromise").context().getParentId() == trace[0].context().getSpanId()
|
||||
findSpan(trace, "keptPromise2").context().getParentId() == trace[0].context().getSpanId()
|
||||
findSpan(trace, "brokenPromise").context().getParentId() == trace[0].context().getSpanId()
|
||||
}
|
||||
|
||||
def "scala first completed future"() {
|
||||
setup:
|
||||
ScalaConcurrentTests scalaTest = new ScalaConcurrentTests()
|
||||
int expectedNumberOfSpans = scalaTest.tracedWithFutureFirstCompletions()
|
||||
TEST_WRITER.waitForTraces(1)
|
||||
List<DDSpan> trace = TEST_WRITER.get(0)
|
||||
|
||||
expect:
|
||||
TEST_WRITER.size() == 1
|
||||
trace.size() == expectedNumberOfSpans
|
||||
findSpan(trace, "timeout1").context().getParentId() == trace[0].context().getSpanId()
|
||||
findSpan(trace, "timeout2").context().getParentId() == trace[0].context().getSpanId()
|
||||
findSpan(trace, "timeout3").context().getParentId() == trace[0].context().getSpanId()
|
||||
}
|
||||
|
||||
private DDSpan findSpan(List<DDSpan> trace, String opName) {
|
||||
for (DDSpan span : trace) {
|
||||
if (span.getOperationName() == opName) {
|
||||
return span
|
||||
}
|
||||
}
|
||||
return null
|
||||
}
|
||||
}
|
|
@ -1,8 +1,8 @@
|
|||
import datadog.trace.api.Trace
|
||||
import io.opentracing.util.GlobalTracer
|
||||
|
||||
import scala.concurrent.duration._
|
||||
import scala.concurrent.ExecutionContext.Implicits.global
|
||||
import scala.concurrent.duration._
|
||||
import scala.concurrent.{Await, Future, Promise}
|
||||
|
||||
class ScalaConcurrentTests {
|
|
@ -118,73 +118,4 @@ class ExecutorInstrumentationTest extends AgentTestRunner {
|
|||
new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue<Runnable>(1)) | _
|
||||
new ScheduledThreadPoolExecutor(1) | _
|
||||
}
|
||||
|
||||
def "scala futures and callbacks"() {
|
||||
setup:
|
||||
ScalaConcurrentTests scalaTest = new ScalaConcurrentTests()
|
||||
int expectedNumberOfSpans = scalaTest.traceWithFutureAndCallbacks()
|
||||
TEST_WRITER.waitForTraces(1)
|
||||
List<DDSpan> trace = TEST_WRITER.get(0)
|
||||
|
||||
expect:
|
||||
trace.size() == expectedNumberOfSpans
|
||||
trace[0].operationName == "ScalaConcurrentTests.traceWithFutureAndCallbacks"
|
||||
findSpan(trace, "goodFuture").context().getParentId() == trace[0].context().getSpanId()
|
||||
findSpan(trace, "badFuture").context().getParentId() == trace[0].context().getSpanId()
|
||||
findSpan(trace, "successCallback").context().getParentId() == trace[0].context().getSpanId()
|
||||
findSpan(trace, "failureCallback").context().getParentId() == trace[0].context().getSpanId()
|
||||
}
|
||||
|
||||
def "scala propagates across futures with no traces"() {
|
||||
setup:
|
||||
ScalaConcurrentTests scalaTest = new ScalaConcurrentTests()
|
||||
int expectedNumberOfSpans = scalaTest.tracedAcrossThreadsWithNoTrace()
|
||||
TEST_WRITER.waitForTraces(1)
|
||||
List<DDSpan> trace = TEST_WRITER.get(0)
|
||||
|
||||
expect:
|
||||
trace.size() == expectedNumberOfSpans
|
||||
trace[0].operationName == "ScalaConcurrentTests.tracedAcrossThreadsWithNoTrace"
|
||||
findSpan(trace, "callback").context().getParentId() == trace[0].context().getSpanId()
|
||||
}
|
||||
|
||||
def "scala either promise completion"() {
|
||||
setup:
|
||||
ScalaConcurrentTests scalaTest = new ScalaConcurrentTests()
|
||||
int expectedNumberOfSpans = scalaTest.traceWithPromises()
|
||||
TEST_WRITER.waitForTraces(1)
|
||||
List<DDSpan> trace = TEST_WRITER.get(0)
|
||||
|
||||
expect:
|
||||
TEST_WRITER.size() == 1
|
||||
trace.size() == expectedNumberOfSpans
|
||||
trace[0].operationName == "ScalaConcurrentTests.traceWithPromises"
|
||||
findSpan(trace, "keptPromise").context().getParentId() == trace[0].context().getSpanId()
|
||||
findSpan(trace, "keptPromise2").context().getParentId() == trace[0].context().getSpanId()
|
||||
findSpan(trace, "brokenPromise").context().getParentId() == trace[0].context().getSpanId()
|
||||
}
|
||||
|
||||
def "scala first completed future"() {
|
||||
setup:
|
||||
ScalaConcurrentTests scalaTest = new ScalaConcurrentTests()
|
||||
int expectedNumberOfSpans = scalaTest.tracedWithFutureFirstCompletions()
|
||||
TEST_WRITER.waitForTraces(1)
|
||||
List<DDSpan> trace = TEST_WRITER.get(0)
|
||||
|
||||
expect:
|
||||
TEST_WRITER.size() == 1
|
||||
trace.size() == expectedNumberOfSpans
|
||||
findSpan(trace, "timeout1").context().getParentId() == trace[0].context().getSpanId()
|
||||
findSpan(trace, "timeout2").context().getParentId() == trace[0].context().getSpanId()
|
||||
findSpan(trace, "timeout3").context().getParentId() == trace[0].context().getSpanId()
|
||||
}
|
||||
|
||||
private DDSpan findSpan(List<DDSpan> trace, String opName) {
|
||||
for (DDSpan span : trace) {
|
||||
if (span.getOperationName() == opName) {
|
||||
return span
|
||||
}
|
||||
}
|
||||
return null
|
||||
}
|
||||
}
|
|
@ -0,0 +1,51 @@
|
|||
import datadog.trace.api.Trace;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
public class AsyncChild implements Runnable, Callable {
|
||||
private final AtomicBoolean blockThread;
|
||||
private final boolean doTraceableWork;
|
||||
private final AtomicInteger numberOfWorkers = new AtomicInteger(0);
|
||||
|
||||
public AsyncChild() {
|
||||
this(true, false);
|
||||
}
|
||||
|
||||
public AsyncChild(boolean doTraceableWork, boolean blockThread) {
|
||||
this.doTraceableWork = doTraceableWork;
|
||||
this.blockThread = new AtomicBoolean(blockThread);
|
||||
}
|
||||
|
||||
public void unblock() {
|
||||
blockThread.set(false);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
runImpl();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object call() throws Exception {
|
||||
runImpl();
|
||||
return null;
|
||||
}
|
||||
|
||||
private void runImpl() {
|
||||
if (doTraceableWork) {
|
||||
asyncChild();
|
||||
}
|
||||
numberOfWorkers.getAndIncrement();
|
||||
try {
|
||||
while (blockThread.get()) {
|
||||
// busy-wait to block thread
|
||||
}
|
||||
} finally {
|
||||
numberOfWorkers.getAndDecrement();
|
||||
}
|
||||
}
|
||||
|
||||
@Trace(operationName = "asyncChild")
|
||||
private void asyncChild() {}
|
||||
}
|
Loading…
Reference in New Issue