Add reactor-core instrumentation

This commit is contained in:
Nikolay Martynov 2019-02-11 16:10:14 -05:00
parent 58e4126fee
commit 13005d72cb
6 changed files with 524 additions and 0 deletions

View File

@ -0,0 +1,81 @@
// Set properties before any plugins get loaded
ext {
minJavaVersionForTests = JavaVersion.VERSION_1_8
maxJavaVersionForTests = JavaVersion.VERSION_1_8
}
muzzle {
pass {
group = "io.projectreactor"
module = "reactor-core"
versions = "[3.1.0.RELEASE,)"
assertInverse = true
}
}
apply from: "${rootDir}/gradle/java.gradle"
sourceSets {
main_java8 {
java.srcDirs "${project.projectDir}/src/main/java8"
}
}
compileMain_java8Java {
sourceCompatibility = 1.8
targetCompatibility = 1.8
}
// Note: ideally lombok plugin would do this for us, but currently it doesn't support custom
// source sets. See https://github.com/franzbecker/gradle-lombok/issues/17.
dependencies {
main_java8CompileOnly "org.projectlombok:lombok:${project.lombok.version}" transitive false
main_java8AnnotationProcessor "org.projectlombok:lombok:${project.lombok.version}" transitive false
}
apply plugin: 'org.unbroken-dome.test-sets'
testSets {
latestDepTest {
dirName = 'test'
}
}
compileTestJava {
sourceCompatibility = "1.8"
targetCompatibility = "1.8"
}
compileLatestDepTestJava {
sourceCompatibility = "1.8"
targetCompatibility = "1.8"
}
dependencies {
main_java8CompileOnly group: 'io.projectreactor', name: 'reactor-core', version: '3.1.0.RELEASE'
main_java8Compile project(':dd-java-agent:agent-tooling')
main_java8Compile deps.bytebuddy
main_java8Compile deps.opentracing
compileOnly sourceSets.main_java8.compileClasspath
compile sourceSets.main_java8.output
compile project(':dd-java-agent:agent-tooling')
compile deps.bytebuddy
compile deps.opentracing
annotationProcessor deps.autoservice
implementation deps.autoservice
testCompile project(':dd-java-agent:testing')
testCompile project(':dd-java-agent:instrumentation:trace-annotation')
testCompile project(':dd-java-agent:instrumentation:java-concurrent')
testCompile group: 'io.projectreactor', name: 'reactor-core', version: '3.1.0.RELEASE'
latestDepTestCompile group: 'io.projectreactor', name: 'reactor-core', version: '3.+'
// Looks like later versions on reactor need this dependency for some reason even though it is marked as optional.
latestDepTestCompile group: 'io.micrometer', name: 'micrometer-core', version: '1.+'
}

View File

@ -0,0 +1,55 @@
package datadog.trace.instrumentation.reactor.core;
import static datadog.trace.agent.tooling.ByteBuddyElementMatchers.safeHasSuperType;
import static java.util.Collections.singletonMap;
import static net.bytebuddy.matcher.ElementMatchers.isAbstract;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.not;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.Instrumenter;
import java.util.Map;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
@AutoService(Instrumenter.class)
public final class FluxAndMonoInstrumentation extends Instrumenter.Default {
public static final String PACKAGE = FluxAndMonoInstrumentation.class.getPackage().getName();
public FluxAndMonoInstrumentation() {
super("reactor-core");
}
@Override
public String[] helperClassNames() {
return new String[] {
PACKAGE + ".ReactorCoreAdviceUtils", PACKAGE + ".ReactorCoreAdviceUtils$TracingSubscriber"
};
}
@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return not(isAbstract())
.and(
safeHasSuperType(
named("reactor.core.publisher.Mono").or(named("reactor.core.publisher.Flux"))));
}
@Override
public Map<? extends ElementMatcher<? super MethodDescription>, String> transformers() {
return singletonMap(
isMethod()
.and(isPublic())
.and(named("subscribe"))
.and(takesArgument(0, named("reactor.core.CoreSubscriber")))
.and(takesArguments(1)),
// Cannot reference class directly here because it would lead to class load failure on Java7
PACKAGE + ".FluxAndMonoSubscribeAdvice");
}
}

View File

@ -0,0 +1,45 @@
package datadog.trace.instrumentation.reactor.core;
import datadog.trace.context.TraceScope;
import io.opentracing.Scope;
import io.opentracing.Span;
import io.opentracing.util.GlobalTracer;
import net.bytebuddy.asm.Advice;
import reactor.core.CoreSubscriber;
/**
* Instruments Flux#subscribe(CoreSubscriber) and Mono#subscribe(CoreSubscriber). It looks like Mono
* and Flux implementations tend to do a lot of interesting work on subscription.
*
* <p>This instrumentation is similar to java-concurrent instrumentation in a sense that it doesn't
* create any new spans. Instead it makes sure that existing span is propagated through Flux/Mono
* execution.
*/
public class FluxAndMonoSubscribeAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static Scope methodEnter(
@Advice.Argument(0) final CoreSubscriber subscriber, @Advice.This final Object thiz) {
final Span span =
subscriber
.currentContext()
.getOrDefault(ReactorCoreAdviceUtils.PUBLISHER_CONTEXT_KEY, null);
if (span != null) {
final Scope scope = GlobalTracer.get().scopeManager().activate(span, false);
((TraceScope) scope).setAsyncPropagation(true);
return scope;
}
return null;
}
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void methodExit(
@Advice.Enter final Scope scope, @Advice.Thrown final Throwable throwable) {
if (throwable != null) {
ReactorCoreAdviceUtils.finishSpanIfPresent(scope.span(), throwable);
}
if (scope != null) {
scope.close();
}
}
}

View File

@ -0,0 +1,95 @@
package datadog.trace.instrumentation.reactor.core;
import static io.opentracing.log.Fields.ERROR_OBJECT;
import static reactor.core.publisher.Operators.lift;
import io.opentracing.Span;
import io.opentracing.tag.Tags;
import java.util.Collections;
import java.util.function.Function;
import lombok.extern.slf4j.Slf4j;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.context.Context;
@Slf4j
public class ReactorCoreAdviceUtils {
public static final String PUBLISHER_CONTEXT_KEY =
"datadog.trace.instrumentation.reactor.core.Span";
public static <T> Mono<T> setPublisherSpan(final Mono<T> mono, final Span span) {
return mono.<T>transform(finishSpanNextOrError())
.subscriberContext(Context.of(PUBLISHER_CONTEXT_KEY, span));
}
public static <T> Flux<T> setPublisherSpan(final Flux<T> flux, final Span span) {
return flux.<T>transform(finishSpanNextOrError())
.subscriberContext(Context.of(PUBLISHER_CONTEXT_KEY, span));
}
/**
* Idea for this has been lifted from https://github.com/reactor/reactor-core/issues/947. Newer
* versions of reactor-core have easier way to access context but we want to support older
* versions.
*/
public static <T, IP>
Function<? super Publisher<T>, ? extends Publisher<T>> finishSpanNextOrError() {
return lift((scannable, subscriber) -> new TracingSubscriber<>(subscriber));
}
public static void finishSpanIfPresent(final Context context, final Throwable throwable) {
finishSpanIfPresent(context.getOrDefault(PUBLISHER_CONTEXT_KEY, (Span) null), throwable);
}
public static void finishSpanIfPresent(final Span span, final Throwable throwable) {
if (span != null) {
if (throwable != null) {
Tags.ERROR.set(span, true);
span.log(Collections.singletonMap(ERROR_OBJECT, throwable));
}
span.finish();
}
}
public static class TracingSubscriber<T> implements CoreSubscriber<T> {
private final Context context;
private final CoreSubscriber<? super T> subscriber;
public TracingSubscriber(final CoreSubscriber<? super T> subscriber) {
this.subscriber = subscriber;
context = subscriber.currentContext();
}
@Override
public void onNext(final T event) {
subscriber.onNext(event);
}
@Override
public void onError(final Throwable throwable) {
finishSpanIfPresent(context, throwable);
subscriber.onError(throwable);
}
@Override
public void onComplete() {
finishSpanIfPresent(context, null);
subscriber.onComplete();
}
@Override
public Context currentContext() {
return context;
}
@Override
public void onSubscribe(final Subscription s) {
subscriber.onSubscribe(s);
}
}
}

View File

@ -0,0 +1,247 @@
import datadog.trace.agent.test.AgentTestRunner
import datadog.trace.api.Trace
import datadog.trace.instrumentation.reactor.core.ReactorCoreAdviceUtils
import io.opentracing.Scope
import io.opentracing.tag.Tags
import io.opentracing.util.GlobalTracer
import org.reactivestreams.Subscriber
import org.reactivestreams.Subscription
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import spock.lang.Shared
import java.time.Duration
class ReactorCoreTest extends AgentTestRunner {
public static final String EXCEPTION_MESSAGE = "test exception"
@Shared
def addOne = { i -> addOneFunc(i) }
@Shared
def throwException = { throw new RuntimeException(EXCEPTION_MESSAGE) }
def "Publisher '#name' test"() {
when:
def result = runUnderTrace(publisher)
then:
result == expected
and:
assertTraces(1) {
def publisherParentSpanIndex = workSpans + 1
trace(0, workSpans + 2) {
span(0) {
resourceName "trace-parent"
operationName "trace-parent"
parent()
tags {
"$Tags.COMPONENT.key" "trace"
defaultTags()
}
}
for (int i = 0; i < workSpans; i++) {
span(i + 1) {
resourceName "addOne"
operationName "addOne"
childOf(span(publisherParentSpanIndex))
tags {
"$Tags.COMPONENT.key" "trace"
defaultTags()
}
}
}
span(publisherParentSpanIndex) {
resourceName "publisher-parent"
operationName "publisher-parent"
childOf(span(0))
tags {
defaultTags()
}
}
}
}
where:
name | expected | workSpans | publisher
"basic mono" | 2 | 1 | Mono.just(1).map(addOne)
"two operations mono" | 4 | 2 | Mono.just(2).map(addOne).map(addOne)
"delayed mono" | 4 | 1 | Mono.just(3).delayElement(Duration.ofMillis(100)).map(addOne)
"delayed twice mono" | 6 | 2 | Mono.just(4).delayElement(Duration.ofMillis(100)).map(addOne).delayElement(Duration.ofMillis(100)).map(addOne)
"basic flux" | [6, 7] | 2 | Flux.fromIterable([5, 6]).map(addOne)
"two operations flux" | [8, 9] | 4 | Flux.fromIterable([6, 7]).map(addOne).map(addOne)
"delayed flux" | [8, 9] | 2 | Flux.fromIterable([7, 8]).delayElements(Duration.ofMillis(100)).map(addOne)
"delayed twice flux" | [10, 11] | 4 | Flux.fromIterable([8, 9]).delayElements(Duration.ofMillis(100)).map(addOne).delayElements(Duration.ofMillis(100)).map(addOne)
"mono from callable" | 12 | 2 | Mono.fromCallable({ addOneFunc(10) }).map(addOne)
}
def "Publisher error '#name' test"() {
when:
runUnderTrace(publisher)
then:
def exception = thrown RuntimeException
exception.message == EXCEPTION_MESSAGE
and:
assertTraces(1) {
trace(0, 2) {
span(0) {
resourceName "trace-parent"
operationName "trace-parent"
parent()
errored true
tags {
"$Tags.COMPONENT.key" "trace"
errorTags(RuntimeException, EXCEPTION_MESSAGE)
defaultTags()
}
}
span(1) {
resourceName "publisher-parent"
operationName "publisher-parent"
childOf(span(0))
errored true
tags {
errorTags(RuntimeException, EXCEPTION_MESSAGE)
defaultTags()
}
}
}
}
where:
name | publisher
"mono" | Mono.error(new RuntimeException(EXCEPTION_MESSAGE))
"flux" | Flux.error(new RuntimeException(EXCEPTION_MESSAGE))
}
def "Publisher step '#name' test"() {
when:
runUnderTrace(publisher)
then:
def exception = thrown RuntimeException
exception.message == EXCEPTION_MESSAGE
and:
assertTraces(1) {
trace(0, workSpans + 2) {
def publisherParentSpanIndex = workSpans + 1
span(0) {
resourceName "trace-parent"
operationName "trace-parent"
parent()
errored true
tags {
"$Tags.COMPONENT.key" "trace"
errorTags(RuntimeException, EXCEPTION_MESSAGE)
defaultTags()
}
}
for (int i = 0; i < workSpans; i++) {
span(i + 1) {
resourceName "addOne"
operationName "addOne"
childOf(span(publisherParentSpanIndex))
tags {
"$Tags.COMPONENT.key" "trace"
defaultTags()
}
}
}
span(publisherParentSpanIndex) {
resourceName "publisher-parent"
operationName "publisher-parent"
childOf(span(0))
errored true
tags {
errorTags(RuntimeException, EXCEPTION_MESSAGE)
defaultTags()
}
}
}
}
where:
name | workSpans | publisher
"basic mono failure" | 1 | Mono.just(1).map(addOne).map({ throwException() })
"basic flux failure" | 1 | Flux.fromIterable([5, 6]).map(addOne).map({ throwException() })
}
def "Publisher '#name' cancel"() {
when:
cancelUnderTrace(publisher)
then:
assertTraces(1) {
trace(0, 2) {
span(0) {
resourceName "trace-parent"
operationName "trace-parent"
parent()
tags {
"$Tags.COMPONENT.key" "trace"
defaultTags()
}
}
span(1) {
resourceName "publisher-parent"
operationName "publisher-parent"
childOf(span(0))
tags {
defaultTags()
}
}
}
}
where:
name | publisher
"basic mono" | Mono.just(1)
"basic flux" | Flux.fromIterable([5, 6])
}
@Trace(operationName = "trace-parent")
def runUnderTrace(def publisher) {
// This is important sequence of events:
// We have a 'trace-parent' that covers whole span and then we have publisher-parent that overs only
// operation to create publisher (and set its context).
// The expectation is that then publisher is executed under 'publisher-parent', not under 'trace-parent'
final Scope scope = GlobalTracer.get().buildSpan("publisher-parent").startActive(true)
publisher = ReactorCoreAdviceUtils.setPublisherSpan(publisher, scope.span())
scope.close()
// Read all data from publisher
if (publisher instanceof Mono) {
return publisher.block()
} else if (publisher instanceof Flux) {
return publisher.toStream().toArray({ size -> new Integer[size] })
}
throw new RuntimeException("Unknown publisher: " + publisher)
}
@Trace(operationName = "trace-parent")
def cancelUnderTrace(def publisher) {
final Scope scope = GlobalTracer.get().buildSpan("publisher-parent").startActive(true)
publisher = ReactorCoreAdviceUtils.setPublisherSpan(publisher, scope.span())
scope.close()
publisher.subscribe(new Subscriber<Integer>() {
void onSubscribe(Subscription subscription) {
subscription.cancel()
}
void onNext(Integer t) {}
void onError(Throwable error) {}
void onComplete() {}
})
}
@Trace(operationName = "addOne")
def static addOneFunc(int i) {
return i + 1
}
}

View File

@ -68,6 +68,7 @@ include ':dd-java-agent:instrumentation:osgi-classloading'
include ':dd-java-agent:instrumentation:play-2.4'
include ':dd-java-agent:instrumentation:rabbitmq-amqp-2.7'
include ':dd-java-agent:instrumentation:ratpack-1.4'
include ':dd-java-agent:instrumentation:reactor-core-3.1'
include ':dd-java-agent:instrumentation:servlet-2'
include ':dd-java-agent:instrumentation:servlet-3'
include ':dd-java-agent:instrumentation:slf4j-mdc'