diff --git a/dd-java-agent/instrumentation/spring-webflux/src/main/java/datadog/trace/instrumentation/springwebflux/client/DefaultWebClientInstrumentation.java b/dd-java-agent/instrumentation/spring-webflux/src/main/java/datadog/trace/instrumentation/springwebflux/client/DefaultWebClientInstrumentation.java index 1799aa4781..ca23a46fce 100644 --- a/dd-java-agent/instrumentation/spring-webflux/src/main/java/datadog/trace/instrumentation/springwebflux/client/DefaultWebClientInstrumentation.java +++ b/dd-java-agent/instrumentation/spring-webflux/src/main/java/datadog/trace/instrumentation/springwebflux/client/DefaultWebClientInstrumentation.java @@ -32,7 +32,6 @@ public class DefaultWebClientInstrumentation extends Instrumenter.Default { packageName + ".TracingClientResponseSubscriber", packageName + ".TracingClientResponseSubscriber$1", packageName + ".TracingClientResponseMono", - packageName + ".ClientResponseWrapper" }; } diff --git a/dd-java-agent/instrumentation/spring-webflux/src/main/java8/datadog/trace/instrumentation/springwebflux/client/ClientResponseWrapper.java b/dd-java-agent/instrumentation/spring-webflux/src/main/java8/datadog/trace/instrumentation/springwebflux/client/ClientResponseWrapper.java deleted file mode 100644 index 45624d70a4..0000000000 --- a/dd-java-agent/instrumentation/spring-webflux/src/main/java8/datadog/trace/instrumentation/springwebflux/client/ClientResponseWrapper.java +++ /dev/null @@ -1,102 +0,0 @@ -package datadog.trace.instrumentation.springwebflux.client; - -import java.util.List; -import org.springframework.core.ParameterizedTypeReference; -import org.springframework.http.HttpStatus; -import org.springframework.http.ResponseCookie; -import org.springframework.http.ResponseEntity; -import org.springframework.http.client.reactive.ClientHttpResponse; -import org.springframework.util.MultiValueMap; -import org.springframework.web.reactive.function.BodyExtractor; -import org.springframework.web.reactive.function.client.ClientResponse; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; -import reactor.util.context.Context; - -/** - * Wrapper class for ClientResponse that adds Context to the body Publisher - */ -public class ClientResponseWrapper implements ClientResponse { - - private final ClientResponse clientResponse; - private final Context context; - - public ClientResponseWrapper(final ClientResponse clientResponse, final Context context) { - this.clientResponse = clientResponse; - this.context = context; - } - - @Override - public HttpStatus statusCode() { - return clientResponse.statusCode(); - } - - @Override - public Headers headers() { - return clientResponse.headers(); - } - - @Override - public MultiValueMap cookies() { - return clientResponse.cookies(); - } - - @Override - public T body(final BodyExtractor extractor) { - return clientResponse.body(extractor); - } - - @Override - public Mono bodyToMono(final Class elementClass) { - return clientResponse.bodyToMono(elementClass).subscriberContext(context); - } - - @Override - public Mono bodyToMono(final ParameterizedTypeReference typeReference) { - return clientResponse.bodyToMono(typeReference).subscriberContext(context); - } - - @Override - public Flux bodyToFlux(final Class elementClass) { - return clientResponse.bodyToFlux(elementClass).subscriberContext(context); - } - - @Override - public Flux bodyToFlux(final ParameterizedTypeReference typeReference) { - return clientResponse.bodyToFlux(typeReference).subscriberContext(context); - } - - @Override - public Mono> toEntity(final Class bodyType) { - return clientResponse.toEntity(bodyType); - } - - @Override - public Mono> toEntity(final ParameterizedTypeReference typeReference) { - return clientResponse.toEntity(typeReference); - } - - @Override - public Mono>> toEntityList(final Class elementType) { - return clientResponse.toEntityList(elementType); - } - - @Override - public Mono>> toEntityList( - final ParameterizedTypeReference typeReference) { - return clientResponse.toEntityList(typeReference); - } - - /** - * ClientResponseWrapper is based on the ClientResponse from - * spring-webflux-5.0.0.RELEASE. Since spring-webflux 5.1 ClientResponse - * contains extra methods like rawStatusCode and gives methodNotFound - * exceptions at runtime if used in a project with the latest spring-webflux - * 5.1 or higher. - *

- * See https://docs.spring.io/spring/docs/5.1.x/javadoc-api/org/springframework/web/reactive/function/client/ClientResponse.html#rawStatusCode-- - */ - public int rawStatusCode() { - return clientResponse.statusCode().value(); - } -} diff --git a/dd-java-agent/instrumentation/spring-webflux/src/main/java8/datadog/trace/instrumentation/springwebflux/client/TracingClientResponseMono.java b/dd-java-agent/instrumentation/spring-webflux/src/main/java8/datadog/trace/instrumentation/springwebflux/client/TracingClientResponseMono.java index 8331659176..9170eb8c7b 100644 --- a/dd-java-agent/instrumentation/spring-webflux/src/main/java8/datadog/trace/instrumentation/springwebflux/client/TracingClientResponseMono.java +++ b/dd-java-agent/instrumentation/spring-webflux/src/main/java8/datadog/trace/instrumentation/springwebflux/client/TracingClientResponseMono.java @@ -2,6 +2,7 @@ package datadog.trace.instrumentation.springwebflux.client; import static datadog.trace.instrumentation.springwebflux.client.SpringWebfluxHttpClientDecorator.DECORATE; +import datadog.trace.context.TraceScope; import io.opentracing.Scope; import io.opentracing.Span; import io.opentracing.Tracer; @@ -36,13 +37,18 @@ public class TracingClientResponseMono extends Mono { final Span span = tracer - .buildSpan("webflux.request") + .buildSpan("http.request") .asChildOf(parentSpan) .withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_CLIENT) .start(); DECORATE.afterStart(span); try (final Scope scope = tracer.scopeManager().activate(span, false)) { + + if (scope instanceof TraceScope) { + ((TraceScope) scope).setAsyncPropagation(true); + } + final ClientRequest mutatedRequest = ClientRequest.from(clientRequest) .headers( @@ -55,7 +61,7 @@ public class TracingClientResponseMono extends Mono { exchangeFunction .exchange(mutatedRequest) .subscribe( - new TracingClientResponseSubscriber(subscriber, mutatedRequest, context, span)); + new TracingClientResponseSubscriber(subscriber, mutatedRequest, context, span, parentSpan)); } } } diff --git a/dd-java-agent/instrumentation/spring-webflux/src/main/java8/datadog/trace/instrumentation/springwebflux/client/TracingClientResponseSubscriber.java b/dd-java-agent/instrumentation/spring-webflux/src/main/java8/datadog/trace/instrumentation/springwebflux/client/TracingClientResponseSubscriber.java index 205e625efc..b096371dca 100644 --- a/dd-java-agent/instrumentation/spring-webflux/src/main/java8/datadog/trace/instrumentation/springwebflux/client/TracingClientResponseSubscriber.java +++ b/dd-java-agent/instrumentation/spring-webflux/src/main/java8/datadog/trace/instrumentation/springwebflux/client/TracingClientResponseSubscriber.java @@ -2,7 +2,13 @@ package datadog.trace.instrumentation.springwebflux.client; import static datadog.trace.instrumentation.springwebflux.client.SpringWebfluxHttpClientDecorator.DECORATE; +import datadog.trace.context.TraceScope; +import io.opentracing.Scope; import io.opentracing.Span; +import io.opentracing.Tracer; +import io.opentracing.noop.NoopSpan; +import io.opentracing.util.GlobalTracer; +import java.util.concurrent.atomic.AtomicReference; import org.reactivestreams.Subscription; import org.springframework.web.reactive.function.client.ClientRequest; import org.springframework.web.reactive.function.client.ClientResponse; @@ -11,71 +17,116 @@ import reactor.util.context.Context; public class TracingClientResponseSubscriber implements CoreSubscriber { + private final Tracer tracer = GlobalTracer.get(); private final CoreSubscriber subscriber; private final ClientRequest clientRequest; private final Context context; - private final Span span; + private final AtomicReference spanRef; + private final Span parentSpan; public TracingClientResponseSubscriber( final CoreSubscriber subscriber, final ClientRequest clientRequest, final Context context, - final Span span) { + final Span span, + final Span parentSpan) { this.subscriber = subscriber; this.clientRequest = clientRequest; - this.context = context.put(Span.class, span); - this.span = span; + this.context = context; + spanRef = new AtomicReference<>(span); + this.parentSpan = parentSpan == null ? NoopSpan.INSTANCE : parentSpan; } @Override public void onSubscribe(final Subscription subscription) { - DECORATE.onRequest(span, clientRequest); + final Span span = spanRef.get(); + if (span == null) { + subscriber.onSubscribe(subscription); + return; + } - subscriber.onSubscribe( - new Subscription() { - @Override - public void request(final long n) { - subscription.request(n); - } + try (final Scope scope = tracer.scopeManager().activate(span, false)) { - @Override - public void cancel() { - DECORATE.onCancel(span); - DECORATE.beforeFinish(span); - subscription.cancel(); - span.finish(); - } - }); + if (scope instanceof TraceScope) { + ((TraceScope) scope).setAsyncPropagation(true); + } + + DECORATE.onRequest(span, clientRequest); + + subscriber.onSubscribe( + new Subscription() { + @Override + public void request(final long n) { + try (final Scope scope = tracer.scopeManager().activate(span, false)) { + subscription.request(n); + } + } + + @Override + public void cancel() { + DECORATE.onCancel(span); + DECORATE.beforeFinish(span); + subscription.cancel(); + span.finish(); + } + }); + } } @Override public void onNext(final ClientResponse clientResponse) { - try { - subscriber.onNext(new ClientResponseWrapper(clientResponse, context)); - } finally { + final Span span = spanRef.getAndSet(null); + if (span != null) { DECORATE.onResponse(span, clientResponse); + DECORATE.beforeFinish(span); + span.finish(); + } + + try (final Scope scope = tracer.scopeManager().activate(parentSpan, false)) { + + if (scope instanceof TraceScope) { + ((TraceScope) scope).setAsyncPropagation(true); + } + + subscriber.onNext(clientResponse); } } @Override public void onError(final Throwable throwable) { - try { - subscriber.onError(throwable); - } finally { + final Span span = spanRef.getAndSet(null); + if (span != null) { DECORATE.onError(span, throwable); DECORATE.beforeFinish(span); span.finish(); } + + try (final Scope scope = tracer.scopeManager().activate(parentSpan, false)) { + + if (scope instanceof TraceScope) { + ((TraceScope) scope).setAsyncPropagation(true); + } + + subscriber.onError(throwable); + } } @Override public void onComplete() { - try { - subscriber.onComplete(); - } finally { + final Span span = spanRef.getAndSet(null); + if (span != null) { DECORATE.beforeFinish(span); span.finish(); } + + try (final Scope scope = tracer.scopeManager().activate(parentSpan, false)) { + + if (scope instanceof TraceScope) { + ((TraceScope) scope).setAsyncPropagation(true); + } + + subscriber.onComplete(); + } } @Override diff --git a/dd-java-agent/instrumentation/spring-webflux/src/test/groovy/dd/trace/instrumentation/springwebflux/client/SpringWebfluxHttpClientTest.groovy b/dd-java-agent/instrumentation/spring-webflux/src/test/groovy/dd/trace/instrumentation/springwebflux/client/SpringWebfluxHttpClientTest.groovy index 7a097ed937..da3b6f36e8 100644 --- a/dd-java-agent/instrumentation/spring-webflux/src/test/groovy/dd/trace/instrumentation/springwebflux/client/SpringWebfluxHttpClientTest.groovy +++ b/dd-java-agent/instrumentation/spring-webflux/src/test/groovy/dd/trace/instrumentation/springwebflux/client/SpringWebfluxHttpClientTest.groovy @@ -1,7 +1,14 @@ package dd.trace.instrumentation.springwebflux.client +import datadog.trace.agent.test.asserts.TraceAssert import datadog.trace.agent.test.base.HttpClientTest +import datadog.trace.api.DDSpanTypes +import datadog.trace.api.DDTags +import datadog.trace.instrumentation.netty41.client.NettyHttpClientDecorator import datadog.trace.instrumentation.springwebflux.client.SpringWebfluxHttpClientDecorator +import io.opentracing.tag.Tags +import io.opentracing.util.GlobalTracer +import org.springframework.http.HttpMethod import org.springframework.web.reactive.function.client.ClientResponse import org.springframework.web.reactive.function.client.WebClient import spock.lang.Shared @@ -13,14 +20,20 @@ class SpringWebfluxHttpClientTest extends HttpClientTest headers, Closure callback) { - assert method == "GET" - ClientResponse response = client.get() - .headers({ h -> headers.forEach({ key, value -> h.add(key, value) }) }) + def hasParent = GlobalTracer.get().activeSpan() != null + ClientResponse response = client.method(HttpMethod.resolve(method)) + .headers { h -> headers.forEach({ key, value -> h.add(key, value) }) } .uri(uri) .exchange() + .doOnSuccessOrError { success, error -> + blockUntilChildSpansFinished(1) + callback?.call() + } .block() - callback?.call() + if(hasParent) { + blockUntilChildSpansFinished(callback ? 3 : 2) + } response.statusCode().value() } @@ -28,4 +41,54 @@ class SpringWebfluxHttpClientTest extends HttpClientTest extends AgentTestRu def "trace request with callback and no parent"() { when: def status = doRequest(method, server.address.resolve("/success"), ["is-dd-server": "false"]) { - runUnderTrace("child") { + runUnderTrace("callback") { // Ensure consistent ordering of traces for assertion. TEST_WRITER.waitForTraces(1) } @@ -202,7 +202,7 @@ abstract class HttpClientTest extends AgentTestRu clientSpan(it, 0, null, method, false) } trace(1, 1) { - basicSpan(it, 0, "child") + basicSpan(it, 0, "callback") } }