Filter out scalar Mono/Flux instances (#8571)

This commit is contained in:
Mateusz Rzeszutek 2023-05-26 11:13:11 +02:00 committed by GitHub
parent ba4eea2d82
commit 262d77164f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 36 additions and 6 deletions

View File

@ -155,7 +155,8 @@ public final class ContextPropagationOperator {
private static <T> Function<? super Publisher<T>, ? extends Publisher<T>> tracingLift(
ReactorAsyncOperationEndStrategy asyncOperationEndStrategy) {
return Operators.lift(new Lifter<>(asyncOperationEndStrategy));
return Operators.lift(
ContextPropagationOperator::shouldInstrument, new Lifter<>(asyncOperationEndStrategy));
}
/** Forces Mono to run in traceContext scope. */
@ -220,7 +221,12 @@ public final class ContextPropagationOperator {
}
}
public static class Lifter<T>
private static boolean shouldInstrument(Scannable publisher) {
// skip if Flux/Mono #just, #empty, #error
return !(publisher instanceof Fuseable.ScalarCallable);
}
private static class Lifter<T>
implements BiFunction<Scannable, CoreSubscriber<? super T>, CoreSubscriber<? super T>> {
/** Holds reference to strategy to prevent it from being collected. */
@ -233,10 +239,6 @@ public final class ContextPropagationOperator {
@Override
public CoreSubscriber<? super T> apply(Scannable publisher, CoreSubscriber<? super T> sub) {
// if Flux/Mono #just, #empty, #error
if (publisher instanceof Fuseable.ScalarCallable) {
return sub;
}
return new TracingSubscriber<>(sub, sub.currentContext());
}
}

View File

@ -7,10 +7,14 @@ package io.opentelemetry.instrumentation.reactor.v3_1;
import static org.assertj.core.api.Assertions.assertThat;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.jupiter.api.Test;
import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
class HooksTest {
@ -43,4 +47,28 @@ class HooksTest {
subscriber.set(actual);
}
}
@Test
void testInvalidBlockUsage() throws InterruptedException {
ContextPropagationOperator operator = ContextPropagationOperator.create();
operator.registerOnEachOperator();
Callable<String> callable =
() -> {
Mono.just("test1").block();
return "call1";
};
Disposable disposable =
Mono.defer(
() ->
Mono.fromCallable(callable).publishOn(Schedulers.elastic()).flatMap(Mono::just))
.subscribeOn(Schedulers.single())
.subscribe();
TimeUnit.MILLISECONDS.sleep(100);
disposable.dispose();
operator.resetOnEachOperator();
}
}