Fix webflux client filter subscribe (#2646)
* Fix webflux client filter subscribe * Add test * Fix test * Fix test, take 2 * Fix another test * Suppress test for another module * Suppress another library instrumentation * Another * Add nested client suppression in Armeria * Add comments * Revert extra line
This commit is contained in:
parent
97d58d7ff4
commit
0d11dbe565
|
@ -28,11 +28,16 @@ final class OpenTelemetryClient extends SimpleDecoratingHttpClient {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public HttpResponse execute(ClientRequestContext ctx, HttpRequest req) throws Exception {
|
public HttpResponse execute(ClientRequestContext ctx, HttpRequest req) throws Exception {
|
||||||
|
Context parentContext = Context.current();
|
||||||
|
if (!clientTracer.shouldStartSpan(parentContext)) {
|
||||||
|
return unwrap().execute(ctx, req);
|
||||||
|
}
|
||||||
|
|
||||||
// Always available in practice.
|
// Always available in practice.
|
||||||
long requestStartTimeMicros =
|
long requestStartTimeMicros =
|
||||||
ctx.log().ensureAvailable(RequestLogProperty.REQUEST_START_TIME).requestStartTimeMicros();
|
ctx.log().ensureAvailable(RequestLogProperty.REQUEST_START_TIME).requestStartTimeMicros();
|
||||||
long requestStartTimeNanos = TimeUnit.MICROSECONDS.toNanos(requestStartTimeMicros);
|
long requestStartTimeNanos = TimeUnit.MICROSECONDS.toNanos(requestStartTimeMicros);
|
||||||
Context context = clientTracer.startSpan(Context.current(), ctx, ctx, requestStartTimeNanos);
|
Context context = clientTracer.startSpan(parentContext, ctx, ctx, requestStartTimeNanos);
|
||||||
|
|
||||||
Span span = Span.fromContext(context);
|
Span span = Span.fromContext(context);
|
||||||
if (span.isRecording()) {
|
if (span.isRecording()) {
|
||||||
|
|
|
@ -13,4 +13,10 @@ class ArmeriaHttpClientTest extends AbstractArmeriaHttpClientTest implements Lib
|
||||||
WebClientBuilder configureClient(WebClientBuilder clientBuilder) {
|
WebClientBuilder configureClient(WebClientBuilder clientBuilder) {
|
||||||
return clientBuilder.decorator(ArmeriaTracing.create(getOpenTelemetry()).newClientDecorator())
|
return clientBuilder.decorator(ArmeriaTracing.create(getOpenTelemetry()).newClientDecorator())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// library instrumentation doesn't have a good way of suppressing nested CLIENT spans yet
|
||||||
|
@Override
|
||||||
|
boolean testWithClientParent() {
|
||||||
|
false
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -52,6 +52,13 @@ abstract class JdkHttpClientTest extends HttpClientTest implements AgentTestTrai
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO nested client span is not created, but context is still injected
|
||||||
|
// which is not what the test expects
|
||||||
|
@Override
|
||||||
|
boolean testWithClientParent() {
|
||||||
|
false
|
||||||
|
}
|
||||||
|
|
||||||
@Requires({ !System.getProperty("java.vm.name").contains("IBM J9 VM") })
|
@Requires({ !System.getProperty("java.vm.name").contains("IBM J9 VM") })
|
||||||
def "test https request"() {
|
def "test https request"() {
|
||||||
given:
|
given:
|
||||||
|
|
|
@ -16,4 +16,10 @@ class OkHttp3AsyncTest extends AbstractOkHttp3AsyncTest implements LibraryTestTr
|
||||||
OkHttpClient.Builder configureClient(OkHttpClient.Builder clientBuilder) {
|
OkHttpClient.Builder configureClient(OkHttpClient.Builder clientBuilder) {
|
||||||
return clientBuilder.addInterceptor(OkHttpTracing.create(getOpenTelemetry()).newInterceptor())
|
return clientBuilder.addInterceptor(OkHttpTracing.create(getOpenTelemetry()).newInterceptor())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// library instrumentation doesn't have a good way of suppressing nested CLIENT spans yet
|
||||||
|
@Override
|
||||||
|
boolean testWithClientParent() {
|
||||||
|
false
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -13,4 +13,10 @@ class OkHttp3Test extends AbstractOkHttp3Test implements LibraryTestTrait {
|
||||||
OkHttpClient.Builder configureClient(OkHttpClient.Builder clientBuilder) {
|
OkHttpClient.Builder configureClient(OkHttpClient.Builder clientBuilder) {
|
||||||
return clientBuilder.addInterceptor(OkHttpTracing.create(getOpenTelemetry()).newInterceptor())
|
return clientBuilder.addInterceptor(OkHttpTracing.create(getOpenTelemetry()).newInterceptor())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// library instrumentation doesn't have a good way of suppressing nested CLIENT spans yet
|
||||||
|
@Override
|
||||||
|
boolean testWithClientParent() {
|
||||||
|
false
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -45,4 +45,10 @@ class RestTemplateInstrumentationTest extends HttpClientTest implements LibraryT
|
||||||
boolean testRemoteConnection() {
|
boolean testRemoteConnection() {
|
||||||
false
|
false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// library instrumentation doesn't have a good way of suppressing nested CLIENT spans yet
|
||||||
|
@Override
|
||||||
|
boolean testWithClientParent() {
|
||||||
|
false
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -51,6 +51,7 @@ public class WebClientTracingFilter implements ExchangeFilterFunction {
|
||||||
public void subscribe(CoreSubscriber<? super ClientResponse> subscriber) {
|
public void subscribe(CoreSubscriber<? super ClientResponse> subscriber) {
|
||||||
Context parentContext = Context.current();
|
Context parentContext = Context.current();
|
||||||
if (!tracer().shouldStartSpan(parentContext)) {
|
if (!tracer().shouldStartSpan(parentContext)) {
|
||||||
|
next.exchange(request).subscribe(subscriber);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -56,6 +56,14 @@ class VertxHttpClientTest extends HttpClientTest implements AgentTestTrait {
|
||||||
false
|
false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO the vertx client span is suppressed, but also so is the vertx client instrumentation
|
||||||
|
// context propagation down to netty, and so netty doesn't see any existing context,
|
||||||
|
// and so it creates a (not-nested) client span, which is not what the test expects
|
||||||
|
@Override
|
||||||
|
boolean testWithClientParent() {
|
||||||
|
false
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
SingleConnection createSingleConnection(String host, int port) {
|
SingleConnection createSingleConnection(String host, int port) {
|
||||||
//This test fails on Vert.x 3.0 and only works starting from 3.1
|
//This test fails on Vert.x 3.0 and only works starting from 3.1
|
||||||
|
|
|
@ -10,6 +10,7 @@ import static io.opentelemetry.api.trace.SpanKind.SERVER
|
||||||
import static io.opentelemetry.instrumentation.test.server.http.TestHttpServer.httpServer
|
import static io.opentelemetry.instrumentation.test.server.http.TestHttpServer.httpServer
|
||||||
import static io.opentelemetry.instrumentation.test.utils.PortUtils.UNUSABLE_PORT
|
import static io.opentelemetry.instrumentation.test.utils.PortUtils.UNUSABLE_PORT
|
||||||
import static io.opentelemetry.instrumentation.test.utils.TraceUtils.basicSpan
|
import static io.opentelemetry.instrumentation.test.utils.TraceUtils.basicSpan
|
||||||
|
import static io.opentelemetry.instrumentation.test.utils.TraceUtils.runUnderParentClientSpan
|
||||||
import static io.opentelemetry.instrumentation.test.utils.TraceUtils.runUnderTrace
|
import static io.opentelemetry.instrumentation.test.utils.TraceUtils.runUnderTrace
|
||||||
import static org.junit.Assume.assumeTrue
|
import static org.junit.Assume.assumeTrue
|
||||||
|
|
||||||
|
@ -137,6 +138,33 @@ abstract class HttpClientTest extends InstrumentationSpecification {
|
||||||
method << BODY_METHODS
|
method << BODY_METHODS
|
||||||
}
|
}
|
||||||
|
|
||||||
|
def "should suppress nested CLIENT span if already under parent CLIENT span"() {
|
||||||
|
given:
|
||||||
|
assumeTrue(testWithClientParent())
|
||||||
|
|
||||||
|
when:
|
||||||
|
def status = runUnderParentClientSpan {
|
||||||
|
doRequest(method, server.address.resolve("/success"))
|
||||||
|
}
|
||||||
|
|
||||||
|
then:
|
||||||
|
status == 200
|
||||||
|
// there should be 2 separate traces since the nested CLIENT span is suppressed
|
||||||
|
// (and the span context propagation along with it)
|
||||||
|
assertTraces(2) {
|
||||||
|
trace(0, 1) {
|
||||||
|
basicSpan(it, 0, "parent-client-span")
|
||||||
|
}
|
||||||
|
trace(1, 1) {
|
||||||
|
serverSpan(it, 0)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
where:
|
||||||
|
method << BODY_METHODS
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
//FIXME: add tests for POST with large/chunked data
|
//FIXME: add tests for POST with large/chunked data
|
||||||
|
|
||||||
def "trace request without propagation"() {
|
def "trace request without propagation"() {
|
||||||
|
@ -568,6 +596,10 @@ abstract class HttpClientTest extends InstrumentationSpecification {
|
||||||
0
|
0
|
||||||
}
|
}
|
||||||
|
|
||||||
|
boolean testWithClientParent() {
|
||||||
|
true
|
||||||
|
}
|
||||||
|
|
||||||
boolean testRedirects() {
|
boolean testRedirects() {
|
||||||
true
|
true
|
||||||
}
|
}
|
||||||
|
|
|
@ -12,6 +12,7 @@ import io.opentelemetry.api.trace.Span
|
||||||
import io.opentelemetry.api.trace.SpanKind
|
import io.opentelemetry.api.trace.SpanKind
|
||||||
import io.opentelemetry.api.trace.StatusCode
|
import io.opentelemetry.api.trace.StatusCode
|
||||||
import io.opentelemetry.api.trace.Tracer
|
import io.opentelemetry.api.trace.Tracer
|
||||||
|
import io.opentelemetry.extension.annotations.WithSpan
|
||||||
import io.opentelemetry.instrumentation.test.asserts.AttributesAssert
|
import io.opentelemetry.instrumentation.test.asserts.AttributesAssert
|
||||||
import io.opentelemetry.instrumentation.test.asserts.TraceAssert
|
import io.opentelemetry.instrumentation.test.asserts.TraceAssert
|
||||||
import io.opentelemetry.instrumentation.test.server.ServerTraceUtils
|
import io.opentelemetry.instrumentation.test.server.ServerTraceUtils
|
||||||
|
@ -53,6 +54,11 @@ class TraceUtils {
|
||||||
tracer.spanBuilder(spanName).startSpan().end()
|
tracer.spanBuilder(spanName).startSpan().end()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@WithSpan(value = "parent-client-span", kind = SpanKind.CLIENT)
|
||||||
|
static <T> T runUnderParentClientSpan(Callable<T> r) {
|
||||||
|
r.call()
|
||||||
|
}
|
||||||
|
|
||||||
static basicSpan(TraceAssert trace, int index, String operation, Object parentSpan = null, Throwable exception = null,
|
static basicSpan(TraceAssert trace, int index, String operation, Object parentSpan = null, Throwable exception = null,
|
||||||
@ClosureParams(value = SimpleType, options = ['io.opentelemetry.instrumentation.test.asserts.AttributesAssert'])
|
@ClosureParams(value = SimpleType, options = ['io.opentelemetry.instrumentation.test.asserts.AttributesAssert'])
|
||||||
@DelegatesTo(value = AttributesAssert, strategy = Closure.DELEGATE_FIRST) Closure additionAttributesAssert = null) {
|
@DelegatesTo(value = AttributesAssert, strategy = Closure.DELEGATE_FIRST) Closure additionAttributesAssert = null) {
|
||||||
|
|
Loading…
Reference in New Issue