Merge pull request #688 from DataDog/mar-kolya/webflux-fixes

Fix webflux integration to not rely in active span
This commit is contained in:
Nikolay Martynov 2019-02-07 12:45:50 -05:00 committed by GitHub
commit 931e6ff8af
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
29 changed files with 618 additions and 640 deletions

View File

@ -195,8 +195,11 @@ public class AgentInstaller {
private static class ClassLoadListener implements AgentBuilder.Listener {
@Override
public void onDiscovery(
String typeName, ClassLoader classLoader, JavaModule javaModule, boolean b) {
for (Map.Entry<String, Runnable> entry : classLoadCallbacks.entrySet()) {
final String typeName,
final ClassLoader classLoader,
final JavaModule javaModule,
final boolean b) {
for (final Map.Entry<String, Runnable> entry : classLoadCallbacks.entrySet()) {
if (entry.getKey().equals(typeName)) {
entry.getValue().run();
}
@ -205,25 +208,33 @@ public class AgentInstaller {
@Override
public void onTransformation(
TypeDescription typeDescription,
ClassLoader classLoader,
JavaModule javaModule,
boolean b,
DynamicType dynamicType) {}
final TypeDescription typeDescription,
final ClassLoader classLoader,
final JavaModule javaModule,
final boolean b,
final DynamicType dynamicType) {}
@Override
public void onIgnored(
TypeDescription typeDescription,
ClassLoader classLoader,
JavaModule javaModule,
boolean b) {}
final TypeDescription typeDescription,
final ClassLoader classLoader,
final JavaModule javaModule,
final boolean b) {}
@Override
public void onError(
String s, ClassLoader classLoader, JavaModule javaModule, boolean b, Throwable throwable) {}
final String s,
final ClassLoader classLoader,
final JavaModule javaModule,
final boolean b,
final Throwable throwable) {}
@Override
public void onComplete(String s, ClassLoader classLoader, JavaModule javaModule, boolean b) {}
public void onComplete(
final String s,
final ClassLoader classLoader,
final JavaModule javaModule,
final boolean b) {}
}
private AgentInstaller() {}

View File

@ -31,7 +31,9 @@ class SpringTemplateJMS1Test extends AgentTestRunner {
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
template = new JmsTemplate(connectionFactory)
template.receiveTimeout = TimeUnit.SECONDS.toMillis(10)
// Make this longer than timeout on TEST_WRITER.waitForTraces
// Otherwise caller might give up waiting before callee has a chance to respond.
template.receiveTimeout = TimeUnit.SECONDS.toMillis(21)
}
def cleanupSpec() {

View File

@ -66,14 +66,14 @@ dependencies {
implementation deps.autoservice
testCompile project(':dd-java-agent:testing')
testCompile project(':dd-java-agent:instrumentation:trace-annotation')
testCompile project(':dd-java-agent:instrumentation:netty-4.1')
testCompile project(':dd-java-agent:instrumentation:java-concurrent')
testCompile group: 'org.springframework.boot', name: 'spring-boot-starter-webflux', version: '2.0.0.RELEASE'
testCompile group: 'org.springframework.boot', name: 'spring-boot-starter', version: '2.0.0.RELEASE'
testCompile group: 'org.spockframework', name: 'spock-spring', version: '1.1-groovy-2.4'
testCompile group: 'org.springframework.boot', name: 'spring-boot-starter-test', version: '2.0.0.RELEASE'
testCompile group: 'org.springframework.boot', name: 'spring-boot-starter-reactor-netty', version: '2.0.0.RELEASE'
testCompile group: 'org.spockframework', name: 'spock-spring', version: '1.1-groovy-2.4'
// FIXME: Tests need to be updated to support 2.1+
latestDepTestCompile group: 'org.springframework.boot', name: 'spring-boot-starter-webflux', version: '2.0.+'

View File

@ -0,0 +1,22 @@
package datadog.trace.instrumentation.springwebflux;
import datadog.trace.agent.tooling.Instrumenter;
public abstract class AbstractWebfluxInstrumentation extends Instrumenter.Default {
public static final String PACKAGE = AbstractWebfluxInstrumentation.class.getPackage().getName();
public AbstractWebfluxInstrumentation(final String... additionalNames) {
super("spring-webflux", additionalNames);
}
@Override
public String[] helperClassNames() {
return new String[] {
PACKAGE + ".AdviceUtils",
PACKAGE + ".DispatcherHandlerOnSuccessOrError",
PACKAGE + ".DispatcherHandlerOnCancel",
PACKAGE + ".RouteOnSuccessOrError"
};
}
}

View File

@ -15,19 +15,7 @@ import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
@AutoService(Instrumenter.class)
public final class DispatcherHandlerInstrumentation extends Instrumenter.Default {
public static final String PACKAGE =
DispatcherHandlerInstrumentation.class.getPackage().getName();
public DispatcherHandlerInstrumentation() {
super("spring-webflux");
}
@Override
public String[] helperClassNames() {
return new String[] {PACKAGE + ".DispatcherHandlerMonoBiConsumer"};
}
public final class DispatcherHandlerInstrumentation extends AbstractWebfluxInstrumentation {
@Override
public ElementMatcher<TypeDescription> typeMatcher() {

View File

@ -1,9 +1,13 @@
package datadog.trace.instrumentation.springwebflux;
import static datadog.trace.agent.tooling.ByteBuddyElementMatchers.safeHasSuperType;
import static java.util.Collections.singletonMap;
import static net.bytebuddy.matcher.ElementMatchers.isAbstract;
import static net.bytebuddy.matcher.ElementMatchers.isInterface;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.not;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
@ -15,18 +19,13 @@ import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
@AutoService(Instrumenter.class)
public final class HandlerFunctionAdapterInstrumentation extends Instrumenter.Default {
public static final String PACKAGE =
HandlerFunctionAdapterInstrumentation.class.getPackage().getName();
public HandlerFunctionAdapterInstrumentation() {
super("spring-webflux", "spring-webflux-functional");
}
public final class HandlerAdapterInstrumentation extends AbstractWebfluxInstrumentation {
@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return named("org.springframework.web.reactive.function.server.support.HandlerFunctionAdapter");
return not(isInterface())
.and(not(isAbstract()))
.and(safeHasSuperType(named("org.springframework.web.reactive.HandlerAdapter")));
}
@Override
@ -36,8 +35,9 @@ public final class HandlerFunctionAdapterInstrumentation extends Instrumenter.De
.and(isPublic())
.and(named("handle"))
.and(takesArgument(0, named("org.springframework.web.server.ServerWebExchange")))
.and(takesArgument(1, named("java.lang.Object")))
.and(takesArguments(2)),
// Cannot reference class directly here because it would lead to class load failure on Java7
PACKAGE + ".HandlerFunctionAdapterAdvice");
PACKAGE + ".HandlerAdapterAdvice");
}
}

View File

@ -1,53 +0,0 @@
package datadog.trace.instrumentation.springwebflux;
import static java.util.Collections.singletonMap;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.isProtected;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.Instrumenter;
import java.util.Map;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
// Provides a way to get the URL with path variables
@AutoService(Instrumenter.class)
public final class RequestMappingInfoHandlerMappingInstrumentation extends Instrumenter.Default {
public static final String PACKAGE =
RequestMappingInfoHandlerMappingInstrumentation.class.getPackage().getName();
public RequestMappingInfoHandlerMappingInstrumentation() {
super("spring-webflux", "spring-webflux-annotation");
}
@Override
public String[] helperClassNames() {
return new String[] {PACKAGE + ".DispatcherHandlerMonoBiConsumer"};
}
@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return named("org.springframework.web.reactive.result.method.RequestMappingInfoHandlerMapping");
}
@Override
public Map<? extends ElementMatcher<? super MethodDescription>, String> transformers() {
return singletonMap(
isMethod()
.and(isProtected())
.and(named("handleMatch"))
.and(
takesArgument(
0, named("org.springframework.web.reactive.result.method.RequestMappingInfo")))
.and(takesArgument(1, named("org.springframework.web.method.HandlerMethod")))
.and(takesArgument(2, named("org.springframework.web.server.ServerWebExchange")))
.and(takesArguments(3)),
// Cannot reference class directly here because it would lead to class load failure on Java7
PACKAGE + ".RequestMappingInfoHandlerMappingAdvice");
}
}

View File

@ -2,7 +2,6 @@ package datadog.trace.instrumentation.springwebflux;
import static datadog.trace.agent.tooling.ByteBuddyElementMatchers.safeHasSuperType;
import static java.util.Collections.singletonMap;
import static net.bytebuddy.matcher.ElementMatchers.declaresField;
import static net.bytebuddy.matcher.ElementMatchers.isAbstract;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
@ -19,27 +18,20 @@ import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
@AutoService(Instrumenter.class)
public final class RouterFunctionInstrumentation extends Instrumenter.Default {
public static final String PACKAGE = RouterFunctionInstrumentation.class.getPackage().getName();
public final class RouterFunctionInstrumentation extends AbstractWebfluxInstrumentation {
public RouterFunctionInstrumentation() {
super("spring-webflux", "spring-webflux-functional");
}
@Override
public String[] helperClassNames() {
return new String[] {PACKAGE + ".DispatcherHandlerMonoBiConsumer"};
super("spring-webflux-functional");
}
@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return not(isAbstract())
.and(declaresField(named("predicate")))
.and(
safeHasSuperType(
// TODO: this doesn't handle nested routes (DefaultNestedRouterFunction)
named(
"org.springframework.web.reactive.function.server.RouterFunctions$AbstractRouterFunction")));
"org.springframework.web.reactive.function.server.RouterFunctions$DefaultRouterFunction")));
}
@Override

View File

@ -0,0 +1,65 @@
package datadog.trace.instrumentation.springwebflux;
import static io.opentracing.log.Fields.ERROR_OBJECT;
import io.opentracing.Span;
import io.opentracing.tag.Tags;
import java.util.Collections;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.server.ServerWebExchange;
public class AdviceUtils {
public static final String SPAN_ATTRIBUTE = "datadog.trace.instrumentation.springwebflux.Span";
public static final String PARENT_SPAN_ATTRIBUTE =
"datadog.trace.instrumentation.springwebflux.ParentSpan";
public static String parseOperationName(final Object handler) {
final String className = parseClassName(handler.getClass());
final String operationName;
final int lambdaIdx = className.indexOf("$$Lambda$");
if (lambdaIdx > -1) {
operationName = className.substring(0, lambdaIdx) + ".lambda";
} else {
operationName = className + ".handle";
}
return operationName;
}
public static String parseClassName(final Class clazz) {
String className = clazz.getSimpleName();
if (className.isEmpty()) {
className = clazz.getName();
if (clazz.getPackage() != null) {
final String pkgName = clazz.getPackage().getName();
if (!pkgName.isEmpty()) {
className = clazz.getName().replace(pkgName, "").substring(1);
}
}
}
return className;
}
public static void finishSpanIfPresent(
final ServerWebExchange exchange, final Throwable throwable) {
// Span could have been removed and finished by other thread before we got here
finishSpanIfPresent((Span) exchange.getAttributes().remove(SPAN_ATTRIBUTE), throwable);
}
public static void finishSpanIfPresent(
final ServerRequest serverRequest, final Throwable throwable) {
// Span could have been removed and finished by other thread before we got here
finishSpanIfPresent((Span) serverRequest.attributes().remove(SPAN_ATTRIBUTE), throwable);
}
private static void finishSpanIfPresent(final Span span, final Throwable throwable) {
if (span != null) {
if (throwable != null) {
Tags.ERROR.set(span, true);
span.log(Collections.singletonMap(ERROR_OBJECT, throwable));
}
span.finish();
}
}
}

View File

@ -1,37 +1,58 @@
package datadog.trace.instrumentation.springwebflux;
import static io.opentracing.log.Fields.ERROR_OBJECT;
import datadog.trace.api.DDSpanTypes;
import datadog.trace.api.DDTags;
import datadog.trace.context.TraceScope;
import io.opentracing.Scope;
import io.opentracing.Span;
import io.opentracing.tag.Tags;
import io.opentracing.util.GlobalTracer;
import java.util.Collections;
import net.bytebuddy.asm.Advice;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;
/**
* This is 'top level' advice for Webflux instrumentation. This handles creating and finishing
* Webflux span.
*/
public class DispatcherHandlerAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static Scope startSpan() {
return GlobalTracer.get()
public static Scope methodEnter(@Advice.Argument(0) final ServerWebExchange exchange) {
// Unfortunately Netty EventLoop is not instrumented well enough to attribute all work to the
// right things so we have to store span in request itself. We also store parent (netty's) span
// so we could update resource name.
final Span parentSpan = GlobalTracer.get().activeSpan();
if (parentSpan != null) {
exchange.getAttributes().put(AdviceUtils.PARENT_SPAN_ATTRIBUTE, parentSpan);
}
final Scope scope =
GlobalTracer.get()
.buildSpan("DispatcherHandler.handle")
.withTag(Tags.COMPONENT.getKey(), "spring-webflux-controller")
.startActive(true);
.withTag(DDTags.SPAN_TYPE, DDSpanTypes.HTTP_SERVER)
.startActive(false);
((TraceScope) scope).setAsyncPropagation(true);
exchange.getAttributes().put(AdviceUtils.SPAN_ATTRIBUTE, scope.span());
return scope;
}
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void addBehaviorTrigger(
@Advice.Enter final Scope scope,
@Advice.Thrown final Throwable throwable,
@Advice.Return(readOnly = false) Mono<Void> returnMono) {
public static void methodExit(
@Advice.Enter final Scope scope,
@Advice.Thrown final Throwable throwable,
@Advice.Argument(0) final ServerWebExchange exchange,
@Advice.Return(readOnly = false) Mono<?> returnMono) {
if (throwable == null && returnMono != null) {
returnMono =
returnMono
.doOnSuccessOrError(new DispatcherHandlerOnSuccessOrError<>(exchange))
.doOnCancel(new DispatcherHandlerOnCancel(exchange));
} else if (throwable != null) {
AdviceUtils.finishSpanIfPresent(exchange, throwable);
}
if (scope != null) {
if (throwable != null) {
Tags.ERROR.set(scope.span(), true);
scope.span().log(Collections.singletonMap(ERROR_OBJECT, throwable));
} else {
returnMono = returnMono.doOnSuccessOrError(new DispatcherHandlerMonoBiConsumer<>(scope));
}
scope.close();
}
}
}

View File

@ -1,46 +0,0 @@
package datadog.trace.instrumentation.springwebflux;
import static io.opentracing.log.Fields.ERROR_OBJECT;
import datadog.trace.api.DDSpanTypes;
import datadog.trace.api.DDTags;
import io.opentracing.Scope;
import io.opentracing.Span;
import io.opentracing.tag.Tags;
import io.opentracing.util.GlobalTracer;
import java.util.Collections;
import java.util.function.BiConsumer;
public class DispatcherHandlerMonoBiConsumer<U> implements BiConsumer<U, Throwable> {
private final Scope scope;
public static final ThreadLocal<String> tlsPathUrlTag = new ThreadLocal<>();
public DispatcherHandlerMonoBiConsumer(final Scope scope) {
this.scope = scope;
}
@Override
public void accept(final U object, final Throwable throwable) {
final Span spanToChange = scope.span();
if (throwable != null) {
spanToChange.log(Collections.singletonMap(ERROR_OBJECT, throwable));
Tags.ERROR.set(spanToChange, true);
}
scope.close();
final Span parentSpan = GlobalTracer.get().activeSpan();
final String pathUrl = tlsPathUrlTag.get();
if (pathUrl != null && parentSpan != null) {
parentSpan.setTag(DDTags.RESOURCE_NAME, pathUrl);
parentSpan.setTag(DDTags.SPAN_TYPE, DDSpanTypes.WEB_SERVLET);
tlsPathUrlTag.remove();
}
}
public static void setTLPathUrl(final String pathUrl) {
tlsPathUrlTag.set(pathUrl);
}
}

View File

@ -0,0 +1,18 @@
package datadog.trace.instrumentation.springwebflux;
import org.springframework.web.server.ServerWebExchange;
public class DispatcherHandlerOnCancel implements Runnable {
private final ServerWebExchange exchange;
public DispatcherHandlerOnCancel(final ServerWebExchange exchange) {
this.exchange = exchange;
}
@Override
public void run() {
// Make sure we are not leaking opened spans for canceled Monos.
AdviceUtils.finishSpanIfPresent(exchange, null);
}
}

View File

@ -0,0 +1,21 @@
package datadog.trace.instrumentation.springwebflux;
import java.util.function.BiConsumer;
import org.springframework.web.server.ServerWebExchange;
public class DispatcherHandlerOnSuccessOrError<U> implements BiConsumer<U, Throwable> {
private final ServerWebExchange exchange;
public DispatcherHandlerOnSuccessOrError(final ServerWebExchange exchange) {
this.exchange = exchange;
}
@Override
public void accept(final U object, final Throwable throwable) {
// Closing span here means it closes after Netty span which may not be ideal.
// We could instrument HandlerFunctionAdapter instead, but this would mean we
// would not account for time spent sending request.
AdviceUtils.finishSpanIfPresent(exchange, throwable);
}
}

View File

@ -0,0 +1,71 @@
package datadog.trace.instrumentation.springwebflux;
import datadog.trace.api.DDTags;
import datadog.trace.context.TraceScope;
import io.opentracing.Scope;
import io.opentracing.Span;
import io.opentracing.util.GlobalTracer;
import net.bytebuddy.asm.Advice;
import org.springframework.web.method.HandlerMethod;
import org.springframework.web.reactive.HandlerMapping;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.util.pattern.PathPattern;
public class HandlerAdapterAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static Scope methodEnter(
@Advice.Argument(0) final ServerWebExchange exchange,
@Advice.Argument(1) final Object handler) {
Scope scope = null;
final Span span = exchange.getAttribute(AdviceUtils.SPAN_ATTRIBUTE);
if (handler != null && span != null) {
final String handlerType;
final String operationName;
if (handler instanceof HandlerMethod) {
// Special case for requests mapped with annotations
final HandlerMethod handlerMethod = (HandlerMethod) handler;
final Class handlerClass = handlerMethod.getMethod().getDeclaringClass();
operationName =
AdviceUtils.parseClassName(handlerClass) + "." + handlerMethod.getMethod().getName();
handlerType = handlerClass.getName();
} else {
operationName = AdviceUtils.parseOperationName(handler);
handlerType = handler.getClass().getName();
}
span.setOperationName(operationName);
span.setTag("handler.type", handlerType);
scope = GlobalTracer.get().scopeManager().activate(span, false);
((TraceScope) scope).setAsyncPropagation(true);
}
final Span parentSpan = exchange.getAttribute(AdviceUtils.PARENT_SPAN_ATTRIBUTE);
final PathPattern bestPattern =
exchange.getAttribute(HandlerMapping.BEST_MATCHING_PATTERN_ATTRIBUTE);
if (parentSpan != null && bestPattern != null) {
parentSpan.setTag(
DDTags.RESOURCE_NAME,
exchange.getRequest().getMethodValue() + " " + bestPattern.getPatternString());
}
return scope;
}
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void methodExit(
@Advice.Argument(0) final ServerWebExchange exchange,
@Advice.Enter final Scope scope,
@Advice.Thrown final Throwable throwable) {
if (throwable != null) {
AdviceUtils.finishSpanIfPresent(exchange, throwable);
}
if (scope != null) {
scope.close();
}
}
}

View File

@ -1,52 +0,0 @@
package datadog.trace.instrumentation.springwebflux;
import static io.opentracing.log.Fields.ERROR_OBJECT;
import io.opentracing.Span;
import io.opentracing.tag.Tags;
import io.opentracing.util.GlobalTracer;
import java.util.Collections;
import net.bytebuddy.asm.Advice;
import org.slf4j.LoggerFactory;
import org.springframework.web.reactive.function.server.HandlerFunction;
public class HandlerFunctionAdapterAdvice {
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void recordHandlerFunctionTag(
@Advice.Thrown final Throwable throwable, @Advice.Argument(1) final Object handler) {
final Span activeSpan = GlobalTracer.get().activeSpan();
if (activeSpan != null && handler != null) {
final Class clazz = handler.getClass();
String className = clazz.getSimpleName();
if (className.isEmpty()) {
className = clazz.getName();
if (clazz.getPackage() != null) {
final String pkgName = clazz.getPackage().getName();
if (!pkgName.isEmpty()) {
className = clazz.getName().replace(pkgName, "").substring(1);
}
}
}
LoggerFactory.getLogger(HandlerFunction.class).warn(className);
final String operationName;
final int lambdaIdx = className.indexOf("$$Lambda$");
if (lambdaIdx > -1) {
operationName = className.substring(0, lambdaIdx) + ".lambda";
} else {
operationName = className + ".handle";
}
activeSpan.setOperationName(operationName);
activeSpan.setTag("handler.type", clazz.getName());
if (throwable != null) {
Tags.ERROR.set(activeSpan, true);
activeSpan.log(Collections.singletonMap(ERROR_OBJECT, throwable));
}
}
}
}

View File

@ -1,64 +0,0 @@
package datadog.trace.instrumentation.springwebflux;
import static io.opentracing.log.Fields.ERROR_OBJECT;
import io.opentracing.Scope;
import io.opentracing.Span;
import io.opentracing.tag.Tags;
import io.opentracing.util.GlobalTracer;
import java.lang.reflect.Method;
import java.util.Collections;
import net.bytebuddy.asm.Advice;
import org.springframework.web.method.HandlerMethod;
import org.springframework.web.reactive.HandlerMapping;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.util.pattern.PathPattern;
public class RequestMappingInfoHandlerMappingAdvice {
@Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class)
public static void nameSpan(
@Advice.Argument(1) final HandlerMethod handlerMethod,
@Advice.Argument(2) final ServerWebExchange serverWebExchange,
@Advice.Thrown final Throwable throwable) {
final Scope scope = GlobalTracer.get().scopeManager().active();
if (scope != null) {
final Span span = scope.span();
final PathPattern bestPattern =
serverWebExchange.getAttribute(HandlerMapping.BEST_MATCHING_PATTERN_ATTRIBUTE);
if (bestPattern != null) {
DispatcherHandlerMonoBiConsumer.setTLPathUrl(
serverWebExchange.getRequest().getMethodValue() + " " + bestPattern.getPatternString());
}
if (handlerMethod != null) {
final Method method;
final Class clazz;
final String methodName;
clazz = handlerMethod.getMethod().getDeclaringClass();
method = handlerMethod.getMethod();
methodName = method.getName();
span.setTag("handler.type", clazz.getName());
String className = clazz.getSimpleName();
if (className.isEmpty()) {
className = clazz.getName();
if (clazz.getPackage() != null) {
final String pkgName = clazz.getPackage().getName();
if (!pkgName.isEmpty()) {
className = clazz.getName().replace(pkgName, "").substring(1);
}
}
}
span.setOperationName(className + "." + methodName);
}
if (throwable != null) {
Tags.ERROR.set(span, true);
span.log(Collections.singletonMap(ERROR_OBJECT, throwable));
}
}
}
}

View File

@ -0,0 +1,61 @@
package datadog.trace.instrumentation.springwebflux;
import datadog.trace.api.DDTags;
import io.opentracing.Span;
import java.util.function.BiConsumer;
import java.util.regex.Pattern;
import org.springframework.web.reactive.function.server.HandlerFunction;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.ServerRequest;
public class RouteOnSuccessOrError implements BiConsumer<HandlerFunction<?>, Throwable> {
private static final Pattern SPECIAL_CHARACTERS_REGEX = Pattern.compile("[\\(\\)&|]");
private static final Pattern SPACES_REGEX = Pattern.compile("[ \\t]+");
private final RouterFunction routerFunction;
private final ServerRequest serverRequest;
public RouteOnSuccessOrError(
final RouterFunction routerFunction, final ServerRequest serverRequest) {
this.routerFunction = routerFunction;
this.serverRequest = serverRequest;
}
@Override
public void accept(final HandlerFunction<?> handler, final Throwable throwable) {
if (handler != null) {
final String predicateString = parsePredicateString();
if (predicateString != null) {
final Span span = (Span) serverRequest.attributes().get(AdviceUtils.SPAN_ATTRIBUTE);
if (span != null) {
span.setTag("request.predicate", predicateString);
}
final Span parentSpan =
(Span) serverRequest.attributes().get(AdviceUtils.PARENT_SPAN_ATTRIBUTE);
if (parentSpan != null) {
parentSpan.setTag(DDTags.RESOURCE_NAME, parseResourceName(predicateString));
}
}
}
}
private String parsePredicateString() {
final String routerFunctionString = routerFunction.toString();
// Router functions containing lambda predicates should not end up in span tags since they are
// confusing
if (routerFunctionString.startsWith(
"org.springframework.web.reactive.function.server.RequestPredicates$$Lambda$")) {
return null;
} else {
return routerFunctionString.replaceFirst("\\s*->.*$", "");
}
}
private String parseResourceName(final String routerString) {
return SPACES_REGEX
.matcher(SPECIAL_CHARACTERS_REGEX.matcher(routerString).replaceAll(""))
.replaceAll(" ")
.trim();
}
}

View File

@ -1,65 +1,27 @@
package datadog.trace.instrumentation.springwebflux;
import static io.opentracing.log.Fields.ERROR_OBJECT;
import datadog.trace.api.DDSpanTypes;
import datadog.trace.api.DDTags;
import io.opentracing.Scope;
import io.opentracing.Span;
import io.opentracing.tag.Tags;
import io.opentracing.util.GlobalTracer;
import java.util.Collections;
import net.bytebuddy.asm.Advice;
import org.springframework.web.reactive.function.server.RequestPredicate;
import org.springframework.web.reactive.function.server.RequestPredicates;
import org.springframework.web.reactive.function.server.HandlerFunction;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.ServerRequest;
import reactor.core.publisher.Mono;
/**
* This advice is responsible for setting additional span parameters for routes implemented with
* functional interface.
*/
public class RouterFunctionAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void nameResource(
@Advice.FieldValue(value = "predicate") final RequestPredicate predicate,
@Advice.Argument(0) final ServerRequest serverRequest) {
if (predicate == null) {
return;
}
final Class predicateEnclosingClass = predicate.getClass().getEnclosingClass();
final String predicateString = predicate.toString();
if (predicate.test(serverRequest)
&& serverRequest != null
&& predicateString != null
&& !predicateString.isEmpty()
&& predicateEnclosingClass == RequestPredicates.class) {
// only change parent span if the predicate is one of those enclosed in
// org.springframework.web.reactive.function.server RequestPredicates
// otherwise the parent may have weird resource names such as lambda request predicate class
// names that arise from webflux error handling
final String resourceName =
predicateString.replaceAll("[\\(\\)&|]", "").replaceAll("[ \\t]+", " ");
// to be used as resource name by netty span, most likely
DispatcherHandlerMonoBiConsumer.setTLPathUrl(resourceName);
// should be the dispatcher handler span
final Scope activeScope = GlobalTracer.get().scopeManager().active();
if (activeScope != null) {
activeScope.span().setTag("request.predicate", predicateString);
activeScope.span().setTag(DDTags.SPAN_TYPE, DDSpanTypes.HTTP_SERVER);
}
}
}
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void recordThrowable(@Advice.Thrown final Throwable throwable) {
final Scope scope = GlobalTracer.get().scopeManager().active();
if (scope != null && throwable != null) {
final Span span = scope.span();
Tags.ERROR.set(span, true);
span.log(Collections.singletonMap(ERROR_OBJECT, throwable));
public static void methodExit(
@Advice.This final RouterFunction thiz,
@Advice.Argument(0) final ServerRequest serverRequest,
@Advice.Return(readOnly = false) Mono<HandlerFunction<?>> result,
@Advice.Thrown final Throwable throwable) {
if (throwable == null) {
result = result.doOnSuccessOrError(new RouteOnSuccessOrError(thiz, serverRequest));
} else {
AdviceUtils.finishSpanIfPresent(serverRequest, throwable);
}
}
}

View File

@ -0,0 +1,28 @@
import dd.trace.instrumentation.springwebflux.SpringWebFluxTestApplication
import org.springframework.boot.test.context.SpringBootTest
import org.springframework.boot.test.context.TestConfiguration
import org.springframework.boot.web.embedded.netty.NettyReactiveWebServerFactory
import org.springframework.boot.web.embedded.netty.NettyServerCustomizer
import org.springframework.context.annotation.Bean
import reactor.ipc.netty.resources.LoopResources
/**
* Run all Webflux tests under netty event loop having only 1 thread.
* Some of the bugs are better visible in this setup because same thread is reused
* for different requests.
*/
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, classes = [SpringWebFluxTestApplication, ForceSingleThreadedNettyAutoConfiguration])
class SingleThreadedSpringWebfluxTest extends SpringWebfluxTest {
@TestConfiguration
static class ForceSingleThreadedNettyAutoConfiguration {
@Bean
NettyReactiveWebServerFactory nettyFactory() {
def factory = new NettyReactiveWebServerFactory()
NettyServerCustomizer customizer = { builder -> builder.loopResources(LoopResources.create("my-http", 1, true)) }
factory.addServerCustomizers(customizer)
return factory
}
}
}

View File

@ -2,21 +2,33 @@ import datadog.trace.agent.test.AgentTestRunner
import datadog.trace.agent.test.utils.OkHttpUtils
import datadog.trace.api.DDSpanTypes
import datadog.trace.api.DDTags
import datadog.trace.instrumentation.springwebflux.EchoHandlerFunction
import datadog.trace.instrumentation.springwebflux.FooModel
import datadog.trace.instrumentation.springwebflux.SpringWebFluxTestApplication
import datadog.trace.instrumentation.springwebflux.TestController
import dd.trace.instrumentation.springwebflux.EchoHandlerFunction
import dd.trace.instrumentation.springwebflux.FooModel
import dd.trace.instrumentation.springwebflux.SpringWebFluxTestApplication
import dd.trace.instrumentation.springwebflux.TestController
import io.opentracing.tag.Tags
import okhttp3.OkHttpClient
import okhttp3.Request
import okhttp3.RequestBody
import org.springframework.boot.test.context.SpringBootTest
import org.springframework.boot.test.context.TestConfiguration
import org.springframework.boot.web.embedded.netty.NettyReactiveWebServerFactory
import org.springframework.boot.web.server.LocalServerPort
import org.springframework.context.annotation.Bean
import org.springframework.web.server.ResponseStatusException
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, classes = SpringWebFluxTestApplication)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, classes = [SpringWebFluxTestApplication, ForceNettyAutoConfiguration])
class SpringWebfluxTest extends AgentTestRunner {
@TestConfiguration
static class ForceNettyAutoConfiguration {
@Bean
NettyReactiveWebServerFactory nettyFactory() {
return new NettyReactiveWebServerFactory()
}
}
static final okhttp3.MediaType PLAIN_TYPE = okhttp3.MediaType.parse("text/plain; charset=utf-8")
static final String INNER_HANDLER_FUNCTION_CLASS_TAG_PREFIX = SpringWebFluxTestApplication.getName() + "\$"
static final String SPRING_APP_CLASS_ANON_NESTED_CLASS_PREFIX = SpringWebFluxTestApplication.getSimpleName() + "\$"
@ -26,9 +38,9 @@ class SpringWebfluxTest extends AgentTestRunner {
OkHttpClient client = OkHttpUtils.client()
def "Basic GET test #testName to functional API"() {
def "Basic GET test #testName"() {
setup:
String url = "http://localhost:$port/greet$urlSuffix"
String url = "http://localhost:$port$urlPath"
def request = new Request.Builder().url(url).get().build()
when:
def response = client.newCall(request).execute()
@ -39,22 +51,35 @@ class SpringWebfluxTest extends AgentTestRunner {
assertTraces(1) {
trace(0, 2) {
span(0) {
resourceNameContains(SPRING_APP_CLASS_ANON_NESTED_CLASS_PREFIX, ".handle")
operationNameContains(SPRING_APP_CLASS_ANON_NESTED_CLASS_PREFIX, ".handle")
if (annotatedMethod == null) {
// Functional API
resourceNameContains(SPRING_APP_CLASS_ANON_NESTED_CLASS_PREFIX, ".handle")
operationNameContains(SPRING_APP_CLASS_ANON_NESTED_CLASS_PREFIX, ".handle")
} else {
// Annotation API
resourceName TestController.getSimpleName() + "." + annotatedMethod
operationName TestController.getSimpleName() + "." + annotatedMethod
}
spanType DDSpanTypes.HTTP_SERVER
childOf(span(1))
tags {
"$Tags.COMPONENT.key" "spring-webflux-controller"
"$DDTags.SPAN_TYPE" DDSpanTypes.HTTP_SERVER
"request.predicate" "(GET && /greet$pathVariableUrlSuffix)"
"handler.type" { String tagVal ->
return tagVal.contains(INNER_HANDLER_FUNCTION_CLASS_TAG_PREFIX)
if (annotatedMethod == null) {
// Functional API
"request.predicate" "(GET && $urlPathWithVariables)"
"handler.type" { String tagVal ->
return tagVal.contains(INNER_HANDLER_FUNCTION_CLASS_TAG_PREFIX)
}
} else {
// Annotation API
"handler.type" TestController.getName()
}
defaultTags()
}
}
span(1) {
resourceName "GET /greet$pathVariableUrlSuffix"
resourceName "GET $urlPathWithVariables"
operationName "netty.request"
spanType DDSpanTypes.HTTP_SERVER
parent()
@ -74,61 +99,16 @@ class SpringWebfluxTest extends AgentTestRunner {
}
where:
testName | urlSuffix | pathVariableUrlSuffix | expectedResponseBody
"without paramaters" | "" | "" | SpringWebFluxTestApplication.GreetingHandler.DEFAULT_RESPONSE
"with one parameter" | "/WORLD" | "/{name}" | SpringWebFluxTestApplication.GreetingHandler.DEFAULT_RESPONSE + " WORLD"
"with two parameters" | "/World/Test1" | "/{name}/{word}" | SpringWebFluxTestApplication.GreetingHandler.DEFAULT_RESPONSE + " World Test1"
}
testName | urlPath | urlPathWithVariables | annotatedMethod | expectedResponseBody
"functional API without parameters" | "/greet" | "/greet" | null | SpringWebFluxTestApplication.GreetingHandler.DEFAULT_RESPONSE
"functional API with one parameter" | "/greet/WORLD" | "/greet/{name}" | null | SpringWebFluxTestApplication.GreetingHandler.DEFAULT_RESPONSE + " WORLD"
"functional API with two parameters" | "/greet/World/Test1" | "/greet/{name}/{word}" | null | SpringWebFluxTestApplication.GreetingHandler.DEFAULT_RESPONSE + " World Test1"
"functional API delayed response" | "/greet-delayed" | "/greet-delayed" | null | SpringWebFluxTestApplication.GreetingHandler.DEFAULT_RESPONSE
def "Basic GET test #testName to annotations API"() {
setup:
String url = "http://localhost:$port/foo$urlSuffix"
def request = new Request.Builder().url(url).get().build()
when:
def response = client.newCall(request).execute()
then:
response.code == 200
response.body().string() == expectedResponseBody
assertTraces(1) {
trace(0, 2) {
span(0) {
resourceName TestController.getSimpleName() + ".getFooModel"
operationName TestController.getSimpleName() + ".getFooModel"
spanType DDSpanTypes.HTTP_SERVER
childOf(span(1))
tags {
"$Tags.COMPONENT.key" "spring-webflux-controller"
"$DDTags.SPAN_TYPE" DDSpanTypes.HTTP_SERVER
"handler.type" TestController.getName()
defaultTags()
}
}
span(1) {
resourceName "GET /foo$pathVariableUrlSuffix"
operationName "netty.request"
spanType DDSpanTypes.HTTP_SERVER
parent()
tags {
"$Tags.COMPONENT.key" "netty"
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_SERVER
"$DDTags.SPAN_TYPE" DDSpanTypes.HTTP_SERVER
"$Tags.PEER_HOSTNAME.key" "localhost"
"$Tags.PEER_PORT.key" Integer
"$Tags.HTTP_METHOD.key" "GET"
"$Tags.HTTP_STATUS.key" 200
"$Tags.HTTP_URL.key" url
defaultTags()
}
}
}
}
where:
testName | urlSuffix | pathVariableUrlSuffix | expectedResponseBody
"without parameters" | "" | "" | new FooModel(0L, "DEFAULT").toString()
"with one parameter" | "/1" | "/{id}" | new FooModel(1, "pass").toString()
"with two parameters" | "/2/world" | "/{id}/{name}" | new FooModel(2, "world").toString()
"annotation API without parameters" | "/foo" | "/foo" | "getFooModel" | new FooModel(0L, "DEFAULT").toString()
"annotation API with one parameter" | "/foo/1" | "/foo/{id}" | "getFooModel" | new FooModel(1L, "pass").toString()
"annotation API with two parameters" | "/foo/2/world" | "/foo/{id}/{name}" | "getFooModel" | new FooModel(2L, "world").toString()
"annotation API delayed response" | "/foo-delayed" | "/foo-delayed" | "getFooDelayed" | new FooModel(3L, "delayed").toString()
}
def "404 GET test"() {
@ -161,14 +141,15 @@ class SpringWebfluxTest extends AgentTestRunner {
}
}
span(1) {
resourceName "DispatcherHandler.handle"
operationName "DispatcherHandler.handle"
resourceName "ResourceWebHandler.handle"
operationName "ResourceWebHandler.handle"
spanType DDSpanTypes.HTTP_SERVER
childOf(span(0))
errored true
tags {
"$Tags.COMPONENT.key" "spring-webflux-controller"
"$DDTags.SPAN_TYPE" DDSpanTypes.HTTP_SERVER
"handler.type" "org.springframework.web.reactive.resource.ResourceWebHandler"
errorTags(ResponseStatusException, String)
defaultTags()
}
@ -191,7 +172,7 @@ class SpringWebfluxTest extends AgentTestRunner {
response.code() == 202
response.body().string() == echoString
assertTraces(1) {
trace(0, 2) {
trace(0, 3) {
span(0) {
resourceName EchoHandlerFunction.getSimpleName() + ".handle"
operationName EchoHandlerFunction.getSimpleName() + ".handle"
@ -224,13 +205,23 @@ class SpringWebfluxTest extends AgentTestRunner {
defaultTags()
}
}
span(2) {
resourceName "echo"
operationName "echo"
childOf(span(0))
tags {
"$Tags.COMPONENT.key" "trace"
"$DDTags.SPAN_TYPE" DDSpanTypes.HTTP_SERVER
defaultTags()
}
}
}
}
}
def "GET to bad annotation API endpoint"() {
def "GET to bad endpoint #testName"() {
setup:
String url = "http://localhost:$port/failfoo/1"
String url = "http://localhost:$port$urlPath"
def request = new Request.Builder().url(url).get().build()
when:
@ -241,7 +232,7 @@ class SpringWebfluxTest extends AgentTestRunner {
assertTraces(1) {
trace(0, 2) {
span(0) {
resourceName "GET /failfoo/{id}"
resourceName "GET $urlPathWithVariables"
operationName "netty.request"
spanType DDSpanTypes.HTTP_SERVER
errored true
@ -260,80 +251,51 @@ class SpringWebfluxTest extends AgentTestRunner {
}
}
span(1) {
resourceName "TestController.getFooModelFail"
operationName "TestController.getFooModelFail"
if (annotatedMethod == null) {
// Functional API
resourceNameContains(SPRING_APP_CLASS_ANON_NESTED_CLASS_PREFIX, ".handle")
operationNameContains(SPRING_APP_CLASS_ANON_NESTED_CLASS_PREFIX, ".handle")
} else {
// Annotation API
resourceName TestController.getSimpleName() + "." + annotatedMethod
operationName TestController.getSimpleName() + "." + annotatedMethod
}
spanType DDSpanTypes.HTTP_SERVER
childOf(span(0))
errored true
tags {
"$Tags.COMPONENT.key" "spring-webflux-controller"
"$DDTags.SPAN_TYPE" DDSpanTypes.HTTP_SERVER
"handler.type" TestController.getName()
errorTags(ArithmeticException, String)
defaultTags()
}
}
}
}
}
def "POST to bad functional API endpoint"() {
setup:
String echoString = "TEST"
RequestBody body = RequestBody.create(PLAIN_TYPE, echoString)
String url = "http://localhost:$port/fail-echo"
def request = new Request.Builder().url(url).post(body).build()
when:
def response = client.newCall(request).execute()
then:
response.code() == 500
assertTraces(1) {
trace(0, 2) {
span(0) {
resourceName "POST /fail-echo"
operationName "netty.request"
spanType DDSpanTypes.HTTP_SERVER
errored true
parent()
tags {
"$Tags.COMPONENT.key" "netty"
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_SERVER
"$DDTags.SPAN_TYPE" DDSpanTypes.HTTP_SERVER
"$Tags.PEER_HOSTNAME.key" "localhost"
"$Tags.PEER_PORT.key" Integer
"$Tags.HTTP_METHOD.key" "POST"
"$Tags.HTTP_STATUS.key" 500
"$Tags.HTTP_URL.key" url
"error" true
defaultTags()
}
}
span(1) {
resourceNameContains(SPRING_APP_CLASS_ANON_NESTED_CLASS_PREFIX, ".handle")
operationNameContains(SPRING_APP_CLASS_ANON_NESTED_CLASS_PREFIX, ".handle")
spanType DDSpanTypes.HTTP_SERVER
childOf(span(0))
errored true
tags {
"$Tags.COMPONENT.key" "spring-webflux-controller"
"$DDTags.SPAN_TYPE" DDSpanTypes.HTTP_SERVER
"request.predicate" "(POST && /fail-echo)"
"handler.type" { String tagVal ->
return tagVal.contains(INNER_HANDLER_FUNCTION_CLASS_TAG_PREFIX)
if (annotatedMethod == null) {
// Functional API
"request.predicate" "(GET && $urlPathWithVariables)"
"handler.type" { String tagVal ->
return tagVal.contains(INNER_HANDLER_FUNCTION_CLASS_TAG_PREFIX)
}
} else {
// Annotation API
"handler.type" TestController.getName()
}
errorTags(NullPointerException, String)
errorTags(RuntimeException, "bad things happen")
defaultTags()
}
}
}
}
where:
testName | urlPath | urlPathWithVariables | annotatedMethod
"functional API fail fast" | "/greet-failfast/1" | "/greet-failfast/{id}" | null
"functional API fail Mono" | "/greet-failmono/1" | "/greet-failmono/{id}" | null
"annotation API fail fast" | "/foo-failfast/1" | "/foo-failfast/{id}" | "getFooFailFast"
"annotation API fail Mono" | "/foo-failmono/1" | "/foo-failmono/{id}" | "getFooFailMono"
}
def "Redirect test"() {
setup:
String url = "http://localhost:$port/double-greet-redirect"
String finalUrl = "http://localhost:$port/double-greet"
def request = new Request.Builder().url(url).get().build()
when:
@ -342,6 +304,7 @@ class SpringWebfluxTest extends AgentTestRunner {
then:
response.code == 200
assertTraces(2) {
// TODO: why order of spans is different in these traces?
trace(0, 2) {
span(0) {
resourceName "GET /double-greet-redirect"
@ -371,7 +334,7 @@ class SpringWebfluxTest extends AgentTestRunner {
"request.predicate" "(GET && /double-greet-redirect)"
"handler.type" { String tagVal ->
return (tagVal.contains(INNER_HANDLER_FUNCTION_CLASS_TAG_PREFIX)
|| tagVal.contains("Lambda")) // || tagVal.contains("Proxy"))
|| tagVal.contains("Lambda"))
}
defaultTags()
}
@ -406,7 +369,7 @@ class SpringWebfluxTest extends AgentTestRunner {
"$Tags.PEER_PORT.key" Integer
"$Tags.HTTP_METHOD.key" "GET"
"$Tags.HTTP_STATUS.key" 200
"$Tags.HTTP_URL.key" "http://localhost:$port/double-greet"
"$Tags.HTTP_URL.key" finalUrl
defaultTags()
}
}
@ -414,106 +377,72 @@ class SpringWebfluxTest extends AgentTestRunner {
}
}
def "Flux x#count GET test with functional API endpoint"() {
def "Multiple GETs to delaying route #testName"() {
setup:
String expectedResponseBodyStr = FooModel.createXFooModelsStringFromArray(FooModel.createXFooModels(count))
String url = "http://localhost:$port/greet-counter/$count"
def requestsCount = 50 // Should be more than 2x CPUs to fish out some bugs
String url = "http://localhost:$port$urlPath"
def request = new Request.Builder().url(url).get().build()
when:
def response = client.newCall(request).execute()
def responses = (0..requestsCount - 1).collect { client.newCall(request).execute() }
then:
response.code() == 200
expectedResponseBodyStr == response.body().string()
assertTraces(1) {
trace(0, 2) {
span(0) {
resourceNameContains(SPRING_APP_CLASS_ANON_NESTED_CLASS_PREFIX, ".handle")
operationNameContains(SPRING_APP_CLASS_ANON_NESTED_CLASS_PREFIX, ".handle")
spanType DDSpanTypes.HTTP_SERVER
childOf(span(1))
tags {
"$Tags.COMPONENT.key" "spring-webflux-controller"
"$DDTags.SPAN_TYPE" DDSpanTypes.HTTP_SERVER
"request.predicate" "(GET && /greet-counter/{count})"
"handler.type" { String tagVal ->
return tagVal.contains(INNER_HANDLER_FUNCTION_CLASS_TAG_PREFIX)
responses.every { it.code == 200 }
responses.every { it.body().string() == expectedResponseBody }
assertTraces(responses.size()) {
responses.eachWithIndex { def response, int i ->
trace(i, 2) {
span(0) {
if (annotatedMethod == null) {
// Functional API
resourceNameContains(SPRING_APP_CLASS_ANON_NESTED_CLASS_PREFIX, ".handle")
operationNameContains(SPRING_APP_CLASS_ANON_NESTED_CLASS_PREFIX, ".handle")
} else {
// Annotation API
resourceName TestController.getSimpleName() + "." + annotatedMethod
operationName TestController.getSimpleName() + "." + annotatedMethod
}
spanType DDSpanTypes.HTTP_SERVER
childOf(span(1))
tags {
"$Tags.COMPONENT.key" "spring-webflux-controller"
"$DDTags.SPAN_TYPE" DDSpanTypes.HTTP_SERVER
if (annotatedMethod == null) {
// Functional API
"request.predicate" "(GET && $urlPathWithVariables)"
"handler.type" { String tagVal ->
return tagVal.contains(INNER_HANDLER_FUNCTION_CLASS_TAG_PREFIX)
}
} else {
// Annotation API
"handler.type" TestController.getName()
}
defaultTags()
}
defaultTags()
}
}
span(1) {
resourceName "GET /greet-counter/{count}"
operationName "netty.request"
spanType DDSpanTypes.HTTP_SERVER
parent()
tags {
"$Tags.COMPONENT.key" "netty"
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_SERVER
"$DDTags.SPAN_TYPE" DDSpanTypes.HTTP_SERVER
"$Tags.PEER_HOSTNAME.key" "localhost"
"$Tags.PEER_PORT.key" Integer
"$Tags.HTTP_METHOD.key" "GET"
"$Tags.HTTP_STATUS.key" 200
"$Tags.HTTP_URL.key" url
defaultTags()
span(1) {
resourceName "GET $urlPathWithVariables"
operationName "netty.request"
spanType DDSpanTypes.HTTP_SERVER
parent()
tags {
"$Tags.COMPONENT.key" "netty"
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_SERVER
"$DDTags.SPAN_TYPE" DDSpanTypes.HTTP_SERVER
"$Tags.PEER_HOSTNAME.key" "localhost"
"$Tags.PEER_PORT.key" Integer
"$Tags.HTTP_METHOD.key" "GET"
"$Tags.HTTP_STATUS.key" 200
"$Tags.HTTP_URL.key" url
defaultTags()
}
}
}
}
}
where:
count << [0, 1, 10]
}
def "Flux x#count GET test with spring annotations endpoint"() {
setup:
String expectedResponseBodyStr = FooModel.createXFooModelsStringFromArray(FooModel.createXFooModels(count))
String url = "http://localhost:$port/annotation-foos/$count"
def request = new Request.Builder().url(url).get().build()
when:
def response = client.newCall(request).execute()
then:
response.code() == 200
expectedResponseBodyStr == response.body().string()
assertTraces(1) {
trace(0, 2) {
span(0) {
resourceName TestController.getSimpleName() + ".getXFooModels"
operationName TestController.getSimpleName() + ".getXFooModels"
spanType DDSpanTypes.HTTP_SERVER
childOf(span(1))
tags {
"$Tags.COMPONENT.key" "spring-webflux-controller"
"$DDTags.SPAN_TYPE" DDSpanTypes.HTTP_SERVER
"handler.type" TestController.getName()
defaultTags()
}
}
span(1) {
resourceName "GET /annotation-foos/{count}"
operationName "netty.request"
spanType DDSpanTypes.HTTP_SERVER
parent()
tags {
"$Tags.COMPONENT.key" "netty"
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_SERVER
"$DDTags.SPAN_TYPE" DDSpanTypes.HTTP_SERVER
"$Tags.PEER_HOSTNAME.key" "localhost"
"$Tags.PEER_PORT.key" Integer
"$Tags.HTTP_METHOD.key" "GET"
"$Tags.HTTP_STATUS.key" 200
"$Tags.HTTP_URL.key" url
defaultTags()
}
}
}
}
where:
count << [0, 1, 10]
testName | urlPath | urlPathWithVariables | annotatedMethod | expectedResponseBody
"functional API delayed response" | "/greet-delayed" | "/greet-delayed" | null | SpringWebFluxTestApplication.GreetingHandler.DEFAULT_RESPONSE
"annotation API delayed response" | "/foo-delayed" | "/foo-delayed" | "getFooDelayed" | new FooModel(3L, "delayed").toString()
}
}

View File

@ -1,37 +0,0 @@
package datadog.trace.instrumentation.springwebflux
class FooModel {
public long id
public String name
FooModel(long id, String name) {
this.id = id
this.name = name
}
@Override
String toString() {
return "{\"id\":" + id + ",\"name\":\"" + name + "\"}"
}
static FooModel[] createXFooModels(long count) {
FooModel[] foos = new FooModel[count]
for (int i = 0; i < count; ++i) {
foos[i] = new FooModel(i, String.valueOf(i))
}
return foos
}
static String createXFooModelsStringFromArray(FooModel[] foos) {
StringBuilder sb = new StringBuilder()
sb.append("[")
for (int i = 0; i < foos.length; ++i) {
sb.append(foos[i].toString())
if (i < foos.length - 1) {
sb.append(",")
}
}
sb.append("]")
return sb.toString()
}
}

View File

@ -1,13 +1,19 @@
package datadog.trace.instrumentation.springwebflux
package dd.trace.instrumentation.springwebflux
import datadog.trace.api.Trace
import org.springframework.http.MediaType
import org.springframework.stereotype.Component
import org.springframework.web.reactive.function.server.ServerRequest
import org.springframework.web.reactive.function.server.ServerResponse
import reactor.core.publisher.Mono
/**
* Note: this class has to stay outside of 'datadog.*' package because we need
* it transformed by {@code @Trace} annotation.
*/
@Component
class EchoHandler {
@Trace(operationName = "echo")
Mono<ServerResponse> echo(ServerRequest request) {
return ServerResponse.accepted().contentType(MediaType.TEXT_PLAIN)
.body(request.bodyToMono(String), String)

View File

@ -1,4 +1,5 @@
package datadog.trace.instrumentation.springwebflux
package dd.trace.instrumentation.springwebflux
import org.springframework.web.reactive.function.server.HandlerFunction
import org.springframework.web.reactive.function.server.ServerRequest

View File

@ -0,0 +1,16 @@
package dd.trace.instrumentation.springwebflux
class FooModel {
public long id
public String name
FooModel(long id, String name) {
this.id = id
this.name = name
}
@Override
String toString() {
return "{\"id\":" + id + ",\"name\":\"" + name + "\"}"
}
}

View File

@ -1,12 +1,6 @@
package datadog.trace.instrumentation.springwebflux
package dd.trace.instrumentation.springwebflux
import org.springframework.boot.SpringApplication
import org.springframework.boot.WebApplicationType
import org.springframework.boot.autoconfigure.AutoConfigureBefore
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.autoconfigure.web.reactive.ReactiveWebServerFactoryAutoConfiguration
import org.springframework.boot.test.context.TestConfiguration
import org.springframework.boot.web.embedded.netty.NettyReactiveWebServerFactory
import org.springframework.context.annotation.Bean
import org.springframework.http.MediaType
import org.springframework.stereotype.Component
@ -18,6 +12,8 @@ import org.springframework.web.reactive.function.server.ServerResponse
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import java.time.Duration
import static org.springframework.web.reactive.function.server.RequestPredicates.GET
import static org.springframework.web.reactive.function.server.RequestPredicates.POST
import static org.springframework.web.reactive.function.server.RouterFunctions.route
@ -25,31 +21,9 @@ import static org.springframework.web.reactive.function.server.RouterFunctions.r
@SpringBootApplication
class SpringWebFluxTestApplication {
static void main(String[] args) {
SpringApplication app = new SpringApplication(SpringWebFluxTestApplication)
app.setWebApplicationType(WebApplicationType.REACTIVE)
app.run(args)
}
@TestConfiguration
@AutoConfigureBefore(ReactiveWebServerFactoryAutoConfiguration)
class ForceNettyAutoConfiguration {
@Bean
NettyReactiveWebServerFactory nettyFactory() {
return new NettyReactiveWebServerFactory()
}
}
@Bean
RouterFunction<ServerResponse> echoRouterFunction(EchoHandler echoHandler) {
return route(POST("/echo"), new EchoHandlerFunction(echoHandler))
.andRoute(POST("/fail-echo"), new HandlerFunction<ServerResponse>() {
@Override
Mono<ServerResponse> handle(ServerRequest request) {
return echoHandler.echo(null)
}
})
}
@Bean
@ -74,10 +48,20 @@ class SpringWebFluxTestApplication {
Mono<ServerResponse> handle(ServerRequest request) {
return greetingHandler.doubleGreet()
}
}).andRoute(GET("/greet-counter/{count}"), new HandlerFunction<ServerResponse>() {
}).andRoute(GET("/greet-delayed"), new HandlerFunction<ServerResponse>() {
@Override
Mono<ServerResponse> handle(ServerRequest request) {
return greetingHandler.counterGreet(request)
return greetingHandler.defaultGreet().delayElement(Duration.ofMillis(100))
}
}).andRoute(GET("/greet-failfast/{id}"), new HandlerFunction<ServerResponse>() {
@Override
Mono<ServerResponse> handle(ServerRequest request) {
throw new RuntimeException("bad things happen")
}
}).andRoute(GET("/greet-failmono/{id}"), new HandlerFunction<ServerResponse>() {
@Override
Mono<ServerResponse> handle(ServerRequest request) {
return Mono.error(new RuntimeException("bad things happen"))
}
})
}

View File

@ -1,19 +1,15 @@
package datadog.trace.instrumentation.springwebflux
package dd.trace.instrumentation.springwebflux
import org.springframework.web.bind.annotation.GetMapping
import org.springframework.web.bind.annotation.PathVariable
import org.springframework.web.bind.annotation.RestController
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import java.time.Duration
@RestController
class TestController {
@GetMapping("/failfoo/{id}")
Mono<FooModel> getFooModelFail(@PathVariable("id") long id) {
return Mono.just(new FooModel((id / 0), "fail"))
}
@GetMapping("/foo")
Mono<FooModel> getFooModel() {
return Mono.just(new FooModel(0L, "DEFAULT"))
@ -29,9 +25,18 @@ class TestController {
return Mono.just(new FooModel(id, name))
}
@GetMapping(value = "/annotation-foos/{count}")
Flux<FooModel> getXFooModels(@PathVariable("count") long count) {
FooModel[] foos = FooModel.createXFooModels(count)
return Flux.just(foos)
@GetMapping("/foo-delayed")
Mono<FooModel> getFooDelayed() {
return Mono.just(new FooModel(3L, "delayed")).delayElement(Duration.ofMillis(100))
}
@GetMapping("/foo-failfast/{id}")
Mono<FooModel> getFooFailFast(@PathVariable("id") long id) {
throw new RuntimeException("bad things happen")
}
@GetMapping("/foo-failmono/{id}")
Mono<FooModel> getFooFailMono(@PathVariable("id") long id) {
return Mono.error(new RuntimeException("bad things happen"))
}
}

View File

@ -1,4 +1,4 @@
package datadog.trace.instrumentation.springwebflux;
package dd.trace.instrumentation.springwebflux;
import static org.springframework.web.reactive.function.server.RequestPredicates.GET;
import static org.springframework.web.reactive.function.server.RouterFunctions.route;
@ -9,6 +9,7 @@ import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.ServerResponse;
// Need to keep this in Java because groovy creates crazy proxies around lambdas
@Component
public class RedirectComponent {
@Bean

View File

@ -0,0 +1,18 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<appender name="console" class="ch.qos.logback.core.ConsoleAppender">
<layout class="ch.qos.logback.classic.PatternLayout">
<Pattern>
%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n
</Pattern>
</layout>
</appender>
<root level="WARN">
<appender-ref ref="console"/>
</root>
<logger name="datadog" level="debug"/>
</configuration>

View File

@ -77,6 +77,11 @@ public class ContinuableScope implements Scope, TraceScope {
listener.afterScopeActivated();
}
}
} else {
log.debug(
"Tried to close {} scope when {} is on top. Ignoring!",
this,
scopeManager.tlsScope.get());
}
}
@ -130,7 +135,10 @@ public class ContinuableScope implements Scope, TraceScope {
@Override
public ContinuableScope activate() {
if (used.compareAndSet(false, true)) {
return new ContinuableScope(scopeManager, openCount, this, spanUnderScope, finishOnClose);
final ContinuableScope scope =
new ContinuableScope(scopeManager, openCount, this, spanUnderScope, finishOnClose);
log.debug("Activating continuation {}, scope: {}", this, scope);
return scope;
} else {
log.debug(
"Failed to activate continuation. Reusing a continuation not allowed. Returning a new scope. Spans will not be linked.");