From c7e431404eb02a2f6b016d817bc0efa64809022a Mon Sep 17 00:00:00 2001 From: HaloFour Date: Mon, 19 Apr 2021 23:50:20 -0400 Subject: [PATCH] Add AsyncSpanEndStrategy for Reactor 3.x instrumentation (#2714) --- .../javaagent/reactor-3.1-javaagent.gradle | 3 + .../ReactorWithSpanInstrumentationTest.groovy | 248 ++++++++++++++++ .../reactor/TracedWithSpan.java | 22 ++ .../library/reactor-3.1-library.gradle | 3 +- .../reactor/ReactorAsyncSpanEndStrategy.java | 76 +++++ .../reactor/TracingOperator.java | 3 + .../ReactorAsyncSpanEndStrategyTest.groovy | 273 ++++++++++++++++++ 7 files changed, 627 insertions(+), 1 deletion(-) create mode 100644 instrumentation/reactor-3.1/javaagent/src/test/groovy/ReactorWithSpanInstrumentationTest.groovy create mode 100644 instrumentation/reactor-3.1/javaagent/src/test/java/io/opentelemetry/instrumentation/reactor/TracedWithSpan.java create mode 100644 instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/ReactorAsyncSpanEndStrategy.java create mode 100644 instrumentation/reactor-3.1/library/src/test/groovy/io/opentelemetry/instrumentation/reactor/ReactorAsyncSpanEndStrategyTest.groovy diff --git a/instrumentation/reactor-3.1/javaagent/reactor-3.1-javaagent.gradle b/instrumentation/reactor-3.1/javaagent/reactor-3.1-javaagent.gradle index f147ad4a5c..e19cdd440e 100644 --- a/instrumentation/reactor-3.1/javaagent/reactor-3.1-javaagent.gradle +++ b/instrumentation/reactor-3.1/javaagent/reactor-3.1-javaagent.gradle @@ -13,10 +13,13 @@ dependencies { implementation project(':instrumentation:reactor-3.1:library') testLibrary group: 'io.projectreactor', name: 'reactor-core', version: '3.1.0.RELEASE' + testLibrary group: 'io.projectreactor', name: 'reactor-test', version: '3.1.0.RELEASE' testImplementation project(':instrumentation:reactor-3.1:testing') + testImplementation deps.opentelemetryExtAnnotations latestDepTestLibrary group: 'io.projectreactor', name: 'reactor-core', version: '3.+' + latestDepTestLibrary group: 'io.projectreactor', name: 'reactor-test', version: '3.+' // Looks like later versions on reactor need this dependency for some reason even though it is marked as optional. latestDepTestLibrary group: 'io.micrometer', name: 'micrometer-core', version: '1.+' } diff --git a/instrumentation/reactor-3.1/javaagent/src/test/groovy/ReactorWithSpanInstrumentationTest.groovy b/instrumentation/reactor-3.1/javaagent/src/test/groovy/ReactorWithSpanInstrumentationTest.groovy new file mode 100644 index 0000000000..370019b796 --- /dev/null +++ b/instrumentation/reactor-3.1/javaagent/src/test/groovy/ReactorWithSpanInstrumentationTest.groovy @@ -0,0 +1,248 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +import io.opentelemetry.api.trace.SpanKind +import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification +import io.opentelemetry.instrumentation.reactor.TracedWithSpan +import reactor.core.publisher.Flux +import reactor.core.publisher.Mono +import reactor.core.publisher.UnicastProcessor +import reactor.test.StepVerifier + +class ReactorWithSpanInstrumentationTest extends AgentInstrumentationSpecification { + + def "should capture span for already completed Mono"() { + setup: + def source = Mono.just("Value") + def result = new TracedWithSpan() + .mono(source) + + expect: + StepVerifier.create(result) + .expectNext("Value") + .verifyComplete() + + assertTraces(1) { + trace(0, 1) { + span(0) { + name "TracedWithSpan.mono" + kind SpanKind.INTERNAL + hasNoParent() + errored false + attributes { + } + } + } + } + } + + def "should capture span for eventually completed Mono"() { + setup: + def source = UnicastProcessor.create() + def mono = source.singleOrEmpty() + def result = new TracedWithSpan() + .mono(mono) + def verifier = StepVerifier.create(result) + .expectSubscription() + + expect: + Thread.sleep(500) // sleep a bit just to make sure no span is captured + assertTraces(0) {} + + source.onNext("Value") + source.onComplete() + + verifier.expectNext("Value") + .verifyComplete() + + assertTraces(1) { + trace(0, 1) { + span(0) { + name "TracedWithSpan.mono" + kind SpanKind.INTERNAL + hasNoParent() + errored false + attributes { + } + } + } + } + } + + def "should capture span for already errored Mono"() { + setup: + def error = new IllegalArgumentException("Boom") + def source = Mono.error(error) + def result = new TracedWithSpan() + .mono(source) + + expect: + StepVerifier.create(result) + .verifyErrorMatches({ it == error }) + + assertTraces(1) { + trace(0, 1) { + span(0) { + name "TracedWithSpan.mono" + kind SpanKind.INTERNAL + hasNoParent() + errored true + errorEvent(IllegalArgumentException, "Boom") + attributes { + } + } + } + } + } + + def "should capture span for eventually errored Mono"() { + setup: + def error = new IllegalArgumentException("Boom") + def source = UnicastProcessor.create() + def mono = source.singleOrEmpty() + def result = new TracedWithSpan() + .mono(mono) + def verifier = StepVerifier.create(result) + .expectSubscription() + + expect: + Thread.sleep(500) // sleep a bit just to make sure no span is captured + assertTraces(0) {} + + source.onError(error) + + verifier + .verifyErrorMatches({ it == error }) + + assertTraces(1) { + trace(0, 1) { + span(0) { + name "TracedWithSpan.mono" + kind SpanKind.INTERNAL + hasNoParent() + errored true + errorEvent(IllegalArgumentException, "Boom") + attributes { + } + } + } + } + } + + def "should capture span for already completed Flux"() { + setup: + def source = Flux.just("Value") + def result = new TracedWithSpan() + .flux(source) + + expect: + StepVerifier.create(result) + .expectNext("Value") + .verifyComplete() + + assertTraces(1) { + trace(0, 1) { + span(0) { + name "TracedWithSpan.flux" + kind SpanKind.INTERNAL + hasNoParent() + errored false + attributes { + } + } + } + } + } + + def "should capture span for eventually completed Flux"() { + setup: + def source = UnicastProcessor.create() + def result = new TracedWithSpan() + .flux(source) + def verifier = StepVerifier.create(result) + .expectSubscription() + + expect: + Thread.sleep(500) // sleep a bit just to make sure no span is captured + assertTraces(0) {} + + source.onNext("Value") + source.onComplete() + + verifier.expectNext("Value") + .verifyComplete() + + assertTraces(1) { + trace(0, 1) { + span(0) { + name "TracedWithSpan.flux" + kind SpanKind.INTERNAL + hasNoParent() + errored false + attributes { + } + } + } + } + } + + def "should capture span for already errored Flux"() { + setup: + def error = new IllegalArgumentException("Boom") + def source = Flux.error(error) + def result = new TracedWithSpan() + .flux(source) + + expect: + StepVerifier.create(result) + .verifyErrorMatches({ it == error }) + + assertTraces(1) { + trace(0, 1) { + span(0) { + name "TracedWithSpan.flux" + kind SpanKind.INTERNAL + hasNoParent() + errored true + errorEvent(IllegalArgumentException, "Boom") + attributes { + } + } + } + } + } + + def "should capture span for eventually errored Flux"() { + setup: + def error = new IllegalArgumentException("Boom") + def source = UnicastProcessor.create() + def result = new TracedWithSpan() + .flux(source) + def verifier = StepVerifier.create(result) + .expectSubscription() + + expect: + Thread.sleep(500) // sleep a bit just to make sure no span is captured + assertTraces(0) {} + + source.onError(error) + + verifier.verifyErrorMatches({ it == error }) + + assertTraces(1) { + trace(0, 1) { + span(0) { + name "TracedWithSpan.flux" + kind SpanKind.INTERNAL + hasNoParent() + errored true + errorEvent(IllegalArgumentException, "Boom") + attributes { + } + } + } + } + } +} diff --git a/instrumentation/reactor-3.1/javaagent/src/test/java/io/opentelemetry/instrumentation/reactor/TracedWithSpan.java b/instrumentation/reactor-3.1/javaagent/src/test/java/io/opentelemetry/instrumentation/reactor/TracedWithSpan.java new file mode 100644 index 0000000000..e9cdb70394 --- /dev/null +++ b/instrumentation/reactor-3.1/javaagent/src/test/java/io/opentelemetry/instrumentation/reactor/TracedWithSpan.java @@ -0,0 +1,22 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.reactor; + +import io.opentelemetry.extension.annotations.WithSpan; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +public class TracedWithSpan { + @WithSpan + public Mono mono(Mono mono) { + return mono; + } + + @WithSpan + public Flux flux(Flux flux) { + return flux; + } +} diff --git a/instrumentation/reactor-3.1/library/reactor-3.1-library.gradle b/instrumentation/reactor-3.1/library/reactor-3.1-library.gradle index 8fee34c420..38e01aa6e2 100644 --- a/instrumentation/reactor-3.1/library/reactor-3.1-library.gradle +++ b/instrumentation/reactor-3.1/library/reactor-3.1-library.gradle @@ -2,10 +2,11 @@ apply from: "$rootDir/gradle/instrumentation-library.gradle" dependencies { library group: 'io.projectreactor', name: 'reactor-core', version: '3.1.0.RELEASE' + testLibrary group: 'io.projectreactor', name: 'reactor-test', version: '3.1.0.RELEASE' testImplementation project(':instrumentation:reactor-3.1:testing') - latestDepTestLibrary group: 'io.projectreactor', name: 'reactor-core', version: '3.+' + latestDepTestLibrary group: 'io.projectreactor', name: 'reactor-test', version: '3.+' // Looks like later versions on reactor need this dependency for some reason even though it is marked as optional. latestDepTestLibrary group: 'io.micrometer', name: 'micrometer-core', version: '1.+' } \ No newline at end of file diff --git a/instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/ReactorAsyncSpanEndStrategy.java b/instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/ReactorAsyncSpanEndStrategy.java new file mode 100644 index 0000000000..133a9f2ea6 --- /dev/null +++ b/instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/ReactorAsyncSpanEndStrategy.java @@ -0,0 +1,76 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.reactor; + +import io.opentelemetry.context.Context; +import io.opentelemetry.instrumentation.api.tracer.BaseTracer; +import io.opentelemetry.instrumentation.api.tracer.async.AsyncSpanEndStrategy; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; +import org.reactivestreams.Publisher; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +public enum ReactorAsyncSpanEndStrategy implements AsyncSpanEndStrategy { + INSTANCE; + + @Override + public boolean supports(Class returnType) { + return returnType == Publisher.class || returnType == Mono.class || returnType == Flux.class; + } + + @Override + public Object end(BaseTracer tracer, Context context, Object returnValue) { + + EndOnFirstNotificationConsumer notificationConsumer = + new EndOnFirstNotificationConsumer(tracer, context); + if (returnValue instanceof Mono) { + Mono mono = (Mono) returnValue; + return mono.doOnError(notificationConsumer).doOnSuccess(notificationConsumer::onSuccess); + } else { + Flux flux = Flux.from((Publisher) returnValue); + return flux.doOnError(notificationConsumer).doOnComplete(notificationConsumer); + } + } + + /** + * Helper class to ensure that the span is ended exactly once regardless of how many OnComplete or + * OnError notifications are received. Multiple notifications can happen anytime multiple + * subscribers subscribe to the same publisher. + */ + private static final class EndOnFirstNotificationConsumer extends AtomicBoolean + implements Runnable, Consumer { + + private final BaseTracer tracer; + private final Context context; + + public EndOnFirstNotificationConsumer(BaseTracer tracer, Context context) { + super(false); + this.tracer = tracer; + this.context = context; + } + + public void onSuccess(T ignored) { + accept(null); + } + + @Override + public void run() { + accept(null); + } + + @Override + public void accept(Throwable exception) { + if (compareAndSet(false, true)) { + if (exception != null) { + tracer.endExceptionally(context, exception); + } else { + tracer.end(context); + } + } + } + } +} diff --git a/instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/TracingOperator.java b/instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/TracingOperator.java index 786bc99729..53dc2bf3af 100644 --- a/instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/TracingOperator.java +++ b/instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/TracingOperator.java @@ -23,6 +23,7 @@ package io.opentelemetry.instrumentation.reactor; import io.opentelemetry.context.Context; +import io.opentelemetry.instrumentation.api.tracer.async.AsyncSpanEndStrategies; import java.util.function.BiFunction; import java.util.function.Function; import org.reactivestreams.Publisher; @@ -43,11 +44,13 @@ public class TracingOperator { */ public static void registerOnEachOperator() { Hooks.onEachOperator(TracingSubscriber.class.getName(), tracingLift()); + AsyncSpanEndStrategies.getInstance().registerStrategy(ReactorAsyncSpanEndStrategy.INSTANCE); } /** Unregisters the hook registered by {@link #registerOnEachOperator()}. */ public static void resetOnEachOperator() { Hooks.resetOnEachOperator(TracingSubscriber.class.getName()); + AsyncSpanEndStrategies.getInstance().unregisterStrategy(ReactorAsyncSpanEndStrategy.INSTANCE); } private static Function, ? extends Publisher> tracingLift() { diff --git a/instrumentation/reactor-3.1/library/src/test/groovy/io/opentelemetry/instrumentation/reactor/ReactorAsyncSpanEndStrategyTest.groovy b/instrumentation/reactor-3.1/library/src/test/groovy/io/opentelemetry/instrumentation/reactor/ReactorAsyncSpanEndStrategyTest.groovy new file mode 100644 index 0000000000..dadbc2abf8 --- /dev/null +++ b/instrumentation/reactor-3.1/library/src/test/groovy/io/opentelemetry/instrumentation/reactor/ReactorAsyncSpanEndStrategyTest.groovy @@ -0,0 +1,273 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.reactor + +import io.opentelemetry.context.Context +import io.opentelemetry.instrumentation.api.tracer.BaseTracer +import reactor.core.publisher.Flux +import reactor.core.publisher.Mono +import reactor.core.publisher.UnicastProcessor +import reactor.test.StepVerifier +import spock.lang.Specification + +class ReactorAsyncSpanEndStrategyTest extends Specification { + BaseTracer tracer + + Context context + + def underTest = ReactorAsyncSpanEndStrategy.INSTANCE + + void setup() { + tracer = Mock() + context = Mock() + } + + static class MonoTest extends ReactorAsyncSpanEndStrategyTest { + def "is supported"() { + expect: + underTest.supports(Mono) + } + + def "ends span on already completed"() { + when: + def result = (Mono) underTest.end(tracer, context, Mono.just("Value")) + StepVerifier.create(result) + .expectNext("Value") + .verifyComplete() + + then: + 1 * tracer.end(context) + } + + def "ends span on already empty"() { + when: + def result = (Mono) underTest.end(tracer, context, Mono.empty()) + StepVerifier.create(result) + .verifyComplete() + + then: + 1 * tracer.end(context) + } + + def "ends span on already errored"() { + given: + def exception = new IllegalStateException() + + when: + def result = (Mono) underTest.end(tracer, context, Mono.error(exception)) + StepVerifier.create(result) + .verifyErrorMatches({ it == exception }) + + then: + 1 * tracer.endExceptionally(context, exception) + } + + def "ends span when completed"() { + given: + def source = UnicastProcessor.create() + def mono = source.singleOrEmpty() + + when: + def result = (Mono) underTest.end(tracer, context, mono) + def verifier = StepVerifier.create(result) + .expectSubscription() + + then: + 0 * tracer._ + + when: + source.onNext("Value") + source.onComplete() + verifier.expectNext("Value") + .verifyComplete() + + then: + 1 * tracer.end(context) + } + + def "ends span when empty"() { + given: + def source = UnicastProcessor.create() + def mono = source.singleOrEmpty() + + when: + def result = (Mono) underTest.end(tracer, context, mono) + def verifier = StepVerifier.create(result) + .expectSubscription() + + then: + 0 * tracer._ + + when: + source.onComplete() + verifier.verifyComplete() + + then: + 1 * tracer.end(context) + } + + def "ends span when errored"() { + given: + def exception = new IllegalStateException() + def source = UnicastProcessor.create() + def mono = source.singleOrEmpty() + + when: + def result = (Mono) underTest.end(tracer, context, mono) + def verifier = StepVerifier.create(result) + .expectSubscription() + + then: + 0 * tracer._ + + when: + source.onError(exception) + verifier.verifyErrorMatches({ it == exception }) + + then: + 1 * tracer.endExceptionally(context, exception) + } + + def "ends span once for multiple subscribers"() { + + when: + def result = (Mono) underTest.end(tracer, context, Mono.just("Value")) + StepVerifier.create(result) + .expectNext("Value") + .verifyComplete() + StepVerifier.create(result) + .expectNext("Value") + .verifyComplete() + StepVerifier.create(result) + .expectNext("Value") + .verifyComplete() + + then: + 1 * tracer.end(context) + } + } + + static class FluxTest extends ReactorAsyncSpanEndStrategyTest { + def "is supported"() { + expect: + underTest.supports(Flux) + } + + def "ends span on already completed"() { + when: + def result = (Flux) underTest.end(tracer, context, Flux.just("Value")) + StepVerifier.create(result) + .expectNext("Value") + .verifyComplete() + + then: + 1 * tracer.end(context) + } + + def "ends span on already empty"() { + when: + def result = (Flux) underTest.end(tracer, context, Flux.empty()) + StepVerifier.create(result) + .verifyComplete() + + then: + 1 * tracer.end(context) + } + + def "ends span on already errored"() { + given: + def exception = new IllegalStateException() + + when: + def result = (Flux) underTest.end(tracer, context, Flux.error(exception)) + StepVerifier.create(result) + .verifyErrorMatches({ it == exception }) + + then: + 1 * tracer.endExceptionally(context, exception) + } + + def "ends span when completed"() { + given: + def source = UnicastProcessor.create() + + when: + def result = (Flux) underTest.end(tracer, context, source) + def verifier = StepVerifier.create(result) + .expectSubscription() + + then: + 0 * tracer._ + + when: + source.onNext("Value") + source.onComplete() + verifier.expectNext("Value") + .verifyComplete() + + then: + 1 * tracer.end(context) + } + + def "ends span when empty"() { + given: + def source = UnicastProcessor.create() + + when: + def result = (Flux) underTest.end(tracer, context, source) + def verifier = StepVerifier.create(result) + .expectSubscription() + + then: + 0 * tracer._ + + when: + source.onComplete() + verifier.verifyComplete() + + then: + 1 * tracer.end(context) + } + + def "ends span when errored"() { + given: + def exception = new IllegalStateException() + def source = UnicastProcessor.create() + + when: + def result = (Flux) underTest.end(tracer, context, source) + def verifier = StepVerifier.create(result) + .expectSubscription() + + then: + 0 * tracer._ + + when: + source.onError(exception) + verifier.verifyErrorMatches({ it == exception }) + + then: + 1 * tracer.endExceptionally(context, exception) + } + + def "ends span once for multiple subscribers"() { + when: + def result = (Flux) underTest.end(tracer, context, Flux.just("Value")) + StepVerifier.create(result) + .expectNext("Value") + .verifyComplete() + StepVerifier.create(result) + .expectNext("Value") + .verifyComplete() + StepVerifier.create(result) + .expectNext("Value") + .verifyComplete() + + then: + 1 * tracer.end(context) + } + } +}