From 5ec0937ea7498388da50aac62eaaf441d7bdeb10 Mon Sep 17 00:00:00 2001 From: Anuraag Agrawal Date: Fri, 25 Mar 2022 11:00:39 +0900 Subject: [PATCH] Migrate reactor testing to Java (#5679) --- .../src/test/groovy/ReactorCoreTest.groovy | 10 - .../src/test/groovy/SubscriptionTest.groovy | 10 - .../reactor/ReactorCoreTest.java | 20 + .../reactor/SubscriptionTest.java | 20 + .../reactor/ReactorCoreTest.groovy | 218 ------- .../reactor/SubscriptionTest.groovy | 22 - .../reactor/ReactorCoreTest.java | 225 +++++++ .../reactor/SubscriptionTest.java | 34 ++ .../reactor-3.1/testing/build.gradle.kts | 2 - .../reactor/AbstractReactorCoreTest.groovy | 521 ---------------- .../reactor/AbstractSubscriptionTest.groovy | 58 -- .../reactor/AbstractReactorCoreTest.java | 578 ++++++++++++++++++ .../reactor/AbstractSubscriptionTest.java | 41 ++ 13 files changed, 918 insertions(+), 841 deletions(-) delete mode 100644 instrumentation/reactor/reactor-3.1/javaagent/src/test/groovy/ReactorCoreTest.groovy delete mode 100644 instrumentation/reactor/reactor-3.1/javaagent/src/test/groovy/SubscriptionTest.groovy create mode 100644 instrumentation/reactor/reactor-3.1/javaagent/src/test/java/io/opentelemetry/instrumentation/reactor/ReactorCoreTest.java create mode 100644 instrumentation/reactor/reactor-3.1/javaagent/src/test/java/io/opentelemetry/instrumentation/reactor/SubscriptionTest.java delete mode 100644 instrumentation/reactor/reactor-3.1/library/src/test/groovy/io/opentelemetry/instrumentation/reactor/ReactorCoreTest.groovy delete mode 100644 instrumentation/reactor/reactor-3.1/library/src/test/groovy/io/opentelemetry/instrumentation/reactor/SubscriptionTest.groovy create mode 100644 instrumentation/reactor/reactor-3.1/library/src/test/java/io/opentelemetry/instrumentation/reactor/ReactorCoreTest.java create mode 100644 instrumentation/reactor/reactor-3.1/library/src/test/java/io/opentelemetry/instrumentation/reactor/SubscriptionTest.java delete mode 100644 instrumentation/reactor/reactor-3.1/testing/src/main/groovy/io/opentelemetry/instrumentation/reactor/AbstractReactorCoreTest.groovy delete mode 100644 instrumentation/reactor/reactor-3.1/testing/src/main/groovy/io/opentelemetry/instrumentation/reactor/AbstractSubscriptionTest.groovy create mode 100644 instrumentation/reactor/reactor-3.1/testing/src/main/java/io/opentelemetry/instrumentation/reactor/AbstractReactorCoreTest.java create mode 100644 instrumentation/reactor/reactor-3.1/testing/src/main/java/io/opentelemetry/instrumentation/reactor/AbstractSubscriptionTest.java diff --git a/instrumentation/reactor/reactor-3.1/javaagent/src/test/groovy/ReactorCoreTest.groovy b/instrumentation/reactor/reactor-3.1/javaagent/src/test/groovy/ReactorCoreTest.groovy deleted file mode 100644 index 09136d0a91..0000000000 --- a/instrumentation/reactor/reactor-3.1/javaagent/src/test/groovy/ReactorCoreTest.groovy +++ /dev/null @@ -1,10 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -import io.opentelemetry.instrumentation.reactor.AbstractReactorCoreTest -import io.opentelemetry.instrumentation.test.AgentTestTrait - -class ReactorCoreTest extends AbstractReactorCoreTest implements AgentTestTrait { -} diff --git a/instrumentation/reactor/reactor-3.1/javaagent/src/test/groovy/SubscriptionTest.groovy b/instrumentation/reactor/reactor-3.1/javaagent/src/test/groovy/SubscriptionTest.groovy deleted file mode 100644 index db1e22257e..0000000000 --- a/instrumentation/reactor/reactor-3.1/javaagent/src/test/groovy/SubscriptionTest.groovy +++ /dev/null @@ -1,10 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -import io.opentelemetry.instrumentation.reactor.AbstractSubscriptionTest -import io.opentelemetry.instrumentation.test.AgentTestTrait - -class SubscriptionTest extends AbstractSubscriptionTest implements AgentTestTrait { -} diff --git a/instrumentation/reactor/reactor-3.1/javaagent/src/test/java/io/opentelemetry/instrumentation/reactor/ReactorCoreTest.java b/instrumentation/reactor/reactor-3.1/javaagent/src/test/java/io/opentelemetry/instrumentation/reactor/ReactorCoreTest.java new file mode 100644 index 0000000000..05cf088e41 --- /dev/null +++ b/instrumentation/reactor/reactor-3.1/javaagent/src/test/java/io/opentelemetry/instrumentation/reactor/ReactorCoreTest.java @@ -0,0 +1,20 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.reactor; + +import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import org.junit.jupiter.api.extension.RegisterExtension; + +class ReactorCoreTest extends AbstractReactorCoreTest { + + @RegisterExtension + static final InstrumentationExtension testing = AgentInstrumentationExtension.create(); + + ReactorCoreTest() { + super(testing); + } +} diff --git a/instrumentation/reactor/reactor-3.1/javaagent/src/test/java/io/opentelemetry/instrumentation/reactor/SubscriptionTest.java b/instrumentation/reactor/reactor-3.1/javaagent/src/test/java/io/opentelemetry/instrumentation/reactor/SubscriptionTest.java new file mode 100644 index 0000000000..ef6d6a8a77 --- /dev/null +++ b/instrumentation/reactor/reactor-3.1/javaagent/src/test/java/io/opentelemetry/instrumentation/reactor/SubscriptionTest.java @@ -0,0 +1,20 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.reactor; + +import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import org.junit.jupiter.api.extension.RegisterExtension; + +class SubscriptionTest extends AbstractSubscriptionTest { + + @RegisterExtension + static final InstrumentationExtension testing = AgentInstrumentationExtension.create(); + + SubscriptionTest() { + super(testing); + } +} diff --git a/instrumentation/reactor/reactor-3.1/library/src/test/groovy/io/opentelemetry/instrumentation/reactor/ReactorCoreTest.groovy b/instrumentation/reactor/reactor-3.1/library/src/test/groovy/io/opentelemetry/instrumentation/reactor/ReactorCoreTest.groovy deleted file mode 100644 index f650eec61d..0000000000 --- a/instrumentation/reactor/reactor-3.1/library/src/test/groovy/io/opentelemetry/instrumentation/reactor/ReactorCoreTest.groovy +++ /dev/null @@ -1,218 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.instrumentation.reactor - -import io.opentelemetry.api.GlobalOpenTelemetry -import io.opentelemetry.api.trace.Span -import io.opentelemetry.api.trace.SpanKind -import io.opentelemetry.api.trace.StatusCode -import io.opentelemetry.context.Context -import io.opentelemetry.instrumentation.test.LibraryTestTrait -import reactor.core.publisher.Flux -import reactor.core.publisher.Mono -import spock.lang.Shared - -class ReactorCoreTest extends AbstractReactorCoreTest implements LibraryTestTrait { - @Shared - ContextPropagationOperator tracingOperator = ContextPropagationOperator.create() - - def setupSpec() { - tracingOperator.registerOnEachOperator() - } - - def cleanupSpec() { - tracingOperator.resetOnEachOperator() - } - - def "Current in non-blocking publisher assembly"() { - when: - runWithSpan({ - return publisherSupplier().transform({ publisher -> traceNonBlocking(publisher, "inner") }) - }) - - then: - assertTraces(1) { - trace(0, 3) { - span(0) { - name "trace-parent" - hasNoParent() - attributes { - } - } - - span(1) { - name "publisher-parent" - kind SpanKind.INTERNAL - childOf span(0) - } - - span(2) { - name "inner" - childOf span(1) - attributes { - "inner" "foo" - } - } - } - } - - where: - paramName | publisherSupplier - "basic mono" | { -> - Mono.fromCallable({ i -> - Span.current().setAttribute("inner", "foo") - return 1 - }) - } - "basic flux" | { -> - Flux.defer({ - Span.current().setAttribute("inner", "foo") - return Flux.just([5, 6].toArray()) - }) - } - } - - def "Nested non-blocking"() { - when: - def result = runWithSpan({ - Mono.defer({ -> - Span.current().setAttribute("middle", "foo") - return Mono.fromCallable({ -> - Span.current().setAttribute("inner", "bar") - return 1 - }) - .transform({ i -> traceNonBlocking(i, "inner") }) - }) - .transform({ m -> traceNonBlocking(m, "middle") }) - }) - - then: - result == 1 - and: - assertTraces(1) { - trace(0, 4) { - span(0) { - name "trace-parent" - hasNoParent() - attributes { - } - } - - span(1) { - name "publisher-parent" - kind SpanKind.INTERNAL - childOf span(0) - } - - span(2) { - name "middle" - childOf span(1) - attributes { - "middle" "foo" - } - } - - span(3) { - name "inner" - childOf span(2) - attributes { - "inner" "bar" - } - } - } - } - } - - - def "No tracing before registration"() { - when: - tracingOperator.resetOnEachOperator() - - def result1 = Mono.fromCallable({ -> - assert !Span.current().getSpanContext().isValid(): "current span is not set" - return 1 - }) - .transform({ i -> - - def beforeSpan = GlobalOpenTelemetry.getTracer("test").spanBuilder("before").startSpan() - - return ContextPropagationOperator - .runWithContext(i, Context.root().with(beforeSpan)) - .doOnEach({ signal -> - assert !Span.current().getSpanContext().isValid(): "current span is not set" - }) - }).block() - - tracingOperator.registerOnEachOperator() - def result2 = Mono.fromCallable({ -> - assert Span.current().getSpanContext().isValid(): "current span is set" - return 2 - }) - .transform({ i -> - - def afterSpan = GlobalOpenTelemetry.getTracer("test").spanBuilder("after").startSpan() - - return ContextPropagationOperator - .runWithContext(i, Context.root().with(afterSpan)) - .doOnEach({ signal -> - assert Span.current().getSpanContext().isValid(): "current span is set" - if (signal.isOnComplete()) { - Span.current().end() - } - }) - }).block() - - then: - result1 == 1 - result2 == 2 - and: - assertTraces(1) { - trace(0, 1) { - span(0) { - name "after" - hasNoParent() - attributes { - } - } - } - } - } - - def traceNonBlocking(def publisher, def spanName) { - return getDummy(publisher) - .flatMap({ i -> publisher }) - .doOnEach({ signal -> - if (signal.isOnError()) { - // reactor 3.1 does not support getting context here yet - Span.current().setStatus(StatusCode.ERROR) - Span.current().end() - } else if (signal.isOnComplete()) { - Span.current().end() - } - }) - .subscriberContext({ ctx -> - - def parent = ContextPropagationOperator.getOpenTelemetryContext(ctx, Context.current()) - - def innerSpan = GlobalOpenTelemetry.getTracer("test") - .spanBuilder(spanName) - .setParent(parent) - .startSpan() - - return ContextPropagationOperator.storeOpenTelemetryContext(ctx, parent.with(innerSpan)) - }) - } - - def getDummy(def publisher) { - if (publisher instanceof Mono) { - return ContextPropagationOperator.ScalarPropagatingMono.INSTANCE - } else if (publisher instanceof Flux) { - return ContextPropagationOperator.ScalarPropagatingFlux.INSTANCE - } - - throw new IllegalStateException("Unknown publisher") - } -} diff --git a/instrumentation/reactor/reactor-3.1/library/src/test/groovy/io/opentelemetry/instrumentation/reactor/SubscriptionTest.groovy b/instrumentation/reactor/reactor-3.1/library/src/test/groovy/io/opentelemetry/instrumentation/reactor/SubscriptionTest.groovy deleted file mode 100644 index a2653bdd9b..0000000000 --- a/instrumentation/reactor/reactor-3.1/library/src/test/groovy/io/opentelemetry/instrumentation/reactor/SubscriptionTest.groovy +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.instrumentation.reactor - -import io.opentelemetry.instrumentation.test.LibraryTestTrait -import spock.lang.Shared - -class SubscriptionTest extends AbstractSubscriptionTest implements LibraryTestTrait { - @Shared - ContextPropagationOperator tracingOperator = ContextPropagationOperator.create() - - def setupSpec() { - tracingOperator.registerOnEachOperator() - } - - def cleanupSpec() { - tracingOperator.resetOnEachOperator() - } -} diff --git a/instrumentation/reactor/reactor-3.1/library/src/test/java/io/opentelemetry/instrumentation/reactor/ReactorCoreTest.java b/instrumentation/reactor/reactor-3.1/library/src/test/java/io/opentelemetry/instrumentation/reactor/ReactorCoreTest.java new file mode 100644 index 0000000000..cf5da07790 --- /dev/null +++ b/instrumentation/reactor/reactor-3.1/library/src/test/java/io/opentelemetry/instrumentation/reactor/ReactorCoreTest.java @@ -0,0 +1,225 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.reactor; + +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.attributeEntry; +import static org.assertj.core.api.Assertions.assertThat; + +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.context.Context; +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.LibraryInstrumentationExtension; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +class ReactorCoreTest extends AbstractReactorCoreTest { + + @RegisterExtension + static final InstrumentationExtension testing = LibraryInstrumentationExtension.create(); + + private final ContextPropagationOperator tracingOperator = ContextPropagationOperator.create(); + private final Tracer tracer = testing.getOpenTelemetry().getTracer("test"); + + ReactorCoreTest() { + super(testing); + } + + @BeforeAll + void setUp() { + tracingOperator.registerOnEachOperator(); + } + + @AfterAll + void tearDown() { + tracingOperator.resetOnEachOperator(); + } + + @Test + void monoInNonBlockingPublisherAssembly() { + testing.runWithSpan( + "parent", + () -> + monoSpan( + Mono.fromCallable( + () -> { + Span.current().setAttribute("inner", "foo"); + return 1; + }), + "inner") + .block()); + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("parent").hasNoParent(), + span -> + span.hasName("inner") + .hasParent(trace.getSpan(0)) + .hasAttributes(attributeEntry("inner", "foo")))); + } + + @Test + void fluxInNonBlockingPublisherAssembly() { + testing.runWithSpan( + "parent", + () -> + ContextPropagationOperator.ScalarPropagatingFlux.INSTANCE + .flatMap( + unused -> + Flux.defer( + () -> { + Span.current().setAttribute("inner", "foo"); + return Flux.just(5, 6); + })) + .doOnEach( + signal -> { + if (signal.isOnError()) { + // reactor 3.1 does not support getting context here yet + Span.current().setStatus(StatusCode.ERROR); + Span.current().end(); + } else if (signal.isOnComplete()) { + Span.current().end(); + } + }) + .subscriberContext( + ctx -> { + Context parent = + ContextPropagationOperator.getOpenTelemetryContext( + ctx, Context.current()); + + Span innerSpan = tracer.spanBuilder("inner").setParent(parent).startSpan(); + return ContextPropagationOperator.storeOpenTelemetryContext( + ctx, parent.with(innerSpan)); + }) + .collectList() + .block()); + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("parent").hasNoParent(), + span -> + span.hasName("inner") + .hasParent(trace.getSpan(0)) + .hasAttributes(attributeEntry("inner", "foo")))); + } + + @Test + void nestedNonBlocking() { + int result = + testing.runWithSpan( + "parent", + () -> + Mono.defer( + () -> { + Span.current().setAttribute("middle", "foo"); + return Mono.fromCallable( + () -> { + Span.current().setAttribute("inner", "bar"); + return 1; + }) + .transform(publisher -> monoSpan(publisher, "inner")); + }) + .transform(publisher -> monoSpan(publisher, "middle")) + .block()); + + assertThat(result).isEqualTo(1); + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("parent").hasNoParent(), + span -> + span.hasName("middle") + .hasParent(trace.getSpan(0)) + .hasAttributes(attributeEntry("middle", "foo")), + span -> + span.hasName("inner") + .hasParent(trace.getSpan(1)) + .hasAttributes(attributeEntry("inner", "bar")))); + } + + @Test + void noTracingBeforeRegistration() { + tracingOperator.resetOnEachOperator(); + + Integer result1 = + Mono.fromCallable( + () -> { + assertThat(Span.current().getSpanContext().isValid()).isFalse(); + return 1; + }) + .transform( + mono -> { + // NB: Because context propagation is disabled, this span is effectively leaked as + // we cannot access it again to + // end after processing. + Span span = tracer.spanBuilder("before").startSpan(); + return ContextPropagationOperator.runWithContext(mono, Context.root().with(span)) + .doOnEach( + unused -> + assertThat(Span.current().getSpanContext().isValid()).isFalse()); + }) + .block(); + + tracingOperator.registerOnEachOperator(); + Integer result2 = + Mono.fromCallable( + () -> { + assertThat(Span.current().getSpanContext().isValid()).isTrue(); + return 2; + }) + .transform( + mono -> { + Span span = tracer.spanBuilder("after").startSpan(); + return ContextPropagationOperator.runWithContext(mono, Context.root().with(span)) + .doOnEach( + signal -> { + assertThat(Span.current().getSpanContext().isValid()).isTrue(); + if (signal.isOnComplete()) { + Span.current().end(); + } + }); + }) + .block(); + + assertThat(result1).isEqualTo(1); + assertThat(result2).isEqualTo(2); + + testing.waitAndAssertTraces( + trace -> trace.hasSpansSatisfyingExactly(span -> span.hasName("after").hasNoParent())); + } + + private Mono monoSpan(Mono mono, String spanName) { + return ContextPropagationOperator.ScalarPropagatingMono.INSTANCE + .flatMap(unused -> mono) + .doOnEach( + signal -> { + if (signal.isOnError()) { + // reactor 3.1 does not support getting context here yet + Span.current().setStatus(StatusCode.ERROR); + Span.current().end(); + } else if (signal.isOnComplete()) { + Span.current().end(); + } + }) + .subscriberContext( + ctx -> { + Context parent = + ContextPropagationOperator.getOpenTelemetryContext(ctx, Context.current()); + + Span innerSpan = tracer.spanBuilder(spanName).setParent(parent).startSpan(); + return ContextPropagationOperator.storeOpenTelemetryContext( + ctx, parent.with(innerSpan)); + }); + } +} diff --git a/instrumentation/reactor/reactor-3.1/library/src/test/java/io/opentelemetry/instrumentation/reactor/SubscriptionTest.java b/instrumentation/reactor/reactor-3.1/library/src/test/java/io/opentelemetry/instrumentation/reactor/SubscriptionTest.java new file mode 100644 index 0000000000..d7061782fa --- /dev/null +++ b/instrumentation/reactor/reactor-3.1/library/src/test/java/io/opentelemetry/instrumentation/reactor/SubscriptionTest.java @@ -0,0 +1,34 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.reactor; + +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.LibraryInstrumentationExtension; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.extension.RegisterExtension; + +class SubscriptionTest extends AbstractSubscriptionTest { + + @RegisterExtension + static final InstrumentationExtension testing = LibraryInstrumentationExtension.create(); + + private final ContextPropagationOperator tracingOperator = ContextPropagationOperator.create(); + + SubscriptionTest() { + super(testing); + } + + @BeforeAll + void setUp() { + tracingOperator.registerOnEachOperator(); + } + + @AfterAll + void tearDown() { + tracingOperator.resetOnEachOperator(); + } +} diff --git a/instrumentation/reactor/reactor-3.1/testing/build.gradle.kts b/instrumentation/reactor/reactor-3.1/testing/build.gradle.kts index 344f9f8425..ab943128d2 100644 --- a/instrumentation/reactor/reactor-3.1/testing/build.gradle.kts +++ b/instrumentation/reactor/reactor-3.1/testing/build.gradle.kts @@ -7,7 +7,5 @@ dependencies { api("io.projectreactor:reactor-core:3.1.0.RELEASE") - implementation("org.apache.groovy:groovy") implementation("io.opentelemetry:opentelemetry-api") - implementation("org.spockframework:spock-core") } diff --git a/instrumentation/reactor/reactor-3.1/testing/src/main/groovy/io/opentelemetry/instrumentation/reactor/AbstractReactorCoreTest.groovy b/instrumentation/reactor/reactor-3.1/testing/src/main/groovy/io/opentelemetry/instrumentation/reactor/AbstractReactorCoreTest.groovy deleted file mode 100644 index b4fb313d31..0000000000 --- a/instrumentation/reactor/reactor-3.1/testing/src/main/groovy/io/opentelemetry/instrumentation/reactor/AbstractReactorCoreTest.groovy +++ /dev/null @@ -1,521 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.instrumentation.reactor - -import io.opentelemetry.api.GlobalOpenTelemetry -import io.opentelemetry.api.common.AttributeKey -import io.opentelemetry.api.trace.Span -import io.opentelemetry.api.trace.SpanKind -import io.opentelemetry.context.Context -import io.opentelemetry.instrumentation.test.InstrumentationSpecification -import org.reactivestreams.Publisher -import org.reactivestreams.Subscriber -import org.reactivestreams.Subscription -import reactor.core.publisher.Flux -import reactor.core.publisher.Mono -import spock.lang.Shared -import spock.lang.Unroll - -import java.time.Duration - -import static io.opentelemetry.api.trace.StatusCode.ERROR - -@Unroll -abstract class AbstractReactorCoreTest extends InstrumentationSpecification { - - public static final String EXCEPTION_MESSAGE = "test exception" - - @Shared - def addOne = { i -> - addOneFunc(i) - } - - @Shared - def addTwo = { i -> - addTwoFunc(i) - } - - @Shared - def throwException = { - throw new IllegalStateException(EXCEPTION_MESSAGE) - } - - def "Publisher '#paramName' test"() { - when: - def result = runWithSpan(publisherSupplier) - - then: - result == expected - and: - assertTraces(1) { - trace(0, workSpans + 2) { - span(0) { - name "trace-parent" - kind SpanKind.INTERNAL - hasNoParent() - } - span(1) { - name "publisher-parent" - kind SpanKind.INTERNAL - childOf span(0) - } - - for (int i = 0; i < workSpans; i++) { - span(2 + i) { - name "add one" - kind SpanKind.INTERNAL - childOf span(1) - } - } - } - } - - where: - paramName | expected | workSpans | publisherSupplier - "basic mono" | 2 | 1 | { -> Mono.just(1).map(addOne) } - "two operations mono" | 4 | 2 | { -> Mono.just(2).map(addOne).map(addOne) } - "delayed mono" | 4 | 1 | { -> - Mono.just(3).delayElement(Duration.ofMillis(100)).map(addOne) - } - "delayed twice mono" | 6 | 2 | { -> - Mono.just(4).delayElement(Duration.ofMillis(100)).map(addOne).delayElement(Duration.ofMillis(100)).map(addOne) - } - "basic flux" | [6, 7] | 2 | { -> Flux.fromIterable([5, 6]).map(addOne) } - "two operations flux" | [8, 9] | 4 | { -> - Flux.fromIterable([6, 7]).map(addOne).map(addOne) - } - "delayed flux" | [8, 9] | 2 | { -> - Flux.fromIterable([7, 8]).delayElements(Duration.ofMillis(100)).map(addOne) - } - "delayed twice flux" | [10, 11] | 4 | { -> - Flux.fromIterable([8, 9]).delayElements(Duration.ofMillis(100)).map(addOne).delayElements(Duration.ofMillis(100)).map(addOne) - } - - "mono from callable" | 12 | 2 | { -> - Mono.fromCallable({ addOneFunc(10) }).map(addOne) - } - } - - def "Publisher error '#paramName' test"() { - when: - runWithSpan(publisherSupplier) - - then: - def exception = thrown RuntimeException - exception.message == EXCEPTION_MESSAGE - and: - assertTraces(1) { - trace(0, 2) { - span(0) { - name "trace-parent" - status ERROR - errorEvent(RuntimeException, EXCEPTION_MESSAGE) - hasNoParent() - } - - // It's important that we don't attach errors at the Reactor level so that we don't - // impact the spans on reactor instrumentations such as netty and lettuce, as reactor is - // more of a context propagation mechanism than something we would be tracking for - // errors this is ok. - span(1) { - name "publisher-parent" - kind SpanKind.INTERNAL - childOf span(0) - } - } - } - - where: - paramName | publisherSupplier - "mono" | { -> Mono.error(new RuntimeException(EXCEPTION_MESSAGE)) } - "flux" | { -> Flux.error(new RuntimeException(EXCEPTION_MESSAGE)) } - } - - def "Publisher step '#paramName' test"() { - when: - runWithSpan(publisherSupplier) - - then: - def exception = thrown IllegalStateException - exception.message == EXCEPTION_MESSAGE - and: - assertTraces(1) { - trace(0, workSpans + 2) { - span(0) { - name "trace-parent" - status ERROR - errorEvent(IllegalStateException, EXCEPTION_MESSAGE) - hasNoParent() - } - - // It's important that we don't attach errors at the Reactor level so that we don't - // impact the spans on reactor instrumentations such as netty and lettuce, as reactor is - // more of a context propagation mechanism than something we would be tracking for - // errors this is ok. - span(1) { - name "publisher-parent" - kind SpanKind.INTERNAL - childOf span(0) - } - - for (int i = 0; i < workSpans; i++) { - span(i + 2) { - name "add one" - childOf span(1) - attributes { - } - } - } - } - } - - where: - paramName | workSpans | publisherSupplier - "basic mono failure" | 1 | { -> Mono.just(1).map(addOne).map({ throwException() }) } - "basic flux failure" | 1 | { -> - Flux.fromIterable([5, 6]).map(addOne).map({ throwException() }) - } - } - - def "Publisher '#paramName' cancel"() { - when: - cancelUnderTrace(publisherSupplier) - - then: - assertTraces(1) { - trace(0, 2) { - span(0) { - name "trace-parent" - hasNoParent() - attributes { - } - } - - span(1) { - name "publisher-parent" - kind SpanKind.INTERNAL - childOf span(0) - } - } - } - - where: - paramName | publisherSupplier - "basic mono" | { -> Mono.just(1) } - "basic flux" | { -> Flux.fromIterable([5, 6]) } - } - - def "Publisher chain spans have the correct parent for '#paramName'"() { - when: - runWithSpan(publisherSupplier) - - then: - assertTraces(1) { - trace(0, workSpans + 2) { - span(0) { - name "trace-parent" - hasNoParent() - attributes { - } - } - - span(1) { - name "publisher-parent" - kind SpanKind.INTERNAL - childOf span(0) - } - - for (int i = 0; i < workSpans; i++) { - span(i + 2) { - name "add one" - childOf span(1) - attributes { - } - } - } - } - } - - where: - paramName | workSpans | publisherSupplier - "basic mono" | 3 | { -> - Mono.just(1).map(addOne).map(addOne).then(Mono.just(1).map(addOne)) - } - "basic flux" | 5 | { -> - Flux.fromIterable([5, 6]).map(addOne).map(addOne).then(Mono.just(1).map(addOne)) - } - } - - def "Publisher chain spans have the correct parents from assembly time '#paramName'"() { - when: - runWithSpan { - // The "add one" operations in the publisher created here should be children of the publisher-parent - Publisher publisher = publisherSupplier() - - def tracer = GlobalOpenTelemetry.getTracer("test") - def intermediate = tracer.spanBuilder("intermediate").startSpan() - // After this activation, the "add two" operations below should be children of this span - def scope = Context.current().with(intermediate).makeCurrent() - try { - if (publisher instanceof Mono) { - return ((Mono) publisher).map(addTwo) - } else if (publisher instanceof Flux) { - return ((Flux) publisher).map(addTwo) - } - throw new IllegalStateException("Unknown publisher type") - } finally { - intermediate.end() - scope.close() - } - } - - then: - assertTraces(1) { - trace(0, (workItems * 2) + 3) { - span(0) { - name "trace-parent" - kind SpanKind.INTERNAL - hasNoParent() - } - span(1) { - name "publisher-parent" - kind SpanKind.INTERNAL - childOf span(0) - } - span(2) { - name "intermediate" - kind SpanKind.INTERNAL - childOf span(1) - } - - for (int i = 0; i < 2 * workItems; i = i + 2) { - span(3 + i) { - name "add one" - kind SpanKind.INTERNAL - childOf span(1) - } - span(3 + i + 1) { - name "add two" - kind SpanKind.INTERNAL - childOf span(1) - } - } - } - } - - where: - paramName | workItems | publisherSupplier - "basic mono" | 1 | { -> Mono.just(1).map(addOne) } - "basic flux" | 2 | { -> Flux.fromIterable([1, 2]).map(addOne) } - } - - def "Nested delayed mono with high concurrency"() { - setup: - def iterations = 100 - def remainingIterations = new HashSet<>((0L.. - def outer = Mono.just("") - .map({ it }) - .delayElement(Duration.ofMillis(10)) - .map({ it }) - .delayElement(Duration.ofMillis(10)) - .doOnSuccess({ - def middle = Mono.just("") - .map({ it }) - .delayElement(Duration.ofMillis(10)) - .doOnSuccess({ - runWithSpan("inner") { - Span.current().setAttribute("iteration", iteration) - } - }) - - runWithSpan("middle") { - Span.current().setAttribute("iteration", iteration) - middle.subscribe() - } - }) - - // Context must propagate even if only subscribe is in root span scope - runWithSpan("outer") { - Span.current().setAttribute("iteration", iteration) - outer.subscribe() - } - } - - then: - assertTraces(iterations) { - for (int i = 0; i < iterations; i++) { - trace(i, 3) { - long iteration = -1 - span(0) { - name("outer") - iteration = span.getAttributes().get(AttributeKey.longKey("iteration")).toLong() - assert remainingIterations.remove(iteration) - } - span(1) { - name("middle") - childOf(span(0)) - assert span.getAttributes().get(AttributeKey.longKey("iteration")) == iteration - } - span(2) { - name("inner") - childOf(span(1)) - assert span.getAttributes().get(AttributeKey.longKey("iteration")) == iteration - } - } - } - } - - assert remainingIterations.isEmpty() - } - - def "Nested delayed flux with high concurrency"() { - setup: - def iterations = 100 - def remainingIterations = new HashSet<>((0L.. - def outer = Flux.just("a", "b") - .map({ it }) - .delayElements(Duration.ofMillis(10)) - .map({ it }) - .delayElements(Duration.ofMillis(10)) - .doOnEach({ middleSignal -> - if (middleSignal.hasValue()) { - def value = middleSignal.get() - - def middle = Flux.just("c", "d") - .map({ it }) - .delayElements(Duration.ofMillis(10)) - .doOnEach({ innerSignal -> - if (innerSignal.hasValue()) { - runWithSpan("inner " + value + innerSignal.get()) { - Span.current().setAttribute("iteration", iteration) - } - } - }) - - runWithSpan("middle " + value) { - Span.current().setAttribute("iteration", iteration) - middle.subscribe() - } - } - }) - - // Context must propagate even if only subscribe is in root span scope - runWithSpan("outer") { - Span.current().setAttribute("iteration", iteration) - outer.subscribe() - } - } - - then: - assertTraces(iterations) { - for (int i = 0; i < iterations; i++) { - trace(i, 7) { - long iteration = -1 - String middleA = null - String middleB = null - span(0) { - name("outer") - iteration = span.getAttributes().get(AttributeKey.longKey("iteration")).toLong() - assert remainingIterations.remove(iteration) - } - span("middle a") { - middleA = span.spanId - childOf(span(0)) - assert span.getAttributes().get(AttributeKey.longKey("iteration")) == iteration - } - span("middle b") { - middleB = span.spanId - childOf(span(0)) - assert span.getAttributes().get(AttributeKey.longKey("iteration")) == iteration - } - span("inner ac") { - parentSpanId(middleA) - assert span.getAttributes().get(AttributeKey.longKey("iteration")) == iteration - } - span("inner ad") { - parentSpanId(middleA) - assert span.getAttributes().get(AttributeKey.longKey("iteration")) == iteration - } - span("inner bc") { - parentSpanId(middleB) - assert span.getAttributes().get(AttributeKey.longKey("iteration")) == iteration - } - span("inner bd") { - parentSpanId(middleB) - assert span.getAttributes().get(AttributeKey.longKey("iteration")) == iteration - } - } - } - } - - assert remainingIterations.isEmpty() - } - - def runWithSpan(def publisherSupplier) { - runWithSpan("trace-parent") { - def tracer = GlobalOpenTelemetry.getTracer("test") - def span = tracer.spanBuilder("publisher-parent").startSpan() - def scope = Context.current().with(span).makeCurrent() - try { - def publisher = publisherSupplier() - // Read all data from publisher - if (publisher instanceof Mono) { - return publisher.block() - } else if (publisher instanceof Flux) { - return publisher.toStream().toArray({ size -> new Integer[size] }) - } - - throw new IllegalStateException("Unknown publisher: " + publisher) - } finally { - span.end() - scope.close() - } - } - } - - def cancelUnderTrace(def publisherSupplier) { - runWithSpan("trace-parent") { - def tracer = GlobalOpenTelemetry.getTracer("test") - def span = tracer.spanBuilder("publisher-parent").startSpan() - def scope = Context.current().with(span).makeCurrent() - - def publisher = publisherSupplier() - publisher.subscribe(new Subscriber() { - void onSubscribe(Subscription subscription) { - subscription.cancel() - } - - void onNext(Integer t) { - } - - void onError(Throwable error) { - } - - void onComplete() { - } - }) - - span.end() - scope.close() - } - } - - int addOneFunc(int i) { - runWithSpan("add one") {} - return i + 1 - } - - int addTwoFunc(int i) { - runWithSpan("add two") {} - return i + 2 - } -} diff --git a/instrumentation/reactor/reactor-3.1/testing/src/main/groovy/io/opentelemetry/instrumentation/reactor/AbstractSubscriptionTest.groovy b/instrumentation/reactor/reactor-3.1/testing/src/main/groovy/io/opentelemetry/instrumentation/reactor/AbstractSubscriptionTest.groovy deleted file mode 100644 index 53fdcac945..0000000000 --- a/instrumentation/reactor/reactor-3.1/testing/src/main/groovy/io/opentelemetry/instrumentation/reactor/AbstractSubscriptionTest.groovy +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.instrumentation.reactor - -import io.opentelemetry.api.GlobalOpenTelemetry -import io.opentelemetry.api.trace.SpanKind -import io.opentelemetry.instrumentation.test.InstrumentationSpecification -import reactor.core.publisher.Mono - -import java.time.Duration -import java.util.concurrent.CountDownLatch - -abstract class AbstractSubscriptionTest extends InstrumentationSpecification { - - def "subscription test"() { - when: - Mono connection = Mono.create { - it.success(new Connection()) - } - CountDownLatch latch = new CountDownLatch(1) - runWithSpan("parent") { - connection - .delayElement(Duration.ofMillis(1)) - .subscribe { - it.query() - latch.countDown() - } - } - latch.await() - - then: - assertTraces(1) { - trace(0, 2) { - span(0) { - name "parent" - kind SpanKind.INTERNAL - hasNoParent() - } - span(1) { - name "Connection.query" - kind SpanKind.INTERNAL - childOf span(0) - } - } - } - } - - static class Connection { - static int query() { - def span = GlobalOpenTelemetry.getTracer("test").spanBuilder("Connection.query").startSpan() - span.end() - return new Random().nextInt() - } - } -} diff --git a/instrumentation/reactor/reactor-3.1/testing/src/main/java/io/opentelemetry/instrumentation/reactor/AbstractReactorCoreTest.java b/instrumentation/reactor/reactor-3.1/testing/src/main/java/io/opentelemetry/instrumentation/reactor/AbstractReactorCoreTest.java new file mode 100644 index 0000000000..e1c64f4671 --- /dev/null +++ b/instrumentation/reactor/reactor-3.1/testing/src/main/java/io/opentelemetry/instrumentation/reactor/AbstractReactorCoreTest.java @@ -0,0 +1,578 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.reactor; + +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.attributeEntry; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import io.opentelemetry.sdk.testing.assertj.TraceAssert; +import io.opentelemetry.sdk.trace.data.StatusData; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.stream.Stream; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +public abstract class AbstractReactorCoreTest { + + private final InstrumentationExtension testing; + + protected AbstractReactorCoreTest(InstrumentationExtension testing) { + this.testing = testing; + } + + @Test + void basicMono() { + int result = testing.runWithSpan("parent", () -> Mono.just(1).map(this::addOne).block()); + assertThat(result).isEqualTo(2); + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("parent").hasNoParent(), + span -> span.hasName("add one").hasParent(trace.getSpan(0)))); + } + + @Test + void twoOperationsMono() { + int result = + testing.runWithSpan( + "parent", () -> Mono.just(2).map(this::addOne).map(this::addOne).block()); + assertThat(result).isEqualTo(4); + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("parent").hasNoParent(), + span -> span.hasName("add one").hasParent(trace.getSpan(0)), + span -> span.hasName("add one").hasParent(trace.getSpan(0)))); + } + + @Test + void delayedMono() { + int result = + testing.runWithSpan( + "parent", + () -> Mono.just(3).delayElement(Duration.ofMillis(1)).map(this::addOne).block()); + assertThat(result).isEqualTo(4); + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("parent").hasNoParent(), + span -> span.hasName("add one").hasParent(trace.getSpan(0)))); + } + + @Test + void delayedTwiceMono() { + int result = + testing.runWithSpan( + "parent", + () -> + Mono.just(4) + .delayElement(Duration.ofMillis(1)) + .map(this::addOne) + .delayElement(Duration.ofMillis(1)) + .map(this::addOne) + .block()); + assertThat(result).isEqualTo(6); + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("parent").hasNoParent(), + span -> span.hasName("add one").hasParent(trace.getSpan(0)), + span -> span.hasName("add one").hasParent(trace.getSpan(0)))); + } + + @Test + void basicFlux() { + List result = + testing.runWithSpan( + "parent", + () -> Flux.fromStream(Stream.of(5, 6)).map(this::addOne).collectList().block()); + assertThat(result).containsExactly(6, 7); + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("parent").hasNoParent(), + span -> span.hasName("add one").hasParent(trace.getSpan(0)), + span -> span.hasName("add one").hasParent(trace.getSpan(0)))); + } + + @Test + void twoOperationsFlux() { + List result = + testing.runWithSpan( + "parent", + () -> + Flux.fromStream(Stream.of(6, 7)) + .map(this::addOne) + .map(this::addOne) + .collectList() + .block()); + assertThat(result).containsExactly(8, 9); + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("parent").hasNoParent(), + span -> span.hasName("add one").hasParent(trace.getSpan(0)), + span -> span.hasName("add one").hasParent(trace.getSpan(0)), + span -> span.hasName("add one").hasParent(trace.getSpan(0)), + span -> span.hasName("add one").hasParent(trace.getSpan(0)))); + } + + @Test + void delayedFlux() { + List result = + testing.runWithSpan( + "parent", + () -> + Flux.fromStream(Stream.of(7, 8)) + .delayElements(Duration.ofMillis(1)) + .map(this::addOne) + .collectList() + .block()); + assertThat(result).containsExactly(8, 9); + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("parent").hasNoParent(), + span -> span.hasName("add one").hasParent(trace.getSpan(0)), + span -> span.hasName("add one").hasParent(trace.getSpan(0)))); + } + + @Test + void delayedTwiceFlux() { + List result = + testing.runWithSpan( + "parent", + () -> + Flux.fromStream(Stream.of(8, 9)) + .delayElements(Duration.ofMillis(1)) + .map(this::addOne) + .delayElements(Duration.ofMillis(1)) + .map(this::addOne) + .collectList() + .block()); + assertThat(result).containsExactly(10, 11); + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("parent").hasNoParent(), + span -> span.hasName("add one").hasParent(trace.getSpan(0)), + span -> span.hasName("add one").hasParent(trace.getSpan(0)), + span -> span.hasName("add one").hasParent(trace.getSpan(0)), + span -> span.hasName("add one").hasParent(trace.getSpan(0)))); + } + + @Test + void monoFromCallable() { + int result = + testing.runWithSpan( + "parent", () -> Mono.fromCallable(() -> addOne(10)).map(this::addOne).block()); + assertThat(result).isEqualTo(12); + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("parent").hasNoParent(), + span -> span.hasName("add one").hasParent(trace.getSpan(0)), + span -> span.hasName("add one").hasParent(trace.getSpan(0)))); + } + + @Test + void monoError() { + IllegalStateException error = new IllegalStateException("exception"); + assertThatThrownBy(() -> testing.runWithSpan("parent", () -> Mono.error(error).block())) + .isEqualTo(error); + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> + span.hasName("parent") + .hasNoParent() + .hasStatus(StatusData.error()) + .hasException(error))); + } + + @Test + void fluxError() { + IllegalStateException error = new IllegalStateException("exception"); + assertThatThrownBy( + () -> testing.runWithSpan("parent", () -> Flux.error(error).collectList().block())) + .isEqualTo(error); + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> + span.hasName("parent") + .hasNoParent() + .hasStatus(StatusData.error()) + .hasException(error))); + } + + @Test + void monoStepError() { + IllegalStateException error = new IllegalStateException("exception"); + assertThatThrownBy( + () -> + testing.runWithSpan( + "parent", + () -> + Mono.just(1) + .map(this::addOne) + .map( + unused -> { + throw error; + }) + .block())) + .isEqualTo(error); + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> + span.hasName("parent") + .hasNoParent() + .hasStatus(StatusData.error()) + .hasException(error), + span -> + span.hasName("add one") + .hasParent(trace.getSpan(0)) + .hasStatus(StatusData.unset()))); + } + + @Test + void fluxStepError() { + IllegalStateException error = new IllegalStateException("exception"); + assertThatThrownBy( + () -> + testing.runWithSpan( + "parent", + () -> + Flux.just(5, 6) + .map(this::addOne) + .map( + unused -> { + throw error; + }) + .collectList() + .block())) + .isEqualTo(error); + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> + span.hasName("parent") + .hasNoParent() + .hasStatus(StatusData.error()) + .hasException(error), + span -> + span.hasName("add one") + .hasParent(trace.getSpan(0)) + .hasStatus(StatusData.unset()))); + } + + @Test + void cancelMono() { + testing.runWithSpan("parent", () -> Mono.just(1).subscribe(CancellingSubscriber.INSTANCE)); + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("parent").hasNoParent().hasStatus(StatusData.unset()))); + } + + @Test + void cancelFlux() { + testing.runWithSpan("parent", () -> Flux.just(3, 4).subscribe(CancellingSubscriber.INSTANCE)); + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("parent").hasNoParent().hasStatus(StatusData.unset()))); + } + + @Test + void monoChain() { + int result = + testing.runWithSpan( + "parent", + () -> + Mono.just(1) + .map(this::addOne) + .map(this::addOne) + .then(Mono.just(1).map(this::addOne)) + .block()); + assertThat(result).isEqualTo(2); + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("parent").hasNoParent(), + span -> span.hasName("add one").hasParent(trace.getSpan(0)), + span -> span.hasName("add one").hasParent(trace.getSpan(0)), + span -> span.hasName("add one").hasParent(trace.getSpan(0)))); + } + + @Test + void fluxChain() { + int result = + testing.runWithSpan( + "parent", + () -> + Flux.just(5, 6) + .map(this::addOne) + .map(this::addOne) + .then(Mono.just(1).map(this::addOne)) + .block()); + assertThat(result).isEqualTo(2); + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("parent").hasNoParent(), + span -> span.hasName("add one").hasParent(trace.getSpan(0)), + span -> span.hasName("add one").hasParent(trace.getSpan(0)), + span -> span.hasName("add one").hasParent(trace.getSpan(0)), + span -> span.hasName("add one").hasParent(trace.getSpan(0)), + span -> span.hasName("add one").hasParent(trace.getSpan(0)))); + } + + @Test + void monoChainHasAssemblyContext() { + int result = + testing.runWithSpan( + "parent", + () -> { + Mono mono = Mono.just(1).map(this::addOne); + return testing.runWithSpan("intermediate", () -> mono.map(this::addTwo)).block(); + }); + assertThat(result).isEqualTo(4); + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("parent").hasNoParent(), + span -> span.hasName("intermediate").hasParent(trace.getSpan(0)), + span -> span.hasName("add one").hasParent(trace.getSpan(0)), + span -> span.hasName("add two").hasParent(trace.getSpan(0)))); + } + + @Test + void fluxChainHasAssemblyContext() { + List result = + testing.runWithSpan( + "parent", + () -> { + Flux flux = Flux.just(1, 2).map(this::addOne); + return testing + .runWithSpan("intermediate", () -> flux.map(this::addTwo)) + .collectList() + .block(); + }); + assertThat(result).containsExactly(4, 5); + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("parent").hasNoParent(), + span -> span.hasName("intermediate").hasParent(trace.getSpan(0)), + span -> span.hasName("add one").hasParent(trace.getSpan(0)), + span -> span.hasName("add two").hasParent(trace.getSpan(0)), + span -> span.hasName("add one").hasParent(trace.getSpan(0)), + span -> span.hasName("add two").hasParent(trace.getSpan(0)))); + } + + @Test + void nestedDelayedMonoHighConcurrency() { + for (int i = 0; i < 100; i++) { + int iteration = i; + Mono outer = + Mono.just("") + .map(Function.identity()) + .delayElement(Duration.ofMillis(1)) + .map(Function.identity()) + .delayElement(Duration.ofMillis(1)) + .doOnSuccess( + unused -> { + Mono middle = + Mono.just("") + .map(Function.identity()) + .doOnSuccess( + unused2 -> + testing.runWithSpan( + "inner", + () -> Span.current().setAttribute("iteration", iteration))); + + testing.runWithSpan( + "middle", + () -> { + Span.current().setAttribute("iteration", iteration); + middle.subscribe(); + }); + }); + + // Context must propagate even if only subscribe is in root span scope + testing.runWithSpan( + "outer", + () -> { + Span.current().setAttribute("iteration", iteration); + outer.subscribe(); + }); + } + + List> assertions = new ArrayList<>(); + for (int i = 0; i < 100; i++) { + int iteration = i; + assertions.add( + trace -> + trace.hasSpansSatisfyingExactly( + span -> + span.hasName("outer") + .hasNoParent() + .hasAttributes(attributeEntry("iteration", iteration)), + span -> + span.hasName("middle") + .hasParent(trace.getSpan(0)) + .hasAttributes(attributeEntry("iteration", iteration)), + span -> + span.hasName("inner") + .hasParent(trace.getSpan(1)) + .hasAttributes(attributeEntry("iteration", iteration)))); + } + testing.waitAndAssertTraces(assertions); + } + + @Test + void nestedDelayedFluxHighConcurrency() { + for (int i = 0; i < 100; i++) { + int iteration = i; + Flux outer = + Flux.just("a", "b") + .map(Function.identity()) + .delayElements(Duration.ofMillis(1)) + .map(Function.identity()) + .delayElements(Duration.ofMillis(1)) + .doOnEach( + middleSignal -> { + if (middleSignal.hasValue()) { + String value = middleSignal.get(); + Flux middle = + Flux.just("c", "d") + .map(Function.identity()) + .delayElements(Duration.ofMillis(1)) + .doOnEach( + innerSignal -> { + if (innerSignal.hasValue()) { + testing.runWithSpan( + "inner " + value + innerSignal.get(), + () -> + Span.current().setAttribute("iteration", iteration)); + } + }); + + testing.runWithSpan( + "middle " + value, + () -> { + Span.current().setAttribute("iteration", iteration); + middle.subscribe(); + }); + } + }); + + // Context must propagate even if only subscribe is in root span scope + testing.runWithSpan( + "outer", + () -> { + Span.current().setAttribute("iteration", iteration); + outer.subscribe(); + }); + } + + List> assertions = new ArrayList<>(); + for (int i = 0; i < 100; i++) { + int iteration = i; + assertions.add( + trace -> + trace.hasSpansSatisfyingExactly( + span -> + span.hasName("outer") + .hasNoParent() + .hasAttributes(attributeEntry("iteration", iteration)), + span -> + span.hasName("middle a") + .hasParent(trace.getSpan(0)) + .hasAttributes(attributeEntry("iteration", iteration)), + span -> + span.hasName("inner ac") + .hasParent(trace.getSpan(1)) + .hasAttributes(attributeEntry("iteration", iteration)), + span -> + span.hasName("inner ad") + .hasParent(trace.getSpan(1)) + .hasAttributes(attributeEntry("iteration", iteration)), + span -> + span.hasName("middle b") + .hasParent(trace.getSpan(0)) + .hasAttributes(attributeEntry("iteration", iteration)), + span -> + span.hasName("inner bc") + .hasParent(trace.getSpan(4)) + .hasAttributes(attributeEntry("iteration", iteration)), + span -> + span.hasName("inner bd") + .hasParent(trace.getSpan(4)) + .hasAttributes(attributeEntry("iteration", iteration)))); + } + testing.waitAndAssertTraces(assertions); + } + + private int addOne(int i) { + return testing.runWithSpan("add one", () -> i + 1); + } + + private int addTwo(int i) { + return testing.runWithSpan("add two", () -> i + 2); + } + + private enum CancellingSubscriber implements Subscriber { + INSTANCE; + + @Override + public void onSubscribe(Subscription subscription) { + subscription.cancel(); + } + + @Override + public void onNext(Integer integer) {} + + @Override + public void onError(Throwable throwable) {} + + @Override + public void onComplete() {} + } +} diff --git a/instrumentation/reactor/reactor-3.1/testing/src/main/java/io/opentelemetry/instrumentation/reactor/AbstractSubscriptionTest.java b/instrumentation/reactor/reactor-3.1/testing/src/main/java/io/opentelemetry/instrumentation/reactor/AbstractSubscriptionTest.java new file mode 100644 index 0000000000..c321fe172c --- /dev/null +++ b/instrumentation/reactor/reactor-3.1/testing/src/main/java/io/opentelemetry/instrumentation/reactor/AbstractSubscriptionTest.java @@ -0,0 +1,41 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.reactor; + +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import java.time.Duration; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; +import reactor.core.publisher.Mono; + +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +public abstract class AbstractSubscriptionTest { + + private final InstrumentationExtension testing; + + protected AbstractSubscriptionTest(InstrumentationExtension testing) { + this.testing = testing; + } + + @Test + void subscription() { + Mono connection = Mono.create(sink -> sink.success(new Connection())); + testing.runWithSpan( + "parent", () -> connection.delayElement(Duration.ofMillis(1)).subscribe(Connection::query)); + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("parent").hasNoParent(), + span -> span.hasName("Connection.query").hasParent(trace.getSpan(0)))); + } + + private class Connection { + void query() { + testing.runWithSpan("Connection.query", () -> {}); + } + } +}