Merge pull request #831 from johanvandeweerd/issue-614-spring-webclient-tracing

Issue 614 spring webclient tracing
This commit is contained in:
Tyler Benson 2019-07-23 07:50:06 -07:00 committed by GitHub
commit 0a79235d1b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
27 changed files with 505 additions and 38 deletions

View File

@ -0,0 +1,55 @@
package datadog.trace.instrumentation.springwebflux.client;
import static datadog.trace.agent.tooling.ByteBuddyElementMatchers.safeHasSuperType;
import static java.util.Collections.singletonMap;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.Instrumenter;
import java.util.Map;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
@AutoService(Instrumenter.class)
public class DefaultWebClientInstrumentation extends Instrumenter.Default {
public DefaultWebClientInstrumentation() {
super("spring-webflux", "spring-webflux-client");
}
@Override
public String[] helperClassNames() {
return new String[] {
"datadog.trace.agent.decorator.BaseDecorator",
"datadog.trace.agent.decorator.ClientDecorator",
"datadog.trace.agent.decorator.HttpClientDecorator",
packageName + ".SpringWebfluxHttpClientDecorator",
packageName + ".HttpHeadersInjectAdapter",
packageName + ".TracingClientResponseSubscriber",
packageName + ".TracingClientResponseSubscriber$1",
packageName + ".TracingClientResponseMono",
};
}
@Override
public ElementMatcher<? super TypeDescription> typeMatcher() {
return safeHasSuperType(
named("org.springframework.web.reactive.function.client.ExchangeFunction"));
}
@Override
public Map<? extends ElementMatcher<? super MethodDescription>, String> transformers() {
return singletonMap(
isMethod()
.and(isPublic())
.and(named("exchange"))
.and(
takesArgument(
0, named("org.springframework.web.reactive.function.client.ClientRequest"))),
packageName + ".DefaultWebClientAdvice");
}
}

View File

@ -1,4 +1,4 @@
package datadog.trace.instrumentation.springwebflux;
package datadog.trace.instrumentation.springwebflux.server;
import datadog.trace.agent.tooling.Instrumenter;

View File

@ -1,4 +1,4 @@
package datadog.trace.instrumentation.springwebflux;
package datadog.trace.instrumentation.springwebflux.server;
import static java.util.Collections.singletonMap;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;

View File

@ -1,4 +1,4 @@
package datadog.trace.instrumentation.springwebflux;
package datadog.trace.instrumentation.springwebflux.server;
import static datadog.trace.agent.tooling.ByteBuddyElementMatchers.safeHasSuperType;
import static java.util.Collections.singletonMap;

View File

@ -1,4 +1,4 @@
package datadog.trace.instrumentation.springwebflux;
package datadog.trace.instrumentation.springwebflux.server;
import static datadog.trace.agent.tooling.ByteBuddyElementMatchers.safeHasSuperType;
import static java.util.Collections.singletonMap;

View File

@ -0,0 +1,29 @@
package datadog.trace.instrumentation.springwebflux.client;
import io.opentracing.util.GlobalTracer;
import net.bytebuddy.asm.Advice;
import org.springframework.web.reactive.function.client.ClientRequest;
import org.springframework.web.reactive.function.client.ClientResponse;
import org.springframework.web.reactive.function.client.ExchangeFunction;
import reactor.core.publisher.Mono;
public class DefaultWebClientAdvice {
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void methodExit(
@Advice.Thrown final Throwable throwable,
@Advice.This final ExchangeFunction exchangeFunction,
@Advice.Argument(0) final ClientRequest clientRequest,
@Advice.Return(readOnly = false) Mono<ClientResponse> mono) {
if (throwable == null
&& mono != null
// The response of the org.springframework.web.reactive.function.client.ExchangeFunction.exchange method is
// replaced by a decorator that in turn also calls the
// org.springframework.web.reactive.function.client.ExchangeFunction.exchange method. If the original return value
// is already decorated (we detect this if the "x-datadog-trace-id" is added), the result is not decorated again
// to avoid StackOverflowErrors.
&& !clientRequest.headers().keySet().contains("x-datadog-trace-id")) {
mono = new TracingClientResponseMono(clientRequest, exchangeFunction, GlobalTracer.get());
}
}
}

View File

@ -0,0 +1,25 @@
package datadog.trace.instrumentation.springwebflux.client;
import io.opentracing.propagation.TextMap;
import java.util.Iterator;
import java.util.Map;
import org.springframework.http.HttpHeaders;
public class HttpHeadersInjectAdapter implements TextMap {
private final HttpHeaders headers;
public HttpHeadersInjectAdapter(final HttpHeaders headers) {
this.headers = headers;
}
@Override
public Iterator<Map.Entry<String, String>> iterator() {
throw new UnsupportedOperationException("This class should be used only with Tracer.inject()!");
}
@Override
public void put(final String key, final String value) {
headers.set(key, value);
}
}

View File

@ -0,0 +1,55 @@
package datadog.trace.instrumentation.springwebflux.client;
import datadog.trace.agent.decorator.HttpClientDecorator;
import io.opentracing.Span;
import java.net.URI;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.reactive.function.client.ClientRequest;
import org.springframework.web.reactive.function.client.ClientResponse;
@Slf4j
public class SpringWebfluxHttpClientDecorator
extends HttpClientDecorator<ClientRequest, ClientResponse> {
public static final SpringWebfluxHttpClientDecorator DECORATE =
new SpringWebfluxHttpClientDecorator();
public void onCancel(final Span span) {
span.setTag("event", "cancelled");
span.setTag("message", "The subscription was cancelled");
}
@Override
protected String[] instrumentationNames() {
return new String[] {"spring-webflux", "spring-webflux-client"};
}
@Override
protected String component() {
return "spring-webflux-client";
}
@Override
protected String method(final ClientRequest httpRequest) {
return httpRequest.method().name();
}
@Override
protected URI url(final ClientRequest httpRequest) {
return httpRequest.url();
}
@Override
protected String hostname(final ClientRequest httpRequest) {
return httpRequest.url().getHost();
}
@Override
protected Integer port(final ClientRequest httpRequest) {
return httpRequest.url().getPort();
}
@Override
protected Integer status(final ClientResponse httpResponse) {
return httpResponse.statusCode().value();
}
}

View File

@ -0,0 +1,67 @@
package datadog.trace.instrumentation.springwebflux.client;
import static datadog.trace.instrumentation.springwebflux.client.SpringWebfluxHttpClientDecorator.DECORATE;
import datadog.trace.context.TraceScope;
import io.opentracing.Scope;
import io.opentracing.Span;
import io.opentracing.Tracer;
import io.opentracing.propagation.Format;
import io.opentracing.tag.Tags;
import org.springframework.web.reactive.function.client.ClientRequest;
import org.springframework.web.reactive.function.client.ClientResponse;
import org.springframework.web.reactive.function.client.ExchangeFunction;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Mono;
import reactor.util.context.Context;
public class TracingClientResponseMono extends Mono<ClientResponse> {
private final ClientRequest clientRequest;
private final ExchangeFunction exchangeFunction;
private final Tracer tracer;
public TracingClientResponseMono(
final ClientRequest clientRequest,
final ExchangeFunction exchangeFunction,
final Tracer tracer) {
this.clientRequest = clientRequest;
this.exchangeFunction = exchangeFunction;
this.tracer = tracer;
}
@Override
public void subscribe(final CoreSubscriber<? super ClientResponse> subscriber) {
final Context context = subscriber.currentContext();
final Span parentSpan = context.<Span>getOrEmpty(Span.class).orElseGet(tracer::activeSpan);
final Span span =
tracer
.buildSpan("http.request")
.asChildOf(parentSpan)
.withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_CLIENT)
.start();
DECORATE.afterStart(span);
try (final Scope scope = tracer.scopeManager().activate(span, false)) {
if (scope instanceof TraceScope) {
((TraceScope) scope).setAsyncPropagation(true);
}
final ClientRequest mutatedRequest =
ClientRequest.from(clientRequest)
.headers(
httpHeaders ->
tracer.inject(
span.context(),
Format.Builtin.HTTP_HEADERS,
new HttpHeadersInjectAdapter(httpHeaders)))
.build();
exchangeFunction
.exchange(mutatedRequest)
.subscribe(
new TracingClientResponseSubscriber(subscriber, mutatedRequest, context, span, parentSpan));
}
}
}

View File

@ -0,0 +1,136 @@
package datadog.trace.instrumentation.springwebflux.client;
import static datadog.trace.instrumentation.springwebflux.client.SpringWebfluxHttpClientDecorator.DECORATE;
import datadog.trace.context.TraceScope;
import io.opentracing.Scope;
import io.opentracing.Span;
import io.opentracing.Tracer;
import io.opentracing.noop.NoopSpan;
import io.opentracing.util.GlobalTracer;
import java.util.concurrent.atomic.AtomicReference;
import org.reactivestreams.Subscription;
import org.springframework.web.reactive.function.client.ClientRequest;
import org.springframework.web.reactive.function.client.ClientResponse;
import reactor.core.CoreSubscriber;
import reactor.util.context.Context;
public class TracingClientResponseSubscriber implements CoreSubscriber<ClientResponse> {
private final Tracer tracer = GlobalTracer.get();
private final CoreSubscriber<? super ClientResponse> subscriber;
private final ClientRequest clientRequest;
private final Context context;
private final AtomicReference<Span> spanRef;
private final Span parentSpan;
public TracingClientResponseSubscriber(
final CoreSubscriber<? super ClientResponse> subscriber,
final ClientRequest clientRequest,
final Context context,
final Span span,
final Span parentSpan) {
this.subscriber = subscriber;
this.clientRequest = clientRequest;
this.context = context;
spanRef = new AtomicReference<>(span);
this.parentSpan = parentSpan == null ? NoopSpan.INSTANCE : parentSpan;
}
@Override
public void onSubscribe(final Subscription subscription) {
final Span span = spanRef.get();
if (span == null) {
subscriber.onSubscribe(subscription);
return;
}
try (final Scope scope = tracer.scopeManager().activate(span, false)) {
if (scope instanceof TraceScope) {
((TraceScope) scope).setAsyncPropagation(true);
}
DECORATE.onRequest(span, clientRequest);
subscriber.onSubscribe(
new Subscription() {
@Override
public void request(final long n) {
try (final Scope scope = tracer.scopeManager().activate(span, false)) {
subscription.request(n);
}
}
@Override
public void cancel() {
DECORATE.onCancel(span);
DECORATE.beforeFinish(span);
subscription.cancel();
span.finish();
}
});
}
}
@Override
public void onNext(final ClientResponse clientResponse) {
final Span span = spanRef.getAndSet(null);
if (span != null) {
DECORATE.onResponse(span, clientResponse);
DECORATE.beforeFinish(span);
span.finish();
}
try (final Scope scope = tracer.scopeManager().activate(parentSpan, false)) {
if (scope instanceof TraceScope) {
((TraceScope) scope).setAsyncPropagation(true);
}
subscriber.onNext(clientResponse);
}
}
@Override
public void onError(final Throwable throwable) {
final Span span = spanRef.getAndSet(null);
if (span != null) {
DECORATE.onError(span, throwable);
DECORATE.beforeFinish(span);
span.finish();
}
try (final Scope scope = tracer.scopeManager().activate(parentSpan, false)) {
if (scope instanceof TraceScope) {
((TraceScope) scope).setAsyncPropagation(true);
}
subscriber.onError(throwable);
}
}
@Override
public void onComplete() {
final Span span = spanRef.getAndSet(null);
if (span != null) {
DECORATE.beforeFinish(span);
span.finish();
}
try (final Scope scope = tracer.scopeManager().activate(parentSpan, false)) {
if (scope instanceof TraceScope) {
((TraceScope) scope).setAsyncPropagation(true);
}
subscriber.onComplete();
}
}
@Override
public Context currentContext() {
return context;
}
}

View File

@ -1,10 +1,11 @@
package datadog.trace.instrumentation.springwebflux;
package datadog.trace.instrumentation.springwebflux.server;
import static datadog.trace.instrumentation.springwebflux.SpringWebfluxHttpServerDecorator.DECORATE;
import static datadog.trace.instrumentation.springwebflux.server.SpringWebfluxHttpServerDecorator.DECORATE;
import datadog.trace.instrumentation.reactor.core.ReactorCoreAdviceUtils;
import io.opentracing.Span;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.reactive.function.client.ClientRequest;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.server.ServerWebExchange;
@ -39,4 +40,10 @@ public class AdviceUtils {
ReactorCoreAdviceUtils.finishSpanIfPresent(
(Span) serverRequest.attributes().remove(SPAN_ATTRIBUTE), throwable);
}
public static void finishSpanIfPresent(
final ClientRequest clientRequest, final Throwable throwable) {
ReactorCoreAdviceUtils.finishSpanIfPresent(
(Span) clientRequest.attributes().remove(SPAN_ATTRIBUTE), throwable);
}
}

View File

@ -1,6 +1,6 @@
package datadog.trace.instrumentation.springwebflux;
package datadog.trace.instrumentation.springwebflux.server;
import static datadog.trace.instrumentation.springwebflux.SpringWebfluxHttpServerDecorator.DECORATE;
import static datadog.trace.instrumentation.springwebflux.server.SpringWebfluxHttpServerDecorator.DECORATE;
import datadog.trace.context.TraceScope;
import datadog.trace.instrumentation.reactor.core.ReactorCoreAdviceUtils;

View File

@ -1,6 +1,6 @@
package datadog.trace.instrumentation.springwebflux;
package datadog.trace.instrumentation.springwebflux.server;
import static datadog.trace.instrumentation.springwebflux.SpringWebfluxHttpServerDecorator.DECORATE;
import static datadog.trace.instrumentation.springwebflux.server.SpringWebfluxHttpServerDecorator.DECORATE;
import datadog.trace.api.DDTags;
import datadog.trace.context.TraceScope;

View File

@ -1,4 +1,4 @@
package datadog.trace.instrumentation.springwebflux;
package datadog.trace.instrumentation.springwebflux.server;
import datadog.trace.api.DDTags;
import io.opentracing.Span;

View File

@ -1,4 +1,4 @@
package datadog.trace.instrumentation.springwebflux;
package datadog.trace.instrumentation.springwebflux.server;
import net.bytebuddy.asm.Advice;
import org.springframework.web.reactive.function.server.HandlerFunction;
@ -14,10 +14,10 @@ public class RouterFunctionAdvice {
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void methodExit(
@Advice.This final RouterFunction thiz,
@Advice.Argument(0) final ServerRequest serverRequest,
@Advice.Return(readOnly = false) Mono<HandlerFunction<?>> result,
@Advice.Thrown final Throwable throwable) {
@Advice.This final RouterFunction thiz,
@Advice.Argument(0) final ServerRequest serverRequest,
@Advice.Return(readOnly = false) Mono<HandlerFunction<?>> result,
@Advice.Thrown final Throwable throwable) {
if (throwable == null) {
result = result.doOnSuccessOrError(new RouteOnSuccessOrError(thiz, serverRequest));
} else {

View File

@ -1,4 +1,4 @@
package datadog.trace.instrumentation.springwebflux;
package datadog.trace.instrumentation.springwebflux.server;
import datadog.trace.agent.decorator.ServerDecorator;
import datadog.trace.api.DDSpanTypes;

View File

@ -1,4 +1,4 @@
import dd.trace.instrumentation.springwebflux.SpringWebFluxTestApplication
import dd.trace.instrumentation.springwebflux.server.SpringWebFluxTestApplication
import org.springframework.boot.test.context.SpringBootTest
import org.springframework.boot.test.context.TestConfiguration
import org.springframework.boot.web.embedded.netty.NettyReactiveWebServerFactory

View File

@ -1,10 +1,10 @@
import datadog.trace.agent.test.AgentTestRunner
import datadog.trace.agent.test.utils.OkHttpUtils
import datadog.trace.api.DDSpanTypes
import dd.trace.instrumentation.springwebflux.EchoHandlerFunction
import dd.trace.instrumentation.springwebflux.FooModel
import dd.trace.instrumentation.springwebflux.SpringWebFluxTestApplication
import dd.trace.instrumentation.springwebflux.TestController
import dd.trace.instrumentation.springwebflux.server.EchoHandlerFunction
import dd.trace.instrumentation.springwebflux.server.FooModel
import dd.trace.instrumentation.springwebflux.server.SpringWebFluxTestApplication
import dd.trace.instrumentation.springwebflux.server.TestController
import io.opentracing.tag.Tags
import okhttp3.OkHttpClient
import okhttp3.Request

View File

@ -0,0 +1,94 @@
package dd.trace.instrumentation.springwebflux.client
import datadog.trace.agent.test.asserts.TraceAssert
import datadog.trace.agent.test.base.HttpClientTest
import datadog.trace.api.DDSpanTypes
import datadog.trace.api.DDTags
import datadog.trace.instrumentation.netty41.client.NettyHttpClientDecorator
import datadog.trace.instrumentation.springwebflux.client.SpringWebfluxHttpClientDecorator
import io.opentracing.tag.Tags
import io.opentracing.util.GlobalTracer
import org.springframework.http.HttpMethod
import org.springframework.web.reactive.function.client.ClientResponse
import org.springframework.web.reactive.function.client.WebClient
import spock.lang.Shared
class SpringWebfluxHttpClientTest extends HttpClientTest<SpringWebfluxHttpClientDecorator> {
@Shared
def client = WebClient.builder().build()
@Override
int doRequest(String method, URI uri, Map<String, String> headers, Closure callback) {
def hasParent = GlobalTracer.get().activeSpan() != null
ClientResponse response = client.method(HttpMethod.resolve(method))
.headers { h -> headers.forEach({ key, value -> h.add(key, value) }) }
.uri(uri)
.exchange()
.doOnSuccessOrError { success, error ->
blockUntilChildSpansFinished(1)
callback?.call()
}
.block()
if(hasParent) {
blockUntilChildSpansFinished(callback ? 3 : 2)
}
response.statusCode().value()
}
@Override
SpringWebfluxHttpClientDecorator decorator() {
return SpringWebfluxHttpClientDecorator.DECORATE
}
@Override
// parent spanRef must be cast otherwise it breaks debugging classloading (junit loads it early)
void clientSpan(TraceAssert trace, int index, Object parentSpan, String method = "GET", boolean renameService = false, boolean tagQueryString = false, URI uri = server.address.resolve("/success"), Integer status = 200, Throwable exception = null) {
super.clientSpan(trace, index, parentSpan, method, renameService, tagQueryString, uri, status, exception)
if (!exception) {
trace.span(index + 1) {
childOf(trace.span(index))
serviceName renameService ? "localhost" : "unnamed-java-app"
operationName "netty.client.request"
resourceName "$method $uri.path"
spanType DDSpanTypes.HTTP_CLIENT
errored exception != null
tags {
defaultTags()
if (exception) {
errorTags(exception.class, exception.message)
}
"$Tags.COMPONENT.key" NettyHttpClientDecorator.DECORATE.component()
if (status) {
"$Tags.HTTP_STATUS.key" status
}
"$Tags.HTTP_URL.key" "${uri.resolve(uri.path)}"
if (tagQueryString) {
"$DDTags.HTTP_QUERY" uri.query
"$DDTags.HTTP_FRAGMENT" { it == null || it == uri.fragment } // Optional
}
"$Tags.PEER_HOSTNAME.key" "localhost"
"$Tags.PEER_PORT.key" uri.port
"$Tags.PEER_HOST_IPV4.key" { it == null || it == "127.0.0.1" } // Optional
"$Tags.HTTP_METHOD.key" method
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT
}
}
}
}
@Override
int size(int size) {
return size + 1
}
boolean testRedirects() {
false
}
boolean testConnectionFailure() {
false
}
}

View File

@ -1,4 +1,4 @@
package dd.trace.instrumentation.springwebflux
package dd.trace.instrumentation.springwebflux.server
import datadog.trace.api.Trace
import org.springframework.http.MediaType

View File

@ -1,4 +1,4 @@
package dd.trace.instrumentation.springwebflux
package dd.trace.instrumentation.springwebflux.server
import org.springframework.web.reactive.function.server.HandlerFunction

View File

@ -1,4 +1,4 @@
package dd.trace.instrumentation.springwebflux
package dd.trace.instrumentation.springwebflux.server
class FooModel {
public long id

View File

@ -1,4 +1,4 @@
package dd.trace.instrumentation.springwebflux
package dd.trace.instrumentation.springwebflux.server
import datadog.trace.api.Trace
import org.springframework.boot.autoconfigure.SpringBootApplication

View File

@ -1,4 +1,4 @@
package dd.trace.instrumentation.springwebflux
package dd.trace.instrumentation.springwebflux.server
import datadog.trace.api.Trace
import org.springframework.web.bind.annotation.GetMapping

View File

@ -1,4 +1,4 @@
package dd.trace.instrumentation.springwebflux;
package dd.trace.instrumentation.springwebflux.server;
import static org.springframework.web.reactive.function.server.RequestPredicates.GET;
import static org.springframework.web.reactive.function.server.RouterFunctions.route;

View File

@ -18,6 +18,7 @@ import groovy.lang.Closure;
import groovy.lang.DelegatesTo;
import groovy.transform.stc.ClosureParams;
import groovy.transform.stc.SimpleType;
import io.opentracing.Span;
import io.opentracing.Tracer;
import java.lang.instrument.ClassFileTransformer;
import java.lang.instrument.Instrumentation;
@ -202,15 +203,13 @@ public abstract class AgentTestRunner extends Specification {
}
public void blockUntilChildSpansFinished(final int numberOfSpans) throws InterruptedException {
final DDSpan span = (DDSpan) io.opentracing.util.GlobalTracer.get().activeSpan();
if (span == null) {
// If there is no active span avoid getting an NPE
return;
}
final PendingTrace pendingTrace = span.context().getTrace();
final Span span = io.opentracing.util.GlobalTracer.get().activeSpan();
if (span instanceof DDSpan) {
final PendingTrace pendingTrace = ((DDSpan) span).context().getTrace();
while (pendingTrace.size() < numberOfSpans) {
Thread.sleep(10);
while (pendingTrace.size() < numberOfSpans) {
Thread.sleep(10);
}
}
}

View File

@ -188,7 +188,7 @@ abstract class HttpClientTest<T extends HttpClientDecorator> extends AgentTestRu
def "trace request with callback and no parent"() {
when:
def status = doRequest(method, server.address.resolve("/success"), ["is-dd-server": "false"]) {
runUnderTrace("child") {
runUnderTrace("callback") {
// Ensure consistent ordering of traces for assertion.
TEST_WRITER.waitForTraces(1)
}
@ -202,7 +202,7 @@ abstract class HttpClientTest<T extends HttpClientDecorator> extends AgentTestRu
clientSpan(it, 0, null, method, false)
}
trace(1, 1) {
basicSpan(it, 0, "child")
basicSpan(it, 0, "callback")
}
}