Improve webflux integration

Add support for tracing Mono/Flux
This commit is contained in:
Nikolay Martynov 2019-02-12 10:52:49 -05:00
parent 13005d72cb
commit 8e1382b4e4
9 changed files with 171 additions and 78 deletions

View File

@ -25,6 +25,12 @@ compileMain_java8Java {
sourceCompatibility = 1.8
targetCompatibility = 1.8
}
// Note: ideally lombok plugin would do this for us, but currently it doesn't support custom
// source sets. See https://github.com/franzbecker/gradle-lombok/issues/17.
dependencies {
main_java8CompileOnly "org.projectlombok:lombok:${project.lombok.version}" transitive false
main_java8AnnotationProcessor "org.projectlombok:lombok:${project.lombok.version}" transitive false
}
apply plugin: 'org.unbroken-dome.test-sets'
@ -44,7 +50,8 @@ compileLatestDepTestJava {
}
dependencies {
// We use helpers from this project
main_java8CompileOnly project(':dd-java-agent:instrumentation:reactor-core-3.1')
main_java8CompileOnly group: 'org.springframework', name: 'spring-webflux', version: '5.0.0.RELEASE'
main_java8Compile project(':dd-java-agent:agent-tooling')
@ -53,12 +60,12 @@ dependencies {
main_java8Compile deps.opentracing
compileOnly sourceSets.main_java8.compileClasspath
compile sourceSets.main_java8.output
compileOnly group: 'org.springframework', name: 'spring-webflux', version: '5.0.0.RELEASE'
compile project(':dd-java-agent:agent-tooling')
// We are using utils class from reactor-core instrumentation.
// TODO: It is unclear why we need to use `compile` here (instead of 'compileOnly')
compile project(':dd-java-agent:instrumentation:reactor-core-3.1')
compile deps.bytebuddy
compile deps.opentracing
@ -66,9 +73,10 @@ dependencies {
implementation deps.autoservice
testCompile project(':dd-java-agent:testing')
testCompile project(':dd-java-agent:instrumentation:java-concurrent')
testCompile project(':dd-java-agent:instrumentation:trace-annotation')
testCompile project(':dd-java-agent:instrumentation:netty-4.1')
testCompile project(':dd-java-agent:instrumentation:java-concurrent')
testCompile project(':dd-java-agent:instrumentation:reactor-core-3.1')
testCompile group: 'org.springframework.boot', name: 'spring-boot-starter-webflux', version: '2.0.0.RELEASE'
testCompile group: 'org.springframework.boot', name: 'spring-boot-starter-test', version: '2.0.0.RELEASE'

View File

@ -13,9 +13,10 @@ public abstract class AbstractWebfluxInstrumentation extends Instrumenter.Defaul
@Override
public String[] helperClassNames() {
return new String[] {
// Some code comes from reactor's instrumentation's helper
"datadog.trace.instrumentation.reactor.core.ReactorCoreAdviceUtils",
"datadog.trace.instrumentation.reactor.core.ReactorCoreAdviceUtils$TracingSubscriber",
PACKAGE + ".AdviceUtils",
PACKAGE + ".DispatcherHandlerOnSuccessOrError",
PACKAGE + ".DispatcherHandlerOnCancel",
PACKAGE + ".RouteOnSuccessOrError"
};
}

View File

@ -1,18 +1,17 @@
package datadog.trace.instrumentation.springwebflux;
import static io.opentracing.log.Fields.ERROR_OBJECT;
import datadog.trace.instrumentation.reactor.core.ReactorCoreAdviceUtils;
import io.opentracing.Span;
import io.opentracing.tag.Tags;
import java.util.Collections;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.server.ServerWebExchange;
@Slf4j
public class AdviceUtils {
public static final String SPAN_ATTRIBUTE = "datadog.trace.instrumentation.springwebflux.Span";
public static final String PARENT_SPAN_ATTRIBUTE =
"datadog.trace.instrumentation.springwebflux.ParentSpan";
"datadog.trace.instrumentation.springwebflux.ParentSpan";
public static String parseOperationName(final Object handler) {
final String className = parseClassName(handler.getClass());
@ -43,23 +42,13 @@ public class AdviceUtils {
public static void finishSpanIfPresent(
final ServerWebExchange exchange, final Throwable throwable) {
// Span could have been removed and finished by other thread before we got here
finishSpanIfPresent((Span) exchange.getAttributes().remove(SPAN_ATTRIBUTE), throwable);
ReactorCoreAdviceUtils.finishSpanIfPresent(
(Span) exchange.getAttributes().remove(SPAN_ATTRIBUTE), throwable);
}
public static void finishSpanIfPresent(
final ServerRequest serverRequest, final Throwable throwable) {
// Span could have been removed and finished by other thread before we got here
finishSpanIfPresent((Span) serverRequest.attributes().remove(SPAN_ATTRIBUTE), throwable);
}
private static void finishSpanIfPresent(final Span span, final Throwable throwable) {
if (span != null) {
if (throwable != null) {
Tags.ERROR.set(span, true);
span.log(Collections.singletonMap(ERROR_OBJECT, throwable));
}
span.finish();
}
ReactorCoreAdviceUtils.finishSpanIfPresent(
(Span) serverRequest.attributes().remove(SPAN_ATTRIBUTE), throwable);
}
}

View File

@ -3,11 +3,14 @@ package datadog.trace.instrumentation.springwebflux;
import datadog.trace.api.DDSpanTypes;
import datadog.trace.api.DDTags;
import datadog.trace.context.TraceScope;
import datadog.trace.instrumentation.reactor.core.ReactorCoreAdviceUtils;
import io.opentracing.Scope;
import io.opentracing.Span;
import io.opentracing.tag.Tags;
import io.opentracing.util.GlobalTracer;
import java.util.function.Function;
import net.bytebuddy.asm.Advice;
import org.reactivestreams.Publisher;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;
@ -42,12 +45,11 @@ public class DispatcherHandlerAdvice {
@Advice.Enter final Scope scope,
@Advice.Thrown final Throwable throwable,
@Advice.Argument(0) final ServerWebExchange exchange,
@Advice.Return(readOnly = false) Mono<?> returnMono) {
if (throwable == null && returnMono != null) {
returnMono =
returnMono
.doOnSuccessOrError(new DispatcherHandlerOnSuccessOrError<>(exchange))
.doOnCancel(new DispatcherHandlerOnCancel(exchange));
@Advice.Return(readOnly = false) Mono<Object> mono) {
if (throwable == null && mono != null) {
final Function<? super Mono<Object>, ? extends Publisher<Object>> function =
ReactorCoreAdviceUtils.finishSpanNextOrError();
mono = ReactorCoreAdviceUtils.setPublisherSpan(mono, scope.span());
} else if (throwable != null) {
AdviceUtils.finishSpanIfPresent(exchange, throwable);
}

View File

@ -1,18 +0,0 @@
package datadog.trace.instrumentation.springwebflux;
import org.springframework.web.server.ServerWebExchange;
public class DispatcherHandlerOnCancel implements Runnable {
private final ServerWebExchange exchange;
public DispatcherHandlerOnCancel(final ServerWebExchange exchange) {
this.exchange = exchange;
}
@Override
public void run() {
// Make sure we are not leaking opened spans for canceled Monos.
AdviceUtils.finishSpanIfPresent(exchange, null);
}
}

View File

@ -1,21 +0,0 @@
package datadog.trace.instrumentation.springwebflux;
import java.util.function.BiConsumer;
import org.springframework.web.server.ServerWebExchange;
public class DispatcherHandlerOnSuccessOrError<U> implements BiConsumer<U, Throwable> {
private final ServerWebExchange exchange;
public DispatcherHandlerOnSuccessOrError(final ServerWebExchange exchange) {
this.exchange = exchange;
}
@Override
public void accept(final U object, final Throwable throwable) {
// Closing span here means it closes after Netty span which may not be ideal.
// We could instrument HandlerFunctionAdapter instead, but this would mean we
// would not account for time spent sending request.
AdviceUtils.finishSpanIfPresent(exchange, throwable);
}
}

View File

@ -17,7 +17,6 @@ import org.springframework.boot.web.server.LocalServerPort
import org.springframework.context.annotation.Bean
import org.springframework.web.server.ResponseStatusException
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, classes = [SpringWebFluxTestApplication, ForceNettyAutoConfiguration])
class SpringWebfluxTest extends AgentTestRunner {
@ -111,6 +110,97 @@ class SpringWebfluxTest extends AgentTestRunner {
"annotation API delayed response" | "/foo-delayed" | "/foo-delayed" | "getFooDelayed" | new FooModel(3L, "delayed").toString()
}
def "GET test with async response #testName"() {
setup:
String url = "http://localhost:$port$urlPath"
def request = new Request.Builder().url(url).get().build()
when:
def response = client.newCall(request).execute()
then:
response.code == 200
response.body().string() == expectedResponseBody
assertTraces(1) {
println TEST_WRITER
trace(0, 3) {
span(0) {
if (annotatedMethod == null) {
// Functional API
resourceNameContains(SPRING_APP_CLASS_ANON_NESTED_CLASS_PREFIX, ".handle")
operationNameContains(SPRING_APP_CLASS_ANON_NESTED_CLASS_PREFIX, ".handle")
} else {
// Annotation API
resourceName TestController.getSimpleName() + "." + annotatedMethod
operationName TestController.getSimpleName() + "." + annotatedMethod
}
spanType DDSpanTypes.HTTP_SERVER
childOf(span(1))
tags {
"$Tags.COMPONENT.key" "spring-webflux-controller"
"$DDTags.SPAN_TYPE" DDSpanTypes.HTTP_SERVER
if (annotatedMethod == null) {
// Functional API
"request.predicate" "(GET && $urlPathWithVariables)"
"handler.type" { String tagVal ->
return tagVal.contains(INNER_HANDLER_FUNCTION_CLASS_TAG_PREFIX)
}
} else {
// Annotation API
"handler.type" TestController.getName()
}
defaultTags()
}
}
span(1) {
resourceName "GET $urlPathWithVariables"
operationName "netty.request"
spanType DDSpanTypes.HTTP_SERVER
parent()
tags {
"$Tags.COMPONENT.key" "netty"
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_SERVER
"$DDTags.SPAN_TYPE" DDSpanTypes.HTTP_SERVER
"$Tags.PEER_HOSTNAME.key" "localhost"
"$Tags.PEER_PORT.key" Integer
"$Tags.HTTP_METHOD.key" "GET"
"$Tags.HTTP_STATUS.key" 200
"$Tags.HTTP_URL.key" url
defaultTags()
}
}
span(2) {
serviceName "unnamed-java-app"
if (annotatedMethod == null) {
// Functional API
resourceName "SpringWebFluxTestApplication.tracedMethod"
operationName "SpringWebFluxTestApplication.tracedMethod"
} else {
// Annotation API
resourceName "TestController.tracedMethod"
operationName "TestController.tracedMethod"
}
childOf(span(0))
errored false
tags {
"$Tags.COMPONENT.key" "trace"
"$DDTags.SPAN_TYPE" DDSpanTypes.HTTP_SERVER
defaultTags()
}
}
}
}
where:
testName | urlPath | urlPathWithVariables | annotatedMethod | expectedResponseBody
"functional API traced method from mono" | "/greet-mono-from-callable/4" | "/greet-mono-from-callable/{id}" | null | SpringWebFluxTestApplication.GreetingHandler.DEFAULT_RESPONSE + " 4"
"functional API traced method" | "/greet-traced-method/5" | "/greet-traced-method/{id}" | null | SpringWebFluxTestApplication.GreetingHandler.DEFAULT_RESPONSE + " 5"
"functional API traced method with delay" | "/greet-delayed-mono/6" | "/greet-delayed-mono/{id}" | null | SpringWebFluxTestApplication.GreetingHandler.DEFAULT_RESPONSE + " 6"
"annotation API traced method from mono" | "/foo-mono-from-callable/7" | "/foo-mono-from-callable/{id}" | "getMonoFromCallable" | new FooModel(7L, "tracedMethod").toString()
"annotation API traced method" | "/foo-traced-method/8" | "/foo-traced-method/{id}" | "getTracedMethod" | new FooModel(8L, "tracedMethod").toString()
"annotation API traced method with delay" | "/foo-delayed-mono/9" | "/foo-delayed-mono/{id}" | "getFooDelayedMono" | new FooModel(9L, "tracedMethod").toString()
}
def "404 GET test"() {
setup:
String url = "http://localhost:$port/notfoundgreet"

View File

@ -1,5 +1,6 @@
package dd.trace.instrumentation.springwebflux
import datadog.trace.api.Trace
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.context.annotation.Bean
import org.springframework.http.MediaType
@ -9,7 +10,6 @@ import org.springframework.web.reactive.function.server.HandlerFunction
import org.springframework.web.reactive.function.server.RouterFunction
import org.springframework.web.reactive.function.server.ServerRequest
import org.springframework.web.reactive.function.server.ServerResponse
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import java.time.Duration
@ -63,6 +63,23 @@ class SpringWebFluxTestApplication {
Mono<ServerResponse> handle(ServerRequest request) {
return Mono.error(new RuntimeException("bad things happen"))
}
}).andRoute(GET("/greet-traced-method/{id}"), new HandlerFunction<ServerResponse>() {
@Override
Mono<ServerResponse> handle(ServerRequest request) {
return greetingHandler.intResponse(Mono.just(tracedMethod(request.pathVariable("id").toInteger())))
}
}).andRoute(GET("/greet-mono-from-callable/{id}"), new HandlerFunction<ServerResponse>() {
@Override
Mono<ServerResponse> handle(ServerRequest request) {
return greetingHandler.intResponse(Mono.fromCallable {
return tracedMethod(request.pathVariable("id").toInteger())
})
}
}).andRoute(GET("/greet-delayed-mono/{id}"), new HandlerFunction<ServerResponse>() {
@Override
Mono<ServerResponse> handle(ServerRequest request) {
return greetingHandler.intResponse(Mono.just(request.pathVariable("id").toInteger()).delayElement(Duration.ofMillis(100)).map { i -> tracedMethod(i) })
}
})
}
@ -90,11 +107,15 @@ class SpringWebFluxTestApplication {
.body(BodyInserters.fromObject(DEFAULT_RESPONSE + " " + request.pathVariable("name") + " " + request.pathVariable("word")))
}
Mono<ServerResponse> counterGreet(ServerRequest request) {
final int countTo = Integer.valueOf(request.pathVariable("count"))
FooModel[] fooArray = FooModel.createXFooModels(countTo)
return ServerResponse.ok().contentType(MediaType.APPLICATION_JSON)
.body(Flux.fromArray(fooArray), FooModel)
Mono<ServerResponse> intResponse(Mono<FooModel> mono) {
return ServerResponse.ok().contentType(MediaType.TEXT_PLAIN)
.body(BodyInserters.fromPublisher(mono.map { i -> DEFAULT_RESPONSE + " " + i.id }, String))
}
}
@Trace()
private static FooModel tracedMethod(long id) {
return new FooModel(id, "tracedMethod")
}
}

View File

@ -1,5 +1,6 @@
package dd.trace.instrumentation.springwebflux
import datadog.trace.api.Trace
import org.springframework.web.bind.annotation.GetMapping
import org.springframework.web.bind.annotation.PathVariable
import org.springframework.web.bind.annotation.RestController
@ -39,4 +40,24 @@ class TestController {
Mono<FooModel> getFooFailMono(@PathVariable("id") long id) {
return Mono.error(new RuntimeException("bad things happen"))
}
@GetMapping("/foo-traced-method/{id}")
Mono<FooModel> getTracedMethod(@PathVariable("id") long id) {
return Mono.just(tracedMethod(id))
}
@GetMapping("/foo-mono-from-callable/{id}")
Mono<FooModel> getMonoFromCallable(@PathVariable("id") long id) {
return Mono.fromCallable { return tracedMethod(id) }
}
@GetMapping("/foo-delayed-mono/{id}")
Mono<FooModel> getFooDelayedMono(@PathVariable("id") long id) {
return Mono.just(id).delayElement(Duration.ofMillis(100)).map { i -> tracedMethod(i) }
}
@Trace()
private FooModel tracedMethod(long id) {
return new FooModel(id, "tracedMethod")
}
}