Implement a dedicated reactor-netty 1.0 instrumentation (#4662)
* Implement a dedicated reactor-netty 1.0 instrumentation * Apply suggestions from code review Co-authored-by: Trask Stalnaker <trask.stalnaker@gmail.com> * code review comments * code review comments * code review comments Co-authored-by: Trask Stalnaker <trask.stalnaker@gmail.com>
This commit is contained in:
parent
31d87162f3
commit
9235719839
|
@ -117,7 +117,7 @@ public final class ContextPropagationOperator {
|
|||
}
|
||||
|
||||
/** Forces Mono to run in traceContext scope. */
|
||||
static <T> Mono<T> runWithContext(Mono<T> publisher, Context tracingContext) {
|
||||
public static <T> Mono<T> runWithContext(Mono<T> publisher, Context tracingContext) {
|
||||
if (!enabled) {
|
||||
return publisher;
|
||||
}
|
||||
|
|
|
@ -21,6 +21,8 @@ dependencies {
|
|||
annotationProcessor("com.google.auto.value:auto-value")
|
||||
|
||||
implementation(project(":instrumentation:netty:netty-4.1-common:javaagent"))
|
||||
implementation(project(":instrumentation:reactor-3.1:library"))
|
||||
|
||||
library("io.projectreactor.netty:reactor-netty-http:1.0.0")
|
||||
|
||||
testInstrumentation(project(":instrumentation:reactor-netty:reactor-netty-0.9:javaagent"))
|
||||
|
|
|
@ -5,10 +5,8 @@
|
|||
|
||||
package io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0;
|
||||
|
||||
import io.netty.channel.Channel;
|
||||
import io.opentelemetry.context.Context;
|
||||
import io.opentelemetry.context.Scope;
|
||||
import io.opentelemetry.javaagent.instrumentation.netty.v4_1.AttributeKeys;
|
||||
import java.util.function.BiConsumer;
|
||||
import javax.annotation.Nullable;
|
||||
import reactor.netty.Connection;
|
||||
|
@ -24,15 +22,19 @@ public final class DecoratorFunctions {
|
|||
|
||||
public static final class OnMessageDecorator<M extends HttpClientInfos>
|
||||
implements BiConsumer<M, Connection> {
|
||||
private final BiConsumer<? super M, ? super Connection> delegate;
|
||||
|
||||
public OnMessageDecorator(BiConsumer<? super M, ? super Connection> delegate) {
|
||||
private final BiConsumer<? super M, ? super Connection> delegate;
|
||||
private final PropagatedContext propagatedContext;
|
||||
|
||||
public OnMessageDecorator(
|
||||
BiConsumer<? super M, ? super Connection> delegate, PropagatedContext propagatedContext) {
|
||||
this.delegate = delegate;
|
||||
this.propagatedContext = propagatedContext;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void accept(M message, Connection connection) {
|
||||
Context context = getChannelContext(message.currentContextView(), connection.channel());
|
||||
Context context = getChannelContext(message.currentContextView(), propagatedContext);
|
||||
if (context == null) {
|
||||
delegate.accept(message, connection);
|
||||
} else {
|
||||
|
@ -45,15 +47,19 @@ public final class DecoratorFunctions {
|
|||
|
||||
public static final class OnMessageErrorDecorator<M extends HttpClientInfos>
|
||||
implements BiConsumer<M, Throwable> {
|
||||
private final BiConsumer<? super M, ? super Throwable> delegate;
|
||||
|
||||
public OnMessageErrorDecorator(BiConsumer<? super M, ? super Throwable> delegate) {
|
||||
private final BiConsumer<? super M, ? super Throwable> delegate;
|
||||
private final PropagatedContext propagatedContext;
|
||||
|
||||
public OnMessageErrorDecorator(
|
||||
BiConsumer<? super M, ? super Throwable> delegate, PropagatedContext propagatedContext) {
|
||||
this.delegate = delegate;
|
||||
this.propagatedContext = propagatedContext;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void accept(M message, Throwable throwable) {
|
||||
Context context = getChannelContext(message.currentContextView(), null);
|
||||
Context context = getChannelContext(message.currentContextView(), propagatedContext);
|
||||
if (context == null) {
|
||||
delegate.accept(message, throwable);
|
||||
} else {
|
||||
|
@ -65,16 +71,27 @@ public final class DecoratorFunctions {
|
|||
}
|
||||
|
||||
@Nullable
|
||||
private static Context getChannelContext(ContextView contextView, @Nullable Channel channel) {
|
||||
// try to get the client span context from the channel if it's available
|
||||
if (channel != null) {
|
||||
Context context = channel.attr(AttributeKeys.CLIENT_CONTEXT).get();
|
||||
if (context != null) {
|
||||
return context;
|
||||
}
|
||||
private static Context getChannelContext(
|
||||
ContextView contextView, PropagatedContext propagatedContext) {
|
||||
Context context = null;
|
||||
if (propagatedContext.useClientContext) {
|
||||
context = contextView.getOrDefault(ReactorContextKeys.CLIENT_CONTEXT_KEY, null);
|
||||
}
|
||||
if (context == null) {
|
||||
context = contextView.getOrDefault(ReactorContextKeys.CLIENT_PARENT_CONTEXT_KEY, null);
|
||||
}
|
||||
return context;
|
||||
}
|
||||
|
||||
public enum PropagatedContext {
|
||||
PARENT(false),
|
||||
CLIENT(true);
|
||||
|
||||
final boolean useClientContext;
|
||||
|
||||
PropagatedContext(boolean useClientContext) {
|
||||
this.useClientContext = useClientContext;
|
||||
}
|
||||
// otherwise use the parent span context
|
||||
return contextView.getOrDefault(MapConnect.CONTEXT_ATTRIBUTE, null);
|
||||
}
|
||||
|
||||
private DecoratorFunctions() {}
|
||||
|
|
|
@ -6,7 +6,6 @@
|
|||
package io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0;
|
||||
|
||||
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.isStatic;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.named;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.namedOneOf;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
|
||||
|
@ -14,13 +13,12 @@ import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
|
|||
|
||||
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
|
||||
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
|
||||
import io.opentelemetry.javaagent.instrumentation.api.CallDepth;
|
||||
import io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0.DecoratorFunctions.PropagatedContext;
|
||||
import java.util.function.BiConsumer;
|
||||
import net.bytebuddy.asm.Advice;
|
||||
import net.bytebuddy.description.type.TypeDescription;
|
||||
import net.bytebuddy.matcher.ElementMatcher;
|
||||
import reactor.netty.Connection;
|
||||
import reactor.netty.http.client.HttpClient;
|
||||
import reactor.netty.http.client.HttpClientRequest;
|
||||
import reactor.netty.http.client.HttpClientResponse;
|
||||
|
||||
|
@ -32,17 +30,19 @@ public class HttpClientInstrumentation implements TypeInstrumentation {
|
|||
|
||||
@Override
|
||||
public void transform(TypeTransformer transformer) {
|
||||
transformer.applyAdviceToMethod(
|
||||
isStatic().and(namedOneOf("create", "newConnection", "from")),
|
||||
this.getClass().getName() + "$CreateAdvice");
|
||||
|
||||
// advice classes below expose current context in doOn*/doAfter* callbacks
|
||||
transformer.applyAdviceToMethod(
|
||||
isPublic()
|
||||
.and(namedOneOf("doOnRequest", "doAfterRequest"))
|
||||
.and(named("doOnRequest"))
|
||||
.and(takesArguments(1))
|
||||
.and(takesArgument(0, BiConsumer.class)),
|
||||
this.getClass().getName() + "$OnRequestAdvice");
|
||||
transformer.applyAdviceToMethod(
|
||||
isPublic()
|
||||
.and(named("doAfterRequest"))
|
||||
.and(takesArguments(1))
|
||||
.and(takesArgument(0, BiConsumer.class)),
|
||||
this.getClass().getName() + "$AfterRequestAdvice");
|
||||
transformer.applyAdviceToMethod(
|
||||
isPublic()
|
||||
.and(named("doOnRequestError"))
|
||||
|
@ -51,10 +51,16 @@ public class HttpClientInstrumentation implements TypeInstrumentation {
|
|||
this.getClass().getName() + "$OnRequestErrorAdvice");
|
||||
transformer.applyAdviceToMethod(
|
||||
isPublic()
|
||||
.and(namedOneOf("doOnResponse", "doAfterResponseSuccess", "doOnRedirect"))
|
||||
.and(named("doOnResponse"))
|
||||
.and(takesArguments(1))
|
||||
.and(takesArgument(0, BiConsumer.class)),
|
||||
this.getClass().getName() + "$OnResponseAdvice");
|
||||
transformer.applyAdviceToMethod(
|
||||
isPublic()
|
||||
.and(namedOneOf("doAfterResponseSuccess", "doOnRedirect"))
|
||||
.and(takesArguments(1))
|
||||
.and(takesArgument(0, BiConsumer.class)),
|
||||
this.getClass().getName() + "$AfterResponseAdvice");
|
||||
transformer.applyAdviceToMethod(
|
||||
isPublic()
|
||||
.and(named("doOnResponseError"))
|
||||
|
@ -70,27 +76,6 @@ public class HttpClientInstrumentation implements TypeInstrumentation {
|
|||
this.getClass().getName() + "$OnErrorAdvice");
|
||||
}
|
||||
|
||||
@SuppressWarnings("unused")
|
||||
public static class CreateAdvice {
|
||||
|
||||
@Advice.OnMethodEnter(suppress = Throwable.class)
|
||||
public static void onEnter(@Advice.Local("otelCallDepth") CallDepth callDepth) {
|
||||
callDepth = CallDepth.forClass(HttpClient.class);
|
||||
callDepth.getAndIncrement();
|
||||
}
|
||||
|
||||
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
|
||||
public static void stopSpan(
|
||||
@Advice.Thrown Throwable throwable,
|
||||
@Advice.Return(readOnly = false) HttpClient client,
|
||||
@Advice.Local("otelCallDepth") CallDepth callDepth) {
|
||||
|
||||
if (callDepth.decrementAndGet() == 0 && throwable == null) {
|
||||
client = client.doOnRequest(new OnRequest()).mapConnect(new MapConnect());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unused")
|
||||
public static class OnRequestAdvice {
|
||||
|
||||
|
@ -98,8 +83,24 @@ public class HttpClientInstrumentation implements TypeInstrumentation {
|
|||
public static void onEnter(
|
||||
@Advice.Argument(value = 0, readOnly = false)
|
||||
BiConsumer<? super HttpClientRequest, ? super Connection> callback) {
|
||||
|
||||
if (DecoratorFunctions.shouldDecorate(callback.getClass())) {
|
||||
callback = new DecoratorFunctions.OnMessageDecorator<>(callback);
|
||||
callback = new DecoratorFunctions.OnMessageDecorator<>(callback, PropagatedContext.PARENT);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unused")
|
||||
public static class AfterRequestAdvice {
|
||||
|
||||
@Advice.OnMethodEnter(suppress = Throwable.class)
|
||||
public static void onEnter(
|
||||
@Advice.Argument(value = 0, readOnly = false)
|
||||
BiConsumer<? super HttpClientRequest, ? super Connection> callback) {
|
||||
|
||||
if (DecoratorFunctions.shouldDecorate(callback.getClass())) {
|
||||
// use client context after request is sent
|
||||
callback = new DecoratorFunctions.OnMessageDecorator<>(callback, PropagatedContext.CLIENT);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -111,8 +112,10 @@ public class HttpClientInstrumentation implements TypeInstrumentation {
|
|||
public static void onEnter(
|
||||
@Advice.Argument(value = 0, readOnly = false)
|
||||
BiConsumer<? super HttpClientRequest, ? super Throwable> callback) {
|
||||
|
||||
if (DecoratorFunctions.shouldDecorate(callback.getClass())) {
|
||||
callback = new DecoratorFunctions.OnMessageErrorDecorator<>(callback);
|
||||
callback =
|
||||
new DecoratorFunctions.OnMessageErrorDecorator<>(callback, PropagatedContext.PARENT);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -124,8 +127,24 @@ public class HttpClientInstrumentation implements TypeInstrumentation {
|
|||
public static void onEnter(
|
||||
@Advice.Argument(value = 0, readOnly = false)
|
||||
BiConsumer<? super HttpClientResponse, ? super Connection> callback) {
|
||||
|
||||
if (DecoratorFunctions.shouldDecorate(callback.getClass())) {
|
||||
callback = new DecoratorFunctions.OnMessageDecorator<>(callback);
|
||||
// use client context just when response status & headers are received
|
||||
callback = new DecoratorFunctions.OnMessageDecorator<>(callback, PropagatedContext.CLIENT);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unused")
|
||||
public static class AfterResponseAdvice {
|
||||
|
||||
@Advice.OnMethodEnter(suppress = Throwable.class)
|
||||
public static void onEnter(
|
||||
@Advice.Argument(value = 0, readOnly = false)
|
||||
BiConsumer<? super HttpClientResponse, ? super Connection> callback) {
|
||||
|
||||
if (DecoratorFunctions.shouldDecorate(callback.getClass())) {
|
||||
callback = new DecoratorFunctions.OnMessageDecorator<>(callback, PropagatedContext.PARENT);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -137,8 +156,10 @@ public class HttpClientInstrumentation implements TypeInstrumentation {
|
|||
public static void onEnter(
|
||||
@Advice.Argument(value = 0, readOnly = false)
|
||||
BiConsumer<? super HttpClientResponse, ? super Throwable> callback) {
|
||||
|
||||
if (DecoratorFunctions.shouldDecorate(callback.getClass())) {
|
||||
callback = new DecoratorFunctions.OnMessageErrorDecorator<>(callback);
|
||||
callback =
|
||||
new DecoratorFunctions.OnMessageErrorDecorator<>(callback, PropagatedContext.PARENT);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -152,11 +173,16 @@ public class HttpClientInstrumentation implements TypeInstrumentation {
|
|||
BiConsumer<? super HttpClientRequest, ? super Throwable> requestCallback,
|
||||
@Advice.Argument(value = 1, readOnly = false)
|
||||
BiConsumer<? super HttpClientResponse, ? super Throwable> responseCallback) {
|
||||
|
||||
if (DecoratorFunctions.shouldDecorate(requestCallback.getClass())) {
|
||||
requestCallback = new DecoratorFunctions.OnMessageErrorDecorator<>(requestCallback);
|
||||
requestCallback =
|
||||
new DecoratorFunctions.OnMessageErrorDecorator<>(
|
||||
requestCallback, PropagatedContext.PARENT);
|
||||
}
|
||||
if (DecoratorFunctions.shouldDecorate(responseCallback.getClass())) {
|
||||
responseCallback = new DecoratorFunctions.OnMessageErrorDecorator<>(responseCallback);
|
||||
responseCallback =
|
||||
new DecoratorFunctions.OnMessageErrorDecorator<>(
|
||||
responseCallback, PropagatedContext.PARENT);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,19 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0;
|
||||
|
||||
import io.opentelemetry.context.propagation.TextMapSetter;
|
||||
import javax.annotation.Nullable;
|
||||
import reactor.netty.http.client.HttpClientRequest;
|
||||
|
||||
enum HttpClientRequestHeadersSetter implements TextMapSetter<HttpClientRequest> {
|
||||
INSTANCE;
|
||||
|
||||
@Override
|
||||
public void set(@Nullable HttpClientRequest request, String key, String value) {
|
||||
request.header(key, value);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,185 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0;
|
||||
|
||||
import static io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0.ReactorContextKeys.CLIENT_CONTEXT_KEY;
|
||||
import static io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0.ReactorContextKeys.CLIENT_PARENT_CONTEXT_KEY;
|
||||
import static io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0.ReactorNettySingletons.instrumenter;
|
||||
|
||||
import io.opentelemetry.api.GlobalOpenTelemetry;
|
||||
import io.opentelemetry.context.Context;
|
||||
import io.opentelemetry.instrumentation.reactor.ContextPropagationOperator;
|
||||
import io.opentelemetry.javaagent.instrumentation.netty.v4_1.AttributeKeys;
|
||||
import java.util.function.BiConsumer;
|
||||
import java.util.function.Function;
|
||||
import javax.annotation.Nullable;
|
||||
import reactor.core.publisher.Mono;
|
||||
import reactor.netty.Connection;
|
||||
import reactor.netty.ConnectionObserver;
|
||||
import reactor.netty.http.client.HttpClient;
|
||||
import reactor.netty.http.client.HttpClientConfig;
|
||||
import reactor.netty.http.client.HttpClientRequest;
|
||||
import reactor.netty.http.client.HttpClientResponse;
|
||||
import reactor.netty.http.client.HttpClientState;
|
||||
|
||||
public final class HttpResponseReceiverInstrumenter {
|
||||
|
||||
// this method adds several stateful listeners that execute the instrumenter lifecycle during HTTP
|
||||
// request processing
|
||||
// it should be used just before one of the response*() methods is called - after this point the
|
||||
// HTTP
|
||||
// request is no longer modifiable by the user
|
||||
@Nullable
|
||||
public static HttpClient.ResponseReceiver<?> instrument(HttpClient.ResponseReceiver<?> receiver) {
|
||||
// receiver should always be an HttpClientFinalizer, which both extends HttpClient and
|
||||
// implements ResponseReceiver
|
||||
if (receiver instanceof HttpClient) {
|
||||
HttpClient client = (HttpClient) receiver;
|
||||
HttpClientConfig config = client.configuration();
|
||||
|
||||
ContextHolder contextHolder = new ContextHolder();
|
||||
|
||||
HttpClient modified =
|
||||
client
|
||||
.mapConnect(new StartOperation(contextHolder, config))
|
||||
.doOnRequest(new PropagateContext(contextHolder))
|
||||
.doOnRequestError(new EndOperationWithError(contextHolder, config))
|
||||
.observe(new EndOperation(contextHolder, config));
|
||||
|
||||
// modified should always be an HttpClientFinalizer too
|
||||
if (modified instanceof HttpClient.ResponseReceiver) {
|
||||
return (HttpClient.ResponseReceiver<?>) modified;
|
||||
}
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
static final class ContextHolder {
|
||||
volatile Context parentContext;
|
||||
volatile Context context;
|
||||
}
|
||||
|
||||
static final class StartOperation
|
||||
implements Function<Mono<? extends Connection>, Mono<? extends Connection>> {
|
||||
|
||||
private final ContextHolder contextHolder;
|
||||
private final HttpClientConfig config;
|
||||
|
||||
StartOperation(ContextHolder contextHolder, HttpClientConfig config) {
|
||||
this.contextHolder = contextHolder;
|
||||
this.config = config;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<? extends Connection> apply(Mono<? extends Connection> mono) {
|
||||
return Mono.defer(
|
||||
() -> {
|
||||
Context parentContext = Context.current();
|
||||
contextHolder.parentContext = parentContext;
|
||||
if (!instrumenter().shouldStart(parentContext, config)) {
|
||||
// make context accessible via the reactor ContextView - the doOn* callbacks
|
||||
// instrumentation uses this to set the proper context for callbacks
|
||||
return mono.contextWrite(ctx -> ctx.put(CLIENT_PARENT_CONTEXT_KEY, parentContext));
|
||||
}
|
||||
|
||||
Context context = instrumenter().start(parentContext, config);
|
||||
contextHolder.context = context;
|
||||
return ContextPropagationOperator.runWithContext(mono, context)
|
||||
// make contexts accessible via the reactor ContextView - the doOn* callbacks
|
||||
// instrumentation uses the parent context to set the proper context for callbacks
|
||||
.contextWrite(ctx -> ctx.put(CLIENT_PARENT_CONTEXT_KEY, parentContext))
|
||||
.contextWrite(ctx -> ctx.put(CLIENT_CONTEXT_KEY, context));
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
static final class PropagateContext implements BiConsumer<HttpClientRequest, Connection> {
|
||||
|
||||
private final ContextHolder contextHolder;
|
||||
|
||||
PropagateContext(ContextHolder contextHolder) {
|
||||
this.contextHolder = contextHolder;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void accept(HttpClientRequest httpClientRequest, Connection connection) {
|
||||
Context context = contextHolder.context;
|
||||
if (context != null) {
|
||||
GlobalOpenTelemetry.getPropagators()
|
||||
.getTextMapPropagator()
|
||||
.inject(context, httpClientRequest, HttpClientRequestHeadersSetter.INSTANCE);
|
||||
}
|
||||
|
||||
// also propagate the context to the underlying netty instrumentation
|
||||
// if this span was suppressed and context is null, propagate parentContext - this will allow
|
||||
// netty spans to be suppressed too
|
||||
Context nettyParentContext = context == null ? contextHolder.parentContext : context;
|
||||
connection.channel().attr(AttributeKeys.WRITE_CONTEXT).set(nettyParentContext);
|
||||
}
|
||||
}
|
||||
|
||||
static final class EndOperationWithError implements BiConsumer<HttpClientRequest, Throwable> {
|
||||
|
||||
private final ContextHolder contextHolder;
|
||||
private final HttpClientConfig config;
|
||||
|
||||
EndOperationWithError(ContextHolder contextHolder, HttpClientConfig config) {
|
||||
this.contextHolder = contextHolder;
|
||||
this.config = config;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void accept(HttpClientRequest httpClientRequest, Throwable error) {
|
||||
Context context = contextHolder.context;
|
||||
if (context == null) {
|
||||
return;
|
||||
}
|
||||
instrumenter().end(context, config, null, error);
|
||||
}
|
||||
}
|
||||
|
||||
static final class EndOperation implements ConnectionObserver {
|
||||
|
||||
private final ContextHolder contextHolder;
|
||||
private final HttpClientConfig config;
|
||||
|
||||
EndOperation(ContextHolder contextHolder, HttpClientConfig config) {
|
||||
this.contextHolder = contextHolder;
|
||||
this.config = config;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onStateChange(Connection connection, State newState) {
|
||||
if (newState != HttpClientState.RESPONSE_COMPLETED) {
|
||||
return;
|
||||
}
|
||||
|
||||
Context context = contextHolder.context;
|
||||
if (context == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
// connection is actually an instance of HttpClientOperations - a package private class that
|
||||
// implements both Connection and HttpClientResponse
|
||||
if (connection instanceof HttpClientResponse) {
|
||||
HttpClientResponse response = (HttpClientResponse) connection;
|
||||
instrumenter().end(context, config, response, null);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onUncaughtException(Connection connection, Throwable error) {
|
||||
Context context = contextHolder.context;
|
||||
if (context == null) {
|
||||
return;
|
||||
}
|
||||
instrumenter().end(context, config, null, error);
|
||||
}
|
||||
}
|
||||
|
||||
private HttpResponseReceiverInstrumenter() {}
|
||||
}
|
|
@ -1,22 +0,0 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0;
|
||||
|
||||
import io.opentelemetry.context.Context;
|
||||
import java.util.function.Function;
|
||||
import reactor.core.publisher.Mono;
|
||||
import reactor.netty.Connection;
|
||||
|
||||
public class MapConnect
|
||||
implements Function<Mono<? extends Connection>, Mono<? extends Connection>> {
|
||||
|
||||
static final String CONTEXT_ATTRIBUTE = MapConnect.class.getName() + ".Context";
|
||||
|
||||
@Override
|
||||
public Mono<? extends Connection> apply(Mono<? extends Connection> m) {
|
||||
return m.contextWrite(s -> s.put(CONTEXT_ATTRIBUTE, Context.current()));
|
||||
}
|
||||
}
|
|
@ -1,20 +0,0 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0;
|
||||
|
||||
import io.opentelemetry.context.Context;
|
||||
import io.opentelemetry.javaagent.instrumentation.netty.v4_1.AttributeKeys;
|
||||
import java.util.function.BiConsumer;
|
||||
import reactor.netty.Connection;
|
||||
import reactor.netty.http.client.HttpClientRequest;
|
||||
|
||||
public class OnRequest implements BiConsumer<HttpClientRequest, Connection> {
|
||||
@Override
|
||||
public void accept(HttpClientRequest r, Connection c) {
|
||||
Context context = r.currentContextView().get(MapConnect.CONTEXT_ATTRIBUTE);
|
||||
c.channel().attr(AttributeKeys.WRITE_CONTEXT).set(context);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,16 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0;
|
||||
|
||||
public final class ReactorContextKeys {
|
||||
|
||||
public static final String CLIENT_PARENT_CONTEXT_KEY =
|
||||
ReactorContextKeys.class.getName() + ".client-parent-context";
|
||||
public static final String CLIENT_CONTEXT_KEY =
|
||||
ReactorContextKeys.class.getName() + ".client-context";
|
||||
|
||||
private ReactorContextKeys() {}
|
||||
}
|
|
@ -0,0 +1,114 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0;
|
||||
|
||||
import io.opentelemetry.instrumentation.api.instrumenter.http.HttpClientAttributesExtractor;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.SocketAddress;
|
||||
import java.util.List;
|
||||
import javax.annotation.Nullable;
|
||||
import reactor.netty.http.client.HttpClientConfig;
|
||||
import reactor.netty.http.client.HttpClientResponse;
|
||||
|
||||
final class ReactorNettyHttpClientAttributesExtractor
|
||||
extends HttpClientAttributesExtractor<HttpClientConfig, HttpClientResponse> {
|
||||
|
||||
@Override
|
||||
protected String url(HttpClientConfig request) {
|
||||
String uri = request.uri();
|
||||
if (isAbsolute(uri)) {
|
||||
return uri;
|
||||
}
|
||||
|
||||
// use the baseUrl if it was configured
|
||||
String baseUrl = request.baseUrl();
|
||||
if (baseUrl != null) {
|
||||
if (baseUrl.endsWith("/") && uri.startsWith("/")) {
|
||||
baseUrl = baseUrl.substring(0, baseUrl.length() - 1);
|
||||
}
|
||||
return baseUrl + uri;
|
||||
}
|
||||
|
||||
// otherwise, use the host+port config to construct the full url
|
||||
SocketAddress hostAddress = request.remoteAddress().get();
|
||||
if (hostAddress instanceof InetSocketAddress) {
|
||||
InetSocketAddress inetHostAddress = (InetSocketAddress) hostAddress;
|
||||
return (request.isSecure() ? "https://" : "http://")
|
||||
+ inetHostAddress.getHostName()
|
||||
+ ":"
|
||||
+ inetHostAddress.getPort()
|
||||
+ (uri.startsWith("/") ? "" : "/")
|
||||
+ uri;
|
||||
}
|
||||
|
||||
return uri;
|
||||
}
|
||||
|
||||
private static boolean isAbsolute(String uri) {
|
||||
return uri.startsWith("http://") || uri.startsWith("https://");
|
||||
}
|
||||
|
||||
@Nullable
|
||||
@Override
|
||||
protected String flavor(HttpClientConfig request, @Nullable HttpClientResponse response) {
|
||||
if (response != null) {
|
||||
String flavor = response.version().text();
|
||||
if (flavor.startsWith("HTTP/")) {
|
||||
flavor = flavor.substring("HTTP/".length());
|
||||
}
|
||||
return flavor;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String method(HttpClientConfig request) {
|
||||
return request.method().name();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected List<String> requestHeader(HttpClientConfig request, String name) {
|
||||
return request.headers().getAll(name);
|
||||
}
|
||||
|
||||
@Nullable
|
||||
@Override
|
||||
protected Long requestContentLength(
|
||||
HttpClientConfig request, @Nullable HttpClientResponse response) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Nullable
|
||||
@Override
|
||||
protected Long requestContentLengthUncompressed(
|
||||
HttpClientConfig request, @Nullable HttpClientResponse response) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Integer statusCode(HttpClientConfig request, HttpClientResponse response) {
|
||||
return response.status().code();
|
||||
}
|
||||
|
||||
@Nullable
|
||||
@Override
|
||||
protected Long responseContentLength(HttpClientConfig request, HttpClientResponse response) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Nullable
|
||||
@Override
|
||||
protected Long responseContentLengthUncompressed(
|
||||
HttpClientConfig request, HttpClientResponse response) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected List<String> responseHeader(
|
||||
HttpClientConfig request, HttpClientResponse response, String name) {
|
||||
return response.responseHeaders().getAll(name);
|
||||
}
|
||||
}
|
|
@ -38,6 +38,9 @@ public class ReactorNettyInstrumentationModule extends InstrumentationModule {
|
|||
|
||||
@Override
|
||||
public List<TypeInstrumentation> typeInstrumentations() {
|
||||
return asList(new HttpClientInstrumentation(), new TransportConnectorInstrumentation());
|
||||
return asList(
|
||||
new HttpClientInstrumentation(),
|
||||
new ResponseReceiverInstrumentation(),
|
||||
new TransportConnectorInstrumentation());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,41 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0;
|
||||
|
||||
import io.opentelemetry.instrumentation.api.instrumenter.net.InetSocketAddressNetClientAttributesExtractor;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.SocketAddress;
|
||||
import javax.annotation.Nullable;
|
||||
import reactor.netty.Connection;
|
||||
import reactor.netty.http.client.HttpClientConfig;
|
||||
import reactor.netty.http.client.HttpClientResponse;
|
||||
|
||||
final class ReactorNettyNetClientAttributesExtractor
|
||||
extends InetSocketAddressNetClientAttributesExtractor<HttpClientConfig, HttpClientResponse> {
|
||||
|
||||
@Nullable
|
||||
@Override
|
||||
public String transport(HttpClientConfig request, @Nullable HttpClientResponse response) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Nullable
|
||||
@Override
|
||||
public InetSocketAddress getAddress(
|
||||
HttpClientConfig request, @Nullable HttpClientResponse response) {
|
||||
|
||||
// we're making use of the fact that HttpClientOperations is both a Connection and an
|
||||
// HttpClientResponse
|
||||
if (response instanceof Connection) {
|
||||
Connection connection = (Connection) response;
|
||||
SocketAddress address = connection.channel().remoteAddress();
|
||||
if (address instanceof InetSocketAddress) {
|
||||
return (InetSocketAddress) address;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
|
@ -5,25 +5,58 @@
|
|||
|
||||
package io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0;
|
||||
|
||||
import io.opentelemetry.api.GlobalOpenTelemetry;
|
||||
import io.opentelemetry.instrumentation.api.config.Config;
|
||||
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
|
||||
import io.opentelemetry.instrumentation.api.instrumenter.PeerServiceAttributesExtractor;
|
||||
import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor;
|
||||
import io.opentelemetry.instrumentation.api.instrumenter.http.HttpClientMetrics;
|
||||
import io.opentelemetry.instrumentation.api.instrumenter.http.HttpSpanNameExtractor;
|
||||
import io.opentelemetry.instrumentation.api.instrumenter.http.HttpSpanStatusExtractor;
|
||||
import io.opentelemetry.javaagent.instrumentation.netty.common.client.NettyClientInstrumenterFactory;
|
||||
import io.opentelemetry.javaagent.instrumentation.netty.common.client.NettyConnectionInstrumenter;
|
||||
import reactor.netty.http.client.HttpClientConfig;
|
||||
import reactor.netty.http.client.HttpClientResponse;
|
||||
|
||||
public final class ReactorNettySingletons {
|
||||
|
||||
private static final String INSTRUMENTATION_NAME = "io.opentelemetry.reactor-netty-1.0";
|
||||
|
||||
private static final boolean alwaysCreateConnectSpan =
|
||||
Config.get()
|
||||
.getBoolean("otel.instrumentation.reactor-netty.always-create-connect-span", false);
|
||||
|
||||
private static final Instrumenter<HttpClientConfig, HttpClientResponse> INSTRUMENTER;
|
||||
private static final NettyConnectionInstrumenter CONNECTION_INSTRUMENTER;
|
||||
|
||||
static {
|
||||
ReactorNettyHttpClientAttributesExtractor httpAttributesExtractor =
|
||||
new ReactorNettyHttpClientAttributesExtractor();
|
||||
ReactorNettyNetClientAttributesExtractor netAttributesExtractor =
|
||||
new ReactorNettyNetClientAttributesExtractor();
|
||||
|
||||
INSTRUMENTER =
|
||||
Instrumenter.<HttpClientConfig, HttpClientResponse>builder(
|
||||
GlobalOpenTelemetry.get(),
|
||||
INSTRUMENTATION_NAME,
|
||||
HttpSpanNameExtractor.create(httpAttributesExtractor))
|
||||
.setSpanStatusExtractor(HttpSpanStatusExtractor.create(httpAttributesExtractor))
|
||||
.addAttributesExtractor(httpAttributesExtractor)
|
||||
.addAttributesExtractor(netAttributesExtractor)
|
||||
.addAttributesExtractor(PeerServiceAttributesExtractor.create(netAttributesExtractor))
|
||||
.addRequestMetrics(HttpClientMetrics.get())
|
||||
// headers are injected in ResponseReceiverInstrumenter
|
||||
.newInstrumenter(SpanKindExtractor.alwaysClient());
|
||||
|
||||
NettyClientInstrumenterFactory instrumenterFactory =
|
||||
new NettyClientInstrumenterFactory(
|
||||
"io.opentelemetry.reactor-netty-1.0", alwaysCreateConnectSpan, false);
|
||||
new NettyClientInstrumenterFactory(INSTRUMENTATION_NAME, alwaysCreateConnectSpan, false);
|
||||
CONNECTION_INSTRUMENTER = instrumenterFactory.createConnectionInstrumenter();
|
||||
}
|
||||
|
||||
public static Instrumenter<HttpClientConfig, HttpClientResponse> instrumenter() {
|
||||
return INSTRUMENTER;
|
||||
}
|
||||
|
||||
public static NettyConnectionInstrumenter connectionInstrumenter() {
|
||||
return CONNECTION_INSTRUMENTER;
|
||||
}
|
||||
|
|
|
@ -0,0 +1,247 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0;
|
||||
|
||||
import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.hasClassesNamed;
|
||||
import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.implementsInterface;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.named;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.returns;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
|
||||
|
||||
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
|
||||
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
|
||||
import io.opentelemetry.javaagent.instrumentation.api.CallDepth;
|
||||
import java.util.function.BiFunction;
|
||||
import net.bytebuddy.asm.Advice;
|
||||
import net.bytebuddy.description.type.TypeDescription;
|
||||
import net.bytebuddy.matcher.ElementMatcher;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
import reactor.netty.ByteBufFlux;
|
||||
import reactor.netty.http.client.HttpClient;
|
||||
import reactor.netty.http.client.HttpClientResponse;
|
||||
|
||||
public class ResponseReceiverInstrumentation implements TypeInstrumentation {
|
||||
|
||||
@Override
|
||||
public ElementMatcher<ClassLoader> classLoaderOptimization() {
|
||||
return hasClassesNamed("reactor.netty.http.client.HttpClient$ResponseReceiver");
|
||||
}
|
||||
|
||||
@Override
|
||||
public ElementMatcher<TypeDescription> typeMatcher() {
|
||||
return implementsInterface(named("reactor.netty.http.client.HttpClient$ResponseReceiver"));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void transform(TypeTransformer transformer) {
|
||||
transformer.applyAdviceToMethod(
|
||||
named("response").and(takesArguments(0)).and(returns(named("reactor.core.publisher.Mono"))),
|
||||
this.getClass().getName() + "$ResponseMonoAdvice");
|
||||
transformer.applyAdviceToMethod(
|
||||
named("response")
|
||||
.and(takesArguments(1))
|
||||
.and(takesArgument(0, BiFunction.class))
|
||||
.and(returns(named("reactor.core.publisher.Flux"))),
|
||||
this.getClass().getName() + "$ResponseFluxAdvice");
|
||||
transformer.applyAdviceToMethod(
|
||||
named("responseConnection")
|
||||
.and(takesArguments(1))
|
||||
.and(takesArgument(0, BiFunction.class))
|
||||
.and(returns(named("reactor.core.publisher.Flux"))),
|
||||
this.getClass().getName() + "$ResponseConnectionAdvice");
|
||||
transformer.applyAdviceToMethod(
|
||||
named("responseContent")
|
||||
.and(takesArguments(0))
|
||||
.and(returns(named("reactor.netty.ByteBufFlux"))),
|
||||
this.getClass().getName() + "$ResponseContentAdvice");
|
||||
transformer.applyAdviceToMethod(
|
||||
named("responseSingle")
|
||||
.and(takesArguments(1))
|
||||
.and(takesArgument(0, BiFunction.class))
|
||||
.and(returns(named("reactor.core.publisher.Mono"))),
|
||||
this.getClass().getName() + "$ResponseSingleAdvice");
|
||||
}
|
||||
|
||||
@SuppressWarnings("unused")
|
||||
public static class ResponseMonoAdvice {
|
||||
|
||||
@Advice.OnMethodEnter(suppress = Throwable.class, skipOn = Advice.OnNonDefaultValue.class)
|
||||
public static HttpClient.ResponseReceiver<?> onEnter(
|
||||
@Advice.Local("otelCallDepth") CallDepth callDepth,
|
||||
@Advice.This HttpClient.ResponseReceiver<?> receiver) {
|
||||
|
||||
callDepth = CallDepth.forClass(HttpClient.ResponseReceiver.class);
|
||||
if (callDepth.getAndIncrement() > 0) {
|
||||
// execute the original method on nested calls
|
||||
return null;
|
||||
}
|
||||
|
||||
// non-null value will skip the original method invocation
|
||||
return HttpResponseReceiverInstrumenter.instrument(receiver);
|
||||
}
|
||||
|
||||
@Advice.OnMethodExit(suppress = Throwable.class)
|
||||
public static void onExit(
|
||||
@Advice.Local("otelCallDepth") CallDepth callDepth,
|
||||
@Advice.Enter HttpClient.ResponseReceiver<?> modifiedReceiver,
|
||||
@Advice.Return(readOnly = false) Mono<HttpClientResponse> returnValue) {
|
||||
|
||||
try {
|
||||
if (modifiedReceiver != null) {
|
||||
returnValue = modifiedReceiver.response();
|
||||
}
|
||||
} finally {
|
||||
// needs to be called after original method to prevent StackOverflowError
|
||||
callDepth.decrementAndGet();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unused")
|
||||
public static class ResponseFluxAdvice {
|
||||
|
||||
@Advice.OnMethodEnter(suppress = Throwable.class, skipOn = Advice.OnNonDefaultValue.class)
|
||||
public static HttpClient.ResponseReceiver<?> onEnter(
|
||||
@Advice.Local("otelCallDepth") CallDepth callDepth,
|
||||
@Advice.This HttpClient.ResponseReceiver<?> receiver) {
|
||||
|
||||
callDepth = CallDepth.forClass(HttpClient.ResponseReceiver.class);
|
||||
if (callDepth.getAndIncrement() > 0) {
|
||||
// execute the original method on nested calls
|
||||
return null;
|
||||
}
|
||||
|
||||
// non-null value will skip the original method invocation
|
||||
return HttpResponseReceiverInstrumenter.instrument(receiver);
|
||||
}
|
||||
|
||||
@Advice.OnMethodExit(suppress = Throwable.class)
|
||||
public static void onExit(
|
||||
@Advice.Local("otelCallDepth") CallDepth callDepth,
|
||||
@Advice.Enter HttpClient.ResponseReceiver<?> modifiedReceiver,
|
||||
@Advice.Argument(0) BiFunction receiveFunction,
|
||||
@Advice.Return(readOnly = false) Flux<?> returnValue) {
|
||||
|
||||
try {
|
||||
if (modifiedReceiver != null) {
|
||||
returnValue = modifiedReceiver.response(receiveFunction);
|
||||
}
|
||||
} finally {
|
||||
// needs to be called after original method to prevent StackOverflowError
|
||||
callDepth.decrementAndGet();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unused")
|
||||
public static class ResponseConnectionAdvice {
|
||||
|
||||
@Advice.OnMethodEnter(suppress = Throwable.class, skipOn = Advice.OnNonDefaultValue.class)
|
||||
public static HttpClient.ResponseReceiver<?> onEnter(
|
||||
@Advice.Local("otelCallDepth") CallDepth callDepth,
|
||||
@Advice.This HttpClient.ResponseReceiver<?> receiver) {
|
||||
|
||||
callDepth = CallDepth.forClass(HttpClient.ResponseReceiver.class);
|
||||
if (callDepth.getAndIncrement() > 0) {
|
||||
// execute the original method on nested calls
|
||||
return null;
|
||||
}
|
||||
|
||||
// non-null value will skip the original method invocation
|
||||
return HttpResponseReceiverInstrumenter.instrument(receiver);
|
||||
}
|
||||
|
||||
@Advice.OnMethodExit(suppress = Throwable.class)
|
||||
public static void onExit(
|
||||
@Advice.Local("otelCallDepth") CallDepth callDepth,
|
||||
@Advice.Enter HttpClient.ResponseReceiver<?> modifiedReceiver,
|
||||
@Advice.Argument(0) BiFunction receiveFunction,
|
||||
@Advice.Return(readOnly = false) Flux<?> returnValue) {
|
||||
|
||||
try {
|
||||
if (modifiedReceiver != null) {
|
||||
returnValue = modifiedReceiver.responseConnection(receiveFunction);
|
||||
}
|
||||
} finally {
|
||||
// needs to be called after original method to prevent StackOverflowError
|
||||
callDepth.decrementAndGet();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unused")
|
||||
public static class ResponseContentAdvice {
|
||||
|
||||
@Advice.OnMethodEnter(suppress = Throwable.class, skipOn = Advice.OnNonDefaultValue.class)
|
||||
public static HttpClient.ResponseReceiver<?> onEnter(
|
||||
@Advice.Local("otelCallDepth") CallDepth callDepth,
|
||||
@Advice.This HttpClient.ResponseReceiver<?> receiver) {
|
||||
|
||||
callDepth = CallDepth.forClass(HttpClient.ResponseReceiver.class);
|
||||
if (callDepth.getAndIncrement() > 0) {
|
||||
// execute the original method on nested calls
|
||||
return null;
|
||||
}
|
||||
|
||||
// non-null value will skip the original method invocation
|
||||
return HttpResponseReceiverInstrumenter.instrument(receiver);
|
||||
}
|
||||
|
||||
@Advice.OnMethodExit(suppress = Throwable.class)
|
||||
public static void onExit(
|
||||
@Advice.Local("otelCallDepth") CallDepth callDepth,
|
||||
@Advice.Enter HttpClient.ResponseReceiver<?> modifiedReceiver,
|
||||
@Advice.Return(readOnly = false) ByteBufFlux returnValue) {
|
||||
|
||||
try {
|
||||
if (modifiedReceiver != null) {
|
||||
returnValue = modifiedReceiver.responseContent();
|
||||
}
|
||||
} finally {
|
||||
// needs to be called after original method to prevent StackOverflowError
|
||||
callDepth.decrementAndGet();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unused")
|
||||
public static class ResponseSingleAdvice {
|
||||
|
||||
@Advice.OnMethodEnter(suppress = Throwable.class, skipOn = Advice.OnNonDefaultValue.class)
|
||||
public static HttpClient.ResponseReceiver<?> onEnter(
|
||||
@Advice.Local("otelCallDepth") CallDepth callDepth,
|
||||
@Advice.This HttpClient.ResponseReceiver<?> receiver) {
|
||||
|
||||
callDepth = CallDepth.forClass(HttpClient.ResponseReceiver.class);
|
||||
if (callDepth.getAndIncrement() > 0) {
|
||||
// execute the original method on nested calls
|
||||
return null;
|
||||
}
|
||||
|
||||
// non-null value will skip the original method invocation
|
||||
return HttpResponseReceiverInstrumenter.instrument(receiver);
|
||||
}
|
||||
|
||||
@Advice.OnMethodExit(suppress = Throwable.class)
|
||||
public static void onExit(
|
||||
@Advice.Local("otelCallDepth") CallDepth callDepth,
|
||||
@Advice.Enter HttpClient.ResponseReceiver<?> modifiedReceiver,
|
||||
@Advice.Argument(0) BiFunction receiveFunction,
|
||||
@Advice.Return(readOnly = false) Mono<?> returnValue) {
|
||||
|
||||
try {
|
||||
if (modifiedReceiver != null) {
|
||||
returnValue = modifiedReceiver.responseSingle(receiveFunction);
|
||||
}
|
||||
} finally {
|
||||
// needs to be called after original method to prevent StackOverflowError
|
||||
callDepth.decrementAndGet();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -69,17 +69,6 @@ abstract class AbstractReactorNettyHttpClientTest extends HttpClientTest<HttpCli
|
|||
})
|
||||
}
|
||||
|
||||
@Override
|
||||
String expectedClientSpanName(URI uri, String method) {
|
||||
switch (uri.toString()) {
|
||||
case "http://localhost:61/": // unopened port
|
||||
case "https://192.0.2.1/": // non routable address
|
||||
return "CONNECT"
|
||||
default:
|
||||
return super.expectedClientSpanName(uri, method)
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
Throwable clientSpanError(URI uri, Throwable exception) {
|
||||
if (exception.class.getName().endsWith("ReactiveException")) {
|
||||
|
|
|
@ -19,6 +19,7 @@ import static io.opentelemetry.api.trace.SpanKind.CLIENT
|
|||
import static io.opentelemetry.api.trace.SpanKind.INTERNAL
|
||||
import static io.opentelemetry.api.trace.SpanKind.SERVER
|
||||
import static io.opentelemetry.api.trace.StatusCode.ERROR
|
||||
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.HttpFlavorValues.HTTP_1_1
|
||||
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.NetTransportValues.IP_TCP
|
||||
|
||||
class ReactorNettyClientSslTest extends AgentInstrumentationSpecification {
|
||||
|
@ -38,9 +39,10 @@ class ReactorNettyClientSslTest extends AgentInstrumentationSpecification {
|
|||
def "should fail SSL handshake"() {
|
||||
given:
|
||||
def httpClient = createHttpClient(["SSLv3"])
|
||||
def uri = "https://localhost:${server.httpsPort()}/success"
|
||||
|
||||
when:
|
||||
def responseMono = httpClient.get().uri("https://localhost:${server.httpsPort()}/success")
|
||||
def responseMono = httpClient.get().uri(uri)
|
||||
.responseSingle { resp, content ->
|
||||
// Make sure to consume content since that's when we close the span.
|
||||
content.map { resp }
|
||||
|
@ -54,37 +56,49 @@ class ReactorNettyClientSslTest extends AgentInstrumentationSpecification {
|
|||
Throwable thrownException = thrown()
|
||||
|
||||
assertTraces(1) {
|
||||
trace(0, 4) {
|
||||
trace(0, 5) {
|
||||
span(0) {
|
||||
name "parent"
|
||||
status ERROR
|
||||
errorEvent(thrownException.class, thrownException.message)
|
||||
}
|
||||
span(1) {
|
||||
name "RESOLVE"
|
||||
kind INTERNAL
|
||||
name "HTTP GET"
|
||||
kind CLIENT
|
||||
childOf span(0)
|
||||
status ERROR
|
||||
// netty swallows the exception, it doesn't make any sense to hard-code the message
|
||||
errorEventWithAnyMessage(SSLHandshakeException)
|
||||
attributes {
|
||||
"${SemanticAttributes.NET_TRANSPORT.key}" IP_TCP
|
||||
"${SemanticAttributes.NET_PEER_NAME.key}" "localhost"
|
||||
"${SemanticAttributes.NET_PEER_PORT.key}" server.httpsPort()
|
||||
"${SemanticAttributes.HTTP_METHOD}" "GET"
|
||||
"${SemanticAttributes.HTTP_URL}" uri
|
||||
}
|
||||
}
|
||||
span(2) {
|
||||
name "CONNECT"
|
||||
name "RESOLVE"
|
||||
kind INTERNAL
|
||||
childOf span(0)
|
||||
childOf span(1)
|
||||
attributes {
|
||||
"${SemanticAttributes.NET_TRANSPORT.key}" IP_TCP
|
||||
"${SemanticAttributes.NET_PEER_NAME.key}" "localhost"
|
||||
"${SemanticAttributes.NET_PEER_PORT.key}" server.httpsPort()
|
||||
"${SemanticAttributes.NET_PEER_IP.key}" { it == null || it == "127.0.0.1" }
|
||||
}
|
||||
}
|
||||
span(3) {
|
||||
name "CONNECT"
|
||||
kind INTERNAL
|
||||
childOf span(1)
|
||||
attributes {
|
||||
"${SemanticAttributes.NET_TRANSPORT.key}" IP_TCP
|
||||
"${SemanticAttributes.NET_PEER_NAME.key}" "localhost"
|
||||
"${SemanticAttributes.NET_PEER_PORT.key}" server.httpsPort()
|
||||
"${SemanticAttributes.NET_PEER_IP.key}" "127.0.0.1"
|
||||
}
|
||||
}
|
||||
span(4) {
|
||||
name "SSL handshake"
|
||||
kind INTERNAL
|
||||
childOf span(0)
|
||||
childOf span(1)
|
||||
status ERROR
|
||||
// netty swallows the exception, it doesn't make any sense to hard-code the message
|
||||
errorEventWithAnyMessage(SSLHandshakeException)
|
||||
|
@ -92,7 +106,7 @@ class ReactorNettyClientSslTest extends AgentInstrumentationSpecification {
|
|||
"${SemanticAttributes.NET_TRANSPORT.key}" IP_TCP
|
||||
"${SemanticAttributes.NET_PEER_NAME.key}" "localhost"
|
||||
"${SemanticAttributes.NET_PEER_PORT.key}" server.httpsPort()
|
||||
"${SemanticAttributes.NET_PEER_IP.key}" { it == null || it == "127.0.0.1" }
|
||||
"${SemanticAttributes.NET_PEER_IP.key}" "127.0.0.1"
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -102,9 +116,10 @@ class ReactorNettyClientSslTest extends AgentInstrumentationSpecification {
|
|||
def "should successfully establish SSL handshake"() {
|
||||
given:
|
||||
def httpClient = createHttpClient()
|
||||
def uri = "https://localhost:${server.httpsPort()}/success"
|
||||
|
||||
when:
|
||||
def responseMono = httpClient.get().uri("https://localhost:${server.httpsPort()}/success")
|
||||
def responseMono = httpClient.get().uri(uri)
|
||||
.responseSingle { resp, content ->
|
||||
// Make sure to consume content since that's when we close the span.
|
||||
content.map { resp }
|
||||
|
@ -121,46 +136,55 @@ class ReactorNettyClientSslTest extends AgentInstrumentationSpecification {
|
|||
name "parent"
|
||||
}
|
||||
span(1) {
|
||||
name "RESOLVE"
|
||||
kind INTERNAL
|
||||
name "HTTP GET"
|
||||
kind CLIENT
|
||||
childOf span(0)
|
||||
attributes {
|
||||
"${SemanticAttributes.NET_TRANSPORT.key}" IP_TCP
|
||||
"${SemanticAttributes.HTTP_METHOD}" "GET"
|
||||
"${SemanticAttributes.HTTP_URL}" uri
|
||||
"${SemanticAttributes.HTTP_FLAVOR}" HTTP_1_1
|
||||
"${SemanticAttributes.HTTP_STATUS_CODE}" 200
|
||||
"${SemanticAttributes.NET_PEER_NAME.key}" "localhost"
|
||||
"${SemanticAttributes.NET_PEER_PORT.key}" server.httpsPort()
|
||||
"${SemanticAttributes.NET_PEER_IP.key}" "127.0.0.1"
|
||||
}
|
||||
}
|
||||
span(2) {
|
||||
name "CONNECT"
|
||||
name "RESOLVE"
|
||||
kind INTERNAL
|
||||
childOf span(0)
|
||||
childOf span(1)
|
||||
attributes {
|
||||
"${SemanticAttributes.NET_TRANSPORT.key}" IP_TCP
|
||||
"${SemanticAttributes.NET_PEER_NAME.key}" "localhost"
|
||||
"${SemanticAttributes.NET_PEER_PORT.key}" server.httpsPort()
|
||||
"${SemanticAttributes.NET_PEER_IP.key}" { it == null || it == "127.0.0.1" }
|
||||
}
|
||||
}
|
||||
span(3) {
|
||||
name "SSL handshake"
|
||||
name "CONNECT"
|
||||
kind INTERNAL
|
||||
childOf span(0)
|
||||
childOf span(1)
|
||||
attributes {
|
||||
"${SemanticAttributes.NET_TRANSPORT.key}" IP_TCP
|
||||
"${SemanticAttributes.NET_PEER_NAME.key}" "localhost"
|
||||
"${SemanticAttributes.NET_PEER_PORT.key}" server.httpsPort()
|
||||
"${SemanticAttributes.NET_PEER_IP.key}" { it == null || it == "127.0.0.1" }
|
||||
"${SemanticAttributes.NET_PEER_IP.key}" "127.0.0.1"
|
||||
}
|
||||
}
|
||||
span(4) {
|
||||
name "HTTP GET"
|
||||
kind CLIENT
|
||||
childOf(span(0))
|
||||
name "SSL handshake"
|
||||
kind INTERNAL
|
||||
childOf span(1)
|
||||
attributes {
|
||||
"${SemanticAttributes.NET_TRANSPORT.key}" IP_TCP
|
||||
"${SemanticAttributes.NET_PEER_NAME.key}" "localhost"
|
||||
"${SemanticAttributes.NET_PEER_PORT.key}" server.httpsPort()
|
||||
"${SemanticAttributes.NET_PEER_IP.key}" "127.0.0.1"
|
||||
}
|
||||
}
|
||||
span(5) {
|
||||
name "test-http-server"
|
||||
kind SERVER
|
||||
childOf(span(4))
|
||||
childOf span(1)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -17,6 +17,7 @@ import static io.opentelemetry.api.trace.SpanKind.CLIENT
|
|||
import static io.opentelemetry.api.trace.SpanKind.INTERNAL
|
||||
import static io.opentelemetry.api.trace.SpanKind.SERVER
|
||||
import static io.opentelemetry.api.trace.StatusCode.ERROR
|
||||
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.HttpFlavorValues.HTTP_1_1
|
||||
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.NetTransportValues.IP_TCP
|
||||
|
||||
class ReactorNettyConnectionSpanTest extends InstrumentationSpecification implements AgentTestTrait {
|
||||
|
@ -34,11 +35,14 @@ class ReactorNettyConnectionSpanTest extends InstrumentationSpecification implem
|
|||
}
|
||||
|
||||
def "test successful request"() {
|
||||
when:
|
||||
given:
|
||||
def httpClient = HttpClient.create()
|
||||
def uri = "http://localhost:${server.httpPort()}/success"
|
||||
|
||||
when:
|
||||
def responseCode =
|
||||
runWithSpan("parent") {
|
||||
httpClient.get().uri("http://localhost:${server.httpPort()}/success")
|
||||
httpClient.get().uri(uri)
|
||||
.responseSingle { resp, content ->
|
||||
// Make sure to consume content since that's when we close the span.
|
||||
content.map { resp }
|
||||
|
@ -57,19 +61,33 @@ class ReactorNettyConnectionSpanTest extends InstrumentationSpecification implem
|
|||
hasNoParent()
|
||||
}
|
||||
span(1) {
|
||||
name "HTTP GET"
|
||||
kind CLIENT
|
||||
childOf span(0)
|
||||
attributes {
|
||||
"${SemanticAttributes.HTTP_METHOD}" "GET"
|
||||
"${SemanticAttributes.HTTP_URL}" uri
|
||||
"${SemanticAttributes.HTTP_FLAVOR}" HTTP_1_1
|
||||
"${SemanticAttributes.HTTP_STATUS_CODE}" 200
|
||||
"${SemanticAttributes.NET_PEER_NAME.key}" "localhost"
|
||||
"${SemanticAttributes.NET_PEER_PORT.key}" server.httpPort()
|
||||
"${SemanticAttributes.NET_PEER_IP.key}" "127.0.0.1"
|
||||
}
|
||||
}
|
||||
span(2) {
|
||||
name "RESOLVE"
|
||||
kind INTERNAL
|
||||
childOf span(0)
|
||||
childOf span(1)
|
||||
attributes {
|
||||
"${SemanticAttributes.NET_TRANSPORT.key}" IP_TCP
|
||||
"${SemanticAttributes.NET_PEER_NAME.key}" "localhost"
|
||||
"${SemanticAttributes.NET_PEER_PORT.key}" server.httpPort()
|
||||
}
|
||||
}
|
||||
span(2) {
|
||||
span(3) {
|
||||
name "CONNECT"
|
||||
kind INTERNAL
|
||||
childOf(span(0))
|
||||
childOf span(1)
|
||||
attributes {
|
||||
"${SemanticAttributes.NET_TRANSPORT.key}" IP_TCP
|
||||
"${SemanticAttributes.NET_PEER_NAME.key}" "localhost"
|
||||
|
@ -77,25 +95,23 @@ class ReactorNettyConnectionSpanTest extends InstrumentationSpecification implem
|
|||
"${SemanticAttributes.NET_PEER_IP.key}" "127.0.0.1"
|
||||
}
|
||||
}
|
||||
span(3) {
|
||||
name "HTTP GET"
|
||||
kind CLIENT
|
||||
childOf(span(0))
|
||||
}
|
||||
span(4) {
|
||||
name "test-http-server"
|
||||
kind SERVER
|
||||
childOf(span(3))
|
||||
childOf span(1)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
def "test failing request"() {
|
||||
when:
|
||||
given:
|
||||
def httpClient = HttpClient.create()
|
||||
def uri = "http://localhost:${PortUtils.UNUSABLE_PORT}"
|
||||
|
||||
when:
|
||||
runWithSpan("parent") {
|
||||
httpClient.get().uri("http://localhost:${PortUtils.UNUSABLE_PORT}")
|
||||
httpClient.get().uri(uri)
|
||||
.responseSingle { resp, content ->
|
||||
// Make sure to consume content since that's when we close the span.
|
||||
content.map { resp }
|
||||
|
@ -110,7 +126,7 @@ class ReactorNettyConnectionSpanTest extends InstrumentationSpecification implem
|
|||
|
||||
and:
|
||||
assertTraces(1) {
|
||||
trace(0, 3) {
|
||||
trace(0, 4) {
|
||||
span(0) {
|
||||
name "parent"
|
||||
kind INTERNAL
|
||||
|
@ -119,26 +135,37 @@ class ReactorNettyConnectionSpanTest extends InstrumentationSpecification implem
|
|||
errorEvent(thrownException.class, thrownException.message)
|
||||
}
|
||||
span(1) {
|
||||
name "HTTP GET"
|
||||
kind CLIENT
|
||||
childOf span(0)
|
||||
status ERROR
|
||||
errorEvent(connectException.class, connectException.message)
|
||||
attributes {
|
||||
"${SemanticAttributes.HTTP_METHOD}" "GET"
|
||||
"${SemanticAttributes.HTTP_URL}" uri
|
||||
}
|
||||
}
|
||||
span(2) {
|
||||
name "RESOLVE"
|
||||
kind INTERNAL
|
||||
childOf span(0)
|
||||
childOf span(1)
|
||||
attributes {
|
||||
"${SemanticAttributes.NET_TRANSPORT.key}" IP_TCP
|
||||
"${SemanticAttributes.NET_PEER_NAME.key}" "localhost"
|
||||
"${SemanticAttributes.NET_PEER_PORT.key}" PortUtils.UNUSABLE_PORT
|
||||
}
|
||||
}
|
||||
span(2) {
|
||||
span(3) {
|
||||
name "CONNECT"
|
||||
kind INTERNAL
|
||||
childOf(span(0))
|
||||
childOf span(1)
|
||||
status ERROR
|
||||
errorEvent(connectException.class, connectException.message)
|
||||
attributes {
|
||||
"${SemanticAttributes.NET_TRANSPORT.key}" IP_TCP
|
||||
"${SemanticAttributes.NET_PEER_NAME.key}" "localhost"
|
||||
"${SemanticAttributes.NET_PEER_PORT.key}" PortUtils.UNUSABLE_PORT
|
||||
"${SemanticAttributes.NET_PEER_IP.key}" { it == null || it == "127.0.0.1" }
|
||||
"${SemanticAttributes.NET_PEER_IP.key}" "127.0.0.1"
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -7,6 +7,7 @@ package io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0
|
|||
|
||||
import io.netty.channel.ChannelOption
|
||||
import io.opentelemetry.instrumentation.testing.junit.http.SingleConnection
|
||||
import io.opentelemetry.testing.internal.armeria.common.HttpHeaderNames
|
||||
import reactor.netty.http.client.HttpClient
|
||||
|
||||
import java.util.concurrent.ExecutionException
|
||||
|
@ -15,9 +16,12 @@ import java.util.concurrent.TimeoutException
|
|||
class ReactorNettyHttpClientTest extends AbstractReactorNettyHttpClientTest {
|
||||
|
||||
HttpClient createHttpClient() {
|
||||
return HttpClient.create().tcpConfiguration({ tcpClient ->
|
||||
tcpClient.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, CONNECT_TIMEOUT_MS)
|
||||
}).resolver(getAddressResolverGroup())
|
||||
return HttpClient.create()
|
||||
.tcpConfiguration({ tcpClient ->
|
||||
tcpClient.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, CONNECT_TIMEOUT_MS)
|
||||
})
|
||||
.resolver(getAddressResolverGroup())
|
||||
.headers({ headers -> headers.set(HttpHeaderNames.USER_AGENT, userAgent()) })
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -26,6 +30,7 @@ class ReactorNettyHttpClientTest extends AbstractReactorNettyHttpClientTest {
|
|||
.newConnection()
|
||||
.host(host)
|
||||
.port(port)
|
||||
.headers({ headers -> headers.set(HttpHeaderNames.USER_AGENT, userAgent()) })
|
||||
|
||||
return new SingleConnection() {
|
||||
|
||||
|
|
|
@ -6,14 +6,18 @@
|
|||
package io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0
|
||||
|
||||
import io.netty.channel.ChannelOption
|
||||
import io.opentelemetry.testing.internal.armeria.common.HttpHeaderNames
|
||||
import reactor.netty.http.client.HttpClient
|
||||
import reactor.netty.tcp.TcpClient
|
||||
|
||||
class ReactorNettyHttpClientUsingFromTest extends AbstractReactorNettyHttpClientTest {
|
||||
|
||||
HttpClient createHttpClient() {
|
||||
return HttpClient.from(TcpClient.create()).tcpConfiguration({ tcpClient ->
|
||||
tcpClient.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, CONNECT_TIMEOUT_MS)
|
||||
}).resolver(getAddressResolverGroup())
|
||||
return HttpClient.from(TcpClient.create())
|
||||
.tcpConfiguration({ tcpClient ->
|
||||
tcpClient.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, CONNECT_TIMEOUT_MS)
|
||||
})
|
||||
.resolver(getAddressResolverGroup())
|
||||
.headers({ headers -> headers.set(HttpHeaderNames.USER_AGENT, userAgent()) })
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue