From 8e1382b4e4ecd0cf78470b63b5ea6d988c084285 Mon Sep 17 00:00:00 2001 From: Nikolay Martynov Date: Tue, 12 Feb 2019 10:52:49 -0500 Subject: [PATCH] Improve webflux integration Add support for tracing Mono/Flux --- .../spring-webflux/spring-webflux.gradle | 18 +++- .../AbstractWebfluxInstrumentation.java | 5 +- .../springwebflux/AdviceUtils.java | 27 ++---- .../DispatcherHandlerAdvice.java | 14 +-- .../DispatcherHandlerOnCancel.java | 18 ---- .../DispatcherHandlerOnSuccessOrError.java | 21 ----- .../src/test/groovy/SpringWebfluxTest.groovy | 92 ++++++++++++++++++- .../SpringWebFluxTestApplication.groovy | 33 +++++-- .../springwebflux/TestController.groovy | 21 +++++ 9 files changed, 171 insertions(+), 78 deletions(-) delete mode 100644 dd-java-agent/instrumentation/spring-webflux/src/main/java8/datadog/trace/instrumentation/springwebflux/DispatcherHandlerOnCancel.java delete mode 100644 dd-java-agent/instrumentation/spring-webflux/src/main/java8/datadog/trace/instrumentation/springwebflux/DispatcherHandlerOnSuccessOrError.java diff --git a/dd-java-agent/instrumentation/spring-webflux/spring-webflux.gradle b/dd-java-agent/instrumentation/spring-webflux/spring-webflux.gradle index 4a930d212c..276ef1c9db 100644 --- a/dd-java-agent/instrumentation/spring-webflux/spring-webflux.gradle +++ b/dd-java-agent/instrumentation/spring-webflux/spring-webflux.gradle @@ -25,6 +25,12 @@ compileMain_java8Java { sourceCompatibility = 1.8 targetCompatibility = 1.8 } +// Note: ideally lombok plugin would do this for us, but currently it doesn't support custom +// source sets. See https://github.com/franzbecker/gradle-lombok/issues/17. +dependencies { + main_java8CompileOnly "org.projectlombok:lombok:${project.lombok.version}" transitive false + main_java8AnnotationProcessor "org.projectlombok:lombok:${project.lombok.version}" transitive false +} apply plugin: 'org.unbroken-dome.test-sets' @@ -44,7 +50,8 @@ compileLatestDepTestJava { } dependencies { - + // We use helpers from this project + main_java8CompileOnly project(':dd-java-agent:instrumentation:reactor-core-3.1') main_java8CompileOnly group: 'org.springframework', name: 'spring-webflux', version: '5.0.0.RELEASE' main_java8Compile project(':dd-java-agent:agent-tooling') @@ -53,12 +60,12 @@ dependencies { main_java8Compile deps.opentracing compileOnly sourceSets.main_java8.compileClasspath - compile sourceSets.main_java8.output - compileOnly group: 'org.springframework', name: 'spring-webflux', version: '5.0.0.RELEASE' - compile project(':dd-java-agent:agent-tooling') + // We are using utils class from reactor-core instrumentation. + // TODO: It is unclear why we need to use `compile` here (instead of 'compileOnly') + compile project(':dd-java-agent:instrumentation:reactor-core-3.1') compile deps.bytebuddy compile deps.opentracing @@ -66,9 +73,10 @@ dependencies { implementation deps.autoservice testCompile project(':dd-java-agent:testing') + testCompile project(':dd-java-agent:instrumentation:java-concurrent') testCompile project(':dd-java-agent:instrumentation:trace-annotation') testCompile project(':dd-java-agent:instrumentation:netty-4.1') - testCompile project(':dd-java-agent:instrumentation:java-concurrent') + testCompile project(':dd-java-agent:instrumentation:reactor-core-3.1') testCompile group: 'org.springframework.boot', name: 'spring-boot-starter-webflux', version: '2.0.0.RELEASE' testCompile group: 'org.springframework.boot', name: 'spring-boot-starter-test', version: '2.0.0.RELEASE' diff --git a/dd-java-agent/instrumentation/spring-webflux/src/main/java/datadog/trace/instrumentation/springwebflux/AbstractWebfluxInstrumentation.java b/dd-java-agent/instrumentation/spring-webflux/src/main/java/datadog/trace/instrumentation/springwebflux/AbstractWebfluxInstrumentation.java index 80a06caa77..ae61ea6bea 100644 --- a/dd-java-agent/instrumentation/spring-webflux/src/main/java/datadog/trace/instrumentation/springwebflux/AbstractWebfluxInstrumentation.java +++ b/dd-java-agent/instrumentation/spring-webflux/src/main/java/datadog/trace/instrumentation/springwebflux/AbstractWebfluxInstrumentation.java @@ -13,9 +13,10 @@ public abstract class AbstractWebfluxInstrumentation extends Instrumenter.Defaul @Override public String[] helperClassNames() { return new String[] { + // Some code comes from reactor's instrumentation's helper + "datadog.trace.instrumentation.reactor.core.ReactorCoreAdviceUtils", + "datadog.trace.instrumentation.reactor.core.ReactorCoreAdviceUtils$TracingSubscriber", PACKAGE + ".AdviceUtils", - PACKAGE + ".DispatcherHandlerOnSuccessOrError", - PACKAGE + ".DispatcherHandlerOnCancel", PACKAGE + ".RouteOnSuccessOrError" }; } diff --git a/dd-java-agent/instrumentation/spring-webflux/src/main/java8/datadog/trace/instrumentation/springwebflux/AdviceUtils.java b/dd-java-agent/instrumentation/spring-webflux/src/main/java8/datadog/trace/instrumentation/springwebflux/AdviceUtils.java index 956ae40989..ac34726898 100644 --- a/dd-java-agent/instrumentation/spring-webflux/src/main/java8/datadog/trace/instrumentation/springwebflux/AdviceUtils.java +++ b/dd-java-agent/instrumentation/spring-webflux/src/main/java8/datadog/trace/instrumentation/springwebflux/AdviceUtils.java @@ -1,18 +1,17 @@ package datadog.trace.instrumentation.springwebflux; -import static io.opentracing.log.Fields.ERROR_OBJECT; - +import datadog.trace.instrumentation.reactor.core.ReactorCoreAdviceUtils; import io.opentracing.Span; -import io.opentracing.tag.Tags; -import java.util.Collections; +import lombok.extern.slf4j.Slf4j; import org.springframework.web.reactive.function.server.ServerRequest; import org.springframework.web.server.ServerWebExchange; +@Slf4j public class AdviceUtils { public static final String SPAN_ATTRIBUTE = "datadog.trace.instrumentation.springwebflux.Span"; public static final String PARENT_SPAN_ATTRIBUTE = - "datadog.trace.instrumentation.springwebflux.ParentSpan"; + "datadog.trace.instrumentation.springwebflux.ParentSpan"; public static String parseOperationName(final Object handler) { final String className = parseClassName(handler.getClass()); @@ -43,23 +42,13 @@ public class AdviceUtils { public static void finishSpanIfPresent( final ServerWebExchange exchange, final Throwable throwable) { - // Span could have been removed and finished by other thread before we got here - finishSpanIfPresent((Span) exchange.getAttributes().remove(SPAN_ATTRIBUTE), throwable); + ReactorCoreAdviceUtils.finishSpanIfPresent( + (Span) exchange.getAttributes().remove(SPAN_ATTRIBUTE), throwable); } public static void finishSpanIfPresent( final ServerRequest serverRequest, final Throwable throwable) { - // Span could have been removed and finished by other thread before we got here - finishSpanIfPresent((Span) serverRequest.attributes().remove(SPAN_ATTRIBUTE), throwable); - } - - private static void finishSpanIfPresent(final Span span, final Throwable throwable) { - if (span != null) { - if (throwable != null) { - Tags.ERROR.set(span, true); - span.log(Collections.singletonMap(ERROR_OBJECT, throwable)); - } - span.finish(); - } + ReactorCoreAdviceUtils.finishSpanIfPresent( + (Span) serverRequest.attributes().remove(SPAN_ATTRIBUTE), throwable); } } diff --git a/dd-java-agent/instrumentation/spring-webflux/src/main/java8/datadog/trace/instrumentation/springwebflux/DispatcherHandlerAdvice.java b/dd-java-agent/instrumentation/spring-webflux/src/main/java8/datadog/trace/instrumentation/springwebflux/DispatcherHandlerAdvice.java index 06ec2bcdf9..0a28bc8ef9 100644 --- a/dd-java-agent/instrumentation/spring-webflux/src/main/java8/datadog/trace/instrumentation/springwebflux/DispatcherHandlerAdvice.java +++ b/dd-java-agent/instrumentation/spring-webflux/src/main/java8/datadog/trace/instrumentation/springwebflux/DispatcherHandlerAdvice.java @@ -3,11 +3,14 @@ package datadog.trace.instrumentation.springwebflux; import datadog.trace.api.DDSpanTypes; import datadog.trace.api.DDTags; import datadog.trace.context.TraceScope; +import datadog.trace.instrumentation.reactor.core.ReactorCoreAdviceUtils; import io.opentracing.Scope; import io.opentracing.Span; import io.opentracing.tag.Tags; import io.opentracing.util.GlobalTracer; +import java.util.function.Function; import net.bytebuddy.asm.Advice; +import org.reactivestreams.Publisher; import org.springframework.web.server.ServerWebExchange; import reactor.core.publisher.Mono; @@ -42,12 +45,11 @@ public class DispatcherHandlerAdvice { @Advice.Enter final Scope scope, @Advice.Thrown final Throwable throwable, @Advice.Argument(0) final ServerWebExchange exchange, - @Advice.Return(readOnly = false) Mono returnMono) { - if (throwable == null && returnMono != null) { - returnMono = - returnMono - .doOnSuccessOrError(new DispatcherHandlerOnSuccessOrError<>(exchange)) - .doOnCancel(new DispatcherHandlerOnCancel(exchange)); + @Advice.Return(readOnly = false) Mono mono) { + if (throwable == null && mono != null) { + final Function, ? extends Publisher> function = + ReactorCoreAdviceUtils.finishSpanNextOrError(); + mono = ReactorCoreAdviceUtils.setPublisherSpan(mono, scope.span()); } else if (throwable != null) { AdviceUtils.finishSpanIfPresent(exchange, throwable); } diff --git a/dd-java-agent/instrumentation/spring-webflux/src/main/java8/datadog/trace/instrumentation/springwebflux/DispatcherHandlerOnCancel.java b/dd-java-agent/instrumentation/spring-webflux/src/main/java8/datadog/trace/instrumentation/springwebflux/DispatcherHandlerOnCancel.java deleted file mode 100644 index ead1268ed9..0000000000 --- a/dd-java-agent/instrumentation/spring-webflux/src/main/java8/datadog/trace/instrumentation/springwebflux/DispatcherHandlerOnCancel.java +++ /dev/null @@ -1,18 +0,0 @@ -package datadog.trace.instrumentation.springwebflux; - -import org.springframework.web.server.ServerWebExchange; - -public class DispatcherHandlerOnCancel implements Runnable { - - private final ServerWebExchange exchange; - - public DispatcherHandlerOnCancel(final ServerWebExchange exchange) { - this.exchange = exchange; - } - - @Override - public void run() { - // Make sure we are not leaking opened spans for canceled Monos. - AdviceUtils.finishSpanIfPresent(exchange, null); - } -} diff --git a/dd-java-agent/instrumentation/spring-webflux/src/main/java8/datadog/trace/instrumentation/springwebflux/DispatcherHandlerOnSuccessOrError.java b/dd-java-agent/instrumentation/spring-webflux/src/main/java8/datadog/trace/instrumentation/springwebflux/DispatcherHandlerOnSuccessOrError.java deleted file mode 100644 index 1d6de16089..0000000000 --- a/dd-java-agent/instrumentation/spring-webflux/src/main/java8/datadog/trace/instrumentation/springwebflux/DispatcherHandlerOnSuccessOrError.java +++ /dev/null @@ -1,21 +0,0 @@ -package datadog.trace.instrumentation.springwebflux; - -import java.util.function.BiConsumer; -import org.springframework.web.server.ServerWebExchange; - -public class DispatcherHandlerOnSuccessOrError implements BiConsumer { - - private final ServerWebExchange exchange; - - public DispatcherHandlerOnSuccessOrError(final ServerWebExchange exchange) { - this.exchange = exchange; - } - - @Override - public void accept(final U object, final Throwable throwable) { - // Closing span here means it closes after Netty span which may not be ideal. - // We could instrument HandlerFunctionAdapter instead, but this would mean we - // would not account for time spent sending request. - AdviceUtils.finishSpanIfPresent(exchange, throwable); - } -} diff --git a/dd-java-agent/instrumentation/spring-webflux/src/test/groovy/SpringWebfluxTest.groovy b/dd-java-agent/instrumentation/spring-webflux/src/test/groovy/SpringWebfluxTest.groovy index d2609ceba8..7f793956f1 100644 --- a/dd-java-agent/instrumentation/spring-webflux/src/test/groovy/SpringWebfluxTest.groovy +++ b/dd-java-agent/instrumentation/spring-webflux/src/test/groovy/SpringWebfluxTest.groovy @@ -17,7 +17,6 @@ import org.springframework.boot.web.server.LocalServerPort import org.springframework.context.annotation.Bean import org.springframework.web.server.ResponseStatusException - @SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, classes = [SpringWebFluxTestApplication, ForceNettyAutoConfiguration]) class SpringWebfluxTest extends AgentTestRunner { @@ -111,6 +110,97 @@ class SpringWebfluxTest extends AgentTestRunner { "annotation API delayed response" | "/foo-delayed" | "/foo-delayed" | "getFooDelayed" | new FooModel(3L, "delayed").toString() } + def "GET test with async response #testName"() { + setup: + String url = "http://localhost:$port$urlPath" + def request = new Request.Builder().url(url).get().build() + when: + def response = client.newCall(request).execute() + + then: + response.code == 200 + response.body().string() == expectedResponseBody + assertTraces(1) { + println TEST_WRITER + trace(0, 3) { + span(0) { + if (annotatedMethod == null) { + // Functional API + resourceNameContains(SPRING_APP_CLASS_ANON_NESTED_CLASS_PREFIX, ".handle") + operationNameContains(SPRING_APP_CLASS_ANON_NESTED_CLASS_PREFIX, ".handle") + } else { + // Annotation API + resourceName TestController.getSimpleName() + "." + annotatedMethod + operationName TestController.getSimpleName() + "." + annotatedMethod + } + spanType DDSpanTypes.HTTP_SERVER + childOf(span(1)) + tags { + "$Tags.COMPONENT.key" "spring-webflux-controller" + "$DDTags.SPAN_TYPE" DDSpanTypes.HTTP_SERVER + if (annotatedMethod == null) { + // Functional API + "request.predicate" "(GET && $urlPathWithVariables)" + "handler.type" { String tagVal -> + return tagVal.contains(INNER_HANDLER_FUNCTION_CLASS_TAG_PREFIX) + } + } else { + // Annotation API + "handler.type" TestController.getName() + } + defaultTags() + } + } + span(1) { + resourceName "GET $urlPathWithVariables" + operationName "netty.request" + spanType DDSpanTypes.HTTP_SERVER + parent() + tags { + "$Tags.COMPONENT.key" "netty" + "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_SERVER + "$DDTags.SPAN_TYPE" DDSpanTypes.HTTP_SERVER + "$Tags.PEER_HOSTNAME.key" "localhost" + "$Tags.PEER_PORT.key" Integer + "$Tags.HTTP_METHOD.key" "GET" + "$Tags.HTTP_STATUS.key" 200 + "$Tags.HTTP_URL.key" url + defaultTags() + } + } + span(2) { + serviceName "unnamed-java-app" + if (annotatedMethod == null) { + // Functional API + resourceName "SpringWebFluxTestApplication.tracedMethod" + operationName "SpringWebFluxTestApplication.tracedMethod" + } else { + // Annotation API + resourceName "TestController.tracedMethod" + operationName "TestController.tracedMethod" + } + childOf(span(0)) + errored false + tags { + "$Tags.COMPONENT.key" "trace" + "$DDTags.SPAN_TYPE" DDSpanTypes.HTTP_SERVER + defaultTags() + } + } + } + } + + where: + testName | urlPath | urlPathWithVariables | annotatedMethod | expectedResponseBody + "functional API traced method from mono" | "/greet-mono-from-callable/4" | "/greet-mono-from-callable/{id}" | null | SpringWebFluxTestApplication.GreetingHandler.DEFAULT_RESPONSE + " 4" + "functional API traced method" | "/greet-traced-method/5" | "/greet-traced-method/{id}" | null | SpringWebFluxTestApplication.GreetingHandler.DEFAULT_RESPONSE + " 5" + "functional API traced method with delay" | "/greet-delayed-mono/6" | "/greet-delayed-mono/{id}" | null | SpringWebFluxTestApplication.GreetingHandler.DEFAULT_RESPONSE + " 6" + + "annotation API traced method from mono" | "/foo-mono-from-callable/7" | "/foo-mono-from-callable/{id}" | "getMonoFromCallable" | new FooModel(7L, "tracedMethod").toString() + "annotation API traced method" | "/foo-traced-method/8" | "/foo-traced-method/{id}" | "getTracedMethod" | new FooModel(8L, "tracedMethod").toString() + "annotation API traced method with delay" | "/foo-delayed-mono/9" | "/foo-delayed-mono/{id}" | "getFooDelayedMono" | new FooModel(9L, "tracedMethod").toString() + } + def "404 GET test"() { setup: String url = "http://localhost:$port/notfoundgreet" diff --git a/dd-java-agent/instrumentation/spring-webflux/src/test/groovy/dd/trace/instrumentation/springwebflux/SpringWebFluxTestApplication.groovy b/dd-java-agent/instrumentation/spring-webflux/src/test/groovy/dd/trace/instrumentation/springwebflux/SpringWebFluxTestApplication.groovy index 6148ae6255..77ed2dd688 100644 --- a/dd-java-agent/instrumentation/spring-webflux/src/test/groovy/dd/trace/instrumentation/springwebflux/SpringWebFluxTestApplication.groovy +++ b/dd-java-agent/instrumentation/spring-webflux/src/test/groovy/dd/trace/instrumentation/springwebflux/SpringWebFluxTestApplication.groovy @@ -1,5 +1,6 @@ package dd.trace.instrumentation.springwebflux +import datadog.trace.api.Trace import org.springframework.boot.autoconfigure.SpringBootApplication import org.springframework.context.annotation.Bean import org.springframework.http.MediaType @@ -9,7 +10,6 @@ import org.springframework.web.reactive.function.server.HandlerFunction import org.springframework.web.reactive.function.server.RouterFunction import org.springframework.web.reactive.function.server.ServerRequest import org.springframework.web.reactive.function.server.ServerResponse -import reactor.core.publisher.Flux import reactor.core.publisher.Mono import java.time.Duration @@ -63,6 +63,23 @@ class SpringWebFluxTestApplication { Mono handle(ServerRequest request) { return Mono.error(new RuntimeException("bad things happen")) } + }).andRoute(GET("/greet-traced-method/{id}"), new HandlerFunction() { + @Override + Mono handle(ServerRequest request) { + return greetingHandler.intResponse(Mono.just(tracedMethod(request.pathVariable("id").toInteger()))) + } + }).andRoute(GET("/greet-mono-from-callable/{id}"), new HandlerFunction() { + @Override + Mono handle(ServerRequest request) { + return greetingHandler.intResponse(Mono.fromCallable { + return tracedMethod(request.pathVariable("id").toInteger()) + }) + } + }).andRoute(GET("/greet-delayed-mono/{id}"), new HandlerFunction() { + @Override + Mono handle(ServerRequest request) { + return greetingHandler.intResponse(Mono.just(request.pathVariable("id").toInteger()).delayElement(Duration.ofMillis(100)).map { i -> tracedMethod(i) }) + } }) } @@ -90,11 +107,15 @@ class SpringWebFluxTestApplication { .body(BodyInserters.fromObject(DEFAULT_RESPONSE + " " + request.pathVariable("name") + " " + request.pathVariable("word"))) } - Mono counterGreet(ServerRequest request) { - final int countTo = Integer.valueOf(request.pathVariable("count")) - FooModel[] fooArray = FooModel.createXFooModels(countTo) - return ServerResponse.ok().contentType(MediaType.APPLICATION_JSON) - .body(Flux.fromArray(fooArray), FooModel) + Mono intResponse(Mono mono) { + return ServerResponse.ok().contentType(MediaType.TEXT_PLAIN) + .body(BodyInserters.fromPublisher(mono.map { i -> DEFAULT_RESPONSE + " " + i.id }, String)) + } } + + @Trace() + private static FooModel tracedMethod(long id) { + return new FooModel(id, "tracedMethod") + } } diff --git a/dd-java-agent/instrumentation/spring-webflux/src/test/groovy/dd/trace/instrumentation/springwebflux/TestController.groovy b/dd-java-agent/instrumentation/spring-webflux/src/test/groovy/dd/trace/instrumentation/springwebflux/TestController.groovy index de3f8ae820..766e6a6d42 100644 --- a/dd-java-agent/instrumentation/spring-webflux/src/test/groovy/dd/trace/instrumentation/springwebflux/TestController.groovy +++ b/dd-java-agent/instrumentation/spring-webflux/src/test/groovy/dd/trace/instrumentation/springwebflux/TestController.groovy @@ -1,5 +1,6 @@ package dd.trace.instrumentation.springwebflux +import datadog.trace.api.Trace import org.springframework.web.bind.annotation.GetMapping import org.springframework.web.bind.annotation.PathVariable import org.springframework.web.bind.annotation.RestController @@ -39,4 +40,24 @@ class TestController { Mono getFooFailMono(@PathVariable("id") long id) { return Mono.error(new RuntimeException("bad things happen")) } + + @GetMapping("/foo-traced-method/{id}") + Mono getTracedMethod(@PathVariable("id") long id) { + return Mono.just(tracedMethod(id)) + } + + @GetMapping("/foo-mono-from-callable/{id}") + Mono getMonoFromCallable(@PathVariable("id") long id) { + return Mono.fromCallable { return tracedMethod(id) } + } + + @GetMapping("/foo-delayed-mono/{id}") + Mono getFooDelayedMono(@PathVariable("id") long id) { + return Mono.just(id).delayElement(Duration.ofMillis(100)).map { i -> tracedMethod(i) } + } + + @Trace() + private FooModel tracedMethod(long id) { + return new FooModel(id, "tracedMethod") + } }