Fix kotlin coroutine context propagation (#7879)
Resolves https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/7837 `org.jetbrains.kotlinx:kotlinx-coroutines-core:1.6.1` adds a second `newCoroutineContext` that we shouldn't instrument. When we instrument it the order of [`KotlinContextElement`](https://github.com/open-telemetry/opentelemetry-java/blob/main/extensions/kotlin/src/main/java/io/opentelemetry/extension/kotlin/KotlinContextElement.java) and user added `ThreadContextElement` gets reversed. If user added `ThreadContextElement` changes opentelemetry context then these changes will get overwritten by `KotlinContextElement`.
This commit is contained in:
parent
fe263edecf
commit
cbc616cbff
|
@ -25,6 +25,7 @@ public class KotlinCoroutinesInstrumentation implements TypeInstrumentation {
|
|||
public void transform(TypeTransformer transformer) {
|
||||
transformer.applyAdviceToMethod(
|
||||
named("newCoroutineContext")
|
||||
.and(takesArgument(0, named("kotlinx.coroutines.CoroutineScope")))
|
||||
.and(takesArgument(1, named("kotlin.coroutines.CoroutineContext"))),
|
||||
this.getClass().getName() + "$ContextAdvice");
|
||||
}
|
||||
|
|
|
@ -7,6 +7,7 @@ package io.opentelemetry.javaagent.instrumentation.kotlinxcoroutines
|
|||
|
||||
import io.opentelemetry.context.Context
|
||||
import io.opentelemetry.context.ContextKey
|
||||
import io.opentelemetry.context.Scope
|
||||
import io.opentelemetry.extension.kotlin.asContextElement
|
||||
import io.opentelemetry.extension.kotlin.getOpenTelemetryContext
|
||||
import io.opentelemetry.instrumentation.reactor.v3_1.ContextPropagationOperator
|
||||
|
@ -20,6 +21,7 @@ import kotlinx.coroutines.CoroutineStart
|
|||
import kotlinx.coroutines.Dispatchers
|
||||
import kotlinx.coroutines.ExperimentalCoroutinesApi
|
||||
import kotlinx.coroutines.GlobalScope
|
||||
import kotlinx.coroutines.ThreadContextElement
|
||||
import kotlinx.coroutines.asCoroutineDispatcher
|
||||
import kotlinx.coroutines.async
|
||||
import kotlinx.coroutines.awaitAll
|
||||
|
@ -54,6 +56,7 @@ import java.util.concurrent.Executors
|
|||
import java.util.concurrent.TimeUnit
|
||||
import java.util.function.Consumer
|
||||
import java.util.stream.Stream
|
||||
import kotlin.coroutines.CoroutineContext
|
||||
|
||||
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
|
||||
@ExperimentalCoroutinesApi
|
||||
|
@ -563,4 +566,40 @@ class KotlinCoroutinesInstrumentationTest {
|
|||
class DispatcherWrapper(val dispatcher: CoroutineDispatcher) {
|
||||
override fun toString(): String = dispatcher.toString()
|
||||
}
|
||||
|
||||
// regression test for https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/7837
|
||||
// tests that a custom ThreadContextElement runs after KotlinContextElement that is used for
|
||||
// context propagation in coroutines
|
||||
@Test
|
||||
fun `test custom context element`() {
|
||||
val testValue = "test-value"
|
||||
val contextKey = ContextKey.named<String>("test-key")
|
||||
val scope = Context.current().with(contextKey, "wrong value").makeCurrent()
|
||||
scope.use {
|
||||
runBlocking {
|
||||
val context = Context.current().with(contextKey, testValue)
|
||||
withContext(TestContextElement(context)) {
|
||||
delay(10)
|
||||
val result = Context.current().get(contextKey)
|
||||
assertThat(result).isEqualTo(testValue)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class TestContextElement(private val otelContext: Context) : ThreadContextElement<Scope> {
|
||||
companion object Key : CoroutineContext.Key<TestContextElement> {
|
||||
}
|
||||
|
||||
override val key: CoroutineContext.Key<TestContextElement>
|
||||
get() = Key
|
||||
|
||||
override fun restoreThreadContext(context: CoroutineContext, oldState: Scope) {
|
||||
oldState.close()
|
||||
}
|
||||
|
||||
override fun updateThreadContext(context: CoroutineContext): Scope {
|
||||
return otelContext.makeCurrent()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue