Add AsyncSpanEndStrategy for Reactor 3.x instrumentation (#2714)

This commit is contained in:
HaloFour 2021-04-19 23:50:20 -04:00 committed by GitHub
parent fa359a4a5d
commit c7e431404e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 627 additions and 1 deletions

View File

@ -13,10 +13,13 @@ dependencies {
implementation project(':instrumentation:reactor-3.1:library')
testLibrary group: 'io.projectreactor', name: 'reactor-core', version: '3.1.0.RELEASE'
testLibrary group: 'io.projectreactor', name: 'reactor-test', version: '3.1.0.RELEASE'
testImplementation project(':instrumentation:reactor-3.1:testing')
testImplementation deps.opentelemetryExtAnnotations
latestDepTestLibrary group: 'io.projectreactor', name: 'reactor-core', version: '3.+'
latestDepTestLibrary group: 'io.projectreactor', name: 'reactor-test', version: '3.+'
// Looks like later versions on reactor need this dependency for some reason even though it is marked as optional.
latestDepTestLibrary group: 'io.micrometer', name: 'micrometer-core', version: '1.+'
}

View File

@ -0,0 +1,248 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
import io.opentelemetry.api.trace.SpanKind
import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification
import io.opentelemetry.instrumentation.reactor.TracedWithSpan
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.core.publisher.UnicastProcessor
import reactor.test.StepVerifier
class ReactorWithSpanInstrumentationTest extends AgentInstrumentationSpecification {
def "should capture span for already completed Mono"() {
setup:
def source = Mono.just("Value")
def result = new TracedWithSpan()
.mono(source)
expect:
StepVerifier.create(result)
.expectNext("Value")
.verifyComplete()
assertTraces(1) {
trace(0, 1) {
span(0) {
name "TracedWithSpan.mono"
kind SpanKind.INTERNAL
hasNoParent()
errored false
attributes {
}
}
}
}
}
def "should capture span for eventually completed Mono"() {
setup:
def source = UnicastProcessor.<String>create()
def mono = source.singleOrEmpty()
def result = new TracedWithSpan()
.mono(mono)
def verifier = StepVerifier.create(result)
.expectSubscription()
expect:
Thread.sleep(500) // sleep a bit just to make sure no span is captured
assertTraces(0) {}
source.onNext("Value")
source.onComplete()
verifier.expectNext("Value")
.verifyComplete()
assertTraces(1) {
trace(0, 1) {
span(0) {
name "TracedWithSpan.mono"
kind SpanKind.INTERNAL
hasNoParent()
errored false
attributes {
}
}
}
}
}
def "should capture span for already errored Mono"() {
setup:
def error = new IllegalArgumentException("Boom")
def source = Mono.error(error)
def result = new TracedWithSpan()
.mono(source)
expect:
StepVerifier.create(result)
.verifyErrorMatches({ it == error })
assertTraces(1) {
trace(0, 1) {
span(0) {
name "TracedWithSpan.mono"
kind SpanKind.INTERNAL
hasNoParent()
errored true
errorEvent(IllegalArgumentException, "Boom")
attributes {
}
}
}
}
}
def "should capture span for eventually errored Mono"() {
setup:
def error = new IllegalArgumentException("Boom")
def source = UnicastProcessor.<String>create()
def mono = source.singleOrEmpty()
def result = new TracedWithSpan()
.mono(mono)
def verifier = StepVerifier.create(result)
.expectSubscription()
expect:
Thread.sleep(500) // sleep a bit just to make sure no span is captured
assertTraces(0) {}
source.onError(error)
verifier
.verifyErrorMatches({ it == error })
assertTraces(1) {
trace(0, 1) {
span(0) {
name "TracedWithSpan.mono"
kind SpanKind.INTERNAL
hasNoParent()
errored true
errorEvent(IllegalArgumentException, "Boom")
attributes {
}
}
}
}
}
def "should capture span for already completed Flux"() {
setup:
def source = Flux.just("Value")
def result = new TracedWithSpan()
.flux(source)
expect:
StepVerifier.create(result)
.expectNext("Value")
.verifyComplete()
assertTraces(1) {
trace(0, 1) {
span(0) {
name "TracedWithSpan.flux"
kind SpanKind.INTERNAL
hasNoParent()
errored false
attributes {
}
}
}
}
}
def "should capture span for eventually completed Flux"() {
setup:
def source = UnicastProcessor.<String>create()
def result = new TracedWithSpan()
.flux(source)
def verifier = StepVerifier.create(result)
.expectSubscription()
expect:
Thread.sleep(500) // sleep a bit just to make sure no span is captured
assertTraces(0) {}
source.onNext("Value")
source.onComplete()
verifier.expectNext("Value")
.verifyComplete()
assertTraces(1) {
trace(0, 1) {
span(0) {
name "TracedWithSpan.flux"
kind SpanKind.INTERNAL
hasNoParent()
errored false
attributes {
}
}
}
}
}
def "should capture span for already errored Flux"() {
setup:
def error = new IllegalArgumentException("Boom")
def source = Flux.error(error)
def result = new TracedWithSpan()
.flux(source)
expect:
StepVerifier.create(result)
.verifyErrorMatches({ it == error })
assertTraces(1) {
trace(0, 1) {
span(0) {
name "TracedWithSpan.flux"
kind SpanKind.INTERNAL
hasNoParent()
errored true
errorEvent(IllegalArgumentException, "Boom")
attributes {
}
}
}
}
}
def "should capture span for eventually errored Flux"() {
setup:
def error = new IllegalArgumentException("Boom")
def source = UnicastProcessor.<String>create()
def result = new TracedWithSpan()
.flux(source)
def verifier = StepVerifier.create(result)
.expectSubscription()
expect:
Thread.sleep(500) // sleep a bit just to make sure no span is captured
assertTraces(0) {}
source.onError(error)
verifier.verifyErrorMatches({ it == error })
assertTraces(1) {
trace(0, 1) {
span(0) {
name "TracedWithSpan.flux"
kind SpanKind.INTERNAL
hasNoParent()
errored true
errorEvent(IllegalArgumentException, "Boom")
attributes {
}
}
}
}
}
}

View File

@ -0,0 +1,22 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.reactor;
import io.opentelemetry.extension.annotations.WithSpan;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public class TracedWithSpan {
@WithSpan
public Mono<String> mono(Mono<String> mono) {
return mono;
}
@WithSpan
public Flux<String> flux(Flux<String> flux) {
return flux;
}
}

View File

@ -2,10 +2,11 @@ apply from: "$rootDir/gradle/instrumentation-library.gradle"
dependencies {
library group: 'io.projectreactor', name: 'reactor-core', version: '3.1.0.RELEASE'
testLibrary group: 'io.projectreactor', name: 'reactor-test', version: '3.1.0.RELEASE'
testImplementation project(':instrumentation:reactor-3.1:testing')
latestDepTestLibrary group: 'io.projectreactor', name: 'reactor-core', version: '3.+'
latestDepTestLibrary group: 'io.projectreactor', name: 'reactor-test', version: '3.+'
// Looks like later versions on reactor need this dependency for some reason even though it is marked as optional.
latestDepTestLibrary group: 'io.micrometer', name: 'micrometer-core', version: '1.+'
}

View File

@ -0,0 +1,76 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.reactor;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.tracer.BaseTracer;
import io.opentelemetry.instrumentation.api.tracer.async.AsyncSpanEndStrategy;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public enum ReactorAsyncSpanEndStrategy implements AsyncSpanEndStrategy {
INSTANCE;
@Override
public boolean supports(Class<?> returnType) {
return returnType == Publisher.class || returnType == Mono.class || returnType == Flux.class;
}
@Override
public Object end(BaseTracer tracer, Context context, Object returnValue) {
EndOnFirstNotificationConsumer notificationConsumer =
new EndOnFirstNotificationConsumer(tracer, context);
if (returnValue instanceof Mono) {
Mono<?> mono = (Mono<?>) returnValue;
return mono.doOnError(notificationConsumer).doOnSuccess(notificationConsumer::onSuccess);
} else {
Flux<?> flux = Flux.from((Publisher<?>) returnValue);
return flux.doOnError(notificationConsumer).doOnComplete(notificationConsumer);
}
}
/**
* Helper class to ensure that the span is ended exactly once regardless of how many OnComplete or
* OnError notifications are received. Multiple notifications can happen anytime multiple
* subscribers subscribe to the same publisher.
*/
private static final class EndOnFirstNotificationConsumer extends AtomicBoolean
implements Runnable, Consumer<Throwable> {
private final BaseTracer tracer;
private final Context context;
public EndOnFirstNotificationConsumer(BaseTracer tracer, Context context) {
super(false);
this.tracer = tracer;
this.context = context;
}
public <T> void onSuccess(T ignored) {
accept(null);
}
@Override
public void run() {
accept(null);
}
@Override
public void accept(Throwable exception) {
if (compareAndSet(false, true)) {
if (exception != null) {
tracer.endExceptionally(context, exception);
} else {
tracer.end(context);
}
}
}
}
}

View File

@ -23,6 +23,7 @@
package io.opentelemetry.instrumentation.reactor;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.tracer.async.AsyncSpanEndStrategies;
import java.util.function.BiFunction;
import java.util.function.Function;
import org.reactivestreams.Publisher;
@ -43,11 +44,13 @@ public class TracingOperator {
*/
public static void registerOnEachOperator() {
Hooks.onEachOperator(TracingSubscriber.class.getName(), tracingLift());
AsyncSpanEndStrategies.getInstance().registerStrategy(ReactorAsyncSpanEndStrategy.INSTANCE);
}
/** Unregisters the hook registered by {@link #registerOnEachOperator()}. */
public static void resetOnEachOperator() {
Hooks.resetOnEachOperator(TracingSubscriber.class.getName());
AsyncSpanEndStrategies.getInstance().unregisterStrategy(ReactorAsyncSpanEndStrategy.INSTANCE);
}
private static <T> Function<? super Publisher<T>, ? extends Publisher<T>> tracingLift() {

View File

@ -0,0 +1,273 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.reactor
import io.opentelemetry.context.Context
import io.opentelemetry.instrumentation.api.tracer.BaseTracer
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.core.publisher.UnicastProcessor
import reactor.test.StepVerifier
import spock.lang.Specification
class ReactorAsyncSpanEndStrategyTest extends Specification {
BaseTracer tracer
Context context
def underTest = ReactorAsyncSpanEndStrategy.INSTANCE
void setup() {
tracer = Mock()
context = Mock()
}
static class MonoTest extends ReactorAsyncSpanEndStrategyTest {
def "is supported"() {
expect:
underTest.supports(Mono)
}
def "ends span on already completed"() {
when:
def result = (Mono<?>) underTest.end(tracer, context, Mono.just("Value"))
StepVerifier.create(result)
.expectNext("Value")
.verifyComplete()
then:
1 * tracer.end(context)
}
def "ends span on already empty"() {
when:
def result = (Mono<?>) underTest.end(tracer, context, Mono.empty())
StepVerifier.create(result)
.verifyComplete()
then:
1 * tracer.end(context)
}
def "ends span on already errored"() {
given:
def exception = new IllegalStateException()
when:
def result = (Mono<?>) underTest.end(tracer, context, Mono.error(exception))
StepVerifier.create(result)
.verifyErrorMatches({ it == exception })
then:
1 * tracer.endExceptionally(context, exception)
}
def "ends span when completed"() {
given:
def source = UnicastProcessor.<String>create()
def mono = source.singleOrEmpty()
when:
def result = (Mono<?>) underTest.end(tracer, context, mono)
def verifier = StepVerifier.create(result)
.expectSubscription()
then:
0 * tracer._
when:
source.onNext("Value")
source.onComplete()
verifier.expectNext("Value")
.verifyComplete()
then:
1 * tracer.end(context)
}
def "ends span when empty"() {
given:
def source = UnicastProcessor.<String>create()
def mono = source.singleOrEmpty()
when:
def result = (Mono<?>) underTest.end(tracer, context, mono)
def verifier = StepVerifier.create(result)
.expectSubscription()
then:
0 * tracer._
when:
source.onComplete()
verifier.verifyComplete()
then:
1 * tracer.end(context)
}
def "ends span when errored"() {
given:
def exception = new IllegalStateException()
def source = UnicastProcessor.<String>create()
def mono = source.singleOrEmpty()
when:
def result = (Mono<?>) underTest.end(tracer, context, mono)
def verifier = StepVerifier.create(result)
.expectSubscription()
then:
0 * tracer._
when:
source.onError(exception)
verifier.verifyErrorMatches({ it == exception })
then:
1 * tracer.endExceptionally(context, exception)
}
def "ends span once for multiple subscribers"() {
when:
def result = (Mono<?>) underTest.end(tracer, context, Mono.just("Value"))
StepVerifier.create(result)
.expectNext("Value")
.verifyComplete()
StepVerifier.create(result)
.expectNext("Value")
.verifyComplete()
StepVerifier.create(result)
.expectNext("Value")
.verifyComplete()
then:
1 * tracer.end(context)
}
}
static class FluxTest extends ReactorAsyncSpanEndStrategyTest {
def "is supported"() {
expect:
underTest.supports(Flux)
}
def "ends span on already completed"() {
when:
def result = (Flux<?>) underTest.end(tracer, context, Flux.just("Value"))
StepVerifier.create(result)
.expectNext("Value")
.verifyComplete()
then:
1 * tracer.end(context)
}
def "ends span on already empty"() {
when:
def result = (Flux<?>) underTest.end(tracer, context, Flux.empty())
StepVerifier.create(result)
.verifyComplete()
then:
1 * tracer.end(context)
}
def "ends span on already errored"() {
given:
def exception = new IllegalStateException()
when:
def result = (Flux<?>) underTest.end(tracer, context, Flux.error(exception))
StepVerifier.create(result)
.verifyErrorMatches({ it == exception })
then:
1 * tracer.endExceptionally(context, exception)
}
def "ends span when completed"() {
given:
def source = UnicastProcessor.<String>create()
when:
def result = (Flux<?>) underTest.end(tracer, context, source)
def verifier = StepVerifier.create(result)
.expectSubscription()
then:
0 * tracer._
when:
source.onNext("Value")
source.onComplete()
verifier.expectNext("Value")
.verifyComplete()
then:
1 * tracer.end(context)
}
def "ends span when empty"() {
given:
def source = UnicastProcessor.<String>create()
when:
def result = (Flux<?>) underTest.end(tracer, context, source)
def verifier = StepVerifier.create(result)
.expectSubscription()
then:
0 * tracer._
when:
source.onComplete()
verifier.verifyComplete()
then:
1 * tracer.end(context)
}
def "ends span when errored"() {
given:
def exception = new IllegalStateException()
def source = UnicastProcessor.<String>create()
when:
def result = (Flux<?>) underTest.end(tracer, context, source)
def verifier = StepVerifier.create(result)
.expectSubscription()
then:
0 * tracer._
when:
source.onError(exception)
verifier.verifyErrorMatches({ it == exception })
then:
1 * tracer.endExceptionally(context, exception)
}
def "ends span once for multiple subscribers"() {
when:
def result = (Flux<?>) underTest.end(tracer, context, Flux.just("Value"))
StepVerifier.create(result)
.expectNext("Value")
.verifyComplete()
StepVerifier.create(result)
.expectNext("Value")
.verifyComplete()
StepVerifier.create(result)
.expectNext("Value")
.verifyComplete()
then:
1 * tracer.end(context)
}
}
}