Fix webflux integration to not rely in active span

Netty event loop instrumentation doesn't allow us to attribute work to
correct span so we have to maintain that manually.
This commit is contained in:
Nikolay Martynov 2019-02-01 14:34:06 -05:00
parent f84e510f3e
commit a086f38f2f
27 changed files with 606 additions and 638 deletions

View File

@ -195,8 +195,11 @@ public class AgentInstaller {
private static class ClassLoadListener implements AgentBuilder.Listener { private static class ClassLoadListener implements AgentBuilder.Listener {
@Override @Override
public void onDiscovery( public void onDiscovery(
String typeName, ClassLoader classLoader, JavaModule javaModule, boolean b) { final String typeName,
for (Map.Entry<String, Runnable> entry : classLoadCallbacks.entrySet()) { final ClassLoader classLoader,
final JavaModule javaModule,
final boolean b) {
for (final Map.Entry<String, Runnable> entry : classLoadCallbacks.entrySet()) {
if (entry.getKey().equals(typeName)) { if (entry.getKey().equals(typeName)) {
entry.getValue().run(); entry.getValue().run();
} }
@ -205,25 +208,33 @@ public class AgentInstaller {
@Override @Override
public void onTransformation( public void onTransformation(
TypeDescription typeDescription, final TypeDescription typeDescription,
ClassLoader classLoader, final ClassLoader classLoader,
JavaModule javaModule, final JavaModule javaModule,
boolean b, final boolean b,
DynamicType dynamicType) {} final DynamicType dynamicType) {}
@Override @Override
public void onIgnored( public void onIgnored(
TypeDescription typeDescription, final TypeDescription typeDescription,
ClassLoader classLoader, final ClassLoader classLoader,
JavaModule javaModule, final JavaModule javaModule,
boolean b) {} final boolean b) {}
@Override @Override
public void onError( public void onError(
String s, ClassLoader classLoader, JavaModule javaModule, boolean b, Throwable throwable) {} final String s,
final ClassLoader classLoader,
final JavaModule javaModule,
final boolean b,
final Throwable throwable) {}
@Override @Override
public void onComplete(String s, ClassLoader classLoader, JavaModule javaModule, boolean b) {} public void onComplete(
final String s,
final ClassLoader classLoader,
final JavaModule javaModule,
final boolean b) {}
} }
private AgentInstaller() {} private AgentInstaller() {}

View File

@ -66,14 +66,14 @@ dependencies {
implementation deps.autoservice implementation deps.autoservice
testCompile project(':dd-java-agent:testing') testCompile project(':dd-java-agent:testing')
testCompile project(':dd-java-agent:instrumentation:trace-annotation')
testCompile project(':dd-java-agent:instrumentation:netty-4.1') testCompile project(':dd-java-agent:instrumentation:netty-4.1')
testCompile project(':dd-java-agent:instrumentation:java-concurrent') testCompile project(':dd-java-agent:instrumentation:java-concurrent')
testCompile group: 'org.springframework.boot', name: 'spring-boot-starter-webflux', version: '2.0.0.RELEASE' testCompile group: 'org.springframework.boot', name: 'spring-boot-starter-webflux', version: '2.0.0.RELEASE'
testCompile group: 'org.springframework.boot', name: 'spring-boot-starter', version: '2.0.0.RELEASE'
testCompile group: 'org.spockframework', name: 'spock-spring', version: '1.1-groovy-2.4'
testCompile group: 'org.springframework.boot', name: 'spring-boot-starter-test', version: '2.0.0.RELEASE' testCompile group: 'org.springframework.boot', name: 'spring-boot-starter-test', version: '2.0.0.RELEASE'
testCompile group: 'org.springframework.boot', name: 'spring-boot-starter-reactor-netty', version: '2.0.0.RELEASE' testCompile group: 'org.springframework.boot', name: 'spring-boot-starter-reactor-netty', version: '2.0.0.RELEASE'
testCompile group: 'org.spockframework', name: 'spock-spring', version: '1.1-groovy-2.4'
// FIXME: Tests need to be updated to support 2.1+ // FIXME: Tests need to be updated to support 2.1+
latestDepTestCompile group: 'org.springframework.boot', name: 'spring-boot-starter-webflux', version: '2.0.+' latestDepTestCompile group: 'org.springframework.boot', name: 'spring-boot-starter-webflux', version: '2.0.+'

View File

@ -0,0 +1,22 @@
package datadog.trace.instrumentation.springwebflux;
import datadog.trace.agent.tooling.Instrumenter;
public abstract class AbstractWebfluxInstrumentation extends Instrumenter.Default {
public static final String PACKAGE = AbstractWebfluxInstrumentation.class.getPackage().getName();
public AbstractWebfluxInstrumentation(final String... additionalNames) {
super("spring-webflux", additionalNames);
}
@Override
public String[] helperClassNames() {
return new String[] {
PACKAGE + ".AdviceUtils",
PACKAGE + ".DispatcherHandlerOnSuccessOrError",
PACKAGE + ".DispatcherHandlerOnCancel",
PACKAGE + ".RouteOnSuccessOrError"
};
}
}

View File

@ -15,19 +15,7 @@ import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher; import net.bytebuddy.matcher.ElementMatcher;
@AutoService(Instrumenter.class) @AutoService(Instrumenter.class)
public final class DispatcherHandlerInstrumentation extends Instrumenter.Default { public final class DispatcherHandlerInstrumentation extends AbstractWebfluxInstrumentation {
public static final String PACKAGE =
DispatcherHandlerInstrumentation.class.getPackage().getName();
public DispatcherHandlerInstrumentation() {
super("spring-webflux");
}
@Override
public String[] helperClassNames() {
return new String[] {PACKAGE + ".DispatcherHandlerMonoBiConsumer"};
}
@Override @Override
public ElementMatcher<TypeDescription> typeMatcher() { public ElementMatcher<TypeDescription> typeMatcher() {

View File

@ -1,9 +1,13 @@
package datadog.trace.instrumentation.springwebflux; package datadog.trace.instrumentation.springwebflux;
import static datadog.trace.agent.tooling.ByteBuddyElementMatchers.safeHasSuperType;
import static java.util.Collections.singletonMap; import static java.util.Collections.singletonMap;
import static net.bytebuddy.matcher.ElementMatchers.isAbstract;
import static net.bytebuddy.matcher.ElementMatchers.isInterface;
import static net.bytebuddy.matcher.ElementMatchers.isMethod; import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.isPublic; import static net.bytebuddy.matcher.ElementMatchers.isPublic;
import static net.bytebuddy.matcher.ElementMatchers.named; import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.not;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument; import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments; import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
@ -15,18 +19,13 @@ import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher; import net.bytebuddy.matcher.ElementMatcher;
@AutoService(Instrumenter.class) @AutoService(Instrumenter.class)
public final class HandlerFunctionAdapterInstrumentation extends Instrumenter.Default { public final class HandlerAdapterInstrumentation extends AbstractWebfluxInstrumentation {
public static final String PACKAGE =
HandlerFunctionAdapterInstrumentation.class.getPackage().getName();
public HandlerFunctionAdapterInstrumentation() {
super("spring-webflux", "spring-webflux-functional");
}
@Override @Override
public ElementMatcher<TypeDescription> typeMatcher() { public ElementMatcher<TypeDescription> typeMatcher() {
return named("org.springframework.web.reactive.function.server.support.HandlerFunctionAdapter"); return not(isInterface())
.and(not(isAbstract()))
.and(safeHasSuperType(named("org.springframework.web.reactive.HandlerAdapter")));
} }
@Override @Override
@ -36,8 +35,9 @@ public final class HandlerFunctionAdapterInstrumentation extends Instrumenter.De
.and(isPublic()) .and(isPublic())
.and(named("handle")) .and(named("handle"))
.and(takesArgument(0, named("org.springframework.web.server.ServerWebExchange"))) .and(takesArgument(0, named("org.springframework.web.server.ServerWebExchange")))
.and(takesArgument(1, named("java.lang.Object")))
.and(takesArguments(2)), .and(takesArguments(2)),
// Cannot reference class directly here because it would lead to class load failure on Java7 // Cannot reference class directly here because it would lead to class load failure on Java7
PACKAGE + ".HandlerFunctionAdapterAdvice"); PACKAGE + ".HandlerAdapterAdvice");
} }
} }

View File

@ -1,53 +0,0 @@
package datadog.trace.instrumentation.springwebflux;
import static java.util.Collections.singletonMap;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.isProtected;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.Instrumenter;
import java.util.Map;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
// Provides a way to get the URL with path variables
@AutoService(Instrumenter.class)
public final class RequestMappingInfoHandlerMappingInstrumentation extends Instrumenter.Default {
public static final String PACKAGE =
RequestMappingInfoHandlerMappingInstrumentation.class.getPackage().getName();
public RequestMappingInfoHandlerMappingInstrumentation() {
super("spring-webflux", "spring-webflux-annotation");
}
@Override
public String[] helperClassNames() {
return new String[] {PACKAGE + ".DispatcherHandlerMonoBiConsumer"};
}
@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return named("org.springframework.web.reactive.result.method.RequestMappingInfoHandlerMapping");
}
@Override
public Map<? extends ElementMatcher<? super MethodDescription>, String> transformers() {
return singletonMap(
isMethod()
.and(isProtected())
.and(named("handleMatch"))
.and(
takesArgument(
0, named("org.springframework.web.reactive.result.method.RequestMappingInfo")))
.and(takesArgument(1, named("org.springframework.web.method.HandlerMethod")))
.and(takesArgument(2, named("org.springframework.web.server.ServerWebExchange")))
.and(takesArguments(3)),
// Cannot reference class directly here because it would lead to class load failure on Java7
PACKAGE + ".RequestMappingInfoHandlerMappingAdvice");
}
}

View File

@ -2,7 +2,6 @@ package datadog.trace.instrumentation.springwebflux;
import static datadog.trace.agent.tooling.ByteBuddyElementMatchers.safeHasSuperType; import static datadog.trace.agent.tooling.ByteBuddyElementMatchers.safeHasSuperType;
import static java.util.Collections.singletonMap; import static java.util.Collections.singletonMap;
import static net.bytebuddy.matcher.ElementMatchers.declaresField;
import static net.bytebuddy.matcher.ElementMatchers.isAbstract; import static net.bytebuddy.matcher.ElementMatchers.isAbstract;
import static net.bytebuddy.matcher.ElementMatchers.isMethod; import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.isPublic; import static net.bytebuddy.matcher.ElementMatchers.isPublic;
@ -19,27 +18,20 @@ import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher; import net.bytebuddy.matcher.ElementMatcher;
@AutoService(Instrumenter.class) @AutoService(Instrumenter.class)
public final class RouterFunctionInstrumentation extends Instrumenter.Default { public final class RouterFunctionInstrumentation extends AbstractWebfluxInstrumentation {
public static final String PACKAGE = RouterFunctionInstrumentation.class.getPackage().getName();
public RouterFunctionInstrumentation() { public RouterFunctionInstrumentation() {
super("spring-webflux", "spring-webflux-functional"); super("spring-webflux-functional");
}
@Override
public String[] helperClassNames() {
return new String[] {PACKAGE + ".DispatcherHandlerMonoBiConsumer"};
} }
@Override @Override
public ElementMatcher<TypeDescription> typeMatcher() { public ElementMatcher<TypeDescription> typeMatcher() {
return not(isAbstract()) return not(isAbstract())
.and(declaresField(named("predicate")))
.and( .and(
safeHasSuperType( safeHasSuperType(
// TODO: this doesn't handle nested routes (DefaultNestedRouterFunction)
named( named(
"org.springframework.web.reactive.function.server.RouterFunctions$AbstractRouterFunction"))); "org.springframework.web.reactive.function.server.RouterFunctions$DefaultRouterFunction")));
} }
@Override @Override

View File

@ -0,0 +1,65 @@
package datadog.trace.instrumentation.springwebflux;
import static io.opentracing.log.Fields.ERROR_OBJECT;
import io.opentracing.Span;
import io.opentracing.tag.Tags;
import java.util.Collections;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.server.ServerWebExchange;
public class AdviceUtils {
public static final String SPAN_ATTRIBUTE = "datadog.trace.instrumentation.springwebflux.Span";
public static final String PARENT_SPAN_ATTRIBUTE =
"datadog.trace.instrumentation.springwebflux.ParentSpan";
public static String parseOperationName(final Object handler) {
final String className = parseClassName(handler.getClass());
final String operationName;
final int lambdaIdx = className.indexOf("$$Lambda$");
if (lambdaIdx > -1) {
operationName = className.substring(0, lambdaIdx) + ".lambda";
} else {
operationName = className + ".handle";
}
return operationName;
}
public static String parseClassName(final Class clazz) {
String className = clazz.getSimpleName();
if (className.isEmpty()) {
className = clazz.getName();
if (clazz.getPackage() != null) {
final String pkgName = clazz.getPackage().getName();
if (!pkgName.isEmpty()) {
className = clazz.getName().replace(pkgName, "").substring(1);
}
}
}
return className;
}
public static void finishSpanIfPresent(
final ServerWebExchange exchange, final Throwable throwable) {
// Span could have been removed and finished by other thread before we got here
finishSpanIfPresent((Span) exchange.getAttributes().remove(SPAN_ATTRIBUTE), throwable);
}
public static void finishSpanIfPresent(
final ServerRequest serverRequest, final Throwable throwable) {
// Span could have been removed and finished by other thread before we got here
finishSpanIfPresent((Span) serverRequest.attributes().remove(SPAN_ATTRIBUTE), throwable);
}
private static void finishSpanIfPresent(final Span span, final Throwable throwable) {
if (span != null) {
if (throwable != null) {
Tags.ERROR.set(span, true);
span.log(Collections.singletonMap(ERROR_OBJECT, throwable));
}
span.finish();
}
}
}

View File

@ -1,37 +1,58 @@
package datadog.trace.instrumentation.springwebflux; package datadog.trace.instrumentation.springwebflux;
import static io.opentracing.log.Fields.ERROR_OBJECT; import datadog.trace.api.DDSpanTypes;
import datadog.trace.api.DDTags;
import datadog.trace.context.TraceScope;
import io.opentracing.Scope; import io.opentracing.Scope;
import io.opentracing.Span;
import io.opentracing.tag.Tags; import io.opentracing.tag.Tags;
import io.opentracing.util.GlobalTracer; import io.opentracing.util.GlobalTracer;
import java.util.Collections;
import net.bytebuddy.asm.Advice; import net.bytebuddy.asm.Advice;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
/**
* This is 'top level' advice for Webflux instrumentation. This handles creating and finishing
* Webflux span.
*/
public class DispatcherHandlerAdvice { public class DispatcherHandlerAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class) @Advice.OnMethodEnter(suppress = Throwable.class)
public static Scope startSpan() { public static Scope methodEnter(@Advice.Argument(0) final ServerWebExchange exchange) {
return GlobalTracer.get() // Unfortunately Netty EventLoop is not instrumented well enough to attribute all work to the
// right things so we have to store span in request itself. We also store parent (netty's) span
// so we could update resource name.
final Span parentSpan = GlobalTracer.get().activeSpan();
if (parentSpan != null) {
exchange.getAttributes().put(AdviceUtils.PARENT_SPAN_ATTRIBUTE, parentSpan);
}
final Scope scope =
GlobalTracer.get()
.buildSpan("DispatcherHandler.handle") .buildSpan("DispatcherHandler.handle")
.withTag(Tags.COMPONENT.getKey(), "spring-webflux-controller") .withTag(Tags.COMPONENT.getKey(), "spring-webflux-controller")
.startActive(true); .withTag(DDTags.SPAN_TYPE, DDSpanTypes.HTTP_SERVER)
.startActive(false);
((TraceScope) scope).setAsyncPropagation(true);
exchange.getAttributes().put(AdviceUtils.SPAN_ATTRIBUTE, scope.span());
return scope;
} }
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void addBehaviorTrigger( public static void methodExit(
@Advice.Enter final Scope scope, @Advice.Enter final Scope scope,
@Advice.Thrown final Throwable throwable, @Advice.Thrown final Throwable throwable,
@Advice.Return(readOnly = false) Mono<Void> returnMono) { @Advice.Argument(0) final ServerWebExchange exchange,
@Advice.Return(readOnly = false) Mono<?> returnMono) {
if (scope != null) { if (throwable == null && returnMono != null) {
if (throwable != null) { returnMono =
Tags.ERROR.set(scope.span(), true); returnMono
scope.span().log(Collections.singletonMap(ERROR_OBJECT, throwable)); .doOnSuccessOrError(new DispatcherHandlerOnSuccessOrError<>(exchange))
} else { .doOnCancel(new DispatcherHandlerOnCancel(exchange));
returnMono = returnMono.doOnSuccessOrError(new DispatcherHandlerMonoBiConsumer<>(scope)); } else if (throwable != null) {
AdviceUtils.finishSpanIfPresent(exchange, throwable);
} }
if (scope != null) {
scope.close();
} }
} }
} }

View File

@ -1,46 +0,0 @@
package datadog.trace.instrumentation.springwebflux;
import static io.opentracing.log.Fields.ERROR_OBJECT;
import datadog.trace.api.DDSpanTypes;
import datadog.trace.api.DDTags;
import io.opentracing.Scope;
import io.opentracing.Span;
import io.opentracing.tag.Tags;
import io.opentracing.util.GlobalTracer;
import java.util.Collections;
import java.util.function.BiConsumer;
public class DispatcherHandlerMonoBiConsumer<U> implements BiConsumer<U, Throwable> {
private final Scope scope;
public static final ThreadLocal<String> tlsPathUrlTag = new ThreadLocal<>();
public DispatcherHandlerMonoBiConsumer(final Scope scope) {
this.scope = scope;
}
@Override
public void accept(final U object, final Throwable throwable) {
final Span spanToChange = scope.span();
if (throwable != null) {
spanToChange.log(Collections.singletonMap(ERROR_OBJECT, throwable));
Tags.ERROR.set(spanToChange, true);
}
scope.close();
final Span parentSpan = GlobalTracer.get().activeSpan();
final String pathUrl = tlsPathUrlTag.get();
if (pathUrl != null && parentSpan != null) {
parentSpan.setTag(DDTags.RESOURCE_NAME, pathUrl);
parentSpan.setTag(DDTags.SPAN_TYPE, DDSpanTypes.WEB_SERVLET);
tlsPathUrlTag.remove();
}
}
public static void setTLPathUrl(final String pathUrl) {
tlsPathUrlTag.set(pathUrl);
}
}

View File

@ -0,0 +1,18 @@
package datadog.trace.instrumentation.springwebflux;
import org.springframework.web.server.ServerWebExchange;
public class DispatcherHandlerOnCancel implements Runnable {
private final ServerWebExchange exchange;
public DispatcherHandlerOnCancel(final ServerWebExchange exchange) {
this.exchange = exchange;
}
@Override
public void run() {
// Make sure we are not leaking opened spans for canceled Monos.
AdviceUtils.finishSpanIfPresent(exchange, null);
}
}

View File

@ -0,0 +1,21 @@
package datadog.trace.instrumentation.springwebflux;
import java.util.function.BiConsumer;
import org.springframework.web.server.ServerWebExchange;
public class DispatcherHandlerOnSuccessOrError<U> implements BiConsumer<U, Throwable> {
private final ServerWebExchange exchange;
public DispatcherHandlerOnSuccessOrError(final ServerWebExchange exchange) {
this.exchange = exchange;
}
@Override
public void accept(final U object, final Throwable throwable) {
// Closing span here means it closes after Netty span which may not be ideal.
// We could instrument HandlerFunctionAdapter instead, but this would mean we
// would not account for time spent sending request.
AdviceUtils.finishSpanIfPresent(exchange, throwable);
}
}

View File

@ -0,0 +1,71 @@
package datadog.trace.instrumentation.springwebflux;
import datadog.trace.api.DDTags;
import datadog.trace.context.TraceScope;
import io.opentracing.Scope;
import io.opentracing.Span;
import io.opentracing.util.GlobalTracer;
import net.bytebuddy.asm.Advice;
import org.springframework.web.method.HandlerMethod;
import org.springframework.web.reactive.HandlerMapping;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.util.pattern.PathPattern;
public class HandlerAdapterAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static Scope methodEnter(
@Advice.Argument(0) final ServerWebExchange exchange,
@Advice.Argument(1) final Object handler) {
Scope scope = null;
final Span span = exchange.getAttribute(AdviceUtils.SPAN_ATTRIBUTE);
if (handler != null && span != null) {
final String handlerType;
final String operationName;
if (handler instanceof HandlerMethod) {
// Special case for requests mapped with annotations
final HandlerMethod handlerMethod = (HandlerMethod) handler;
final Class handlerClass = handlerMethod.getMethod().getDeclaringClass();
operationName =
AdviceUtils.parseClassName(handlerClass) + "." + handlerMethod.getMethod().getName();
handlerType = handlerClass.getName();
} else {
operationName = AdviceUtils.parseOperationName(handler);
handlerType = handler.getClass().getName();
}
span.setOperationName(operationName);
span.setTag("handler.type", handlerType);
scope = GlobalTracer.get().scopeManager().activate(span, false);
((TraceScope) scope).setAsyncPropagation(true);
}
final Span parentSpan = exchange.getAttribute(AdviceUtils.PARENT_SPAN_ATTRIBUTE);
final PathPattern bestPattern =
exchange.getAttribute(HandlerMapping.BEST_MATCHING_PATTERN_ATTRIBUTE);
if (parentSpan != null && bestPattern != null) {
parentSpan.setTag(
DDTags.RESOURCE_NAME,
exchange.getRequest().getMethodValue() + " " + bestPattern.getPatternString());
}
return scope;
}
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void methodExit(
@Advice.Argument(0) final ServerWebExchange exchange,
@Advice.Enter final Scope scope,
@Advice.Thrown final Throwable throwable) {
if (throwable != null) {
AdviceUtils.finishSpanIfPresent(exchange, throwable);
}
if (scope != null) {
scope.close();
}
}
}

View File

@ -1,52 +0,0 @@
package datadog.trace.instrumentation.springwebflux;
import static io.opentracing.log.Fields.ERROR_OBJECT;
import io.opentracing.Span;
import io.opentracing.tag.Tags;
import io.opentracing.util.GlobalTracer;
import java.util.Collections;
import net.bytebuddy.asm.Advice;
import org.slf4j.LoggerFactory;
import org.springframework.web.reactive.function.server.HandlerFunction;
public class HandlerFunctionAdapterAdvice {
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void recordHandlerFunctionTag(
@Advice.Thrown final Throwable throwable, @Advice.Argument(1) final Object handler) {
final Span activeSpan = GlobalTracer.get().activeSpan();
if (activeSpan != null && handler != null) {
final Class clazz = handler.getClass();
String className = clazz.getSimpleName();
if (className.isEmpty()) {
className = clazz.getName();
if (clazz.getPackage() != null) {
final String pkgName = clazz.getPackage().getName();
if (!pkgName.isEmpty()) {
className = clazz.getName().replace(pkgName, "").substring(1);
}
}
}
LoggerFactory.getLogger(HandlerFunction.class).warn(className);
final String operationName;
final int lambdaIdx = className.indexOf("$$Lambda$");
if (lambdaIdx > -1) {
operationName = className.substring(0, lambdaIdx) + ".lambda";
} else {
operationName = className + ".handle";
}
activeSpan.setOperationName(operationName);
activeSpan.setTag("handler.type", clazz.getName());
if (throwable != null) {
Tags.ERROR.set(activeSpan, true);
activeSpan.log(Collections.singletonMap(ERROR_OBJECT, throwable));
}
}
}
}

View File

@ -1,64 +0,0 @@
package datadog.trace.instrumentation.springwebflux;
import static io.opentracing.log.Fields.ERROR_OBJECT;
import io.opentracing.Scope;
import io.opentracing.Span;
import io.opentracing.tag.Tags;
import io.opentracing.util.GlobalTracer;
import java.lang.reflect.Method;
import java.util.Collections;
import net.bytebuddy.asm.Advice;
import org.springframework.web.method.HandlerMethod;
import org.springframework.web.reactive.HandlerMapping;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.util.pattern.PathPattern;
public class RequestMappingInfoHandlerMappingAdvice {
@Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class)
public static void nameSpan(
@Advice.Argument(1) final HandlerMethod handlerMethod,
@Advice.Argument(2) final ServerWebExchange serverWebExchange,
@Advice.Thrown final Throwable throwable) {
final Scope scope = GlobalTracer.get().scopeManager().active();
if (scope != null) {
final Span span = scope.span();
final PathPattern bestPattern =
serverWebExchange.getAttribute(HandlerMapping.BEST_MATCHING_PATTERN_ATTRIBUTE);
if (bestPattern != null) {
DispatcherHandlerMonoBiConsumer.setTLPathUrl(
serverWebExchange.getRequest().getMethodValue() + " " + bestPattern.getPatternString());
}
if (handlerMethod != null) {
final Method method;
final Class clazz;
final String methodName;
clazz = handlerMethod.getMethod().getDeclaringClass();
method = handlerMethod.getMethod();
methodName = method.getName();
span.setTag("handler.type", clazz.getName());
String className = clazz.getSimpleName();
if (className.isEmpty()) {
className = clazz.getName();
if (clazz.getPackage() != null) {
final String pkgName = clazz.getPackage().getName();
if (!pkgName.isEmpty()) {
className = clazz.getName().replace(pkgName, "").substring(1);
}
}
}
span.setOperationName(className + "." + methodName);
}
if (throwable != null) {
Tags.ERROR.set(span, true);
span.log(Collections.singletonMap(ERROR_OBJECT, throwable));
}
}
}
}

View File

@ -0,0 +1,61 @@
package datadog.trace.instrumentation.springwebflux;
import datadog.trace.api.DDTags;
import io.opentracing.Span;
import java.util.function.BiConsumer;
import java.util.regex.Pattern;
import org.springframework.web.reactive.function.server.HandlerFunction;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.ServerRequest;
public class RouteOnSuccessOrError implements BiConsumer<HandlerFunction<?>, Throwable> {
private static final Pattern SPECIAL_CHARACTERS_REGEX = Pattern.compile("[\\(\\)&|]");
private static final Pattern SPACES_REGEX = Pattern.compile("[ \\t]+");
private final RouterFunction routerFunction;
private final ServerRequest serverRequest;
public RouteOnSuccessOrError(
final RouterFunction routerFunction, final ServerRequest serverRequest) {
this.routerFunction = routerFunction;
this.serverRequest = serverRequest;
}
@Override
public void accept(final HandlerFunction<?> handler, final Throwable throwable) {
if (handler != null) {
final String predicateString = parsePredicateString();
if (predicateString != null) {
final Span span = (Span) serverRequest.attributes().get(AdviceUtils.SPAN_ATTRIBUTE);
if (span != null) {
span.setTag("request.predicate", predicateString);
}
final Span parentSpan =
(Span) serverRequest.attributes().get(AdviceUtils.PARENT_SPAN_ATTRIBUTE);
if (parentSpan != null) {
parentSpan.setTag(DDTags.RESOURCE_NAME, parseResourceName(predicateString));
}
}
}
}
private String parsePredicateString() {
final String routerFunctionString = routerFunction.toString();
// Router functions containing lambda predicates should not end up in span tags since they are
// confusing
if (routerFunctionString.startsWith(
"org.springframework.web.reactive.function.server.RequestPredicates$$Lambda$")) {
return null;
} else {
return routerFunctionString.replaceFirst("\\s*->.*$", "");
}
}
private String parseResourceName(final String routerString) {
return SPACES_REGEX
.matcher(SPECIAL_CHARACTERS_REGEX.matcher(routerString).replaceAll(""))
.replaceAll(" ")
.trim();
}
}

View File

@ -1,65 +1,27 @@
package datadog.trace.instrumentation.springwebflux; package datadog.trace.instrumentation.springwebflux;
import static io.opentracing.log.Fields.ERROR_OBJECT;
import datadog.trace.api.DDSpanTypes;
import datadog.trace.api.DDTags;
import io.opentracing.Scope;
import io.opentracing.Span;
import io.opentracing.tag.Tags;
import io.opentracing.util.GlobalTracer;
import java.util.Collections;
import net.bytebuddy.asm.Advice; import net.bytebuddy.asm.Advice;
import org.springframework.web.reactive.function.server.RequestPredicate; import org.springframework.web.reactive.function.server.HandlerFunction;
import org.springframework.web.reactive.function.server.RequestPredicates; import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.ServerRequest; import org.springframework.web.reactive.function.server.ServerRequest;
import reactor.core.publisher.Mono;
/**
* This advice is responsible for setting additional span parameters for routes implemented with
* functional interface.
*/
public class RouterFunctionAdvice { public class RouterFunctionAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void nameResource(
@Advice.FieldValue(value = "predicate") final RequestPredicate predicate,
@Advice.Argument(0) final ServerRequest serverRequest) {
if (predicate == null) {
return;
}
final Class predicateEnclosingClass = predicate.getClass().getEnclosingClass();
final String predicateString = predicate.toString();
if (predicate.test(serverRequest)
&& serverRequest != null
&& predicateString != null
&& !predicateString.isEmpty()
&& predicateEnclosingClass == RequestPredicates.class) {
// only change parent span if the predicate is one of those enclosed in
// org.springframework.web.reactive.function.server RequestPredicates
// otherwise the parent may have weird resource names such as lambda request predicate class
// names that arise from webflux error handling
final String resourceName =
predicateString.replaceAll("[\\(\\)&|]", "").replaceAll("[ \\t]+", " ");
// to be used as resource name by netty span, most likely
DispatcherHandlerMonoBiConsumer.setTLPathUrl(resourceName);
// should be the dispatcher handler span
final Scope activeScope = GlobalTracer.get().scopeManager().active();
if (activeScope != null) {
activeScope.span().setTag("request.predicate", predicateString);
activeScope.span().setTag(DDTags.SPAN_TYPE, DDSpanTypes.HTTP_SERVER);
}
}
}
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void recordThrowable(@Advice.Thrown final Throwable throwable) { public static void methodExit(
final Scope scope = GlobalTracer.get().scopeManager().active(); @Advice.This final RouterFunction thiz,
if (scope != null && throwable != null) { @Advice.Argument(0) final ServerRequest serverRequest,
final Span span = scope.span(); @Advice.Return(readOnly = false) Mono<HandlerFunction<?>> result,
Tags.ERROR.set(span, true); @Advice.Thrown final Throwable throwable) {
span.log(Collections.singletonMap(ERROR_OBJECT, throwable)); if (throwable == null) {
result = result.doOnSuccessOrError(new RouteOnSuccessOrError(thiz, serverRequest));
} else {
AdviceUtils.finishSpanIfPresent(serverRequest, throwable);
} }
} }
} }

View File

@ -0,0 +1,28 @@
import dd.trace.instrumentation.springwebflux.SpringWebFluxTestApplication
import org.springframework.boot.test.context.SpringBootTest
import org.springframework.boot.test.context.TestConfiguration
import org.springframework.boot.web.embedded.netty.NettyReactiveWebServerFactory
import org.springframework.boot.web.embedded.netty.NettyServerCustomizer
import org.springframework.context.annotation.Bean
import reactor.ipc.netty.resources.LoopResources
/**
* Run all Webflux tests under netty event loop having only 1 thread.
* Some of the bugs are better visible in this setup because same thread is reused
* for different requests.
*/
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, classes = [SpringWebFluxTestApplication, ForceSingleThreadedNettyAutoConfiguration])
class SingleThreadedSpringWebfluxTest extends SpringWebfluxTest {
@TestConfiguration
static class ForceSingleThreadedNettyAutoConfiguration {
@Bean
NettyReactiveWebServerFactory nettyFactory() {
def factory = new NettyReactiveWebServerFactory()
NettyServerCustomizer customizer = { builder -> builder.loopResources(LoopResources.create("my-http", 1, true)) }
factory.addServerCustomizers(customizer)
return factory
}
}
}

View File

@ -2,21 +2,33 @@ import datadog.trace.agent.test.AgentTestRunner
import datadog.trace.agent.test.utils.OkHttpUtils import datadog.trace.agent.test.utils.OkHttpUtils
import datadog.trace.api.DDSpanTypes import datadog.trace.api.DDSpanTypes
import datadog.trace.api.DDTags import datadog.trace.api.DDTags
import datadog.trace.instrumentation.springwebflux.EchoHandlerFunction import dd.trace.instrumentation.springwebflux.EchoHandlerFunction
import datadog.trace.instrumentation.springwebflux.FooModel import dd.trace.instrumentation.springwebflux.FooModel
import datadog.trace.instrumentation.springwebflux.SpringWebFluxTestApplication import dd.trace.instrumentation.springwebflux.SpringWebFluxTestApplication
import datadog.trace.instrumentation.springwebflux.TestController import dd.trace.instrumentation.springwebflux.TestController
import io.opentracing.tag.Tags import io.opentracing.tag.Tags
import okhttp3.OkHttpClient import okhttp3.OkHttpClient
import okhttp3.Request import okhttp3.Request
import okhttp3.RequestBody import okhttp3.RequestBody
import org.springframework.boot.test.context.SpringBootTest import org.springframework.boot.test.context.SpringBootTest
import org.springframework.boot.test.context.TestConfiguration
import org.springframework.boot.web.embedded.netty.NettyReactiveWebServerFactory
import org.springframework.boot.web.server.LocalServerPort import org.springframework.boot.web.server.LocalServerPort
import org.springframework.context.annotation.Bean
import org.springframework.web.server.ResponseStatusException import org.springframework.web.server.ResponseStatusException
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, classes = SpringWebFluxTestApplication)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, classes = [SpringWebFluxTestApplication, ForceNettyAutoConfiguration])
class SpringWebfluxTest extends AgentTestRunner { class SpringWebfluxTest extends AgentTestRunner {
@TestConfiguration
static class ForceNettyAutoConfiguration {
@Bean
NettyReactiveWebServerFactory nettyFactory() {
return new NettyReactiveWebServerFactory()
}
}
static final okhttp3.MediaType PLAIN_TYPE = okhttp3.MediaType.parse("text/plain; charset=utf-8") static final okhttp3.MediaType PLAIN_TYPE = okhttp3.MediaType.parse("text/plain; charset=utf-8")
static final String INNER_HANDLER_FUNCTION_CLASS_TAG_PREFIX = SpringWebFluxTestApplication.getName() + "\$" static final String INNER_HANDLER_FUNCTION_CLASS_TAG_PREFIX = SpringWebFluxTestApplication.getName() + "\$"
static final String SPRING_APP_CLASS_ANON_NESTED_CLASS_PREFIX = SpringWebFluxTestApplication.getSimpleName() + "\$" static final String SPRING_APP_CLASS_ANON_NESTED_CLASS_PREFIX = SpringWebFluxTestApplication.getSimpleName() + "\$"
@ -26,9 +38,9 @@ class SpringWebfluxTest extends AgentTestRunner {
OkHttpClient client = OkHttpUtils.client() OkHttpClient client = OkHttpUtils.client()
def "Basic GET test #testName to functional API"() { def "Basic GET test #testName"() {
setup: setup:
String url = "http://localhost:$port/greet$urlSuffix" String url = "http://localhost:$port$urlPath"
def request = new Request.Builder().url(url).get().build() def request = new Request.Builder().url(url).get().build()
when: when:
def response = client.newCall(request).execute() def response = client.newCall(request).execute()
@ -39,73 +51,35 @@ class SpringWebfluxTest extends AgentTestRunner {
assertTraces(1) { assertTraces(1) {
trace(0, 2) { trace(0, 2) {
span(0) { span(0) {
if (annotatedMethod == null) {
// Functional API
resourceNameContains(SPRING_APP_CLASS_ANON_NESTED_CLASS_PREFIX, ".handle") resourceNameContains(SPRING_APP_CLASS_ANON_NESTED_CLASS_PREFIX, ".handle")
operationNameContains(SPRING_APP_CLASS_ANON_NESTED_CLASS_PREFIX, ".handle") operationNameContains(SPRING_APP_CLASS_ANON_NESTED_CLASS_PREFIX, ".handle")
} else {
// Annotation API
resourceName TestController.getSimpleName() + "." + annotatedMethod
operationName TestController.getSimpleName() + "." + annotatedMethod
}
spanType DDSpanTypes.HTTP_SERVER spanType DDSpanTypes.HTTP_SERVER
childOf(span(1)) childOf(span(1))
tags { tags {
"$Tags.COMPONENT.key" "spring-webflux-controller" "$Tags.COMPONENT.key" "spring-webflux-controller"
"$DDTags.SPAN_TYPE" DDSpanTypes.HTTP_SERVER "$DDTags.SPAN_TYPE" DDSpanTypes.HTTP_SERVER
"request.predicate" "(GET && /greet$pathVariableUrlSuffix)" if (annotatedMethod == null) {
// Functional API
"request.predicate" "(GET && $urlPathWithVariables)"
"handler.type" { String tagVal -> "handler.type" { String tagVal ->
return tagVal.contains(INNER_HANDLER_FUNCTION_CLASS_TAG_PREFIX) return tagVal.contains(INNER_HANDLER_FUNCTION_CLASS_TAG_PREFIX)
} }
defaultTags() } else {
} // Annotation API
}
span(1) {
resourceName "GET /greet$pathVariableUrlSuffix"
operationName "netty.request"
spanType DDSpanTypes.HTTP_SERVER
parent()
tags {
"$Tags.COMPONENT.key" "netty"
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_SERVER
"$DDTags.SPAN_TYPE" DDSpanTypes.HTTP_SERVER
"$Tags.PEER_HOSTNAME.key" "localhost"
"$Tags.PEER_PORT.key" Integer
"$Tags.HTTP_METHOD.key" "GET"
"$Tags.HTTP_STATUS.key" 200
"$Tags.HTTP_URL.key" url
defaultTags()
}
}
}
}
where:
testName | urlSuffix | pathVariableUrlSuffix | expectedResponseBody
"without paramaters" | "" | "" | SpringWebFluxTestApplication.GreetingHandler.DEFAULT_RESPONSE
"with one parameter" | "/WORLD" | "/{name}" | SpringWebFluxTestApplication.GreetingHandler.DEFAULT_RESPONSE + " WORLD"
"with two parameters" | "/World/Test1" | "/{name}/{word}" | SpringWebFluxTestApplication.GreetingHandler.DEFAULT_RESPONSE + " World Test1"
}
def "Basic GET test #testName to annotations API"() {
setup:
String url = "http://localhost:$port/foo$urlSuffix"
def request = new Request.Builder().url(url).get().build()
when:
def response = client.newCall(request).execute()
then:
response.code == 200
response.body().string() == expectedResponseBody
assertTraces(1) {
trace(0, 2) {
span(0) {
resourceName TestController.getSimpleName() + ".getFooModel"
operationName TestController.getSimpleName() + ".getFooModel"
spanType DDSpanTypes.HTTP_SERVER
childOf(span(1))
tags {
"$Tags.COMPONENT.key" "spring-webflux-controller"
"$DDTags.SPAN_TYPE" DDSpanTypes.HTTP_SERVER
"handler.type" TestController.getName() "handler.type" TestController.getName()
}
defaultTags() defaultTags()
} }
} }
span(1) { span(1) {
resourceName "GET /foo$pathVariableUrlSuffix" resourceName "GET $urlPathWithVariables"
operationName "netty.request" operationName "netty.request"
spanType DDSpanTypes.HTTP_SERVER spanType DDSpanTypes.HTTP_SERVER
parent() parent()
@ -125,10 +99,16 @@ class SpringWebfluxTest extends AgentTestRunner {
} }
where: where:
testName | urlSuffix | pathVariableUrlSuffix | expectedResponseBody testName | urlPath | urlPathWithVariables | annotatedMethod | expectedResponseBody
"without parameters" | "" | "" | new FooModel(0L, "DEFAULT").toString() "functional API without parameters" | "/greet" | "/greet" | null | SpringWebFluxTestApplication.GreetingHandler.DEFAULT_RESPONSE
"with one parameter" | "/1" | "/{id}" | new FooModel(1, "pass").toString() "functional API with one parameter" | "/greet/WORLD" | "/greet/{name}" | null | SpringWebFluxTestApplication.GreetingHandler.DEFAULT_RESPONSE + " WORLD"
"with two parameters" | "/2/world" | "/{id}/{name}" | new FooModel(2, "world").toString() "functional API with two parameters" | "/greet/World/Test1" | "/greet/{name}/{word}" | null | SpringWebFluxTestApplication.GreetingHandler.DEFAULT_RESPONSE + " World Test1"
"functional API delayed response" | "/greet-delayed" | "/greet-delayed" | null | SpringWebFluxTestApplication.GreetingHandler.DEFAULT_RESPONSE
"annotation API without parameters" | "/foo" | "/foo" | "getFooModel" | new FooModel(0L, "DEFAULT").toString()
"annotation API with one parameter" | "/foo/1" | "/foo/{id}" | "getFooModel" | new FooModel(1L, "pass").toString()
"annotation API with two parameters" | "/foo/2/world" | "/foo/{id}/{name}" | "getFooModel" | new FooModel(2L, "world").toString()
"annotation API delayed response" | "/foo-delayed" | "/foo-delayed" | "getFooDelayed" | new FooModel(3L, "delayed").toString()
} }
def "404 GET test"() { def "404 GET test"() {
@ -161,14 +141,15 @@ class SpringWebfluxTest extends AgentTestRunner {
} }
} }
span(1) { span(1) {
resourceName "DispatcherHandler.handle" resourceName "ResourceWebHandler.handle"
operationName "DispatcherHandler.handle" operationName "ResourceWebHandler.handle"
spanType DDSpanTypes.HTTP_SERVER spanType DDSpanTypes.HTTP_SERVER
childOf(span(0)) childOf(span(0))
errored true errored true
tags { tags {
"$Tags.COMPONENT.key" "spring-webflux-controller" "$Tags.COMPONENT.key" "spring-webflux-controller"
"$DDTags.SPAN_TYPE" DDSpanTypes.HTTP_SERVER "$DDTags.SPAN_TYPE" DDSpanTypes.HTTP_SERVER
"handler.type" "org.springframework.web.reactive.resource.ResourceWebHandler"
errorTags(ResponseStatusException, String) errorTags(ResponseStatusException, String)
defaultTags() defaultTags()
} }
@ -191,7 +172,7 @@ class SpringWebfluxTest extends AgentTestRunner {
response.code() == 202 response.code() == 202
response.body().string() == echoString response.body().string() == echoString
assertTraces(1) { assertTraces(1) {
trace(0, 2) { trace(0, 3) {
span(0) { span(0) {
resourceName EchoHandlerFunction.getSimpleName() + ".handle" resourceName EchoHandlerFunction.getSimpleName() + ".handle"
operationName EchoHandlerFunction.getSimpleName() + ".handle" operationName EchoHandlerFunction.getSimpleName() + ".handle"
@ -224,13 +205,23 @@ class SpringWebfluxTest extends AgentTestRunner {
defaultTags() defaultTags()
} }
} }
span(2) {
resourceName "echo"
operationName "echo"
childOf(span(0))
tags {
"$Tags.COMPONENT.key" "trace"
"$DDTags.SPAN_TYPE" DDSpanTypes.HTTP_SERVER
defaultTags()
}
}
} }
} }
} }
def "GET to bad annotation API endpoint"() { def "GET to bad endpoint #testName"() {
setup: setup:
String url = "http://localhost:$port/failfoo/1" String url = "http://localhost:$port$urlPath"
def request = new Request.Builder().url(url).get().build() def request = new Request.Builder().url(url).get().build()
when: when:
@ -241,7 +232,7 @@ class SpringWebfluxTest extends AgentTestRunner {
assertTraces(1) { assertTraces(1) {
trace(0, 2) { trace(0, 2) {
span(0) { span(0) {
resourceName "GET /failfoo/{id}" resourceName "GET $urlPathWithVariables"
operationName "netty.request" operationName "netty.request"
spanType DDSpanTypes.HTTP_SERVER spanType DDSpanTypes.HTTP_SERVER
errored true errored true
@ -260,80 +251,51 @@ class SpringWebfluxTest extends AgentTestRunner {
} }
} }
span(1) { span(1) {
resourceName "TestController.getFooModelFail" if (annotatedMethod == null) {
operationName "TestController.getFooModelFail" // Functional API
spanType DDSpanTypes.HTTP_SERVER
childOf(span(0))
errored true
tags {
"$Tags.COMPONENT.key" "spring-webflux-controller"
"$DDTags.SPAN_TYPE" DDSpanTypes.HTTP_SERVER
"handler.type" TestController.getName()
errorTags(ArithmeticException, String)
defaultTags()
}
}
}
}
}
def "POST to bad functional API endpoint"() {
setup:
String echoString = "TEST"
RequestBody body = RequestBody.create(PLAIN_TYPE, echoString)
String url = "http://localhost:$port/fail-echo"
def request = new Request.Builder().url(url).post(body).build()
when:
def response = client.newCall(request).execute()
then:
response.code() == 500
assertTraces(1) {
trace(0, 2) {
span(0) {
resourceName "POST /fail-echo"
operationName "netty.request"
spanType DDSpanTypes.HTTP_SERVER
errored true
parent()
tags {
"$Tags.COMPONENT.key" "netty"
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_SERVER
"$DDTags.SPAN_TYPE" DDSpanTypes.HTTP_SERVER
"$Tags.PEER_HOSTNAME.key" "localhost"
"$Tags.PEER_PORT.key" Integer
"$Tags.HTTP_METHOD.key" "POST"
"$Tags.HTTP_STATUS.key" 500
"$Tags.HTTP_URL.key" url
"error" true
defaultTags()
}
}
span(1) {
resourceNameContains(SPRING_APP_CLASS_ANON_NESTED_CLASS_PREFIX, ".handle") resourceNameContains(SPRING_APP_CLASS_ANON_NESTED_CLASS_PREFIX, ".handle")
operationNameContains(SPRING_APP_CLASS_ANON_NESTED_CLASS_PREFIX, ".handle") operationNameContains(SPRING_APP_CLASS_ANON_NESTED_CLASS_PREFIX, ".handle")
} else {
// Annotation API
resourceName TestController.getSimpleName() + "." + annotatedMethod
operationName TestController.getSimpleName() + "." + annotatedMethod
}
spanType DDSpanTypes.HTTP_SERVER spanType DDSpanTypes.HTTP_SERVER
childOf(span(0)) childOf(span(0))
errored true errored true
tags { tags {
"$Tags.COMPONENT.key" "spring-webflux-controller" "$Tags.COMPONENT.key" "spring-webflux-controller"
"$DDTags.SPAN_TYPE" DDSpanTypes.HTTP_SERVER "$DDTags.SPAN_TYPE" DDSpanTypes.HTTP_SERVER
"request.predicate" "(POST && /fail-echo)" if (annotatedMethod == null) {
// Functional API
"request.predicate" "(GET && $urlPathWithVariables)"
"handler.type" { String tagVal -> "handler.type" { String tagVal ->
return tagVal.contains(INNER_HANDLER_FUNCTION_CLASS_TAG_PREFIX) return tagVal.contains(INNER_HANDLER_FUNCTION_CLASS_TAG_PREFIX)
} }
errorTags(NullPointerException, String) } else {
// Annotation API
"handler.type" TestController.getName()
}
errorTags(RuntimeException, "bad things happen")
defaultTags() defaultTags()
} }
} }
} }
} }
where:
testName | urlPath | urlPathWithVariables | annotatedMethod
"functional API fail fast" | "/greet-failfast/1" | "/greet-failfast/{id}" | null
"functional API fail Mono" | "/greet-failmono/1" | "/greet-failmono/{id}" | null
"annotation API fail fast" | "/foo-failfast/1" | "/foo-failfast/{id}" | "getFooFailFast"
"annotation API fail Mono" | "/foo-failmono/1" | "/foo-failmono/{id}" | "getFooFailMono"
} }
def "Redirect test"() { def "Redirect test"() {
setup: setup:
String url = "http://localhost:$port/double-greet-redirect" String url = "http://localhost:$port/double-greet-redirect"
String finalUrl = "http://localhost:$port/double-greet"
def request = new Request.Builder().url(url).get().build() def request = new Request.Builder().url(url).get().build()
when: when:
@ -342,6 +304,7 @@ class SpringWebfluxTest extends AgentTestRunner {
then: then:
response.code == 200 response.code == 200
assertTraces(2) { assertTraces(2) {
// TODO: why order of spans is different in these traces?
trace(0, 2) { trace(0, 2) {
span(0) { span(0) {
resourceName "GET /double-greet-redirect" resourceName "GET /double-greet-redirect"
@ -371,7 +334,7 @@ class SpringWebfluxTest extends AgentTestRunner {
"request.predicate" "(GET && /double-greet-redirect)" "request.predicate" "(GET && /double-greet-redirect)"
"handler.type" { String tagVal -> "handler.type" { String tagVal ->
return (tagVal.contains(INNER_HANDLER_FUNCTION_CLASS_TAG_PREFIX) return (tagVal.contains(INNER_HANDLER_FUNCTION_CLASS_TAG_PREFIX)
|| tagVal.contains("Lambda")) // || tagVal.contains("Proxy")) || tagVal.contains("Lambda"))
} }
defaultTags() defaultTags()
} }
@ -406,7 +369,7 @@ class SpringWebfluxTest extends AgentTestRunner {
"$Tags.PEER_PORT.key" Integer "$Tags.PEER_PORT.key" Integer
"$Tags.HTTP_METHOD.key" "GET" "$Tags.HTTP_METHOD.key" "GET"
"$Tags.HTTP_STATUS.key" 200 "$Tags.HTTP_STATUS.key" 200
"$Tags.HTTP_URL.key" "http://localhost:$port/double-greet" "$Tags.HTTP_URL.key" finalUrl
defaultTags() defaultTags()
} }
} }
@ -414,87 +377,50 @@ class SpringWebfluxTest extends AgentTestRunner {
} }
} }
def "Flux x#count GET test with functional API endpoint"() { def "Multiple GETs to delaying route #testName"() {
setup: setup:
String expectedResponseBodyStr = FooModel.createXFooModelsStringFromArray(FooModel.createXFooModels(count)) def requestsCount = 50 // Should be more than 2x CPUs to fish out some bugs
String url = "http://localhost:$port/greet-counter/$count" String url = "http://localhost:$port$urlPath"
def request = new Request.Builder().url(url).get().build() def request = new Request.Builder().url(url).get().build()
when: when:
def response = client.newCall(request).execute() def responses = (0..requestsCount - 1).collect { client.newCall(request).execute() }
then: then:
response.code() == 200 responses.every { it.code == 200 }
expectedResponseBodyStr == response.body().string() responses.every { it.body().string() == expectedResponseBody }
assertTraces(1) { assertTraces(responses.size()) {
trace(0, 2) { responses.eachWithIndex { def response, int i ->
trace(i, 2) {
span(0) { span(0) {
if (annotatedMethod == null) {
// Functional API
resourceNameContains(SPRING_APP_CLASS_ANON_NESTED_CLASS_PREFIX, ".handle") resourceNameContains(SPRING_APP_CLASS_ANON_NESTED_CLASS_PREFIX, ".handle")
operationNameContains(SPRING_APP_CLASS_ANON_NESTED_CLASS_PREFIX, ".handle") operationNameContains(SPRING_APP_CLASS_ANON_NESTED_CLASS_PREFIX, ".handle")
} else {
// Annotation API
resourceName TestController.getSimpleName() + "." + annotatedMethod
operationName TestController.getSimpleName() + "." + annotatedMethod
}
spanType DDSpanTypes.HTTP_SERVER spanType DDSpanTypes.HTTP_SERVER
childOf(span(1)) childOf(span(1))
tags { tags {
"$Tags.COMPONENT.key" "spring-webflux-controller" "$Tags.COMPONENT.key" "spring-webflux-controller"
"$DDTags.SPAN_TYPE" DDSpanTypes.HTTP_SERVER "$DDTags.SPAN_TYPE" DDSpanTypes.HTTP_SERVER
"request.predicate" "(GET && /greet-counter/{count})" if (annotatedMethod == null) {
// Functional API
"request.predicate" "(GET && $urlPathWithVariables)"
"handler.type" { String tagVal -> "handler.type" { String tagVal ->
return tagVal.contains(INNER_HANDLER_FUNCTION_CLASS_TAG_PREFIX) return tagVal.contains(INNER_HANDLER_FUNCTION_CLASS_TAG_PREFIX)
} }
defaultTags() } else {
} // Annotation API
}
span(1) {
resourceName "GET /greet-counter/{count}"
operationName "netty.request"
spanType DDSpanTypes.HTTP_SERVER
parent()
tags {
"$Tags.COMPONENT.key" "netty"
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_SERVER
"$DDTags.SPAN_TYPE" DDSpanTypes.HTTP_SERVER
"$Tags.PEER_HOSTNAME.key" "localhost"
"$Tags.PEER_PORT.key" Integer
"$Tags.HTTP_METHOD.key" "GET"
"$Tags.HTTP_STATUS.key" 200
"$Tags.HTTP_URL.key" url
defaultTags()
}
}
}
}
where:
count << [0, 1, 10]
}
def "Flux x#count GET test with spring annotations endpoint"() {
setup:
String expectedResponseBodyStr = FooModel.createXFooModelsStringFromArray(FooModel.createXFooModels(count))
String url = "http://localhost:$port/annotation-foos/$count"
def request = new Request.Builder().url(url).get().build()
when:
def response = client.newCall(request).execute()
then:
response.code() == 200
expectedResponseBodyStr == response.body().string()
assertTraces(1) {
trace(0, 2) {
span(0) {
resourceName TestController.getSimpleName() + ".getXFooModels"
operationName TestController.getSimpleName() + ".getXFooModels"
spanType DDSpanTypes.HTTP_SERVER
childOf(span(1))
tags {
"$Tags.COMPONENT.key" "spring-webflux-controller"
"$DDTags.SPAN_TYPE" DDSpanTypes.HTTP_SERVER
"handler.type" TestController.getName() "handler.type" TestController.getName()
}
defaultTags() defaultTags()
} }
} }
span(1) { span(1) {
resourceName "GET /annotation-foos/{count}" resourceName "GET $urlPathWithVariables"
operationName "netty.request" operationName "netty.request"
spanType DDSpanTypes.HTTP_SERVER spanType DDSpanTypes.HTTP_SERVER
parent() parent()
@ -512,8 +438,11 @@ class SpringWebfluxTest extends AgentTestRunner {
} }
} }
} }
}
where: where:
count << [0, 1, 10] testName | urlPath | urlPathWithVariables | annotatedMethod | expectedResponseBody
"functional API delayed response" | "/greet-delayed" | "/greet-delayed" | null | SpringWebFluxTestApplication.GreetingHandler.DEFAULT_RESPONSE
"annotation API delayed response" | "/foo-delayed" | "/foo-delayed" | "getFooDelayed" | new FooModel(3L, "delayed").toString()
} }
} }

View File

@ -1,37 +0,0 @@
package datadog.trace.instrumentation.springwebflux
class FooModel {
public long id
public String name
FooModel(long id, String name) {
this.id = id
this.name = name
}
@Override
String toString() {
return "{\"id\":" + id + ",\"name\":\"" + name + "\"}"
}
static FooModel[] createXFooModels(long count) {
FooModel[] foos = new FooModel[count]
for (int i = 0; i < count; ++i) {
foos[i] = new FooModel(i, String.valueOf(i))
}
return foos
}
static String createXFooModelsStringFromArray(FooModel[] foos) {
StringBuilder sb = new StringBuilder()
sb.append("[")
for (int i = 0; i < foos.length; ++i) {
sb.append(foos[i].toString())
if (i < foos.length - 1) {
sb.append(",")
}
}
sb.append("]")
return sb.toString()
}
}

View File

@ -1,13 +1,19 @@
package datadog.trace.instrumentation.springwebflux package dd.trace.instrumentation.springwebflux
import datadog.trace.api.Trace
import org.springframework.http.MediaType import org.springframework.http.MediaType
import org.springframework.stereotype.Component import org.springframework.stereotype.Component
import org.springframework.web.reactive.function.server.ServerRequest import org.springframework.web.reactive.function.server.ServerRequest
import org.springframework.web.reactive.function.server.ServerResponse import org.springframework.web.reactive.function.server.ServerResponse
import reactor.core.publisher.Mono import reactor.core.publisher.Mono
/**
* Note: this class has to stay outside of 'datadog.*' package because we need
* it transformed by {@code @Trace} annotation.
*/
@Component @Component
class EchoHandler { class EchoHandler {
@Trace(operationName = "echo")
Mono<ServerResponse> echo(ServerRequest request) { Mono<ServerResponse> echo(ServerRequest request) {
return ServerResponse.accepted().contentType(MediaType.TEXT_PLAIN) return ServerResponse.accepted().contentType(MediaType.TEXT_PLAIN)
.body(request.bodyToMono(String), String) .body(request.bodyToMono(String), String)

View File

@ -1,4 +1,5 @@
package datadog.trace.instrumentation.springwebflux package dd.trace.instrumentation.springwebflux
import org.springframework.web.reactive.function.server.HandlerFunction import org.springframework.web.reactive.function.server.HandlerFunction
import org.springframework.web.reactive.function.server.ServerRequest import org.springframework.web.reactive.function.server.ServerRequest

View File

@ -0,0 +1,16 @@
package dd.trace.instrumentation.springwebflux
class FooModel {
public long id
public String name
FooModel(long id, String name) {
this.id = id
this.name = name
}
@Override
String toString() {
return "{\"id\":" + id + ",\"name\":\"" + name + "\"}"
}
}

View File

@ -1,12 +1,6 @@
package datadog.trace.instrumentation.springwebflux package dd.trace.instrumentation.springwebflux
import org.springframework.boot.SpringApplication
import org.springframework.boot.WebApplicationType
import org.springframework.boot.autoconfigure.AutoConfigureBefore
import org.springframework.boot.autoconfigure.SpringBootApplication import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.autoconfigure.web.reactive.ReactiveWebServerFactoryAutoConfiguration
import org.springframework.boot.test.context.TestConfiguration
import org.springframework.boot.web.embedded.netty.NettyReactiveWebServerFactory
import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Bean
import org.springframework.http.MediaType import org.springframework.http.MediaType
import org.springframework.stereotype.Component import org.springframework.stereotype.Component
@ -18,6 +12,8 @@ import org.springframework.web.reactive.function.server.ServerResponse
import reactor.core.publisher.Flux import reactor.core.publisher.Flux
import reactor.core.publisher.Mono import reactor.core.publisher.Mono
import java.time.Duration
import static org.springframework.web.reactive.function.server.RequestPredicates.GET import static org.springframework.web.reactive.function.server.RequestPredicates.GET
import static org.springframework.web.reactive.function.server.RequestPredicates.POST import static org.springframework.web.reactive.function.server.RequestPredicates.POST
import static org.springframework.web.reactive.function.server.RouterFunctions.route import static org.springframework.web.reactive.function.server.RouterFunctions.route
@ -25,31 +21,9 @@ import static org.springframework.web.reactive.function.server.RouterFunctions.r
@SpringBootApplication @SpringBootApplication
class SpringWebFluxTestApplication { class SpringWebFluxTestApplication {
static void main(String[] args) {
SpringApplication app = new SpringApplication(SpringWebFluxTestApplication)
app.setWebApplicationType(WebApplicationType.REACTIVE)
app.run(args)
}
@TestConfiguration
@AutoConfigureBefore(ReactiveWebServerFactoryAutoConfiguration)
class ForceNettyAutoConfiguration {
@Bean
NettyReactiveWebServerFactory nettyFactory() {
return new NettyReactiveWebServerFactory()
}
}
@Bean @Bean
RouterFunction<ServerResponse> echoRouterFunction(EchoHandler echoHandler) { RouterFunction<ServerResponse> echoRouterFunction(EchoHandler echoHandler) {
return route(POST("/echo"), new EchoHandlerFunction(echoHandler)) return route(POST("/echo"), new EchoHandlerFunction(echoHandler))
.andRoute(POST("/fail-echo"), new HandlerFunction<ServerResponse>() {
@Override
Mono<ServerResponse> handle(ServerRequest request) {
return echoHandler.echo(null)
}
})
} }
@Bean @Bean
@ -74,10 +48,20 @@ class SpringWebFluxTestApplication {
Mono<ServerResponse> handle(ServerRequest request) { Mono<ServerResponse> handle(ServerRequest request) {
return greetingHandler.doubleGreet() return greetingHandler.doubleGreet()
} }
}).andRoute(GET("/greet-counter/{count}"), new HandlerFunction<ServerResponse>() { }).andRoute(GET("/greet-delayed"), new HandlerFunction<ServerResponse>() {
@Override @Override
Mono<ServerResponse> handle(ServerRequest request) { Mono<ServerResponse> handle(ServerRequest request) {
return greetingHandler.counterGreet(request) return greetingHandler.defaultGreet().delayElement(Duration.ofMillis(100))
}
}).andRoute(GET("/greet-failfast/{id}"), new HandlerFunction<ServerResponse>() {
@Override
Mono<ServerResponse> handle(ServerRequest request) {
throw new RuntimeException("bad things happen")
}
}).andRoute(GET("/greet-failmono/{id}"), new HandlerFunction<ServerResponse>() {
@Override
Mono<ServerResponse> handle(ServerRequest request) {
return Mono.error(new RuntimeException("bad things happen"))
} }
}) })
} }

View File

@ -1,19 +1,15 @@
package datadog.trace.instrumentation.springwebflux package dd.trace.instrumentation.springwebflux
import org.springframework.web.bind.annotation.GetMapping import org.springframework.web.bind.annotation.GetMapping
import org.springframework.web.bind.annotation.PathVariable import org.springframework.web.bind.annotation.PathVariable
import org.springframework.web.bind.annotation.RestController import org.springframework.web.bind.annotation.RestController
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono import reactor.core.publisher.Mono
import java.time.Duration
@RestController @RestController
class TestController { class TestController {
@GetMapping("/failfoo/{id}")
Mono<FooModel> getFooModelFail(@PathVariable("id") long id) {
return Mono.just(new FooModel((id / 0), "fail"))
}
@GetMapping("/foo") @GetMapping("/foo")
Mono<FooModel> getFooModel() { Mono<FooModel> getFooModel() {
return Mono.just(new FooModel(0L, "DEFAULT")) return Mono.just(new FooModel(0L, "DEFAULT"))
@ -29,9 +25,18 @@ class TestController {
return Mono.just(new FooModel(id, name)) return Mono.just(new FooModel(id, name))
} }
@GetMapping(value = "/annotation-foos/{count}") @GetMapping("/foo-delayed")
Flux<FooModel> getXFooModels(@PathVariable("count") long count) { Mono<FooModel> getFooDelayed() {
FooModel[] foos = FooModel.createXFooModels(count) return Mono.just(new FooModel(3L, "delayed")).delayElement(Duration.ofMillis(100))
return Flux.just(foos) }
@GetMapping("/foo-failfast/{id}")
Mono<FooModel> getFooFailFast(@PathVariable("id") long id) {
throw new RuntimeException("bad things happen")
}
@GetMapping("/foo-failmono/{id}")
Mono<FooModel> getFooFailMono(@PathVariable("id") long id) {
return Mono.error(new RuntimeException("bad things happen"))
} }
} }

View File

@ -1,4 +1,4 @@
package datadog.trace.instrumentation.springwebflux; package dd.trace.instrumentation.springwebflux;
import static org.springframework.web.reactive.function.server.RequestPredicates.GET; import static org.springframework.web.reactive.function.server.RequestPredicates.GET;
import static org.springframework.web.reactive.function.server.RouterFunctions.route; import static org.springframework.web.reactive.function.server.RouterFunctions.route;
@ -9,6 +9,7 @@ import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.server.RouterFunction; import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.ServerResponse; import org.springframework.web.reactive.function.server.ServerResponse;
// Need to keep this in Java because groovy creates crazy proxies around lambdas
@Component @Component
public class RedirectComponent { public class RedirectComponent {
@Bean @Bean

View File

@ -0,0 +1,18 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<appender name="console" class="ch.qos.logback.core.ConsoleAppender">
<layout class="ch.qos.logback.classic.PatternLayout">
<Pattern>
%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n
</Pattern>
</layout>
</appender>
<root level="WARN">
<appender-ref ref="console"/>
</root>
<logger name="datadog" level="debug"/>
</configuration>