Add library instrumentation for ktor. (#4983)

This commit is contained in:
Anuraag Agrawal 2022-01-05 13:14:31 +09:00 committed by GitHub
parent 7c58220d65
commit e08ed9d448
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 529 additions and 0 deletions

View File

@ -0,0 +1,18 @@
# Ktor Instrumentation
This package contains libraries to help instrument Ktor. Currently, only server instrumentation is supported.
## Initializing server instrumentation
Initialize instrumentation by installing the `KtorServerTracing` feature. You must set the `OpenTelemetry` to use with
the feature.
```kotlin
OpenTelemetry openTelemetry = initializeOpenTelemetryForMe()
embeddedServer(Netty, 8080) {
install(KtorServerTracing) {
setOpenTelemetry(openTelemetry)
}
}
```

View File

@ -0,0 +1,37 @@
import org.jetbrains.kotlin.gradle.tasks.KotlinCompile
plugins {
id("otel.library-instrumentation")
id("org.jetbrains.kotlin.jvm")
}
dependencies {
library("io.ktor:ktor-server-core:1.0.0")
implementation("io.opentelemetry:opentelemetry-extension-kotlin")
compileOnly("org.jetbrains.kotlin:kotlin-stdlib-jdk8")
testImplementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8")
// Note, we do not have a :testing library yet because there doesn't seem to be a way to have the Kotlin classes
// available for use from Spock. We will first need to migrate HttpServerTest to be usable outside of Spock.
testLibrary("io.ktor:ktor-server-netty:1.0.0")
}
tasks {
withType(KotlinCompile::class).configureEach {
kotlinOptions {
jvmTarget = "1.8"
}
}
val compileTestKotlin by existing(AbstractCompile::class)
named<GroovyCompile>("compileTestGroovy") {
// Note: look like it should be `classpath += files(sourceSets.test.kotlin.classesDirectory)`
// instead, but kotlin plugin doesn't support it (yet?)
classpath = classpath.plus(files(compileTestKotlin.get().destinationDir))
}
}

View File

@ -0,0 +1,20 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.ktor.v1_0
import io.ktor.application.*
import io.ktor.request.*
import io.opentelemetry.context.propagation.TextMapGetter
internal object ApplicationRequestGetter : TextMapGetter<ApplicationRequest> {
override fun keys(carrier: ApplicationRequest): Iterable<String> {
return carrier.headers.names()
}
override fun get(carrier: ApplicationRequest?, name: String): String? {
return carrier?.headers?.get(name)
}
}

View File

@ -0,0 +1,75 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.ktor.v1_0
import io.ktor.application.*
import io.ktor.features.*
import io.ktor.request.*
import io.ktor.response.*
import io.ktor.routing.*
import io.opentelemetry.instrumentation.api.instrumenter.http.CapturedHttpHeaders
import io.opentelemetry.instrumentation.api.instrumenter.http.HttpServerAttributesExtractor
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes
internal class KtorHttpServerAttributesExtractor(capturedHttpHeaders: CapturedHttpHeaders) :
HttpServerAttributesExtractor<ApplicationRequest, ApplicationResponse>(capturedHttpHeaders) {
override fun method(request: ApplicationRequest): String {
return request.httpMethod.value
}
override fun requestHeader(request: ApplicationRequest, name: String): List<String> {
return request.headers.getAll(name) ?: emptyList()
}
override fun requestContentLength(request: ApplicationRequest, response: ApplicationResponse?): Long? {
return null
}
override fun requestContentLengthUncompressed(request: ApplicationRequest, response: ApplicationResponse?): Long? {
return null
}
override fun statusCode(request: ApplicationRequest, response: ApplicationResponse): Int? {
return response.status()?.value
}
override fun responseContentLength(request: ApplicationRequest, response: ApplicationResponse): Long? {
return null
}
override fun responseContentLengthUncompressed(request: ApplicationRequest, response: ApplicationResponse): Long? {
return null
}
override fun responseHeader(request: ApplicationRequest, response: ApplicationResponse, name: String): List<String> {
return response.headers.allValues().getAll(name) ?: emptyList()
}
override fun flavor(request: ApplicationRequest): String? {
return when (request.httpVersion) {
"HTTP/1.1" -> SemanticAttributes.HttpFlavorValues.HTTP_1_1
"HTTP/2.0" -> SemanticAttributes.HttpFlavorValues.HTTP_2_0
else -> null
}
}
override fun target(request: ApplicationRequest): String {
return request.uri
}
override fun route(request: ApplicationRequest): String? {
return null
}
override fun scheme(request: ApplicationRequest): String {
return request.origin.scheme
}
override fun serverName(request: ApplicationRequest, response: ApplicationResponse?): String? {
return null
}
}

View File

@ -0,0 +1,30 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.ktor.v1_0
import io.ktor.features.*
import io.ktor.request.*
import io.ktor.response.*
import io.opentelemetry.instrumentation.api.instrumenter.net.NetServerAttributesExtractor
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes
internal class KtorNetServerAttributesExtractor : NetServerAttributesExtractor<ApplicationRequest, ApplicationResponse>() {
override fun transport(request: ApplicationRequest): String {
return SemanticAttributes.NetTransportValues.IP_TCP
}
override fun peerName(request: ApplicationRequest): String {
return request.origin.host
}
override fun peerPort(request: ApplicationRequest): Int {
return request.origin.port
}
override fun peerIp(request: ApplicationRequest): String {
return request.origin.remoteHost
}
}

View File

@ -0,0 +1,161 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.ktor.v1_0
import io.ktor.application.*
import io.ktor.request.*
import io.ktor.response.*
import io.ktor.routing.*
import io.ktor.util.*
import io.ktor.util.pipeline.*
import io.opentelemetry.api.OpenTelemetry
import io.opentelemetry.context.Context
import io.opentelemetry.extension.kotlin.asContextElement
import io.opentelemetry.instrumentation.api.config.Config
import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter
import io.opentelemetry.instrumentation.api.instrumenter.SpanStatusExtractor
import io.opentelemetry.instrumentation.api.instrumenter.http.CapturedHttpHeaders
import io.opentelemetry.instrumentation.api.instrumenter.http.HttpServerMetrics
import io.opentelemetry.instrumentation.api.instrumenter.http.HttpSpanNameExtractor
import io.opentelemetry.instrumentation.api.instrumenter.http.HttpSpanStatusExtractor
import io.opentelemetry.instrumentation.api.servlet.ServerSpanNaming
import kotlinx.coroutines.withContext
class KtorServerTracing private constructor(
private val instrumenter: Instrumenter<ApplicationRequest, ApplicationResponse>
) {
class Configuration {
internal lateinit var openTelemetry: OpenTelemetry
internal var capturedHttpHeaders = CapturedHttpHeaders.server(Config.get())
internal val additionalExtractors = mutableListOf<AttributesExtractor<in ApplicationRequest, in ApplicationResponse>>()
internal var statusExtractor:
(SpanStatusExtractor<ApplicationRequest, ApplicationResponse>) -> SpanStatusExtractor<in ApplicationRequest, in ApplicationResponse> = { a -> a }
fun setOpenTelemetry(openTelemetry: OpenTelemetry) {
this.openTelemetry = openTelemetry
}
fun setStatusExtractor(extractor: (SpanStatusExtractor<ApplicationRequest, ApplicationResponse>) -> SpanStatusExtractor<in ApplicationRequest, in ApplicationResponse>) {
this.statusExtractor = extractor
}
fun addAttributeExtractor(extractor: AttributesExtractor<in ApplicationRequest, in ApplicationResponse>) {
additionalExtractors.add(extractor)
}
fun captureHttpHeaders(capturedHttpHeaders: CapturedHttpHeaders) {
this.capturedHttpHeaders = capturedHttpHeaders
}
internal fun isOpenTelemetryInitialized(): Boolean = this::openTelemetry.isInitialized
}
private fun start(call: ApplicationCall): Context? {
val parentContext = Context.current()
if (!instrumenter.shouldStart(parentContext, call.request)) {
return null
}
return instrumenter.start(parentContext, call.request)
}
private fun end(context: Context, call: ApplicationCall, error: Throwable?) {
instrumenter.end(context, call.request, call.response, error)
}
companion object Feature : ApplicationFeature<Application, Configuration, KtorServerTracing> {
private val INSTRUMENTATION_NAME = "io.opentelemetry.ktor-1.0"
private val contextKey = AttributeKey<Context>("OpenTelemetry")
private val errorKey = AttributeKey<Throwable>("OpenTelemetryException")
override val key: AttributeKey<KtorServerTracing> = AttributeKey("OpenTelemetry")
override fun install(pipeline: Application, configure: Configuration.() -> Unit): KtorServerTracing {
val configuration = Configuration().apply(configure)
if (!configuration.isOpenTelemetryInitialized()) {
throw IllegalArgumentException("OpenTelemetry must be set")
}
val httpAttributesExtractor = KtorHttpServerAttributesExtractor(configuration.capturedHttpHeaders)
val instrumenterBuilder = Instrumenter.builder<ApplicationRequest, ApplicationResponse>(
configuration.openTelemetry,
INSTRUMENTATION_NAME,
HttpSpanNameExtractor.create(httpAttributesExtractor)
)
configuration.additionalExtractors.forEach { instrumenterBuilder.addAttributesExtractor(it) }
with(instrumenterBuilder) {
setSpanStatusExtractor(configuration.statusExtractor(HttpSpanStatusExtractor.create(httpAttributesExtractor)))
addAttributesExtractor(KtorNetServerAttributesExtractor())
addAttributesExtractor(httpAttributesExtractor)
addRequestMetrics(HttpServerMetrics.get())
addContextCustomizer(ServerSpanNaming.get())
}
val instrumenter = instrumenterBuilder.newServerInstrumenter(ApplicationRequestGetter)
val feature = KtorServerTracing(instrumenter)
val startPhase = PipelinePhase("OpenTelemetry")
pipeline.insertPhaseBefore(ApplicationCallPipeline.Monitoring, startPhase)
pipeline.intercept(startPhase) {
val context = feature.start(call)
if (context != null) {
call.attributes.put(contextKey, context)
withContext(context.asContextElement()) {
try {
proceed()
} catch (err: Throwable) {
// Stash error for reporting later since need ktor to finish setting up the response
call.attributes.put(errorKey, err)
throw err
}
}
} else {
proceed()
}
}
val postSendPhase = PipelinePhase("OpenTelemetryPostSend")
pipeline.sendPipeline.insertPhaseAfter(ApplicationSendPipeline.After, postSendPhase)
pipeline.sendPipeline.intercept(postSendPhase) {
val context = call.attributes.getOrNull(contextKey)
if (context != null) {
var error: Throwable? = call.attributes.getOrNull(errorKey)
try {
proceed()
} catch (t: Throwable) {
error = t
throw t
} finally {
feature.end(context, call, error)
}
} else {
proceed()
}
}
pipeline.environment.monitor.subscribe(Routing.RoutingCallStarted) { call ->
val context = call.attributes.getOrNull(contextKey)
if (context != null) {
ServerSpanNaming.updateServerSpanName(context, ServerSpanNaming.Source.SERVLET, { _, arg -> arg.route.parent.toString() }, call)
}
}
return feature
}
}
}

View File

@ -0,0 +1,54 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.ktor.v1_0
import io.ktor.server.engine.ApplicationEngine
import io.opentelemetry.api.common.AttributeKey
import io.opentelemetry.instrumentation.test.LibraryTestTrait
import io.opentelemetry.instrumentation.test.base.HttpServerTest
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes
import java.util.concurrent.TimeUnit
import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.NOT_FOUND
class KtorHttpServerTest extends HttpServerTest<ApplicationEngine> implements LibraryTestTrait {
@Override
ApplicationEngine startServer(int port) {
return TestServer.startServer(port, openTelemetry)
}
@Override
void stopServer(ApplicationEngine server) {
server.stop(0, 10, TimeUnit.SECONDS)
}
// ktor does not have a controller lifecycle so the server span ends immediately when the response is sent, which is
// before the controller span finishes.
@Override
boolean verifyServerSpanEndTime() {
return false
}
@Override
String expectedServerSpanName(ServerEndpoint endpoint) {
switch (endpoint) {
case NOT_FOUND:
return "HTTP GET"
default:
return endpoint.resolvePath(address).path
}
}
@Override
List<AttributeKey<?>> extraAttributes() {
[
SemanticAttributes.NET_PEER_NAME,
SemanticAttributes.NET_TRANSPORT
]
}
}

View File

@ -0,0 +1,21 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.ktor.v1_0
import io.ktor.application.*
import io.opentelemetry.api.OpenTelemetry
import io.opentelemetry.instrumentation.test.base.HttpServerTest
class KtorTestUtil {
companion object {
fun installOpenTelemetry(application: Application, openTelemetry: OpenTelemetry) {
application.install(KtorServerTracing) {
setOpenTelemetry(openTelemetry)
captureHttpHeaders(HttpServerTest.capturedHttpHeadersForTesting())
}
}
}
}

View File

@ -0,0 +1,112 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.ktor.v1_0
import io.ktor.application.*
import io.ktor.http.*
import io.ktor.request.*
import io.ktor.response.*
import io.ktor.routing.*
import io.ktor.server.engine.*
import io.ktor.server.netty.*
import io.opentelemetry.api.GlobalOpenTelemetry
import io.opentelemetry.api.OpenTelemetry
import io.opentelemetry.api.trace.Span
import io.opentelemetry.api.trace.SpanKind
import io.opentelemetry.api.trace.StatusCode
import io.opentelemetry.context.Context
import io.opentelemetry.extension.kotlin.asContextElement
import io.opentelemetry.instrumentation.test.base.HttpServerTest
import io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.*
import kotlinx.coroutines.withContext
import java.util.concurrent.ExecutionException
class TestServer {
companion object {
private val tracer = GlobalOpenTelemetry.getTracer("test")
@JvmStatic
fun startServer(port: Int, openTelemetry: OpenTelemetry): ApplicationEngine {
return embeddedServer(Netty, port = port) {
KtorTestUtil.installOpenTelemetry(this, openTelemetry)
routing {
get(SUCCESS.path) {
controller(SUCCESS) {
call.respondText(SUCCESS.body, status = HttpStatusCode.fromValue(SUCCESS.status))
}
}
get(REDIRECT.path) {
controller(REDIRECT) {
call.respondRedirect(REDIRECT.body)
}
}
get(ERROR.path) {
controller(ERROR) {
call.respondText(ERROR.body, status = HttpStatusCode.fromValue(ERROR.status))
}
}
get(EXCEPTION.path) {
controller(EXCEPTION) {
throw Exception(EXCEPTION.body)
}
}
get("/query") {
controller(QUERY_PARAM) {
call.respondText("some=${call.request.queryParameters["some"]}", status = HttpStatusCode.fromValue(QUERY_PARAM.status))
}
}
get("/path/{id}/param") {
controller(PATH_PARAM) {
call.respondText(call.parameters["id"] ?: "", status = HttpStatusCode.fromValue(PATH_PARAM.status))
}
}
get("/child") {
controller(INDEXED_CHILD) {
INDEXED_CHILD.collectSpanAttributes { call.request.queryParameters[it] }
call.respondText(INDEXED_CHILD.body, status = HttpStatusCode.fromValue(INDEXED_CHILD.status))
}
}
get("/captureHeaders") {
controller(CAPTURE_HEADERS) {
call.response.header("X-Test-Response", call.request.header("X-Test-Request") ?: "")
call.respondText(CAPTURE_HEADERS.body, status = HttpStatusCode.fromValue(CAPTURE_HEADERS.status))
}
}
}
}.start()
}
// Copy in HttpServerTest.controller but make it a suspending function
private suspend fun controller(endpoint: HttpServerTest.ServerEndpoint, wrapped: suspend () -> Unit) {
assert(Span.current().spanContext.isValid, { "Controller should have a parent span. " })
if (endpoint == NOT_FOUND) {
wrapped()
}
val span = tracer.spanBuilder("controller").setSpanKind(SpanKind.INTERNAL).startSpan()
try {
withContext(Context.current().with(span).asContextElement()) {
wrapped()
}
span.end()
} catch (e: Exception) {
span.setStatus(StatusCode.ERROR)
span.recordException(if (e is ExecutionException) e.cause ?: e else e)
span.end()
throw e
}
}
}
}

View File

@ -256,6 +256,7 @@ include(":instrumentation:kafka:kafka-clients:kafka-clients-2.6:library")
include(":instrumentation:kafka:kafka-clients:kafka-clients-common:library")
include(":instrumentation:kafka:kafka-streams-0.11:javaagent")
include(":instrumentation:kotlinx-coroutines:javaagent")
include(":instrumentation:ktor-1.0:library")
include(":instrumentation:kubernetes-client-7.0:javaagent")
include(":instrumentation:kubernetes-client-7.0:javaagent-unit-tests")
include(":instrumentation:lettuce:lettuce-common:library")