Port opentelemetry-annotations-1.0 to Instrumenter API (#3738)
* Port WithSpanInstrumentation to Instrumenter API * Unit tests, clean up attribute binding APIs * Remove AsyncSpanEndStrategies and fix weak reference purging * Move tryToGetResponse to AsyncOperationEndSupport * Address PR comments * ParameterAttributeNamesExtractor can no longer return a null array
This commit is contained in:
parent
21f807553f
commit
a5513a3c60
|
@ -12,6 +12,7 @@ dependencies {
|
|||
// this only exists to make Intellij happy since it doesn't (currently at least) understand our
|
||||
// inclusion of this artifact inside of :instrumentation-api
|
||||
compileOnly(project(":instrumentation-api-caching"))
|
||||
testCompileOnly(project(":instrumentation-api-caching"))
|
||||
|
||||
api("io.opentelemetry:opentelemetry-api")
|
||||
api("io.opentelemetry:opentelemetry-semconv")
|
||||
|
|
|
@ -1,106 +0,0 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.instrumentation.api.annotation.support;
|
||||
|
||||
import io.opentelemetry.instrumentation.api.tracer.AttributeSetter;
|
||||
import java.lang.reflect.Method;
|
||||
import java.lang.reflect.Parameter;
|
||||
import org.checkerframework.checker.nullness.qual.Nullable;
|
||||
|
||||
/** Base class for instrumentation-specific attribute binding for traced methods. */
|
||||
public abstract class BaseAttributeBinder {
|
||||
|
||||
/**
|
||||
* Creates a binding of the parameters of the traced method to span attributes.
|
||||
*
|
||||
* @param method the traced method
|
||||
* @return the bindings of the parameters
|
||||
*/
|
||||
public AttributeBindings bind(Method method) {
|
||||
AttributeBindings bindings = EmptyAttributeBindings.INSTANCE;
|
||||
|
||||
Parameter[] parameters = method.getParameters();
|
||||
if (parameters == null || parameters.length == 0) {
|
||||
return bindings;
|
||||
}
|
||||
|
||||
String[] attributeNames = attributeNamesForParameters(method, parameters);
|
||||
if (attributeNames == null || attributeNames.length != parameters.length) {
|
||||
return bindings;
|
||||
}
|
||||
|
||||
for (int i = 0; i < parameters.length; i++) {
|
||||
Parameter parameter = parameters[i];
|
||||
String attributeName = attributeNames[i];
|
||||
if (attributeName == null || attributeName.isEmpty()) {
|
||||
continue;
|
||||
}
|
||||
|
||||
bindings =
|
||||
new CombinedAttributeBindings(
|
||||
bindings,
|
||||
i,
|
||||
AttributeBindingFactory.createBinding(
|
||||
attributeName, parameter.getParameterizedType()));
|
||||
}
|
||||
|
||||
return bindings;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns an array of the names of the attributes for the parameters of the traced method. The
|
||||
* array should be the same length as the array of the method parameters. An element may be {@code
|
||||
* null} to indicate that the parameter should not be bound to an attribute. The array may also be
|
||||
* {@code null} to indicate that the method has no parameters to bind to attributes.
|
||||
*
|
||||
* @param method the traced method
|
||||
* @param parameters the method parameters
|
||||
* @return an array of the attribute names
|
||||
*/
|
||||
@Nullable
|
||||
protected abstract String[] attributeNamesForParameters(Method method, Parameter[] parameters);
|
||||
|
||||
protected enum EmptyAttributeBindings implements AttributeBindings {
|
||||
INSTANCE;
|
||||
|
||||
@Override
|
||||
public boolean isEmpty() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void apply(AttributeSetter setter, Object[] args) {}
|
||||
}
|
||||
|
||||
private static final class CombinedAttributeBindings implements AttributeBindings {
|
||||
private final AttributeBindings parent;
|
||||
private final int index;
|
||||
private final AttributeBinding binding;
|
||||
|
||||
public CombinedAttributeBindings(
|
||||
AttributeBindings parent, int index, AttributeBinding binding) {
|
||||
this.parent = parent;
|
||||
this.index = index;
|
||||
this.binding = binding;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isEmpty() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void apply(AttributeSetter setter, Object[] args) {
|
||||
parent.apply(setter, args);
|
||||
if (args != null && args.length > index) {
|
||||
Object arg = args[index];
|
||||
if (arg != null) {
|
||||
binding.apply(setter, arg);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -8,6 +8,7 @@ package io.opentelemetry.instrumentation.api.annotation.support;
|
|||
import io.opentelemetry.api.common.AttributesBuilder;
|
||||
import io.opentelemetry.instrumentation.api.caching.Cache;
|
||||
import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor;
|
||||
import io.opentelemetry.instrumentation.api.tracer.AttributeSetter;
|
||||
import java.lang.reflect.Method;
|
||||
import java.lang.reflect.Parameter;
|
||||
import org.checkerframework.checker.nullness.qual.Nullable;
|
||||
|
@ -16,10 +17,10 @@ import org.checkerframework.checker.nullness.qual.Nullable;
|
|||
public final class MethodSpanAttributesExtractor<REQUEST, RESPONSE>
|
||||
extends AttributesExtractor<REQUEST, RESPONSE> {
|
||||
|
||||
private final BaseAttributeBinder binder;
|
||||
private final MethodExtractor<REQUEST> methodExtractor;
|
||||
private final MethodArgumentsExtractor<REQUEST> methodArgumentsExtractor;
|
||||
private final Cache<Method, AttributeBindings> cache;
|
||||
private final ParameterAttributeNamesExtractor parameterAttributeNamesExtractor;
|
||||
|
||||
public static <REQUEST, RESPONSE> MethodSpanAttributesExtractor<REQUEST, RESPONSE> newInstance(
|
||||
MethodExtractor<REQUEST> methodExtractor,
|
||||
|
@ -27,23 +28,27 @@ public final class MethodSpanAttributesExtractor<REQUEST, RESPONSE>
|
|||
MethodArgumentsExtractor<REQUEST> methodArgumentsExtractor) {
|
||||
|
||||
return new MethodSpanAttributesExtractor<>(
|
||||
methodExtractor, parameterAttributeNamesExtractor, methodArgumentsExtractor);
|
||||
methodExtractor,
|
||||
parameterAttributeNamesExtractor,
|
||||
methodArgumentsExtractor,
|
||||
new MethodCache<>());
|
||||
}
|
||||
|
||||
MethodSpanAttributesExtractor(
|
||||
MethodExtractor<REQUEST> methodExtractor,
|
||||
ParameterAttributeNamesExtractor parameterAttributeNamesExtractor,
|
||||
MethodArgumentsExtractor<REQUEST> methodArgumentsExtractor) {
|
||||
MethodArgumentsExtractor<REQUEST> methodArgumentsExtractor,
|
||||
Cache<Method, AttributeBindings> cache) {
|
||||
this.methodExtractor = methodExtractor;
|
||||
this.methodArgumentsExtractor = methodArgumentsExtractor;
|
||||
this.binder = new MethodSpanAttributeBinder(parameterAttributeNamesExtractor);
|
||||
this.cache = new MethodCache<>();
|
||||
this.parameterAttributeNamesExtractor = parameterAttributeNamesExtractor;
|
||||
this.cache = cache;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void onStart(AttributesBuilder attributes, REQUEST request) {
|
||||
Method method = methodExtractor.extract(request);
|
||||
AttributeBindings bindings = cache.computeIfAbsent(method, binder::bind);
|
||||
AttributeBindings bindings = cache.computeIfAbsent(method, this::bind);
|
||||
if (!bindings.isEmpty()) {
|
||||
Object[] args = methodArgumentsExtractor.extract(request);
|
||||
bindings.apply(attributes::put, args);
|
||||
|
@ -54,18 +59,81 @@ public final class MethodSpanAttributesExtractor<REQUEST, RESPONSE>
|
|||
protected void onEnd(
|
||||
AttributesBuilder attributes, REQUEST request, @Nullable RESPONSE response) {}
|
||||
|
||||
private static class MethodSpanAttributeBinder extends BaseAttributeBinder {
|
||||
private final ParameterAttributeNamesExtractor parameterAttributeNamesExtractor;
|
||||
/**
|
||||
* Creates a binding of the parameters of the traced method to span attributes.
|
||||
*
|
||||
* @param method the traced method
|
||||
* @return the bindings of the parameters
|
||||
*/
|
||||
private AttributeBindings bind(Method method) {
|
||||
AttributeBindings bindings = EmptyAttributeBindings.INSTANCE;
|
||||
|
||||
public MethodSpanAttributeBinder(
|
||||
ParameterAttributeNamesExtractor parameterAttributeNamesExtractor) {
|
||||
this.parameterAttributeNamesExtractor = parameterAttributeNamesExtractor;
|
||||
Parameter[] parameters = method.getParameters();
|
||||
if (parameters.length == 0) {
|
||||
return bindings;
|
||||
}
|
||||
|
||||
String[] attributeNames = parameterAttributeNamesExtractor.extract(method, parameters);
|
||||
if (attributeNames.length != parameters.length) {
|
||||
return bindings;
|
||||
}
|
||||
|
||||
for (int i = 0; i < parameters.length; i++) {
|
||||
Parameter parameter = parameters[i];
|
||||
String attributeName = attributeNames[i];
|
||||
if (attributeName == null || attributeName.isEmpty()) {
|
||||
continue;
|
||||
}
|
||||
|
||||
bindings =
|
||||
new CombinedAttributeBindings(
|
||||
bindings,
|
||||
i,
|
||||
AttributeBindingFactory.createBinding(
|
||||
attributeName, parameter.getParameterizedType()));
|
||||
}
|
||||
|
||||
return bindings;
|
||||
}
|
||||
|
||||
protected enum EmptyAttributeBindings implements AttributeBindings {
|
||||
INSTANCE;
|
||||
|
||||
@Override
|
||||
public boolean isEmpty() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected @Nullable String[] attributeNamesForParameters(
|
||||
Method method, Parameter[] parameters) {
|
||||
return parameterAttributeNamesExtractor.extract(method, parameters);
|
||||
public void apply(AttributeSetter setter, Object[] args) {}
|
||||
}
|
||||
|
||||
private static final class CombinedAttributeBindings implements AttributeBindings {
|
||||
private final AttributeBindings parent;
|
||||
private final int index;
|
||||
private final AttributeBinding binding;
|
||||
|
||||
public CombinedAttributeBindings(
|
||||
AttributeBindings parent, int index, AttributeBinding binding) {
|
||||
this.parent = parent;
|
||||
this.index = index;
|
||||
this.binding = binding;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isEmpty() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void apply(AttributeSetter setter, Object[] args) {
|
||||
parent.apply(setter, args);
|
||||
if (args != null && args.length > index) {
|
||||
Object arg = args[index];
|
||||
if (arg != null) {
|
||||
binding.apply(setter, arg);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -14,9 +14,8 @@ import org.checkerframework.checker.nullness.qual.Nullable;
|
|||
public interface ParameterAttributeNamesExtractor {
|
||||
/**
|
||||
* Returns an array of the names of the attributes for the parameters of the traced method. The
|
||||
* array should be the same length as the array of the method parameters. An element may be {@code
|
||||
* null} to indicate that the parameter should not be bound to an attribute. The array may also be
|
||||
* {@code null} to indicate that the method has no parameters to bind to attributes.
|
||||
* array must be the same length as the array of the method parameters. An element may be {@code
|
||||
* null} to indicate that the parameter should not be bound to an attribute.
|
||||
*
|
||||
* @param method the traced method
|
||||
* @param parameters the method parameters
|
||||
|
|
|
@ -79,7 +79,16 @@ public final class AsyncOperationEndSupport<REQUEST, RESPONSE> {
|
|||
}
|
||||
|
||||
// fall back to sync end() if asyncValue type doesn't match
|
||||
instrumenter.end(context, request, null, null);
|
||||
instrumenter.end(context, request, tryToGetResponse(responseType, asyncValue), null);
|
||||
return asyncValue;
|
||||
}
|
||||
|
||||
@Nullable
|
||||
public static <RESPONSE> RESPONSE tryToGetResponse(
|
||||
Class<RESPONSE> responseType, @Nullable Object asyncValue) {
|
||||
if (responseType.isInstance(asyncValue)) {
|
||||
return responseType.cast(asyncValue);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -5,11 +5,12 @@
|
|||
|
||||
package io.opentelemetry.instrumentation.api.annotation.support.async;
|
||||
|
||||
import static io.opentelemetry.instrumentation.api.annotation.support.async.AsyncOperationEndSupport.tryToGetResponse;
|
||||
|
||||
import io.opentelemetry.context.Context;
|
||||
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.CompletionStage;
|
||||
import org.checkerframework.checker.nullness.qual.Nullable;
|
||||
|
||||
public enum Jdk8AsyncOperationEndStrategy implements AsyncOperationEndStrategy {
|
||||
INSTANCE;
|
||||
|
@ -76,12 +77,4 @@ public enum Jdk8AsyncOperationEndStrategy implements AsyncOperationEndStrategy {
|
|||
(result, exception) ->
|
||||
instrumenter.end(context, request, tryToGetResponse(responseType, result), exception));
|
||||
}
|
||||
|
||||
@Nullable
|
||||
private static <RESPONSE> RESPONSE tryToGetResponse(Class<RESPONSE> responseType, Object result) {
|
||||
if (responseType.isInstance(result)) {
|
||||
return responseType.cast(result);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,52 +0,0 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.instrumentation.api.tracer.async;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
import org.checkerframework.checker.nullness.qual.Nullable;
|
||||
|
||||
/**
|
||||
* Registry of {@link AsyncSpanEndStrategy} implementations for tracing the asynchronous operations
|
||||
* represented by the return type of a traced method.
|
||||
*/
|
||||
public class AsyncSpanEndStrategies {
|
||||
private static final AsyncSpanEndStrategies instance = new AsyncSpanEndStrategies();
|
||||
|
||||
public static AsyncSpanEndStrategies getInstance() {
|
||||
return instance;
|
||||
}
|
||||
|
||||
private final List<AsyncSpanEndStrategy> strategies = new CopyOnWriteArrayList<>();
|
||||
|
||||
private AsyncSpanEndStrategies() {
|
||||
strategies.add(Jdk8AsyncSpanEndStrategy.INSTANCE);
|
||||
}
|
||||
|
||||
public void registerStrategy(AsyncSpanEndStrategy strategy) {
|
||||
Objects.requireNonNull(strategy);
|
||||
strategies.add(strategy);
|
||||
}
|
||||
|
||||
public void unregisterStrategy(AsyncSpanEndStrategy strategy) {
|
||||
strategies.remove(strategy);
|
||||
}
|
||||
|
||||
public void unregisterStrategy(Class<? extends AsyncSpanEndStrategy> strategyClass) {
|
||||
strategies.removeIf(strategy -> strategy.getClass() == strategyClass);
|
||||
}
|
||||
|
||||
@Nullable
|
||||
public AsyncSpanEndStrategy resolveStrategy(Class<?> returnType) {
|
||||
for (AsyncSpanEndStrategy strategy : strategies) {
|
||||
if (strategy.supports(returnType)) {
|
||||
return strategy;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
|
@ -1,33 +0,0 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.instrumentation.api.tracer.async;
|
||||
|
||||
import io.opentelemetry.context.Context;
|
||||
import io.opentelemetry.instrumentation.api.tracer.BaseTracer;
|
||||
|
||||
/**
|
||||
* Represents an implementation of a strategy for composing over the return value of an asynchronous
|
||||
* traced method which can compose or register for notification of completion at which point the
|
||||
* span representing the invocation of the method will be ended.
|
||||
*/
|
||||
public interface AsyncSpanEndStrategy {
|
||||
boolean supports(Class<?> returnType);
|
||||
|
||||
/**
|
||||
* Denotes the end of the invocation of the traced method with a successful result which will end
|
||||
* the span stored in the passed {@code context}. The span will remain open until the asynchronous
|
||||
* operation has completed.
|
||||
*
|
||||
* @param tracer {@link BaseTracer} tracer to be used to end the span stored in the {@code
|
||||
* context}.
|
||||
* @param returnValue Return value from the traced method. Must be an instance of a {@code
|
||||
* returnType} for which {@link #supports(Class)} returned true (in particular it must not be
|
||||
* {@code null}).
|
||||
* @return Either {@code returnValue} or a value composing over {@code returnValue} for
|
||||
* notification of completion.
|
||||
*/
|
||||
Object end(BaseTracer tracer, Context context, Object returnValue);
|
||||
}
|
|
@ -1,75 +0,0 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.instrumentation.api.tracer.async;
|
||||
|
||||
import io.opentelemetry.context.Context;
|
||||
import io.opentelemetry.instrumentation.api.tracer.BaseTracer;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.CompletionStage;
|
||||
|
||||
enum Jdk8AsyncSpanEndStrategy implements AsyncSpanEndStrategy {
|
||||
INSTANCE;
|
||||
|
||||
@Override
|
||||
public boolean supports(Class<?> returnType) {
|
||||
return returnType == CompletionStage.class || returnType == CompletableFuture.class;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object end(BaseTracer tracer, Context context, Object returnValue) {
|
||||
if (returnValue instanceof CompletableFuture) {
|
||||
CompletableFuture<?> future = (CompletableFuture<?>) returnValue;
|
||||
if (endSynchronously(future, tracer, context)) {
|
||||
return future;
|
||||
}
|
||||
return endWhenComplete(future, tracer, context);
|
||||
}
|
||||
CompletionStage<?> stage = (CompletionStage<?>) returnValue;
|
||||
return endWhenComplete(stage, tracer, context);
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks to see if the {@link CompletableFuture} has already been completed and if so
|
||||
* synchronously ends the span to avoid additional allocations and overhead registering for
|
||||
* notification of completion.
|
||||
*/
|
||||
private static boolean endSynchronously(
|
||||
CompletableFuture<?> future, BaseTracer tracer, Context context) {
|
||||
|
||||
if (!future.isDone()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (future.isCompletedExceptionally()) {
|
||||
// If the future completed exceptionally then join to catch the exception
|
||||
// so that it can be recorded to the span
|
||||
try {
|
||||
future.join();
|
||||
} catch (Throwable t) {
|
||||
tracer.endExceptionally(context, t);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
tracer.end(context);
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Registers for notification of the completion of the {@link CompletionStage} at which time the
|
||||
* span will be ended.
|
||||
*/
|
||||
private CompletionStage<?> endWhenComplete(
|
||||
CompletionStage<?> stage, BaseTracer tracer, Context context) {
|
||||
return stage.whenComplete(
|
||||
(result, exception) -> {
|
||||
if (exception != null) {
|
||||
tracer.endExceptionally(context, exception);
|
||||
} else {
|
||||
tracer.end(context);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
|
@ -1,8 +0,0 @@
|
|||
/**
|
||||
* Provides implementations of strategies for tracing methods that return asynchronous and reactive
|
||||
* values so that the span can be ended when the asynchronous operation completes.
|
||||
*/
|
||||
@UnstableApi
|
||||
package io.opentelemetry.instrumentation.api.tracer.async;
|
||||
|
||||
import io.opentelemetry.instrumentation.api.annotations.UnstableApi;
|
|
@ -1,122 +0,0 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.instrumentation.api.annotation.support
|
||||
|
||||
import io.opentelemetry.instrumentation.api.tracer.AttributeSetter
|
||||
import spock.lang.Shared
|
||||
import spock.lang.Specification
|
||||
|
||||
import java.lang.reflect.Method
|
||||
import java.lang.reflect.Parameter
|
||||
|
||||
class BaseAttributeBinderTest extends Specification {
|
||||
@Shared
|
||||
Method method = TestClass.getDeclaredMethod("method", String, String, String)
|
||||
|
||||
@Shared
|
||||
Object[] args = [ "a", "b", "c" ]
|
||||
|
||||
AttributeSetter setter = Mock()
|
||||
|
||||
def "returns empty bindings for null attribute names array"() {
|
||||
given:
|
||||
def binder = new TestAttributeBinder(null)
|
||||
|
||||
when:
|
||||
AttributeBindings bindings = binder.bind(method)
|
||||
bindings.apply(setter, args)
|
||||
|
||||
then:
|
||||
bindings.isEmpty()
|
||||
0 * setter.setAttribute(*spock.lang.Specification._)
|
||||
}
|
||||
|
||||
def "returns empty bindings for empty attribute names array"() {
|
||||
given:
|
||||
def binder = new TestAttributeBinder(new String[0])
|
||||
|
||||
when:
|
||||
AttributeBindings bindings = binder.bind(method)
|
||||
bindings.apply(setter, args)
|
||||
|
||||
then:
|
||||
bindings.isEmpty()
|
||||
0 * setter.setAttribute(*spock.lang.Specification._)
|
||||
}
|
||||
|
||||
def "returns empty bindings for attribute names array with all null elements"() {
|
||||
given:
|
||||
def binder = new TestAttributeBinder([ null, null, null ] as String[])
|
||||
|
||||
when:
|
||||
AttributeBindings bindings = binder.bind(method)
|
||||
bindings.apply(setter, args)
|
||||
|
||||
then:
|
||||
bindings.isEmpty()
|
||||
0 * setter.setAttribute(*spock.lang.Specification._)
|
||||
}
|
||||
|
||||
def "returns empty bindings for attribute names array with fewer elements than parameters"() {
|
||||
given:
|
||||
def binder = new TestAttributeBinder([ "x", "y" ] as String[])
|
||||
|
||||
when:
|
||||
AttributeBindings bindings = binder.bind(method)
|
||||
bindings.apply(setter, args)
|
||||
|
||||
then:
|
||||
bindings.isEmpty()
|
||||
0 * setter.setAttribute(*spock.lang.Specification._)
|
||||
}
|
||||
|
||||
def "returns bindings for attribute names array"() {
|
||||
given:
|
||||
def binder = new TestAttributeBinder([ "x", "y", "z" ] as String[])
|
||||
|
||||
when:
|
||||
AttributeBindings bindings = binder.bind(method)
|
||||
bindings.apply(setter, args)
|
||||
|
||||
then:
|
||||
!bindings.isEmpty()
|
||||
1 * setter.setAttribute({ it.getKey() == "x" }, "a")
|
||||
1 * setter.setAttribute({ it.getKey() == "y" }, "b")
|
||||
1 * setter.setAttribute({ it.getKey() == "z" }, "c")
|
||||
}
|
||||
|
||||
def "returns bindings for attribute names with null name"() {
|
||||
given:
|
||||
def binder = new TestAttributeBinder([ "x", null, "z" ] as String[])
|
||||
|
||||
when:
|
||||
AttributeBindings bindings = binder.bind(method)
|
||||
bindings.apply(setter, args)
|
||||
|
||||
then:
|
||||
!bindings.isEmpty()
|
||||
1 * setter.setAttribute({ it.getKey() == "x" }, "a")
|
||||
0 * setter.setAttribute(spock.lang.Specification._, "b")
|
||||
1 * setter.setAttribute({ it.getKey() == "z" }, "c")
|
||||
}
|
||||
|
||||
class TestAttributeBinder extends BaseAttributeBinder {
|
||||
final String[] attributeNames
|
||||
|
||||
TestAttributeBinder(String[] attributeNames) {
|
||||
this.attributeNames = attributeNames
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String[] attributeNamesForParameters(Method method, Parameter[] parameters) {
|
||||
return attributeNames
|
||||
}
|
||||
}
|
||||
|
||||
class TestClass {
|
||||
void method(String x, String y, String z) { }
|
||||
}
|
||||
}
|
|
@ -0,0 +1,200 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.instrumentation.api.annotation.support
|
||||
|
||||
import io.opentelemetry.api.common.AttributesBuilder
|
||||
import io.opentelemetry.instrumentation.api.caching.Cache
|
||||
import spock.lang.Specification
|
||||
|
||||
import java.lang.reflect.Method
|
||||
import java.util.function.Function
|
||||
|
||||
class MethodSpanAttributesExtractorTest extends Specification {
|
||||
|
||||
def "extracts attributes for method with attribute names"() {
|
||||
given:
|
||||
def request = new Object()
|
||||
def method = TestClass.getDeclaredMethod("method", String, String, String)
|
||||
AttributesBuilder builder = Mock()
|
||||
|
||||
Cache<Method, AttributeBindings> cache = Mock {
|
||||
1 * computeIfAbsent(method, _ as Function<Method, AttributeBindings>) >> { m, fn -> fn.apply(m) }
|
||||
}
|
||||
|
||||
def extractor = new MethodSpanAttributesExtractor<Object, Object>(
|
||||
{ r -> method },
|
||||
{ m, p -> [ "x", "y", "z" ] as String[] },
|
||||
{ r -> [ "a", "b", "c" ] as String[] },
|
||||
cache
|
||||
)
|
||||
|
||||
when:
|
||||
extractor.onStart(builder, request)
|
||||
|
||||
then:
|
||||
1 * builder.put({ it.getKey() == "x" }, "a")
|
||||
1 * builder.put({ it.getKey() == "y" }, "b")
|
||||
1 * builder.put({ it.getKey() == "z" }, "c")
|
||||
}
|
||||
|
||||
def "does not extract attributes for empty attribute name array"() {
|
||||
given:
|
||||
def request = new Object()
|
||||
def method = TestClass.getDeclaredMethod("method", String, String, String)
|
||||
AttributesBuilder builder = Mock()
|
||||
|
||||
Cache<Method, AttributeBindings> cache = Mock {
|
||||
1 * computeIfAbsent(method, _ as Function<Method, AttributeBindings>) >> { m, fn -> fn.apply(m) }
|
||||
}
|
||||
|
||||
def extractor = new MethodSpanAttributesExtractor<Object, Object>(
|
||||
{ r -> method },
|
||||
{ m, p -> new String[0] },
|
||||
{ r -> [ "a", "b", "c" ] as String[] },
|
||||
cache
|
||||
)
|
||||
|
||||
when:
|
||||
extractor.onStart(builder, request)
|
||||
|
||||
then:
|
||||
0 * builder.put(*_)
|
||||
}
|
||||
|
||||
def "does not extract attributes for method with attribute names array with fewer elements than parameters"() {
|
||||
given:
|
||||
def request = new Object()
|
||||
def method = TestClass.getDeclaredMethod("method", String, String, String)
|
||||
AttributesBuilder builder = Mock()
|
||||
|
||||
Cache<Method, AttributeBindings> cache = Mock {
|
||||
1 * computeIfAbsent(method, _ as Function<Method, AttributeBindings>) >> { m, fn -> fn.apply(m) }
|
||||
}
|
||||
|
||||
def extractor = new MethodSpanAttributesExtractor<Object, Object>(
|
||||
{ r -> method },
|
||||
{ m, p -> [ "x", "y" ] as String[] },
|
||||
{ r -> [ "a", "b", "c" ] as String[] },
|
||||
cache
|
||||
)
|
||||
|
||||
when:
|
||||
extractor.onStart(builder, request)
|
||||
|
||||
then:
|
||||
0 * builder.put(*_)
|
||||
}
|
||||
|
||||
def "extracts attributes for method with attribute names array with null element"() {
|
||||
given:
|
||||
def request = new Object()
|
||||
def method = TestClass.getDeclaredMethod("method", String, String, String)
|
||||
AttributesBuilder builder = Mock()
|
||||
|
||||
Cache<Method, AttributeBindings> cache = Mock {
|
||||
1 * computeIfAbsent(method, _ as Function<Method, AttributeBindings>) >> { m, fn -> fn.apply(m) }
|
||||
}
|
||||
|
||||
def extractor = new MethodSpanAttributesExtractor<Object, Object>(
|
||||
{ r -> method },
|
||||
{ m, p -> [ "x", null, "z" ] as String[] },
|
||||
{ r -> [ "a", "b", "c" ] as String[] },
|
||||
cache
|
||||
)
|
||||
|
||||
when:
|
||||
extractor.onStart(builder, request)
|
||||
|
||||
then:
|
||||
1 * builder.put({ it.getKey() == "x" }, "a")
|
||||
1 * builder.put({ it.getKey() == "z" }, "c")
|
||||
0 * builder.put(_, "b")
|
||||
}
|
||||
|
||||
def "does not extracts attribute for method with null argument"() {
|
||||
given:
|
||||
def request = new Object()
|
||||
def method = TestClass.getDeclaredMethod("method", String, String, String)
|
||||
AttributesBuilder builder = Mock()
|
||||
|
||||
Cache<Method, AttributeBindings> cache = Mock {
|
||||
1 * computeIfAbsent(method, _ as Function<Method, AttributeBindings>) >> { m, fn -> fn.apply(m) }
|
||||
}
|
||||
|
||||
def extractor = new MethodSpanAttributesExtractor<Object, Object>(
|
||||
{ r -> method },
|
||||
{ m, p -> [ "x", "y", "z" ] as String[] },
|
||||
{ r -> [ "a", "b", null ] as String[] },
|
||||
cache
|
||||
)
|
||||
|
||||
when:
|
||||
extractor.onStart(builder, request)
|
||||
|
||||
then:
|
||||
1 * builder.put({ it.getKey() == "x" }, "a")
|
||||
1 * builder.put({ it.getKey() == "y" }, "b")
|
||||
0 * builder.put({ it.getKey() == "z" }, _)
|
||||
}
|
||||
|
||||
def "applies cached bindings"() {
|
||||
given:
|
||||
def request = new Object()
|
||||
def method = TestClass.getDeclaredMethod("method", String, String, String)
|
||||
AttributesBuilder builder = Mock()
|
||||
|
||||
AttributeBindings bindings = Mock {
|
||||
1 * isEmpty() >> false
|
||||
}
|
||||
Cache<Method, AttributeBindings> cache = Mock {
|
||||
1 * computeIfAbsent(method, _ as Function<Method, AttributeBindings>) >> bindings
|
||||
}
|
||||
|
||||
def extractor = new MethodSpanAttributesExtractor<Object, Object>(
|
||||
{ r -> method },
|
||||
{ m, p -> throw new Exception() },
|
||||
{ r -> [ "a", "b", "c" ] as String[] },
|
||||
cache
|
||||
)
|
||||
|
||||
when:
|
||||
extractor.onStart(builder, request)
|
||||
|
||||
then:
|
||||
1 * bindings.apply(_, [ "a", "b", "c" ])
|
||||
}
|
||||
|
||||
def "does not apply cached empty bindings"() {
|
||||
given:
|
||||
def request = new Object()
|
||||
def method = TestClass.getDeclaredMethod("method", String, String, String)
|
||||
AttributesBuilder builder = Mock()
|
||||
|
||||
AttributeBindings bindings = Mock {
|
||||
1 * isEmpty() >> true
|
||||
}
|
||||
Cache<Method, AttributeBindings> cache = Mock {
|
||||
1 * computeIfAbsent(method, _ as Function<Method, AttributeBindings>) >> bindings
|
||||
}
|
||||
|
||||
def extractor = new MethodSpanAttributesExtractor<Object, Object>(
|
||||
{ r -> method },
|
||||
{ m, p -> throw new Exception() },
|
||||
{ r -> throw new Exception() },
|
||||
cache
|
||||
)
|
||||
|
||||
when:
|
||||
extractor.onStart(builder, request)
|
||||
|
||||
then:
|
||||
0 * bindings.apply(_, _)
|
||||
}
|
||||
|
||||
class TestClass {
|
||||
void method(String x, String y, String z) { }
|
||||
}
|
||||
}
|
|
@ -63,7 +63,7 @@ class AsyncOperationEndSupportTest {
|
|||
}
|
||||
|
||||
@Test
|
||||
void shouldEndImmediatelyWhenAsyncWrapperisOfWrongType() {
|
||||
void shouldEndImmediatelyWhenAsyncWrapperIsOfWrongType() {
|
||||
// given
|
||||
AsyncOperationEndSupport<String, String> underTest =
|
||||
AsyncOperationEndSupport.create(instrumenter, String.class, CompletableFuture.class);
|
||||
|
@ -76,7 +76,7 @@ class AsyncOperationEndSupportTest {
|
|||
// then
|
||||
assertSame("not async", result);
|
||||
|
||||
verify(instrumenter).end(context, "request", null, null);
|
||||
verify(instrumenter).end(context, "request", "not async", null);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -1,80 +0,0 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.instrumentation.api.tracer.async
|
||||
|
||||
import io.opentelemetry.context.Context
|
||||
import io.opentelemetry.instrumentation.api.tracer.BaseTracer
|
||||
import spock.lang.Specification
|
||||
|
||||
import java.util.concurrent.CompletableFuture
|
||||
import java.util.concurrent.CompletionException
|
||||
|
||||
class Jdk8AsyncSpanEndStrategyTest extends Specification {
|
||||
BaseTracer tracer
|
||||
|
||||
Context context
|
||||
|
||||
def underTest = Jdk8AsyncSpanEndStrategy.INSTANCE
|
||||
|
||||
void setup() {
|
||||
tracer = Mock()
|
||||
context = Mock()
|
||||
}
|
||||
|
||||
def "ends span on completed future"() {
|
||||
when:
|
||||
underTest.end(tracer, context, CompletableFuture.completedFuture("completed"))
|
||||
|
||||
then:
|
||||
1 * tracer.end(context)
|
||||
}
|
||||
|
||||
def "ends span exceptionally on failed future"() {
|
||||
given:
|
||||
def exception = new CompletionException()
|
||||
def future = new CompletableFuture<String>()
|
||||
future.completeExceptionally(exception)
|
||||
|
||||
when:
|
||||
underTest.end(tracer, context, future)
|
||||
|
||||
then:
|
||||
1 * tracer.endExceptionally(context, exception)
|
||||
}
|
||||
|
||||
def "ends span on future when complete"() {
|
||||
def future = new CompletableFuture<String>()
|
||||
|
||||
when:
|
||||
underTest.end(tracer, context, future)
|
||||
|
||||
then:
|
||||
0 * tracer._
|
||||
|
||||
when:
|
||||
future.complete("completed")
|
||||
|
||||
then:
|
||||
1 * tracer.end(context)
|
||||
}
|
||||
|
||||
def "ends span exceptionally on future when completed exceptionally"() {
|
||||
def future = new CompletableFuture<String>()
|
||||
def exception = new Exception()
|
||||
|
||||
when:
|
||||
underTest.end(tracer, context, future)
|
||||
|
||||
then:
|
||||
0 * tracer._
|
||||
|
||||
when:
|
||||
future.completeExceptionally(exception)
|
||||
|
||||
then:
|
||||
1 * tracer.endExceptionally(context, exception)
|
||||
}
|
||||
}
|
|
@ -0,0 +1,88 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.instrumentation.api.annotation.support;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.verifyNoInteractions;
|
||||
import static org.mockito.Mockito.verifyNoMoreInteractions;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import io.opentelemetry.instrumentation.api.caching.Cache;
|
||||
import java.lang.reflect.Method;
|
||||
import java.util.function.Function;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.junit.jupiter.MockitoExtension;
|
||||
|
||||
@ExtendWith(MockitoExtension.class)
|
||||
public class MethodCacheTest {
|
||||
|
||||
@Mock private Function<Method, String> fn;
|
||||
|
||||
@Test
|
||||
public void getItemFromCache() throws Exception {
|
||||
Cache<Method, String> cache = new MethodCache<>();
|
||||
Method key = TestClass.class.getDeclaredMethod("method");
|
||||
String value = "Value";
|
||||
|
||||
cache.put(key, value);
|
||||
|
||||
assertThat(cache.get(key)).isEqualTo("Value");
|
||||
}
|
||||
|
||||
@Test
|
||||
void getItemFromCacheWithEquivalentMethod() throws Exception {
|
||||
Cache<Method, String> cache = new MethodCache<>();
|
||||
Method key = TestClass.class.getDeclaredMethod("method");
|
||||
String value = "Value";
|
||||
|
||||
cache.put(key, value);
|
||||
|
||||
Method otherKey = TestClass.class.getDeclaredMethod("method");
|
||||
assertThat(otherKey).isNotSameAs(key);
|
||||
assertThat(cache.get(otherKey)).isEqualTo(value);
|
||||
}
|
||||
|
||||
@Test
|
||||
void returnNullWhenNotInCache() throws Exception {
|
||||
Cache<Method, String> cache = new MethodCache<>();
|
||||
Method key = TestClass.class.getDeclaredMethod("method");
|
||||
|
||||
assertThat(cache.get(key)).isNull();
|
||||
}
|
||||
|
||||
@Test
|
||||
void computesItemIfAbsent() throws Exception {
|
||||
Cache<Method, String> cache = new MethodCache<>();
|
||||
Method key = TestClass.class.getDeclaredMethod("method");
|
||||
String value = "Value";
|
||||
when(fn.apply(key)).thenReturn(value);
|
||||
|
||||
assertThat(cache.computeIfAbsent(key, fn)).isEqualTo(value);
|
||||
verify(fn).apply(key);
|
||||
|
||||
Method otherKey = TestClass.class.getDeclaredMethod("method");
|
||||
assertThat(cache.computeIfAbsent(otherKey, fn)).isEqualTo(value);
|
||||
verifyNoMoreInteractions(fn);
|
||||
}
|
||||
|
||||
@Test
|
||||
void doesNotComputeItemIfPresent() throws Exception {
|
||||
Cache<Method, String> cache = new MethodCache<>();
|
||||
Method key = TestClass.class.getDeclaredMethod("method");
|
||||
String value = "Value";
|
||||
cache.put(key, value);
|
||||
|
||||
assertThat(cache.computeIfAbsent(key, fn)).isEqualTo(value);
|
||||
verifyNoInteractions(fn);
|
||||
}
|
||||
|
||||
static class TestClass {
|
||||
public static void method() {}
|
||||
}
|
||||
}
|
|
@ -7,7 +7,6 @@ package io.opentelemetry.javaagent.instrumentation.guava;
|
|||
|
||||
import io.opentelemetry.instrumentation.api.annotation.support.async.AsyncOperationEndStrategies;
|
||||
import io.opentelemetry.instrumentation.api.config.Config;
|
||||
import io.opentelemetry.instrumentation.api.tracer.async.AsyncSpanEndStrategies;
|
||||
import io.opentelemetry.instrumentation.guava.GuavaAsyncOperationEndStrategy;
|
||||
|
||||
public final class InstrumentationHelper {
|
||||
|
@ -26,7 +25,6 @@ public final class InstrumentationHelper {
|
|||
private static final GuavaAsyncOperationEndStrategy asyncOperationEndStrategy;
|
||||
|
||||
private static void registerAsyncSpanEndStrategy() {
|
||||
AsyncSpanEndStrategies.getInstance().registerStrategy(asyncOperationEndStrategy);
|
||||
AsyncOperationEndStrategies.instance().registerStrategy(asyncOperationEndStrategy);
|
||||
}
|
||||
|
||||
|
|
|
@ -5,6 +5,8 @@
|
|||
|
||||
package io.opentelemetry.instrumentation.guava;
|
||||
|
||||
import static io.opentelemetry.instrumentation.api.annotation.support.async.AsyncOperationEndSupport.tryToGetResponse;
|
||||
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import com.google.common.util.concurrent.Uninterruptibles;
|
||||
import io.opentelemetry.api.common.AttributeKey;
|
||||
|
@ -12,13 +14,8 @@ import io.opentelemetry.api.trace.Span;
|
|||
import io.opentelemetry.context.Context;
|
||||
import io.opentelemetry.instrumentation.api.annotation.support.async.AsyncOperationEndStrategy;
|
||||
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
|
||||
import io.opentelemetry.instrumentation.api.tracer.BaseTracer;
|
||||
import io.opentelemetry.instrumentation.api.tracer.async.AsyncSpanEndStrategy;
|
||||
import java.util.function.BiConsumer;
|
||||
import org.checkerframework.checker.nullness.qual.Nullable;
|
||||
|
||||
public final class GuavaAsyncOperationEndStrategy
|
||||
implements AsyncOperationEndStrategy, AsyncSpanEndStrategy {
|
||||
public final class GuavaAsyncOperationEndStrategy implements AsyncOperationEndStrategy {
|
||||
private static final AttributeKey<Boolean> CANCELED_ATTRIBUTE_KEY =
|
||||
AttributeKey.booleanKey("guava.canceled");
|
||||
|
||||
|
@ -50,55 +47,33 @@ public final class GuavaAsyncOperationEndStrategy
|
|||
Class<RESPONSE> responseType) {
|
||||
|
||||
ListenableFuture<?> future = (ListenableFuture<?>) asyncValue;
|
||||
end(
|
||||
context,
|
||||
future,
|
||||
(result, error) ->
|
||||
instrumenter.end(context, request, tryToGetResponse(responseType, result), error));
|
||||
end(instrumenter, context, request, future, responseType);
|
||||
return future;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object end(BaseTracer tracer, Context context, Object returnValue) {
|
||||
ListenableFuture<?> future = (ListenableFuture<?>) returnValue;
|
||||
end(
|
||||
context,
|
||||
future,
|
||||
(result, error) -> {
|
||||
if (error == null) {
|
||||
tracer.end(context);
|
||||
} else {
|
||||
tracer.endExceptionally(context, error);
|
||||
}
|
||||
});
|
||||
return future;
|
||||
}
|
||||
|
||||
private void end(Context context, ListenableFuture<?> future, BiConsumer<Object, Throwable> end) {
|
||||
private <REQUEST, RESPONSE> void end(
|
||||
Instrumenter<REQUEST, RESPONSE> instrumenter,
|
||||
Context context,
|
||||
REQUEST request,
|
||||
ListenableFuture<?> future,
|
||||
Class<RESPONSE> responseType) {
|
||||
if (future.isDone()) {
|
||||
if (future.isCancelled()) {
|
||||
if (captureExperimentalSpanAttributes) {
|
||||
Span.fromContext(context).setAttribute(CANCELED_ATTRIBUTE_KEY, true);
|
||||
}
|
||||
end.accept(null, null);
|
||||
instrumenter.end(context, request, null, null);
|
||||
} else {
|
||||
try {
|
||||
Object response = Uninterruptibles.getUninterruptibly(future);
|
||||
end.accept(response, null);
|
||||
instrumenter.end(context, request, tryToGetResponse(responseType, response), null);
|
||||
} catch (Throwable exception) {
|
||||
end.accept(null, exception);
|
||||
instrumenter.end(context, request, null, exception);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
future.addListener(() -> end(context, future, end), Runnable::run);
|
||||
future.addListener(
|
||||
() -> end(instrumenter, context, request, future, responseType), Runnable::run);
|
||||
}
|
||||
}
|
||||
|
||||
@Nullable
|
||||
private static <RESPONSE> RESPONSE tryToGetResponse(Class<RESPONSE> responseType, Object result) {
|
||||
if (responseType.isInstance(result)) {
|
||||
return responseType.cast(result);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,26 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.otelannotations;
|
||||
|
||||
import java.lang.reflect.Method;
|
||||
|
||||
public final class MethodRequest {
|
||||
private final Method method;
|
||||
private final Object[] args;
|
||||
|
||||
public MethodRequest(Method method, Object[] args) {
|
||||
this.method = method;
|
||||
this.args = args;
|
||||
}
|
||||
|
||||
public Method method() {
|
||||
return this.method;
|
||||
}
|
||||
|
||||
public Object[] args() {
|
||||
return this.args;
|
||||
}
|
||||
}
|
|
@ -5,7 +5,8 @@
|
|||
|
||||
package io.opentelemetry.javaagent.instrumentation.otelannotations;
|
||||
|
||||
import static io.opentelemetry.javaagent.instrumentation.otelannotations.WithSpanTracer.tracer;
|
||||
import static io.opentelemetry.javaagent.instrumentation.otelannotations.WithSpanSingletons.instrumenter;
|
||||
import static io.opentelemetry.javaagent.instrumentation.otelannotations.WithSpanSingletons.instrumenterWithAttributes;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.declaresMethod;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.hasParameters;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.isAnnotatedWith;
|
||||
|
@ -15,11 +16,11 @@ import static net.bytebuddy.matcher.ElementMatchers.none;
|
|||
import static net.bytebuddy.matcher.ElementMatchers.not;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.whereAny;
|
||||
|
||||
import application.io.opentelemetry.extension.annotations.WithSpan;
|
||||
import io.opentelemetry.api.trace.SpanKind;
|
||||
import io.opentelemetry.context.Context;
|
||||
import io.opentelemetry.context.Scope;
|
||||
import io.opentelemetry.instrumentation.api.annotation.support.async.AsyncOperationEndSupport;
|
||||
import io.opentelemetry.instrumentation.api.config.Config;
|
||||
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
|
||||
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
|
||||
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
|
||||
import io.opentelemetry.javaagent.instrumentation.api.Java8BytecodeBridge;
|
||||
|
@ -115,23 +116,27 @@ public class WithSpanInstrumentation implements TypeInstrumentation {
|
|||
@Advice.OnMethodEnter(suppress = Throwable.class)
|
||||
public static void onEnter(
|
||||
@Advice.Origin Method method,
|
||||
@Advice.Local("otelOperationEndSupport")
|
||||
AsyncOperationEndSupport<Method, Object> operationEndSupport,
|
||||
@Advice.Local("otelContext") Context context,
|
||||
@Advice.Local("otelScope") Scope scope) {
|
||||
WithSpan applicationAnnotation = method.getAnnotation(WithSpan.class);
|
||||
|
||||
SpanKind kind = tracer().extractSpanKind(applicationAnnotation);
|
||||
Instrumenter<Method, Object> instrumenter = instrumenter();
|
||||
Context current = Java8BytecodeBridge.currentContext();
|
||||
|
||||
// don't create a nested span if you're not supposed to.
|
||||
if (tracer().shouldStartSpan(current, kind)) {
|
||||
context = tracer().startSpan(current, applicationAnnotation, method, kind, null);
|
||||
if (instrumenter.shouldStart(current, method)) {
|
||||
context = instrumenter.start(current, method);
|
||||
scope = context.makeCurrent();
|
||||
operationEndSupport =
|
||||
AsyncOperationEndSupport.create(instrumenter, Object.class, method.getReturnType());
|
||||
}
|
||||
}
|
||||
|
||||
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
|
||||
public static void stopSpan(
|
||||
@Advice.Origin Method method,
|
||||
@Advice.Local("otelOperationEndSupport")
|
||||
AsyncOperationEndSupport<Method, Object> operationEndSupport,
|
||||
@Advice.Local("otelContext") Context context,
|
||||
@Advice.Local("otelScope") Scope scope,
|
||||
@Advice.Return(typing = Assigner.Typing.DYNAMIC, readOnly = false) Object returnValue,
|
||||
|
@ -140,12 +145,7 @@ public class WithSpanInstrumentation implements TypeInstrumentation {
|
|||
return;
|
||||
}
|
||||
scope.close();
|
||||
|
||||
if (throwable != null) {
|
||||
tracer().endExceptionally(context, throwable);
|
||||
} else {
|
||||
returnValue = tracer().end(context, method.getReturnType(), returnValue);
|
||||
}
|
||||
returnValue = operationEndSupport.asyncEnd(context, method, returnValue, throwable);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -156,23 +156,30 @@ public class WithSpanInstrumentation implements TypeInstrumentation {
|
|||
public static void onEnter(
|
||||
@Advice.Origin Method method,
|
||||
@Advice.AllArguments(typing = Assigner.Typing.DYNAMIC) Object[] args,
|
||||
@Advice.Local("otelOperationEndSupport")
|
||||
AsyncOperationEndSupport<MethodRequest, Object> operationEndSupport,
|
||||
@Advice.Local("otelRequest") MethodRequest request,
|
||||
@Advice.Local("otelContext") Context context,
|
||||
@Advice.Local("otelScope") Scope scope) {
|
||||
WithSpan applicationAnnotation = method.getAnnotation(WithSpan.class);
|
||||
|
||||
SpanKind kind = tracer().extractSpanKind(applicationAnnotation);
|
||||
Instrumenter<MethodRequest, Object> instrumenter = instrumenterWithAttributes();
|
||||
Context current = Java8BytecodeBridge.currentContext();
|
||||
request = new MethodRequest(method, args);
|
||||
|
||||
// don't create a nested span if you're not supposed to.
|
||||
if (tracer().shouldStartSpan(current, kind)) {
|
||||
context = tracer().startSpan(current, applicationAnnotation, method, kind, args);
|
||||
if (instrumenter.shouldStart(current, request)) {
|
||||
context = instrumenter.start(current, request);
|
||||
scope = context.makeCurrent();
|
||||
operationEndSupport =
|
||||
AsyncOperationEndSupport.create(instrumenter, Object.class, method.getReturnType());
|
||||
}
|
||||
}
|
||||
|
||||
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
|
||||
public static void stopSpan(
|
||||
@Advice.Origin Method method,
|
||||
@Advice.Local("otelOperationEndSupport")
|
||||
AsyncOperationEndSupport<MethodRequest, Object> operationEndSupport,
|
||||
@Advice.Local("otelRequest") MethodRequest request,
|
||||
@Advice.Local("otelContext") Context context,
|
||||
@Advice.Local("otelScope") Scope scope,
|
||||
@Advice.Return(typing = Assigner.Typing.DYNAMIC, readOnly = false) Object returnValue,
|
||||
|
@ -181,12 +188,7 @@ public class WithSpanInstrumentation implements TypeInstrumentation {
|
|||
return;
|
||||
}
|
||||
scope.close();
|
||||
|
||||
if (throwable != null) {
|
||||
tracer().endExceptionally(context, throwable);
|
||||
} else {
|
||||
returnValue = tracer().end(context, method.getReturnType(), returnValue);
|
||||
}
|
||||
returnValue = operationEndSupport.asyncEnd(context, request, returnValue, throwable);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -6,9 +6,7 @@
|
|||
package io.opentelemetry.javaagent.instrumentation.otelannotations;
|
||||
|
||||
import io.opentelemetry.instrumentation.api.annotation.support.AnnotationReflectionHelper;
|
||||
import io.opentelemetry.instrumentation.api.annotation.support.AttributeBindings;
|
||||
import io.opentelemetry.instrumentation.api.annotation.support.BaseAttributeBinder;
|
||||
import io.opentelemetry.instrumentation.api.caching.Cache;
|
||||
import io.opentelemetry.instrumentation.api.annotation.support.ParameterAttributeNamesExtractor;
|
||||
import java.lang.annotation.Annotation;
|
||||
import java.lang.invoke.MethodHandles;
|
||||
import java.lang.reflect.Method;
|
||||
|
@ -16,15 +14,14 @@ import java.lang.reflect.Parameter;
|
|||
import java.util.function.Function;
|
||||
import org.checkerframework.checker.nullness.qual.Nullable;
|
||||
|
||||
class WithSpanAttributeBinder extends BaseAttributeBinder {
|
||||
public enum WithSpanParameterAttributeNamesExtractor implements ParameterAttributeNamesExtractor {
|
||||
INSTANCE;
|
||||
|
||||
private static final Cache<Method, AttributeBindings> bindings =
|
||||
Cache.newBuilder().setWeakKeys().build();
|
||||
private static final Class<? extends Annotation> spanAttributeAnnotation;
|
||||
private static final Function<Annotation, String> spanAttributeValueFunction;
|
||||
|
||||
static {
|
||||
ClassLoader classLoader = WithSpanAttributeBinder.class.getClassLoader();
|
||||
ClassLoader classLoader = WithSpanParameterAttributeNamesExtractor.class.getClassLoader();
|
||||
spanAttributeAnnotation =
|
||||
AnnotationReflectionHelper.forNameOrNull(
|
||||
classLoader, "io.opentelemetry.extension.annotations.SpanAttribute");
|
||||
|
@ -46,14 +43,7 @@ class WithSpanAttributeBinder extends BaseAttributeBinder {
|
|||
}
|
||||
|
||||
@Override
|
||||
public AttributeBindings bind(Method method) {
|
||||
return spanAttributeAnnotation != null
|
||||
? bindings.computeIfAbsent(method, super::bind)
|
||||
: EmptyAttributeBindings.INSTANCE;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected @Nullable String[] attributeNamesForParameters(Method method, Parameter[] parameters) {
|
||||
public @Nullable String[] extract(Method method, Parameter[] parameters) {
|
||||
String[] attributeNames = new String[parameters.length];
|
||||
for (int i = 0; i < parameters.length; i++) {
|
||||
attributeNames[i] = attributeName(parameters[i]);
|
|
@ -0,0 +1,88 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.otelannotations;
|
||||
|
||||
import application.io.opentelemetry.extension.annotations.WithSpan;
|
||||
import io.opentelemetry.api.GlobalOpenTelemetry;
|
||||
import io.opentelemetry.api.trace.SpanKind;
|
||||
import io.opentelemetry.instrumentation.api.annotation.support.MethodSpanAttributesExtractor;
|
||||
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
|
||||
import io.opentelemetry.instrumentation.api.tracer.SpanNames;
|
||||
import java.lang.reflect.Method;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public final class WithSpanSingletons {
|
||||
private static final String INSTRUMENTATION_NAME =
|
||||
"io.opentelemetry.opentelemetry-annotations-1.0";
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(WithSpanSingletons.class);
|
||||
private static final Instrumenter<Method, Object> INSTRUMENTER = createInstrumenter();
|
||||
private static final Instrumenter<MethodRequest, Object> INSTRUMENTER_WITH_ATTRIBUTES =
|
||||
createInstrumenterWithAttributes();
|
||||
|
||||
public static Instrumenter<Method, Object> instrumenter() {
|
||||
return INSTRUMENTER;
|
||||
}
|
||||
|
||||
public static Instrumenter<MethodRequest, Object> instrumenterWithAttributes() {
|
||||
return INSTRUMENTER_WITH_ATTRIBUTES;
|
||||
}
|
||||
|
||||
private static Instrumenter<Method, Object> createInstrumenter() {
|
||||
return Instrumenter.newBuilder(
|
||||
GlobalOpenTelemetry.get(), INSTRUMENTATION_NAME, WithSpanSingletons::spanNameFromMethod)
|
||||
.newInstrumenter(WithSpanSingletons::spanKindFromMethod);
|
||||
}
|
||||
|
||||
private static Instrumenter<MethodRequest, Object> createInstrumenterWithAttributes() {
|
||||
return Instrumenter.newBuilder(
|
||||
GlobalOpenTelemetry.get(),
|
||||
INSTRUMENTATION_NAME,
|
||||
WithSpanSingletons::spanNameFromMethodRequest)
|
||||
.addAttributesExtractor(
|
||||
MethodSpanAttributesExtractor.newInstance(
|
||||
MethodRequest::method,
|
||||
WithSpanParameterAttributeNamesExtractor.INSTANCE,
|
||||
MethodRequest::args))
|
||||
.newInstrumenter(WithSpanSingletons::spanKindFromMethodRequest);
|
||||
}
|
||||
|
||||
private static SpanKind spanKindFromMethodRequest(MethodRequest request) {
|
||||
return spanKindFromMethod(request.method());
|
||||
}
|
||||
|
||||
private static SpanKind spanKindFromMethod(Method method) {
|
||||
WithSpan annotation = method.getDeclaredAnnotation(WithSpan.class);
|
||||
if (annotation == null) {
|
||||
return SpanKind.INTERNAL;
|
||||
}
|
||||
return toAgentOrNull(annotation.kind());
|
||||
}
|
||||
|
||||
private static SpanKind toAgentOrNull(
|
||||
application.io.opentelemetry.api.trace.SpanKind applicationSpanKind) {
|
||||
try {
|
||||
return SpanKind.valueOf(applicationSpanKind.name());
|
||||
} catch (IllegalArgumentException e) {
|
||||
logger.debug("unexpected span kind: {}", applicationSpanKind.name());
|
||||
return SpanKind.INTERNAL;
|
||||
}
|
||||
}
|
||||
|
||||
private static String spanNameFromMethodRequest(MethodRequest request) {
|
||||
return spanNameFromMethod(request.method());
|
||||
}
|
||||
|
||||
private static String spanNameFromMethod(Method method) {
|
||||
WithSpan annotation = method.getDeclaredAnnotation(WithSpan.class);
|
||||
String spanName = annotation.value();
|
||||
if (spanName.isEmpty()) {
|
||||
spanName = SpanNames.fromMethod(method);
|
||||
}
|
||||
return spanName;
|
||||
}
|
||||
}
|
|
@ -1,125 +0,0 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.otelannotations;
|
||||
|
||||
import application.io.opentelemetry.extension.annotations.WithSpan;
|
||||
import io.opentelemetry.api.trace.Span;
|
||||
import io.opentelemetry.api.trace.SpanBuilder;
|
||||
import io.opentelemetry.api.trace.SpanKind;
|
||||
import io.opentelemetry.context.Context;
|
||||
import io.opentelemetry.instrumentation.api.annotation.support.AttributeBindings;
|
||||
import io.opentelemetry.instrumentation.api.tracer.BaseTracer;
|
||||
import io.opentelemetry.instrumentation.api.tracer.SpanNames;
|
||||
import io.opentelemetry.instrumentation.api.tracer.async.AsyncSpanEndStrategies;
|
||||
import io.opentelemetry.instrumentation.api.tracer.async.AsyncSpanEndStrategy;
|
||||
import java.lang.reflect.Method;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public class WithSpanTracer extends BaseTracer {
|
||||
private static final WithSpanTracer TRACER = new WithSpanTracer();
|
||||
|
||||
public static WithSpanTracer tracer() {
|
||||
return TRACER;
|
||||
}
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(WithSpanTracer.class);
|
||||
|
||||
private final WithSpanAttributeBinder attributeBinder = new WithSpanAttributeBinder();
|
||||
private final AsyncSpanEndStrategies asyncSpanEndStrategies =
|
||||
AsyncSpanEndStrategies.getInstance();
|
||||
|
||||
public Context startSpan(
|
||||
Context parentContext,
|
||||
WithSpan applicationAnnotation,
|
||||
Method method,
|
||||
SpanKind kind,
|
||||
Object[] args) {
|
||||
|
||||
SpanBuilder spanBuilder =
|
||||
spanBuilder(
|
||||
parentContext, spanNameForMethodWithAnnotation(applicationAnnotation, method), kind);
|
||||
Span span = withSpanAttributes(spanBuilder, method, args).startSpan();
|
||||
|
||||
if (kind == SpanKind.SERVER) {
|
||||
return withServerSpan(parentContext, span);
|
||||
}
|
||||
if (kind == SpanKind.CLIENT) {
|
||||
return withClientSpan(parentContext, span);
|
||||
}
|
||||
return parentContext.with(span);
|
||||
}
|
||||
|
||||
/**
|
||||
* This method is used to generate an acceptable span (operation) name based on a given method
|
||||
* reference. It first checks for existence of {@link WithSpan} annotation. If it is present, then
|
||||
* tries to derive name from its {@code value} attribute. Otherwise delegates to {@link
|
||||
* SpanNames#fromMethod(Method)}.
|
||||
*/
|
||||
public String spanNameForMethodWithAnnotation(WithSpan applicationAnnotation, Method method) {
|
||||
if (applicationAnnotation != null && !applicationAnnotation.value().isEmpty()) {
|
||||
return applicationAnnotation.value();
|
||||
}
|
||||
return SpanNames.fromMethod(method);
|
||||
}
|
||||
|
||||
public SpanKind extractSpanKind(WithSpan applicationAnnotation) {
|
||||
application.io.opentelemetry.api.trace.SpanKind applicationKind =
|
||||
applicationAnnotation != null
|
||||
? applicationAnnotation.kind()
|
||||
: application.io.opentelemetry.api.trace.SpanKind.INTERNAL;
|
||||
return toAgentOrNull(applicationKind);
|
||||
}
|
||||
|
||||
public static SpanKind toAgentOrNull(
|
||||
application.io.opentelemetry.api.trace.SpanKind applicationSpanKind) {
|
||||
try {
|
||||
return SpanKind.valueOf(applicationSpanKind.name());
|
||||
} catch (IllegalArgumentException e) {
|
||||
logger.debug("unexpected span kind: {}", applicationSpanKind.name());
|
||||
return SpanKind.INTERNAL;
|
||||
}
|
||||
}
|
||||
|
||||
public SpanBuilder withSpanAttributes(SpanBuilder spanBuilder, Method method, Object[] args) {
|
||||
if (args != null && args.length > 0) {
|
||||
AttributeBindings bindings = attributeBinder.bind(method);
|
||||
if (!bindings.isEmpty()) {
|
||||
bindings.apply(spanBuilder::setAttribute, args);
|
||||
}
|
||||
}
|
||||
return spanBuilder;
|
||||
}
|
||||
|
||||
/**
|
||||
* Denotes the end of the invocation of the traced method with a successful result which will end
|
||||
* the span stored in the passed {@code context}. If the method returned a value representing an
|
||||
* asynchronous operation then the span will not be finished until the asynchronous operation has
|
||||
* completed.
|
||||
*
|
||||
* @param returnType Return type of the traced method.
|
||||
* @param returnValue Return value from the traced method.
|
||||
* @return Either {@code returnValue} or a value composing over {@code returnValue} for
|
||||
* notification of completion.
|
||||
* @throws ClassCastException if returnValue is not an instance of returnType
|
||||
*/
|
||||
public Object end(Context context, Class<?> returnType, Object returnValue) {
|
||||
if (returnType.isInstance(returnValue)) {
|
||||
AsyncSpanEndStrategy asyncSpanEndStrategy =
|
||||
asyncSpanEndStrategies.resolveStrategy(returnType);
|
||||
if (asyncSpanEndStrategy != null) {
|
||||
return asyncSpanEndStrategy.end(this, context, returnValue);
|
||||
}
|
||||
}
|
||||
end(context);
|
||||
return returnValue;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String getInstrumentationName() {
|
||||
return "io.opentelemetry.opentelemetry-annotations-1.0";
|
||||
}
|
||||
}
|
|
@ -5,22 +5,20 @@
|
|||
|
||||
package io.opentelemetry.instrumentation.reactor;
|
||||
|
||||
import static io.opentelemetry.instrumentation.api.annotation.support.async.AsyncOperationEndSupport.tryToGetResponse;
|
||||
|
||||
import io.opentelemetry.api.common.AttributeKey;
|
||||
import io.opentelemetry.api.trace.Span;
|
||||
import io.opentelemetry.context.Context;
|
||||
import io.opentelemetry.instrumentation.api.annotation.support.async.AsyncOperationEndStrategy;
|
||||
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
|
||||
import io.opentelemetry.instrumentation.api.tracer.BaseTracer;
|
||||
import io.opentelemetry.instrumentation.api.tracer.async.AsyncSpanEndStrategy;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.function.Consumer;
|
||||
import org.checkerframework.checker.nullness.qual.Nullable;
|
||||
import org.reactivestreams.Publisher;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
public final class ReactorAsyncOperationEndStrategy
|
||||
implements AsyncOperationEndStrategy, AsyncSpanEndStrategy {
|
||||
public final class ReactorAsyncOperationEndStrategy implements AsyncOperationEndStrategy {
|
||||
private static final AttributeKey<Boolean> CANCELED_ATTRIBUTE_KEY =
|
||||
AttributeKey.booleanKey("reactor.canceled");
|
||||
|
||||
|
@ -43,23 +41,6 @@ public final class ReactorAsyncOperationEndStrategy
|
|||
return returnType == Publisher.class || returnType == Mono.class || returnType == Flux.class;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object end(BaseTracer tracer, Context context, Object returnValue) {
|
||||
|
||||
EndOnFirstNotificationConsumer notificationConsumer =
|
||||
new EndOnFirstNotificationConsumer(context) {
|
||||
@Override
|
||||
protected void end(Object result, Throwable error) {
|
||||
if (error == null) {
|
||||
tracer.end(context);
|
||||
} else {
|
||||
tracer.endExceptionally(context, error);
|
||||
}
|
||||
}
|
||||
};
|
||||
return end(returnValue, notificationConsumer);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <REQUEST, RESPONSE> Object end(
|
||||
Instrumenter<REQUEST, RESPONSE> instrumenter,
|
||||
|
@ -75,11 +56,6 @@ public final class ReactorAsyncOperationEndStrategy
|
|||
instrumenter.end(context, request, tryToGetResponse(responseType, result), error);
|
||||
}
|
||||
};
|
||||
return end(asyncValue, notificationConsumer);
|
||||
}
|
||||
|
||||
private static Object end(
|
||||
Object asyncValue, EndOnFirstNotificationConsumer notificationConsumer) {
|
||||
|
||||
if (asyncValue instanceof Mono) {
|
||||
Mono<?> mono = (Mono<?>) asyncValue;
|
||||
|
@ -94,14 +70,6 @@ public final class ReactorAsyncOperationEndStrategy
|
|||
}
|
||||
}
|
||||
|
||||
@Nullable
|
||||
private static <RESPONSE> RESPONSE tryToGetResponse(Class<RESPONSE> responseType, Object result) {
|
||||
if (responseType.isInstance(result)) {
|
||||
return responseType.cast(result);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper class to ensure that the span is ended exactly once regardless of how many OnComplete or
|
||||
* OnError notifications are received. Multiple notifications can happen anytime multiple
|
||||
|
|
|
@ -24,7 +24,6 @@ package io.opentelemetry.instrumentation.reactor;
|
|||
|
||||
import io.opentelemetry.context.Context;
|
||||
import io.opentelemetry.instrumentation.api.annotation.support.async.AsyncOperationEndStrategies;
|
||||
import io.opentelemetry.instrumentation.api.tracer.async.AsyncSpanEndStrategies;
|
||||
import java.util.function.BiFunction;
|
||||
import java.util.function.Function;
|
||||
import org.reactivestreams.Publisher;
|
||||
|
@ -61,25 +60,32 @@ public final class TracingOperator {
|
|||
* application.
|
||||
*/
|
||||
public void registerOnEachOperator() {
|
||||
Hooks.onEachOperator(TracingSubscriber.class.getName(), tracingLift());
|
||||
AsyncSpanEndStrategies.getInstance().registerStrategy(asyncOperationEndStrategy);
|
||||
Hooks.onEachOperator(TracingSubscriber.class.getName(), tracingLift(asyncOperationEndStrategy));
|
||||
AsyncOperationEndStrategies.instance().registerStrategy(asyncOperationEndStrategy);
|
||||
}
|
||||
|
||||
/** Unregisters the hook registered by {@link #registerOnEachOperator()}. */
|
||||
public void resetOnEachOperator() {
|
||||
Hooks.resetOnEachOperator(TracingSubscriber.class.getName());
|
||||
AsyncSpanEndStrategies.getInstance().unregisterStrategy(asyncOperationEndStrategy);
|
||||
AsyncOperationEndStrategies.instance().unregisterStrategy(asyncOperationEndStrategy);
|
||||
}
|
||||
|
||||
private static <T> Function<? super Publisher<T>, ? extends Publisher<T>> tracingLift() {
|
||||
return Operators.lift(new Lifter<>());
|
||||
private static <T> Function<? super Publisher<T>, ? extends Publisher<T>> tracingLift(
|
||||
ReactorAsyncOperationEndStrategy asyncOperationEndStrategy) {
|
||||
return Operators.lift(new Lifter<>(asyncOperationEndStrategy));
|
||||
}
|
||||
|
||||
public static class Lifter<T>
|
||||
implements BiFunction<Scannable, CoreSubscriber<? super T>, CoreSubscriber<? super T>> {
|
||||
|
||||
/** Holds reference to strategy to prevent it from being collected. */
|
||||
@SuppressWarnings("FieldCanBeLocal")
|
||||
private final ReactorAsyncOperationEndStrategy asyncOperationEndStrategy;
|
||||
|
||||
public Lifter(ReactorAsyncOperationEndStrategy asyncOperationEndStrategy) {
|
||||
this.asyncOperationEndStrategy = asyncOperationEndStrategy;
|
||||
}
|
||||
|
||||
@Override
|
||||
public CoreSubscriber<? super T> apply(Scannable publisher, CoreSubscriber<? super T> sub) {
|
||||
// if Flux/Mono #just, #empty, #error
|
||||
|
|
|
@ -5,13 +5,13 @@
|
|||
|
||||
package io.opentelemetry.instrumentation.rxjava2;
|
||||
|
||||
import static io.opentelemetry.instrumentation.api.annotation.support.async.AsyncOperationEndSupport.tryToGetResponse;
|
||||
|
||||
import io.opentelemetry.api.common.AttributeKey;
|
||||
import io.opentelemetry.api.trace.Span;
|
||||
import io.opentelemetry.context.Context;
|
||||
import io.opentelemetry.instrumentation.api.annotation.support.async.AsyncOperationEndStrategy;
|
||||
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
|
||||
import io.opentelemetry.instrumentation.api.tracer.BaseTracer;
|
||||
import io.opentelemetry.instrumentation.api.tracer.async.AsyncSpanEndStrategy;
|
||||
import io.reactivex.Completable;
|
||||
import io.reactivex.Flowable;
|
||||
import io.reactivex.Maybe;
|
||||
|
@ -22,11 +22,9 @@ import io.reactivex.functions.BiConsumer;
|
|||
import io.reactivex.functions.Consumer;
|
||||
import io.reactivex.parallel.ParallelFlowable;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import org.checkerframework.checker.nullness.qual.Nullable;
|
||||
import org.reactivestreams.Publisher;
|
||||
|
||||
public final class RxJava2AsyncOperationEndStrategy
|
||||
implements AsyncOperationEndStrategy, AsyncSpanEndStrategy {
|
||||
public final class RxJava2AsyncOperationEndStrategy implements AsyncOperationEndStrategy {
|
||||
private static final AttributeKey<Boolean> CANCELED_ATTRIBUTE_KEY =
|
||||
AttributeKey.booleanKey("rxjava.canceled");
|
||||
|
||||
|
@ -63,34 +61,14 @@ public final class RxJava2AsyncOperationEndStrategy
|
|||
Object asyncValue,
|
||||
Class<RESPONSE> responseType) {
|
||||
|
||||
return end(
|
||||
asyncValue,
|
||||
EndOnFirstNotificationConsumer<Object> notificationConsumer =
|
||||
new EndOnFirstNotificationConsumer<Object>(context) {
|
||||
@Override
|
||||
protected void end(Object response, Throwable error) {
|
||||
instrumenter.end(context, request, tryToGetResponse(responseType, response), error);
|
||||
}
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
@Override
|
||||
public Object end(BaseTracer tracer, Context context, Object returnValue) {
|
||||
return end(
|
||||
returnValue,
|
||||
new EndOnFirstNotificationConsumer<Object>(context) {
|
||||
@Override
|
||||
protected void end(Object response, Throwable error) {
|
||||
if (error != null) {
|
||||
tracer.endExceptionally(context, error);
|
||||
} else {
|
||||
tracer.end(context);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private static <T> Object end(
|
||||
Object asyncValue, EndOnFirstNotificationConsumer<T> notificationConsumer) {
|
||||
if (asyncValue instanceof Completable) {
|
||||
return endWhenComplete((Completable) asyncValue, notificationConsumer);
|
||||
} else if (asyncValue instanceof Maybe) {
|
||||
|
@ -153,14 +131,6 @@ public final class RxJava2AsyncOperationEndStrategy
|
|||
.doOnCancel(notificationConsumer::onCancelOrDispose);
|
||||
}
|
||||
|
||||
@Nullable
|
||||
private static <RESPONSE> RESPONSE tryToGetResponse(Class<RESPONSE> responseType, Object result) {
|
||||
if (responseType.isInstance(result)) {
|
||||
return responseType.cast(result);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper class to ensure that the span is ended exactly once regardless of how many OnComplete or
|
||||
* OnError notifications are received. Multiple notifications can happen anytime multiple
|
||||
|
|
|
@ -25,7 +25,6 @@ package io.opentelemetry.instrumentation.rxjava2;
|
|||
import io.opentelemetry.context.Context;
|
||||
import io.opentelemetry.context.Scope;
|
||||
import io.opentelemetry.instrumentation.api.annotation.support.async.AsyncOperationEndStrategies;
|
||||
import io.opentelemetry.instrumentation.api.tracer.async.AsyncSpanEndStrategies;
|
||||
import io.reactivex.Completable;
|
||||
import io.reactivex.CompletableObserver;
|
||||
import io.reactivex.Flowable;
|
||||
|
@ -254,8 +253,6 @@ public final class TracingAssembly {
|
|||
.setCaptureExperimentalSpanAttributes(captureExperimentalSpanAttributes)
|
||||
.build();
|
||||
|
||||
AsyncSpanEndStrategies.getInstance().registerStrategy(asyncOperationEndStrategy);
|
||||
|
||||
AsyncOperationEndStrategies.instance().registerStrategy(asyncOperationEndStrategy);
|
||||
}
|
||||
|
||||
|
@ -293,10 +290,7 @@ public final class TracingAssembly {
|
|||
|
||||
private static void disableWithSpanStrategy() {
|
||||
if (asyncOperationEndStrategy != null) {
|
||||
AsyncSpanEndStrategies.getInstance().unregisterStrategy(asyncOperationEndStrategy);
|
||||
|
||||
AsyncOperationEndStrategies.instance().unregisterStrategy(asyncOperationEndStrategy);
|
||||
|
||||
asyncOperationEndStrategy = null;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -5,13 +5,13 @@
|
|||
|
||||
package io.opentelemetry.instrumentation.rxjava3;
|
||||
|
||||
import static io.opentelemetry.instrumentation.api.annotation.support.async.AsyncOperationEndSupport.tryToGetResponse;
|
||||
|
||||
import io.opentelemetry.api.common.AttributeKey;
|
||||
import io.opentelemetry.api.trace.Span;
|
||||
import io.opentelemetry.context.Context;
|
||||
import io.opentelemetry.instrumentation.api.annotation.support.async.AsyncOperationEndStrategy;
|
||||
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
|
||||
import io.opentelemetry.instrumentation.api.tracer.BaseTracer;
|
||||
import io.opentelemetry.instrumentation.api.tracer.async.AsyncSpanEndStrategy;
|
||||
import io.reactivex.rxjava3.core.Completable;
|
||||
import io.reactivex.rxjava3.core.Flowable;
|
||||
import io.reactivex.rxjava3.core.Maybe;
|
||||
|
@ -22,11 +22,9 @@ import io.reactivex.rxjava3.functions.BiConsumer;
|
|||
import io.reactivex.rxjava3.functions.Consumer;
|
||||
import io.reactivex.rxjava3.parallel.ParallelFlowable;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import org.checkerframework.checker.nullness.qual.Nullable;
|
||||
import org.reactivestreams.Publisher;
|
||||
|
||||
public final class RxJava3AsyncOperationEndStrategy
|
||||
implements AsyncOperationEndStrategy, AsyncSpanEndStrategy {
|
||||
public final class RxJava3AsyncOperationEndStrategy implements AsyncOperationEndStrategy {
|
||||
private static final AttributeKey<Boolean> CANCELED_ATTRIBUTE_KEY =
|
||||
AttributeKey.booleanKey("rxjava.canceled");
|
||||
|
||||
|
@ -63,34 +61,14 @@ public final class RxJava3AsyncOperationEndStrategy
|
|||
Object asyncValue,
|
||||
Class<RESPONSE> responseType) {
|
||||
|
||||
return end(
|
||||
asyncValue,
|
||||
EndOnFirstNotificationConsumer<Object> notificationConsumer =
|
||||
new EndOnFirstNotificationConsumer<Object>(context) {
|
||||
@Override
|
||||
protected void end(Object response, Throwable error) {
|
||||
instrumenter.end(context, request, tryToGetResponse(responseType, response), error);
|
||||
}
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
@Override
|
||||
public Object end(BaseTracer tracer, Context context, Object returnValue) {
|
||||
return end(
|
||||
returnValue,
|
||||
new EndOnFirstNotificationConsumer<Object>(context) {
|
||||
@Override
|
||||
protected void end(Object response, Throwable error) {
|
||||
if (error != null) {
|
||||
tracer.endExceptionally(context, error);
|
||||
} else {
|
||||
tracer.end(context);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private static <T> Object end(
|
||||
Object asyncValue, EndOnFirstNotificationConsumer<T> notificationConsumer) {
|
||||
if (asyncValue instanceof Completable) {
|
||||
return endWhenComplete((Completable) asyncValue, notificationConsumer);
|
||||
} else if (asyncValue instanceof Maybe) {
|
||||
|
@ -153,14 +131,6 @@ public final class RxJava3AsyncOperationEndStrategy
|
|||
.doOnCancel(notificationConsumer::onCancelOrDispose);
|
||||
}
|
||||
|
||||
@Nullable
|
||||
private static <RESPONSE> RESPONSE tryToGetResponse(Class<RESPONSE> responseType, Object result) {
|
||||
if (responseType.isInstance(result)) {
|
||||
return responseType.cast(result);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper class to ensure that the span is ended exactly once regardless of how many OnComplete or
|
||||
* OnError notifications are received. Multiple notifications can happen anytime multiple
|
||||
|
|
|
@ -25,7 +25,6 @@ package io.opentelemetry.instrumentation.rxjava3;
|
|||
import io.opentelemetry.context.Context;
|
||||
import io.opentelemetry.context.Scope;
|
||||
import io.opentelemetry.instrumentation.api.annotation.support.async.AsyncOperationEndStrategies;
|
||||
import io.opentelemetry.instrumentation.api.tracer.async.AsyncSpanEndStrategies;
|
||||
import io.reactivex.rxjava3.core.Completable;
|
||||
import io.reactivex.rxjava3.core.CompletableObserver;
|
||||
import io.reactivex.rxjava3.core.Flowable;
|
||||
|
@ -252,8 +251,6 @@ public final class TracingAssembly {
|
|||
.setCaptureExperimentalSpanAttributes(captureExperimentalSpanAttributes)
|
||||
.build();
|
||||
|
||||
AsyncSpanEndStrategies.getInstance().registerStrategy(asyncOperationEndStrategy);
|
||||
|
||||
AsyncOperationEndStrategies.instance().registerStrategy(asyncOperationEndStrategy);
|
||||
}
|
||||
|
||||
|
@ -291,7 +288,6 @@ public final class TracingAssembly {
|
|||
|
||||
private static void disableWithSpanStrategy() {
|
||||
if (asyncOperationEndStrategy != null) {
|
||||
AsyncSpanEndStrategies.getInstance().unregisterStrategy(asyncOperationEndStrategy);
|
||||
AsyncOperationEndStrategies.instance().unregisterStrategy(asyncOperationEndStrategy);
|
||||
asyncOperationEndStrategy = null;
|
||||
}
|
||||
|
|
|
@ -10,7 +10,6 @@ import io.opentelemetry.instrumentation.api.annotation.support.async.AsyncOperat
|
|||
import io.opentelemetry.instrumentation.api.annotation.support.async.Jdk8AsyncOperationEndStrategy;
|
||||
import java.lang.ref.WeakReference;
|
||||
import java.util.List;
|
||||
import java.util.ListIterator;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
import org.checkerframework.checker.nullness.qual.Nullable;
|
||||
|
||||
|
@ -39,28 +38,31 @@ public final class WeakRefAsyncOperationEndStrategies extends AsyncOperationEndS
|
|||
|
||||
@Override
|
||||
public void unregisterStrategy(AsyncOperationEndStrategy strategy) {
|
||||
ListIterator<WeakReference<AsyncOperationEndStrategy>> it = strategies.listIterator();
|
||||
while (it.hasNext()) {
|
||||
AsyncOperationEndStrategy s = it.next().get();
|
||||
if (s == null || s == strategy) {
|
||||
it.remove();
|
||||
break;
|
||||
}
|
||||
}
|
||||
strategies.removeIf(
|
||||
ref -> {
|
||||
AsyncOperationEndStrategy s = ref.get();
|
||||
return s == null || s == strategy;
|
||||
});
|
||||
}
|
||||
|
||||
@Nullable
|
||||
@Override
|
||||
public AsyncOperationEndStrategy resolveStrategy(Class<?> returnType) {
|
||||
ListIterator<WeakReference<AsyncOperationEndStrategy>> it = strategies.listIterator();
|
||||
while (it.hasNext()) {
|
||||
AsyncOperationEndStrategy s = it.next().get();
|
||||
if (s == null) {
|
||||
it.remove();
|
||||
} else if (s.supports(returnType)) {
|
||||
return s;
|
||||
boolean purgeCollectedWeakReferences = false;
|
||||
try {
|
||||
for (WeakReference<AsyncOperationEndStrategy> ref : strategies) {
|
||||
AsyncOperationEndStrategy s = ref.get();
|
||||
if (s == null) {
|
||||
purgeCollectedWeakReferences = true;
|
||||
} else if (s.supports(returnType)) {
|
||||
return s;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
} finally {
|
||||
if (purgeCollectedWeakReferences) {
|
||||
strategies.removeIf(ref -> ref.get() == null);
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue