Ktor client instrumentation (#7982)

Client implementation for Ktor 2.0.
Resolves
https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/4972.

- Moved server instrumentation under `server` package
- Implemented a plugin for ktor `HttpClient`
This commit is contained in:
Alaksiej Ščarbaty 2023-03-07 13:50:53 +01:00 committed by GitHub
parent 9a9a42b837
commit 3b4aeebd6d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 424 additions and 11 deletions

View File

@ -1,6 +1,6 @@
# Library Instrumentation for Ktor version 2.0 and higher
This package contains libraries to help instrument Ktor. Currently, only server instrumentation is supported.
This package contains libraries to help instrument Ktor. Server and client instrumentations are supported.
## Quickstart
@ -35,7 +35,7 @@ Initialize instrumentation by installing the `KtorServerTracing` feature. You mu
the feature.
```kotlin
OpenTelemetry openTelemetry = initializeOpenTelemetryForMe()
val openTelemetry: OpenTelemetry = initializeOpenTelemetryForMe()
embeddedServer(Netty, 8080) {
install(KtorServerTracing) {
@ -43,3 +43,18 @@ embeddedServer(Netty, 8080) {
}
}
```
## Initializing client instrumentation
Initialize instrumentation by installing the `KtorClientTracing` feature. You must set the `OpenTelemetry` to use with
the feature.
```kotlin
val openTelemetry: OpenTelemetry = initializeOpenTelemetryForMe()
val client = HttpClient {
install(KtorClientTracing) {
setOpenTelemetry(openTelemetry)
}
}
```

View File

@ -6,8 +6,11 @@ plugins {
id("org.jetbrains.kotlin.jvm")
}
val ktorVersion = "2.0.0"
dependencies {
library("io.ktor:ktor-server-core:2.0.0")
library("io.ktor:ktor-client-core:$ktorVersion")
library("io.ktor:ktor-server-core:$ktorVersion")
implementation(project(":instrumentation:ktor:ktor-common:library"))
implementation("io.opentelemetry:opentelemetry-extension-kotlin")
@ -16,7 +19,8 @@ dependencies {
testImplementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8")
testLibrary("io.ktor:ktor-server-netty:2.0.0")
testLibrary("io.ktor:ktor-server-netty:$ktorVersion")
testLibrary("io.ktor:ktor-client-cio:$ktorVersion")
}
tasks {

View File

@ -0,0 +1,14 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.ktor.v2_0
/**
* Common properties for both client and server instrumentations
*/
internal object InstrumentationProperties {
internal const val INSTRUMENTATION_NAME = "io.opentelemetry.ktor-2.0"
}

View File

@ -0,0 +1,107 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.ktor.v2_0.client
import io.ktor.client.*
import io.ktor.client.call.*
import io.ktor.client.plugins.*
import io.ktor.client.request.*
import io.ktor.client.statement.*
import io.ktor.util.*
import io.ktor.util.pipeline.*
import io.opentelemetry.context.Context
import io.opentelemetry.context.propagation.ContextPropagators
import io.opentelemetry.extension.kotlin.asContextElement
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter
import kotlinx.coroutines.withContext
class KtorClientTracing internal constructor(
private val instrumenter: Instrumenter<HttpRequestData, HttpResponse>,
private val propagators: ContextPropagators,
) {
private fun createSpan(requestBuilder: HttpRequestBuilder): Context? {
val parentContext = Context.current()
val requestData = requestBuilder.build()
return if (instrumenter.shouldStart(parentContext, requestData)) {
instrumenter.start(parentContext, requestData)
} else {
null
}
}
private fun populateRequestHeaders(requestBuilder: HttpRequestBuilder, context: Context) {
propagators.textMapPropagator.inject(context, requestBuilder, KtorHttpHeadersSetter)
}
private fun endSpan(context: Context, call: HttpClientCall, error: Throwable?) {
endSpan(context, HttpRequestBuilder().takeFrom(call.request), call.response, error)
}
private fun endSpan(context: Context, requestBuilder: HttpRequestBuilder, response: HttpResponse?, error: Throwable?) {
instrumenter.end(context, requestBuilder.build(), response, error)
}
companion object : HttpClientPlugin<KtorClientTracingBuilder, KtorClientTracing> {
private val openTelemetryContextKey = AttributeKey<Context>("OpenTelemetry")
override val key = AttributeKey<KtorClientTracing>("OpenTelemetry")
override fun prepare(block: KtorClientTracingBuilder.() -> Unit) = KtorClientTracingBuilder().apply(block).build()
override fun install(plugin: KtorClientTracing, scope: HttpClient) {
installSpanCreation(plugin, scope)
installSpanEnd(plugin, scope)
}
private fun installSpanCreation(plugin: KtorClientTracing, scope: HttpClient) {
val createSpanPhase = PipelinePhase("OpenTelemetryCreateSpan")
scope.sendPipeline.insertPhaseAfter(HttpSendPipeline.State, createSpanPhase)
scope.sendPipeline.intercept(createSpanPhase) {
val requestBuilder = context
val openTelemetryContext = plugin.createSpan(requestBuilder)
if (openTelemetryContext != null) {
try {
requestBuilder.attributes.put(openTelemetryContextKey, openTelemetryContext)
plugin.populateRequestHeaders(requestBuilder, openTelemetryContext)
withContext(openTelemetryContext.asContextElement()) { proceed() }
} catch (e: Throwable) {
plugin.endSpan(openTelemetryContext, requestBuilder, null, e)
throw e
}
} else {
proceed()
}
}
}
private fun installSpanEnd(plugin: KtorClientTracing, scope: HttpClient) {
val endSpanPhase = PipelinePhase("OpenTelemetryEndSpan")
scope.receivePipeline.insertPhaseBefore(HttpReceivePipeline.State, endSpanPhase)
scope.receivePipeline.intercept(endSpanPhase) {
val openTelemetryContext = it.call.attributes.getOrNull(openTelemetryContextKey)
if (openTelemetryContext != null) {
try {
withContext(openTelemetryContext.asContextElement()) { proceed() }
plugin.endSpan(openTelemetryContext, it.call, null)
} catch (e: Throwable) {
plugin.endSpan(openTelemetryContext, it.call, e)
throw e
}
} else {
proceed()
}
}
}
}
}

View File

@ -0,0 +1,76 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.ktor.v2_0.client
import io.ktor.client.request.*
import io.ktor.client.statement.HttpResponse
import io.opentelemetry.api.OpenTelemetry
import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter
import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor.alwaysClient
import io.opentelemetry.instrumentation.api.instrumenter.http.HttpClientAttributesExtractor
import io.opentelemetry.instrumentation.api.instrumenter.http.HttpClientMetrics
import io.opentelemetry.instrumentation.api.instrumenter.http.HttpSpanNameExtractor
import io.opentelemetry.instrumentation.api.instrumenter.http.HttpSpanStatusExtractor
import io.opentelemetry.instrumentation.ktor.v2_0.InstrumentationProperties.INSTRUMENTATION_NAME
class KtorClientTracingBuilder {
private var openTelemetry: OpenTelemetry? = null
private val additionalExtractors = mutableListOf<AttributesExtractor<in HttpRequestData, in HttpResponse>>()
private val httpAttributesExtractorBuilder = HttpClientAttributesExtractor.builder(
KtorHttpClientAttributesGetter,
KtorNetClientAttributesGetter,
)
fun setOpenTelemetry(openTelemetry: OpenTelemetry) {
this.openTelemetry = openTelemetry
}
fun setCapturedRequestHeaders(vararg headers: String) =
setCapturedRequestHeaders(headers.asList())
fun setCapturedRequestHeaders(headers: List<String>) {
httpAttributesExtractorBuilder.setCapturedRequestHeaders(headers)
}
fun setCapturedResponseHeaders(vararg headers: String) =
setCapturedResponseHeaders(headers.asList())
fun setCapturedResponseHeaders(headers: List<String>) {
httpAttributesExtractorBuilder.setCapturedResponseHeaders(headers)
}
fun addAttributesExtractors(vararg extractors: AttributesExtractor<in HttpRequestData, in HttpResponse>) =
addAttributesExtractors(extractors.asList())
fun addAttributesExtractors(extractors: Iterable<AttributesExtractor<in HttpRequestData, in HttpResponse>>) {
additionalExtractors += extractors
}
internal fun build(): KtorClientTracing {
val initializedOpenTelemetry = openTelemetry
?: throw IllegalArgumentException("OpenTelemetry must be set")
val instrumenterBuilder = Instrumenter.builder<HttpRequestData, HttpResponse>(
initializedOpenTelemetry,
INSTRUMENTATION_NAME,
HttpSpanNameExtractor.create(KtorHttpClientAttributesGetter),
)
val instrumenter = instrumenterBuilder
.setSpanStatusExtractor(HttpSpanStatusExtractor.create(KtorHttpClientAttributesGetter))
.addAttributesExtractor(httpAttributesExtractorBuilder.build())
.addAttributesExtractors(additionalExtractors)
.addOperationMetrics(HttpClientMetrics.get())
.buildInstrumenter(alwaysClient())
return KtorClientTracing(
instrumenter = instrumenter,
propagators = initializedOpenTelemetry.propagators,
)
}
}

View File

@ -0,0 +1,31 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.ktor.v2_0.client
import io.ktor.client.request.*
import io.ktor.client.statement.HttpResponse
import io.opentelemetry.instrumentation.api.instrumenter.http.HttpClientAttributesGetter
internal object KtorHttpClientAttributesGetter : HttpClientAttributesGetter<HttpRequestData, HttpResponse> {
override fun getUrl(request: HttpRequestData) =
request.url.toString()
override fun getFlavor(request: HttpRequestData, response: HttpResponse?) =
null
override fun getMethod(request: HttpRequestData) =
request.method.value
override fun getRequestHeader(request: HttpRequestData, name: String) =
request.headers.getAll(name).orEmpty()
override fun getStatusCode(request: HttpRequestData, response: HttpResponse, error: Throwable?) =
response.status.value
override fun getResponseHeader(request: HttpRequestData, response: HttpResponse, name: String) =
response.headers.getAll(name).orEmpty()
}

View File

@ -0,0 +1,16 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.ktor.v2_0.client
import io.ktor.client.request.HttpRequestBuilder
import io.opentelemetry.context.propagation.TextMapSetter
internal object KtorHttpHeadersSetter : TextMapSetter<HttpRequestBuilder> {
override fun set(carrier: HttpRequestBuilder?, key: String, value: String) {
carrier?.headers?.set(key, value)
}
}

View File

@ -0,0 +1,20 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.ktor.v2_0.client
import io.ktor.client.request.*
import io.ktor.client.statement.*
import io.opentelemetry.instrumentation.api.instrumenter.net.NetClientAttributesGetter
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes.NetTransportValues.IP_TCP
internal object KtorNetClientAttributesGetter : NetClientAttributesGetter<HttpRequestData, HttpResponse> {
override fun getTransport(request: HttpRequestData, response: HttpResponse?) = IP_TCP
override fun getPeerName(request: HttpRequestData) = request.url.host
override fun getPeerPort(request: HttpRequestData) = request.url.port
}

View File

@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.ktor.v2_0
package io.opentelemetry.instrumentation.ktor.v2_0.server
import io.ktor.server.request.*
import io.opentelemetry.context.propagation.TextMapGetter

View File

@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.ktor.v2_0
package io.opentelemetry.instrumentation.ktor.v2_0.server
import io.ktor.server.plugins.*
import io.ktor.server.request.*

View File

@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.ktor.v2_0
package io.opentelemetry.instrumentation.ktor.v2_0.server
import io.ktor.server.request.*
import io.opentelemetry.instrumentation.api.instrumenter.net.NetServerAttributesGetter

View File

@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.ktor.v2_0
package io.opentelemetry.instrumentation.ktor.v2_0.server
import io.ktor.server.application.*
import io.ktor.server.request.*
@ -23,6 +23,7 @@ import io.opentelemetry.instrumentation.api.instrumenter.http.HttpServerAttribut
import io.opentelemetry.instrumentation.api.instrumenter.http.HttpServerMetrics
import io.opentelemetry.instrumentation.api.instrumenter.http.HttpSpanNameExtractor
import io.opentelemetry.instrumentation.api.instrumenter.http.HttpSpanStatusExtractor
import io.opentelemetry.instrumentation.ktor.v2_0.InstrumentationProperties.INSTRUMENTATION_NAME
import kotlinx.coroutines.withContext
class KtorServerTracing private constructor(
@ -81,7 +82,6 @@ class KtorServerTracing private constructor(
}
companion object Feature : BaseApplicationPlugin<Application, Configuration, KtorServerTracing> {
private val INSTRUMENTATION_NAME = "io.opentelemetry.ktor-2.0"
private val contextKey = AttributeKey<Context>("OpenTelemetry")
private val errorKey = AttributeKey<Throwable>("OpenTelemetryException")

View File

@ -0,0 +1,44 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.ktor.v2_0.client
import io.ktor.client.*
import io.ktor.client.engine.cio.*
import io.ktor.client.request.*
import io.opentelemetry.instrumentation.testing.junit.http.SingleConnection
import kotlinx.coroutines.runBlocking
class KtorHttpClientSingleConnection(
private val host: String,
private val port: Int,
private val installTracing: HttpClientConfig<*>.() -> Unit,
) : SingleConnection {
private val client: HttpClient
init {
val engine = CIO.create {
maxConnectionsCount = 1
}
client = HttpClient(engine) {
installTracing()
}
}
override fun doRequest(path: String, requestHeaders: MutableMap<String, String>) = runBlocking {
val request = HttpRequestBuilder(
scheme = "http",
host = host,
port = port,
path = path,
).apply {
requestHeaders.forEach { (name, value) -> headers.append(name, value) }
}
client.request(request).status.value
}
}

View File

@ -0,0 +1,86 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.ktor.v2_0.client
import io.ktor.client.*
import io.ktor.client.engine.cio.*
import io.ktor.client.plugins.*
import io.ktor.client.request.*
import io.ktor.http.*
import io.opentelemetry.context.Context
import io.opentelemetry.extension.kotlin.asContextElement
import io.opentelemetry.instrumentation.testing.junit.http.AbstractHttpClientTest
import io.opentelemetry.instrumentation.testing.junit.http.HttpClientInstrumentationExtension
import io.opentelemetry.instrumentation.testing.junit.http.HttpClientResult
import io.opentelemetry.instrumentation.testing.junit.http.HttpClientTestOptions
import io.opentelemetry.instrumentation.testing.junit.http.HttpClientTestOptions.DEFAULT_HTTP_ATTRIBUTES
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes.HTTP_FLAVOR
import kotlinx.coroutines.*
import org.junit.jupiter.api.extension.RegisterExtension
import java.net.URI
class KtorHttpClientTest : AbstractHttpClientTest<HttpRequestBuilder>() {
override fun buildRequest(requestMethod: String, uri: URI, requestHeaders: MutableMap<String, String>) =
HttpRequestBuilder(uri.toURL()).apply {
method = HttpMethod.parse(requestMethod)
requestHeaders.forEach { (header, value) -> headers.append(header, value) }
}
override fun sendRequest(request: HttpRequestBuilder, method: String, uri: URI, headers: MutableMap<String, String>) = runBlocking {
CLIENT.request(request).status.value
}
override fun sendRequestWithCallback(
request: HttpRequestBuilder,
method: String,
uri: URI,
headers: MutableMap<String, String>,
httpClientResult: HttpClientResult,
) {
CoroutineScope(Dispatchers.Default + Context.current().asContextElement()).launch {
try {
val statusCode = CLIENT.request(request).status.value
httpClientResult.complete(statusCode)
} catch (e: Throwable) {
httpClientResult.complete(e)
}
}
}
override fun configure(optionsBuilder: HttpClientTestOptions.Builder) {
with(optionsBuilder) {
// this instrumentation creates a span per each physical request
// related issue https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/5722
disableTestRedirects()
setHttpAttributes { DEFAULT_HTTP_ATTRIBUTES - HTTP_FLAVOR }
setSingleConnectionFactory { host, port ->
KtorHttpClientSingleConnection(host, port) { installTracing() }
}
}
}
companion object {
@JvmStatic
@RegisterExtension
private val TESTING = HttpClientInstrumentationExtension.forLibrary()
private val CLIENT = HttpClient(CIO) {
install(HttpRedirect)
installTracing()
}
private fun HttpClientConfig<*>.installTracing() {
install(KtorClientTracing) {
setOpenTelemetry(TESTING.openTelemetry)
}
}
}
}

View File

@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.ktor.v2_0
package io.opentelemetry.instrumentation.ktor.v2_0.server
import io.ktor.http.*
import io.ktor.server.application.*

View File

@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.ktor.v2_0
package io.opentelemetry.instrumentation.ktor.v2_0.server
import io.ktor.server.application.*
import io.opentelemetry.api.OpenTelemetry