diff --git a/instrumentation/kotlinx-coroutines/javaagent/src/test/kotlin/io/opentelemetry/javaagent/instrumentation/kotlinxcoroutines/KotlinCoroutinesInstrumentationTest.kt b/instrumentation/kotlinx-coroutines/javaagent/src/test/kotlin/io/opentelemetry/javaagent/instrumentation/kotlinxcoroutines/KotlinCoroutinesInstrumentationTest.kt index 43be9b9dc8..d8e6114a1b 100644 --- a/instrumentation/kotlinx-coroutines/javaagent/src/test/kotlin/io/opentelemetry/javaagent/instrumentation/kotlinxcoroutines/KotlinCoroutinesInstrumentationTest.kt +++ b/instrumentation/kotlinx-coroutines/javaagent/src/test/kotlin/io/opentelemetry/javaagent/instrumentation/kotlinxcoroutines/KotlinCoroutinesInstrumentationTest.kt @@ -19,7 +19,6 @@ import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.GlobalScope import kotlinx.coroutines.asCoroutineDispatcher -import kotlinx.coroutines.asExecutor import kotlinx.coroutines.async import kotlinx.coroutines.awaitAll import kotlinx.coroutines.channels.produce @@ -76,7 +75,7 @@ class KotlinCoroutinesInstrumentationTest { @ParameterizedTest @ArgumentsSource(DispatchersSource::class) - fun `traced across channels`(dispatcher: CoroutineDispatcher) { + fun `traced across channels`(dispatcher: DispatcherWrapper) { runTest(dispatcher) { val producer = produce { repeat(3) { @@ -136,7 +135,7 @@ class KotlinCoroutinesInstrumentationTest { @ParameterizedTest @ArgumentsSource(DispatchersSource::class) - fun `cancellation prevents trace`(dispatcher: CoroutineDispatcher) { + fun `cancellation prevents trace`(dispatcher: DispatcherWrapper) { runCatching { runTest(dispatcher) { tracedChild("preLaunch") @@ -169,7 +168,7 @@ class KotlinCoroutinesInstrumentationTest { @ParameterizedTest @ArgumentsSource(DispatchersSource::class) - fun `propagates across nested jobs`(dispatcher: CoroutineDispatcher) { + fun `propagates across nested jobs`(dispatcher: DispatcherWrapper) { runTest(dispatcher) { val goodDeferred = async { 1 } @@ -280,21 +279,26 @@ class KotlinCoroutinesInstrumentationTest { testing.waitAndAssertTraces( { trace -> - trace.hasSpansSatisfyingExactly( - { - it.hasName("parent") + // TODO(anuraaga): Need hasSpansSatisfyingExactlyInAnyOrder sometimes + trace.satisfiesExactlyInAnyOrder( + Consumer { + assertThat(it) + .hasName("parent") .hasNoParent() }, - { - it.hasName("timeout1") + Consumer { + assertThat(it) + .hasName("timeout1") .hasParent(trace.getSpan(0)) }, - { - it.hasName("timeout2") + Consumer { + assertThat(it) + .hasName("timeout2") .hasParent(trace.getSpan(0)) }, - { - it.hasName("timeout3") + Consumer { + assertThat(it) + .hasName("timeout3") .hasParent(trace.getSpan(0)) }, ) @@ -361,9 +365,9 @@ class KotlinCoroutinesInstrumentationTest { @ParameterizedTest @ArgumentsSource(DispatchersSource::class) - fun `traced mono`(dispatcher: CoroutineDispatcher) { - runTest(dispatcher) { - mono(dispatcher) { + fun `traced mono`(dispatcherWrapper: DispatcherWrapper) { + runTest(dispatcherWrapper) { + mono(dispatcherWrapper.dispatcher) { tracedChild("child") }.awaitSingle() } @@ -386,12 +390,12 @@ class KotlinCoroutinesInstrumentationTest { @ParameterizedTest @ArgumentsSource(DispatchersSource::class) - fun `traced mono with context propagation operator`(dispatcher: CoroutineDispatcher) { - runTest(dispatcher) { + fun `traced mono with context propagation operator`(dispatcherWrapper: DispatcherWrapper) { + runTest(dispatcherWrapper) { val currentContext = Context.current() // clear current context to ensure that ContextPropagationOperator is used for context propagation withContext(Context.root().asContextElement()) { - val mono = mono(dispatcher) { + val mono = mono(dispatcherWrapper.dispatcher) { // extract context from reactor and propagate it into coroutine val reactorContext = coroutineContext[ReactorContext.Key]?.context val otelContext = ContextPropagationOperator.getOpenTelemetryContext(reactorContext, Context.current()) @@ -421,9 +425,9 @@ class KotlinCoroutinesInstrumentationTest { @ParameterizedTest @ArgumentsSource(DispatchersSource::class) - fun `traced flux`(dispatcher: CoroutineDispatcher) { - runTest(dispatcher) { - flux(dispatcher) { + fun `traced flux`(dispatcherWrapper: DispatcherWrapper) { + runTest(dispatcherWrapper) { + flux(dispatcherWrapper.dispatcher) { repeat(3) { tracedChild("child_$it") send(it) @@ -460,6 +464,10 @@ class KotlinCoroutinesInstrumentationTest { tracer.spanBuilder(opName).startSpan().end() } + private fun runTest(dispatcherWrapper: DispatcherWrapper, block: suspend CoroutineScope.() -> T): T { + return runTest(dispatcherWrapper.dispatcher, block) + } + private fun runTest(dispatcher: CoroutineDispatcher, block: suspend CoroutineScope.() -> T): T { val parentSpan = tracer.spanBuilder("parent").startSpan() val parentScope = parentSpan.makeCurrent() @@ -512,13 +520,17 @@ class KotlinCoroutinesInstrumentationTest { class DispatchersSource : ArgumentsProvider { override fun provideArguments(context: ExtensionContext?): Stream = Stream.of( - // Round-trip through Executor for global dispatchers since it seems ParameterizedTest tries to automatically - // close Closeable arguments with no way to avoid it. - arguments(Dispatchers.Default.asExecutor().asCoroutineDispatcher()), - arguments(Dispatchers.IO.asExecutor().asCoroutineDispatcher()), - arguments(Dispatchers.Unconfined.asExecutor().asCoroutineDispatcher()), - arguments(threadPool.asCoroutineDispatcher()), - arguments(singleThread.asCoroutineDispatcher()), + // Wrap dispatchers since it seems that ParameterizedTest tries to automatically close + // Closeable arguments with no way to avoid it. + arguments(DispatcherWrapper(Dispatchers.Default)), + arguments(DispatcherWrapper(Dispatchers.IO)), + arguments(DispatcherWrapper(Dispatchers.Unconfined)), + arguments(DispatcherWrapper(threadPool.asCoroutineDispatcher())), + arguments(DispatcherWrapper(singleThread.asCoroutineDispatcher())), ) } + + class DispatcherWrapper(val dispatcher: CoroutineDispatcher) { + override fun toString(): String = dispatcher.toString() + } }