Copied scala tests over from integration tests
This commit is contained in:
parent
4b852d7ac1
commit
5a97c1e5bb
|
@ -0,0 +1,16 @@
|
|||
apply from: "${rootDir}/gradle/java.gradle"
|
||||
apply plugin: 'scala'
|
||||
|
||||
dependencies {
|
||||
// calling scala classes in spock requires an explicit dependency,
|
||||
// hence the compile instead of testCompile
|
||||
compileOnly group: 'org.scala-lang', name: 'scala-library', version: '2.11.12'
|
||||
testCompile group: 'org.scala-lang', name: 'scala-library', version: '2.11.12'
|
||||
|
||||
compile project(':dd-trace-api')
|
||||
compile project(':dd-trace-ot')
|
||||
|
||||
testCompile project(':dd-java-agent:testing')
|
||||
testCompile project(':dd-java-agent:instrumentation:java-concurrent')
|
||||
testCompile project(':dd-java-agent:instrumentation:trace-annotation')
|
||||
}
|
|
@ -0,0 +1,129 @@
|
|||
import datadog.trace.api.Trace
|
||||
import io.opentracing.util.GlobalTracer
|
||||
|
||||
import scala.concurrent.duration._
|
||||
import scala.concurrent.ExecutionContext.Implicits.global
|
||||
import scala.concurrent.{Await, Future, Promise}
|
||||
|
||||
class ScalaConcurrentTests {
|
||||
|
||||
/**
|
||||
* @return Number of expected spans in the trace
|
||||
*/
|
||||
@Trace
|
||||
def traceWithFutureAndCallbacks() : Integer = {
|
||||
val goodFuture: Future[Integer] = Future {
|
||||
tracedChild("goodFuture")
|
||||
1
|
||||
}
|
||||
goodFuture onSuccess {
|
||||
case _ => tracedChild("successCallback")
|
||||
}
|
||||
val badFuture: Future[Integer] = Future {
|
||||
tracedChild("badFuture")
|
||||
throw new RuntimeException("Uh-oh")
|
||||
}
|
||||
badFuture onFailure {
|
||||
case t: Throwable => tracedChild("failureCallback")
|
||||
}
|
||||
|
||||
return 5
|
||||
}
|
||||
|
||||
@Trace
|
||||
def tracedAcrossThreadsWithNoTrace() :Integer = {
|
||||
val goodFuture: Future[Integer] = Future {
|
||||
1
|
||||
}
|
||||
goodFuture onSuccess {
|
||||
case _ => Future {
|
||||
2
|
||||
} onSuccess {
|
||||
case _ => tracedChild("callback")
|
||||
}
|
||||
}
|
||||
|
||||
return 2
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Number of expected spans in the trace
|
||||
*/
|
||||
@Trace
|
||||
def traceWithPromises() : Integer = {
|
||||
val keptPromise = Promise[Boolean]()
|
||||
val brokenPromise = Promise[Boolean]()
|
||||
val afterPromise = keptPromise.future
|
||||
val afterPromise2 = keptPromise.future
|
||||
|
||||
val failedAfterPromise = brokenPromise.future
|
||||
|
||||
Future {
|
||||
tracedChild("future1")
|
||||
keptPromise success true
|
||||
brokenPromise failure new IllegalStateException()
|
||||
}
|
||||
|
||||
afterPromise onSuccess {
|
||||
case b => tracedChild("keptPromise")
|
||||
}
|
||||
afterPromise2 onSuccess {
|
||||
case b => tracedChild("keptPromise2")
|
||||
}
|
||||
|
||||
failedAfterPromise onFailure {
|
||||
case t => tracedChild("brokenPromise")
|
||||
}
|
||||
|
||||
return 5
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Number of expected spans in the trace
|
||||
*/
|
||||
@Trace
|
||||
def tracedWithFutureFirstCompletions() :Integer = {
|
||||
val completedVal = Future.firstCompletedOf(
|
||||
List(
|
||||
Future {
|
||||
tracedChild("timeout1")
|
||||
false
|
||||
},
|
||||
Future {
|
||||
tracedChild("timeout2")
|
||||
false
|
||||
},
|
||||
Future {
|
||||
tracedChild("timeout3")
|
||||
true
|
||||
}))
|
||||
Await.result(completedVal, 30 seconds)
|
||||
return 4
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Number of expected spans in the trace
|
||||
*/
|
||||
@Trace
|
||||
def tracedTimeout(): Integer = {
|
||||
val f: Future[String] = Future {
|
||||
tracedChild("timeoutChild")
|
||||
while(true) {
|
||||
// never actually finish
|
||||
}
|
||||
"done"
|
||||
}
|
||||
|
||||
try {
|
||||
Await.result(f, 1 milliseconds)
|
||||
} catch {
|
||||
case e: Exception => {}
|
||||
}
|
||||
return 2
|
||||
}
|
||||
|
||||
@Trace
|
||||
def tracedChild(opName: String): Unit = {
|
||||
GlobalTracer.get().activeSpan().setOperationName(opName)
|
||||
}
|
||||
}
|
|
@ -0,0 +1,185 @@
|
|||
import datadog.opentracing.DDSpan
|
||||
import datadog.trace.agent.test.AgentTestRunner
|
||||
import datadog.trace.api.Trace
|
||||
import spock.lang.Shared
|
||||
import spock.lang.Unroll
|
||||
|
||||
import java.lang.reflect.Method
|
||||
import java.util.concurrent.ArrayBlockingQueue
|
||||
import java.util.concurrent.Callable
|
||||
import java.util.concurrent.ExecutorService
|
||||
import java.util.concurrent.Executor
|
||||
import java.util.concurrent.ForkJoinPool
|
||||
import java.util.concurrent.Future
|
||||
import java.util.concurrent.RejectedExecutionException
|
||||
import java.util.concurrent.ScheduledThreadPoolExecutor
|
||||
import java.util.concurrent.ThreadPoolExecutor
|
||||
import java.util.concurrent.TimeUnit
|
||||
|
||||
class ExecutorInstrumentationTest extends AgentTestRunner {
|
||||
@Shared
|
||||
Method submitMethod
|
||||
@Shared
|
||||
Method executeMethod
|
||||
|
||||
static {
|
||||
System.setProperty("dd.integration.java_concurrent.enabled", "true")
|
||||
}
|
||||
|
||||
def setupSpec() {
|
||||
executeMethod = Executor.getMethod("execute", Runnable)
|
||||
submitMethod = ExecutorService.getMethod("submit", Callable)
|
||||
}
|
||||
|
||||
@Unroll
|
||||
// more useful name breaks java9 javac
|
||||
// def "#poolImpl.getClass().getSimpleName() #method.getName() propagates"()
|
||||
def "#poolImpl #method propagates"() {
|
||||
setup:
|
||||
def pool = poolImpl
|
||||
def m = method
|
||||
|
||||
new Runnable(){
|
||||
@Override
|
||||
@Trace(operationName = "parent")
|
||||
void run() {
|
||||
// this child will have a span
|
||||
m.invoke(pool, new AsyncChild())
|
||||
// this child won't
|
||||
m.invoke(pool, new AsyncChild(false, false))
|
||||
}
|
||||
}.run()
|
||||
|
||||
TEST_WRITER.waitForTraces(1)
|
||||
List<DDSpan> trace = TEST_WRITER.get(0)
|
||||
|
||||
expect:
|
||||
TEST_WRITER.size() == 1
|
||||
trace.size() == 2
|
||||
trace.get(0).operationName == "parent"
|
||||
trace.get(1).operationName == "asyncChild"
|
||||
trace.get(1).parentId == trace.get(0).spanId
|
||||
|
||||
cleanup:
|
||||
pool?.shutdown()
|
||||
|
||||
// Unfortunately, there's no simple way to test the cross product of methods/pools.
|
||||
where:
|
||||
poolImpl | method
|
||||
new ForkJoinPool() | submitMethod
|
||||
new ForkJoinPool() | executeMethod
|
||||
new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue<Runnable>(1)) | submitMethod
|
||||
new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue<Runnable>(1)) | executeMethod
|
||||
new ScheduledThreadPoolExecutor(1) | submitMethod
|
||||
new ScheduledThreadPoolExecutor(1) | executeMethod
|
||||
}
|
||||
|
||||
@Unroll
|
||||
// more useful name breaks java9 javac
|
||||
// def "#poolImpl.getClass().getSimpleName() #method.getName() propagates"()
|
||||
def "#poolImpl reports after canceled jobs" () {
|
||||
setup:
|
||||
def pool = poolImpl
|
||||
final AsyncChild child = new AsyncChild(true, true)
|
||||
List<Future> jobFutures = new ArrayList<Future>()
|
||||
|
||||
new Runnable(){
|
||||
@Override
|
||||
@Trace(operationName = "parent")
|
||||
void run() {
|
||||
try {
|
||||
for (int i = 0; i < 20; ++ i) {
|
||||
Future f = pool.submit((Callable)child)
|
||||
jobFutures.add(f)
|
||||
}
|
||||
} catch (RejectedExecutionException e) {
|
||||
}
|
||||
|
||||
for (Future f : jobFutures) {
|
||||
f.cancel(false)
|
||||
}
|
||||
child.unblock()
|
||||
}
|
||||
}.run()
|
||||
|
||||
TEST_WRITER.waitForTraces(1)
|
||||
|
||||
expect:
|
||||
TEST_WRITER.size() == 1
|
||||
|
||||
where:
|
||||
poolImpl | _
|
||||
new ForkJoinPool() | _
|
||||
new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue<Runnable>(1)) | _
|
||||
new ScheduledThreadPoolExecutor(1) | _
|
||||
}
|
||||
|
||||
def "scala futures and callbacks"() {
|
||||
setup:
|
||||
ScalaConcurrentTests scalaTest = new ScalaConcurrentTests()
|
||||
int expectedNumberOfSpans = scalaTest.traceWithFutureAndCallbacks()
|
||||
TEST_WRITER.waitForTraces(1)
|
||||
List<DDSpan> trace = TEST_WRITER.get(0)
|
||||
|
||||
expect:
|
||||
trace.size() == expectedNumberOfSpans
|
||||
trace[0].operationName == "ScalaConcurrentTests.traceWithFutureAndCallbacks"
|
||||
findSpan(trace, "goodFuture").context().getParentId() == trace[0].context().getSpanId()
|
||||
findSpan(trace, "badFuture").context().getParentId() == trace[0].context().getSpanId()
|
||||
findSpan(trace, "successCallback").context().getParentId() == trace[0].context().getSpanId()
|
||||
findSpan(trace, "failureCallback").context().getParentId() == trace[0].context().getSpanId()
|
||||
}
|
||||
|
||||
def "scala propagates across futures with no traces"() {
|
||||
setup:
|
||||
ScalaConcurrentTests scalaTest = new ScalaConcurrentTests()
|
||||
int expectedNumberOfSpans = scalaTest.tracedAcrossThreadsWithNoTrace()
|
||||
TEST_WRITER.waitForTraces(1)
|
||||
List<DDSpan> trace = TEST_WRITER.get(0)
|
||||
|
||||
expect:
|
||||
trace.size() == expectedNumberOfSpans
|
||||
trace[0].operationName == "ScalaConcurrentTests.tracedAcrossThreadsWithNoTrace"
|
||||
findSpan(trace, "callback").context().getParentId() == trace[0].context().getSpanId()
|
||||
}
|
||||
|
||||
def "scala either promise completion"() {
|
||||
setup:
|
||||
ScalaConcurrentTests scalaTest = new ScalaConcurrentTests()
|
||||
int expectedNumberOfSpans = scalaTest.traceWithPromises()
|
||||
TEST_WRITER.waitForTraces(1)
|
||||
List<DDSpan> trace = TEST_WRITER.get(0)
|
||||
|
||||
expect:
|
||||
TEST_WRITER.size() == 1
|
||||
trace.size() == expectedNumberOfSpans
|
||||
trace[0].operationName == "ScalaConcurrentTests.traceWithPromises"
|
||||
findSpan(trace, "keptPromise").context().getParentId() == trace[0].context().getSpanId()
|
||||
findSpan(trace, "keptPromise2").context().getParentId() == trace[0].context().getSpanId()
|
||||
findSpan(trace, "brokenPromise").context().getParentId() == trace[0].context().getSpanId()
|
||||
}
|
||||
|
||||
def "scala first completed future"() {
|
||||
setup:
|
||||
ScalaConcurrentTests scalaTest = new ScalaConcurrentTests()
|
||||
int expectedNumberOfSpans = scalaTest.tracedWithFutureFirstCompletions()
|
||||
TEST_WRITER.waitForTraces(1)
|
||||
List<DDSpan> trace = TEST_WRITER.get(0)
|
||||
|
||||
expect:
|
||||
TEST_WRITER.size() == 1
|
||||
trace.size() == expectedNumberOfSpans
|
||||
findSpan(trace, "timeout1").context().getParentId() == trace[0].context().getSpanId()
|
||||
findSpan(trace, "timeout2").context().getParentId() == trace[0].context().getSpanId()
|
||||
findSpan(trace, "timeout3").context().getParentId() == trace[0].context().getSpanId()
|
||||
}
|
||||
|
||||
private DDSpan findSpan(List<DDSpan> trace, String opName) {
|
||||
for (DDSpan span : trace) {
|
||||
if (span.getOperationName() == opName) {
|
||||
return span
|
||||
}
|
||||
}
|
||||
return null
|
||||
}
|
||||
}
|
|
@ -0,0 +1,51 @@
|
|||
import datadog.trace.api.Trace;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
public class AsyncChild implements Runnable, Callable {
|
||||
private final AtomicBoolean blockThread;
|
||||
private final boolean doTraceableWork;
|
||||
private final AtomicInteger numberOfWorkers = new AtomicInteger(0);
|
||||
|
||||
public AsyncChild() {
|
||||
this(true, false);
|
||||
}
|
||||
|
||||
public AsyncChild(boolean doTraceableWork, boolean blockThread) {
|
||||
this.doTraceableWork = doTraceableWork;
|
||||
this.blockThread = new AtomicBoolean(blockThread);
|
||||
}
|
||||
|
||||
public void unblock() {
|
||||
blockThread.set(false);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
runImpl();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object call() throws Exception {
|
||||
runImpl();
|
||||
return null;
|
||||
}
|
||||
|
||||
private void runImpl() {
|
||||
if (doTraceableWork) {
|
||||
asyncChild();
|
||||
}
|
||||
numberOfWorkers.getAndIncrement();
|
||||
try {
|
||||
while (blockThread.get()) {
|
||||
// busy-wait to block thread
|
||||
}
|
||||
} finally {
|
||||
numberOfWorkers.getAndDecrement();
|
||||
}
|
||||
}
|
||||
|
||||
@Trace(operationName = "asyncChild")
|
||||
private void asyncChild() {}
|
||||
}
|
|
@ -16,6 +16,7 @@ include ':dd-java-agent:instrumentation:classloaders'
|
|||
include ':dd-java-agent:instrumentation:datastax-cassandra-3.2'
|
||||
include ':dd-java-agent:instrumentation:jax-rs'
|
||||
include ':dd-java-agent:instrumentation:java-concurrent'
|
||||
include ':dd-java-agent:instrumentation:java-concurrent:scala-testing'
|
||||
include ':dd-java-agent:instrumentation:jboss-classloading'
|
||||
include ':dd-java-agent:instrumentation:jdbc'
|
||||
include ':dd-java-agent:instrumentation:jedis-1.4'
|
||||
|
|
Loading…
Reference in New Issue