Ratpack services OpenTelemetry (#7477)
While using Ratpack Handlers, the context is added by the `OpenTelemetryServerHandler` While using Ratpack Services, we might need to add manually the OT Context into the Ratpack Execution and try to get it from the Execution in the Ratpack `HttpClient`
This commit is contained in:
parent
fc81be47e5
commit
99e600fff7
|
@ -28,7 +28,8 @@ final class OpenTelemetryHttpClient {
|
|||
httpClientSpec -> {
|
||||
httpClientSpec.requestIntercept(
|
||||
requestSpec -> {
|
||||
Context parentOtelCtx = Context.current();
|
||||
Context parentOtelCtx =
|
||||
Execution.current().maybeGet(Context.class).orElse(Context.current());
|
||||
if (!instrumenter.shouldStart(parentOtelCtx, requestSpec)) {
|
||||
return;
|
||||
}
|
||||
|
|
|
@ -5,9 +5,11 @@
|
|||
|
||||
package io.opentelemetry.instrumentation.ratpack.client
|
||||
|
||||
import io.opentelemetry.api.trace.SpanKind
|
||||
import io.opentelemetry.api.OpenTelemetry
|
||||
import io.opentelemetry.api.trace.StatusCode
|
||||
import io.opentelemetry.api.trace.Tracer
|
||||
import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator
|
||||
import io.opentelemetry.context.Context
|
||||
import io.opentelemetry.context.propagation.ContextPropagators
|
||||
import io.opentelemetry.instrumentation.ratpack.RatpackTelemetry
|
||||
import io.opentelemetry.sdk.OpenTelemetrySdk
|
||||
|
@ -15,10 +17,13 @@ import io.opentelemetry.sdk.testing.exporter.InMemorySpanExporter
|
|||
import io.opentelemetry.sdk.trace.SdkTracerProvider
|
||||
import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor
|
||||
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes
|
||||
import ratpack.exec.Execution
|
||||
import ratpack.exec.Promise
|
||||
import ratpack.func.Action
|
||||
import ratpack.guice.Guice
|
||||
import ratpack.http.client.HttpClient
|
||||
import ratpack.service.Service
|
||||
import ratpack.service.StartEvent
|
||||
import ratpack.test.embed.EmbeddedApp
|
||||
import spock.lang.Specification
|
||||
import spock.util.concurrent.PollingConditions
|
||||
|
@ -27,6 +32,8 @@ import java.time.Duration
|
|||
import java.util.concurrent.CountDownLatch
|
||||
import java.util.concurrent.TimeUnit
|
||||
|
||||
import static io.opentelemetry.api.trace.SpanKind.CLIENT
|
||||
import static io.opentelemetry.api.trace.SpanKind.SERVER
|
||||
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.HTTP_METHOD
|
||||
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.HTTP_ROUTE
|
||||
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.HTTP_STATUS_CODE
|
||||
|
@ -84,14 +91,14 @@ class InstrumentedHttpClientTest extends Specification {
|
|||
|
||||
new PollingConditions().eventually {
|
||||
def spanData = spanExporter.finishedSpanItems.find { it.name == "/foo" }
|
||||
def spanClientData = spanExporter.finishedSpanItems.find { it.name == "HTTP GET" && it.kind == SpanKind.CLIENT }
|
||||
def spanDataApi = spanExporter.finishedSpanItems.find { it.name == "/bar" && it.kind == SpanKind.SERVER }
|
||||
def spanClientData = spanExporter.finishedSpanItems.find { it.name == "HTTP GET" && it.kind == CLIENT }
|
||||
def spanDataApi = spanExporter.finishedSpanItems.find { it.name == "/bar" && it.kind == SERVER }
|
||||
|
||||
spanData.traceId == spanClientData.traceId
|
||||
spanData.traceId == spanDataApi.traceId
|
||||
|
||||
spanData.kind == SpanKind.SERVER
|
||||
spanClientData.kind == SpanKind.CLIENT
|
||||
spanData.kind == SERVER
|
||||
spanClientData.kind == CLIENT
|
||||
def atts = spanClientData.attributes.asMap()
|
||||
atts[HTTP_ROUTE] == "/bar"
|
||||
atts[HTTP_METHOD] == "GET"
|
||||
|
@ -154,15 +161,15 @@ class InstrumentedHttpClientTest extends Specification {
|
|||
spanData.traceId == spanClientData1.traceId
|
||||
spanData.traceId == spanClientData2.traceId
|
||||
|
||||
spanData.kind == SpanKind.SERVER
|
||||
spanData.kind == SERVER
|
||||
|
||||
spanClientData1.kind == SpanKind.CLIENT
|
||||
spanClientData1.kind == CLIENT
|
||||
def atts = spanClientData1.attributes.asMap()
|
||||
atts[HTTP_ROUTE] == "/foo"
|
||||
atts[HTTP_METHOD] == "GET"
|
||||
atts[HTTP_STATUS_CODE] == 200L
|
||||
|
||||
spanClientData2.kind == SpanKind.CLIENT
|
||||
spanClientData2.kind == CLIENT
|
||||
def atts2 = spanClientData2.attributes.asMap()
|
||||
atts2[HTTP_ROUTE] == "/bar"
|
||||
atts2[HTTP_METHOD] == "GET"
|
||||
|
@ -207,8 +214,7 @@ class InstrumentedHttpClientTest extends Specification {
|
|||
}
|
||||
}
|
||||
|
||||
app.test { httpClient -> "error" == httpClient.get("path-name").body.text
|
||||
}
|
||||
app.test { httpClient -> "error" == httpClient.get("path-name").body.text }
|
||||
|
||||
new PollingConditions().eventually {
|
||||
def spanData = spanExporter.finishedSpanItems.find { it.name == "/path-name" }
|
||||
|
@ -216,8 +222,8 @@ class InstrumentedHttpClientTest extends Specification {
|
|||
|
||||
spanData.traceId == spanClientData.traceId
|
||||
|
||||
spanData.kind == SpanKind.SERVER
|
||||
spanClientData.kind == SpanKind.CLIENT
|
||||
spanData.kind == SERVER
|
||||
spanClientData.kind == CLIENT
|
||||
def atts = spanClientData.attributes.asMap()
|
||||
atts[HTTP_ROUTE] == "/foo"
|
||||
atts[HTTP_METHOD] == "GET"
|
||||
|
@ -232,4 +238,139 @@ class InstrumentedHttpClientTest extends Specification {
|
|||
attributes[HTTP_STATUS_CODE] == 200L
|
||||
}
|
||||
}
|
||||
|
||||
def "propagate http trace in ratpack services with compute thread"() {
|
||||
expect:
|
||||
def latch = new CountDownLatch(1)
|
||||
|
||||
def otherApp = EmbeddedApp.of { spec ->
|
||||
spec.handlers {
|
||||
it.get("foo") { ctx -> ctx.render("bar") }
|
||||
}
|
||||
}
|
||||
|
||||
def app = EmbeddedApp.of { spec ->
|
||||
spec.registry(
|
||||
Guice.registry { bindings ->
|
||||
telemetry.configureServerRegistry(bindings)
|
||||
bindings.bindInstance(HttpClient, telemetry.instrumentHttpClient(HttpClient.of(Action.noop())))
|
||||
bindings.bindInstance(new BarService(latch, "${otherApp.address}foo", openTelemetry))
|
||||
},
|
||||
)
|
||||
spec.handlers { chain ->
|
||||
chain.get("foo") { ctx -> ctx.render("bar") }
|
||||
}
|
||||
}
|
||||
|
||||
app.address
|
||||
latch.await()
|
||||
new PollingConditions().eventually {
|
||||
def spanData = spanExporter.finishedSpanItems.find { it.name == "a-span" }
|
||||
def trace = spanExporter.finishedSpanItems.findAll { it.traceId == spanData.traceId }
|
||||
|
||||
trace.size() == 3
|
||||
}
|
||||
}
|
||||
|
||||
def "propagate http trace in ratpack services with fork executions"() {
|
||||
expect:
|
||||
def latch = new CountDownLatch(1)
|
||||
|
||||
def otherApp = EmbeddedApp.of { spec ->
|
||||
spec.handlers {
|
||||
it.get("foo") { ctx -> ctx.render("bar") }
|
||||
}
|
||||
}
|
||||
|
||||
def app = EmbeddedApp.of { spec ->
|
||||
spec.registry(
|
||||
Guice.registry { bindings ->
|
||||
telemetry.configureServerRegistry(bindings)
|
||||
bindings.bindInstance(HttpClient, telemetry.instrumentHttpClient(HttpClient.of(Action.noop())))
|
||||
bindings.bindInstance(new BarForkService(latch, "${otherApp.address}foo", openTelemetry))
|
||||
},
|
||||
)
|
||||
spec.handlers { chain ->
|
||||
chain.get("foo") { ctx -> ctx.render("bar") }
|
||||
}
|
||||
}
|
||||
|
||||
app.address
|
||||
latch.await()
|
||||
new PollingConditions().eventually {
|
||||
def spanData = spanExporter.finishedSpanItems.find { it.name == "a-span" }
|
||||
def trace = spanExporter.finishedSpanItems.findAll { it.traceId == spanData.traceId }
|
||||
|
||||
trace.size() == 3
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class BarService implements Service {
|
||||
private final String url
|
||||
private final CountDownLatch latch
|
||||
private final OpenTelemetry opentelemetry
|
||||
|
||||
BarService(CountDownLatch latch, String url, OpenTelemetry opentelemetry) {
|
||||
this.latch = latch
|
||||
this.url = url
|
||||
this.opentelemetry = opentelemetry
|
||||
}
|
||||
|
||||
private Tracer tracer = opentelemetry.tracerProvider.tracerBuilder("testing").build()
|
||||
|
||||
void onStart(StartEvent event) {
|
||||
def parentContext = Context.current()
|
||||
def span = tracer.spanBuilder("a-span")
|
||||
.setParent(parentContext)
|
||||
.startSpan()
|
||||
|
||||
Context otelContext = parentContext.with(span)
|
||||
otelContext.makeCurrent().withCloseable {
|
||||
Execution.current().add(Context, otelContext)
|
||||
def httpClient = event.registry.get(HttpClient)
|
||||
httpClient.get(new URI(url))
|
||||
.flatMap { httpClient.get(new URI(url)) }
|
||||
.then {
|
||||
span.end()
|
||||
latch.countDown()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
class BarForkService implements Service {
|
||||
private final String url
|
||||
private final CountDownLatch latch
|
||||
private final OpenTelemetry opentelemetry
|
||||
|
||||
BarForkService(CountDownLatch latch, String url, OpenTelemetry opentelemetry) {
|
||||
this.latch = latch
|
||||
this.url = url
|
||||
this.opentelemetry = opentelemetry
|
||||
}
|
||||
|
||||
private Tracer tracer = opentelemetry.tracerProvider.tracerBuilder("testing").build()
|
||||
|
||||
void onStart(StartEvent event) {
|
||||
Execution.fork().start {
|
||||
def parentContext = Context.current()
|
||||
def span = tracer.spanBuilder("a-span")
|
||||
.setParent(parentContext)
|
||||
.startSpan()
|
||||
|
||||
Context otelContext = parentContext.with(span)
|
||||
otelContext.makeCurrent().withCloseable {
|
||||
Execution.current().add(Context, otelContext)
|
||||
def httpClient = event.registry.get(HttpClient)
|
||||
httpClient.get(new URI(url))
|
||||
.flatMap { httpClient.get(new URI(url)) }
|
||||
.then {
|
||||
span.end()
|
||||
latch.countDown()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue