Reduce reactor stack trace depth (#9923)
This commit is contained in:
parent
c0ff484bff
commit
952ecd0a34
|
@ -22,6 +22,7 @@ package io.opentelemetry.instrumentation.reactor.v3_1;
|
||||||
|
|
||||||
import io.opentelemetry.api.trace.Span;
|
import io.opentelemetry.api.trace.Span;
|
||||||
import io.opentelemetry.context.Scope;
|
import io.opentelemetry.context.Scope;
|
||||||
|
import java.util.function.Supplier;
|
||||||
import org.reactivestreams.Subscriber;
|
import org.reactivestreams.Subscriber;
|
||||||
import org.reactivestreams.Subscription;
|
import org.reactivestreams.Subscription;
|
||||||
import reactor.core.CoreSubscriber;
|
import reactor.core.CoreSubscriber;
|
||||||
|
@ -56,30 +57,40 @@ public class TracingSubscriber<T> implements CoreSubscriber<T> {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onSubscribe(Subscription subscription) {
|
public void onSubscribe(Subscription subscription) {
|
||||||
withActiveSpan(() -> subscriber.onSubscribe(subscription));
|
try (Scope ignore = openScope()) {
|
||||||
|
subscriber.onSubscribe(subscription);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onNext(T o) {
|
public void onNext(T o) {
|
||||||
withActiveSpan(() -> subscriber.onNext(o));
|
try (Scope ignore = openScope()) {
|
||||||
|
subscriber.onNext(o);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onError(Throwable throwable) {
|
public void onError(Throwable throwable) {
|
||||||
|
Supplier<Scope> scopeSupplier;
|
||||||
if (!hasContextToPropagate
|
if (!hasContextToPropagate
|
||||||
&& (fluxRetrySubscriberClass == subscriber.getClass()
|
&& (fluxRetrySubscriberClass == subscriber.getClass()
|
||||||
|| fluxRetryWhenSubscriberClass == subscriber.getClass())) {
|
|| fluxRetryWhenSubscriberClass == subscriber.getClass())) {
|
||||||
// clear context for retry to avoid having retried operations run with currently active
|
// clear context for retry to avoid having retried operations run with currently active
|
||||||
// context as parent context
|
// context as parent context
|
||||||
withActiveSpan(io.opentelemetry.context.Context.root(), () -> subscriber.onError(throwable));
|
scopeSupplier = () -> openScope(io.opentelemetry.context.Context.root());
|
||||||
} else {
|
} else {
|
||||||
withActiveSpan(() -> subscriber.onError(throwable));
|
scopeSupplier = () -> openScope();
|
||||||
|
}
|
||||||
|
try (Scope ignore = scopeSupplier.get()) {
|
||||||
|
subscriber.onError(throwable);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onComplete() {
|
public void onComplete() {
|
||||||
withActiveSpan(subscriber::onComplete);
|
try (Scope ignore = openScope()) {
|
||||||
|
subscriber.onComplete();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -87,18 +98,12 @@ public class TracingSubscriber<T> implements CoreSubscriber<T> {
|
||||||
return context;
|
return context;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void withActiveSpan(Runnable runnable) {
|
private Scope openScope() {
|
||||||
withActiveSpan(hasContextToPropagate ? traceContext : null, runnable);
|
return openScope(hasContextToPropagate ? traceContext : null);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void withActiveSpan(io.opentelemetry.context.Context context, Runnable runnable) {
|
private static Scope openScope(io.opentelemetry.context.Context context) {
|
||||||
if (context != null) {
|
return context != null ? context.makeCurrent() : null;
|
||||||
try (Scope ignored = context.makeCurrent()) {
|
|
||||||
runnable.run();
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
runnable.run();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private static Class<?> getFluxRetrySubscriberClass() {
|
private static Class<?> getFluxRetrySubscriberClass() {
|
||||||
|
|
Loading…
Reference in New Issue