Rxjava2 library instrumentation (#2191)

* RxJava 2 library instrumentation

* Spotless fix

* Ignoring problematic test - for now

* Spock test fix

* Gradle testing module

* After upstream main merge

* Spotless fix

* onSubscribe plugin

* Major refactoring

* After main branch merge

* Spotless fix

* Code review fixes

* More code review fixes

* Removing obsolete Connectable* classes

* Apply suggestions from code review

Co-authored-by: Anuraag Agrawal <anuraaga@gmail.com>
Co-authored-by: Mateusz Rzeszutek <mrzeszutek@splunk.com>

* Code review fixes

* Adding a Javadoc comment that describes test methodology

Co-authored-by: Anuraag Agrawal <anuraaga@gmail.com>
Co-authored-by: Mateusz Rzeszutek <mrzeszutek@splunk.com>
This commit is contained in:
Piotr Glazar 2021-02-17 13:12:50 +01:00 committed by GitHub
parent 836da8dbfd
commit 64c8a51f7d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 1297 additions and 2 deletions

View File

@ -0,0 +1,7 @@
apply from: "$rootDir/gradle/instrumentation-library.gradle"
dependencies {
library group: 'io.reactivex.rxjava2', name: 'rxjava', version: "2.1.3"
testImplementation project(':instrumentation:rxjava-2.0:testing')
}

View File

@ -0,0 +1,266 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
// Includes work from:
/*
* Copyright 2018 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.opentelemetry.instrumentation.rxjava2;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.reactivex.Completable;
import io.reactivex.CompletableObserver;
import io.reactivex.Flowable;
import io.reactivex.Maybe;
import io.reactivex.MaybeObserver;
import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.Single;
import io.reactivex.SingleObserver;
import io.reactivex.functions.BiFunction;
import io.reactivex.functions.Function;
import io.reactivex.internal.fuseable.ConditionalSubscriber;
import io.reactivex.parallel.ParallelFlowable;
import io.reactivex.plugins.RxJavaPlugins;
import org.checkerframework.checker.lock.qual.GuardedBy;
import org.reactivestreams.Subscriber;
/**
* RxJava2 library instrumentation.
*
* <p>In order to enable RxJava2 instrumentation one has to call the {@link
* TracingAssembly#enable()} method.
*
* <p>Instrumentation uses <code>on*Assembly</code> and <code>on*Subscribe</code> RxJavaPlugin hooks
* to wrap RxJava2 classes in their tracing equivalents.
*
* <p>Instrumentation can be disabled by calling the {@link TracingAssembly#disable()} method.
*/
public final class TracingAssembly {
@SuppressWarnings("rawtypes")
@GuardedBy("TracingAssembly.class")
private static BiFunction<? super Observable, ? super Observer, ? extends Observer>
oldOnObservableSubscribe;
@SuppressWarnings("rawtypes")
@GuardedBy("TracingAssembly.class")
private static BiFunction<
? super Completable, ? super CompletableObserver, ? extends CompletableObserver>
oldOnCompletableSubscribe;
@SuppressWarnings("rawtypes")
@GuardedBy("TracingAssembly.class")
private static BiFunction<? super Single, ? super SingleObserver, ? extends SingleObserver>
oldOnSingleSubscribe;
@SuppressWarnings("rawtypes")
@GuardedBy("TracingAssembly.class")
private static BiFunction<? super Maybe, ? super MaybeObserver, ? extends MaybeObserver>
oldOnMaybeSubscribe;
@SuppressWarnings("rawtypes")
@GuardedBy("TracingAssembly.class")
private static BiFunction<? super Flowable, ? super Subscriber, ? extends Subscriber>
oldOnFlowableSubscribe;
@SuppressWarnings("rawtypes")
@GuardedBy("TracingAssembly.class")
private static Function<? super ParallelFlowable, ? extends ParallelFlowable>
oldOnParallelAssembly;
@GuardedBy("TracingAssembly.class")
private static boolean enabled;
private TracingAssembly() {}
public static synchronized void enable() {
if (enabled) {
return;
}
enableObservable();
enableCompletable();
enableSingle();
enableMaybe();
enableFlowable();
enableParallel();
enabled = true;
}
public static synchronized void disable() {
if (!enabled) {
return;
}
disableObservable();
disableCompletable();
disableSingle();
disableMaybe();
disableFlowable();
disableParallel();
enabled = false;
}
@SuppressWarnings({"rawtypes", "unchecked"})
private static void enableParallel() {
oldOnParallelAssembly = RxJavaPlugins.getOnParallelAssembly();
RxJavaPlugins.setOnParallelAssembly(
compose(
oldOnParallelAssembly,
parallelFlowable -> new TracingParallelFlowable(parallelFlowable, Context.current())));
}
private static void enableCompletable() {
oldOnCompletableSubscribe = RxJavaPlugins.getOnCompletableSubscribe();
RxJavaPlugins.setOnCompletableSubscribe(
biCompose(
oldOnCompletableSubscribe,
(completable, observer) -> {
final Context context = Context.current();
try (Scope ignored = context.makeCurrent()) {
return new TracingCompletableObserver(observer, context);
}
}));
}
@SuppressWarnings({"rawtypes", "unchecked"})
private static void enableFlowable() {
oldOnFlowableSubscribe = RxJavaPlugins.getOnFlowableSubscribe();
RxJavaPlugins.setOnFlowableSubscribe(
biCompose(
oldOnFlowableSubscribe,
(flowable, subscriber) -> {
final Context context = Context.current();
try (Scope ignored = context.makeCurrent()) {
if (subscriber instanceof ConditionalSubscriber) {
return new TracingConditionalSubscriber<>(
(ConditionalSubscriber) subscriber, context);
} else {
return new TracingSubscriber<>(subscriber, context);
}
}
}));
}
@SuppressWarnings({"rawtypes", "unchecked"})
private static void enableObservable() {
oldOnObservableSubscribe = RxJavaPlugins.getOnObservableSubscribe();
RxJavaPlugins.setOnObservableSubscribe(
biCompose(
oldOnObservableSubscribe,
(observable, observer) -> {
final Context context = Context.current();
try (Scope ignored = context.makeCurrent()) {
return new TracingObserver(observer, context);
}
}));
}
@SuppressWarnings({"rawtypes", "unchecked"})
private static void enableSingle() {
oldOnSingleSubscribe = RxJavaPlugins.getOnSingleSubscribe();
RxJavaPlugins.setOnSingleSubscribe(
biCompose(
oldOnSingleSubscribe,
(single, singleObserver) -> {
final Context context = Context.current();
try (Scope ignored = context.makeCurrent()) {
return new TracingSingleObserver(singleObserver, context);
}
}));
}
@SuppressWarnings({"rawtypes", "unchecked"})
private static void enableMaybe() {
oldOnMaybeSubscribe = RxJavaPlugins.getOnMaybeSubscribe();
RxJavaPlugins.setOnMaybeSubscribe(
(BiFunction<? super Maybe, MaybeObserver, ? extends MaybeObserver>)
biCompose(
oldOnMaybeSubscribe,
(BiFunction<Maybe, MaybeObserver, MaybeObserver>)
(maybe, maybeObserver) -> {
final Context context = Context.current();
try (Scope ignored = context.makeCurrent()) {
return new TracingMaybeObserver(maybeObserver, context);
}
}));
}
private static void disableParallel() {
RxJavaPlugins.setOnParallelAssembly(oldOnParallelAssembly);
oldOnParallelAssembly = null;
}
private static void disableObservable() {
RxJavaPlugins.setOnObservableSubscribe(oldOnObservableSubscribe);
oldOnObservableSubscribe = null;
}
private static void disableCompletable() {
RxJavaPlugins.setOnCompletableSubscribe(oldOnCompletableSubscribe);
oldOnCompletableSubscribe = null;
}
private static void disableFlowable() {
RxJavaPlugins.setOnFlowableSubscribe(oldOnFlowableSubscribe);
oldOnFlowableSubscribe = null;
}
private static void disableSingle() {
RxJavaPlugins.setOnSingleSubscribe(oldOnSingleSubscribe);
oldOnSingleSubscribe = null;
}
@SuppressWarnings({"rawtypes", "unchecked"})
private static void disableMaybe() {
RxJavaPlugins.setOnMaybeSubscribe(
(BiFunction<? super Maybe, MaybeObserver, ? extends MaybeObserver>) oldOnMaybeSubscribe);
oldOnMaybeSubscribe = null;
}
private static <T> Function<? super T, ? extends T> compose(
Function<? super T, ? extends T> before, Function<? super T, ? extends T> after) {
if (before == null) {
return after;
}
return (T v) -> after.apply(before.apply(v));
}
private static <T, U> BiFunction<? super T, ? super U, ? extends U> biCompose(
BiFunction<? super T, ? super U, ? extends U> before,
BiFunction<? super T, ? super U, ? extends U> after) {
if (before == null) {
return after;
}
return (T v, U u) -> after.apply(v, before.apply(v, u));
}
}

View File

@ -0,0 +1,74 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
// Includes work from:
/*
* Copyright 2018 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.opentelemetry.instrumentation.rxjava2;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.reactivex.CompletableObserver;
import io.reactivex.disposables.Disposable;
import io.reactivex.internal.disposables.DisposableHelper;
class TracingCompletableObserver implements CompletableObserver, Disposable {
private final CompletableObserver actual;
private final Context context;
private Disposable disposable;
TracingCompletableObserver(final CompletableObserver actual, final Context context) {
this.actual = actual;
this.context = context;
}
@Override
public void onSubscribe(final Disposable d) {
if (!DisposableHelper.validate(disposable, d)) {
return;
}
disposable = d;
actual.onSubscribe(this);
}
@Override
public void onComplete() {
try (Scope ignored = context.makeCurrent()) {
actual.onComplete();
}
}
@Override
public void onError(final Throwable e) {
try (Scope ignored = context.makeCurrent()) {
actual.onError(e);
}
}
@Override
public void dispose() {
disposable.dispose();
}
@Override
public boolean isDisposed() {
return disposable.isDisposed();
}
}

View File

@ -0,0 +1,84 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
// Includes work from:
/*
* Copyright 2018 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.opentelemetry.instrumentation.rxjava2;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.reactivex.internal.fuseable.ConditionalSubscriber;
import io.reactivex.internal.fuseable.QueueSubscription;
import io.reactivex.internal.subscribers.BasicFuseableConditionalSubscriber;
class TracingConditionalSubscriber<T> extends BasicFuseableConditionalSubscriber<T, T> {
private final Context context;
TracingConditionalSubscriber(
final ConditionalSubscriber<? super T> actual, final Context context) {
super(actual);
this.context = context;
}
@Override
public boolean tryOnNext(T t) {
try (Scope ignored = context.makeCurrent()) {
return actual.tryOnNext(t);
}
}
@Override
public void onNext(T t) {
try (Scope ignored = context.makeCurrent()) {
actual.onNext(t);
}
}
@Override
public void onError(Throwable t) {
try (Scope ignored = context.makeCurrent()) {
actual.onError(t);
}
}
@Override
public void onComplete() {
try (Scope ignored = context.makeCurrent()) {
actual.onComplete();
}
}
@Override
public int requestFusion(int mode) {
final QueueSubscription<T> qs = this.qs;
if (qs != null) {
final int m = qs.requestFusion(mode);
sourceMode = m;
return m;
}
return NONE;
}
@Override
public T poll() throws Exception {
return qs.poll();
}
}

View File

@ -0,0 +1,81 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
// Includes work from:
/*
* Copyright 2018 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.opentelemetry.instrumentation.rxjava2;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.reactivex.MaybeObserver;
import io.reactivex.disposables.Disposable;
import io.reactivex.internal.disposables.DisposableHelper;
class TracingMaybeObserver<T> implements MaybeObserver<T>, Disposable {
private final MaybeObserver<T> actual;
private final Context context;
private Disposable disposable;
TracingMaybeObserver(final MaybeObserver<T> actual, final Context context) {
this.actual = actual;
this.context = context;
}
@Override
public void onSubscribe(final Disposable d) {
if (!DisposableHelper.validate(disposable, d)) {
return;
}
disposable = d;
actual.onSubscribe(this);
}
@Override
public void onSuccess(final T t) {
try (Scope ignored = context.makeCurrent()) {
actual.onSuccess(t);
}
}
@Override
public void onError(final Throwable e) {
try (Scope ignored = context.makeCurrent()) {
actual.onError(e);
}
}
@Override
public void onComplete() {
try (Scope ignored = context.makeCurrent()) {
actual.onComplete();
}
}
@Override
public void dispose() {
disposable.dispose();
}
@Override
public boolean isDisposed() {
return disposable.isDisposed();
}
}

View File

@ -0,0 +1,76 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
// Includes work from:
/*
* Copyright 2018 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.opentelemetry.instrumentation.rxjava2;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.reactivex.Observer;
import io.reactivex.internal.fuseable.QueueDisposable;
import io.reactivex.internal.observers.BasicFuseableObserver;
public class TracingObserver<T> extends BasicFuseableObserver<T, T> {
private final Context context;
TracingObserver(final Observer<? super T> actual, final Context context) {
super(actual);
this.context = context;
}
@Override
public void onNext(T t) {
try (Scope ignored = context.makeCurrent()) {
actual.onNext(t);
}
}
@Override
public void onError(Throwable t) {
try (Scope ignored = context.makeCurrent()) {
actual.onError(t);
}
}
@Override
public void onComplete() {
try (Scope ignored = context.makeCurrent()) {
actual.onComplete();
}
}
@Override
public int requestFusion(int mode) {
final QueueDisposable<T> qd = this.qs;
if (qd != null) {
final int m = qd.requestFusion(mode);
sourceMode = m;
return m;
}
return NONE;
}
@Override
public T poll() throws Exception {
return qs.poll();
}
}

View File

@ -0,0 +1,67 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
// Includes work from:
/*
* Copyright 2018 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.opentelemetry.instrumentation.rxjava2;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.reactivex.internal.fuseable.ConditionalSubscriber;
import io.reactivex.parallel.ParallelFlowable;
import org.reactivestreams.Subscriber;
class TracingParallelFlowable<T> extends ParallelFlowable<T> {
private final ParallelFlowable<T> source;
private final Context context;
TracingParallelFlowable(final ParallelFlowable<T> source, final Context context) {
this.source = source;
this.context = context;
}
@SuppressWarnings("unchecked")
@Override
public void subscribe(final Subscriber<? super T>[] subscribers) {
if (!validate(subscribers)) {
return;
}
final int n = subscribers.length;
final Subscriber<? super T>[] parents = new Subscriber[n];
for (int i = 0; i < n; i++) {
final Subscriber<? super T> z = subscribers[i];
if (z instanceof ConditionalSubscriber) {
parents[i] =
new TracingConditionalSubscriber<>((ConditionalSubscriber<? super T>) z, context);
} else {
parents[i] = new TracingSubscriber<>(z, context);
}
}
try (Scope ignored = context.makeCurrent()) {
source.subscribe(parents);
}
}
@Override
public int parallelism() {
return source.parallelism();
}
}

View File

@ -0,0 +1,74 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
// Includes work from:
/*
* Copyright 2018 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.opentelemetry.instrumentation.rxjava2;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.reactivex.SingleObserver;
import io.reactivex.disposables.Disposable;
import io.reactivex.internal.disposables.DisposableHelper;
class TracingSingleObserver<T> implements SingleObserver<T>, Disposable {
private final SingleObserver<T> actual;
private final Context context;
private Disposable disposable;
TracingSingleObserver(final SingleObserver<T> actual, final Context context) {
this.actual = actual;
this.context = context;
}
@Override
public void onSubscribe(final Disposable d) {
if (!DisposableHelper.validate(disposable, d)) {
return;
}
this.disposable = d;
actual.onSubscribe(this);
}
@Override
public void onSuccess(final T t) {
try (Scope ignored = context.makeCurrent()) {
actual.onSuccess(t);
}
}
@Override
public void onError(Throwable throwable) {
try (Scope ignored = context.makeCurrent()) {
actual.onError(throwable);
}
}
@Override
public void dispose() {
disposable.dispose();
}
@Override
public boolean isDisposed() {
return disposable.isDisposed();
}
}

View File

@ -0,0 +1,76 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
// Includes work from:
/*
* Copyright 2018 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.opentelemetry.instrumentation.rxjava2;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.reactivex.internal.fuseable.QueueSubscription;
import io.reactivex.internal.subscribers.BasicFuseableSubscriber;
import org.reactivestreams.Subscriber;
class TracingSubscriber<T> extends BasicFuseableSubscriber<T, T> {
private final Context context;
TracingSubscriber(final Subscriber<? super T> actual, final Context context) {
super(actual);
this.context = context;
}
@Override
public void onNext(T t) {
try (Scope ignored = context.makeCurrent()) {
actual.onNext(t);
}
}
@Override
public void onError(Throwable t) {
try (Scope ignored = context.makeCurrent()) {
actual.onError(t);
}
}
@Override
public void onComplete() {
try (Scope ignored = context.makeCurrent()) {
actual.onComplete();
}
}
@Override
public int requestFusion(int mode) {
final QueueSubscription<T> qs = this.qs;
if (qs != null) {
final int m = qs.requestFusion(mode);
sourceMode = m;
return m;
}
return NONE;
}
@Override
public T poll() throws Exception {
return qs.poll();
}
}

View File

@ -0,0 +1,15 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
import io.opentelemetry.instrumentation.rxjava2.AbstractRxJava2SubscriptionTest
import io.opentelemetry.instrumentation.rxjava2.TracingAssembly
import io.opentelemetry.instrumentation.test.LibraryTestTrait
class RxJava2SubscriptionTest extends AbstractRxJava2SubscriptionTest implements LibraryTestTrait {
def setupSpec() {
TracingAssembly.enable()
}
}

View File

@ -0,0 +1,15 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
import io.opentelemetry.instrumentation.rxjava2.AbstractRxJava2Test
import io.opentelemetry.instrumentation.rxjava2.TracingAssembly
import io.opentelemetry.instrumentation.test.LibraryTestTrait
class RxJava2Test extends AbstractRxJava2Test implements LibraryTestTrait {
def setupSpec() {
TracingAssembly.enable()
}
}

View File

@ -0,0 +1,13 @@
apply from: "$rootDir/gradle/java.gradle"
dependencies {
api project(':testing-common')
api group: 'io.reactivex.rxjava2', name: 'rxjava', version: "2.1.3"
implementation deps.guava
implementation deps.groovy
implementation deps.opentelemetryApi
implementation deps.spock
}

View File

@ -0,0 +1,53 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.rxjava2
import static io.opentelemetry.instrumentation.test.utils.TraceUtils.basicSpan
import static io.opentelemetry.instrumentation.test.utils.TraceUtils.runUnderTrace
import io.opentelemetry.api.GlobalOpenTelemetry
import io.opentelemetry.instrumentation.test.InstrumentationSpecification
import io.reactivex.Single
import io.reactivex.functions.Consumer
import java.util.concurrent.CountDownLatch
abstract class AbstractRxJava2SubscriptionTest extends InstrumentationSpecification {
def "subscription test"() {
when:
CountDownLatch latch = new CountDownLatch(1)
runUnderTrace("parent") {
Single<Connection> connection = Single.create {
it.onSuccess(new Connection())
}
connection.subscribe(new Consumer<Connection>() {
@Override
void accept(Connection t) {
t.query()
latch.countDown()
}
})
}
latch.await()
then:
assertTraces(1) {
trace(0, 2) {
basicSpan(it, 0, "parent")
basicSpan(it, 1, "Connection.query", span(0))
}
}
}
static class Connection {
static int query() {
def span = GlobalOpenTelemetry.getTracer("test").spanBuilder("Connection.query").startSpan()
span.end()
return new Random().nextInt()
}
}
}

View File

@ -0,0 +1,370 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.rxjava2
import static io.opentelemetry.instrumentation.test.utils.TraceUtils.basicSpan
import static io.opentelemetry.instrumentation.test.utils.TraceUtils.runUnderTrace
import static io.opentelemetry.instrumentation.test.utils.TraceUtils.runUnderTraceWithoutExceptionCatch
import static java.util.concurrent.TimeUnit.MILLISECONDS
import com.google.common.collect.Lists
import io.opentelemetry.instrumentation.test.InstrumentationSpecification
import io.reactivex.BackpressureStrategy
import io.reactivex.Completable
import io.reactivex.Flowable
import io.reactivex.Maybe
import io.reactivex.Observable
import io.reactivex.Single
import io.reactivex.internal.operators.flowable.FlowablePublish
import io.reactivex.internal.operators.observable.ObservablePublish
import io.reactivex.schedulers.Schedulers
import org.reactivestreams.Subscriber
import org.reactivestreams.Subscription
import spock.lang.Shared
/**
* <p>Tests in this class may seem not exhaustive due to the fact that some classes are converted
* into others, ie. {@link Completable#toMaybe()}. Fortunately, RxJava2 uses helper classes like
* {@link io.reactivex.internal.operators.maybe.MaybeFromCompletable} and as a result we
* can test subscriptions and cancellations correctly.
*/
abstract class AbstractRxJava2Test extends InstrumentationSpecification {
public static final String EXCEPTION_MESSAGE = "test exception"
@Shared
def addOne = { i ->
addOneFunc(i)
}
@Shared
def addTwo = { i ->
addTwoFunc(i)
}
@Shared
def throwException = {
throw new RuntimeException(EXCEPTION_MESSAGE)
}
static addOneFunc(int i) {
runUnderTrace("addOne") {
return i + 1
}
}
static addTwoFunc(int i) {
runUnderTrace("addTwo") {
return i + 2
}
}
def "Publisher '#name' test"() {
when:
def result = assemblePublisherUnderTrace(publisherSupplier)
then:
result == expected
and:
assertTraces(1) {
sortSpansByStartTime()
trace(0, workSpans + 1) {
basicSpan(it, 0, "publisher-parent")
for (int i = 1; i < workSpans + 1; ++i) {
basicSpan(it, i, "addOne", span(0))
}
}
}
where:
name | expected | workSpans | publisherSupplier
"basic maybe" | 2 | 1 | { -> Maybe.just(1).map(addOne) }
"two operations maybe" | 4 | 2 | { -> Maybe.just(2).map(addOne).map(addOne) }
"delayed maybe" | 4 | 1 | { ->
Maybe.just(3).delay(100, MILLISECONDS).map(addOne)
}
"delayed twice maybe" | 6 | 2 | { ->
Maybe.just(4).delay(100, MILLISECONDS).map(addOne).delay(100, MILLISECONDS).map(addOne)
}
"basic flowable" | [6, 7] | 2 | { ->
Flowable.fromIterable([5, 6]).map(addOne)
}
"two operations flowable" | [8, 9] | 4 | { ->
Flowable.fromIterable([6, 7]).map(addOne).map(addOne)
}
"delayed flowable" | [8, 9] | 2 | { ->
Flowable.fromIterable([7, 8]).delay(100, MILLISECONDS).map(addOne)
}
"delayed twice flowable" | [10, 11] | 4 | { ->
Flowable.fromIterable([8, 9]).delay(100, MILLISECONDS).map(addOne).delay(100, MILLISECONDS).map(addOne)
}
"maybe from callable" | 12 | 2 | { ->
Maybe.fromCallable({ addOneFunc(10) }).map(addOne)
}
"basic single" | 1 | 1 | { -> Single.just(0).map(addOne) }
"basic observable" | [1] | 1 | { -> Observable.just(0).map(addOne) }
"connectable flowable" | [1] | 1 | { ->
FlowablePublish.just(0).delay(100, MILLISECONDS).map(addOne)
}
"connectable observable" | [1] | 1 | { ->
ObservablePublish.just(0).delay(100, MILLISECONDS).map(addOne)
}
}
def "Publisher error '#name' test"() {
when:
assemblePublisherUnderTrace(publisherSupplier)
then:
def thrownException = thrown RuntimeException
thrownException.message == EXCEPTION_MESSAGE
and:
assertTraces(1) {
sortSpansByStartTime()
trace(0, 1) {
// It's important that we don't attach errors at the Reactor level so that we don't
// impact the spans on reactor integrations such as netty and lettuce, as reactor is
// more of a context propagation mechanism than something we would be tracking for
// errors this is ok.
basicSpan(it, 0, "publisher-parent")
}
}
where:
name | publisherSupplier
"maybe" | { -> Maybe.error(new RuntimeException(EXCEPTION_MESSAGE)) }
"flowable" | { -> Flowable.error(new RuntimeException(EXCEPTION_MESSAGE)) }
"single" | { -> Single.error(new RuntimeException(EXCEPTION_MESSAGE)) }
"observable" | { -> Observable.error(new RuntimeException(EXCEPTION_MESSAGE)) }
"completable" | { -> Completable.error(new RuntimeException(EXCEPTION_MESSAGE)) }
}
def "Publisher step '#name' test"() {
when:
assemblePublisherUnderTrace(publisherSupplier)
then:
def exception = thrown RuntimeException
exception.message == EXCEPTION_MESSAGE
and:
assertTraces(1) {
sortSpansByStartTime()
trace(0, workSpans + 1) {
// It's important that we don't attach errors at the Reactor level so that we don't
// impact the spans on reactor integrations such as netty and lettuce, as reactor is
// more of a context propagation mechanism than something we would be tracking for
// errors this is ok.
basicSpan(it, 0, "publisher-parent")
for (int i = 1; i < workSpans + 1; i++) {
basicSpan(it, i, "addOne", span(0))
}
}
}
where:
name | workSpans | publisherSupplier
"basic maybe failure" | 1 | { ->
Maybe.just(1).map(addOne).map({ throwException() })
}
"basic flowable failure" | 1 | { ->
Flowable.fromIterable([5, 6]).map(addOne).map({ throwException() })
}
}
def "Publisher '#name' cancel"() {
when:
cancelUnderTrace(publisherSupplier)
then:
assertTraces(1) {
trace(0, 1) {
basicSpan(it, 0, "publisher-parent")
}
}
where:
name | publisherSupplier
"basic maybe" | { -> Maybe.just(1) }
"basic flowable" | { -> Flowable.fromIterable([5, 6]) }
"basic single" | { -> Single.just(1) }
"basic completable" | { -> Completable.fromCallable({ -> 1 }) }
"basic observable" | { -> Observable.just(1) }
}
def "Publisher chain spans have the correct parent for '#name'"() {
when:
assemblePublisherUnderTrace(publisherSupplier)
then:
assertTraces(1) {
trace(0, workSpans + 1) {
basicSpan(it, 0, "publisher-parent")
for (int i = 1; i < workSpans + 1; i++) {
basicSpan(it, i, "addOne", span(0))
}
}
}
where:
name | workSpans | publisherSupplier
"basic maybe" | 3 | { ->
Maybe.just(1).map(addOne).map(addOne).concatWith(Maybe.just(1).map(addOne))
}
"basic flowable" | 5 | { ->
Flowable.fromIterable([5, 6]).map(addOne).map(addOne).concatWith(Maybe.just(1).map(addOne).toFlowable())
}
}
def "Publisher chain spans have the correct parents from subscription time"() {
when:
def maybe = Maybe.just(42)
.map(addOne)
.map(addTwo)
runUnderTrace("trace-parent") {
maybe.blockingGet()
}
then:
assertTraces(1) {
trace(0, 3) {
sortSpansByStartTime()
basicSpan(it, 0, "trace-parent")
basicSpan(it, 1, "addOne", span(0))
basicSpan(it, 2, "addTwo", span(0))
}
}
}
def "Publisher chain spans have the correct parents from subscription time '#name'"() {
when:
assemblePublisherUnderTrace {
// The "add one" operations in the publisher created here should be children of the publisher-parent
def publisher = publisherSupplier()
runUnderTrace("intermediate") {
if (publisher instanceof Maybe) {
return ((Maybe) publisher).map(addTwo)
} else if (publisher instanceof Flowable) {
return ((Flowable) publisher).map(addTwo)
} else if (publisher instanceof Single) {
return ((Single) publisher).map(addTwo)
} else if (publisher instanceof Observable) {
return ((Observable) publisher).map(addTwo)
} else if (publisher instanceof Completable) {
return ((Completable) publisher).toMaybe().map(addTwo)
}
throw new IllegalStateException("Unknown publisher type")
}
}
then:
assertTraces(1) {
trace(0, 2 + 2 * workItems) {
sortSpansByStartTime()
basicSpan(it, 0, "publisher-parent")
basicSpan(it, 1, "intermediate", span(0))
for (int i = 2; i < 2 + 2 * workItems; i = i + 2) {
basicSpan(it, i, "addOne", span(0))
basicSpan(it, i + 1, "addTwo", span(0))
}
}
}
where:
name | workItems | publisherSupplier
"basic maybe" | 1 | { -> Maybe.just(1).map(addOne) }
"basic flowable" | 2 | { -> Flowable.fromIterable([1, 2]).map(addOne) }
"basic single" | 1 | { -> Single.just(1).map(addOne) }
"basic observable" | 1 | { -> Observable.just(1).map(addOne) }
}
def "Flowables produce the right number of results '#scheduler'"() {
when:
List<String> values = runUnderTrace("flowable root") {
Flowable.fromIterable([1, 2, 3, 4])
.parallel()
.runOn(scheduler)
.flatMap({ num ->
Maybe.just(num).map(addOne).toFlowable()
})
.sequential()
.toList()
.blockingGet()
}
then:
values.size() == 4
assertTraces(1) {
trace(0, 5) {
basicSpan(it, 0, "flowable root")
for (int i = 1; i < values.size() + 1; i++) {
basicSpan(it, i, "addOne", span(0))
}
}
}
where:
scheduler << [Schedulers.newThread(), Schedulers.computation(), Schedulers.single(), Schedulers.trampoline()]
}
def cancelUnderTrace(def publisherSupplier) {
runUnderTraceWithoutExceptionCatch("publisher-parent") {
def publisher = publisherSupplier()
if (publisher instanceof Maybe) {
publisher = publisher.toFlowable()
} else if (publisher instanceof Single) {
publisher = publisher.toFlowable()
} else if (publisher instanceof Completable) {
publisher = publisher.toFlowable()
} else if (publisher instanceof Observable) {
publisher = publisher.toFlowable(BackpressureStrategy.LATEST)
}
publisher.subscribe(new Subscriber<Integer>() {
void onSubscribe(Subscription subscription) {
subscription.cancel()
}
void onNext(Integer t) {
}
void onError(Throwable error) {
}
void onComplete() {
}
})
}
}
@SuppressWarnings("unchecked")
def assemblePublisherUnderTrace(def publisherSupplier) {
// The "add two" operations below should be children of this span
runUnderTraceWithoutExceptionCatch("publisher-parent") {
def publisher = publisherSupplier()
// Read all data from publisher
if (publisher instanceof Maybe) {
return ((Maybe) publisher).blockingGet()
} else if (publisher instanceof Flowable) {
return Lists.newArrayList(((Flowable) publisher).blockingIterable())
} else if (publisher instanceof Single) {
return ((Single) publisher).blockingGet()
} else if (publisher instanceof Observable) {
return Lists.newArrayList(((Observable) publisher).blockingIterable())
} else if (publisher instanceof Completable) {
return ((Completable) publisher).toMaybe().blockingGet()
}
throw new RuntimeException("Unknown publisher: " + publisher)
}
}
}

View File

@ -195,6 +195,8 @@ include ':instrumentation:redisson-3.0:javaagent'
include ':instrumentation:rmi:javaagent'
include ':instrumentation:runtime-metrics:library'
include ':instrumentation:rxjava-1.0:library'
include ':instrumentation:rxjava-2.0:library'
include ':instrumentation:rxjava-2.0:testing'
include ':instrumentation:scala-executors:javaagent'
include ':instrumentation:servlet:glassfish-testing'
include ':instrumentation:servlet:servlet-common:javaagent'

View File

@ -11,12 +11,13 @@ import groovy.transform.stc.ClosureParams
import groovy.transform.stc.SimpleType
import io.opentelemetry.instrumentation.testing.util.TelemetryDataUtil
import io.opentelemetry.sdk.trace.data.SpanData
import java.util.function.Supplier
import org.codehaus.groovy.runtime.powerassert.PowerAssertionError
import org.spockframework.runtime.Condition
import org.spockframework.runtime.ConditionNotSatisfiedError
import org.spockframework.runtime.model.TextPosition
import java.util.function.Supplier
class InMemoryExporterAssert {
private final List<List<SpanData>> traces
private final Supplier<List<SpanData>> spanSupplier
@ -82,4 +83,12 @@ class InMemoryExporterAssert {
void assertTracesAllVerified() {
assert assertedIndexes.size() == traces.size()
}
void sortSpansByStartTime() {
traces.each {
it.sort { a, b ->
return a.startEpochNanos <=> b.startEpochNanos
}
}
}
}

View File

@ -16,6 +16,7 @@ import io.opentelemetry.instrumentation.test.asserts.AttributesAssert
import io.opentelemetry.instrumentation.test.asserts.TraceAssert
import io.opentelemetry.instrumentation.test.server.ServerTraceUtils
import io.opentelemetry.sdk.trace.data.SpanData
import java.util.concurrent.Callable
import java.util.concurrent.ExecutionException
@ -67,9 +68,21 @@ class TraceUtils {
errorEvent(exception.class, exception.message)
}
if(additionAttributesAssert != null){
if (additionAttributesAssert != null) {
attributes(additionAttributesAssert)
}
}
}
static <T> T runUnderTraceWithoutExceptionCatch(final String rootOperationName, final Callable<T> r) {
final Span span = tracer.spanBuilder(rootOperationName).setSpanKind(SpanKind.INTERNAL).startSpan()
try {
return span.makeCurrent().withCloseable {
r.call()
}
} finally {
span.end()
}
}
}