Add some rudimetrary tests for CompletableFuture

This commit is contained in:
Nikolay Martynov 2019-05-24 16:36:07 -04:00
parent 0b85f048d1
commit a3a325868c
1 changed files with 71 additions and 0 deletions

View File

@ -11,6 +11,7 @@ import java.lang.reflect.InvocationTargetException
import java.util.concurrent.AbstractExecutorService import java.util.concurrent.AbstractExecutorService
import java.util.concurrent.ArrayBlockingQueue import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.Callable import java.util.concurrent.Callable
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ExecutionException import java.util.concurrent.ExecutionException
import java.util.concurrent.ForkJoinPool import java.util.concurrent.ForkJoinPool
import java.util.concurrent.ForkJoinTask import java.util.concurrent.ForkJoinTask
@ -21,6 +22,8 @@ import java.util.concurrent.ScheduledThreadPoolExecutor
import java.util.concurrent.ThreadPoolExecutor import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException import java.util.concurrent.TimeoutException
import java.util.function.Function
import java.util.function.Supplier
class ExecutorInstrumentationTest extends AgentTestRunner { class ExecutorInstrumentationTest extends AgentTestRunner {
@ -234,6 +237,74 @@ class ExecutorInstrumentationTest extends AgentTestRunner {
"submit Callable" | submitCallable | new ForkJoinPool() "submit Callable" | submitCallable | new ForkJoinPool()
} }
def "CompletableFuture test"() {
setup:
def pool = new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue<Runnable>(1))
def differentPool = new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue<Runnable>(1))
def supplier = new Supplier<String>() {
@Override
@Trace(operationName = "supplier")
String get() {
sleep(1000)
return "a"
}
}
def function = new Function<String, String>() {
@Override
@Trace(operationName = "function")
String apply(String s) {
return s + "c"
}
}
def future = new Supplier<CompletableFuture<String>>() {
@Override
@Trace(operationName = "parent")
CompletableFuture<String> get() {
((ContinuableScope) GlobalTracer.get().scopeManager().active()).setAsyncPropagation(true)
return CompletableFuture.supplyAsync(supplier, pool)
.thenCompose({ s -> CompletableFuture.supplyAsync(new AppendingSupplier(s), differentPool) })
.thenApply(function)
}
}.get()
def result = future.get()
TEST_WRITER.waitForTraces(1)
List<DDSpan> trace = TEST_WRITER.get(0)
expect:
result == "abc"
TEST_WRITER.size() == 1
trace.size() == 4
trace.get(0).operationName == "parent"
trace.get(1).operationName == "function"
trace.get(1).parentId == trace.get(0).spanId
trace.get(2).operationName == "appendingSupplier"
trace.get(2).parentId == trace.get(0).spanId
trace.get(3).operationName == "supplier"
trace.get(3).parentId == trace.get(0).spanId
cleanup:
pool?.shutdown()
}
class AppendingSupplier implements Supplier<String> {
String letter
AppendingSupplier(String letter) {
this.letter = letter
}
@Override
@Trace(operationName = "appendingSupplier")
String get() {
return letter + "b"
}
}
static class CustomThreadPoolExecutor extends AbstractExecutorService { static class CustomThreadPoolExecutor extends AbstractExecutorService {
volatile running = true volatile running = true
def workQueue = new LinkedBlockingQueue<Runnable>(10) def workQueue = new LinkedBlockingQueue<Runnable>(10)