Migrate rxjava3 unit tests to Java (#7924)

This commit is contained in:
kc 2023-04-21 05:41:20 -05:00 committed by GitHub
parent 999863d39b
commit fd9ef741a0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
33 changed files with 3156 additions and 2892 deletions

View File

@ -0,0 +1,951 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.when;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.annotation.support.async.AsyncOperationEndStrategy;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.rxjava.v3.common.RxJava3AsyncOperationEndStrategy;
import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Maybe;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.Single;
import io.reactivex.rxjava3.observers.TestObserver;
import io.reactivex.rxjava3.parallel.ParallelFlowable;
import io.reactivex.rxjava3.processors.ReplayProcessor;
import io.reactivex.rxjava3.processors.UnicastProcessor;
import io.reactivex.rxjava3.subjects.CompletableSubject;
import io.reactivex.rxjava3.subjects.MaybeSubject;
import io.reactivex.rxjava3.subjects.ReplaySubject;
import io.reactivex.rxjava3.subjects.SingleSubject;
import io.reactivex.rxjava3.subjects.UnicastSubject;
import io.reactivex.rxjava3.subscribers.TestSubscriber;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
@ExtendWith(MockitoExtension.class)
public class RxJava3AsyncOperationEndStrategyTest {
private static final AttributeKey<Boolean> CANCELED_ATTRIBUTE_KEY =
AttributeKey.booleanKey("rxjava.canceled");
@Mock Instrumenter<String, String> instrumenter;
@Mock Span span;
private final AsyncOperationEndStrategy underTest = RxJava3AsyncOperationEndStrategy.create();
private final RxJava3AsyncOperationEndStrategy underTestWithExperimentalAttributes =
RxJava3AsyncOperationEndStrategy.builder().setCaptureExperimentalSpanAttributes(true).build();
@Nested
class CompletableTest {
@Test
void supported() {
assertThat(underTest.supports(Completable.class)).isTrue();
}
@Test
void endsSpanOnAlreadyCompleted() {
Completable result =
(Completable)
underTest.end(
instrumenter, Context.root(), "request", Completable.complete(), String.class);
TestObserver<Void> observer = result.test();
observer.assertComplete();
verify(instrumenter).end(Context.root(), "request", null, null);
}
@Test
void endsSpanOnAlreadyErrored() {
IllegalStateException exception = new IllegalStateException();
Completable result =
(Completable)
underTest.end(
instrumenter,
Context.root(),
"request",
Completable.error(exception),
String.class);
TestObserver<Void> observer = result.test();
observer.assertError(exception);
verify(instrumenter).end(Context.root(), "request", null, exception);
}
@Test
void endsSpanWhenCompleted() {
CompletableSubject source = CompletableSubject.create();
Completable result =
(Completable)
underTest.end(instrumenter, Context.root(), "request", source, String.class);
TestObserver<Void> observer = result.test();
verifyNoInteractions(instrumenter);
source.onComplete();
observer.assertComplete();
verify(instrumenter).end(Context.root(), "request", null, null);
}
@Test
void endsSpanWhenErrored() {
IllegalStateException exception = new IllegalStateException();
CompletableSubject source = CompletableSubject.create();
Completable result =
(Completable)
underTest.end(instrumenter, Context.root(), "request", source, String.class);
TestObserver<Void> observer = result.test();
verifyNoInteractions(instrumenter);
source.onError(exception);
observer.assertError(exception);
verify(instrumenter).end(Context.root(), "request", null, exception);
}
@Test
void endsWhenCancelled() {
when(span.storeInContext(any())).thenCallRealMethod();
CompletableSubject source = CompletableSubject.create();
Context context = Context.root().with(span);
Completable result =
(Completable) underTest.end(instrumenter, context, "request", source, String.class);
TestObserver<Void> observer = result.test();
verifyNoInteractions(instrumenter);
observer.dispose();
verify(instrumenter).end(context, "request", null, null);
verify(span, never()).setAttribute(CANCELED_ATTRIBUTE_KEY, true);
}
@Test
void endsSpanWhenCancelledExperimentalAttribute() {
when(span.storeInContext(any())).thenCallRealMethod();
CompletableSubject source = CompletableSubject.create();
Context context = Context.root().with(span);
Completable result =
(Completable)
underTestWithExperimentalAttributes.end(
instrumenter, context, "request", source, String.class);
TestObserver<Void> observer = result.test();
verifyNoInteractions(instrumenter);
verify(span, never()).setAttribute(CANCELED_ATTRIBUTE_KEY, true);
observer.dispose();
verify(instrumenter).end(context, "request", null, null);
verify(span).setAttribute(CANCELED_ATTRIBUTE_KEY, true);
}
@Test
void endsSpanOnceForMultipleSubscribers() {
CompletableSubject source = CompletableSubject.create();
TestObserver<String> observer1 = new TestObserver<>();
TestObserver<String> observer2 = new TestObserver<>();
TestObserver<String> observer3 = new TestObserver<>();
Completable result =
(Completable)
underTest.end(instrumenter, Context.root(), "request", source, String.class);
result.subscribe(observer1);
result.subscribe(observer2);
result.subscribe(observer3);
verifyNoInteractions(instrumenter);
source.onComplete();
observer1.assertComplete();
observer2.assertComplete();
observer3.assertComplete();
verify(instrumenter).end(Context.root(), "request", null, null);
}
}
@Nested
class MaybeTest {
@Test
void supported() {
assertThat(underTest.supports(Maybe.class)).isTrue();
}
@Test
void endsSpanOnAlreadyCompleted() {
Maybe<?> result =
(Maybe<?>)
underTest.end(
instrumenter, Context.root(), "request", Maybe.just("response"), String.class);
TestObserver<?> observer = result.test();
observer.assertComplete();
verify(instrumenter).end(Context.root(), "request", "response", null);
}
@Test
void endsSpanOnAlreadyEmpty() {
Maybe<?> result =
(Maybe<?>)
underTest.end(instrumenter, Context.root(), "request", Maybe.empty(), String.class);
TestObserver<?> observer = result.test();
observer.assertComplete();
verify(instrumenter).end(Context.root(), "request", null, null);
}
@Test
void endsSpanOnAlreadyErrored() {
IllegalStateException exception = new IllegalStateException();
Maybe<?> result =
(Maybe<?>)
underTest.end(
instrumenter, Context.root(), "request", Maybe.error(exception), String.class);
TestObserver<?> observer = result.test();
observer.assertError(exception);
verify(instrumenter).end(Context.root(), "request", null, exception);
}
@Test
void endsSpanWhenCompleted() {
MaybeSubject<String> source = MaybeSubject.create();
Maybe<?> result =
(Maybe<?>) underTest.end(instrumenter, Context.root(), "request", source, String.class);
TestObserver<?> observer = result.test();
verifyNoInteractions(instrumenter);
source.onSuccess("response");
observer.assertComplete();
verify(instrumenter).end(Context.root(), "request", "response", null);
}
@Test
void endsSpanWhenEmpty() {
MaybeSubject<String> source = MaybeSubject.create();
Maybe<?> result =
(Maybe<?>) underTest.end(instrumenter, Context.root(), "request", source, String.class);
TestObserver<?> observer = result.test();
verifyNoInteractions(instrumenter);
source.onComplete();
observer.assertComplete();
verify(instrumenter).end(Context.root(), "request", null, null);
}
@Test
void endsSpanWhenErrored() {
IllegalStateException exception = new IllegalStateException();
MaybeSubject<String> source = MaybeSubject.create();
Maybe<?> result =
(Maybe<?>) underTest.end(instrumenter, Context.root(), "request", source, String.class);
TestObserver<?> observer = result.test();
source.onError(exception);
observer.assertError(exception);
verify(instrumenter).end(Context.root(), "request", null, exception);
}
@Test
void endsSpanWhenCancelled() {
when(span.storeInContext(any())).thenCallRealMethod();
MaybeSubject<String> source = MaybeSubject.create();
Context context = Context.root().with(span);
Maybe<?> result =
(Maybe<?>) underTest.end(instrumenter, context, "request", source, String.class);
TestObserver<?> observer = result.test();
verifyNoInteractions(instrumenter);
observer.dispose();
verify(instrumenter).end(context, "request", null, null);
verify(span, never()).setAttribute(CANCELED_ATTRIBUTE_KEY, true);
}
@Test
void endsSpanWhenCancelledExperimentalAttributes() {
when(span.storeInContext(any())).thenCallRealMethod();
MaybeSubject<String> source = MaybeSubject.create();
Context context = Context.root().with(span);
Maybe<?> result =
(Maybe<?>)
underTestWithExperimentalAttributes.end(
instrumenter, context, "request", source, String.class);
TestObserver<?> observer = result.test();
verifyNoInteractions(instrumenter);
verify(span, never()).setAttribute(CANCELED_ATTRIBUTE_KEY, true);
observer.dispose();
verify(instrumenter).end(context, "request", null, null);
verify(span).setAttribute(CANCELED_ATTRIBUTE_KEY, true);
}
@Test
void endsSpanOnceForMultipleSubscribers() {
MaybeSubject<String> source = MaybeSubject.create();
Maybe<?> result =
(Maybe<?>) underTest.end(instrumenter, Context.root(), "request", source, String.class);
TestObserver<?> observer1 = result.test();
TestObserver<?> observer2 = result.test();
TestObserver<?> observer3 = result.test();
verifyNoInteractions(instrumenter);
source.onSuccess("response");
observer1.assertComplete();
observer1.assertValue(value -> value.equals("response"));
observer2.assertComplete();
observer2.assertValue(value -> value.equals("response"));
observer3.assertComplete();
observer3.assertValue(value -> value.equals("response"));
verify(instrumenter).end(Context.root(), "request", "response", null);
}
}
@Nested
class SingleTest {
@Test
void supported() {
assertThat(underTest.supports(Single.class)).isTrue();
}
@Test
void endsSpanOnAlreadyCompleted() {
Single<?> result =
(Single<?>)
underTest.end(
instrumenter, Context.root(), "request", Single.just("response"), String.class);
TestObserver<?> observer = result.test();
observer.assertComplete();
verify(instrumenter).end(Context.root(), "request", "response", null);
}
@Test
void endsSpanOnAlreadyErrored() {
IllegalStateException exception = new IllegalStateException();
Single<?> result =
(Single<?>)
underTest.end(
instrumenter, Context.root(), "request", Single.error(exception), String.class);
TestObserver<?> observer = result.test();
observer.assertError(exception);
verify(instrumenter).end(Context.root(), "request", null, exception);
}
@Test
void endsSpanWhenCompleted() {
SingleSubject<String> source = SingleSubject.create();
Single<?> result =
(Single<?>) underTest.end(instrumenter, Context.root(), "request", source, String.class);
TestObserver<?> observer = result.test();
verifyNoInteractions(instrumenter);
source.onSuccess("response");
observer.assertComplete();
verify(instrumenter).end(Context.root(), "request", "response", null);
}
@Test
void endsSpanWhenErrored() {
IllegalStateException exception = new IllegalStateException();
SingleSubject<String> source = SingleSubject.create();
Single<?> result =
(Single<?>) underTest.end(instrumenter, Context.root(), "request", source, String.class);
TestObserver<?> observer = result.test();
verifyNoInteractions(instrumenter);
source.onError(exception);
observer.assertError(exception);
verify(instrumenter).end(Context.root(), "request", null, exception);
}
@Test
void endsSpanWhenCancelled() {
when(span.storeInContext(any())).thenCallRealMethod();
SingleSubject<String> source = SingleSubject.create();
Context context = Context.root().with(span);
Single<?> result =
(Single<?>) underTest.end(instrumenter, context, "request", source, String.class);
TestObserver<?> observer = result.test();
verifyNoInteractions(instrumenter);
observer.dispose();
verify(span, never()).setAttribute(CANCELED_ATTRIBUTE_KEY, true);
verify(instrumenter).end(context, "request", null, null);
}
@Test
void endsSpanWhenCancelledExperimentalAttributes() {
when(span.storeInContext(any())).thenCallRealMethod();
SingleSubject<String> source = SingleSubject.create();
Context context = Context.root().with(span);
Single<?> result =
(Single<?>)
underTestWithExperimentalAttributes.end(
instrumenter, context, "request", source, String.class);
TestObserver<?> observer = result.test();
verifyNoInteractions(instrumenter);
verify(span, never()).setAttribute(CANCELED_ATTRIBUTE_KEY, true);
observer.dispose();
verify(instrumenter).end(context, "request", null, null);
verify(span).setAttribute(CANCELED_ATTRIBUTE_KEY, true);
}
@Test
void endsSpanOnceForMultipleSubscribers() {
SingleSubject<String> source = SingleSubject.create();
Single<?> result =
(Single<?>) underTest.end(instrumenter, Context.root(), "request", source, String.class);
TestObserver<?> observer1 = result.test();
TestObserver<?> observer2 = result.test();
TestObserver<?> observer3 = result.test();
verifyNoInteractions(instrumenter);
source.onSuccess("response");
observer1.assertValue(value -> value.equals("response"));
observer1.assertComplete();
observer2.assertValue(value -> value.equals("response"));
observer2.assertComplete();
observer3.assertValue(value -> value.equals("response"));
observer3.assertComplete();
verify(instrumenter).end(Context.root(), "request", "response", null);
}
}
@Nested
class ObservableTest {
@Test
void supported() {
assertThat(underTest.supports(Observable.class)).isTrue();
}
@Test
void endsSpanOnAlreadyCompleted() {
Observable<?> result =
(Observable<?>)
underTest.end(
instrumenter,
Context.root(),
"request",
Observable.just("response"),
String.class);
TestObserver<?> observer = result.test();
verify(instrumenter).end(Context.root(), "request", null, null);
observer.assertComplete();
}
@Test
void endsSpanOnAlreadyErrored() {
IllegalStateException exception = new IllegalStateException();
Observable<?> result =
(Observable<?>)
underTest.end(
instrumenter,
Context.root(),
"request",
Observable.error(exception),
String.class);
TestObserver<?> observer = result.test();
verify(instrumenter).end(Context.root(), "request", null, exception);
observer.assertError(exception);
}
@Test
void endsSpanWhenCompleted() {
UnicastSubject<String> source = UnicastSubject.create();
Observable<?> result =
(Observable<?>)
underTest.end(instrumenter, Context.root(), "request", source, String.class);
TestObserver<?> observer = result.test();
verifyNoInteractions(instrumenter);
source.onComplete();
verify(instrumenter).end(Context.root(), "request", null, null);
observer.assertComplete();
}
@Test
void endsSpanWhenErrored() {
IllegalStateException exception = new IllegalStateException();
UnicastSubject<String> source = UnicastSubject.create();
Observable<?> result =
(Observable<?>)
underTest.end(instrumenter, Context.root(), "request", source, String.class);
TestObserver<?> observer = result.test();
verifyNoInteractions(instrumenter);
source.onError(exception);
verify(instrumenter).end(Context.root(), "request", null, exception);
observer.assertError(exception);
}
@Test
void endsOnWhenCancelled() {
when(span.storeInContext(any())).thenCallRealMethod();
UnicastSubject<String> source = UnicastSubject.create();
Context context = Context.root().with(span);
Observable<?> result =
(Observable<?>) underTest.end(instrumenter, context, "request", source, String.class);
TestObserver<?> observer = result.test();
verifyNoInteractions(instrumenter);
observer.dispose();
verify(instrumenter).end(context, "request", null, null);
verify(span, never()).setAttribute(CANCELED_ATTRIBUTE_KEY, true);
}
@Test
void endsSpanWhenCancelledExperimentalAttributes() {
when(span.storeInContext(any())).thenCallRealMethod();
UnicastSubject<String> source = UnicastSubject.create();
Context context = Context.root().with(span);
Observable<?> result =
(Observable<?>)
underTestWithExperimentalAttributes.end(
instrumenter, context, "request", source, String.class);
TestObserver<?> observer = result.test();
verifyNoInteractions(instrumenter);
verify(span, never()).setAttribute(CANCELED_ATTRIBUTE_KEY, true);
observer.dispose();
verify(instrumenter).end(context, "request", null, null);
verify(span).setAttribute(CANCELED_ATTRIBUTE_KEY, true);
}
@Test
void endsSpanOnceForMultipleSubscribers() {
ReplaySubject<String> source = ReplaySubject.create();
Observable<?> result =
(Observable<?>)
underTest.end(instrumenter, Context.root(), "request", source, String.class);
TestObserver<?> observer1 = result.test();
TestObserver<?> observer2 = result.test();
TestObserver<?> observer3 = result.test();
verifyNoInteractions(instrumenter);
source.onComplete();
observer1.assertComplete();
observer2.assertComplete();
observer3.assertComplete();
verify(instrumenter).end(Context.root(), "request", null, null);
}
}
@Nested
class FlowableTest {
@Test
void supported() {
assertThat(underTest.supports(Flowable.class)).isTrue();
}
@Test
void endsSpanOnAlreadyCompleted() {
Flowable<?> result =
(Flowable<?>)
underTest.end(
instrumenter, Context.root(), "request", Flowable.just("response"), String.class);
TestSubscriber<?> observer = result.test();
verify(instrumenter).end(Context.root(), "request", null, null);
observer.assertComplete();
}
@Test
void endsOnAlreadyErrored() {
IllegalStateException exception = new IllegalStateException();
Flowable<?> result =
(Flowable<?>)
underTest.end(
instrumenter, Context.root(), "request", Flowable.error(exception), String.class);
TestSubscriber<?> observer = result.test();
verify(instrumenter).end(Context.root(), "request", null, exception);
observer.assertError(exception);
}
@Test
void endsSpanWhenCompleted() {
UnicastProcessor<String> source = UnicastProcessor.create();
Flowable<?> result =
(Flowable<?>)
underTest.end(instrumenter, Context.root(), "request", source, String.class);
TestSubscriber<?> observer = result.test();
verifyNoInteractions(instrumenter);
source.onComplete();
observer.assertComplete();
verify(instrumenter).end(Context.root(), "request", null, null);
}
@Test
void endsOnWhenErrored() {
IllegalStateException exception = new IllegalStateException();
UnicastProcessor<String> source = UnicastProcessor.create();
Flowable<?> result =
(Flowable<?>)
underTest.end(instrumenter, Context.root(), "request", source, String.class);
TestSubscriber<?> observer = result.test();
verifyNoInteractions(instrumenter);
source.onError(exception);
observer.assertError(exception);
verify(instrumenter).end(Context.root(), "request", null, exception);
}
@Test
void endsSpanWhenCancelled() {
when(span.storeInContext(any())).thenCallRealMethod();
UnicastProcessor<String> source = UnicastProcessor.create();
Context context = Context.root().with(span);
Flowable<?> result =
(Flowable<?>) underTest.end(instrumenter, context, "request", source, String.class);
TestSubscriber<?> observer = result.test();
verifyNoInteractions(instrumenter);
observer.cancel();
verify(span, never()).setAttribute(CANCELED_ATTRIBUTE_KEY, true);
verify(instrumenter).end(context, "request", null, null);
}
@Test
void endsSpanWhenCancelledExperimentalAttributes() {
when(span.storeInContext(any())).thenCallRealMethod();
UnicastProcessor<String> source = UnicastProcessor.create();
Context context = Context.root().with(span);
Flowable<?> result =
(Flowable<?>)
underTestWithExperimentalAttributes.end(
instrumenter, context, "request", source, String.class);
TestSubscriber<?> observer = result.test();
verifyNoInteractions(instrumenter);
verify(span, never()).setAttribute(CANCELED_ATTRIBUTE_KEY, true);
observer.cancel();
verify(span).setAttribute(CANCELED_ATTRIBUTE_KEY, true);
verify(instrumenter).end(context, "request", null, null);
}
@Test
void endsSpanOnceForMultipleSubscribers() {
ReplayProcessor<String> source = ReplayProcessor.create();
Flowable<?> result =
(Flowable<?>)
underTest.end(instrumenter, Context.root(), "request", source, String.class);
TestSubscriber<?> observer1 = result.test();
TestSubscriber<?> observer2 = result.test();
TestSubscriber<?> observer3 = result.test();
verifyNoInteractions(instrumenter);
source.onComplete();
observer1.assertComplete();
observer2.assertComplete();
observer3.assertComplete();
verify(instrumenter).end(Context.root(), "request", null, null);
}
}
@Nested
class ParallelFlowableTest {
@Test
void supported() {
assertThat(underTest.supports(ParallelFlowable.class)).isTrue();
}
@Test
void endsSpanOnAlreadyCompleted() {
ParallelFlowable<?> result =
(ParallelFlowable<?>)
underTest.end(
instrumenter,
Context.root(),
"request",
Flowable.just("response").parallel(),
String.class);
TestSubscriber<?> observer = result.sequential().test();
observer.assertComplete();
verify(instrumenter).end(Context.root(), "request", null, null);
}
@Test
void endsSpanOnAlreadyErrored() {
IllegalStateException exception = new IllegalStateException();
ParallelFlowable<?> result =
(ParallelFlowable<?>)
underTest.end(
instrumenter,
Context.root(),
"request",
Flowable.error(exception).parallel(),
String.class);
TestSubscriber<?> observer = result.sequential().test();
observer.assertError(exception);
verify(instrumenter).end(Context.root(), "request", null, exception);
}
@Test
void endsSpanWhenCompleted() {
UnicastProcessor<String> source = UnicastProcessor.create();
ParallelFlowable<?> result =
(ParallelFlowable<?>)
underTest.end(
instrumenter, Context.root(), "request", source.parallel(), String.class);
TestSubscriber<?> observer = result.sequential().test();
verifyNoInteractions(instrumenter);
source.onComplete();
observer.assertComplete();
verify(instrumenter).end(Context.root(), "request", null, null);
}
@Test
void endsSpanWhenErrored() {
IllegalStateException exception = new IllegalStateException();
UnicastProcessor<String> source = UnicastProcessor.create();
ParallelFlowable<?> result =
(ParallelFlowable<?>)
underTest.end(
instrumenter, Context.root(), "request", source.parallel(), String.class);
TestSubscriber<?> observer = result.sequential().test();
verifyNoInteractions(instrumenter);
source.onError(exception);
observer.assertError(exception);
verify(instrumenter).end(Context.root(), "request", null, exception);
}
@Test
void endsSpanWhenCancelled() {
when(span.storeInContext(any())).thenCallRealMethod();
UnicastProcessor<String> source = UnicastProcessor.create();
Context context = Context.root().with(span);
ParallelFlowable<?> result =
(ParallelFlowable<?>)
underTest.end(instrumenter, context, "request", source.parallel(), String.class);
TestSubscriber<?> observer = result.sequential().test();
verifyNoInteractions(instrumenter);
observer.cancel();
verify(span, never()).setAttribute(CANCELED_ATTRIBUTE_KEY, true);
verify(instrumenter).end(context, "request", null, null);
}
@Test
void endsSpanWhenCancelledExperimentalAttributes() {
when(span.storeInContext(any())).thenCallRealMethod();
UnicastProcessor<String> source = UnicastProcessor.create();
Context context = Context.root().with(span);
ParallelFlowable<?> result =
(ParallelFlowable<?>)
underTestWithExperimentalAttributes.end(
instrumenter, context, "request", source.parallel(), String.class);
TestSubscriber<?> observer = result.sequential().test();
verifyNoInteractions(instrumenter);
verify(span, never()).setAttribute(CANCELED_ATTRIBUTE_KEY, true);
observer.cancel();
verify(span).setAttribute(CANCELED_ATTRIBUTE_KEY, true);
verify(instrumenter).end(context, "request", null, null);
}
}
@Nested
class PublisherTest {
@Test
void supported() {
assertThat(underTest.supports(Publisher.class)).isTrue();
}
@Test
void endsSpanWhenCompleted() {
CustomPublisher source = new CustomPublisher();
Flowable<?> result =
(Flowable<?>)
underTest.end(instrumenter, Context.root(), "request", source, String.class);
TestSubscriber<?> observer = result.test();
verifyNoInteractions(instrumenter);
source.onComplete();
observer.assertComplete();
verify(instrumenter).end(Context.root(), "request", null, null);
}
@Test
void endsSpanWhenErrored() {
IllegalStateException exception = new IllegalStateException();
CustomPublisher source = new CustomPublisher();
Flowable<?> result =
(Flowable<?>)
underTest.end(instrumenter, Context.root(), "request", source, String.class);
TestSubscriber<?> observer = result.test();
verifyNoInteractions(instrumenter);
source.onError(exception);
observer.assertError(exception);
verify(instrumenter).end(Context.root(), "request", null, exception);
}
@Test
void endsSpanWhenCancelled() {
when(span.storeInContext(any())).thenCallRealMethod();
CustomPublisher source = new CustomPublisher();
Context context = Context.root().with(span);
Flowable<?> result =
(Flowable<?>) underTest.end(instrumenter, context, "request", source, String.class);
TestSubscriber<?> observer = result.test();
verifyNoInteractions(instrumenter);
observer.cancel();
verify(span, never()).setAttribute(CANCELED_ATTRIBUTE_KEY, true);
verify(instrumenter).end(context, "request", null, null);
}
@Test
void endsSpanWhenCancelledExperimentalAttributes() {
when(span.storeInContext(any())).thenCallRealMethod();
CustomPublisher source = new CustomPublisher();
Context context = Context.root().with(span);
Flowable<?> result =
(Flowable<?>)
underTestWithExperimentalAttributes.end(
instrumenter, context, "request", source, String.class);
TestSubscriber<?> observer = result.test();
verifyNoInteractions(instrumenter);
verify(span, never()).setAttribute(CANCELED_ATTRIBUTE_KEY, true);
observer.cancel();
verify(span).setAttribute(CANCELED_ATTRIBUTE_KEY, true);
verify(instrumenter).end(context, "request", null, null);
}
class CustomPublisher implements Publisher<String>, Subscription {
Subscriber<? super String> subscriber;
@Override
public void subscribe(Subscriber<? super String> subscriber) {
this.subscriber = subscriber;
subscriber.onSubscribe(this);
}
public void onComplete() {
this.subscriber.onComplete();
}
public void onError(Throwable exception) {
this.subscriber.onError(exception);
}
@Override
public void request(long l) {}
@Override
public void cancel() {}
}
}
}

View File

@ -1,59 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.rxjava.v3.common
import io.opentelemetry.api.GlobalOpenTelemetry
import io.opentelemetry.api.trace.SpanKind
import io.opentelemetry.instrumentation.test.InstrumentationSpecification
import io.reactivex.rxjava3.core.Single
import io.reactivex.rxjava3.functions.Consumer
import java.util.concurrent.CountDownLatch
abstract class AbstractRxJava3SubscriptionTest extends InstrumentationSpecification {
def "subscription test"() {
when:
CountDownLatch latch = new CountDownLatch(1)
runWithSpan("parent") {
Single<Connection> connection = Single.create {
it.onSuccess(new Connection())
}
connection.subscribe(new Consumer<Connection>() {
@Override
void accept(Connection t) {
t.query()
latch.countDown()
}
})
}
latch.await()
then:
assertTraces(1) {
trace(0, 2) {
span(0) {
name "parent"
kind SpanKind.INTERNAL
hasNoParent()
}
span(1) {
name "Connection.query"
kind SpanKind.INTERNAL
childOf span(0)
}
}
}
}
static class Connection {
static int query() {
def span = GlobalOpenTelemetry.getTracer("test").spanBuilder("Connection.query").startSpan()
span.end()
return new Random().nextInt()
}
}
}

View File

@ -1,491 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.rxjava.v3.common
import com.google.common.collect.Lists
import io.opentelemetry.api.common.AttributeKey
import io.opentelemetry.api.trace.Span
import io.opentelemetry.api.trace.SpanKind
import io.opentelemetry.instrumentation.test.InstrumentationSpecification
import io.reactivex.rxjava3.core.BackpressureStrategy
import io.reactivex.rxjava3.core.Completable
import io.reactivex.rxjava3.core.Flowable
import io.reactivex.rxjava3.core.Maybe
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.core.Single
import io.reactivex.rxjava3.internal.operators.flowable.FlowablePublish
import io.reactivex.rxjava3.internal.operators.observable.ObservablePublish
import io.reactivex.rxjava3.schedulers.Schedulers
import org.reactivestreams.Subscriber
import org.reactivestreams.Subscription
import spock.lang.Shared
import spock.lang.Unroll
import static java.util.concurrent.TimeUnit.MILLISECONDS
/**
* <p>Tests in this class may seem not exhaustive due to the fact that some classes are converted
* into others, ie. {@link Completable#toMaybe()}. Fortunately, RxJava3 uses helper classes like
* {@link io.reactivex.rxjava3.internal.operators.maybe.MaybeFromCompletable} and as a result we
* can test subscriptions and cancellations correctly.
*/
@Unroll
abstract class AbstractRxJava3Test extends InstrumentationSpecification {
public static final String EXCEPTION_MESSAGE = "test exception"
@Shared
def addOne = { i ->
addOneFunc(i)
}
@Shared
def addTwo = { i ->
addTwoFunc(i)
}
@Shared
def throwException = {
throw new IllegalStateException(EXCEPTION_MESSAGE)
}
def addOneFunc(int i) {
runWithSpan("addOne") {
return i + 1
}
}
def addTwoFunc(int i) {
runWithSpan("addTwo") {
return i + 2
}
}
def "Publisher '#testName' test"() {
when:
def result = assemblePublisherUnderTrace(publisherSupplier)
then:
result == expected
and:
assertTraces(1) {
sortSpansByStartTime()
trace(0, workSpans + 1) {
span(0) {
name "publisher-parent"
kind SpanKind.INTERNAL
hasNoParent()
}
for (int i = 1; i < workSpans + 1; ++i) {
span(i) {
name "addOne"
kind SpanKind.INTERNAL
childOf span(0)
}
}
}
}
where:
testName | expected | workSpans | publisherSupplier
"basic maybe" | 2 | 1 | { -> Maybe.just(1).map(addOne) }
"two operations maybe" | 4 | 2 | { -> Maybe.just(2).map(addOne).map(addOne) }
"delayed maybe" | 4 | 1 | { ->
Maybe.just(3).delay(100, MILLISECONDS).map(addOne)
}
"delayed twice maybe" | 6 | 2 | { ->
Maybe.just(4).delay(100, MILLISECONDS).map(addOne).delay(100, MILLISECONDS).map(addOne)
}
"basic flowable" | [6, 7] | 2 | { ->
Flowable.fromIterable([5, 6]).map(addOne)
}
"two operations flowable" | [8, 9] | 4 | { ->
Flowable.fromIterable([6, 7]).map(addOne).map(addOne)
}
"delayed flowable" | [8, 9] | 2 | { ->
Flowable.fromIterable([7, 8]).delay(100, MILLISECONDS).map(addOne)
}
"delayed twice flowable" | [10, 11] | 4 | { ->
Flowable.fromIterable([8, 9]).delay(100, MILLISECONDS).map(addOne).delay(100, MILLISECONDS).map(addOne)
}
"maybe from callable" | 12 | 2 | { ->
Maybe.fromCallable({ addOneFunc(10) }).map(addOne)
}
"basic single" | 1 | 1 | { -> Single.just(0).map(addOne) }
"basic observable" | [1] | 1 | { -> Observable.just(0).map(addOne) }
"connectable flowable" | [1] | 1 | { ->
FlowablePublish.just(0).delay(100, MILLISECONDS).map(addOne)
}
"connectable observable" | [1] | 1 | { ->
ObservablePublish.just(0).delay(100, MILLISECONDS).map(addOne)
}
}
def "Publisher error '#testName' test"() {
when:
assemblePublisherUnderTrace(publisherSupplier)
then:
def thrownException = thrown RuntimeException
thrownException.message == EXCEPTION_MESSAGE
and:
assertTraces(1) {
sortSpansByStartTime()
trace(0, 1) {
// It's important that we don't attach errors at the Reactor level so that we don't
// impact the spans on reactor integrations such as netty and lettuce, as reactor is
// more of a context propagation mechanism than something we would be tracking for
// errors this is ok.
span(0) {
name "publisher-parent"
kind SpanKind.INTERNAL
hasNoParent()
}
}
}
where:
testName | publisherSupplier
"maybe" | { -> Maybe.error(new RuntimeException(EXCEPTION_MESSAGE)) }
"flowable" | { -> Flowable.error(new RuntimeException(EXCEPTION_MESSAGE)) }
"single" | { -> Single.error(new RuntimeException(EXCEPTION_MESSAGE)) }
"observable" | { -> Observable.error(new RuntimeException(EXCEPTION_MESSAGE)) }
"completable" | { -> Completable.error(new RuntimeException(EXCEPTION_MESSAGE)) }
}
def "Publisher step '#testName' test"() {
when:
assemblePublisherUnderTrace(publisherSupplier)
then:
def exception = thrown RuntimeException
exception.message == EXCEPTION_MESSAGE
and:
assertTraces(1) {
sortSpansByStartTime()
trace(0, workSpans + 1) {
// It's important that we don't attach errors at the Reactor level so that we don't
// impact the spans on reactor integrations such as netty and lettuce, as reactor is
// more of a context propagation mechanism than something we would be tracking for
// errors this is ok.
span(0) {
name "publisher-parent"
kind SpanKind.INTERNAL
hasNoParent()
}
for (int i = 1; i < workSpans + 1; i++) {
span(i) {
name "addOne"
kind SpanKind.INTERNAL
childOf span(0)
}
}
}
}
where:
testName | workSpans | publisherSupplier
"basic maybe failure" | 1 | { ->
Maybe.just(1).map(addOne).map({ throwException() })
}
"basic flowable failure" | 1 | { ->
Flowable.fromIterable([5, 6]).map(addOne).map({ throwException() })
}
}
def "Publisher '#testName' cancel"() {
when:
cancelUnderTrace(publisherSupplier)
then:
assertTraces(1) {
trace(0, 1) {
span(0) {
name "publisher-parent"
kind SpanKind.INTERNAL
hasNoParent()
}
}
}
where:
testName | publisherSupplier
"basic maybe" | { -> Maybe.just(1) }
"basic flowable" | { -> Flowable.fromIterable([5, 6]) }
"basic single" | { -> Single.just(1) }
"basic completable" | { -> Completable.fromCallable({ -> 1 }) }
"basic observable" | { -> Observable.just(1) }
}
def "Publisher chain spans have the correct parent for '#testName'"() {
when:
assemblePublisherUnderTrace(publisherSupplier)
then:
assertTraces(1) {
trace(0, workSpans + 1) {
span(0) {
name "publisher-parent"
kind SpanKind.INTERNAL
hasNoParent()
}
for (int i = 1; i < workSpans + 1; i++) {
span(i) {
name "addOne"
kind SpanKind.INTERNAL
childOf span(0)
}
}
}
}
where:
testName | workSpans | publisherSupplier
"basic maybe" | 3 | { ->
Maybe.just(1).map(addOne).map(addOne).concatWith(Maybe.just(1).map(addOne))
}
"basic flowable" | 5 | { ->
Flowable.fromIterable([5, 6]).map(addOne).map(addOne).concatWith(Maybe.just(1).map(addOne).toFlowable())
}
}
def "Publisher chain spans have the correct parents from subscription time"() {
when:
def maybe = Maybe.just(42)
.map(addOne)
.map(addTwo)
runWithSpan("trace-parent") {
maybe.blockingGet()
}
then:
assertTraces(1) {
trace(0, 3) {
sortSpansByStartTime()
span(0) {
name "trace-parent"
kind SpanKind.INTERNAL
hasNoParent()
}
span(1) {
name "addOne"
kind SpanKind.INTERNAL
childOf span(0)
}
span(2) {
name "addTwo"
kind SpanKind.INTERNAL
childOf span(0)
}
}
}
}
def "Publisher chain spans have the correct parents from subscription time '#testName'"() {
when:
assemblePublisherUnderTrace {
// The "add one" operations in the publisher created here should be children of the publisher-parent
def publisher = publisherSupplier()
runWithSpan("intermediate") {
if (publisher instanceof Maybe) {
return ((Maybe) publisher).map(addTwo)
} else if (publisher instanceof Flowable) {
return ((Flowable) publisher).map(addTwo)
} else if (publisher instanceof Single) {
return ((Single) publisher).map(addTwo)
} else if (publisher instanceof Observable) {
return ((Observable) publisher).map(addTwo)
} else if (publisher instanceof Completable) {
return ((Completable) publisher).toMaybe().map(addTwo)
}
throw new IllegalStateException("Unknown publisher type")
}
}
then:
assertTraces(1) {
trace(0, 2 + 2 * workItems) {
sortSpansByStartTime()
span(0) {
name "publisher-parent"
kind SpanKind.INTERNAL
hasNoParent()
}
span(1) {
name "intermediate"
kind SpanKind.INTERNAL
childOf span(0)
}
for (int i = 2; i < 2 + 2 * workItems; i = i + 2) {
span(i) {
name "addOne"
kind SpanKind.INTERNAL
childOf span(0)
}
span(i + 1) {
name "addTwo"
kind SpanKind.INTERNAL
childOf span(0)
}
}
}
}
where:
testName | workItems | publisherSupplier
"basic maybe" | 1 | { -> Maybe.just(1).map(addOne) }
"basic flowable" | 2 | { -> Flowable.fromIterable([1, 2]).map(addOne) }
"basic single" | 1 | { -> Single.just(1).map(addOne) }
"basic observable" | 1 | { -> Observable.just(1).map(addOne) }
}
def "Flowables produce the right number of results '#scheduler'"() {
when:
List<String> values = runWithSpan("flowable root") {
Flowable.fromIterable([1, 2, 3, 4])
.parallel()
.runOn(scheduler)
.flatMap({ num ->
Maybe.just(num).map(addOne).toFlowable()
})
.sequential()
.toList()
.blockingGet()
}
then:
values.size() == 4
assertTraces(1) {
trace(0, 5) {
span(0) {
name "flowable root"
kind SpanKind.INTERNAL
hasNoParent()
}
for (int i = 1; i < values.size() + 1; i++) {
span(i) {
name "addOne"
kind SpanKind.INTERNAL
childOf span(0)
}
}
}
}
where:
scheduler << [Schedulers.newThread(), Schedulers.computation(), Schedulers.single(), Schedulers.trampoline()]
}
def "test many ongoing trace chains on '#scheduler'"() {
setup:
int iterations = 100
Set<Long> remainingIterations = new HashSet<>((0L..(iterations - 1)).toList())
when:
RxJava3ConcurrencyTestHelper.launchAndWait(scheduler, iterations, 60000, testRunner())
then:
assertTraces(iterations) {
for (int i = 0; i < iterations; i++) {
trace(i, 3) {
long iteration = -1
span(0) {
name("outer")
iteration = span.getAttributes().get(AttributeKey.longKey("iteration")).toLong()
assert remainingIterations.remove(iteration)
}
span(1) {
name("middle")
childOf(span(0))
assert span.getAttributes().get(AttributeKey.longKey("iteration")) == iteration
}
span(2) {
name("inner")
childOf(span(1))
assert span.getAttributes().get(AttributeKey.longKey("iteration")) == iteration
}
}
}
}
assert remainingIterations.isEmpty()
where:
scheduler << [Schedulers.newThread(), Schedulers.computation(), Schedulers.single(), Schedulers.trampoline()]
}
def cancelUnderTrace(def publisherSupplier) {
runUnderTraceWithoutExceptionCatch("publisher-parent") {
def publisher = publisherSupplier()
if (publisher instanceof Maybe) {
publisher = publisher.toFlowable()
} else if (publisher instanceof Single) {
publisher = publisher.toFlowable()
} else if (publisher instanceof Completable) {
publisher = publisher.toFlowable()
} else if (publisher instanceof Observable) {
publisher = publisher.toFlowable(BackpressureStrategy.LATEST)
}
publisher.subscribe(new Subscriber<Integer>() {
void onSubscribe(Subscription subscription) {
subscription.cancel()
}
void onNext(Integer t) {
}
void onError(Throwable error) {
}
void onComplete() {
}
})
}
}
@SuppressWarnings("unchecked")
def assemblePublisherUnderTrace(def publisherSupplier) {
// The "add two" operations below should be children of this span
runUnderTraceWithoutExceptionCatch("publisher-parent") {
def publisher = publisherSupplier()
// Read all data from publisher
if (publisher instanceof Maybe) {
return ((Maybe) publisher).blockingGet()
} else if (publisher instanceof Flowable) {
return Lists.newArrayList(((Flowable) publisher).blockingIterable())
} else if (publisher instanceof Single) {
return ((Single) publisher).blockingGet()
} else if (publisher instanceof Observable) {
return Lists.newArrayList(((Observable) publisher).blockingIterable())
} else if (publisher instanceof Completable) {
return ((Completable) publisher).toMaybe().blockingGet()
}
throw new IllegalStateException("Unknown publisher: " + publisher)
}
}
def runUnderTraceWithoutExceptionCatch(String spanName, Closure c) {
Span span = openTelemetry.getTracer("test")
.spanBuilder(spanName)
.startSpan()
try {
return span.makeCurrent().withCloseable {
c.call()
}
} finally {
span.end()
}
}
}

View File

@ -0,0 +1,57 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.rxjava.v3.common;
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import io.reactivex.rxjava3.core.Single;
import io.reactivex.rxjava3.disposables.Disposable;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import org.junit.jupiter.api.Test;
public abstract class AbstractRxJava3SubscriptionTest {
protected abstract InstrumentationExtension testing();
@Test
public void subscriptionTest() throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(1);
testing()
.runWithSpan(
"parent",
() -> {
Single<Connection> connectionSingle =
Single.create(emitter -> emitter.onSuccess(new Connection()));
Disposable unused =
connectionSingle.subscribe(
connection -> {
connection.query();
countDownLatch.countDown();
});
});
countDownLatch.await();
testing()
.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(),
span ->
span.hasName("Connection.query")
.hasKind(SpanKind.INTERNAL)
.hasParent(trace.getSpan(0))));
}
static class Connection {
int query() {
Span span = GlobalOpenTelemetry.getTracer("test").spanBuilder("Connection.query").startSpan();
span.end();
return new Random().nextInt();
}
}
}

View File

@ -0,0 +1,856 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.rxjava.v3.common;
import static io.opentelemetry.sdk.testing.assertj.LogAssertions.assertThat;
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.attributeEntry;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import com.google.common.primitives.Ints;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import io.opentelemetry.instrumentation.testing.util.ThrowingRunnable;
import io.opentelemetry.instrumentation.testing.util.ThrowingSupplier;
import io.opentelemetry.sdk.testing.assertj.TraceAssert;
import io.reactivex.rxjava3.core.BackpressureStrategy;
import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Maybe;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.Scheduler;
import io.reactivex.rxjava3.core.Single;
import io.reactivex.rxjava3.internal.operators.flowable.FlowablePublish;
import io.reactivex.rxjava3.internal.operators.observable.ObservablePublish;
import io.reactivex.rxjava3.schedulers.Schedulers;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Stream;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public abstract class AbstractRxJava3Test {
private static final String EXCEPTION_MESSAGE = "test exception";
private static final String PARENT = "publisher-parent";
private static final String ADD_ONE = "addOne";
private static final String ADD_TWO = "addTwo";
protected abstract InstrumentationExtension testing();
private static Stream<Arguments> schedulers() {
return Stream.of(
Arguments.of(Schedulers.newThread()),
Arguments.of(Schedulers.computation()),
Arguments.of(Schedulers.single()),
Arguments.of(Schedulers.trampoline()));
}
private int addOne(int i) {
return testing().runWithSpan(ADD_ONE, () -> i + 1);
}
private int addTwo(int i) {
return testing().runWithSpan(ADD_TWO, () -> i + 2);
}
private <T> T createParentSpan(ThrowingSupplier<T, RuntimeException> test) {
return testing().runWithSpan(PARENT, test);
}
private void createParentSpan(ThrowingRunnable<RuntimeException> test) {
testing().runWithSpan(PARENT, test);
}
private enum CancellingSubscriber implements Subscriber<Object> {
INSTANCE;
@Override
public void onSubscribe(Subscription subscription) {
subscription.cancel();
}
@Override
public void onNext(Object o) {}
@Override
public void onError(Throwable throwable) {}
@Override
public void onComplete() {}
}
@Test
public void basicMaybe() {
int result = createParentSpan(() -> Maybe.just(1).map(this::addOne).blockingGet());
assertThat(result).isEqualTo(2);
testing()
.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span -> span.hasName(PARENT).hasKind(SpanKind.INTERNAL).hasNoParent(),
span ->
span.hasName(ADD_ONE)
.hasKind(SpanKind.INTERNAL)
.hasParent(trace.getSpan(0))));
}
@Test
public void twoOperationsMaybe() {
int result =
createParentSpan(() -> Maybe.just(2).map(this::addOne).map(this::addOne).blockingGet());
assertThat(result).isEqualTo(4);
testing()
.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span -> span.hasName(PARENT).hasKind(SpanKind.INTERNAL).hasNoParent(),
span ->
span.hasName(ADD_ONE)
.hasKind(SpanKind.INTERNAL)
.hasParent(trace.getSpan(0)),
span ->
span.hasName(ADD_ONE)
.hasKind(SpanKind.INTERNAL)
.hasParent(trace.getSpan(0))));
}
@Test
public void delayedMaybe() {
int result =
createParentSpan(
() -> Maybe.just(3).delay(100, TimeUnit.MILLISECONDS).map(this::addOne).blockingGet());
assertThat(result).isEqualTo(4);
testing()
.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span -> span.hasName(PARENT).hasKind(SpanKind.INTERNAL).hasNoParent(),
span ->
span.hasName(ADD_ONE)
.hasKind(SpanKind.INTERNAL)
.hasParent(trace.getSpan(0))));
}
@Test
public void delayedTwiceMaybe() {
int result =
createParentSpan(
() ->
Maybe.just(4)
.delay(100, TimeUnit.MILLISECONDS)
.map(this::addOne)
.delay(100, TimeUnit.MILLISECONDS)
.map(this::addOne)
.blockingGet());
assertThat(result).isEqualTo(6);
testing()
.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span -> span.hasName(PARENT).hasKind(SpanKind.INTERNAL).hasNoParent(),
span ->
span.hasName(ADD_ONE)
.hasKind(SpanKind.INTERNAL)
.hasParent(trace.getSpan(0)),
span ->
span.hasName(ADD_ONE)
.hasKind(SpanKind.INTERNAL)
.hasParent(trace.getSpan(0))));
}
@Test
public void basicFlowable() {
Iterable<Integer> result =
createParentSpan(
() ->
Flowable.fromIterable(Ints.asList(5, 6)).map(this::addOne).toList().blockingGet());
assertThat(result).contains(6, 7);
testing()
.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span -> span.hasName(PARENT).hasKind(SpanKind.INTERNAL).hasNoParent(),
span ->
span.hasName(ADD_ONE)
.hasKind(SpanKind.INTERNAL)
.hasParent(trace.getSpan(0)),
span ->
span.hasName(ADD_ONE)
.hasKind(SpanKind.INTERNAL)
.hasParent(trace.getSpan(0))));
}
@Test
public void twoOperationsFlowable() {
List<Integer> result =
createParentSpan(
() ->
Flowable.fromIterable(Ints.asList(6, 7))
.map(this::addOne)
.map(this::addOne)
.toList()
.blockingGet());
assertThat(result).contains(8, 9);
testing()
.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span -> span.hasName(PARENT).hasKind(SpanKind.INTERNAL).hasNoParent(),
span ->
span.hasName(ADD_ONE)
.hasKind(SpanKind.INTERNAL)
.hasParent(trace.getSpan(0)),
span ->
span.hasName(ADD_ONE)
.hasKind(SpanKind.INTERNAL)
.hasParent(trace.getSpan(0)),
span ->
span.hasName(ADD_ONE)
.hasKind(SpanKind.INTERNAL)
.hasParent(trace.getSpan(0)),
span ->
span.hasName(ADD_ONE)
.hasKind(SpanKind.INTERNAL)
.hasParent(trace.getSpan(0))));
}
@Test
public void delayedFlowable() {
List<Integer> result =
createParentSpan(
() ->
Flowable.fromIterable(Ints.asList(7, 8))
.delay(100, TimeUnit.MILLISECONDS)
.map(this::addOne)
.toList()
.blockingGet());
assertThat(result).contains(8, 9);
testing()
.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span -> span.hasName(PARENT).hasKind(SpanKind.INTERNAL).hasNoParent(),
span ->
span.hasName(ADD_ONE)
.hasKind(SpanKind.INTERNAL)
.hasParent(trace.getSpan(0)),
span ->
span.hasName(ADD_ONE)
.hasKind(SpanKind.INTERNAL)
.hasParent(trace.getSpan(0))));
}
@Test
public void delayedTwiceFlowable() {
List<Integer> result =
createParentSpan(
() ->
Flowable.fromIterable(Ints.asList(8, 9))
.delay(100, TimeUnit.MILLISECONDS)
.map(this::addOne)
.delay(100, TimeUnit.MILLISECONDS)
.map(this::addOne)
.toList()
.blockingGet());
assertThat(result).contains(10, 11);
testing()
.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span -> span.hasName(PARENT).hasKind(SpanKind.INTERNAL).hasNoParent(),
span ->
span.hasName(ADD_ONE)
.hasKind(SpanKind.INTERNAL)
.hasParent(trace.getSpan(0)),
span ->
span.hasName(ADD_ONE)
.hasKind(SpanKind.INTERNAL)
.hasParent(trace.getSpan(0)),
span ->
span.hasName(ADD_ONE)
.hasKind(SpanKind.INTERNAL)
.hasParent(trace.getSpan(0)),
span ->
span.hasName(ADD_ONE)
.hasKind(SpanKind.INTERNAL)
.hasParent(trace.getSpan(0))));
}
@Test
public void maybeFromCallable() {
Integer result =
createParentSpan(
() -> Maybe.fromCallable(() -> addOne(10)).map(this::addOne).blockingGet());
assertThat(result).isEqualTo(12);
testing()
.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span -> span.hasName(PARENT).hasKind(SpanKind.INTERNAL).hasNoParent(),
span ->
span.hasName(ADD_ONE)
.hasKind(SpanKind.INTERNAL)
.hasParent(trace.getSpan(0)),
span ->
span.hasName(ADD_ONE)
.hasKind(SpanKind.INTERNAL)
.hasParent(trace.getSpan(0))));
}
@Test
public void basicSingle() {
Integer result = createParentSpan(() -> Single.just(0).map(this::addOne).blockingGet());
assertThat(result).isEqualTo(1);
testing()
.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span -> span.hasName(PARENT).hasKind(SpanKind.INTERNAL).hasNoParent(),
span ->
span.hasName(ADD_ONE)
.hasKind(SpanKind.INTERNAL)
.hasParent(trace.getSpan(0))));
}
@Test
public void basicObservable() {
List<Integer> result =
createParentSpan(() -> Observable.just(0).map(this::addOne).toList().blockingGet());
assertThat(result).contains(1);
testing()
.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span -> span.hasName(PARENT).hasKind(SpanKind.INTERNAL).hasNoParent(),
span ->
span.hasName(ADD_ONE)
.hasKind(SpanKind.INTERNAL)
.hasParent(trace.getSpan(0))));
}
@Test
public void connectableFlowable() {
List<Integer> result =
createParentSpan(
() ->
FlowablePublish.just(0)
.delay(100, TimeUnit.MILLISECONDS)
.map(this::addOne)
.toList()
.blockingGet());
assertThat(result).contains(1);
testing()
.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span -> span.hasName(PARENT).hasKind(SpanKind.INTERNAL).hasNoParent(),
span ->
span.hasName(ADD_ONE)
.hasKind(SpanKind.INTERNAL)
.hasParent(trace.getSpan(0))));
}
@Test
public void connectableObservable() {
List<Integer> result =
createParentSpan(
() ->
ObservablePublish.just(0)
.delay(100, TimeUnit.MILLISECONDS)
.map(this::addOne)
.toList()
.blockingGet());
assertThat(result).contains(1);
testing()
.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span -> span.hasName(PARENT).hasKind(SpanKind.INTERNAL).hasNoParent(),
span ->
span.hasName(ADD_ONE)
.hasKind(SpanKind.INTERNAL)
.hasParent(trace.getSpan(0))));
}
@Test
public void maybeError() {
IllegalStateException error = new IllegalStateException(EXCEPTION_MESSAGE);
assertThatThrownBy(() -> createParentSpan(() -> Maybe.error(error).blockingGet()))
.isEqualTo(error);
testing()
.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span -> span.hasName(PARENT).hasKind(SpanKind.INTERNAL).hasNoParent()));
}
@Test
public void flowableError() {
IllegalStateException error = new IllegalStateException(EXCEPTION_MESSAGE);
assertThatThrownBy(() -> createParentSpan(() -> Flowable.error(error)).toList().blockingGet())
.isEqualTo(error);
testing()
.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span -> span.hasName(PARENT).hasKind(SpanKind.INTERNAL).hasNoParent()));
}
@Test
public void singleError() {
IllegalStateException error = new IllegalStateException(EXCEPTION_MESSAGE);
assertThatThrownBy(() -> createParentSpan(() -> Single.error(error)).blockingGet())
.isEqualTo(error);
testing()
.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span -> span.hasName(PARENT).hasKind(SpanKind.INTERNAL).hasNoParent()));
}
@Test
public void observableError() {
IllegalStateException error = new IllegalStateException(EXCEPTION_MESSAGE);
assertThatThrownBy(() -> createParentSpan(() -> Observable.error(error).toList().blockingGet()))
.isEqualTo(error);
testing()
.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span -> span.hasName(PARENT).hasKind(SpanKind.INTERNAL).hasNoParent()));
}
@Test
public void completableError() {
IllegalStateException error = new IllegalStateException(EXCEPTION_MESSAGE);
assertThatThrownBy(
() -> createParentSpan(() -> Completable.error(error).toMaybe().blockingGet()))
.isEqualTo(error);
testing()
.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span -> span.hasName(PARENT).hasKind(SpanKind.INTERNAL).hasNoParent()));
}
@Test
public void basicMaybeFailure() {
IllegalStateException error = new IllegalStateException(EXCEPTION_MESSAGE);
assertThatThrownBy(
() ->
createParentSpan(
() ->
Maybe.just(1)
.map(this::addOne)
.map(
i -> {
throw error;
})
.blockingGet()))
.isEqualTo(error);
testing()
.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span -> span.hasName(PARENT).hasKind(SpanKind.INTERNAL).hasNoParent(),
span ->
span.hasName(ADD_ONE)
.hasKind(SpanKind.INTERNAL)
.hasParent(trace.getSpan(0))));
}
@Test
public void basicFlowableFailure() {
IllegalStateException error = new IllegalStateException(EXCEPTION_MESSAGE);
assertThatThrownBy(
() ->
createParentSpan(
() ->
Flowable.fromIterable(Ints.asList(5, 6))
.map(this::addOne)
.map(
i -> {
throw error;
})
.toList()
.blockingGet()))
.isEqualTo(error);
testing()
.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span -> span.hasName(PARENT).hasKind(SpanKind.INTERNAL).hasNoParent(),
span ->
span.hasName(ADD_ONE)
.hasKind(SpanKind.INTERNAL)
.hasParent(trace.getSpan(0))));
}
@Test
public void basicMaybeCancel() {
createParentSpan(
() ->
Maybe.just(1).toFlowable().map(this::addOne).subscribe(CancellingSubscriber.INSTANCE));
testing()
.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span -> span.hasName(PARENT).hasKind(SpanKind.INTERNAL).hasNoParent()));
}
@Test
public void basicFlowableCancel() {
createParentSpan(
() ->
Flowable.fromIterable(Ints.asList(5, 6))
.map(this::addOne)
.subscribe(CancellingSubscriber.INSTANCE));
testing()
.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span -> span.hasName(PARENT).hasKind(SpanKind.INTERNAL).hasNoParent()));
}
@Test
public void basicSingleCancel() {
createParentSpan(
() ->
Single.just(1).toFlowable().map(this::addOne).subscribe(CancellingSubscriber.INSTANCE));
testing()
.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span -> span.hasName(PARENT).hasKind(SpanKind.INTERNAL).hasNoParent()));
}
@Test
public void basicCompletableCancel() {
createParentSpan(
() ->
Completable.fromCallable(() -> 1)
.toFlowable()
.subscribe(CancellingSubscriber.INSTANCE));
testing()
.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span -> span.hasName(PARENT).hasKind(SpanKind.INTERNAL).hasNoParent()));
}
@Test
public void basicObservableCancel() {
createParentSpan(
() ->
Observable.just(1)
.toFlowable(BackpressureStrategy.LATEST)
.map(this::addOne)
.subscribe(CancellingSubscriber.INSTANCE));
testing()
.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span -> span.hasName(PARENT).hasKind(SpanKind.INTERNAL).hasNoParent()));
}
@Test
public void basicMaybeChain() {
createParentSpan(
() ->
Maybe.just(1)
.map(this::addOne)
.map(this::addOne)
.concatWith(Maybe.just(1).map(this::addOne))
.toList()
.blockingGet());
testing()
.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span -> span.hasName(PARENT).hasKind(SpanKind.INTERNAL).hasNoParent(),
span ->
span.hasName(ADD_ONE)
.hasKind(SpanKind.INTERNAL)
.hasParent(trace.getSpan(0)),
span ->
span.hasName(ADD_ONE)
.hasKind(SpanKind.INTERNAL)
.hasParent(trace.getSpan(0)),
span ->
span.hasName(ADD_ONE)
.hasKind(SpanKind.INTERNAL)
.hasParent(trace.getSpan(0))));
}
@Test
public void basicFlowableChain() {
createParentSpan(
() ->
Flowable.fromIterable(Ints.asList(5, 6))
.map(this::addOne)
.map(this::addOne)
.concatWith(Maybe.just(1).map(this::addOne))
.toList()
.blockingGet());
testing()
.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span -> span.hasName(PARENT).hasKind(SpanKind.INTERNAL).hasNoParent(),
span ->
span.hasName(ADD_ONE)
.hasKind(SpanKind.INTERNAL)
.hasParent(trace.getSpan(0)),
span ->
span.hasName(ADD_ONE)
.hasKind(SpanKind.INTERNAL)
.hasParent(trace.getSpan(0)),
span ->
span.hasName(ADD_ONE)
.hasKind(SpanKind.INTERNAL)
.hasParent(trace.getSpan(0)),
span ->
span.hasName(ADD_ONE)
.hasKind(SpanKind.INTERNAL)
.hasParent(trace.getSpan(0)),
span ->
span.hasName(ADD_ONE)
.hasKind(SpanKind.INTERNAL)
.hasParent(trace.getSpan(0))));
}
// Publisher chain spans have the correct parents from subscription time
@Test
public void maybeChainParentSpan() {
Maybe<Integer> maybe = Maybe.just(42).map(this::addOne).map(this::addTwo);
testing().runWithSpan("trace-parent", () -> maybe.blockingGet());
testing()
.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span -> span.hasName("trace-parent").hasKind(SpanKind.INTERNAL).hasNoParent(),
span ->
span.hasName(ADD_ONE)
.hasKind(SpanKind.INTERNAL)
.hasParent(trace.getSpan(0)),
span ->
span.hasName(ADD_TWO)
.hasKind(SpanKind.INTERNAL)
.hasParent(trace.getSpan(0))));
}
@Test
public void maybeChainHasSubscriptionContext() {
Integer result =
createParentSpan(
() -> {
Maybe<Integer> maybe = Maybe.just(1).map(this::addOne);
return testing()
.runWithSpan("intermediate", () -> maybe.map(this::addTwo))
.blockingGet();
});
assertThat(result).isEqualTo(4);
testing()
.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span -> span.hasName(PARENT).hasKind(SpanKind.INTERNAL).hasNoParent(),
span ->
span.hasName("intermediate")
.hasKind(SpanKind.INTERNAL)
.hasParent(trace.getSpan(0)),
span ->
span.hasName(ADD_ONE)
.hasKind(SpanKind.INTERNAL)
.hasParent(trace.getSpan(0)),
span ->
span.hasName(ADD_TWO)
.hasKind(SpanKind.INTERNAL)
.hasParent(trace.getSpan(0))));
}
@Test
public void flowableChainHasSubscriptionContext() {
List<Integer> result =
createParentSpan(
() -> {
Flowable<Integer> flowable =
Flowable.fromIterable(Ints.asList(1, 2)).map(this::addOne);
return testing()
.runWithSpan("intermediate", () -> flowable.map(this::addTwo))
.toList()
.blockingGet();
});
assertThat(result).contains(4, 5);
testing()
.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span -> span.hasName(PARENT).hasKind(SpanKind.INTERNAL).hasNoParent(),
span ->
span.hasName("intermediate")
.hasKind(SpanKind.INTERNAL)
.hasParent(trace.getSpan(0)),
span ->
span.hasName(ADD_ONE)
.hasKind(SpanKind.INTERNAL)
.hasParent(trace.getSpan(0)),
span ->
span.hasName(ADD_TWO)
.hasKind(SpanKind.INTERNAL)
.hasParent(trace.getSpan(0)),
span ->
span.hasName(ADD_ONE)
.hasKind(SpanKind.INTERNAL)
.hasParent(trace.getSpan(0)),
span ->
span.hasName(ADD_TWO)
.hasKind(SpanKind.INTERNAL)
.hasParent(trace.getSpan(0))));
}
@Test
public void singleChainHasSubscriptionContext() {
Integer result =
createParentSpan(
() -> {
Single<Integer> single = Single.just(1).map(this::addOne);
return testing()
.runWithSpan("intermediate", () -> single.map(this::addTwo))
.blockingGet();
});
assertThat(result).isEqualTo(4);
testing()
.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span -> span.hasName(PARENT).hasKind(SpanKind.INTERNAL).hasNoParent(),
span ->
span.hasName("intermediate")
.hasKind(SpanKind.INTERNAL)
.hasParent(trace.getSpan(0)),
span ->
span.hasName(ADD_ONE)
.hasKind(SpanKind.INTERNAL)
.hasParent(trace.getSpan(0)),
span ->
span.hasName(ADD_TWO)
.hasKind(SpanKind.INTERNAL)
.hasParent(trace.getSpan(0))));
}
@Test
public void observableChainHasSubscriptionContext() {
List<Integer> result =
createParentSpan(
() -> {
Observable<Integer> observable = Observable.just(1).map(this::addOne);
return testing()
.runWithSpan("intermediate", () -> observable.map(this::addTwo))
.toList()
.blockingGet();
});
assertThat(result).contains(4);
testing()
.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span -> span.hasName(PARENT).hasKind(SpanKind.INTERNAL).hasNoParent(),
span ->
span.hasName("intermediate")
.hasKind(SpanKind.INTERNAL)
.hasParent(trace.getSpan(0)),
span ->
span.hasName(ADD_ONE)
.hasKind(SpanKind.INTERNAL)
.hasParent(trace.getSpan(0)),
span ->
span.hasName(ADD_TWO)
.hasKind(SpanKind.INTERNAL)
.hasParent(trace.getSpan(0))));
}
@ParameterizedTest
@MethodSource("schedulers")
public void flowableMultiResults(Scheduler scheduler) {
List<Integer> result =
testing()
.runWithSpan(
"flowable root",
() -> {
return Flowable.fromIterable(Ints.asList(1, 2, 3, 4))
.parallel()
.runOn(scheduler)
.flatMap(num -> Maybe.just(num).map(this::addOne).toFlowable())
.sequential()
.toList()
.blockingGet();
});
assertThat(result.size()).isEqualTo(4);
testing()
.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span -> span.hasName("flowable root").hasKind(SpanKind.INTERNAL).hasNoParent(),
span ->
span.hasName(ADD_ONE)
.hasKind(SpanKind.INTERNAL)
.hasParent(trace.getSpan(0)),
span ->
span.hasName(ADD_ONE)
.hasKind(SpanKind.INTERNAL)
.hasParent(trace.getSpan(0)),
span ->
span.hasName(ADD_ONE)
.hasKind(SpanKind.INTERNAL)
.hasParent(trace.getSpan(0)),
span ->
span.hasName(ADD_ONE)
.hasKind(SpanKind.INTERNAL)
.hasParent(trace.getSpan(0))));
}
@ParameterizedTest
@MethodSource("schedulers")
public void maybeMultipleTraceChains(Scheduler scheduler) {
int iterations = 100;
RxJava3ConcurrencyTestHelper.launchAndWait(scheduler, iterations, 60000, testing());
@SuppressWarnings("unchecked")
Consumer<TraceAssert>[] assertions = (Consumer<TraceAssert>[]) new Consumer<?>[iterations];
for (int i = 0; i < iterations; i++) {
int iteration = i;
assertions[i] =
trace ->
trace.hasSpansSatisfyingExactly(
span ->
span.hasName("outer")
.hasNoParent()
.hasAttributes(attributeEntry("iteration", iteration)),
span ->
span.hasName("middle")
.hasParent(trace.getSpan(0))
.hasAttributes(attributeEntry("iteration", iteration)),
span ->
span.hasName("inner")
.hasParent(trace.getSpan(1))
.hasAttributes(attributeEntry("iteration", iteration)));
}
testing()
.waitAndAssertSortedTraces(
Comparator.comparing(
span -> span.get(0).getAttributes().get(AttributeKey.longKey("iteration"))),
assertions);
testing().clearData();
}
}

View File

@ -0,0 +1,992 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.rxjava.v3.common;
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo;
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.CODE_FUNCTION;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.CODE_NAMESPACE;
import static org.assertj.core.api.Assertions.assertThat;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.sdk.trace.data.StatusData;
import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Maybe;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.Single;
import io.reactivex.rxjava3.observers.TestObserver;
import io.reactivex.rxjava3.processors.UnicastProcessor;
import io.reactivex.rxjava3.subjects.CompletableSubject;
import io.reactivex.rxjava3.subjects.MaybeSubject;
import io.reactivex.rxjava3.subjects.SingleSubject;
import io.reactivex.rxjava3.subjects.UnicastSubject;
import io.reactivex.rxjava3.subscribers.TestSubscriber;
import java.util.List;
import org.junit.jupiter.api.Test;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
public abstract class AbstractRxJava3WithSpanTest {
private static final AttributeKey<Boolean> RXJAVA_CANCELED =
AttributeKey.booleanKey("rxjava.canceled");
protected abstract AbstractTracedWithSpan newTraced();
protected abstract InstrumentationExtension testing();
@Test
public void captureSpanForCompletedCompletable() {
TestObserver<Object> observer = new TestObserver<>();
Completable source = Completable.complete();
newTraced().completable(source).subscribe(observer);
observer.assertComplete();
testing()
.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span ->
span.hasName("TracedWithSpan.completable")
.hasKind(SpanKind.INTERNAL)
.hasNoParent()
.hasAttributesSatisfyingExactly(
satisfies(CODE_NAMESPACE, val -> val.endsWith(".TracedWithSpan")),
equalTo(CODE_FUNCTION, "completable"))));
}
@Test
public void captureSpanForEventuallyCompletedCompletable() throws InterruptedException {
CompletableSubject source = CompletableSubject.create();
TestObserver<Object> observer = new TestObserver<>();
newTraced().completable(source).subscribe(observer);
// sleep a bit just to make sure no span is captured
Thread.sleep(500);
List<List<SpanData>> traces = testing().waitForTraces(0);
assertThat(traces).isEmpty();
source.onComplete();
observer.assertComplete();
testing()
.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span ->
span.hasName("TracedWithSpan.completable")
.hasKind(SpanKind.INTERNAL)
.hasNoParent()
.hasAttributesSatisfyingExactly(
satisfies(CODE_NAMESPACE, val -> val.endsWith(".TracedWithSpan")),
equalTo(CODE_FUNCTION, "completable"))));
}
@Test
public void captureSpanForErrorCompletable() {
IllegalStateException error = new IllegalStateException("Boom");
TestObserver<Object> observer = new TestObserver<>();
Completable source = Completable.error(error);
newTraced().completable(source).subscribe(observer);
observer.assertError(error);
testing()
.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span ->
span.hasName("TracedWithSpan.completable")
.hasKind(SpanKind.INTERNAL)
.hasNoParent()
.hasStatus(StatusData.error())
.hasException(error)
.hasAttributesSatisfyingExactly(
satisfies(CODE_NAMESPACE, val -> val.endsWith(".TracedWithSpan")),
equalTo(CODE_FUNCTION, "completable"))));
}
@Test
public void captureSpanForEventuallyErrorCompletable() throws InterruptedException {
IllegalStateException error = new IllegalStateException("Boom");
CompletableSubject source = CompletableSubject.create();
TestObserver<Object> observer = new TestObserver<>();
newTraced().completable(source).subscribe(observer);
// sleep a bit just to make sure no span is captured
Thread.sleep(500);
List<List<SpanData>> traces = testing().waitForTraces(0);
assertThat(traces).isEmpty();
source.onError(error);
observer.assertError(error);
testing()
.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span ->
span.hasName("TracedWithSpan.completable")
.hasKind(SpanKind.INTERNAL)
.hasNoParent()
.hasStatus(StatusData.error())
.hasException(error)
.hasAttributesSatisfyingExactly(
satisfies(CODE_NAMESPACE, val -> val.endsWith(".TracedWithSpan")),
equalTo(CODE_FUNCTION, "completable"))));
}
@Test
public void captureSpanForCanceledCompletable() throws InterruptedException {
CompletableSubject source = CompletableSubject.create();
TestObserver<Object> observer = new TestObserver<>();
newTraced().completable(source).subscribe(observer);
// sleep a bit just to make sure no span is captured
Thread.sleep(500);
List<List<SpanData>> traces = testing().waitForTraces(0);
assertThat(traces).isEmpty();
observer.dispose();
testing()
.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span ->
span.hasName("TracedWithSpan.completable")
.hasKind(SpanKind.INTERNAL)
.hasNoParent()
.hasAttributesSatisfyingExactly(
satisfies(CODE_NAMESPACE, val -> val.endsWith(".TracedWithSpan")),
equalTo(CODE_FUNCTION, "completable"),
equalTo(RXJAVA_CANCELED, true))));
}
@Test
public void captureSpanForCompletedMaybe() {
Maybe<String> source = Maybe.just("Value");
TestObserver<Object> observer = new TestObserver<>();
newTraced().maybe(source).subscribe(observer);
observer.assertValue("Value");
observer.assertComplete();
testing()
.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span ->
span.hasName("TracedWithSpan.maybe")
.hasKind(SpanKind.INTERNAL)
.hasNoParent()
.hasAttributesSatisfyingExactly(
satisfies(CODE_NAMESPACE, val -> val.endsWith(".TracedWithSpan")),
equalTo(CODE_FUNCTION, "maybe"))));
}
@Test
public void captureSpanForEmptyMaybe() {
Maybe<String> source = Maybe.empty();
TestObserver<Object> observer = new TestObserver<>();
newTraced().maybe(source).subscribe(observer);
observer.assertComplete();
testing()
.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span ->
span.hasName("TracedWithSpan.maybe")
.hasKind(SpanKind.INTERNAL)
.hasNoParent()
.hasAttributesSatisfyingExactly(
satisfies(CODE_NAMESPACE, val -> val.endsWith(".TracedWithSpan")),
equalTo(CODE_FUNCTION, "maybe"))));
}
@Test
public void captureSpanForEventuallyCompletedMaybe() throws InterruptedException {
MaybeSubject<String> source = MaybeSubject.create();
TestObserver<Object> observer = new TestObserver<>();
newTraced().maybe(source).subscribe(observer);
// sleep a bit just to make sure no span is captured
Thread.sleep(500);
List<List<SpanData>> traces = testing().waitForTraces(0);
assertThat(traces).isEmpty();
source.onSuccess("Value");
observer.assertValue("Value");
observer.assertComplete();
testing()
.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span ->
span.hasName("TracedWithSpan.maybe")
.hasKind(SpanKind.INTERNAL)
.hasNoParent()
.hasAttributesSatisfyingExactly(
satisfies(CODE_NAMESPACE, val -> val.endsWith(".TracedWithSpan")),
equalTo(CODE_FUNCTION, "maybe"))));
}
@Test
public void captureSpanForErrorMaybe() {
IllegalStateException error = new IllegalStateException("Boom");
TestObserver<Object> observer = new TestObserver<>();
Maybe<String> source = Maybe.error(error);
newTraced().maybe(source).subscribe(observer);
observer.assertError(error);
testing()
.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span ->
span.hasName("TracedWithSpan.maybe")
.hasKind(SpanKind.INTERNAL)
.hasNoParent()
.hasStatus(StatusData.error())
.hasException(error)
.hasAttributesSatisfyingExactly(
satisfies(CODE_NAMESPACE, val -> val.endsWith(".TracedWithSpan")),
equalTo(CODE_FUNCTION, "maybe"))));
}
@Test
public void captureSpanForEventuallyErrorMaybe() throws InterruptedException {
IllegalStateException error = new IllegalStateException("Boom");
MaybeSubject<String> source = MaybeSubject.create();
TestObserver<Object> observer = new TestObserver<>();
newTraced().maybe(source).subscribe(observer);
// sleep a bit just to make sure no span is captured
Thread.sleep(500);
List<List<SpanData>> traces = testing().waitForTraces(0);
assertThat(traces).isEmpty();
source.onError(error);
observer.assertError(error);
testing()
.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span ->
span.hasName("TracedWithSpan.maybe")
.hasKind(SpanKind.INTERNAL)
.hasNoParent()
.hasStatus(StatusData.error())
.hasException(error)
.hasAttributesSatisfyingExactly(
satisfies(CODE_NAMESPACE, val -> val.endsWith(".TracedWithSpan")),
equalTo(CODE_FUNCTION, "maybe"))));
}
@Test
public void captureSpanForCanceledMaybe() throws InterruptedException {
MaybeSubject<String> source = MaybeSubject.create();
TestObserver<Object> observer = new TestObserver<>();
newTraced().maybe(source).subscribe(observer);
// sleep a bit just to make sure no span is captured
Thread.sleep(500);
List<List<SpanData>> traces = testing().waitForTraces(0);
assertThat(traces).isEmpty();
observer.dispose();
testing()
.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span ->
span.hasName("TracedWithSpan.maybe")
.hasKind(SpanKind.INTERNAL)
.hasNoParent()
.hasAttributesSatisfyingExactly(
satisfies(CODE_NAMESPACE, val -> val.endsWith(".TracedWithSpan")),
equalTo(CODE_FUNCTION, "maybe"),
equalTo(RXJAVA_CANCELED, true))));
}
@Test
public void captureSpanForCompletedSingle() {
Single<String> source = Single.just("Value");
TestObserver<Object> observer = new TestObserver<>();
newTraced().single(source).subscribe(observer);
observer.assertValue("Value");
observer.assertComplete();
testing()
.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span ->
span.hasName("TracedWithSpan.single")
.hasKind(SpanKind.INTERNAL)
.hasNoParent()
.hasAttributesSatisfyingExactly(
satisfies(CODE_NAMESPACE, val -> val.endsWith(".TracedWithSpan")),
equalTo(CODE_FUNCTION, "single"))));
}
@Test
public void captureSpanForEventuallyCompletedSingle() throws InterruptedException {
SingleSubject<String> source = SingleSubject.create();
TestObserver<Object> observer = new TestObserver<>();
newTraced().single(source).subscribe(observer);
// sleep a bit just to make sure no span is captured
Thread.sleep(500);
List<List<SpanData>> traces = testing().waitForTraces(0);
assertThat(traces).isEmpty();
source.onSuccess("Value");
observer.assertValue("Value");
observer.assertComplete();
testing()
.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span ->
span.hasName("TracedWithSpan.single")
.hasKind(SpanKind.INTERNAL)
.hasNoParent()
.hasAttributesSatisfyingExactly(
satisfies(CODE_NAMESPACE, val -> val.endsWith(".TracedWithSpan")),
equalTo(CODE_FUNCTION, "single"))));
}
@Test
public void captureSpanForErrorSingle() {
IllegalStateException error = new IllegalStateException("Boom");
TestObserver<Object> observer = new TestObserver<>();
Single<String> source = Single.error(error);
newTraced().single(source).subscribe(observer);
observer.assertError(error);
testing()
.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span ->
span.hasName("TracedWithSpan.single")
.hasKind(SpanKind.INTERNAL)
.hasNoParent()
.hasStatus(StatusData.error())
.hasException(error)
.hasAttributesSatisfyingExactly(
satisfies(CODE_NAMESPACE, val -> val.endsWith(".TracedWithSpan")),
equalTo(CODE_FUNCTION, "single"))));
}
@Test
public void captureSpanForEventuallyErrorSingle() throws InterruptedException {
IllegalStateException error = new IllegalStateException("Boom");
SingleSubject<String> source = SingleSubject.create();
TestObserver<String> observer = new TestObserver<>();
newTraced().single(source).subscribe(observer);
// sleep a bit just to make sure no span is captured
Thread.sleep(500);
List<List<SpanData>> traces = testing().waitForTraces(0);
assertThat(traces).isEmpty();
source.onError(error);
observer.assertError(error);
testing()
.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span ->
span.hasName("TracedWithSpan.single")
.hasKind(SpanKind.INTERNAL)
.hasNoParent()
.hasStatus(StatusData.error())
.hasException(error)
.hasAttributesSatisfyingExactly(
satisfies(CODE_NAMESPACE, val -> val.endsWith(".TracedWithSpan")),
equalTo(CODE_FUNCTION, "single"))));
}
@Test
public void captureSpanForCanceledSingle() throws InterruptedException {
SingleSubject<String> source = SingleSubject.create();
TestObserver<Object> observer = new TestObserver<>();
newTraced().single(source).subscribe(observer);
// sleep a bit just to make sure no span is captured
Thread.sleep(500);
List<List<SpanData>> traces = testing().waitForTraces(0);
assertThat(traces).isEmpty();
observer.dispose();
testing()
.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span ->
span.hasName("TracedWithSpan.single")
.hasKind(SpanKind.INTERNAL)
.hasNoParent()
.hasAttributesSatisfyingExactly(
satisfies(CODE_NAMESPACE, val -> val.endsWith(".TracedWithSpan")),
equalTo(CODE_FUNCTION, "single"),
equalTo(RXJAVA_CANCELED, true))));
}
@Test
public void captureSpanForCompletedObservable() {
TestObserver<Object> observer = new TestObserver<>();
Observable<String> source = Observable.just("Value");
newTraced().observable(source).subscribe(observer);
observer.assertValue("Value");
observer.assertComplete();
testing()
.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span ->
span.hasName("TracedWithSpan.observable")
.hasKind(SpanKind.INTERNAL)
.hasNoParent()
.hasAttributesSatisfyingExactly(
satisfies(CODE_NAMESPACE, val -> val.endsWith(".TracedWithSpan")),
equalTo(CODE_FUNCTION, "observable"))));
}
@Test
public void captureSpanForEventuallyCompletedObservable() throws InterruptedException {
TestObserver<Object> observer = new TestObserver<>();
UnicastSubject<String> source = UnicastSubject.create();
newTraced().observable(source).subscribe(observer);
// sleep a bit just to make sure no span is captured
Thread.sleep(500);
List<List<SpanData>> traces = testing().waitForTraces(0);
assertThat(traces).isEmpty();
source.onNext("Value");
observer.assertValue("Value");
// sleep a bit just to make sure no span is captured
Thread.sleep(500);
traces = testing().waitForTraces(0);
assertThat(traces).isEmpty();
source.onComplete();
observer.assertComplete();
testing()
.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span ->
span.hasName("TracedWithSpan.observable")
.hasKind(SpanKind.INTERNAL)
.hasNoParent()
.hasAttributesSatisfyingExactly(
satisfies(CODE_NAMESPACE, val -> val.endsWith(".TracedWithSpan")),
equalTo(CODE_FUNCTION, "observable"))));
}
@Test
public void captureSpanForErrorObservable() {
IllegalStateException error = new IllegalStateException("Boom");
Observable<String> source = Observable.error(error);
TestObserver<Object> observer = new TestObserver<>();
newTraced().observable(source).subscribe(observer);
observer.assertError(error);
testing()
.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span ->
span.hasName("TracedWithSpan.observable")
.hasKind(SpanKind.INTERNAL)
.hasNoParent()
.hasStatus(StatusData.error())
.hasException(error)
.hasAttributesSatisfyingExactly(
satisfies(CODE_NAMESPACE, val -> val.endsWith(".TracedWithSpan")),
equalTo(CODE_FUNCTION, "observable"))));
}
@Test
public void captureSpanForEventuallyErrorObservable() throws InterruptedException {
IllegalStateException error = new IllegalStateException("Boom");
UnicastSubject<String> source = UnicastSubject.create();
TestObserver<Object> observer = new TestObserver<>();
newTraced().observable(source).subscribe(observer);
// sleep a bit just to make sure no span is captured
Thread.sleep(500);
List<List<SpanData>> traces = testing().waitForTraces(0);
assertThat(traces).isEmpty();
source.onNext("Value");
observer.assertValue("Value");
// sleep a bit just to make sure no span is captured
Thread.sleep(500);
traces = testing().waitForTraces(0);
assertThat(traces).isEmpty();
source.onError(error);
observer.assertError(error);
testing()
.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span ->
span.hasName("TracedWithSpan.observable")
.hasKind(SpanKind.INTERNAL)
.hasNoParent()
.hasStatus(StatusData.error())
.hasException(error)
.hasAttributesSatisfyingExactly(
satisfies(CODE_NAMESPACE, val -> val.endsWith(".TracedWithSpan")),
equalTo(CODE_FUNCTION, "observable"))));
}
@Test
public void captureSpanForCanceledObservable() throws InterruptedException {
UnicastSubject<String> source = UnicastSubject.create();
TestObserver<Object> observer = new TestObserver<>();
newTraced().observable(source).subscribe(observer);
// sleep a bit just to make sure no span is captured
Thread.sleep(500);
List<List<SpanData>> traces = testing().waitForTraces(0);
assertThat(traces).isEmpty();
source.onNext("Value");
observer.assertValue("Value");
// sleep a bit just to make sure no span is captured
Thread.sleep(500);
traces = testing().waitForTraces(0);
assertThat(traces).isEmpty();
observer.dispose();
testing()
.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span ->
span.hasName("TracedWithSpan.observable")
.hasKind(SpanKind.INTERNAL)
.hasNoParent()
.hasAttributesSatisfyingExactly(
satisfies(CODE_NAMESPACE, val -> val.endsWith(".TracedWithSpan")),
equalTo(CODE_FUNCTION, "observable"),
equalTo(RXJAVA_CANCELED, true))));
}
@Test
public void captureSpanForCompletedFlowable() {
TestSubscriber<Object> observe = new TestSubscriber<>();
Flowable<String> source = Flowable.just("Value");
newTraced().flowable(source).subscribe(observe);
observe.assertValue("Value");
observe.assertComplete();
testing()
.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span ->
span.hasName("TracedWithSpan.flowable")
.hasKind(SpanKind.INTERNAL)
.hasNoParent()
.hasAttributesSatisfyingExactly(
satisfies(CODE_NAMESPACE, val -> val.endsWith(".TracedWithSpan")),
equalTo(CODE_FUNCTION, "flowable"))));
}
@Test
public void captureForEventuallyCompletedFlowable() throws InterruptedException {
UnicastProcessor<String> source = UnicastProcessor.create();
TestSubscriber<Object> observer = new TestSubscriber<>();
newTraced().flowable(source).subscribe(observer);
// sleep a bit just to make sure no span is captured
Thread.sleep(500);
List<List<SpanData>> traces = testing().waitForTraces(0);
assertThat(traces).isEmpty();
source.onNext("Value");
observer.assertValue("Value");
// sleep a bit just to make sure no span is captured
Thread.sleep(500);
traces = testing().waitForTraces(0);
assertThat(traces).isEmpty();
source.onComplete();
observer.assertComplete();
testing()
.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span ->
span.hasName("TracedWithSpan.flowable")
.hasKind(SpanKind.INTERNAL)
.hasNoParent()
.hasAttributesSatisfyingExactly(
satisfies(CODE_NAMESPACE, val -> val.endsWith(".TracedWithSpan")),
equalTo(CODE_FUNCTION, "flowable"))));
}
@Test
public void captureSpanForErrorFlowable() {
IllegalStateException error = new IllegalStateException("Boom");
TestSubscriber<Object> observer = new TestSubscriber<>();
Flowable<String> source = Flowable.error(error);
newTraced().flowable(source).subscribe(observer);
testing()
.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span ->
span.hasName("TracedWithSpan.flowable")
.hasKind(SpanKind.INTERNAL)
.hasNoParent()
.hasStatus(StatusData.error())
.hasException(error)
.hasAttributesSatisfyingExactly(
satisfies(CODE_NAMESPACE, val -> val.endsWith(".TracedWithSpan")),
equalTo(CODE_FUNCTION, "flowable"))));
}
@Test
public void captureSpanForEventuallyErrorFlowable() throws InterruptedException {
IllegalStateException error = new IllegalStateException("Boom");
UnicastProcessor<String> source = UnicastProcessor.create();
TestSubscriber<Object> observer = new TestSubscriber<>();
newTraced().flowable(source).subscribe(observer);
// sleep a bit just to make sure no span is captured
Thread.sleep(500);
List<List<SpanData>> traces = testing().waitForTraces(0);
assertThat(traces).isEmpty();
source.onNext("Value");
observer.assertValue("Value");
// sleep a bit just to make sure no span is captured
Thread.sleep(500);
traces = testing().waitForTraces(0);
assertThat(traces).isEmpty();
source.onError(error);
observer.assertError(error);
testing()
.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span ->
span.hasName("TracedWithSpan.flowable")
.hasKind(SpanKind.INTERNAL)
.hasNoParent()
.hasStatus(StatusData.error())
.hasException(error)
.hasAttributesSatisfyingExactly(
satisfies(CODE_NAMESPACE, val -> val.endsWith(".TracedWithSpan")),
equalTo(CODE_FUNCTION, "flowable"))));
}
@Test
public void captureSpanForCanceledFlowable() throws InterruptedException {
UnicastProcessor<String> source = UnicastProcessor.create();
TestSubscriber<Object> observer = new TestSubscriber<>();
newTraced().flowable(source).subscribe(observer);
// sleep a bit just to make sure no span is captured
Thread.sleep(500);
List<List<SpanData>> traces = testing().waitForTraces(0);
assertThat(traces).isEmpty();
source.onNext("Value");
observer.assertValue("Value");
// sleep a bit just to make sure no span is captured
Thread.sleep(500);
traces = testing().waitForTraces(0);
assertThat(traces).isEmpty();
observer.cancel();
testing()
.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span ->
span.hasName("TracedWithSpan.flowable")
.hasKind(SpanKind.INTERNAL)
.hasNoParent()
.hasAttributesSatisfyingExactly(
satisfies(CODE_NAMESPACE, val -> val.endsWith(".TracedWithSpan")),
equalTo(CODE_FUNCTION, "flowable"),
equalTo(RXJAVA_CANCELED, true))));
}
@Test
public void captureSpanForCompletedParallelFlowable() {
Flowable<String> source = Flowable.just("Value");
TestSubscriber<Object> observer = new TestSubscriber<>();
newTraced().parallelFlowable(source.parallel()).sequential().subscribe(observer);
observer.assertValue("Value");
observer.assertComplete();
testing()
.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span ->
span.hasName("TracedWithSpan.parallelFlowable")
.hasKind(SpanKind.INTERNAL)
.hasNoParent()
.hasAttributesSatisfyingExactly(
satisfies(CODE_NAMESPACE, val -> val.endsWith(".TracedWithSpan")),
equalTo(CODE_FUNCTION, "parallelFlowable"))));
}
@Test
public void captureSpanForEventuallyCompletedParallelFlowable() throws InterruptedException {
UnicastProcessor<String> source = UnicastProcessor.create();
TestSubscriber<Object> observer = new TestSubscriber<>();
newTraced().parallelFlowable(source.parallel()).sequential().subscribe(observer);
// sleep a bit just to make sure no span is captured
Thread.sleep(500);
List<List<SpanData>> traces = testing().waitForTraces(0);
assertThat(traces).isEmpty();
source.onNext("Value");
observer.assertValue("Value");
// sleep a bit just to make sure no span is captured
Thread.sleep(500);
traces = testing().waitForTraces(0);
assertThat(traces).isEmpty();
source.onComplete();
observer.assertComplete();
testing()
.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span ->
span.hasName("TracedWithSpan.parallelFlowable")
.hasKind(SpanKind.INTERNAL)
.hasNoParent()
.hasAttributesSatisfyingExactly(
satisfies(CODE_NAMESPACE, val -> val.endsWith(".TracedWithSpan")),
equalTo(CODE_FUNCTION, "parallelFlowable"))));
}
@Test
public void captureSpanForErrorParallelFlowable() {
IllegalStateException error = new IllegalStateException("Boom");
TestSubscriber<Object> observer = new TestSubscriber<>();
Flowable<String> source = Flowable.error(error);
newTraced().parallelFlowable(source.parallel()).sequential().subscribe(observer);
observer.assertError(error);
testing()
.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span ->
span.hasName("TracedWithSpan.parallelFlowable")
.hasKind(SpanKind.INTERNAL)
.hasNoParent()
.hasStatus(StatusData.error())
.hasException(error)
.hasAttributesSatisfyingExactly(
satisfies(CODE_NAMESPACE, val -> val.endsWith(".TracedWithSpan")),
equalTo(CODE_FUNCTION, "parallelFlowable"))));
}
@Test
public void captureSpanForEventuallyErrorParallelFlowable() throws InterruptedException {
IllegalStateException error = new IllegalStateException("Boom");
TestSubscriber<Object> observer = new TestSubscriber<>();
UnicastProcessor<String> source = UnicastProcessor.create();
newTraced().parallelFlowable(source.parallel()).sequential().subscribe(observer);
// sleep a bit just to make sure no span is captured
Thread.sleep(500);
List<List<SpanData>> traces = testing().waitForTraces(0);
assertThat(traces).isEmpty();
source.onNext("Value");
observer.assertValue("Value");
// sleep a bit just to make sure no span is captured
Thread.sleep(500);
traces = testing().waitForTraces(0);
assertThat(traces).isEmpty();
source.onError(error);
observer.assertError(error);
observer.assertError(error);
testing()
.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span ->
span.hasName("TracedWithSpan.parallelFlowable")
.hasKind(SpanKind.INTERNAL)
.hasNoParent()
.hasStatus(StatusData.error())
.hasException(error)
.hasAttributesSatisfyingExactly(
satisfies(CODE_NAMESPACE, val -> val.endsWith(".TracedWithSpan")),
equalTo(CODE_FUNCTION, "parallelFlowable"))));
}
@Test
public void captureSpanForCanceledParallelFlowable() throws InterruptedException {
TestSubscriber<Object> observer = new TestSubscriber<>();
UnicastProcessor<String> source = UnicastProcessor.create();
newTraced().parallelFlowable(source.parallel()).sequential().subscribe(observer);
// sleep a bit just to make sure no span is captured
Thread.sleep(500);
List<List<SpanData>> traces = testing().waitForTraces(0);
assertThat(traces).isEmpty();
source.onNext("Value");
observer.assertValue("Value");
// sleep a bit just to make sure no span is captured
Thread.sleep(500);
traces = testing().waitForTraces(0);
assertThat(traces).isEmpty();
observer.cancel();
testing()
.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span ->
span.hasName("TracedWithSpan.parallelFlowable")
.hasKind(SpanKind.INTERNAL)
.hasNoParent()
.hasAttributesSatisfyingExactly(
satisfies(CODE_NAMESPACE, val -> val.endsWith(".TracedWithSpan")),
equalTo(CODE_FUNCTION, "parallelFlowable"),
equalTo(RXJAVA_CANCELED, true))));
}
@Test
public void captureSpanForEventuallyCompletedPublisher() throws InterruptedException {
CustomPublisher source = new CustomPublisher();
TestSubscriber<String> observer = new TestSubscriber<>();
newTraced().publisher(source).subscribe(observer);
// sleep a bit just to make sure no span is captured
Thread.sleep(500);
List<List<SpanData>> traces = testing().waitForTraces(0);
assertThat(traces).isEmpty();
source.onComplete();
observer.assertComplete();
testing()
.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span ->
span.hasName("TracedWithSpan.publisher")
.hasKind(SpanKind.INTERNAL)
.hasNoParent()
.hasAttributesSatisfyingExactly(
satisfies(CODE_NAMESPACE, val -> val.endsWith(".TracedWithSpan")),
equalTo(CODE_FUNCTION, "publisher"))));
}
@Test
public void captureSpanForEventuallyErrorPublisher() throws InterruptedException {
IllegalStateException error = new IllegalStateException("Boom");
CustomPublisher source = new CustomPublisher();
TestSubscriber<Object> observer = new TestSubscriber<>();
newTraced().publisher(source).subscribe(observer);
// sleep a bit just to make sure no span is captured
Thread.sleep(500);
List<List<SpanData>> traces = testing().waitForTraces(0);
assertThat(traces).isEmpty();
source.onError(error);
observer.assertError(error);
testing()
.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span ->
span.hasName("TracedWithSpan.publisher")
.hasKind(SpanKind.INTERNAL)
.hasNoParent()
.hasStatus(StatusData.error())
.hasException(error)
.hasAttributesSatisfyingExactly(
satisfies(CODE_NAMESPACE, val -> val.endsWith(".TracedWithSpan")),
equalTo(CODE_FUNCTION, "publisher"))));
}
@Test
public void captureSpanForCanceledPublisher() throws InterruptedException {
CustomPublisher source = new CustomPublisher();
TestSubscriber<Object> observer = new TestSubscriber<>();
newTraced().publisher(source).subscribe(observer);
// sleep a bit just to make sure no span is captured
Thread.sleep(500);
List<List<SpanData>> traces = testing().waitForTraces(0);
assertThat(traces).isEmpty();
observer.cancel();
testing()
.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span ->
span.hasName("TracedWithSpan.publisher")
.hasKind(SpanKind.INTERNAL)
.hasNoParent()
.hasAttributesSatisfyingExactly(
satisfies(CODE_NAMESPACE, val -> val.endsWith(".TracedWithSpan")),
equalTo(CODE_FUNCTION, "publisher"),
equalTo(RXJAVA_CANCELED, true))));
}
static class CustomPublisher implements Publisher<String>, Subscription {
Subscriber<? super String> subscriber;
@Override
public void subscribe(Subscriber<? super String> subscriber) {
this.subscriber = subscriber;
subscriber.onSubscribe(this);
}
void onComplete() {
this.subscriber.onComplete();
}
void onError(Throwable exception) {
this.subscriber.onError(exception);
}
@Override
public void request(long l) {}
@Override
public void cancel() {}
}
}

View File

@ -6,7 +6,7 @@
package io.opentelemetry.instrumentation.rxjava.v3.common; package io.opentelemetry.instrumentation.rxjava.v3.common;
import io.opentelemetry.api.trace.Span; import io.opentelemetry.api.trace.Span;
import io.opentelemetry.instrumentation.testing.InstrumentationTestRunner; import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import io.reactivex.rxjava3.core.Scheduler; import io.reactivex.rxjava3.core.Scheduler;
import io.reactivex.rxjava3.core.Single; import io.reactivex.rxjava3.core.Single;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
@ -23,8 +23,10 @@ import java.util.concurrent.TimeUnit;
* from different traces. * from different traces.
*/ */
public class RxJava3ConcurrencyTestHelper { public class RxJava3ConcurrencyTestHelper {
private RxJava3ConcurrencyTestHelper() {}
public static void launchAndWait( public static void launchAndWait(
Scheduler scheduler, int iterations, long timeoutMillis, InstrumentationTestRunner runner) { Scheduler scheduler, int iterations, long timeoutMillis, InstrumentationExtension runner) {
CountDownLatch latch = new CountDownLatch(iterations); CountDownLatch latch = new CountDownLatch(iterations);
for (int i = 0; i < iterations; i++) { for (int i = 0; i < iterations; i++) {
@ -40,7 +42,7 @@ public class RxJava3ConcurrencyTestHelper {
} }
} }
private static void launchOuter(Iteration iteration, InstrumentationTestRunner runner) { private static void launchOuter(Iteration iteration, InstrumentationExtension runner) {
runner.runWithSpan( runner.runWithSpan(
"outer", "outer",
() -> { () -> {
@ -58,7 +60,7 @@ public class RxJava3ConcurrencyTestHelper {
}); });
} }
private static void launchInner(Iteration iteration, InstrumentationTestRunner runner) { private static void launchInner(Iteration iteration, InstrumentationExtension runner) {
runner.runWithSpan( runner.runWithSpan(
"middle", "middle",
() -> { () -> {

View File

@ -1,16 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
import io.opentelemetry.instrumentation.rxjava.v3.common.AbstractRxJava3WithSpanTest
import io.opentelemetry.instrumentation.rxjava.v3.common.AbstractTracedWithSpan
import io.opentelemetry.instrumentation.rxjava.v3.common.extensionannotation.TracedWithSpan
class RxJava3ExtensionWithSpanTest extends AbstractRxJava3WithSpanTest {
@Override
AbstractTracedWithSpan newTraced() {
return new TracedWithSpan()
}
}

View File

@ -1,16 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
import io.opentelemetry.instrumentation.rxjava.v3.common.AbstractRxJava3WithSpanTest
import io.opentelemetry.instrumentation.rxjava.v3.common.AbstractTracedWithSpan
import io.opentelemetry.instrumentation.rxjava.v3.common.instrumentationannotation.TracedWithSpan
class RxJava3InstrumentationWithSpanTest extends AbstractRxJava3WithSpanTest {
@Override
AbstractTracedWithSpan newTraced() {
return new TracedWithSpan()
}
}

View File

@ -1,10 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
import io.opentelemetry.instrumentation.rxjava.v3.common.AbstractRxJava3SubscriptionTest
import io.opentelemetry.instrumentation.test.AgentTestTrait
class RxJava3SubscriptionTest extends AbstractRxJava3SubscriptionTest implements AgentTestTrait {
}

View File

@ -1,10 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
import io.opentelemetry.instrumentation.rxjava.v3.common.AbstractRxJava3Test
import io.opentelemetry.instrumentation.test.AgentTestTrait
class RxJava3Test extends AbstractRxJava3Test implements AgentTestTrait {
}

View File

@ -0,0 +1,26 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
import io.opentelemetry.instrumentation.rxjava.v3.common.AbstractRxJava3WithSpanTest;
import io.opentelemetry.instrumentation.rxjava.v3.common.AbstractTracedWithSpan;
import io.opentelemetry.instrumentation.rxjava.v3.common.extensionannotation.TracedWithSpan;
import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import org.junit.jupiter.api.extension.RegisterExtension;
public class RxJava3ExtensionWithSpanTest extends AbstractRxJava3WithSpanTest {
@RegisterExtension
static final InstrumentationExtension testing = AgentInstrumentationExtension.create();
@Override
protected AbstractTracedWithSpan newTraced() {
return new TracedWithSpan();
}
@Override
protected InstrumentationExtension testing() {
return testing;
}
}

View File

@ -0,0 +1,26 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
import io.opentelemetry.instrumentation.rxjava.v3.common.AbstractRxJava3WithSpanTest;
import io.opentelemetry.instrumentation.rxjava.v3.common.AbstractTracedWithSpan;
import io.opentelemetry.instrumentation.rxjava.v3.common.extensionannotation.TracedWithSpan;
import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import org.junit.jupiter.api.extension.RegisterExtension;
public class RxJava3InstrumentationWithSpanTest extends AbstractRxJava3WithSpanTest {
@RegisterExtension
static final InstrumentationExtension testing = AgentInstrumentationExtension.create();
@Override
protected AbstractTracedWithSpan newTraced() {
return new TracedWithSpan();
}
@Override
protected InstrumentationExtension testing() {
return testing;
}
}

View File

@ -0,0 +1,19 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
import io.opentelemetry.instrumentation.rxjava.v3.common.AbstractRxJava3SubscriptionTest;
import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import org.junit.jupiter.api.extension.RegisterExtension;
public class RxJava3SubscriptionTest extends AbstractRxJava3SubscriptionTest {
@RegisterExtension
static final InstrumentationExtension testing = AgentInstrumentationExtension.create();
@Override
protected InstrumentationExtension testing() {
return testing;
}
}

View File

@ -0,0 +1,19 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
import io.opentelemetry.instrumentation.rxjava.v3.common.AbstractRxJava3Test;
import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import org.junit.jupiter.api.extension.RegisterExtension;
public class RxJava3Test extends AbstractRxJava3Test {
@RegisterExtension
static final InstrumentationExtension testing = AgentInstrumentationExtension.create();
@Override
protected InstrumentationExtension testing() {
return testing;
}
}

View File

@ -1,18 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
import io.opentelemetry.instrumentation.rxjava.v3.common.AbstractRxJava3SubscriptionTest
import io.opentelemetry.instrumentation.rxjava.v3_0.TracingAssembly
import io.opentelemetry.instrumentation.test.LibraryTestTrait
import spock.lang.Shared
class RxJava3SubscriptionTest extends AbstractRxJava3SubscriptionTest implements LibraryTestTrait {
@Shared
TracingAssembly tracingAssembly = TracingAssembly.create()
def setupSpec() {
tracingAssembly.enable()
}
}

View File

@ -1,18 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
import io.opentelemetry.instrumentation.rxjava.v3.common.AbstractRxJava3Test
import io.opentelemetry.instrumentation.rxjava.v3_0.TracingAssembly
import io.opentelemetry.instrumentation.test.LibraryTestTrait
import spock.lang.Shared
class RxJava3Test extends AbstractRxJava3Test implements LibraryTestTrait {
@Shared
TracingAssembly tracingAssembly = TracingAssembly.create()
def setupSpec() {
tracingAssembly.enable()
}
}

View File

@ -0,0 +1,29 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
import io.opentelemetry.instrumentation.rxjava.v3.common.AbstractRxJava3SubscriptionTest;
import io.opentelemetry.instrumentation.rxjava.v3_0.TracingAssembly;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.LibraryInstrumentationExtension;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.extension.RegisterExtension;
public class RxJava3SubscriptionTest extends AbstractRxJava3SubscriptionTest {
@RegisterExtension
static final InstrumentationExtension testing = LibraryInstrumentationExtension.create();
@Override
protected InstrumentationExtension testing() {
return testing;
}
static TracingAssembly tracingAssembly = TracingAssembly.create();
@BeforeAll
public static void setupSpec() {
tracingAssembly.enable();
}
}

View File

@ -0,0 +1,28 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
import io.opentelemetry.instrumentation.rxjava.v3.common.AbstractRxJava3Test;
import io.opentelemetry.instrumentation.rxjava.v3_0.TracingAssembly;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.LibraryInstrumentationExtension;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.extension.RegisterExtension;
public class RxJava3Test extends AbstractRxJava3Test {
@RegisterExtension
static final InstrumentationExtension testing = LibraryInstrumentationExtension.create();
static TracingAssembly tracingAssembly = TracingAssembly.create();
@Override
protected InstrumentationExtension testing() {
return testing;
}
@BeforeAll
public void setupSpec() {
tracingAssembly.enable();
}
}

View File

@ -1,16 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
import io.opentelemetry.instrumentation.rxjava.v3.common.AbstractRxJava3WithSpanTest
import io.opentelemetry.instrumentation.rxjava.v3.common.AbstractTracedWithSpan
import io.opentelemetry.instrumentation.rxjava.v3.common.extensionannotation.TracedWithSpan
class RxJava3ExtensionWithSpanTest extends AbstractRxJava3WithSpanTest {
@Override
AbstractTracedWithSpan newTraced() {
return new TracedWithSpan()
}
}

View File

@ -1,16 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
import io.opentelemetry.instrumentation.rxjava.v3.common.AbstractRxJava3WithSpanTest
import io.opentelemetry.instrumentation.rxjava.v3.common.AbstractTracedWithSpan
import io.opentelemetry.instrumentation.rxjava.v3.common.instrumentationannotation.TracedWithSpan
class RxJava3InstrumentationWithSpanTest extends AbstractRxJava3WithSpanTest {
@Override
AbstractTracedWithSpan newTraced() {
return new TracedWithSpan()
}
}

View File

@ -1,10 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
import io.opentelemetry.instrumentation.rxjava.v3.common.AbstractRxJava3SubscriptionTest
import io.opentelemetry.instrumentation.test.AgentTestTrait
class RxJava3SubscriptionTest extends AbstractRxJava3SubscriptionTest implements AgentTestTrait {
}

View File

@ -1,10 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
import io.opentelemetry.instrumentation.rxjava.v3.common.AbstractRxJava3Test
import io.opentelemetry.instrumentation.test.AgentTestTrait
class RxJava3Test extends AbstractRxJava3Test implements AgentTestTrait {
}

View File

@ -0,0 +1,26 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
import io.opentelemetry.instrumentation.rxjava.v3.common.AbstractRxJava3WithSpanTest;
import io.opentelemetry.instrumentation.rxjava.v3.common.AbstractTracedWithSpan;
import io.opentelemetry.instrumentation.rxjava.v3.common.extensionannotation.TracedWithSpan;
import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import org.junit.jupiter.api.extension.RegisterExtension;
public class RxJava3ExtensionWithSpanTest extends AbstractRxJava3WithSpanTest {
@RegisterExtension
static final InstrumentationExtension testing = AgentInstrumentationExtension.create();
@Override
protected AbstractTracedWithSpan newTraced() {
return new TracedWithSpan();
}
@Override
protected InstrumentationExtension testing() {
return testing;
}
}

View File

@ -0,0 +1,26 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
import io.opentelemetry.instrumentation.rxjava.v3.common.AbstractRxJava3WithSpanTest;
import io.opentelemetry.instrumentation.rxjava.v3.common.AbstractTracedWithSpan;
import io.opentelemetry.instrumentation.rxjava.v3.common.extensionannotation.TracedWithSpan;
import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import org.junit.jupiter.api.extension.RegisterExtension;
public class RxJava3InstrumentationWithSpanTest extends AbstractRxJava3WithSpanTest {
@RegisterExtension
static final InstrumentationExtension testing = AgentInstrumentationExtension.create();
@Override
protected AbstractTracedWithSpan newTraced() {
return new TracedWithSpan();
}
@Override
protected InstrumentationExtension testing() {
return testing;
}
}

View File

@ -0,0 +1,19 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
import io.opentelemetry.instrumentation.rxjava.v3.common.AbstractRxJava3SubscriptionTest;
import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import org.junit.jupiter.api.extension.RegisterExtension;
public class RxJava3SubscriptionTest extends AbstractRxJava3SubscriptionTest {
@RegisterExtension
static final InstrumentationExtension testing = AgentInstrumentationExtension.create();
@Override
protected InstrumentationExtension testing() {
return testing;
}
}

View File

@ -0,0 +1,19 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
import io.opentelemetry.instrumentation.rxjava.v3.common.AbstractRxJava3Test;
import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import org.junit.jupiter.api.extension.RegisterExtension;
public class RxJava3Test extends AbstractRxJava3Test {
@RegisterExtension
static final InstrumentationExtension testing = AgentInstrumentationExtension.create();
@Override
protected InstrumentationExtension testing() {
return testing;
}
}

View File

@ -1,18 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
import io.opentelemetry.instrumentation.rxjava.v3.common.AbstractRxJava3SubscriptionTest
import io.opentelemetry.instrumentation.rxjava.v3_1_1.TracingAssembly
import io.opentelemetry.instrumentation.test.LibraryTestTrait
import spock.lang.Shared
class RxJava3SubscriptionTest extends AbstractRxJava3SubscriptionTest implements LibraryTestTrait {
@Shared
TracingAssembly tracingAssembly = TracingAssembly.create()
def setupSpec() {
tracingAssembly.enable()
}
}

View File

@ -1,18 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
import io.opentelemetry.instrumentation.rxjava.v3.common.AbstractRxJava3Test
import io.opentelemetry.instrumentation.rxjava.v3_1_1.TracingAssembly
import io.opentelemetry.instrumentation.test.LibraryTestTrait
import spock.lang.Shared
class RxJava3Test extends AbstractRxJava3Test implements LibraryTestTrait {
@Shared
TracingAssembly tracingAssembly = TracingAssembly.create()
def setupSpec() {
tracingAssembly.enable()
}
}

View File

@ -0,0 +1,29 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
import io.opentelemetry.instrumentation.rxjava.v3.common.AbstractRxJava3SubscriptionTest;
import io.opentelemetry.instrumentation.rxjava.v3_1_1.TracingAssembly;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.LibraryInstrumentationExtension;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.extension.RegisterExtension;
public class RxJava3SubscriptionTest extends AbstractRxJava3SubscriptionTest {
@RegisterExtension
static final InstrumentationExtension testing = LibraryInstrumentationExtension.create();
@Override
protected InstrumentationExtension testing() {
return testing;
}
static TracingAssembly tracingAssembly = TracingAssembly.create();
@BeforeAll
public static void setupSpec() {
tracingAssembly.enable();
}
}

View File

@ -0,0 +1,28 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
import io.opentelemetry.instrumentation.rxjava.v3.common.AbstractRxJava3Test;
import io.opentelemetry.instrumentation.rxjava.v3_1_1.TracingAssembly;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.LibraryInstrumentationExtension;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.extension.RegisterExtension;
public class RxJava3Test extends AbstractRxJava3Test {
@RegisterExtension
static final InstrumentationExtension testing = LibraryInstrumentationExtension.create();
static TracingAssembly tracingAssembly = TracingAssembly.create();
@Override
protected InstrumentationExtension testing() {
return testing;
}
@BeforeAll
public void setupSpec() {
tracingAssembly.enable();
}
}