Fix WithSpan initialization ordering requirement for SpanAttribute and Flux (#5764)
* Add shared tests for async end strategy WithSpan * WIP * WIP * Finally * dump * Oops * namedOneOf
This commit is contained in:
parent
7e428168ff
commit
496c6cfb0a
|
@ -22,5 +22,6 @@ dependencies {
|
|||
|
||||
implementation(project(":instrumentation:guava-10.0:library"))
|
||||
|
||||
testImplementation(project(":instrumentation:opentelemetry-annotations-1.0:testing"))
|
||||
testImplementation("io.opentelemetry:opentelemetry-extension-annotations")
|
||||
}
|
||||
|
|
|
@ -0,0 +1,75 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.guava;
|
||||
|
||||
import com.google.common.util.concurrent.Futures;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import com.google.common.util.concurrent.SettableFuture;
|
||||
import io.opentelemetry.extension.annotations.WithSpan;
|
||||
import io.opentelemetry.javaagent.instrumentation.otelannotations.AbstractTraced;
|
||||
import io.opentelemetry.javaagent.instrumentation.otelannotations.AbstractWithSpanTest;
|
||||
import org.testcontainers.shaded.com.google.common.base.Throwables;
|
||||
|
||||
class GuavaWithSpanTest
|
||||
extends AbstractWithSpanTest<SettableFuture<String>, ListenableFuture<String>> {
|
||||
|
||||
@Override
|
||||
protected AbstractTraced<SettableFuture<String>, ListenableFuture<String>> newTraced() {
|
||||
return new Traced();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void complete(SettableFuture<String> future, String value) {
|
||||
future.set(value);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void fail(SettableFuture<String> future, Throwable error) {
|
||||
future.setException(error);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void cancel(SettableFuture<String> future) {
|
||||
future.cancel(true);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String getCompleted(ListenableFuture<String> future) {
|
||||
return Futures.getUnchecked(future);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Throwable unwrapError(Throwable t) {
|
||||
return Throwables.getRootCause(t);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String canceledKey() {
|
||||
return "guava.canceled";
|
||||
}
|
||||
|
||||
static final class Traced
|
||||
extends AbstractTraced<SettableFuture<String>, ListenableFuture<String>> {
|
||||
|
||||
@Override
|
||||
@WithSpan
|
||||
protected SettableFuture<String> completable() {
|
||||
return SettableFuture.create();
|
||||
}
|
||||
|
||||
@Override
|
||||
@WithSpan
|
||||
protected ListenableFuture<String> alreadySucceeded() {
|
||||
return Futures.immediateFuture("Value");
|
||||
}
|
||||
|
||||
@Override
|
||||
@WithSpan
|
||||
protected ListenableFuture<String> alreadyFailed() {
|
||||
return Futures.immediateFailedFuture(FAILURE);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,30 +0,0 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.guava;
|
||||
|
||||
import com.google.common.util.concurrent.Futures;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import com.google.common.util.concurrent.SettableFuture;
|
||||
import io.opentelemetry.extension.annotations.WithSpan;
|
||||
|
||||
final class TracedWithSpan {
|
||||
static final IllegalArgumentException FAILURE = new IllegalArgumentException("Boom");
|
||||
|
||||
@WithSpan
|
||||
SettableFuture<String> completable() {
|
||||
return SettableFuture.create();
|
||||
}
|
||||
|
||||
@WithSpan
|
||||
ListenableFuture<String> alreadySucceeded() {
|
||||
return Futures.immediateFuture("Value");
|
||||
}
|
||||
|
||||
@WithSpan
|
||||
ListenableFuture<String> alreadyFailed() {
|
||||
return Futures.immediateFailedFuture(FAILURE);
|
||||
}
|
||||
}
|
|
@ -119,7 +119,6 @@ public class WithSpanInstrumentation implements TypeInstrumentation {
|
|||
@Advice.Local("otelMethod") Method method,
|
||||
@Advice.Local("otelContext") Context context,
|
||||
@Advice.Local("otelScope") Scope scope) {
|
||||
|
||||
// Every usage of @Advice.Origin Method is replaced with a call to Class.getMethod, copy it
|
||||
// to local variable so that there would be only one call to Class.getMethod.
|
||||
method = originMethod;
|
||||
|
@ -159,8 +158,6 @@ public class WithSpanInstrumentation implements TypeInstrumentation {
|
|||
@Advice.Origin Method originMethod,
|
||||
@Advice.Local("otelMethod") Method method,
|
||||
@Advice.AllArguments(typing = Assigner.Typing.DYNAMIC) Object[] args,
|
||||
@Advice.Local("otelOperationEndSupport")
|
||||
AsyncOperationEndSupport<MethodRequest, Object> operationEndSupport,
|
||||
@Advice.Local("otelRequest") MethodRequest request,
|
||||
@Advice.Local("otelContext") Context context,
|
||||
@Advice.Local("otelScope") Scope scope) {
|
||||
|
@ -176,16 +173,12 @@ public class WithSpanInstrumentation implements TypeInstrumentation {
|
|||
if (instrumenter.shouldStart(current, request)) {
|
||||
context = instrumenter.start(current, request);
|
||||
scope = context.makeCurrent();
|
||||
operationEndSupport =
|
||||
AsyncOperationEndSupport.create(instrumenter, Object.class, method.getReturnType());
|
||||
}
|
||||
}
|
||||
|
||||
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
|
||||
public static void stopSpan(
|
||||
@Advice.Local("otelMethod") Method method,
|
||||
@Advice.Local("otelOperationEndSupport")
|
||||
AsyncOperationEndSupport<MethodRequest, Object> operationEndSupport,
|
||||
@Advice.Local("otelRequest") MethodRequest request,
|
||||
@Advice.Local("otelContext") Context context,
|
||||
@Advice.Local("otelScope") Scope scope,
|
||||
|
@ -195,6 +188,9 @@ public class WithSpanInstrumentation implements TypeInstrumentation {
|
|||
return;
|
||||
}
|
||||
scope.close();
|
||||
AsyncOperationEndSupport<MethodRequest, Object> operationEndSupport =
|
||||
AsyncOperationEndSupport.create(
|
||||
instrumenterWithAttributes(), Object.class, method.getReturnType());
|
||||
returnValue = operationEndSupport.asyncEnd(context, request, returnValue, throwable);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,9 @@
|
|||
plugins {
|
||||
id("otel.java-conventions")
|
||||
}
|
||||
|
||||
dependencies {
|
||||
api(project(":testing-common"))
|
||||
|
||||
implementation("io.opentelemetry:opentelemetry-extension-annotations")
|
||||
}
|
|
@ -0,0 +1,25 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.otelannotations;
|
||||
|
||||
public abstract class AbstractTraced<T extends U, U> {
|
||||
|
||||
protected static final String SUCCESS_VALUE = "Value";
|
||||
|
||||
protected static final IllegalArgumentException FAILURE = new IllegalArgumentException("Boom");
|
||||
|
||||
protected AbstractTraced() {
|
||||
if (!getClass().getSimpleName().equals("Traced")) {
|
||||
throw new IllegalStateException("Subclasses of AbstractTraced must be named Traced");
|
||||
}
|
||||
}
|
||||
|
||||
protected abstract T completable();
|
||||
|
||||
protected abstract U alreadySucceeded();
|
||||
|
||||
protected abstract U alreadyFailed();
|
||||
}
|
|
@ -3,15 +3,12 @@
|
|||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.guava;
|
||||
package io.opentelemetry.javaagent.instrumentation.otelannotations;
|
||||
|
||||
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.attributeEntry;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.assertj.core.api.Assertions.assertThatThrownBy;
|
||||
import static org.assertj.core.api.Assertions.catchThrowable;
|
||||
|
||||
import com.google.common.util.concurrent.Futures;
|
||||
import com.google.common.util.concurrent.SettableFuture;
|
||||
import com.google.common.util.concurrent.UncheckedExecutionException;
|
||||
import io.opentelemetry.api.common.Attributes;
|
||||
import io.opentelemetry.api.trace.SpanKind;
|
||||
import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension;
|
||||
|
@ -20,21 +17,41 @@ import io.opentelemetry.sdk.trace.data.StatusData;
|
|||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.RegisterExtension;
|
||||
|
||||
class GuavaWithSpanInstrumentationTest {
|
||||
public abstract class AbstractWithSpanTest<T extends U, U> {
|
||||
|
||||
@RegisterExtension
|
||||
static final InstrumentationExtension testing = AgentInstrumentationExtension.create();
|
||||
|
||||
protected abstract AbstractTraced<T, U> newTraced();
|
||||
|
||||
protected abstract void complete(T future, String value);
|
||||
|
||||
protected abstract void fail(T future, Throwable error);
|
||||
|
||||
protected abstract void cancel(T future);
|
||||
|
||||
protected abstract String getCompleted(U future);
|
||||
|
||||
protected abstract Throwable unwrapError(Throwable t);
|
||||
|
||||
protected abstract String canceledKey();
|
||||
|
||||
protected final InstrumentationExtension testing() {
|
||||
return testing;
|
||||
}
|
||||
|
||||
@Test
|
||||
void success() {
|
||||
SettableFuture<String> future = new TracedWithSpan().completable();
|
||||
future.set("Value");
|
||||
T future = newTraced().completable();
|
||||
complete(future, AbstractTraced.SUCCESS_VALUE);
|
||||
|
||||
assertThat(getCompleted(future)).isEqualTo(AbstractTraced.SUCCESS_VALUE);
|
||||
|
||||
testing.waitAndAssertTraces(
|
||||
trace ->
|
||||
trace.hasSpansSatisfyingExactly(
|
||||
span ->
|
||||
span.hasName("TracedWithSpan.completable")
|
||||
span.hasName("Traced.completable")
|
||||
.hasKind(SpanKind.INTERNAL)
|
||||
.hasNoParent()
|
||||
.hasAttributes(Attributes.empty())));
|
||||
|
@ -42,46 +59,49 @@ class GuavaWithSpanInstrumentationTest {
|
|||
|
||||
@Test
|
||||
void failure() {
|
||||
IllegalArgumentException error = new IllegalArgumentException("Boom");
|
||||
SettableFuture<String> future = new TracedWithSpan().completable();
|
||||
future.setException(error);
|
||||
T future = newTraced().completable();
|
||||
fail(future, AbstractTraced.FAILURE);
|
||||
|
||||
Throwable thrown = catchThrowable(() -> getCompleted(future));
|
||||
assertThat(unwrapError(thrown)).isEqualTo(AbstractTraced.FAILURE);
|
||||
|
||||
testing.waitAndAssertTraces(
|
||||
trace ->
|
||||
trace.hasSpansSatisfyingExactly(
|
||||
span ->
|
||||
span.hasName("TracedWithSpan.completable")
|
||||
span.hasName("Traced.completable")
|
||||
.hasKind(SpanKind.INTERNAL)
|
||||
.hasNoParent()
|
||||
.hasStatus(StatusData.error())
|
||||
.hasException(error)
|
||||
.hasException(AbstractTraced.FAILURE)
|
||||
.hasAttributes(Attributes.empty())));
|
||||
}
|
||||
|
||||
@Test
|
||||
void canceled() {
|
||||
SettableFuture<String> future = new TracedWithSpan().completable();
|
||||
future.cancel(true);
|
||||
T future = newTraced().completable();
|
||||
cancel(future);
|
||||
|
||||
testing.waitAndAssertTraces(
|
||||
trace ->
|
||||
trace.hasSpansSatisfyingExactly(
|
||||
span ->
|
||||
span.hasName("TracedWithSpan.completable")
|
||||
span.hasName("Traced.completable")
|
||||
.hasKind(SpanKind.INTERNAL)
|
||||
.hasNoParent()
|
||||
.hasAttributes(attributeEntry("guava.canceled", true))));
|
||||
.hasAttributes(attributeEntry(canceledKey(), true))));
|
||||
}
|
||||
|
||||
@Test
|
||||
void immediateSuccess() {
|
||||
assertThat(Futures.getUnchecked(new TracedWithSpan().alreadySucceeded())).isEqualTo("Value");
|
||||
assertThat(getCompleted(newTraced().alreadySucceeded()))
|
||||
.isEqualTo(AbstractTraced.SUCCESS_VALUE);
|
||||
|
||||
testing.waitAndAssertTraces(
|
||||
trace ->
|
||||
trace.hasSpansSatisfyingExactly(
|
||||
span ->
|
||||
span.hasName("TracedWithSpan.alreadySucceeded")
|
||||
span.hasName("Traced.alreadySucceeded")
|
||||
.hasKind(SpanKind.INTERNAL)
|
||||
.hasNoParent()
|
||||
.hasAttributes(Attributes.empty())));
|
||||
|
@ -89,19 +109,18 @@ class GuavaWithSpanInstrumentationTest {
|
|||
|
||||
@Test
|
||||
void immediateFailure() {
|
||||
assertThatThrownBy(() -> Futures.getUnchecked(new TracedWithSpan().alreadyFailed()))
|
||||
.isInstanceOf(UncheckedExecutionException.class)
|
||||
.hasCause(TracedWithSpan.FAILURE);
|
||||
Throwable error = catchThrowable(() -> getCompleted(newTraced().alreadyFailed()));
|
||||
assertThat(unwrapError(error)).isEqualTo(AbstractTraced.FAILURE);
|
||||
|
||||
testing.waitAndAssertTraces(
|
||||
trace ->
|
||||
trace.hasSpansSatisfyingExactly(
|
||||
span ->
|
||||
span.hasName("TracedWithSpan.alreadyFailed")
|
||||
span.hasName("Traced.alreadyFailed")
|
||||
.hasKind(SpanKind.INTERNAL)
|
||||
.hasNoParent()
|
||||
.hasStatus(StatusData.error())
|
||||
.hasException(TracedWithSpan.FAILURE)
|
||||
.hasException(AbstractTraced.FAILURE)
|
||||
.hasAttributes(Attributes.empty())));
|
||||
}
|
||||
}
|
|
@ -28,6 +28,7 @@ dependencies {
|
|||
compileOnly(project(path = ":opentelemetry-api-shaded-for-instrumenting", configuration = "shadow"))
|
||||
|
||||
testLibrary("io.projectreactor:reactor-test:3.1.0.RELEASE")
|
||||
testImplementation(project(":instrumentation:opentelemetry-annotations-1.0:testing"))
|
||||
testImplementation(project(":instrumentation:reactor:reactor-3.1:testing"))
|
||||
testImplementation("io.opentelemetry:opentelemetry-extension-annotations")
|
||||
|
||||
|
|
|
@ -7,6 +7,7 @@ package io.opentelemetry.javaagent.instrumentation.reactor;
|
|||
|
||||
import static net.bytebuddy.matcher.ElementMatchers.isTypeInitializer;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.named;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.namedOneOf;
|
||||
|
||||
import io.opentelemetry.instrumentation.api.config.Config;
|
||||
import io.opentelemetry.instrumentation.reactor.ContextPropagationOperator;
|
||||
|
@ -19,7 +20,11 @@ import net.bytebuddy.matcher.ElementMatcher;
|
|||
public class HooksInstrumentation implements TypeInstrumentation {
|
||||
@Override
|
||||
public ElementMatcher<TypeDescription> typeMatcher() {
|
||||
return named("reactor.core.publisher.Hooks");
|
||||
return namedOneOf(
|
||||
"reactor.core.publisher.Hooks",
|
||||
// Hooks may not be loaded early enough so also match our main targets
|
||||
"reactor.core.publisher.Flux",
|
||||
"reactor.core.publisher.Mono");
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -0,0 +1,162 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.reactor;
|
||||
|
||||
import io.opentelemetry.api.common.Attributes;
|
||||
import io.opentelemetry.api.trace.SpanKind;
|
||||
import io.opentelemetry.extension.annotations.WithSpan;
|
||||
import io.opentelemetry.javaagent.instrumentation.otelannotations.AbstractTraced;
|
||||
import io.opentelemetry.javaagent.instrumentation.otelannotations.AbstractWithSpanTest;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import reactor.core.Scannable;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.UnicastProcessor;
|
||||
import reactor.test.StepVerifier;
|
||||
|
||||
class FluxWithSpanTest extends AbstractWithSpanTest<Flux<String>, Flux<String>> {
|
||||
|
||||
@Override
|
||||
protected AbstractTraced<Flux<String>, Flux<String>> newTraced() {
|
||||
return new Traced();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void complete(Flux<String> future, String value) {
|
||||
UnicastProcessor<String> source = processor(future);
|
||||
source.onNext(value);
|
||||
source.onComplete();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void fail(Flux<String> future, Throwable error) {
|
||||
UnicastProcessor<String> source = processor(future);
|
||||
source.onError(error);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void cancel(Flux<String> future) {
|
||||
StepVerifier.create(future).expectSubscription().thenCancel().verify();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String getCompleted(Flux<String> future) {
|
||||
return future.blockLast();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Throwable unwrapError(Throwable t) {
|
||||
return t;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String canceledKey() {
|
||||
return "reactor.canceled";
|
||||
}
|
||||
|
||||
@Test
|
||||
void nested() {
|
||||
Flux<String> flux =
|
||||
Flux.defer(
|
||||
() -> {
|
||||
testing().runWithSpan("inner-manual", () -> {});
|
||||
return Flux.just("Value");
|
||||
});
|
||||
|
||||
Flux<String> result = new TracedWithSpan().flux(flux);
|
||||
|
||||
StepVerifier.create(result).expectNext("Value").verifyComplete();
|
||||
|
||||
testing()
|
||||
.waitAndAssertTraces(
|
||||
trace ->
|
||||
trace.hasSpansSatisfyingExactly(
|
||||
span ->
|
||||
span.hasName("TracedWithSpan.flux")
|
||||
.hasKind(SpanKind.INTERNAL)
|
||||
.hasNoParent()
|
||||
.hasAttributes(Attributes.empty()),
|
||||
span ->
|
||||
span.hasName("inner-manual")
|
||||
.hasKind(SpanKind.INTERNAL)
|
||||
.hasParent(trace.getSpan(0))
|
||||
.hasAttributes(Attributes.empty())));
|
||||
}
|
||||
|
||||
@Test
|
||||
void nestedFromCurrent() {
|
||||
testing()
|
||||
.runWithSpan(
|
||||
"parent",
|
||||
() -> {
|
||||
Flux<String> result =
|
||||
new TracedWithSpan()
|
||||
.flux(
|
||||
Flux.defer(
|
||||
() -> {
|
||||
testing().runWithSpan("inner-manual", () -> {});
|
||||
return Flux.just("Value");
|
||||
}));
|
||||
|
||||
StepVerifier.create(result).expectNext("Value").verifyComplete();
|
||||
});
|
||||
|
||||
testing()
|
||||
.waitAndAssertTraces(
|
||||
trace ->
|
||||
trace.hasSpansSatisfyingExactly(
|
||||
span ->
|
||||
span.hasName("parent")
|
||||
.hasKind(SpanKind.INTERNAL)
|
||||
.hasNoParent()
|
||||
.hasAttributes(Attributes.empty()),
|
||||
span ->
|
||||
span.hasName("TracedWithSpan.flux")
|
||||
.hasKind(SpanKind.INTERNAL)
|
||||
.hasParent(trace.getSpan(0))
|
||||
.hasAttributes(Attributes.empty()),
|
||||
span ->
|
||||
span.hasName("inner-manual")
|
||||
.hasKind(SpanKind.INTERNAL)
|
||||
.hasParent(trace.getSpan(1))
|
||||
.hasAttributes(Attributes.empty())));
|
||||
}
|
||||
|
||||
// While a UnicastProcessor is a Flux and we'd expect a simpler way to access it to provide
|
||||
// values,
|
||||
// our instrumentation adds operations and causes the return type to always be just a Flux. We
|
||||
// need
|
||||
// to go through the parents to get back to the processor.
|
||||
@SuppressWarnings("unchecked")
|
||||
private static UnicastProcessor<String> processor(Flux<String> flux) {
|
||||
return ((Scannable) flux)
|
||||
.parents()
|
||||
.filter(UnicastProcessor.class::isInstance)
|
||||
.map(UnicastProcessor.class::cast)
|
||||
.findFirst()
|
||||
.get();
|
||||
}
|
||||
|
||||
static class Traced extends AbstractTraced<Flux<String>, Flux<String>> {
|
||||
|
||||
@Override
|
||||
@WithSpan
|
||||
protected Flux<String> completable() {
|
||||
return UnicastProcessor.create();
|
||||
}
|
||||
|
||||
@Override
|
||||
@WithSpan
|
||||
protected Flux<String> alreadySucceeded() {
|
||||
return Flux.just(SUCCESS_VALUE);
|
||||
}
|
||||
|
||||
@Override
|
||||
@WithSpan
|
||||
protected Flux<String> alreadyFailed() {
|
||||
return Flux.error(FAILURE);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,165 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.reactor;
|
||||
|
||||
import io.opentelemetry.api.common.Attributes;
|
||||
import io.opentelemetry.api.trace.SpanKind;
|
||||
import io.opentelemetry.extension.annotations.WithSpan;
|
||||
import io.opentelemetry.javaagent.instrumentation.otelannotations.AbstractTraced;
|
||||
import io.opentelemetry.javaagent.instrumentation.otelannotations.AbstractWithSpanTest;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import reactor.core.Scannable;
|
||||
import reactor.core.publisher.Mono;
|
||||
import reactor.core.publisher.UnicastProcessor;
|
||||
import reactor.test.StepVerifier;
|
||||
|
||||
class MonoWithSpanTest extends AbstractWithSpanTest<Mono<String>, Mono<String>> {
|
||||
|
||||
@Override
|
||||
protected AbstractTraced<Mono<String>, Mono<String>> newTraced() {
|
||||
return new Traced();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void complete(Mono<String> future, String value) {
|
||||
UnicastProcessor<String> source = processor(future);
|
||||
source.onNext(value);
|
||||
source.onComplete();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void fail(Mono<String> future, Throwable error) {
|
||||
UnicastProcessor<String> source = processor(future);
|
||||
source.onError(error);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void cancel(Mono<String> future) {
|
||||
StepVerifier.create(future).expectSubscription().thenCancel().verify();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String getCompleted(Mono<String> future) {
|
||||
return future.block();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Throwable unwrapError(Throwable t) {
|
||||
return t;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String canceledKey() {
|
||||
return "reactor.canceled";
|
||||
}
|
||||
|
||||
@Test
|
||||
void nested() {
|
||||
Mono<String> mono =
|
||||
Mono.defer(
|
||||
() -> {
|
||||
testing().runWithSpan("inner-manual", () -> {});
|
||||
return Mono.just("Value");
|
||||
});
|
||||
|
||||
Mono<String> result = new TracedWithSpan().outer(mono);
|
||||
|
||||
StepVerifier.create(result).expectNext("Value").verifyComplete();
|
||||
|
||||
testing()
|
||||
.waitAndAssertTraces(
|
||||
trace ->
|
||||
trace.hasSpansSatisfyingExactly(
|
||||
span ->
|
||||
span.hasName("TracedWithSpan.outer")
|
||||
.hasKind(SpanKind.INTERNAL)
|
||||
.hasNoParent()
|
||||
.hasAttributes(Attributes.empty()),
|
||||
span ->
|
||||
span.hasName("TracedWithSpan.mono")
|
||||
.hasKind(SpanKind.INTERNAL)
|
||||
.hasParent(trace.getSpan(0))
|
||||
.hasAttributes(Attributes.empty()),
|
||||
span ->
|
||||
span.hasName("inner-manual")
|
||||
.hasKind(SpanKind.INTERNAL)
|
||||
.hasParent(trace.getSpan(1))
|
||||
.hasAttributes(Attributes.empty())));
|
||||
}
|
||||
|
||||
@Test
|
||||
void nestedFromCurrent() {
|
||||
testing()
|
||||
.runWithSpan(
|
||||
"parent",
|
||||
() -> {
|
||||
Mono<String> result =
|
||||
new TracedWithSpan()
|
||||
.mono(
|
||||
Mono.defer(
|
||||
() -> {
|
||||
testing().runWithSpan("inner-manual", () -> {});
|
||||
return Mono.just("Value");
|
||||
}));
|
||||
|
||||
StepVerifier.create(result).expectNext("Value").verifyComplete();
|
||||
});
|
||||
|
||||
testing()
|
||||
.waitAndAssertTraces(
|
||||
trace ->
|
||||
trace.hasSpansSatisfyingExactly(
|
||||
span ->
|
||||
span.hasName("parent")
|
||||
.hasKind(SpanKind.INTERNAL)
|
||||
.hasNoParent()
|
||||
.hasAttributes(Attributes.empty()),
|
||||
span ->
|
||||
span.hasName("TracedWithSpan.mono")
|
||||
.hasKind(SpanKind.INTERNAL)
|
||||
.hasParent(trace.getSpan(0))
|
||||
.hasAttributes(Attributes.empty()),
|
||||
span ->
|
||||
span.hasName("inner-manual")
|
||||
.hasKind(SpanKind.INTERNAL)
|
||||
.hasParent(trace.getSpan(1))
|
||||
.hasAttributes(Attributes.empty())));
|
||||
}
|
||||
|
||||
// Because we test on the Mono API but need to be able to complete the processor, we
|
||||
// use this hacky approach to access the processor from the mono ancestor.
|
||||
@SuppressWarnings("unchecked")
|
||||
private static UnicastProcessor<String> processor(Mono<String> mono) {
|
||||
return ((Scannable) mono)
|
||||
.parents()
|
||||
.filter(UnicastProcessor.class::isInstance)
|
||||
.map(UnicastProcessor.class::cast)
|
||||
.findFirst()
|
||||
.get();
|
||||
}
|
||||
|
||||
static class Traced extends AbstractTraced<Mono<String>, Mono<String>> {
|
||||
|
||||
@Override
|
||||
@WithSpan
|
||||
protected Mono<String> completable() {
|
||||
UnicastProcessor<String> source = UnicastProcessor.create();
|
||||
return source.singleOrEmpty();
|
||||
}
|
||||
|
||||
@Override
|
||||
@WithSpan
|
||||
protected Mono<String> alreadySucceeded() {
|
||||
return Mono.just(SUCCESS_VALUE);
|
||||
}
|
||||
|
||||
@Override
|
||||
@WithSpan
|
||||
protected Mono<String> alreadyFailed() {
|
||||
return Mono.error(FAILURE);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,360 +0,0 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.reactor;
|
||||
|
||||
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.attributeEntry;
|
||||
|
||||
import io.opentelemetry.api.common.Attributes;
|
||||
import io.opentelemetry.api.trace.SpanKind;
|
||||
import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension;
|
||||
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
|
||||
import io.opentelemetry.sdk.trace.data.StatusData;
|
||||
import org.junit.jupiter.api.Nested;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.RegisterExtension;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
import reactor.core.publisher.UnicastProcessor;
|
||||
import reactor.test.StepVerifier;
|
||||
|
||||
@SuppressWarnings("ClassCanBeStatic")
|
||||
class ReactorWithSpanInstrumentationTest {
|
||||
|
||||
@RegisterExtension
|
||||
static final InstrumentationExtension testing = AgentInstrumentationExtension.create();
|
||||
|
||||
@Nested
|
||||
class MonoTest {
|
||||
@Test
|
||||
void success() {
|
||||
UnicastProcessor<String> source = UnicastProcessor.create();
|
||||
Mono<String> mono = source.singleOrEmpty();
|
||||
Mono<String> result = new TracedWithSpan().mono(mono);
|
||||
StepVerifier.Step<String> verifier = StepVerifier.create(result).expectSubscription();
|
||||
|
||||
source.onNext("Value");
|
||||
source.onComplete();
|
||||
verifier.expectNext("Value").verifyComplete();
|
||||
|
||||
testing.waitAndAssertTraces(
|
||||
trace ->
|
||||
trace.hasSpansSatisfyingExactly(
|
||||
span ->
|
||||
span.hasName("TracedWithSpan.mono")
|
||||
.hasKind(SpanKind.INTERNAL)
|
||||
.hasNoParent()
|
||||
.hasAttributes(Attributes.empty())));
|
||||
}
|
||||
|
||||
@Test
|
||||
void failure() {
|
||||
IllegalArgumentException error = new IllegalArgumentException("Boom");
|
||||
|
||||
UnicastProcessor<String> source = UnicastProcessor.create();
|
||||
Mono<String> mono = source.singleOrEmpty();
|
||||
Mono<String> result = new TracedWithSpan().mono(mono);
|
||||
StepVerifier.Step<String> verifier = StepVerifier.create(result).expectSubscription();
|
||||
|
||||
source.onError(error);
|
||||
verifier.verifyErrorMatches(t -> t.equals(error));
|
||||
|
||||
testing.waitAndAssertTraces(
|
||||
trace ->
|
||||
trace.hasSpansSatisfyingExactly(
|
||||
span ->
|
||||
span.hasName("TracedWithSpan.mono")
|
||||
.hasKind(SpanKind.INTERNAL)
|
||||
.hasNoParent()
|
||||
.hasStatus(StatusData.error())
|
||||
.hasException(error)
|
||||
.hasAttributes(Attributes.empty())));
|
||||
}
|
||||
|
||||
@Test
|
||||
void canceled() {
|
||||
UnicastProcessor<String> source = UnicastProcessor.create();
|
||||
Mono<String> mono = source.singleOrEmpty();
|
||||
Mono<String> result = new TracedWithSpan().mono(mono);
|
||||
StepVerifier.Step<String> verifier = StepVerifier.create(result).expectSubscription();
|
||||
|
||||
verifier.thenCancel().verify();
|
||||
|
||||
testing.waitAndAssertTraces(
|
||||
trace ->
|
||||
trace.hasSpansSatisfyingExactly(
|
||||
span ->
|
||||
span.hasName("TracedWithSpan.mono")
|
||||
.hasKind(SpanKind.INTERNAL)
|
||||
.hasNoParent()
|
||||
.hasAttributes(attributeEntry("reactor.canceled", true))));
|
||||
}
|
||||
|
||||
@Test
|
||||
void immediateSuccess() {
|
||||
Mono<String> result = new TracedWithSpan().mono(Mono.just("Value"));
|
||||
StepVerifier.create(result).expectNext("Value").verifyComplete();
|
||||
|
||||
testing.waitAndAssertTraces(
|
||||
trace ->
|
||||
trace.hasSpansSatisfyingExactly(
|
||||
span ->
|
||||
span.hasName("TracedWithSpan.mono")
|
||||
.hasKind(SpanKind.INTERNAL)
|
||||
.hasNoParent()
|
||||
.hasAttributes(Attributes.empty())));
|
||||
}
|
||||
|
||||
@Test
|
||||
void immediateFailure() {
|
||||
IllegalArgumentException error = new IllegalArgumentException("Boom");
|
||||
Mono<String> result = new TracedWithSpan().mono(Mono.error(error));
|
||||
StepVerifier.create(result).verifyErrorMatches(t -> t.equals(error));
|
||||
|
||||
testing.waitAndAssertTraces(
|
||||
trace ->
|
||||
trace.hasSpansSatisfyingExactly(
|
||||
span ->
|
||||
span.hasName("TracedWithSpan.mono")
|
||||
.hasKind(SpanKind.INTERNAL)
|
||||
.hasNoParent()
|
||||
.hasStatus(StatusData.error())
|
||||
.hasException(error)
|
||||
.hasAttributes(Attributes.empty())));
|
||||
}
|
||||
|
||||
@Test
|
||||
void nested() {
|
||||
Mono<String> mono =
|
||||
Mono.defer(
|
||||
() -> {
|
||||
testing.runWithSpan("inner-manual", () -> {});
|
||||
return Mono.just("Value");
|
||||
});
|
||||
|
||||
Mono<String> result = new TracedWithSpan().outer(mono);
|
||||
|
||||
StepVerifier.create(result).expectNext("Value").verifyComplete();
|
||||
|
||||
testing.waitAndAssertTraces(
|
||||
trace ->
|
||||
trace.hasSpansSatisfyingExactly(
|
||||
span ->
|
||||
span.hasName("TracedWithSpan.outer")
|
||||
.hasKind(SpanKind.INTERNAL)
|
||||
.hasNoParent()
|
||||
.hasAttributes(Attributes.empty()),
|
||||
span ->
|
||||
span.hasName("TracedWithSpan.mono")
|
||||
.hasKind(SpanKind.INTERNAL)
|
||||
.hasParent(trace.getSpan(0))
|
||||
.hasAttributes(Attributes.empty()),
|
||||
span ->
|
||||
span.hasName("inner-manual")
|
||||
.hasKind(SpanKind.INTERNAL)
|
||||
.hasParent(trace.getSpan(1))
|
||||
.hasAttributes(Attributes.empty())));
|
||||
}
|
||||
|
||||
@Test
|
||||
void nestedFromCurrent() {
|
||||
testing.runWithSpan(
|
||||
"parent",
|
||||
() -> {
|
||||
Mono<String> result =
|
||||
new TracedWithSpan()
|
||||
.mono(
|
||||
Mono.defer(
|
||||
() -> {
|
||||
testing.runWithSpan("inner-manual", () -> {});
|
||||
return Mono.just("Value");
|
||||
}));
|
||||
|
||||
StepVerifier.create(result).expectNext("Value").verifyComplete();
|
||||
});
|
||||
|
||||
testing.waitAndAssertTraces(
|
||||
trace ->
|
||||
trace.hasSpansSatisfyingExactly(
|
||||
span ->
|
||||
span.hasName("parent")
|
||||
.hasKind(SpanKind.INTERNAL)
|
||||
.hasNoParent()
|
||||
.hasAttributes(Attributes.empty()),
|
||||
span ->
|
||||
span.hasName("TracedWithSpan.mono")
|
||||
.hasKind(SpanKind.INTERNAL)
|
||||
.hasParent(trace.getSpan(0))
|
||||
.hasAttributes(Attributes.empty()),
|
||||
span ->
|
||||
span.hasName("inner-manual")
|
||||
.hasKind(SpanKind.INTERNAL)
|
||||
.hasParent(trace.getSpan(1))
|
||||
.hasAttributes(Attributes.empty())));
|
||||
}
|
||||
}
|
||||
|
||||
@Nested
|
||||
class FluxTest {
|
||||
@Test
|
||||
void success() {
|
||||
UnicastProcessor<String> source = UnicastProcessor.create();
|
||||
Flux<String> result = new TracedWithSpan().flux(source);
|
||||
StepVerifier.Step<String> verifier = StepVerifier.create(result).expectSubscription();
|
||||
|
||||
source.onNext("Value");
|
||||
source.onComplete();
|
||||
verifier.expectNext("Value").verifyComplete();
|
||||
|
||||
testing.waitAndAssertTraces(
|
||||
trace ->
|
||||
trace.hasSpansSatisfyingExactly(
|
||||
span ->
|
||||
span.hasName("TracedWithSpan.flux")
|
||||
.hasKind(SpanKind.INTERNAL)
|
||||
.hasNoParent()
|
||||
.hasAttributes(Attributes.empty())));
|
||||
}
|
||||
|
||||
@Test
|
||||
void failure() {
|
||||
IllegalArgumentException error = new IllegalArgumentException("Boom");
|
||||
|
||||
UnicastProcessor<String> source = UnicastProcessor.create();
|
||||
Flux<String> result = new TracedWithSpan().flux(source);
|
||||
StepVerifier.Step<String> verifier = StepVerifier.create(result).expectSubscription();
|
||||
|
||||
source.onError(error);
|
||||
verifier.verifyErrorMatches(t -> t.equals(error));
|
||||
|
||||
testing.waitAndAssertTraces(
|
||||
trace ->
|
||||
trace.hasSpansSatisfyingExactly(
|
||||
span ->
|
||||
span.hasName("TracedWithSpan.flux")
|
||||
.hasKind(SpanKind.INTERNAL)
|
||||
.hasNoParent()
|
||||
.hasStatus(StatusData.error())
|
||||
.hasException(error)
|
||||
.hasAttributes(Attributes.empty())));
|
||||
}
|
||||
|
||||
@Test
|
||||
void canceled() {
|
||||
UnicastProcessor<String> source = UnicastProcessor.create();
|
||||
Flux<String> result = new TracedWithSpan().flux(source);
|
||||
StepVerifier.Step<String> verifier = StepVerifier.create(result).expectSubscription();
|
||||
|
||||
verifier.thenCancel().verify();
|
||||
|
||||
testing.waitAndAssertTraces(
|
||||
trace ->
|
||||
trace.hasSpansSatisfyingExactly(
|
||||
span ->
|
||||
span.hasName("TracedWithSpan.flux")
|
||||
.hasKind(SpanKind.INTERNAL)
|
||||
.hasNoParent()
|
||||
.hasAttributes(attributeEntry("reactor.canceled", true))));
|
||||
}
|
||||
|
||||
@Test
|
||||
void immediateSuccess() {
|
||||
Flux<String> result = new TracedWithSpan().flux(Flux.just("Value"));
|
||||
StepVerifier.create(result).expectNext("Value").verifyComplete();
|
||||
|
||||
testing.waitAndAssertTraces(
|
||||
trace ->
|
||||
trace.hasSpansSatisfyingExactly(
|
||||
span ->
|
||||
span.hasName("TracedWithSpan.flux")
|
||||
.hasKind(SpanKind.INTERNAL)
|
||||
.hasNoParent()
|
||||
.hasAttributes(Attributes.empty())));
|
||||
}
|
||||
|
||||
@Test
|
||||
void immediateFailure() {
|
||||
IllegalArgumentException error = new IllegalArgumentException("Boom");
|
||||
Flux<String> result = new TracedWithSpan().flux(Flux.error(error));
|
||||
StepVerifier.create(result).verifyErrorMatches(t -> t.equals(error));
|
||||
|
||||
testing.waitAndAssertTraces(
|
||||
trace ->
|
||||
trace.hasSpansSatisfyingExactly(
|
||||
span ->
|
||||
span.hasName("TracedWithSpan.flux")
|
||||
.hasKind(SpanKind.INTERNAL)
|
||||
.hasNoParent()
|
||||
.hasStatus(StatusData.error())
|
||||
.hasException(error)
|
||||
.hasAttributes(Attributes.empty())));
|
||||
}
|
||||
|
||||
@Test
|
||||
void nested() {
|
||||
Flux<String> flux =
|
||||
Flux.defer(
|
||||
() -> {
|
||||
testing.runWithSpan("inner-manual", () -> {});
|
||||
return Flux.just("Value");
|
||||
});
|
||||
|
||||
Flux<String> result = new TracedWithSpan().flux(flux);
|
||||
|
||||
StepVerifier.create(result).expectNext("Value").verifyComplete();
|
||||
|
||||
testing.waitAndAssertTraces(
|
||||
trace ->
|
||||
trace.hasSpansSatisfyingExactly(
|
||||
span ->
|
||||
span.hasName("TracedWithSpan.flux")
|
||||
.hasKind(SpanKind.INTERNAL)
|
||||
.hasNoParent()
|
||||
.hasAttributes(Attributes.empty()),
|
||||
span ->
|
||||
span.hasName("inner-manual")
|
||||
.hasKind(SpanKind.INTERNAL)
|
||||
.hasParent(trace.getSpan(0))
|
||||
.hasAttributes(Attributes.empty())));
|
||||
}
|
||||
|
||||
@Test
|
||||
void nestedFromCurrent() {
|
||||
testing.runWithSpan(
|
||||
"parent",
|
||||
() -> {
|
||||
Flux<String> result =
|
||||
new TracedWithSpan()
|
||||
.flux(
|
||||
Flux.defer(
|
||||
() -> {
|
||||
testing.runWithSpan("inner-manual", () -> {});
|
||||
return Flux.just("Value");
|
||||
}));
|
||||
|
||||
StepVerifier.create(result).expectNext("Value").verifyComplete();
|
||||
});
|
||||
|
||||
testing.waitAndAssertTraces(
|
||||
trace ->
|
||||
trace.hasSpansSatisfyingExactly(
|
||||
span ->
|
||||
span.hasName("parent")
|
||||
.hasKind(SpanKind.INTERNAL)
|
||||
.hasNoParent()
|
||||
.hasAttributes(Attributes.empty()),
|
||||
span ->
|
||||
span.hasName("TracedWithSpan.flux")
|
||||
.hasKind(SpanKind.INTERNAL)
|
||||
.hasParent(trace.getSpan(0))
|
||||
.hasAttributes(Attributes.empty()),
|
||||
span ->
|
||||
span.hasName("inner-manual")
|
||||
.hasKind(SpanKind.INTERNAL)
|
||||
.hasParent(trace.getSpan(1))
|
||||
.hasAttributes(Attributes.empty())));
|
||||
}
|
||||
}
|
||||
}
|
|
@ -316,6 +316,7 @@ include(":instrumentation:okhttp:okhttp-3.0:javaagent")
|
|||
include(":instrumentation:okhttp:okhttp-3.0:library")
|
||||
include(":instrumentation:okhttp:okhttp-3.0:testing")
|
||||
include(":instrumentation:opentelemetry-annotations-1.0:javaagent")
|
||||
include(":instrumentation:opentelemetry-annotations-1.0:testing")
|
||||
include(":instrumentation:opentelemetry-api:opentelemetry-api-1.0:javaagent")
|
||||
include(":instrumentation:opentelemetry-api:opentelemetry-api-1.0:testing")
|
||||
include(":instrumentation:opentelemetry-api:opentelemetry-api-1.4:javaagent")
|
||||
|
|
Loading…
Reference in New Issue