Update spring-webflux to new agent api

This commit is contained in:
Trask Stalnaker 2019-10-19 11:57:48 -07:00
parent 51fe9f48b9
commit f0c29adb61
10 changed files with 94 additions and 117 deletions

View File

@ -1,6 +1,5 @@
package datadog.trace.instrumentation.springwebflux.client;
import io.opentracing.util.GlobalTracer;
import net.bytebuddy.asm.Advice;
import org.springframework.web.reactive.function.client.ClientRequest;
import org.springframework.web.reactive.function.client.ClientResponse;
@ -23,7 +22,7 @@ public class DefaultWebClientAdvice {
// is already decorated (we detect this if the "x-datadog-trace-id" is added), the result is not decorated again
// to avoid StackOverflowErrors.
&& !clientRequest.headers().keySet().contains("x-datadog-trace-id")) {
mono = new TracingClientResponseMono(clientRequest, exchangeFunction, GlobalTracer.get());
mono = new TracingClientResponseMono(clientRequest, exchangeFunction);
}
}
}

View File

@ -1,25 +1,14 @@
package datadog.trace.instrumentation.springwebflux.client;
import io.opentracing.propagation.TextMap;
import java.util.Iterator;
import java.util.Map;
import datadog.trace.instrumentation.api.AgentPropagation;
import org.springframework.http.HttpHeaders;
public class HttpHeadersInjectAdapter implements TextMap {
public class HttpHeadersInjectAdapter implements AgentPropagation.Setter<HttpHeaders> {
private final HttpHeaders headers;
public HttpHeadersInjectAdapter(final HttpHeaders headers) {
this.headers = headers;
}
public static final HttpHeadersInjectAdapter SETTER = new HttpHeadersInjectAdapter();
@Override
public Iterator<Map.Entry<String, String>> iterator() {
throw new UnsupportedOperationException("This class should be used only with Tracer.inject()!");
}
@Override
public void put(final String key, final String value) {
headers.set(key, value);
public void set(final HttpHeaders carrier, final String key, final String value) {
carrier.set(key, value);
}
}

View File

@ -1,7 +1,7 @@
package datadog.trace.instrumentation.springwebflux.client;
import datadog.trace.agent.decorator.HttpClientDecorator;
import io.opentracing.Span;
import datadog.trace.instrumentation.api.AgentSpan;
import java.net.URI;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.reactive.function.client.ClientRequest;
@ -10,10 +10,11 @@ import org.springframework.web.reactive.function.client.ClientResponse;
@Slf4j
public class SpringWebfluxHttpClientDecorator
extends HttpClientDecorator<ClientRequest, ClientResponse> {
public static final SpringWebfluxHttpClientDecorator DECORATE =
new SpringWebfluxHttpClientDecorator();
public void onCancel(final Span span) {
public void onCancel(final AgentSpan span) {
span.setTag("event", "cancelled");
span.setTag("message", "The subscription was cancelled");
}

View File

@ -1,12 +1,14 @@
package datadog.trace.instrumentation.springwebflux.client;
import static datadog.trace.instrumentation.api.AgentTracer.activateSpan;
import static datadog.trace.instrumentation.api.AgentTracer.propagate;
import static datadog.trace.instrumentation.api.AgentTracer.startSpan;
import static datadog.trace.instrumentation.springwebflux.client.HttpHeadersInjectAdapter.SETTER;
import static datadog.trace.instrumentation.springwebflux.client.SpringWebfluxHttpClientDecorator.DECORATE;
import datadog.trace.context.TraceScope;
import io.opentracing.Scope;
import io.opentracing.Span;
import io.opentracing.Tracer;
import io.opentracing.propagation.Format;
import datadog.trace.instrumentation.api.AgentScope;
import datadog.trace.instrumentation.api.AgentSpan;
import datadog.trace.instrumentation.api.AgentTracer;
import io.opentracing.tag.Tags;
import org.springframework.web.reactive.function.client.ClientRequest;
import org.springframework.web.reactive.function.client.ClientResponse;
@ -19,49 +21,41 @@ public class TracingClientResponseMono extends Mono<ClientResponse> {
private final ClientRequest clientRequest;
private final ExchangeFunction exchangeFunction;
private final Tracer tracer;
public TracingClientResponseMono(
final ClientRequest clientRequest,
final ExchangeFunction exchangeFunction,
final Tracer tracer) {
final ClientRequest clientRequest, final ExchangeFunction exchangeFunction) {
this.clientRequest = clientRequest;
this.exchangeFunction = exchangeFunction;
this.tracer = tracer;
}
@Override
public void subscribe(final CoreSubscriber<? super ClientResponse> subscriber) {
final Context context = subscriber.currentContext();
final Span parentSpan = context.<Span>getOrEmpty(Span.class).orElseGet(tracer::activeSpan);
final AgentSpan parentSpan =
context.<AgentSpan>getOrEmpty(AgentSpan.class).orElseGet(AgentTracer::activeSpan);
final Span span =
tracer
.buildSpan("http.request")
.asChildOf(parentSpan)
.withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_CLIENT)
.start();
final AgentSpan span;
if (parentSpan != null) {
span = startSpan("http.request", parentSpan.context());
} else {
span = startSpan("http.request");
}
span.setTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_CLIENT);
DECORATE.afterStart(span);
try (final Scope scope = tracer.scopeManager().activate(span, false)) {
try (final AgentScope scope = activateSpan(span, false)) {
if (scope instanceof TraceScope) {
((TraceScope) scope).setAsyncPropagation(true);
}
scope.setAsyncPropagation(true);
final ClientRequest mutatedRequest =
ClientRequest.from(clientRequest)
.headers(
httpHeaders ->
tracer.inject(
span.context(),
Format.Builtin.HTTP_HEADERS,
new HttpHeadersInjectAdapter(httpHeaders)))
.headers(httpHeaders -> propagate().inject(span, httpHeaders, SETTER))
.build();
exchangeFunction
.exchange(mutatedRequest)
.subscribe(
new TracingClientResponseSubscriber(subscriber, mutatedRequest, context, span, parentSpan));
new TracingClientResponseSubscriber(
subscriber, mutatedRequest, context, span, parentSpan));
}
}
}

View File

@ -1,13 +1,11 @@
package datadog.trace.instrumentation.springwebflux.client;
import static datadog.trace.instrumentation.api.AgentTracer.activateSpan;
import static datadog.trace.instrumentation.api.AgentTracer.noopSpan;
import static datadog.trace.instrumentation.springwebflux.client.SpringWebfluxHttpClientDecorator.DECORATE;
import datadog.trace.context.TraceScope;
import io.opentracing.Scope;
import io.opentracing.Span;
import io.opentracing.Tracer;
import io.opentracing.noop.NoopSpan;
import io.opentracing.util.GlobalTracer;
import datadog.trace.instrumentation.api.AgentScope;
import datadog.trace.instrumentation.api.AgentSpan;
import java.util.concurrent.atomic.AtomicReference;
import org.reactivestreams.Subscription;
import org.springframework.web.reactive.function.client.ClientRequest;
@ -17,39 +15,36 @@ import reactor.util.context.Context;
public class TracingClientResponseSubscriber implements CoreSubscriber<ClientResponse> {
private final Tracer tracer = GlobalTracer.get();
private final CoreSubscriber<? super ClientResponse> subscriber;
private final ClientRequest clientRequest;
private final Context context;
private final AtomicReference<Span> spanRef;
private final Span parentSpan;
private final AtomicReference<AgentSpan> spanRef;
private final AgentSpan parentSpan;
public TracingClientResponseSubscriber(
final CoreSubscriber<? super ClientResponse> subscriber,
final ClientRequest clientRequest,
final Context context,
final Span span,
final Span parentSpan) {
final CoreSubscriber<? super ClientResponse> subscriber,
final ClientRequest clientRequest,
final Context context,
final AgentSpan span,
final AgentSpan parentSpan) {
this.subscriber = subscriber;
this.clientRequest = clientRequest;
this.context = context;
spanRef = new AtomicReference<>(span);
this.parentSpan = parentSpan == null ? NoopSpan.INSTANCE : parentSpan;
this.parentSpan = parentSpan == null ? noopSpan() : parentSpan;
}
@Override
public void onSubscribe(final Subscription subscription) {
final Span span = spanRef.get();
final AgentSpan span = spanRef.get();
if (span == null) {
subscriber.onSubscribe(subscription);
return;
}
try (final Scope scope = tracer.scopeManager().activate(span, false)) {
try (final AgentScope scope = activateSpan(span, false)) {
if (scope instanceof TraceScope) {
((TraceScope) scope).setAsyncPropagation(true);
}
scope.setAsyncPropagation(true);
DECORATE.onRequest(span, clientRequest);
@ -57,7 +52,7 @@ public class TracingClientResponseSubscriber implements CoreSubscriber<ClientRes
new Subscription() {
@Override
public void request(final long n) {
try (final Scope scope = tracer.scopeManager().activate(span, false)) {
try (final AgentScope scope = activateSpan(span, false)) {
subscription.request(n);
}
}
@ -75,18 +70,16 @@ public class TracingClientResponseSubscriber implements CoreSubscriber<ClientRes
@Override
public void onNext(final ClientResponse clientResponse) {
final Span span = spanRef.getAndSet(null);
final AgentSpan span = spanRef.getAndSet(null);
if (span != null) {
DECORATE.onResponse(span, clientResponse);
DECORATE.beforeFinish(span);
span.finish();
}
try (final Scope scope = tracer.scopeManager().activate(parentSpan, false)) {
try (final AgentScope scope = activateSpan(parentSpan, false)) {
if (scope instanceof TraceScope) {
((TraceScope) scope).setAsyncPropagation(true);
}
scope.setAsyncPropagation(true);
subscriber.onNext(clientResponse);
}
@ -94,18 +87,16 @@ public class TracingClientResponseSubscriber implements CoreSubscriber<ClientRes
@Override
public void onError(final Throwable throwable) {
final Span span = spanRef.getAndSet(null);
final AgentSpan span = spanRef.getAndSet(null);
if (span != null) {
DECORATE.onError(span, throwable);
DECORATE.beforeFinish(span);
span.finish();
}
try (final Scope scope = tracer.scopeManager().activate(parentSpan, false)) {
try (final AgentScope scope = activateSpan(parentSpan, false)) {
if (scope instanceof TraceScope) {
((TraceScope) scope).setAsyncPropagation(true);
}
scope.setAsyncPropagation(true);
subscriber.onError(throwable);
}
@ -113,17 +104,15 @@ public class TracingClientResponseSubscriber implements CoreSubscriber<ClientRes
@Override
public void onComplete() {
final Span span = spanRef.getAndSet(null);
final AgentSpan span = spanRef.getAndSet(null);
if (span != null) {
DECORATE.beforeFinish(span);
span.finish();
}
try (final Scope scope = tracer.scopeManager().activate(parentSpan, false)) {
try (final AgentScope scope = activateSpan(parentSpan, false)) {
if (scope instanceof TraceScope) {
((TraceScope) scope).setAsyncPropagation(true);
}
scope.setAsyncPropagation(true);
subscriber.onComplete();
}

View File

@ -2,8 +2,8 @@ package datadog.trace.instrumentation.springwebflux.server;
import static datadog.trace.instrumentation.springwebflux.server.SpringWebfluxHttpServerDecorator.DECORATE;
import datadog.trace.instrumentation.api.AgentSpan;
import datadog.trace.instrumentation.reactor.core.ReactorCoreAdviceUtils;
import io.opentracing.Span;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.reactive.function.client.ClientRequest;
import org.springframework.web.reactive.function.server.ServerRequest;
@ -32,18 +32,18 @@ public class AdviceUtils {
public static void finishSpanIfPresent(
final ServerWebExchange exchange, final Throwable throwable) {
ReactorCoreAdviceUtils.finishSpanIfPresent(
(Span) exchange.getAttributes().remove(SPAN_ATTRIBUTE), throwable);
(AgentSpan) exchange.getAttributes().remove(SPAN_ATTRIBUTE), throwable);
}
public static void finishSpanIfPresent(
final ServerRequest serverRequest, final Throwable throwable) {
ReactorCoreAdviceUtils.finishSpanIfPresent(
(Span) serverRequest.attributes().remove(SPAN_ATTRIBUTE), throwable);
(AgentSpan) serverRequest.attributes().remove(SPAN_ATTRIBUTE), throwable);
}
public static void finishSpanIfPresent(
final ClientRequest clientRequest, final Throwable throwable) {
ReactorCoreAdviceUtils.finishSpanIfPresent(
(Span) clientRequest.attributes().remove(SPAN_ATTRIBUTE), throwable);
(AgentSpan) clientRequest.attributes().remove(SPAN_ATTRIBUTE), throwable);
}
}

View File

@ -1,12 +1,13 @@
package datadog.trace.instrumentation.springwebflux.server;
import static datadog.trace.instrumentation.api.AgentTracer.activateSpan;
import static datadog.trace.instrumentation.api.AgentTracer.activeSpan;
import static datadog.trace.instrumentation.api.AgentTracer.startSpan;
import static datadog.trace.instrumentation.springwebflux.server.SpringWebfluxHttpServerDecorator.DECORATE;
import datadog.trace.context.TraceScope;
import datadog.trace.instrumentation.api.AgentScope;
import datadog.trace.instrumentation.api.AgentSpan;
import datadog.trace.instrumentation.reactor.core.ReactorCoreAdviceUtils;
import io.opentracing.Scope;
import io.opentracing.Span;
import io.opentracing.util.GlobalTracer;
import java.util.function.Function;
import net.bytebuddy.asm.Advice;
import org.reactivestreams.Publisher;
@ -20,24 +21,27 @@ import reactor.core.publisher.Mono;
public class DispatcherHandlerAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static Scope methodEnter(@Advice.Argument(0) final ServerWebExchange exchange) {
public static AgentScope methodEnter(@Advice.Argument(0) final ServerWebExchange exchange) {
// Unfortunately Netty EventLoop is not instrumented well enough to attribute all work to the
// right things so we have to store span in request itself. We also store parent (netty's) span
// so we could update resource name.
final Span parentSpan = GlobalTracer.get().activeSpan();
final AgentSpan parentSpan = activeSpan();
if (parentSpan != null) {
exchange.getAttributes().put(AdviceUtils.PARENT_SPAN_ATTRIBUTE, parentSpan);
}
final Scope scope = GlobalTracer.get().buildSpan("DispatcherHandler.handle").startActive(false);
DECORATE.afterStart(scope);
((TraceScope) scope).setAsyncPropagation(true);
exchange.getAttributes().put(AdviceUtils.SPAN_ATTRIBUTE, scope.span());
final AgentSpan span = startSpan("DispatcherHandler.handle");
DECORATE.afterStart(span);
exchange.getAttributes().put(AdviceUtils.SPAN_ATTRIBUTE, span);
final AgentScope scope = activateSpan(span, false);
scope.setAsyncPropagation(true);
return scope;
}
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void methodExit(
@Advice.Enter final Scope scope,
@Advice.Enter final AgentScope scope,
@Advice.Thrown final Throwable throwable,
@Advice.Argument(0) final ServerWebExchange exchange,
@Advice.Return(readOnly = false) Mono<Object> mono) {

View File

@ -1,12 +1,11 @@
package datadog.trace.instrumentation.springwebflux.server;
import static datadog.trace.instrumentation.api.AgentTracer.activateSpan;
import static datadog.trace.instrumentation.springwebflux.server.SpringWebfluxHttpServerDecorator.DECORATE;
import datadog.trace.api.DDTags;
import datadog.trace.context.TraceScope;
import io.opentracing.Scope;
import io.opentracing.Span;
import io.opentracing.util.GlobalTracer;
import datadog.trace.instrumentation.api.AgentScope;
import datadog.trace.instrumentation.api.AgentSpan;
import net.bytebuddy.asm.Advice;
import org.springframework.web.method.HandlerMethod;
import org.springframework.web.reactive.HandlerMapping;
@ -16,12 +15,12 @@ import org.springframework.web.util.pattern.PathPattern;
public class HandlerAdapterAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static Scope methodEnter(
public static AgentScope methodEnter(
@Advice.Argument(0) final ServerWebExchange exchange,
@Advice.Argument(1) final Object handler) {
Scope scope = null;
final Span span = exchange.getAttribute(AdviceUtils.SPAN_ATTRIBUTE);
AgentScope scope = null;
final AgentSpan span = exchange.getAttribute(AdviceUtils.SPAN_ATTRIBUTE);
if (handler != null && span != null) {
final String handlerType;
final String operationName;
@ -36,14 +35,14 @@ public class HandlerAdapterAdvice {
handlerType = handler.getClass().getName();
}
span.setOperationName(operationName);
span.setSpanName(operationName);
span.setTag("handler.type", handlerType);
scope = GlobalTracer.get().scopeManager().activate(span, false);
((TraceScope) scope).setAsyncPropagation(true);
scope = activateSpan(span, false);
scope.setAsyncPropagation(true);
}
final Span parentSpan = exchange.getAttribute(AdviceUtils.PARENT_SPAN_ATTRIBUTE);
final AgentSpan parentSpan = exchange.getAttribute(AdviceUtils.PARENT_SPAN_ATTRIBUTE);
final PathPattern bestPattern =
exchange.getAttribute(HandlerMapping.BEST_MATCHING_PATTERN_ATTRIBUTE);
if (parentSpan != null && bestPattern != null) {
@ -58,7 +57,7 @@ public class HandlerAdapterAdvice {
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void methodExit(
@Advice.Argument(0) final ServerWebExchange exchange,
@Advice.Enter final Scope scope,
@Advice.Enter final AgentScope scope,
@Advice.Thrown final Throwable throwable) {
if (throwable != null) {
AdviceUtils.finishSpanIfPresent(exchange, throwable);

View File

@ -1,7 +1,7 @@
package datadog.trace.instrumentation.springwebflux.server;
import datadog.trace.api.DDTags;
import io.opentracing.Span;
import datadog.trace.instrumentation.api.AgentSpan;
import java.util.function.BiConsumer;
import java.util.regex.Pattern;
import org.springframework.web.reactive.function.server.HandlerFunction;
@ -27,12 +27,13 @@ public class RouteOnSuccessOrError implements BiConsumer<HandlerFunction<?>, Thr
if (handler != null) {
final String predicateString = parsePredicateString();
if (predicateString != null) {
final Span span = (Span) serverRequest.attributes().get(AdviceUtils.SPAN_ATTRIBUTE);
final AgentSpan span =
(AgentSpan) serverRequest.attributes().get(AdviceUtils.SPAN_ATTRIBUTE);
if (span != null) {
span.setTag("request.predicate", predicateString);
}
final Span parentSpan =
(Span) serverRequest.attributes().get(AdviceUtils.PARENT_SPAN_ATTRIBUTE);
final AgentSpan parentSpan =
(AgentSpan) serverRequest.attributes().get(AdviceUtils.PARENT_SPAN_ATTRIBUTE);
if (parentSpan != null) {
parentSpan.setTag(DDTags.RESOURCE_NAME, parseResourceName(predicateString));
}

View File

@ -7,12 +7,13 @@ import datadog.trace.api.DDTags
import datadog.trace.instrumentation.netty41.client.NettyHttpClientDecorator
import datadog.trace.instrumentation.springwebflux.client.SpringWebfluxHttpClientDecorator
import io.opentracing.tag.Tags
import io.opentracing.util.GlobalTracer
import org.springframework.http.HttpMethod
import org.springframework.web.reactive.function.client.ClientResponse
import org.springframework.web.reactive.function.client.WebClient
import spock.lang.Shared
import static datadog.trace.instrumentation.api.AgentTracer.activeSpan
class SpringWebfluxHttpClientTest extends HttpClientTest<SpringWebfluxHttpClientDecorator> {
@Shared
@ -20,7 +21,7 @@ class SpringWebfluxHttpClientTest extends HttpClientTest<SpringWebfluxHttpClient
@Override
int doRequest(String method, URI uri, Map<String, String> headers, Closure callback) {
def hasParent = GlobalTracer.get().activeSpan() != null
def hasParent = activeSpan() != null
ClientResponse response = client.method(HttpMethod.resolve(method))
.headers { h -> headers.forEach({ key, value -> h.add(key, value) }) }
.uri(uri)