Fix trace propagation

This commit is contained in:
Tyler Benson 2019-06-14 11:27:30 -07:00
parent fff8006e51
commit aaba3fc095
7 changed files with 163 additions and 147 deletions

View File

@ -32,7 +32,6 @@ public class DefaultWebClientInstrumentation extends Instrumenter.Default {
packageName + ".TracingClientResponseSubscriber", packageName + ".TracingClientResponseSubscriber",
packageName + ".TracingClientResponseSubscriber$1", packageName + ".TracingClientResponseSubscriber$1",
packageName + ".TracingClientResponseMono", packageName + ".TracingClientResponseMono",
packageName + ".ClientResponseWrapper"
}; };
} }

View File

@ -1,102 +0,0 @@
package datadog.trace.instrumentation.springwebflux.client;
import java.util.List;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseCookie;
import org.springframework.http.ResponseEntity;
import org.springframework.http.client.reactive.ClientHttpResponse;
import org.springframework.util.MultiValueMap;
import org.springframework.web.reactive.function.BodyExtractor;
import org.springframework.web.reactive.function.client.ClientResponse;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.context.Context;
/**
* Wrapper class for ClientResponse that adds Context to the body Publisher
*/
public class ClientResponseWrapper implements ClientResponse {
private final ClientResponse clientResponse;
private final Context context;
public ClientResponseWrapper(final ClientResponse clientResponse, final Context context) {
this.clientResponse = clientResponse;
this.context = context;
}
@Override
public HttpStatus statusCode() {
return clientResponse.statusCode();
}
@Override
public Headers headers() {
return clientResponse.headers();
}
@Override
public MultiValueMap<String, ResponseCookie> cookies() {
return clientResponse.cookies();
}
@Override
public <T> T body(final BodyExtractor<T, ? super ClientHttpResponse> extractor) {
return clientResponse.body(extractor);
}
@Override
public <T> Mono<T> bodyToMono(final Class<? extends T> elementClass) {
return clientResponse.<T>bodyToMono(elementClass).subscriberContext(context);
}
@Override
public <T> Mono<T> bodyToMono(final ParameterizedTypeReference<T> typeReference) {
return clientResponse.<T>bodyToMono(typeReference).subscriberContext(context);
}
@Override
public <T> Flux<T> bodyToFlux(final Class<? extends T> elementClass) {
return clientResponse.<T>bodyToFlux(elementClass).subscriberContext(context);
}
@Override
public <T> Flux<T> bodyToFlux(final ParameterizedTypeReference<T> typeReference) {
return clientResponse.<T>bodyToFlux(typeReference).subscriberContext(context);
}
@Override
public <T> Mono<ResponseEntity<T>> toEntity(final Class<T> bodyType) {
return clientResponse.toEntity(bodyType);
}
@Override
public <T> Mono<ResponseEntity<T>> toEntity(final ParameterizedTypeReference<T> typeReference) {
return clientResponse.toEntity(typeReference);
}
@Override
public <T> Mono<ResponseEntity<List<T>>> toEntityList(final Class<T> elementType) {
return clientResponse.toEntityList(elementType);
}
@Override
public <T> Mono<ResponseEntity<List<T>>> toEntityList(
final ParameterizedTypeReference<T> typeReference) {
return clientResponse.toEntityList(typeReference);
}
/**
* ClientResponseWrapper is based on the ClientResponse from
* spring-webflux-5.0.0.RELEASE. Since spring-webflux 5.1 ClientResponse
* contains extra methods like rawStatusCode and gives methodNotFound
* exceptions at runtime if used in a project with the latest spring-webflux
* 5.1 or higher.
* <p>
* See https://docs.spring.io/spring/docs/5.1.x/javadoc-api/org/springframework/web/reactive/function/client/ClientResponse.html#rawStatusCode--
*/
public int rawStatusCode() {
return clientResponse.statusCode().value();
}
}

View File

@ -2,6 +2,7 @@ package datadog.trace.instrumentation.springwebflux.client;
import static datadog.trace.instrumentation.springwebflux.client.SpringWebfluxHttpClientDecorator.DECORATE; import static datadog.trace.instrumentation.springwebflux.client.SpringWebfluxHttpClientDecorator.DECORATE;
import datadog.trace.context.TraceScope;
import io.opentracing.Scope; import io.opentracing.Scope;
import io.opentracing.Span; import io.opentracing.Span;
import io.opentracing.Tracer; import io.opentracing.Tracer;
@ -36,13 +37,18 @@ public class TracingClientResponseMono extends Mono<ClientResponse> {
final Span span = final Span span =
tracer tracer
.buildSpan("webflux.request") .buildSpan("http.request")
.asChildOf(parentSpan) .asChildOf(parentSpan)
.withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_CLIENT) .withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_CLIENT)
.start(); .start();
DECORATE.afterStart(span); DECORATE.afterStart(span);
try (final Scope scope = tracer.scopeManager().activate(span, false)) { try (final Scope scope = tracer.scopeManager().activate(span, false)) {
if (scope instanceof TraceScope) {
((TraceScope) scope).setAsyncPropagation(true);
}
final ClientRequest mutatedRequest = final ClientRequest mutatedRequest =
ClientRequest.from(clientRequest) ClientRequest.from(clientRequest)
.headers( .headers(
@ -55,7 +61,7 @@ public class TracingClientResponseMono extends Mono<ClientResponse> {
exchangeFunction exchangeFunction
.exchange(mutatedRequest) .exchange(mutatedRequest)
.subscribe( .subscribe(
new TracingClientResponseSubscriber(subscriber, mutatedRequest, context, span)); new TracingClientResponseSubscriber(subscriber, mutatedRequest, context, span, parentSpan));
} }
} }
} }

View File

@ -2,7 +2,13 @@ package datadog.trace.instrumentation.springwebflux.client;
import static datadog.trace.instrumentation.springwebflux.client.SpringWebfluxHttpClientDecorator.DECORATE; import static datadog.trace.instrumentation.springwebflux.client.SpringWebfluxHttpClientDecorator.DECORATE;
import datadog.trace.context.TraceScope;
import io.opentracing.Scope;
import io.opentracing.Span; import io.opentracing.Span;
import io.opentracing.Tracer;
import io.opentracing.noop.NoopSpan;
import io.opentracing.util.GlobalTracer;
import java.util.concurrent.atomic.AtomicReference;
import org.reactivestreams.Subscription; import org.reactivestreams.Subscription;
import org.springframework.web.reactive.function.client.ClientRequest; import org.springframework.web.reactive.function.client.ClientRequest;
import org.springframework.web.reactive.function.client.ClientResponse; import org.springframework.web.reactive.function.client.ClientResponse;
@ -11,71 +17,116 @@ import reactor.util.context.Context;
public class TracingClientResponseSubscriber implements CoreSubscriber<ClientResponse> { public class TracingClientResponseSubscriber implements CoreSubscriber<ClientResponse> {
private final Tracer tracer = GlobalTracer.get();
private final CoreSubscriber<? super ClientResponse> subscriber; private final CoreSubscriber<? super ClientResponse> subscriber;
private final ClientRequest clientRequest; private final ClientRequest clientRequest;
private final Context context; private final Context context;
private final Span span; private final AtomicReference<Span> spanRef;
private final Span parentSpan;
public TracingClientResponseSubscriber( public TracingClientResponseSubscriber(
final CoreSubscriber<? super ClientResponse> subscriber, final CoreSubscriber<? super ClientResponse> subscriber,
final ClientRequest clientRequest, final ClientRequest clientRequest,
final Context context, final Context context,
final Span span) { final Span span,
final Span parentSpan) {
this.subscriber = subscriber; this.subscriber = subscriber;
this.clientRequest = clientRequest; this.clientRequest = clientRequest;
this.context = context.put(Span.class, span); this.context = context;
this.span = span; spanRef = new AtomicReference<>(span);
this.parentSpan = parentSpan == null ? NoopSpan.INSTANCE : parentSpan;
} }
@Override @Override
public void onSubscribe(final Subscription subscription) { public void onSubscribe(final Subscription subscription) {
DECORATE.onRequest(span, clientRequest); final Span span = spanRef.get();
if (span == null) {
subscriber.onSubscribe(subscription);
return;
}
subscriber.onSubscribe( try (final Scope scope = tracer.scopeManager().activate(span, false)) {
new Subscription() {
@Override
public void request(final long n) {
subscription.request(n);
}
@Override if (scope instanceof TraceScope) {
public void cancel() { ((TraceScope) scope).setAsyncPropagation(true);
DECORATE.onCancel(span); }
DECORATE.beforeFinish(span);
subscription.cancel(); DECORATE.onRequest(span, clientRequest);
span.finish();
} subscriber.onSubscribe(
}); new Subscription() {
@Override
public void request(final long n) {
try (final Scope scope = tracer.scopeManager().activate(span, false)) {
subscription.request(n);
}
}
@Override
public void cancel() {
DECORATE.onCancel(span);
DECORATE.beforeFinish(span);
subscription.cancel();
span.finish();
}
});
}
} }
@Override @Override
public void onNext(final ClientResponse clientResponse) { public void onNext(final ClientResponse clientResponse) {
try { final Span span = spanRef.getAndSet(null);
subscriber.onNext(new ClientResponseWrapper(clientResponse, context)); if (span != null) {
} finally {
DECORATE.onResponse(span, clientResponse); DECORATE.onResponse(span, clientResponse);
DECORATE.beforeFinish(span);
span.finish();
}
try (final Scope scope = tracer.scopeManager().activate(parentSpan, false)) {
if (scope instanceof TraceScope) {
((TraceScope) scope).setAsyncPropagation(true);
}
subscriber.onNext(clientResponse);
} }
} }
@Override @Override
public void onError(final Throwable throwable) { public void onError(final Throwable throwable) {
try { final Span span = spanRef.getAndSet(null);
subscriber.onError(throwable); if (span != null) {
} finally {
DECORATE.onError(span, throwable); DECORATE.onError(span, throwable);
DECORATE.beforeFinish(span); DECORATE.beforeFinish(span);
span.finish(); span.finish();
} }
try (final Scope scope = tracer.scopeManager().activate(parentSpan, false)) {
if (scope instanceof TraceScope) {
((TraceScope) scope).setAsyncPropagation(true);
}
subscriber.onError(throwable);
}
} }
@Override @Override
public void onComplete() { public void onComplete() {
try { final Span span = spanRef.getAndSet(null);
subscriber.onComplete(); if (span != null) {
} finally {
DECORATE.beforeFinish(span); DECORATE.beforeFinish(span);
span.finish(); span.finish();
} }
try (final Scope scope = tracer.scopeManager().activate(parentSpan, false)) {
if (scope instanceof TraceScope) {
((TraceScope) scope).setAsyncPropagation(true);
}
subscriber.onComplete();
}
} }
@Override @Override

View File

@ -1,7 +1,14 @@
package dd.trace.instrumentation.springwebflux.client package dd.trace.instrumentation.springwebflux.client
import datadog.trace.agent.test.asserts.TraceAssert
import datadog.trace.agent.test.base.HttpClientTest import datadog.trace.agent.test.base.HttpClientTest
import datadog.trace.api.DDSpanTypes
import datadog.trace.api.DDTags
import datadog.trace.instrumentation.netty41.client.NettyHttpClientDecorator
import datadog.trace.instrumentation.springwebflux.client.SpringWebfluxHttpClientDecorator import datadog.trace.instrumentation.springwebflux.client.SpringWebfluxHttpClientDecorator
import io.opentracing.tag.Tags
import io.opentracing.util.GlobalTracer
import org.springframework.http.HttpMethod
import org.springframework.web.reactive.function.client.ClientResponse import org.springframework.web.reactive.function.client.ClientResponse
import org.springframework.web.reactive.function.client.WebClient import org.springframework.web.reactive.function.client.WebClient
import spock.lang.Shared import spock.lang.Shared
@ -13,14 +20,20 @@ class SpringWebfluxHttpClientTest extends HttpClientTest<SpringWebfluxHttpClient
@Override @Override
int doRequest(String method, URI uri, Map<String, String> headers, Closure callback) { int doRequest(String method, URI uri, Map<String, String> headers, Closure callback) {
assert method == "GET" def hasParent = GlobalTracer.get().activeSpan() != null
ClientResponse response = client.get() ClientResponse response = client.method(HttpMethod.resolve(method))
.headers({ h -> headers.forEach({ key, value -> h.add(key, value) }) }) .headers { h -> headers.forEach({ key, value -> h.add(key, value) }) }
.uri(uri) .uri(uri)
.exchange() .exchange()
.doOnSuccessOrError { success, error ->
blockUntilChildSpansFinished(1)
callback?.call()
}
.block() .block()
callback?.call() if(hasParent) {
blockUntilChildSpansFinished(callback ? 3 : 2)
}
response.statusCode().value() response.statusCode().value()
} }
@ -28,4 +41,54 @@ class SpringWebfluxHttpClientTest extends HttpClientTest<SpringWebfluxHttpClient
SpringWebfluxHttpClientDecorator decorator() { SpringWebfluxHttpClientDecorator decorator() {
return SpringWebfluxHttpClientDecorator.DECORATE return SpringWebfluxHttpClientDecorator.DECORATE
} }
@Override
// parent spanRef must be cast otherwise it breaks debugging classloading (junit loads it early)
void clientSpan(TraceAssert trace, int index, Object parentSpan, String method = "GET", boolean renameService = false, boolean tagQueryString = false, URI uri = server.address.resolve("/success"), Integer status = 200, Throwable exception = null) {
super.clientSpan(trace, index, parentSpan, method, renameService, tagQueryString, uri, status, exception)
if (!exception) {
trace.span(index + 1) {
childOf(trace.span(index))
serviceName renameService ? "localhost" : "unnamed-java-app"
operationName "netty.client.request"
resourceName "$method $uri.path"
spanType DDSpanTypes.HTTP_CLIENT
errored exception != null
tags {
defaultTags()
if (exception) {
errorTags(exception.class, exception.message)
}
"$Tags.COMPONENT.key" NettyHttpClientDecorator.DECORATE.component()
if (status) {
"$Tags.HTTP_STATUS.key" status
}
"$Tags.HTTP_URL.key" "${uri.resolve(uri.path)}"
if (tagQueryString) {
"$DDTags.HTTP_QUERY" uri.query
"$DDTags.HTTP_FRAGMENT" { it == null || it == uri.fragment } // Optional
}
"$Tags.PEER_HOSTNAME.key" "localhost"
"$Tags.PEER_PORT.key" uri.port
"$Tags.PEER_HOST_IPV4.key" { it == null || it == "127.0.0.1" } // Optional
"$Tags.HTTP_METHOD.key" method
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT
}
}
}
}
@Override
int size(int size) {
return size + 1
}
boolean testRedirects() {
false
}
boolean testConnectionFailure() {
false
}
} }

View File

@ -18,6 +18,7 @@ import groovy.lang.Closure;
import groovy.lang.DelegatesTo; import groovy.lang.DelegatesTo;
import groovy.transform.stc.ClosureParams; import groovy.transform.stc.ClosureParams;
import groovy.transform.stc.SimpleType; import groovy.transform.stc.SimpleType;
import io.opentracing.Span;
import io.opentracing.Tracer; import io.opentracing.Tracer;
import java.lang.instrument.ClassFileTransformer; import java.lang.instrument.ClassFileTransformer;
import java.lang.instrument.Instrumentation; import java.lang.instrument.Instrumentation;
@ -202,15 +203,13 @@ public abstract class AgentTestRunner extends Specification {
} }
public void blockUntilChildSpansFinished(final int numberOfSpans) throws InterruptedException { public void blockUntilChildSpansFinished(final int numberOfSpans) throws InterruptedException {
final DDSpan span = (DDSpan) io.opentracing.util.GlobalTracer.get().activeSpan(); final Span span = io.opentracing.util.GlobalTracer.get().activeSpan();
if (span == null) { if (span instanceof DDSpan) {
// If there is no active span avoid getting an NPE final PendingTrace pendingTrace = ((DDSpan) span).context().getTrace();
return;
}
final PendingTrace pendingTrace = span.context().getTrace();
while (pendingTrace.size() < numberOfSpans) { while (pendingTrace.size() < numberOfSpans) {
Thread.sleep(10); Thread.sleep(10);
}
} }
} }

View File

@ -188,7 +188,7 @@ abstract class HttpClientTest<T extends HttpClientDecorator> extends AgentTestRu
def "trace request with callback and no parent"() { def "trace request with callback and no parent"() {
when: when:
def status = doRequest(method, server.address.resolve("/success"), ["is-dd-server": "false"]) { def status = doRequest(method, server.address.resolve("/success"), ["is-dd-server": "false"]) {
runUnderTrace("child") { runUnderTrace("callback") {
// Ensure consistent ordering of traces for assertion. // Ensure consistent ordering of traces for assertion.
TEST_WRITER.waitForTraces(1) TEST_WRITER.waitForTraces(1)
} }
@ -202,7 +202,7 @@ abstract class HttpClientTest<T extends HttpClientDecorator> extends AgentTestRu
clientSpan(it, 0, null, method, false) clientSpan(it, 0, null, method, false)
} }
trace(1, 1) { trace(1, 1) {
basicSpan(it, 0, "child") basicSpan(it, 0, "callback")
} }
} }