Add instrumentation for RxJava 3 (#2794)

This commit is contained in:
HaloFour 2021-04-14 15:24:13 -04:00 committed by GitHub
parent 2d59d25961
commit 437547d949
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
24 changed files with 3174 additions and 0 deletions

View File

@ -0,0 +1,19 @@
apply from: "$rootDir/gradle/instrumentation.gradle"
muzzle {
pass {
group = "io.reactivex.rxjava3"
module = "rxjava"
versions = "[3.0.0,)"
assertInverse true
}
}
dependencies {
library group: 'io.reactivex.rxjava3', name: 'rxjava', version: "3.0.0"
implementation project(":instrumentation:rxjava:rxjava-3.0:library")
testImplementation deps.opentelemetryExtAnnotations
testImplementation project(':instrumentation:rxjava:rxjava-3.0:testing')
}

View File

@ -0,0 +1,58 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.rxjava3;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.named;
import com.google.auto.service.AutoService;
import io.opentelemetry.javaagent.tooling.InstrumentationModule;
import io.opentelemetry.javaagent.tooling.TypeInstrumentation;
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
@AutoService(InstrumentationModule.class)
public class RxJava3InstrumentationModule extends InstrumentationModule {
public RxJava3InstrumentationModule() {
super("rxjava3");
}
@Override
public List<TypeInstrumentation> typeInstrumentations() {
return Collections.singletonList(new PluginInstrumentation());
}
public static class PluginInstrumentation implements TypeInstrumentation {
@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return named("io.reactivex.rxjava3.plugins.RxJavaPlugins");
}
@Override
public Map<? extends ElementMatcher<? super MethodDescription>, String> transformers() {
return Collections.singletonMap(
isMethod(), RxJava3InstrumentationModule.class.getName() + "$RxJavaPluginsAdvice");
}
}
public static class RxJavaPluginsAdvice {
// TODO(anuraaga): Replace with adding a type initializer to RxJavaPlugins
// https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/2685
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void activateOncePerClassloader() {
TracingAssemblyActivation.activate(RxJavaPlugins.class);
}
}
}

View File

@ -0,0 +1,27 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.rxjava3;
import java.util.concurrent.atomic.AtomicBoolean;
public final class TracingAssemblyActivation {
private static final ClassValue<AtomicBoolean> activated =
new ClassValue<AtomicBoolean>() {
@Override
protected AtomicBoolean computeValue(Class<?> type) {
return new AtomicBoolean();
}
};
public static void activate(Class<?> clz) {
if (activated.get(clz).compareAndSet(false, true)) {
TracingAssembly.enable();
}
}
private TracingAssemblyActivation() {}
}

View File

@ -0,0 +1,11 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
import io.opentelemetry.instrumentation.rxjava3.AbstractRxJava3SubscriptionTest
import io.opentelemetry.instrumentation.test.AgentTestTrait
class RxJava3SubscriptionTest extends AbstractRxJava3SubscriptionTest implements AgentTestTrait {
}

View File

@ -0,0 +1,10 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
import io.opentelemetry.instrumentation.rxjava3.AbstractRxJava3Test
import io.opentelemetry.instrumentation.test.AgentTestTrait
class RxJava3Test extends AbstractRxJava3Test implements AgentTestTrait {
}

View File

@ -0,0 +1,841 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
import io.opentelemetry.api.trace.SpanKind
import io.opentelemetry.instrumentation.rxjava3.TracedWithSpan
import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification
import io.reactivex.rxjava3.core.Completable
import io.reactivex.rxjava3.core.Flowable
import io.reactivex.rxjava3.core.Maybe
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.core.Single
import io.reactivex.rxjava3.observers.TestObserver
import io.reactivex.rxjava3.processors.UnicastProcessor
import io.reactivex.rxjava3.subjects.CompletableSubject
import io.reactivex.rxjava3.subjects.MaybeSubject
import io.reactivex.rxjava3.subjects.SingleSubject
import io.reactivex.rxjava3.subjects.UnicastSubject
import io.reactivex.rxjava3.subscribers.TestSubscriber
import org.reactivestreams.Publisher
import org.reactivestreams.Subscriber
import org.reactivestreams.Subscription
class RxJava3WithSpanInstrumentationTest extends AgentInstrumentationSpecification {
def "should capture span for already completed Completable"() {
setup:
def observer = new TestObserver()
def source = Completable.complete()
new TracedWithSpan()
.completable(source)
.subscribe(observer)
observer.assertComplete()
expect:
assertTraces(1) {
trace(0, 1) {
span(0) {
name "TracedWithSpan.completable"
kind SpanKind.INTERNAL
hasNoParent()
errored false
attributes {
}
}
}
}
}
def "should capture span for eventually completed Completable"() {
setup:
def source = CompletableSubject.create()
def observer = new TestObserver()
new TracedWithSpan()
.completable(source)
.subscribe(observer)
expect:
Thread.sleep(500) // sleep a bit just to make sure no span is captured
assertTraces(0) {}
source.onComplete()
observer.assertComplete()
assertTraces(1) {
trace(0, 1) {
span(0) {
name "TracedWithSpan.completable"
kind SpanKind.INTERNAL
hasNoParent()
errored false
attributes {
}
}
}
}
}
def "should capture span for already errored Completable"() {
setup:
def error = new IllegalArgumentException("Boom")
def observer = new TestObserver()
def source = Completable.error(error)
new TracedWithSpan()
.completable(source)
.subscribe(observer)
observer.assertError(error)
expect:
assertTraces(1) {
trace(0, 1) {
span(0) {
name "TracedWithSpan.completable"
kind SpanKind.INTERNAL
hasNoParent()
errored true
errorEvent(IllegalArgumentException, "Boom")
attributes {
}
}
}
}
}
def "should capture span for eventually errored Completable"() {
setup:
def error = new IllegalArgumentException("Boom")
def source = CompletableSubject.create()
def observer = new TestObserver()
new TracedWithSpan()
.completable(source)
.subscribe(observer)
expect:
Thread.sleep(500) // sleep a bit just to make sure no span is captured
assertTraces(0) {}
source.onError(error)
observer.assertError(error)
assertTraces(1) {
trace(0, 1) {
span(0) {
name "TracedWithSpan.completable"
kind SpanKind.INTERNAL
hasNoParent()
errored true
errorEvent(IllegalArgumentException, "Boom")
attributes {
}
}
}
}
}
def "should capture span for already completed Maybe"() {
setup:
def observer = new TestObserver()
def source = Maybe.just("Value")
new TracedWithSpan()
.maybe(source)
.subscribe(observer)
observer.assertValue("Value")
observer.assertComplete()
expect:
assertTraces(1) {
trace(0, 1) {
span(0) {
name "TracedWithSpan.maybe"
kind SpanKind.INTERNAL
hasNoParent()
errored false
attributes {
}
}
}
}
}
def "should capture span for already empty Maybe"() {
setup:
def observer = new TestObserver()
def source = Maybe.<String>empty()
new TracedWithSpan()
.maybe(source)
.subscribe(observer)
observer.assertComplete()
expect:
assertTraces(1) {
trace(0, 1) {
span(0) {
name "TracedWithSpan.maybe"
kind SpanKind.INTERNAL
hasNoParent()
errored false
attributes {
}
}
}
}
}
def "should capture span for eventually completed Maybe"() {
setup:
def source = MaybeSubject.<String>create()
def observer = new TestObserver()
new TracedWithSpan()
.maybe(source)
.subscribe(observer)
expect:
Thread.sleep(500) // sleep a bit just to make sure no span is captured
assertTraces(0) {}
source.onSuccess("Value")
observer.assertValue("Value")
observer.assertComplete()
assertTraces(1) {
trace(0, 1) {
span(0) {
name "TracedWithSpan.maybe"
kind SpanKind.INTERNAL
hasNoParent()
errored false
attributes {
}
}
}
}
}
def "should capture span for already errored Maybe"() {
setup:
def error = new IllegalArgumentException("Boom")
def observer = new TestObserver()
def source = Maybe.<String>error(error)
new TracedWithSpan()
.maybe(source)
.subscribe(observer)
observer.assertError(error)
expect:
assertTraces(1) {
trace(0, 1) {
span(0) {
name "TracedWithSpan.maybe"
kind SpanKind.INTERNAL
hasNoParent()
errored true
errorEvent(IllegalArgumentException, "Boom")
attributes {
}
}
}
}
}
def "should capture span for eventually errored Maybe"() {
setup:
def error = new IllegalArgumentException("Boom")
def source = MaybeSubject.<String>create()
def observer = new TestObserver()
new TracedWithSpan()
.maybe(source)
.subscribe(observer)
expect:
Thread.sleep(500) // sleep a bit just to make sure no span is captured
assertTraces(0) {}
source.onError(error)
observer.assertError(error)
assertTraces(1) {
trace(0, 1) {
span(0) {
name "TracedWithSpan.maybe"
kind SpanKind.INTERNAL
hasNoParent()
errored true
errorEvent(IllegalArgumentException, "Boom")
attributes {
}
}
}
}
}
def "should capture span for already completed Single"() {
setup:
def observer = new TestObserver()
def source = Single.just("Value")
new TracedWithSpan()
.single(source)
.subscribe(observer)
observer.assertValue("Value")
observer.assertComplete()
expect:
assertTraces(1) {
trace(0, 1) {
span(0) {
name "TracedWithSpan.single"
kind SpanKind.INTERNAL
hasNoParent()
errored false
attributes {
}
}
}
}
}
def "should capture span for eventually completed Single"() {
setup:
def source = SingleSubject.<String>create()
def observer = new TestObserver()
new TracedWithSpan()
.single(source)
.subscribe(observer)
expect:
Thread.sleep(500) // sleep a bit just to make sure no span is captured
assertTraces(0) {}
source.onSuccess("Value")
observer.assertValue("Value")
observer.assertComplete()
assertTraces(1) {
trace(0, 1) {
span(0) {
name "TracedWithSpan.single"
kind SpanKind.INTERNAL
hasNoParent()
errored false
attributes {
}
}
}
}
}
def "should capture span for already errored Single"() {
setup:
def error = new IllegalArgumentException("Boom")
def observer = new TestObserver()
def source = Single.<String>error(error)
new TracedWithSpan()
.single(source)
.subscribe(observer)
observer.assertError(error)
expect:
assertTraces(1) {
trace(0, 1) {
span(0) {
name "TracedWithSpan.single"
kind SpanKind.INTERNAL
hasNoParent()
errored true
errorEvent(IllegalArgumentException, "Boom")
attributes {
}
}
}
}
}
def "should capture span for eventually errored Single"() {
setup:
def error = new IllegalArgumentException("Boom")
def source = SingleSubject.<String>create()
def observer = new TestObserver()
new TracedWithSpan()
.single(source)
.subscribe(observer)
expect:
Thread.sleep(500) // sleep a bit just to make sure no span is captured
assertTraces(0) {}
source.onError(error)
observer.assertError(error)
assertTraces(1) {
trace(0, 1) {
span(0) {
name "TracedWithSpan.single"
kind SpanKind.INTERNAL
hasNoParent()
errored true
errorEvent(IllegalArgumentException, "Boom")
attributes {
}
}
}
}
}
def "should capture span for already completed Observable"() {
setup:
def observer = new TestObserver()
def source = Observable.<String>just("Value")
new TracedWithSpan()
.observable(source)
.subscribe(observer)
observer.assertValue("Value")
observer.assertComplete()
expect:
assertTraces(1) {
trace(0, 1) {
span(0) {
name "TracedWithSpan.observable"
kind SpanKind.INTERNAL
hasNoParent()
errored false
attributes {
}
}
}
}
}
def "should capture span for eventually completed Observable"() {
setup:
def source = UnicastSubject.<String>create()
def observer = new TestObserver()
new TracedWithSpan()
.observable(source)
.subscribe(observer)
expect:
Thread.sleep(500) // sleep a bit just to make sure no span is captured
assertTraces(0) {}
source.onNext("Value")
observer.assertValue("Value")
Thread.sleep(500) // sleep a bit just to make sure no span is captured
assertTraces(0) {}
source.onComplete()
observer.assertComplete()
assertTraces(1) {
trace(0, 1) {
span(0) {
name "TracedWithSpan.observable"
kind SpanKind.INTERNAL
hasNoParent()
errored false
attributes {
}
}
}
}
}
def "should capture span for already errored Observable"() {
setup:
def error = new IllegalArgumentException("Boom")
def observer = new TestObserver()
def source = Observable.<String>error(error)
new TracedWithSpan()
.observable(source)
.subscribe(observer)
observer.assertError(error)
expect:
assertTraces(1) {
trace(0, 1) {
span(0) {
name "TracedWithSpan.observable"
kind SpanKind.INTERNAL
hasNoParent()
errored true
errorEvent(IllegalArgumentException, "Boom")
attributes {
}
}
}
}
}
def "should capture span for eventually errored Observable"() {
setup:
def error = new IllegalArgumentException("Boom")
def source = UnicastSubject.<String>create()
def observer = new TestObserver()
new TracedWithSpan()
.observable(source)
.subscribe(observer)
expect:
Thread.sleep(500) // sleep a bit just to make sure no span is captured
assertTraces(0) {}
source.onNext("Value")
observer.assertValue("Value")
Thread.sleep(500) // sleep a bit just to make sure no span is captured
assertTraces(0) {}
source.onError(error)
observer.assertError(error)
assertTraces(1) {
trace(0, 1) {
span(0) {
name "TracedWithSpan.observable"
kind SpanKind.INTERNAL
hasNoParent()
errored true
errorEvent(IllegalArgumentException, "Boom")
attributes {
}
}
}
}
}
def "should capture span for already completed Flowable"() {
setup:
def observer = new TestSubscriber()
def source = Flowable.<String>just("Value")
new TracedWithSpan()
.flowable(source)
.subscribe(observer)
observer.assertValue("Value")
observer.assertComplete()
expect:
assertTraces(1) {
trace(0, 1) {
span(0) {
name "TracedWithSpan.flowable"
kind SpanKind.INTERNAL
hasNoParent()
errored false
attributes {
}
}
}
}
}
def "should capture span for eventually completed Flowable"() {
setup:
def source = UnicastProcessor.<String>create()
def observer = new TestSubscriber()
new TracedWithSpan()
.flowable(source)
.subscribe(observer)
expect:
Thread.sleep(500) // sleep a bit just to make sure no span is captured
assertTraces(0) {}
source.onNext("Value")
observer.assertValue("Value")
Thread.sleep(500) // sleep a bit just to make sure no span is captured
assertTraces(0) {}
source.onComplete()
observer.assertComplete()
assertTraces(1) {
trace(0, 1) {
span(0) {
name "TracedWithSpan.flowable"
kind SpanKind.INTERNAL
hasNoParent()
errored false
attributes {
}
}
}
}
}
def "should capture span for already errored Flowable"() {
setup:
def error = new IllegalArgumentException("Boom")
def observer = new TestSubscriber()
def source = Flowable.<String>error(error)
new TracedWithSpan()
.flowable(source)
.subscribe(observer)
observer.assertError(error)
expect:
assertTraces(1) {
trace(0, 1) {
span(0) {
name "TracedWithSpan.flowable"
kind SpanKind.INTERNAL
hasNoParent()
errored true
errorEvent(IllegalArgumentException, "Boom")
attributes {
}
}
}
}
}
def "should capture span for eventually errored Flowable"() {
setup:
def error = new IllegalArgumentException("Boom")
def source = UnicastProcessor.<String>create()
def observer = new TestSubscriber()
new TracedWithSpan()
.flowable(source)
.subscribe(observer)
expect:
Thread.sleep(500) // sleep a bit just to make sure no span is captured
assertTraces(0) {}
source.onNext("Value")
observer.assertValue("Value")
Thread.sleep(500) // sleep a bit just to make sure no span is captured
assertTraces(0) {}
source.onError(error)
observer.assertError(error)
assertTraces(1) {
trace(0, 1) {
span(0) {
name "TracedWithSpan.flowable"
kind SpanKind.INTERNAL
hasNoParent()
errored true
errorEvent(IllegalArgumentException, "Boom")
attributes {
}
}
}
}
}
def "should capture span for already completed ParallelFlowable"() {
setup:
def observer = new TestSubscriber()
def source = Flowable.<String>just("Value")
new TracedWithSpan()
.parallelFlowable(source.parallel())
.sequential()
.subscribe(observer)
observer.assertValue("Value")
observer.assertComplete()
expect:
assertTraces(1) {
trace(0, 1) {
span(0) {
name "TracedWithSpan.parallelFlowable"
kind SpanKind.INTERNAL
hasNoParent()
errored false
attributes {
}
}
}
}
}
def "should capture span for eventually completed ParallelFlowable"() {
setup:
def source = UnicastProcessor.<String>create()
def observer = new TestSubscriber()
new TracedWithSpan()
.parallelFlowable(source.parallel())
.sequential()
.subscribe(observer)
expect:
Thread.sleep(500) // sleep a bit just to make sure no span is captured
assertTraces(0) {}
source.onNext("Value")
observer.assertValue("Value")
Thread.sleep(500) // sleep a bit just to make sure no span is captured
assertTraces(0) {}
source.onComplete()
observer.assertComplete()
assertTraces(1) {
trace(0, 1) {
span(0) {
name "TracedWithSpan.parallelFlowable"
kind SpanKind.INTERNAL
hasNoParent()
errored false
attributes {
}
}
}
}
}
def "should capture span for already errored ParallelFlowable"() {
setup:
def error = new IllegalArgumentException("Boom")
def observer = new TestSubscriber()
def source = Flowable.<String>error(error)
new TracedWithSpan()
.parallelFlowable(source.parallel())
.sequential()
.subscribe(observer)
observer.assertError(error)
expect:
assertTraces(1) {
trace(0, 1) {
span(0) {
name "TracedWithSpan.parallelFlowable"
kind SpanKind.INTERNAL
hasNoParent()
errored true
errorEvent(IllegalArgumentException, "Boom")
attributes {
}
}
}
}
}
def "should capture span for eventually errored ParallelFlowable"() {
setup:
def error = new IllegalArgumentException("Boom")
def source = UnicastProcessor.<String>create()
def observer = new TestSubscriber()
new TracedWithSpan()
.parallelFlowable(source.parallel())
.sequential()
.subscribe(observer)
expect:
Thread.sleep(500) // sleep a bit just to make sure no span is captured
assertTraces(0) {}
source.onNext("Value")
observer.assertValue("Value")
Thread.sleep(500) // sleep a bit just to make sure no span is captured
assertTraces(0) {}
source.onError(error)
observer.assertError(error)
assertTraces(1) {
trace(0, 1) {
span(0) {
name "TracedWithSpan.parallelFlowable"
kind SpanKind.INTERNAL
hasNoParent()
errored true
errorEvent(IllegalArgumentException, "Boom")
attributes {
}
}
}
}
}
def "should capture span for eventually completed Publisher"() {
setup:
def source = new CustomPublisher()
def observer = new TestSubscriber()
new TracedWithSpan()
.publisher(source)
.subscribe(observer)
expect:
Thread.sleep(500) // sleep a bit just to make sure no span is captured
assertTraces(0) {}
source.onComplete()
observer.assertComplete()
assertTraces(1) {
trace(0, 1) {
span(0) {
name "TracedWithSpan.publisher"
kind SpanKind.INTERNAL
hasNoParent()
errored false
attributes {
}
}
}
}
}
def "should capture span for eventually errored Publisher"() {
setup:
def error = new IllegalArgumentException("Boom")
def source = new CustomPublisher()
def observer = new TestSubscriber()
new TracedWithSpan()
.publisher(source)
.subscribe(observer)
expect:
Thread.sleep(500) // sleep a bit just to make sure no span is captured
assertTraces(0) {}
source.onError(error)
observer.assertError(error)
assertTraces(1) {
trace(0, 1) {
span(0) {
name "TracedWithSpan.publisher"
kind SpanKind.INTERNAL
hasNoParent()
errored true
errorEvent(IllegalArgumentException, "Boom")
attributes {
}
}
}
}
}
static class CustomPublisher implements Publisher<String>, Subscription {
Subscriber<? super String> subscriber
@Override
void subscribe(Subscriber<? super String> subscriber) {
this.subscriber = subscriber
subscriber.onSubscribe(this)
}
void onComplete() {
this.subscriber.onComplete()
}
void onError(Throwable exception) {
this.subscriber.onError(exception)
}
@Override
void request(long l) { }
@Override
void cancel() { }
}
}

View File

@ -0,0 +1,53 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.rxjava3;
import io.opentelemetry.extension.annotations.WithSpan;
import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Maybe;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.Single;
import io.reactivex.rxjava3.parallel.ParallelFlowable;
import org.reactivestreams.Publisher;
public class TracedWithSpan {
@WithSpan
public Completable completable(Completable source) {
return source;
}
@WithSpan
public Maybe<String> maybe(Maybe<String> source) {
return source;
}
@WithSpan
public Single<String> single(Single<String> source) {
return source;
}
@WithSpan
public Observable<String> observable(Observable<String> source) {
return source;
}
@WithSpan
public Flowable<String> flowable(Flowable<String> source) {
return source;
}
@WithSpan
public ParallelFlowable<String> parallelFlowable(ParallelFlowable<String> source) {
return source;
}
@WithSpan
public Publisher<String> publisher(Publisher<String> source) {
return source;
}
}

View File

@ -0,0 +1,7 @@
apply from: "$rootDir/gradle/instrumentation-library.gradle"
dependencies {
library group: 'io.reactivex.rxjava3', name: 'rxjava', version: "3.0.12"
testImplementation project(':instrumentation:rxjava:rxjava-3.0:testing')
}

View File

@ -0,0 +1,135 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.rxjava3;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.tracer.BaseTracer;
import io.opentelemetry.instrumentation.api.tracer.async.AsyncSpanEndStrategy;
import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Maybe;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.Single;
import io.reactivex.rxjava3.functions.Action;
import io.reactivex.rxjava3.functions.BiConsumer;
import io.reactivex.rxjava3.functions.Consumer;
import io.reactivex.rxjava3.parallel.ParallelFlowable;
import java.util.concurrent.atomic.AtomicBoolean;
import org.reactivestreams.Publisher;
public enum RxJava3AsyncSpanEndStrategy implements AsyncSpanEndStrategy {
INSTANCE;
@Override
public boolean supports(Class<?> returnType) {
return returnType == Publisher.class
|| returnType == Completable.class
|| returnType == Maybe.class
|| returnType == Single.class
|| returnType == Observable.class
|| returnType == Flowable.class
|| returnType == ParallelFlowable.class;
}
@Override
public Object end(BaseTracer tracer, Context context, Object returnValue) {
EndOnFirstNotificationConsumer<?> notificationConsumer =
new EndOnFirstNotificationConsumer<>(tracer, context);
if (returnValue instanceof Completable) {
return endWhenComplete((Completable) returnValue, notificationConsumer);
} else if (returnValue instanceof Maybe) {
return endWhenMaybeComplete((Maybe<?>) returnValue, notificationConsumer);
} else if (returnValue instanceof Single) {
return endWhenSingleComplete((Single<?>) returnValue, notificationConsumer);
} else if (returnValue instanceof Observable) {
return endWhenObservableComplete((Observable<?>) returnValue, notificationConsumer);
} else if (returnValue instanceof ParallelFlowable) {
return endWhenFirstComplete((ParallelFlowable<?>) returnValue, notificationConsumer);
}
return endWhenPublisherComplete((Publisher<?>) returnValue, notificationConsumer);
}
private Completable endWhenComplete(
Completable completable, EndOnFirstNotificationConsumer<?> notificationConsumer) {
return completable.doOnEvent(notificationConsumer);
}
private <T> Maybe<T> endWhenMaybeComplete(
Maybe<T> maybe, EndOnFirstNotificationConsumer<?> notificationConsumer) {
@SuppressWarnings("unchecked")
EndOnFirstNotificationConsumer<T> typedConsumer =
(EndOnFirstNotificationConsumer<T>) notificationConsumer;
return maybe.doOnEvent(typedConsumer);
}
private <T> Single<T> endWhenSingleComplete(
Single<T> single, EndOnFirstNotificationConsumer<?> notificationConsumer) {
@SuppressWarnings("unchecked")
EndOnFirstNotificationConsumer<T> typedConsumer =
(EndOnFirstNotificationConsumer<T>) notificationConsumer;
return single.doOnEvent(typedConsumer);
}
private Observable<?> endWhenObservableComplete(
Observable<?> observable, EndOnFirstNotificationConsumer<?> notificationConsumer) {
return observable.doOnComplete(notificationConsumer).doOnError(notificationConsumer);
}
private ParallelFlowable<?> endWhenFirstComplete(
ParallelFlowable<?> parallelFlowable,
EndOnFirstNotificationConsumer<?> notificationConsumer) {
return parallelFlowable.doOnComplete(notificationConsumer).doOnError(notificationConsumer);
}
private Flowable<?> endWhenPublisherComplete(
Publisher<?> publisher, EndOnFirstNotificationConsumer<?> notificationConsumer) {
return Flowable.fromPublisher(publisher)
.doOnComplete(notificationConsumer)
.doOnError(notificationConsumer);
}
/**
* Helper class to ensure that the span is ended exactly once regardless of how many OnComplete or
* OnError notifications are received. Multiple notifications can happen anytime multiple
* subscribers subscribe to the same publisher.
*/
private static final class EndOnFirstNotificationConsumer<T> extends AtomicBoolean
implements Action, Consumer<Throwable>, BiConsumer<T, Throwable> {
private final BaseTracer tracer;
private final Context context;
public EndOnFirstNotificationConsumer(BaseTracer tracer, Context context) {
super(false);
this.tracer = tracer;
this.context = context;
}
@Override
public void run() {
if (compareAndSet(false, true)) {
tracer.end(context);
}
}
@Override
public void accept(Throwable exception) {
if (compareAndSet(false, true)) {
if (exception != null) {
tracer.endExceptionally(context, exception);
} else {
tracer.end(context);
}
}
}
@Override
public void accept(T value, Throwable exception) {
accept(exception);
}
}
}

View File

@ -0,0 +1,279 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
// Includes work from:
/*
* Copyright 2018 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.opentelemetry.instrumentation.rxjava3;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.instrumentation.api.tracer.async.AsyncSpanEndStrategies;
import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.CompletableObserver;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Maybe;
import io.reactivex.rxjava3.core.MaybeObserver;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.Observer;
import io.reactivex.rxjava3.core.Single;
import io.reactivex.rxjava3.core.SingleObserver;
import io.reactivex.rxjava3.functions.BiFunction;
import io.reactivex.rxjava3.functions.Function;
import io.reactivex.rxjava3.internal.fuseable.ConditionalSubscriber;
import io.reactivex.rxjava3.parallel.ParallelFlowable;
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
import org.checkerframework.checker.lock.qual.GuardedBy;
import org.reactivestreams.Subscriber;
/**
* RxJava3 library instrumentation.
*
* <p>In order to enable RxJava3 instrumentation one has to call the {@link
* TracingAssembly#enable()} method.
*
* <p>Instrumentation uses <code>on*Assembly</code> and <code>on*Subscribe</code> RxJavaPlugin hooks
* to wrap RxJava3 classes in their tracing equivalents.
*
* <p>Instrumentation can be disabled by calling the {@link TracingAssembly#disable()} method.
*/
public final class TracingAssembly {
@SuppressWarnings("rawtypes")
@GuardedBy("TracingAssembly.class")
private static BiFunction<? super Observable, ? super Observer, ? extends Observer>
oldOnObservableSubscribe;
@SuppressWarnings("rawtypes")
@GuardedBy("TracingAssembly.class")
private static BiFunction<
? super Completable, ? super CompletableObserver, ? extends CompletableObserver>
oldOnCompletableSubscribe;
@SuppressWarnings("rawtypes")
@GuardedBy("TracingAssembly.class")
private static BiFunction<? super Single, ? super SingleObserver, ? extends SingleObserver>
oldOnSingleSubscribe;
@SuppressWarnings("rawtypes")
@GuardedBy("TracingAssembly.class")
private static BiFunction<? super Maybe, ? super MaybeObserver, ? extends MaybeObserver>
oldOnMaybeSubscribe;
@SuppressWarnings("rawtypes")
@GuardedBy("TracingAssembly.class")
private static BiFunction<? super Flowable, ? super Subscriber, ? extends Subscriber>
oldOnFlowableSubscribe;
@SuppressWarnings("rawtypes")
@GuardedBy("TracingAssembly.class")
private static Function<? super ParallelFlowable, ? extends ParallelFlowable>
oldOnParallelAssembly;
@GuardedBy("TracingAssembly.class")
private static boolean enabled;
private TracingAssembly() {}
public static synchronized void enable() {
if (enabled) {
return;
}
enableObservable();
enableCompletable();
enableSingle();
enableMaybe();
enableFlowable();
enableParallel();
enableWithSpanStrategy();
enabled = true;
}
public static synchronized void disable() {
if (!enabled) {
return;
}
disableObservable();
disableCompletable();
disableSingle();
disableMaybe();
disableFlowable();
disableParallel();
disableWithSpanStrategy();
enabled = false;
}
@SuppressWarnings({"rawtypes", "unchecked"})
private static void enableParallel() {
oldOnParallelAssembly = RxJavaPlugins.getOnParallelAssembly();
RxJavaPlugins.setOnParallelAssembly(
compose(
oldOnParallelAssembly,
parallelFlowable -> new TracingParallelFlowable(parallelFlowable, Context.current())));
}
private static void enableCompletable() {
oldOnCompletableSubscribe = RxJavaPlugins.getOnCompletableSubscribe();
RxJavaPlugins.setOnCompletableSubscribe(
biCompose(
oldOnCompletableSubscribe,
(completable, observer) -> {
final Context context = Context.current();
try (Scope ignored = context.makeCurrent()) {
return new TracingCompletableObserver(observer, context);
}
}));
}
@SuppressWarnings({"rawtypes", "unchecked"})
private static void enableFlowable() {
oldOnFlowableSubscribe = RxJavaPlugins.getOnFlowableSubscribe();
RxJavaPlugins.setOnFlowableSubscribe(
biCompose(
oldOnFlowableSubscribe,
(flowable, subscriber) -> {
final Context context = Context.current();
try (Scope ignored = context.makeCurrent()) {
if (subscriber instanceof ConditionalSubscriber) {
return new TracingConditionalSubscriber<>(
(ConditionalSubscriber) subscriber, context);
} else {
return new TracingSubscriber<>(subscriber, context);
}
}
}));
}
@SuppressWarnings({"rawtypes", "unchecked"})
private static void enableObservable() {
oldOnObservableSubscribe = RxJavaPlugins.getOnObservableSubscribe();
RxJavaPlugins.setOnObservableSubscribe(
biCompose(
oldOnObservableSubscribe,
(observable, observer) -> {
final Context context = Context.current();
try (Scope ignored = context.makeCurrent()) {
return new TracingObserver(observer, context);
}
}));
}
@SuppressWarnings({"rawtypes", "unchecked"})
private static void enableSingle() {
oldOnSingleSubscribe = RxJavaPlugins.getOnSingleSubscribe();
RxJavaPlugins.setOnSingleSubscribe(
biCompose(
oldOnSingleSubscribe,
(single, singleObserver) -> {
final Context context = Context.current();
try (Scope ignored = context.makeCurrent()) {
return new TracingSingleObserver(singleObserver, context);
}
}));
}
@SuppressWarnings({"rawtypes", "unchecked"})
private static void enableMaybe() {
oldOnMaybeSubscribe = RxJavaPlugins.getOnMaybeSubscribe();
RxJavaPlugins.setOnMaybeSubscribe(
(BiFunction<? super Maybe, MaybeObserver, ? extends MaybeObserver>)
biCompose(
oldOnMaybeSubscribe,
(BiFunction<Maybe, MaybeObserver, MaybeObserver>)
(maybe, maybeObserver) -> {
final Context context = Context.current();
try (Scope ignored = context.makeCurrent()) {
return new TracingMaybeObserver(maybeObserver, context);
}
}));
}
private static void enableWithSpanStrategy() {
AsyncSpanEndStrategies.getInstance().registerStrategy(RxJava3AsyncSpanEndStrategy.INSTANCE);
}
private static void disableParallel() {
RxJavaPlugins.setOnParallelAssembly(oldOnParallelAssembly);
oldOnParallelAssembly = null;
}
private static void disableObservable() {
RxJavaPlugins.setOnObservableSubscribe(oldOnObservableSubscribe);
oldOnObservableSubscribe = null;
}
private static void disableCompletable() {
RxJavaPlugins.setOnCompletableSubscribe(oldOnCompletableSubscribe);
oldOnCompletableSubscribe = null;
}
private static void disableFlowable() {
RxJavaPlugins.setOnFlowableSubscribe(oldOnFlowableSubscribe);
oldOnFlowableSubscribe = null;
}
private static void disableSingle() {
RxJavaPlugins.setOnSingleSubscribe(oldOnSingleSubscribe);
oldOnSingleSubscribe = null;
}
@SuppressWarnings({"rawtypes", "unchecked"})
private static void disableMaybe() {
RxJavaPlugins.setOnMaybeSubscribe(
(BiFunction<? super Maybe, MaybeObserver, ? extends MaybeObserver>) oldOnMaybeSubscribe);
oldOnMaybeSubscribe = null;
}
private static void disableWithSpanStrategy() {
AsyncSpanEndStrategies.getInstance().unregisterStrategy(RxJava3AsyncSpanEndStrategy.INSTANCE);
}
private static <T> Function<? super T, ? extends T> compose(
Function<? super T, ? extends T> before, Function<? super T, ? extends T> after) {
if (before == null) {
return after;
}
return (T v) -> after.apply(before.apply(v));
}
private static <T, U> BiFunction<? super T, ? super U, ? extends U> biCompose(
BiFunction<? super T, ? super U, ? extends U> before,
BiFunction<? super T, ? super U, ? extends U> after) {
if (before == null) {
return after;
}
return (T v, U u) -> after.apply(v, before.apply(v, u));
}
}

View File

@ -0,0 +1,74 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
// Includes work from:
/*
* Copyright 2018 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.opentelemetry.instrumentation.rxjava3;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.reactivex.rxjava3.core.CompletableObserver;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.internal.disposables.DisposableHelper;
class TracingCompletableObserver implements CompletableObserver, Disposable {
private final CompletableObserver actual;
private final Context context;
private Disposable disposable;
TracingCompletableObserver(final CompletableObserver actual, final Context context) {
this.actual = actual;
this.context = context;
}
@Override
public void onSubscribe(final Disposable d) {
if (!DisposableHelper.validate(disposable, d)) {
return;
}
disposable = d;
actual.onSubscribe(this);
}
@Override
public void onComplete() {
try (Scope ignored = context.makeCurrent()) {
actual.onComplete();
}
}
@Override
public void onError(final Throwable e) {
try (Scope ignored = context.makeCurrent()) {
actual.onError(e);
}
}
@Override
public void dispose() {
disposable.dispose();
}
@Override
public boolean isDisposed() {
return disposable.isDisposed();
}
}

View File

@ -0,0 +1,84 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
// Includes work from:
/*
* Copyright 2018 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.opentelemetry.instrumentation.rxjava3;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.reactivex.rxjava3.internal.fuseable.ConditionalSubscriber;
import io.reactivex.rxjava3.internal.fuseable.QueueSubscription;
import io.reactivex.rxjava3.internal.subscribers.BasicFuseableConditionalSubscriber;
class TracingConditionalSubscriber<T> extends BasicFuseableConditionalSubscriber<T, T> {
private final Context context;
TracingConditionalSubscriber(
final ConditionalSubscriber<? super T> downstream, final Context context) {
super(downstream);
this.context = context;
}
@Override
public boolean tryOnNext(T t) {
try (Scope ignored = context.makeCurrent()) {
return downstream.tryOnNext(t);
}
}
@Override
public void onNext(T t) {
try (Scope ignored = context.makeCurrent()) {
downstream.onNext(t);
}
}
@Override
public void onError(Throwable t) {
try (Scope ignored = context.makeCurrent()) {
downstream.onError(t);
}
}
@Override
public void onComplete() {
try (Scope ignored = context.makeCurrent()) {
downstream.onComplete();
}
}
@Override
public int requestFusion(int mode) {
final QueueSubscription<T> qs = this.qs;
if (qs != null) {
final int m = qs.requestFusion(mode);
sourceMode = m;
return m;
}
return NONE;
}
@Override
public T poll() throws Throwable {
return qs.poll();
}
}

View File

@ -0,0 +1,81 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
// Includes work from:
/*
* Copyright 2018 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.opentelemetry.instrumentation.rxjava3;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.reactivex.rxjava3.core.MaybeObserver;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.internal.disposables.DisposableHelper;
class TracingMaybeObserver<T> implements MaybeObserver<T>, Disposable {
private final MaybeObserver<T> actual;
private final Context context;
private Disposable disposable;
TracingMaybeObserver(final MaybeObserver<T> actual, final Context context) {
this.actual = actual;
this.context = context;
}
@Override
public void onSubscribe(final Disposable d) {
if (!DisposableHelper.validate(disposable, d)) {
return;
}
disposable = d;
actual.onSubscribe(this);
}
@Override
public void onSuccess(final T t) {
try (Scope ignored = context.makeCurrent()) {
actual.onSuccess(t);
}
}
@Override
public void onError(final Throwable e) {
try (Scope ignored = context.makeCurrent()) {
actual.onError(e);
}
}
@Override
public void onComplete() {
try (Scope ignored = context.makeCurrent()) {
actual.onComplete();
}
}
@Override
public void dispose() {
disposable.dispose();
}
@Override
public boolean isDisposed() {
return disposable.isDisposed();
}
}

View File

@ -0,0 +1,76 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
// Includes work from:
/*
* Copyright 2018 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.opentelemetry.instrumentation.rxjava3;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.reactivex.rxjava3.core.Observer;
import io.reactivex.rxjava3.internal.fuseable.QueueDisposable;
import io.reactivex.rxjava3.internal.observers.BasicFuseableObserver;
class TracingObserver<T> extends BasicFuseableObserver<T, T> {
private final Context context;
TracingObserver(final Observer<? super T> downstream, final Context context) {
super(downstream);
this.context = context;
}
@Override
public void onNext(T t) {
try (Scope ignored = context.makeCurrent()) {
downstream.onNext(t);
}
}
@Override
public void onError(Throwable t) {
try (Scope ignored = context.makeCurrent()) {
downstream.onError(t);
}
}
@Override
public void onComplete() {
try (Scope ignored = context.makeCurrent()) {
downstream.onComplete();
}
}
@Override
public int requestFusion(int mode) {
final QueueDisposable<T> qd = this.qd;
if (qd != null) {
final int m = qd.requestFusion(mode);
sourceMode = m;
return m;
}
return NONE;
}
@Override
public T poll() throws Throwable {
return qd.poll();
}
}

View File

@ -0,0 +1,67 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
// Includes work from:
/*
* Copyright 2018 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.opentelemetry.instrumentation.rxjava3;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.reactivex.rxjava3.internal.fuseable.ConditionalSubscriber;
import io.reactivex.rxjava3.parallel.ParallelFlowable;
import org.reactivestreams.Subscriber;
class TracingParallelFlowable<T> extends ParallelFlowable<T> {
private final ParallelFlowable<T> source;
private final Context context;
TracingParallelFlowable(final ParallelFlowable<T> source, final Context context) {
this.source = source;
this.context = context;
}
@SuppressWarnings("unchecked")
@Override
public void subscribe(final Subscriber<? super T>[] subscribers) {
if (!validate(subscribers)) {
return;
}
final int n = subscribers.length;
final Subscriber<? super T>[] parents = new Subscriber[n];
for (int i = 0; i < n; i++) {
final Subscriber<? super T> z = subscribers[i];
if (z instanceof ConditionalSubscriber) {
parents[i] =
new TracingConditionalSubscriber<>((ConditionalSubscriber<? super T>) z, context);
} else {
parents[i] = new TracingSubscriber<>(z, context);
}
}
try (Scope ignored = context.makeCurrent()) {
source.subscribe(parents);
}
}
@Override
public int parallelism() {
return source.parallelism();
}
}

View File

@ -0,0 +1,74 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
// Includes work from:
/*
* Copyright 2018 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.opentelemetry.instrumentation.rxjava3;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.reactivex.rxjava3.core.SingleObserver;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.internal.disposables.DisposableHelper;
class TracingSingleObserver<T> implements SingleObserver<T>, Disposable {
private final SingleObserver<T> actual;
private final Context context;
private Disposable disposable;
TracingSingleObserver(final SingleObserver<T> actual, final Context context) {
this.actual = actual;
this.context = context;
}
@Override
public void onSubscribe(final Disposable d) {
if (!DisposableHelper.validate(disposable, d)) {
return;
}
this.disposable = d;
actual.onSubscribe(this);
}
@Override
public void onSuccess(final T t) {
try (Scope ignored = context.makeCurrent()) {
actual.onSuccess(t);
}
}
@Override
public void onError(Throwable throwable) {
try (Scope ignored = context.makeCurrent()) {
actual.onError(throwable);
}
}
@Override
public void dispose() {
disposable.dispose();
}
@Override
public boolean isDisposed() {
return disposable.isDisposed();
}
}

View File

@ -0,0 +1,76 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
// Includes work from:
/*
* Copyright 2018 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.opentelemetry.instrumentation.rxjava3;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.reactivex.rxjava3.internal.fuseable.QueueSubscription;
import io.reactivex.rxjava3.internal.subscribers.BasicFuseableSubscriber;
import org.reactivestreams.Subscriber;
class TracingSubscriber<T> extends BasicFuseableSubscriber<T, T> {
private final Context context;
TracingSubscriber(final Subscriber<? super T> downstream, final Context context) {
super(downstream);
this.context = context;
}
@Override
public void onNext(T t) {
try (Scope ignored = context.makeCurrent()) {
downstream.onNext(t);
}
}
@Override
public void onError(Throwable t) {
try (Scope ignored = context.makeCurrent()) {
downstream.onError(t);
}
}
@Override
public void onComplete() {
try (Scope ignored = context.makeCurrent()) {
downstream.onComplete();
}
}
@Override
public int requestFusion(int mode) {
final QueueSubscription<T> qs = this.qs;
if (qs != null) {
final int m = qs.requestFusion(mode);
sourceMode = m;
return m;
}
return NONE;
}
@Override
public T poll() throws Throwable {
return qs.poll();
}
}

View File

@ -0,0 +1,731 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
import io.opentelemetry.context.Context
import io.opentelemetry.instrumentation.api.tracer.BaseTracer
import io.opentelemetry.instrumentation.rxjava3.RxJava3AsyncSpanEndStrategy
import io.reactivex.rxjava3.core.Completable
import io.reactivex.rxjava3.core.Flowable
import io.reactivex.rxjava3.core.Maybe
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.core.Single
import io.reactivex.rxjava3.observers.TestObserver
import io.reactivex.rxjava3.parallel.ParallelFlowable
import io.reactivex.rxjava3.processors.ReplayProcessor
import io.reactivex.rxjava3.processors.UnicastProcessor
import io.reactivex.rxjava3.subjects.CompletableSubject
import io.reactivex.rxjava3.subjects.MaybeSubject
import io.reactivex.rxjava3.subjects.ReplaySubject
import io.reactivex.rxjava3.subjects.SingleSubject
import io.reactivex.rxjava3.subjects.UnicastSubject
import io.reactivex.rxjava3.subscribers.TestSubscriber
import org.reactivestreams.Publisher
import org.reactivestreams.Subscriber
import org.reactivestreams.Subscription
import spock.lang.Specification
class RxJava3AsyncSpanEndStrategyTest extends Specification {
BaseTracer tracer
Context context
def underTest = RxJava3AsyncSpanEndStrategy.INSTANCE
void setup() {
tracer = Mock()
context = Mock()
}
static class CompletableTest extends RxJava3AsyncSpanEndStrategyTest {
def "is supported"() {
expect:
underTest.supports(Completable)
}
def "ends span on already completed"() {
given:
def observer = new TestObserver()
when:
def result = (Completable) underTest.end(tracer, context, Completable.complete())
result.subscribe(observer)
then:
1 * tracer.end(context)
observer.assertComplete()
}
def "ends span on already errored"() {
given:
def exception = new IllegalStateException()
def observer = new TestObserver()
when:
def result = (Completable) underTest.end(tracer, context, Completable.error(exception))
result.subscribe(observer)
then:
1 * tracer.endExceptionally(context, exception)
observer.assertError(exception)
}
def "ends span when completed"() {
given:
def source = CompletableSubject.create()
def observer = new TestObserver()
when:
def result = (Completable) underTest.end(tracer, context, source)
result.subscribe(observer)
then:
0 * tracer._
when:
source.onComplete()
then:
1 * tracer.end(context)
observer.assertComplete()
}
def "ends span when errored"() {
given:
def exception = new IllegalStateException()
def source = CompletableSubject.create()
def observer = new TestObserver()
when:
def result = (Completable) underTest.end(tracer, context, source)
result.subscribe(observer)
then:
0 * tracer._
when:
source.onError(exception)
then:
1 * tracer.endExceptionally(context, exception)
observer.assertError(exception)
}
def "ends span once for multiple subscribers"() {
given:
def source = CompletableSubject.create()
def observer1 = new TestObserver()
def observer2 = new TestObserver()
def observer3 = new TestObserver()
when:
def result = (Completable) underTest.end(tracer, context, source)
result.subscribe(observer1)
result.subscribe(observer2)
result.subscribe(observer3)
then:
0 * tracer._
when:
source.onComplete()
then:
1 * tracer.end(context)
observer1.assertComplete()
observer2.assertComplete()
observer3.assertComplete()
}
}
static class MaybeTest extends RxJava3AsyncSpanEndStrategyTest {
def "is supported"() {
expect:
underTest.supports(Maybe)
}
def "ends span on already completed"() {
given:
def observer = new TestObserver()
when:
def result = (Maybe<?>) underTest.end(tracer, context, Maybe.just("Value"))
result.subscribe(observer)
then:
1 * tracer.end(context)
observer.assertComplete()
}
def "ends span on already empty"() {
given:
def observer = new TestObserver()
when:
def result = (Maybe<?>) underTest.end(tracer, context, Maybe.empty())
result.subscribe(observer)
then:
1 * tracer.end(context)
observer.assertComplete()
}
def "ends span on already errored"() {
given:
def exception = new IllegalStateException()
def observer = new TestObserver()
when:
def result = (Maybe<?>) underTest.end(tracer, context, Maybe.error(exception))
result.subscribe(observer)
then:
1 * tracer.endExceptionally(context, exception)
observer.assertError(exception)
}
def "ends span when completed"() {
given:
def source = MaybeSubject.create()
def observer = new TestObserver()
when:
def result = (Maybe<?>) underTest.end(tracer, context, source)
result.subscribe(observer)
then:
0 * tracer._
when:
source.onSuccess("Value")
then:
1 * tracer.end(context)
observer.assertComplete()
}
def "ends span when empty"() {
given:
def source = MaybeSubject.create()
def observer = new TestObserver()
when:
def result = (Maybe<?>) underTest.end(tracer, context, source)
result.subscribe(observer)
then:
0 * tracer._
when:
source.onComplete()
then:
1 * tracer.end(context)
observer.assertComplete()
}
def "ends span when errored"() {
given:
def exception = new IllegalStateException()
def source = MaybeSubject.create()
def observer = new TestObserver()
when:
def result = (Maybe<?>) underTest.end(tracer, context, source)
result.subscribe(observer)
then:
0 * tracer._
when:
source.onError(exception)
then:
1 * tracer.endExceptionally(context, exception)
observer.assertError(exception)
}
def "ends span once for multiple subscribers"() {
given:
def source = MaybeSubject.create()
def observer1 = new TestObserver()
def observer2 = new TestObserver()
def observer3 = new TestObserver()
when:
def result = (Maybe<?>) underTest.end(tracer, context, source)
result.subscribe(observer1)
result.subscribe(observer2)
result.subscribe(observer3)
then:
0 * tracer._
when:
source.onSuccess("Value")
then:
1 * tracer.end(context)
observer1.assertValue("Value")
observer1.assertComplete()
observer2.assertValue("Value")
observer2.assertComplete()
observer3.assertValue("Value")
observer3.assertComplete()
}
}
static class SingleTest extends RxJava3AsyncSpanEndStrategyTest {
def "is supported"() {
expect:
underTest.supports(Single)
}
def "ends span on already completed"() {
given:
def observer = new TestObserver()
when:
def result = (Single<?>) underTest.end(tracer, context, Single.just("Value"))
result.subscribe(observer)
then:
1 * tracer.end(context)
observer.assertComplete()
}
def "ends span on already errored"() {
given:
def exception = new IllegalStateException()
def observer = new TestObserver()
when:
def result = (Single<?>) underTest.end(tracer, context, Single.error(exception))
result.subscribe(observer)
then:
1 * tracer.endExceptionally(context, exception)
observer.assertError(exception)
}
def "ends span when completed"() {
given:
def source = SingleSubject.create()
def observer = new TestObserver()
when:
def result = (Single<?>) underTest.end(tracer, context, source)
result.subscribe(observer)
then:
0 * tracer._
when:
source.onSuccess("Value")
then:
1 * tracer.end(context)
observer.assertComplete()
}
def "ends span when errored"() {
given:
def exception = new IllegalStateException()
def source = SingleSubject.create()
def observer = new TestObserver()
when:
def result = (Single<?>) underTest.end(tracer, context, source)
result.subscribe(observer)
then:
0 * tracer._
when:
source.onError(exception)
then:
1 * tracer.endExceptionally(context, exception)
observer.assertError(exception)
}
def "ends span once for multiple subscribers"() {
given:
def source = SingleSubject.create()
def observer1 = new TestObserver()
def observer2 = new TestObserver()
def observer3 = new TestObserver()
when:
def result = (Single<?>) underTest.end(tracer, context, source)
result.subscribe(observer1)
result.subscribe(observer2)
result.subscribe(observer3)
then:
0 * tracer._
when:
source.onSuccess("Value")
then:
1 * tracer.end(context)
observer1.assertValue("Value")
observer1.assertComplete()
observer2.assertValue("Value")
observer2.assertComplete()
observer3.assertValue("Value")
observer3.assertComplete()
}
}
static class ObservableTest extends RxJava3AsyncSpanEndStrategyTest {
def "is supported"() {
expect:
underTest.supports(Observable)
}
def "ends span on already completed"() {
given:
def observer = new TestObserver()
when:
def result = (Observable<?>) underTest.end(tracer, context, Observable.just("Value"))
result.subscribe(observer)
then:
1 * tracer.end(context)
observer.assertComplete()
}
def "ends span on already errored"() {
given:
def exception = new IllegalStateException()
def observer = new TestObserver()
when:
def result = (Observable<?>) underTest.end(tracer, context, Observable.error(exception))
result.subscribe(observer)
then:
1 * tracer.endExceptionally(context, exception)
observer.assertError(exception)
}
def "ends span when completed"() {
given:
def source = UnicastSubject.create()
def observer = new TestObserver()
when:
def result = (Observable<?>) underTest.end(tracer, context, source)
result.subscribe(observer)
then:
0 * tracer._
when:
source.onComplete()
then:
1 * tracer.end(context)
observer.assertComplete()
}
def "ends span when errored"() {
given:
def exception = new IllegalStateException()
def source = UnicastSubject.create()
def observer = new TestObserver()
when:
def result = (Observable<?>) underTest.end(tracer, context, source)
result.subscribe(observer)
then:
0 * tracer._
when:
source.onError(exception)
then:
1 * tracer.endExceptionally(context, exception)
observer.assertError(exception)
}
def "ends span once for multiple subscribers"() {
given:
def source = ReplaySubject.create()
def observer1 = new TestObserver()
def observer2 = new TestObserver()
def observer3 = new TestObserver()
when:
def result = (Observable<?>) underTest.end(tracer, context, source)
result.subscribe(observer1)
result.subscribe(observer2)
result.subscribe(observer3)
then:
0 * tracer._
when:
source.onComplete()
then:
1 * tracer.end(context)
observer1.assertComplete()
observer2.assertComplete()
observer3.assertComplete()
}
}
static class FlowableTest extends RxJava3AsyncSpanEndStrategyTest {
def "is supported"() {
expect:
underTest.supports(Flowable)
}
def "ends span on already completed"() {
given:
def observer = new TestSubscriber()
when:
def result = (Flowable<?>) underTest.end(tracer, context, Flowable.just("Value"))
result.subscribe(observer)
then:
1 * tracer.end(context)
observer.assertComplete()
}
def "ends span on already errored"() {
given:
def exception = new IllegalStateException()
def observer = new TestSubscriber()
when:
def result = (Flowable<?>) underTest.end(tracer, context, Flowable.error(exception))
result.subscribe(observer)
then:
1 * tracer.endExceptionally(context, exception)
observer.assertError(exception)
}
def "ends span when completed"() {
given:
def source = UnicastProcessor.create()
def observer = new TestSubscriber()
when:
def result = (Flowable<?>) underTest.end(tracer, context, source)
result.subscribe(observer)
then:
0 * tracer._
when:
source.onComplete()
then:
1 * tracer.end(context)
observer.assertComplete()
}
def "ends span when errored"() {
given:
def exception = new IllegalStateException()
def source = UnicastProcessor.create()
def observer = new TestSubscriber()
when:
def result = (Flowable<?>) underTest.end(tracer, context, source)
result.subscribe(observer)
then:
0 * tracer._
when:
source.onError(exception)
then:
1 * tracer.endExceptionally(context, exception)
observer.assertError(exception)
}
def "ends span once for multiple subscribers"() {
given:
def source = ReplayProcessor.create()
def observer1 = new TestSubscriber()
def observer2 = new TestSubscriber()
def observer3 = new TestSubscriber()
when:
def result = (Flowable<?>) underTest.end(tracer, context, source)
result.subscribe(observer1)
result.subscribe(observer2)
result.subscribe(observer3)
then:
0 * tracer._
when:
source.onComplete()
then:
1 * tracer.end(context)
observer1.assertComplete()
observer2.assertComplete()
observer3.assertComplete()
}
}
static class ParallelFlowableTest extends RxJava3AsyncSpanEndStrategyTest {
def "is supported"() {
expect:
underTest.supports(ParallelFlowable)
}
def "ends span on already completed"() {
given:
def observer = new TestSubscriber()
when:
def result = (ParallelFlowable<?>) underTest.end(tracer, context, Flowable.just("Value").parallel())
result.sequential().subscribe(observer)
then:
observer.assertComplete()
1 * tracer.end(context)
}
def "ends span on already errored"() {
given:
def exception = new IllegalStateException()
def observer = new TestSubscriber()
when:
def result = (ParallelFlowable<?>) underTest.end(tracer, context, Flowable.error(exception).parallel())
result.sequential().subscribe(observer)
then:
observer.assertError(exception)
1 * tracer.endExceptionally(context, exception)
}
def "ends span when completed"() {
given:
def source = UnicastProcessor.create()
def observer = new TestSubscriber()
when:
def result = (ParallelFlowable<?>) underTest.end(tracer, context, source.parallel())
result.sequential().subscribe(observer)
then:
0 * tracer._
when:
source.onComplete()
then:
observer.assertComplete()
1 * tracer.end(context)
}
def "ends span when errored"() {
given:
def exception = new IllegalStateException()
def source = UnicastProcessor.create()
def observer = new TestSubscriber()
when:
def result = (ParallelFlowable<?>) underTest.end(tracer, context, source.parallel())
result.sequential().subscribe(observer)
then:
0 * tracer._
when:
source.onError(exception)
then:
observer.assertError(exception)
1 * tracer.endExceptionally(context, exception)
}
}
static class PublisherTest extends RxJava3AsyncSpanEndStrategyTest {
def "is supported"() {
expect:
underTest.supports(Publisher)
}
def "ends span when completed"() {
given:
def source = new CustomPublisher()
def observer = new TestSubscriber()
when:
def result = (Flowable<?>) underTest.end(tracer, context, source)
result.subscribe(observer)
then:
0 * tracer._
when:
source.onComplete()
then:
1 * tracer.end(context)
observer.assertComplete()
}
def "ends span when errored"() {
given:
def exception = new IllegalStateException()
def source = new CustomPublisher()
def observer = new TestSubscriber()
when:
def result = (Flowable<?>) underTest.end(tracer, context, source)
result.subscribe(observer)
then:
0 * tracer._
when:
source.onError(exception)
then:
1 * tracer.endExceptionally(context, exception)
observer.assertError(exception)
}
}
static class CustomPublisher implements Publisher<String>, Subscription {
Subscriber<? super String> subscriber
@Override
void subscribe(Subscriber<? super String> subscriber) {
this.subscriber = subscriber
subscriber.onSubscribe(this)
}
def onComplete() {
this.subscriber.onComplete()
}
def onError(Throwable exception) {
this.subscriber.onError(exception)
}
@Override
void request(long l) { }
@Override
void cancel() { }
}
}

View File

@ -0,0 +1,15 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
import io.opentelemetry.instrumentation.rxjava3.AbstractRxJava3SubscriptionTest
import io.opentelemetry.instrumentation.rxjava3.TracingAssembly
import io.opentelemetry.instrumentation.test.LibraryTestTrait
class RxJava3SubscriptionTest extends AbstractRxJava3SubscriptionTest implements LibraryTestTrait {
def setupSpec() {
TracingAssembly.enable()
}
}

View File

@ -0,0 +1,15 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
import io.opentelemetry.instrumentation.rxjava3.AbstractRxJava3Test
import io.opentelemetry.instrumentation.rxjava3.TracingAssembly
import io.opentelemetry.instrumentation.test.LibraryTestTrait
class RxJava3Test extends AbstractRxJava3Test implements LibraryTestTrait {
def setupSpec() {
TracingAssembly.enable()
}
}

View File

@ -0,0 +1,13 @@
apply from: "$rootDir/gradle/java.gradle"
dependencies {
api project(':testing-common')
api group: 'io.reactivex.rxjava3', name: 'rxjava', version: "3.0.12"
implementation deps.guava
implementation deps.groovy
implementation deps.opentelemetryApi
implementation deps.spock
}

View File

@ -0,0 +1,54 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.rxjava3
import io.reactivex.rxjava3.core.Single
import io.reactivex.rxjava3.functions.Consumer
import static io.opentelemetry.instrumentation.test.utils.TraceUtils.basicSpan
import static io.opentelemetry.instrumentation.test.utils.TraceUtils.runUnderTrace
import io.opentelemetry.api.GlobalOpenTelemetry
import io.opentelemetry.instrumentation.test.InstrumentationSpecification
import java.util.concurrent.CountDownLatch
abstract class AbstractRxJava3SubscriptionTest extends InstrumentationSpecification {
def "subscription test"() {
when:
CountDownLatch latch = new CountDownLatch(1)
runUnderTrace("parent") {
Single<Connection> connection = Single.create {
it.onSuccess(new Connection())
}
connection.subscribe(new Consumer<Connection>() {
@Override
void accept(Connection t) {
t.query()
latch.countDown()
}
})
}
latch.await()
then:
assertTraces(1) {
trace(0, 2) {
basicSpan(it, 0, "parent")
basicSpan(it, 1, "Connection.query", span(0))
}
}
}
static class Connection {
static int query() {
def span = GlobalOpenTelemetry.getTracer("test").spanBuilder("Connection.query").startSpan()
span.end()
return new Random().nextInt()
}
}
}

View File

@ -0,0 +1,371 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.rxjava3
import io.reactivex.rxjava3.core.BackpressureStrategy
import io.reactivex.rxjava3.core.Completable
import io.reactivex.rxjava3.core.Flowable
import io.reactivex.rxjava3.core.Maybe
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.core.Single
import io.reactivex.rxjava3.internal.operators.flowable.FlowablePublish
import io.reactivex.rxjava3.internal.operators.observable.ObservablePublish
import io.reactivex.rxjava3.schedulers.Schedulers
import static io.opentelemetry.instrumentation.test.utils.TraceUtils.basicSpan
import static io.opentelemetry.instrumentation.test.utils.TraceUtils.runUnderTrace
import static io.opentelemetry.instrumentation.test.utils.TraceUtils.runUnderTraceWithoutExceptionCatch
import static java.util.concurrent.TimeUnit.MILLISECONDS
import com.google.common.collect.Lists
import io.opentelemetry.instrumentation.test.InstrumentationSpecification
import org.reactivestreams.Subscriber
import org.reactivestreams.Subscription
import spock.lang.Shared
/**
* <p>Tests in this class may seem not exhaustive due to the fact that some classes are converted
* into others, ie. {@link Completable#toMaybe()}. Fortunately, RxJava3 uses helper classes like
* {@link io.reactivex.rxjava3.internal.operators.maybe.MaybeFromCompletable} and as a result we
* can test subscriptions and cancellations correctly.
*/
abstract class AbstractRxJava3Test extends InstrumentationSpecification {
public static final String EXCEPTION_MESSAGE = "test exception"
@Shared
def addOne = { i ->
addOneFunc(i)
}
@Shared
def addTwo = { i ->
addTwoFunc(i)
}
@Shared
def throwException = {
throw new RuntimeException(EXCEPTION_MESSAGE)
}
static addOneFunc(int i) {
runUnderTrace("addOne") {
return i + 1
}
}
static addTwoFunc(int i) {
runUnderTrace("addTwo") {
return i + 2
}
}
def "Publisher '#name' test"() {
when:
def result = assemblePublisherUnderTrace(publisherSupplier)
then:
result == expected
and:
assertTraces(1) {
sortSpansByStartTime()
trace(0, workSpans + 1) {
basicSpan(it, 0, "publisher-parent")
for (int i = 1; i < workSpans + 1; ++i) {
basicSpan(it, i, "addOne", span(0))
}
}
}
where:
name | expected | workSpans | publisherSupplier
"basic maybe" | 2 | 1 | { -> Maybe.just(1).map(addOne) }
"two operations maybe" | 4 | 2 | { -> Maybe.just(2).map(addOne).map(addOne) }
"delayed maybe" | 4 | 1 | { ->
Maybe.just(3).delay(100, MILLISECONDS).map(addOne)
}
"delayed twice maybe" | 6 | 2 | { ->
Maybe.just(4).delay(100, MILLISECONDS).map(addOne).delay(100, MILLISECONDS).map(addOne)
}
"basic flowable" | [6, 7] | 2 | { ->
Flowable.fromIterable([5, 6]).map(addOne)
}
"two operations flowable" | [8, 9] | 4 | { ->
Flowable.fromIterable([6, 7]).map(addOne).map(addOne)
}
"delayed flowable" | [8, 9] | 2 | { ->
Flowable.fromIterable([7, 8]).delay(100, MILLISECONDS).map(addOne)
}
"delayed twice flowable" | [10, 11] | 4 | { ->
Flowable.fromIterable([8, 9]).delay(100, MILLISECONDS).map(addOne).delay(100, MILLISECONDS).map(addOne)
}
"maybe from callable" | 12 | 2 | { ->
Maybe.fromCallable({ addOneFunc(10) }).map(addOne)
}
"basic single" | 1 | 1 | { -> Single.just(0).map(addOne) }
"basic observable" | [1] | 1 | { -> Observable.just(0).map(addOne) }
"connectable flowable" | [1] | 1 | { ->
FlowablePublish.just(0).delay(100, MILLISECONDS).map(addOne)
}
"connectable observable" | [1] | 1 | { ->
ObservablePublish.just(0).delay(100, MILLISECONDS).map(addOne)
}
}
def "Publisher error '#name' test"() {
when:
assemblePublisherUnderTrace(publisherSupplier)
then:
def thrownException = thrown RuntimeException
thrownException.message == EXCEPTION_MESSAGE
and:
assertTraces(1) {
sortSpansByStartTime()
trace(0, 1) {
// It's important that we don't attach errors at the Reactor level so that we don't
// impact the spans on reactor integrations such as netty and lettuce, as reactor is
// more of a context propagation mechanism than something we would be tracking for
// errors this is ok.
basicSpan(it, 0, "publisher-parent")
}
}
where:
name | publisherSupplier
"maybe" | { -> Maybe.error(new RuntimeException(EXCEPTION_MESSAGE)) }
"flowable" | { -> Flowable.error(new RuntimeException(EXCEPTION_MESSAGE)) }
"single" | { -> Single.error(new RuntimeException(EXCEPTION_MESSAGE)) }
"observable" | { -> Observable.error(new RuntimeException(EXCEPTION_MESSAGE)) }
"completable" | { -> Completable.error(new RuntimeException(EXCEPTION_MESSAGE)) }
}
def "Publisher step '#name' test"() {
when:
assemblePublisherUnderTrace(publisherSupplier)
then:
def exception = thrown RuntimeException
exception.message == EXCEPTION_MESSAGE
and:
assertTraces(1) {
sortSpansByStartTime()
trace(0, workSpans + 1) {
// It's important that we don't attach errors at the Reactor level so that we don't
// impact the spans on reactor integrations such as netty and lettuce, as reactor is
// more of a context propagation mechanism than something we would be tracking for
// errors this is ok.
basicSpan(it, 0, "publisher-parent")
for (int i = 1; i < workSpans + 1; i++) {
basicSpan(it, i, "addOne", span(0))
}
}
}
where:
name | workSpans | publisherSupplier
"basic maybe failure" | 1 | { ->
Maybe.just(1).map(addOne).map({ throwException() })
}
"basic flowable failure" | 1 | { ->
Flowable.fromIterable([5, 6]).map(addOne).map({ throwException() })
}
}
def "Publisher '#name' cancel"() {
when:
cancelUnderTrace(publisherSupplier)
then:
assertTraces(1) {
trace(0, 1) {
basicSpan(it, 0, "publisher-parent")
}
}
where:
name | publisherSupplier
"basic maybe" | { -> Maybe.just(1) }
"basic flowable" | { -> Flowable.fromIterable([5, 6]) }
"basic single" | { -> Single.just(1) }
"basic completable" | { -> Completable.fromCallable({ -> 1 }) }
"basic observable" | { -> Observable.just(1) }
}
def "Publisher chain spans have the correct parent for '#name'"() {
when:
assemblePublisherUnderTrace(publisherSupplier)
then:
assertTraces(1) {
trace(0, workSpans + 1) {
basicSpan(it, 0, "publisher-parent")
for (int i = 1; i < workSpans + 1; i++) {
basicSpan(it, i, "addOne", span(0))
}
}
}
where:
name | workSpans | publisherSupplier
"basic maybe" | 3 | { ->
Maybe.just(1).map(addOne).map(addOne).concatWith(Maybe.just(1).map(addOne))
}
"basic flowable" | 5 | { ->
Flowable.fromIterable([5, 6]).map(addOne).map(addOne).concatWith(Maybe.just(1).map(addOne).toFlowable())
}
}
def "Publisher chain spans have the correct parents from subscription time"() {
when:
def maybe = Maybe.just(42)
.map(addOne)
.map(addTwo)
runUnderTrace("trace-parent") {
maybe.blockingGet()
}
then:
assertTraces(1) {
trace(0, 3) {
sortSpansByStartTime()
basicSpan(it, 0, "trace-parent")
basicSpan(it, 1, "addOne", span(0))
basicSpan(it, 2, "addTwo", span(0))
}
}
}
def "Publisher chain spans have the correct parents from subscription time '#name'"() {
when:
assemblePublisherUnderTrace {
// The "add one" operations in the publisher created here should be children of the publisher-parent
def publisher = publisherSupplier()
runUnderTrace("intermediate") {
if (publisher instanceof Maybe) {
return ((Maybe) publisher).map(addTwo)
} else if (publisher instanceof Flowable) {
return ((Flowable) publisher).map(addTwo)
} else if (publisher instanceof Single) {
return ((Single) publisher).map(addTwo)
} else if (publisher instanceof Observable) {
return ((Observable) publisher).map(addTwo)
} else if (publisher instanceof Completable) {
return ((Completable) publisher).toMaybe().map(addTwo)
}
throw new IllegalStateException("Unknown publisher type")
}
}
then:
assertTraces(1) {
trace(0, 2 + 2 * workItems) {
sortSpansByStartTime()
basicSpan(it, 0, "publisher-parent")
basicSpan(it, 1, "intermediate", span(0))
for (int i = 2; i < 2 + 2 * workItems; i = i + 2) {
basicSpan(it, i, "addOne", span(0))
basicSpan(it, i + 1, "addTwo", span(0))
}
}
}
where:
name | workItems | publisherSupplier
"basic maybe" | 1 | { -> Maybe.just(1).map(addOne) }
"basic flowable" | 2 | { -> Flowable.fromIterable([1, 2]).map(addOne) }
"basic single" | 1 | { -> Single.just(1).map(addOne) }
"basic observable" | 1 | { -> Observable.just(1).map(addOne) }
}
def "Flowables produce the right number of results '#scheduler'"() {
when:
List<String> values = runUnderTrace("flowable root") {
Flowable.fromIterable([1, 2, 3, 4])
.parallel()
.runOn(scheduler)
.flatMap({ num ->
Maybe.just(num).map(addOne).toFlowable()
})
.sequential()
.toList()
.blockingGet()
}
then:
values.size() == 4
assertTraces(1) {
trace(0, 5) {
basicSpan(it, 0, "flowable root")
for (int i = 1; i < values.size() + 1; i++) {
basicSpan(it, i, "addOne", span(0))
}
}
}
where:
scheduler << [Schedulers.newThread(), Schedulers.computation(), Schedulers.single(), Schedulers.trampoline()]
}
def cancelUnderTrace(def publisherSupplier) {
runUnderTraceWithoutExceptionCatch("publisher-parent") {
def publisher = publisherSupplier()
if (publisher instanceof Maybe) {
publisher = publisher.toFlowable()
} else if (publisher instanceof Single) {
publisher = publisher.toFlowable()
} else if (publisher instanceof Completable) {
publisher = publisher.toFlowable()
} else if (publisher instanceof Observable) {
publisher = publisher.toFlowable(BackpressureStrategy.LATEST)
}
publisher.subscribe(new Subscriber<Integer>() {
void onSubscribe(Subscription subscription) {
subscription.cancel()
}
void onNext(Integer t) {
}
void onError(Throwable error) {
}
void onComplete() {
}
})
}
}
@SuppressWarnings("unchecked")
def assemblePublisherUnderTrace(def publisherSupplier) {
// The "add two" operations below should be children of this span
runUnderTraceWithoutExceptionCatch("publisher-parent") {
def publisher = publisherSupplier()
// Read all data from publisher
if (publisher instanceof Maybe) {
return ((Maybe) publisher).blockingGet()
} else if (publisher instanceof Flowable) {
return Lists.newArrayList(((Flowable) publisher).blockingIterable())
} else if (publisher instanceof Single) {
return ((Single) publisher).blockingGet()
} else if (publisher instanceof Observable) {
return Lists.newArrayList(((Observable) publisher).blockingIterable())
} else if (publisher instanceof Completable) {
return ((Completable) publisher).toMaybe().blockingGet()
}
throw new RuntimeException("Unknown publisher: " + publisher)
}
}
}

View File

@ -228,6 +228,9 @@ include ':instrumentation:rxjava:rxjava-1.0:library'
include ':instrumentation:rxjava:rxjava-2.0:library'
include ':instrumentation:rxjava:rxjava-2.0:testing'
include ':instrumentation:rxjava:rxjava-2.0:javaagent'
include ':instrumentation:rxjava:rxjava-3.0:library'
include ':instrumentation:rxjava:rxjava-3.0:testing'
include ':instrumentation:rxjava:rxjava-3.0:javaagent'
include ':instrumentation:scala-executors:javaagent'
include ':instrumentation:servlet:glassfish-testing'
include ':instrumentation:servlet:servlet-common:library'