Fix rxjava2 NoSuchFieldError (#2836)

This commit is contained in:
Lauri Tulmin 2021-04-22 00:00:39 +03:00 committed by GitHub
parent f956a58c0a
commit 3cb210a673
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 76 additions and 13 deletions

View File

@ -178,6 +178,7 @@ public final class TracingAssembly {
@SuppressWarnings({"rawtypes", "unchecked"}) @SuppressWarnings({"rawtypes", "unchecked"})
private static void enableObservable() { private static void enableObservable() {
if (TracingObserver.canEnable()) {
oldOnObservableSubscribe = RxJavaPlugins.getOnObservableSubscribe(); oldOnObservableSubscribe = RxJavaPlugins.getOnObservableSubscribe();
RxJavaPlugins.setOnObservableSubscribe( RxJavaPlugins.setOnObservableSubscribe(
biCompose( biCompose(
@ -189,6 +190,7 @@ public final class TracingAssembly {
} }
})); }));
} }
}
@SuppressWarnings({"rawtypes", "unchecked"}) @SuppressWarnings({"rawtypes", "unchecked"})
private static void enableSingle() { private static void enableSingle() {

View File

@ -27,8 +27,11 @@ import io.opentelemetry.context.Scope;
import io.reactivex.Observer; import io.reactivex.Observer;
import io.reactivex.internal.fuseable.QueueDisposable; import io.reactivex.internal.fuseable.QueueDisposable;
import io.reactivex.internal.observers.BasicFuseableObserver; import io.reactivex.internal.observers.BasicFuseableObserver;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
class TracingObserver<T> extends BasicFuseableObserver<T, T> { class TracingObserver<T> extends BasicFuseableObserver<T, T> {
private static final MethodHandle queueDisposableGetter = getQueueDisposableGetter();
// BasicFuseableObserver#actual has been renamed to downstream in newer versions, we can't use it // BasicFuseableObserver#actual has been renamed to downstream in newer versions, we can't use it
// in this class // in this class
@ -64,7 +67,7 @@ class TracingObserver<T> extends BasicFuseableObserver<T, T> {
@Override @Override
public int requestFusion(int mode) { public int requestFusion(int mode) {
final QueueDisposable<T> qd = this.qs; final QueueDisposable<T> qd = getQueueDisposable();
if (qd != null) { if (qd != null) {
final int m = qd.requestFusion(mode); final int m = qd.requestFusion(mode);
sourceMode = m; sourceMode = m;
@ -75,6 +78,36 @@ class TracingObserver<T> extends BasicFuseableObserver<T, T> {
@Override @Override
public T poll() throws Exception { public T poll() throws Exception {
return qs.poll(); return getQueueDisposable().poll();
}
private QueueDisposable<T> getQueueDisposable() {
try {
return (QueueDisposable<T>) queueDisposableGetter.invoke(this);
} catch (Throwable throwable) {
throw new IllegalStateException(throwable);
}
}
private static MethodHandle getGetterHandle(String fieldName) {
try {
return MethodHandles.lookup()
.findGetter(BasicFuseableObserver.class, fieldName, QueueDisposable.class);
} catch (NoSuchFieldException | IllegalAccessException ignored) {
}
return null;
}
private static MethodHandle getQueueDisposableGetter() {
MethodHandle getter = getGetterHandle("qd");
if (getter == null) {
// in versions before 2.2.1 field was named "qs"
getter = getGetterHandle("qs");
}
return getter;
}
public static boolean canEnable() {
return queueDisposableGetter != null;
} }
} }

View File

@ -6,10 +6,12 @@
package io.opentelemetry.instrumentation.rxjava2 package io.opentelemetry.instrumentation.rxjava2
import static io.opentelemetry.instrumentation.test.utils.TraceUtils.basicSpan import static io.opentelemetry.instrumentation.test.utils.TraceUtils.basicSpan
import static io.opentelemetry.instrumentation.test.utils.TraceUtils.runInternalSpan
import static io.opentelemetry.instrumentation.test.utils.TraceUtils.runUnderTrace import static io.opentelemetry.instrumentation.test.utils.TraceUtils.runUnderTrace
import io.opentelemetry.api.GlobalOpenTelemetry import io.opentelemetry.api.GlobalOpenTelemetry
import io.opentelemetry.instrumentation.test.InstrumentationSpecification import io.opentelemetry.instrumentation.test.InstrumentationSpecification
import io.reactivex.Observable
import io.reactivex.Single import io.reactivex.Single
import io.reactivex.functions.Consumer import io.reactivex.functions.Consumer
@ -17,7 +19,7 @@ import java.util.concurrent.CountDownLatch
abstract class AbstractRxJava2SubscriptionTest extends InstrumentationSpecification { abstract class AbstractRxJava2SubscriptionTest extends InstrumentationSpecification {
def "subscription test"() { def "subscribe single test"() {
when: when:
CountDownLatch latch = new CountDownLatch(1) CountDownLatch latch = new CountDownLatch(1)
runUnderTrace("parent") { runUnderTrace("parent") {
@ -43,6 +45,32 @@ abstract class AbstractRxJava2SubscriptionTest extends InstrumentationSpecificat
} }
} }
def "test observable fusion"() {
when:
CountDownLatch latch = new CountDownLatch(1)
runUnderTrace("parent") {
Observable<Integer> integerObservable = Observable.just(1, 2, 3, 4)
integerObservable.concatMap({
return Observable.just(it)
}).count().subscribe(new Consumer<Long>() {
@Override
void accept(Long count) {
runInternalSpan("child")
latch.countDown()
}
})
}
latch.await()
then:
assertTraces(1) {
trace(0, 2) {
basicSpan(it, 0, "parent")
basicSpan(it, 1, "child", span(0))
}
}
}
static class Connection { static class Connection {
static int query() { static int query() {
def span = GlobalOpenTelemetry.getTracer("test").spanBuilder("Connection.query").startSpan() def span = GlobalOpenTelemetry.getTracer("test").spanBuilder("Connection.query").startSpan()