Add asynchronous tracing for Java 8 CompletableFuture in WithSpanAdvice (#2530)

This commit is contained in:
HaloFour 2021-03-22 02:36:05 -04:00 committed by GitHub
parent 1e3d9dd992
commit 4168c0b4fe
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 454 additions and 1 deletions

View File

@ -13,6 +13,7 @@ import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import java.lang.reflect.Method;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.implementation.bytecode.assign.Assigner;
/**
* Instrumentation for methods annotated with {@link WithSpan} annotation.
@ -40,8 +41,10 @@ public class WithSpanAdvice {
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void stopSpan(
@Advice.Origin Method method,
@Advice.Local("otelContext") Context context,
@Advice.Local("otelScope") Scope scope,
@Advice.Return(typing = Assigner.Typing.DYNAMIC) Object returnValue,
@Advice.Thrown Throwable throwable) {
if (scope == null) {
return;
@ -51,7 +54,7 @@ public class WithSpanAdvice {
if (throwable != null) {
tracer().endExceptionally(context, throwable);
} else {
tracer().end(context);
tracer().end(context, method.getReturnType(), returnValue);
}
}
}

View File

@ -10,6 +10,7 @@ import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.tracer.BaseTracer;
import io.opentelemetry.javaagent.instrumentation.otelannotations.async.MethodSpanStrategies;
import java.lang.reflect.Method;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -23,6 +24,8 @@ public class WithSpanTracer extends BaseTracer {
private static final Logger log = LoggerFactory.getLogger(WithSpanTracer.class);
private final MethodSpanStrategies methodSpanStrategies = MethodSpanStrategies.getInstance();
public Context startSpan(
Context parentContext, WithSpan applicationAnnotation, Method method, SpanKind kind) {
Span span =
@ -69,6 +72,26 @@ public class WithSpanTracer extends BaseTracer {
}
}
/**
* Denotes the end of the invocation of the traced method with a successful result which will end
* the span stored in the passed {@code context}. If the method returned a value representing an
* asynchronous operation then the span will not be finished until the asynchronous operation has
* completed.
*
* @param returnType Return type of the traced method.
* @param returnValue Return value from the traced method.
* @return Either {@code returnValue} or a value composing over {@code returnValue} for
* notification of completion.
* @throws ClassCastException if returnValue is not an instance of returnType
*/
public Object end(Context context, Class<?> returnType, Object returnValue) {
if (!returnType.isInstance(returnValue)) {
end(context);
return returnValue;
}
return methodSpanStrategies.resolveStrategy(returnType).end(this, context, returnValue);
}
@Override
protected String getInstrumentationName() {
return "io.opentelemetry.javaagent.opentelemetry-annotations-1.0";

View File

@ -0,0 +1,75 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.otelannotations.async;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.tracer.BaseTracer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
enum Jdk8MethodStrategy implements MethodSpanStrategy {
INSTANCE;
@Override
public boolean supports(Class<?> returnType) {
return returnType == CompletionStage.class || returnType == CompletableFuture.class;
}
@Override
public Object end(BaseTracer tracer, Context context, Object returnValue) {
if (returnValue instanceof CompletableFuture) {
CompletableFuture<?> future = (CompletableFuture<?>) returnValue;
if (endSynchronously(future, tracer, context)) {
return future;
}
return endWhenComplete(future, tracer, context);
}
CompletionStage<?> stage = (CompletionStage<?>) returnValue;
return endWhenComplete(stage, tracer, context);
}
/**
* Checks to see if the {@link CompletableFuture} has already been completed and if so
* synchronously ends the span to avoid additional allocations and overhead registering for
* notification of completion.
*/
private boolean endSynchronously(
CompletableFuture<?> future, BaseTracer tracer, Context context) {
if (!future.isDone()) {
return false;
}
if (future.isCompletedExceptionally()) {
// If the future completed exceptionally then join to catch the exception
// so that it can be recorded to the span
try {
future.join();
} catch (Exception exception) {
tracer.endExceptionally(context, exception);
return true;
}
}
tracer.end(context);
return true;
}
/**
* Registers for notification of the completion of the {@link CompletionStage} at which time the
* span will be ended.
*/
private CompletionStage<?> endWhenComplete(
CompletionStage<?> stage, BaseTracer tracer, Context context) {
return stage.whenComplete(
(result, exception) -> {
if (exception != null) {
tracer.endExceptionally(context, exception);
} else {
tracer.end(context);
}
});
}
}

View File

@ -0,0 +1,42 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.otelannotations.async;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CopyOnWriteArrayList;
/**
* Registry of {@link MethodSpanStrategy} implementations for tracing the asynchronous operations
* represented by the return type of a traced method.
*/
public class MethodSpanStrategies {
private static final MethodSpanStrategies instance = new MethodSpanStrategies();
public static MethodSpanStrategies getInstance() {
return instance;
}
private final List<MethodSpanStrategy> strategies = new CopyOnWriteArrayList<>();
private MethodSpanStrategies() {
strategies.add(Jdk8MethodStrategy.INSTANCE);
}
public void registerStrategy(MethodSpanStrategy strategy) {
Objects.requireNonNull(strategy);
strategies.add(strategy);
}
public MethodSpanStrategy resolveStrategy(Class<?> returnType) {
for (MethodSpanStrategy strategy : strategies) {
if (strategy.supports(returnType)) {
return strategy;
}
}
return MethodSpanStrategy.synchronous();
}
}

View File

@ -0,0 +1,43 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.otelannotations.async;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.tracer.BaseTracer;
/**
* Represents an implementation of a strategy for composing over the return value of a traced
* method. If the return value represents the result of an asynchronous operation the implementation
* can compose or register for notification of completion at which point the span representing the
* invocation of the method will be ended.
*/
public interface MethodSpanStrategy {
boolean supports(Class<?> returnType);
/**
* Denotes the end of the invocation of the traced method with a successful result which will end
* the span stored in the passed {@code context}. If the method returned a value representing an
* asynchronous operation then the span will remain open until the asynchronous operation has
* completed.
*
* @param tracer {@link BaseTracer} tracer to be used to end the span stored in the {@code
* context}.
* @param returnValue Return value from the traced method. Must be an instance of a {@code
* returnType} for which {@link #supports(Class)} returned true (in particular it must not be
* {@code null}).
* @return Either {@code returnValue} or a value composing over {@code returnValue} for
* notification of completion.
*/
Object end(BaseTracer tracer, Context context, Object returnValue);
/**
* Returns a {@link MethodSpanStrategy} for tracing synchronous methods where the return value
* does not represent the completion of an asynchronous operation.
*/
static MethodSpanStrategy synchronous() {
return SynchronousMethodSpanStrategy.INSTANCE;
}
}

View File

@ -0,0 +1,24 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.otelannotations.async;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.tracer.BaseTracer;
enum SynchronousMethodSpanStrategy implements MethodSpanStrategy {
INSTANCE;
@Override
public boolean supports(Class<?> returnType) {
return true;
}
@Override
public Object end(BaseTracer tracer, Context context, Object returnValue) {
tracer.end(context);
return returnValue;
}
}

View File

@ -0,0 +1,5 @@
/**
* Provides implementations of strategies for tracing methods that return asynchronous and reactive
* values so that the span can be ended when the asynchronous operation completes.
*/
package io.opentelemetry.javaagent.instrumentation.otelannotations.async;

View File

@ -3,6 +3,8 @@
* SPDX-License-Identifier: Apache-2.0
*/
import java.util.concurrent.CompletableFuture
import static io.opentelemetry.api.trace.SpanKind.CLIENT
import static io.opentelemetry.api.trace.SpanKind.PRODUCER
import static io.opentelemetry.api.trace.SpanKind.SERVER
@ -144,4 +146,228 @@ class WithSpanInstrumentationTest extends AgentInstrumentationSpecification {
Thread.sleep(500) // sleep a bit just to make sure no span is captured
assertTraces(0) {}
}
def "should capture span for already completed CompletionStage"() {
setup:
def future = CompletableFuture.completedFuture("Done")
new TracedWithSpan().completionStage(future)
expect:
assertTraces(1) {
trace(0, 1) {
span(0) {
name "TracedWithSpan.completionStage"
kind SpanKind.INTERNAL
hasNoParent()
errored false
attributes {
}
}
}
}
}
def "should capture span for eventually completed CompletionStage"() {
setup:
def future = new CompletableFuture<String>()
new TracedWithSpan().completionStage(future)
expect:
Thread.sleep(500) // sleep a bit just to make sure no span is captured
assertTraces(0) {}
future.complete("Done")
assertTraces(1) {
trace(0, 1) {
span(0) {
name "TracedWithSpan.completionStage"
kind SpanKind.INTERNAL
hasNoParent()
errored false
attributes {
}
}
}
}
}
def "should capture span for already exceptionally completed CompletionStage"() {
setup:
def future = new CompletableFuture<String>()
future.completeExceptionally(new IllegalArgumentException("Boom"))
new TracedWithSpan().completionStage(future)
expect:
assertTraces(1) {
trace(0, 1) {
span(0) {
name "TracedWithSpan.completionStage"
kind SpanKind.INTERNAL
hasNoParent()
errored true
errorEvent(IllegalArgumentException, "Boom")
attributes {
}
}
}
}
}
def "should capture span for eventually exceptionally completed CompletionStage"() {
setup:
def future = new CompletableFuture<String>()
new TracedWithSpan().completionStage(future)
expect:
Thread.sleep(500) // sleep a bit just to make sure no span is captured
assertTraces(0) {}
future.completeExceptionally(new IllegalArgumentException("Boom"))
assertTraces(1) {
trace(0, 1) {
span(0) {
name "TracedWithSpan.completionStage"
kind SpanKind.INTERNAL
hasNoParent()
errored true
errorEvent(IllegalArgumentException, "Boom")
attributes {
}
}
}
}
}
def "should capture span for null CompletionStage"() {
setup:
new TracedWithSpan().completionStage(null)
expect:
assertTraces(1) {
trace(0, 1) {
span(0) {
name "TracedWithSpan.completionStage"
kind SpanKind.INTERNAL
hasNoParent()
errored false
attributes {
}
}
}
}
}
def "should capture span for already completed CompletableFuture"() {
setup:
def future = CompletableFuture.completedFuture("Done")
new TracedWithSpan().completableFuture(future)
expect:
assertTraces(1) {
trace(0, 1) {
span(0) {
name "TracedWithSpan.completableFuture"
kind SpanKind.INTERNAL
hasNoParent()
errored false
attributes {
}
}
}
}
}
def "should capture span for eventually completed CompletableFuture"() {
setup:
def future = new CompletableFuture<String>()
new TracedWithSpan().completableFuture(future)
expect:
Thread.sleep(500) // sleep a bit just to make sure no span is captured
assertTraces(0) {}
future.complete("Done")
assertTraces(1) {
trace(0, 1) {
span(0) {
name "TracedWithSpan.completableFuture"
kind SpanKind.INTERNAL
hasNoParent()
errored false
attributes {
}
}
}
}
}
def "should capture span for already exceptionally completed CompletableFuture"() {
setup:
def future = new CompletableFuture<String>()
future.completeExceptionally(new IllegalArgumentException("Boom"))
new TracedWithSpan().completableFuture(future)
expect:
assertTraces(1) {
trace(0, 1) {
span(0) {
name "TracedWithSpan.completableFuture"
kind SpanKind.INTERNAL
hasNoParent()
errored true
errorEvent(IllegalArgumentException, "Boom")
attributes {
}
}
}
}
}
def "should capture span for eventually exceptionally completed CompletableFuture"() {
setup:
def future = new CompletableFuture<String>()
new TracedWithSpan().completableFuture(future)
expect:
Thread.sleep(500) // sleep a bit just to make sure no span is captured
assertTraces(0) {}
future.completeExceptionally(new IllegalArgumentException("Boom"))
assertTraces(1) {
trace(0, 1) {
span(0) {
name "TracedWithSpan.completableFuture"
kind SpanKind.INTERNAL
hasNoParent()
errored true
errorEvent(IllegalArgumentException, "Boom")
attributes {
}
}
}
}
}
def "should capture span for null CompletableFuture"() {
setup:
new TracedWithSpan().completableFuture(null)
expect:
assertTraces(1) {
trace(0, 1) {
span(0) {
name "TracedWithSpan.completableFuture"
kind SpanKind.INTERNAL
hasNoParent()
errored false
attributes {
}
}
}
}
}
}

View File

@ -7,6 +7,8 @@ package io.opentelemetry.test.annotation;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.extension.annotations.WithSpan;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
public class TracedWithSpan {
@ -54,4 +56,14 @@ public class TracedWithSpan {
public String innerClient() {
return "hello!";
}
@WithSpan
public CompletionStage<String> completionStage(CompletableFuture<String> future) {
return future;
}
@WithSpan
public CompletableFuture<String> completableFuture(CompletableFuture<String> future) {
return future;
}
}