Fix flaky kotlin test (#5650)

This commit is contained in:
Lauri Tulmin 2022-03-22 01:43:01 +02:00 committed by GitHub
parent 2a24734ec3
commit 8e2a33b9a1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 41 additions and 29 deletions

View File

@ -19,7 +19,6 @@ import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.GlobalScope import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.asCoroutineDispatcher import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.asExecutor
import kotlinx.coroutines.async import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.channels.produce import kotlinx.coroutines.channels.produce
@ -76,7 +75,7 @@ class KotlinCoroutinesInstrumentationTest {
@ParameterizedTest @ParameterizedTest
@ArgumentsSource(DispatchersSource::class) @ArgumentsSource(DispatchersSource::class)
fun `traced across channels`(dispatcher: CoroutineDispatcher) { fun `traced across channels`(dispatcher: DispatcherWrapper) {
runTest(dispatcher) { runTest(dispatcher) {
val producer = produce { val producer = produce {
repeat(3) { repeat(3) {
@ -136,7 +135,7 @@ class KotlinCoroutinesInstrumentationTest {
@ParameterizedTest @ParameterizedTest
@ArgumentsSource(DispatchersSource::class) @ArgumentsSource(DispatchersSource::class)
fun `cancellation prevents trace`(dispatcher: CoroutineDispatcher) { fun `cancellation prevents trace`(dispatcher: DispatcherWrapper) {
runCatching { runCatching {
runTest(dispatcher) { runTest(dispatcher) {
tracedChild("preLaunch") tracedChild("preLaunch")
@ -169,7 +168,7 @@ class KotlinCoroutinesInstrumentationTest {
@ParameterizedTest @ParameterizedTest
@ArgumentsSource(DispatchersSource::class) @ArgumentsSource(DispatchersSource::class)
fun `propagates across nested jobs`(dispatcher: CoroutineDispatcher) { fun `propagates across nested jobs`(dispatcher: DispatcherWrapper) {
runTest(dispatcher) { runTest(dispatcher) {
val goodDeferred = async { 1 } val goodDeferred = async { 1 }
@ -280,21 +279,26 @@ class KotlinCoroutinesInstrumentationTest {
testing.waitAndAssertTraces( testing.waitAndAssertTraces(
{ trace -> { trace ->
trace.hasSpansSatisfyingExactly( // TODO(anuraaga): Need hasSpansSatisfyingExactlyInAnyOrder sometimes
{ trace.satisfiesExactlyInAnyOrder(
it.hasName("parent") Consumer {
assertThat(it)
.hasName("parent")
.hasNoParent() .hasNoParent()
}, },
{ Consumer {
it.hasName("timeout1") assertThat(it)
.hasName("timeout1")
.hasParent(trace.getSpan(0)) .hasParent(trace.getSpan(0))
}, },
{ Consumer {
it.hasName("timeout2") assertThat(it)
.hasName("timeout2")
.hasParent(trace.getSpan(0)) .hasParent(trace.getSpan(0))
}, },
{ Consumer {
it.hasName("timeout3") assertThat(it)
.hasName("timeout3")
.hasParent(trace.getSpan(0)) .hasParent(trace.getSpan(0))
}, },
) )
@ -361,9 +365,9 @@ class KotlinCoroutinesInstrumentationTest {
@ParameterizedTest @ParameterizedTest
@ArgumentsSource(DispatchersSource::class) @ArgumentsSource(DispatchersSource::class)
fun `traced mono`(dispatcher: CoroutineDispatcher) { fun `traced mono`(dispatcherWrapper: DispatcherWrapper) {
runTest(dispatcher) { runTest(dispatcherWrapper) {
mono(dispatcher) { mono(dispatcherWrapper.dispatcher) {
tracedChild("child") tracedChild("child")
}.awaitSingle() }.awaitSingle()
} }
@ -386,12 +390,12 @@ class KotlinCoroutinesInstrumentationTest {
@ParameterizedTest @ParameterizedTest
@ArgumentsSource(DispatchersSource::class) @ArgumentsSource(DispatchersSource::class)
fun `traced mono with context propagation operator`(dispatcher: CoroutineDispatcher) { fun `traced mono with context propagation operator`(dispatcherWrapper: DispatcherWrapper) {
runTest(dispatcher) { runTest(dispatcherWrapper) {
val currentContext = Context.current() val currentContext = Context.current()
// clear current context to ensure that ContextPropagationOperator is used for context propagation // clear current context to ensure that ContextPropagationOperator is used for context propagation
withContext(Context.root().asContextElement()) { withContext(Context.root().asContextElement()) {
val mono = mono(dispatcher) { val mono = mono(dispatcherWrapper.dispatcher) {
// extract context from reactor and propagate it into coroutine // extract context from reactor and propagate it into coroutine
val reactorContext = coroutineContext[ReactorContext.Key]?.context val reactorContext = coroutineContext[ReactorContext.Key]?.context
val otelContext = ContextPropagationOperator.getOpenTelemetryContext(reactorContext, Context.current()) val otelContext = ContextPropagationOperator.getOpenTelemetryContext(reactorContext, Context.current())
@ -421,9 +425,9 @@ class KotlinCoroutinesInstrumentationTest {
@ParameterizedTest @ParameterizedTest
@ArgumentsSource(DispatchersSource::class) @ArgumentsSource(DispatchersSource::class)
fun `traced flux`(dispatcher: CoroutineDispatcher) { fun `traced flux`(dispatcherWrapper: DispatcherWrapper) {
runTest(dispatcher) { runTest(dispatcherWrapper) {
flux(dispatcher) { flux(dispatcherWrapper.dispatcher) {
repeat(3) { repeat(3) {
tracedChild("child_$it") tracedChild("child_$it")
send(it) send(it)
@ -460,6 +464,10 @@ class KotlinCoroutinesInstrumentationTest {
tracer.spanBuilder(opName).startSpan().end() tracer.spanBuilder(opName).startSpan().end()
} }
private fun <T> runTest(dispatcherWrapper: DispatcherWrapper, block: suspend CoroutineScope.() -> T): T {
return runTest(dispatcherWrapper.dispatcher, block)
}
private fun <T> runTest(dispatcher: CoroutineDispatcher, block: suspend CoroutineScope.() -> T): T { private fun <T> runTest(dispatcher: CoroutineDispatcher, block: suspend CoroutineScope.() -> T): T {
val parentSpan = tracer.spanBuilder("parent").startSpan() val parentSpan = tracer.spanBuilder("parent").startSpan()
val parentScope = parentSpan.makeCurrent() val parentScope = parentSpan.makeCurrent()
@ -512,13 +520,17 @@ class KotlinCoroutinesInstrumentationTest {
class DispatchersSource : ArgumentsProvider { class DispatchersSource : ArgumentsProvider {
override fun provideArguments(context: ExtensionContext?): Stream<out Arguments> = override fun provideArguments(context: ExtensionContext?): Stream<out Arguments> =
Stream.of( Stream.of(
// Round-trip through Executor for global dispatchers since it seems ParameterizedTest tries to automatically // Wrap dispatchers since it seems that ParameterizedTest tries to automatically close
// close Closeable arguments with no way to avoid it. // Closeable arguments with no way to avoid it.
arguments(Dispatchers.Default.asExecutor().asCoroutineDispatcher()), arguments(DispatcherWrapper(Dispatchers.Default)),
arguments(Dispatchers.IO.asExecutor().asCoroutineDispatcher()), arguments(DispatcherWrapper(Dispatchers.IO)),
arguments(Dispatchers.Unconfined.asExecutor().asCoroutineDispatcher()), arguments(DispatcherWrapper(Dispatchers.Unconfined)),
arguments(threadPool.asCoroutineDispatcher()), arguments(DispatcherWrapper(threadPool.asCoroutineDispatcher())),
arguments(singleThread.asCoroutineDispatcher()), arguments(DispatcherWrapper(singleThread.asCoroutineDispatcher())),
) )
} }
class DispatcherWrapper(val dispatcher: CoroutineDispatcher) {
override fun toString(): String = dispatcher.toString()
}
} }